Move output_grabber

This commit is contained in:
Philippe Teuwen 2024-08-07 10:29:40 +02:00
parent 11c1c8490c
commit e3fb13de39
6 changed files with 16 additions and 88 deletions

5
.gitignore vendored
View file

@ -119,4 +119,7 @@ fpga_version_info.c
# local codeql
_codeql*
/codeql
/codeql
# Pyton venvs
venv/

View file

@ -8,6 +8,6 @@ p=pm3.pm3()
print("Device:", p.name)
with out:
p.console("hw status")
for line in out.capturedtext.split('\n'):
for line in out.captured_output.split('\n'):
if "Unique ID" in line:
print(line)

View file

@ -1,78 +0,0 @@
import os
import sys
import threading
import time
# From https://stackoverflow.com/a/29834357
class OutputGrabber(object):
"""
Class used to grab standard output or another stream.
"""
escape_char = "\b"
def __init__(self, stream=None, threaded=False):
self.origstream = stream
self.threaded = threaded
if self.origstream is None:
self.origstream = sys.stdout
self.origstreamfd = self.origstream.fileno()
self.capturedtext = ""
def __enter__(self):
self.start()
return self
def __exit__(self, type, value, traceback):
self.stop()
def start(self):
"""
Start capturing the stream data.
"""
self.capturedtext = ""
# Create a pipe so the stream can be captured:
self.pipe_out, self.pipe_in = os.pipe()
# Save a copy of the stream:
self.streamfd = os.dup(self.origstreamfd)
# Replace the original stream with our write pipe:
os.dup2(self.pipe_in, self.origstreamfd)
if self.threaded:
# Start thread that will read the stream:
self.workerThread = threading.Thread(target=self.readOutput)
self.workerThread.start()
# Make sure that the thread is running and os.read() has executed:
time.sleep(0.01)
def stop(self):
"""
Stop capturing the stream data and save the text in `capturedtext`.
"""
# Print the escape character to make the readOutput method stop:
self.origstream.write(self.escape_char)
# Flush the stream to make sure all our data goes in before
# the escape character:
self.origstream.flush()
if self.threaded:
# wait until the thread finishes so we are sure that
# we have until the last character:
self.workerThread.join()
else:
self.readOutput()
# Close the pipe:
os.close(self.pipe_in)
os.close(self.pipe_out)
# Restore the original stream:
os.dup2(self.streamfd, self.origstreamfd)
# Close the duplicate stream:
os.close(self.streamfd)
def readOutput(self):
"""
Read the stream data (one byte at a time)
and save the text in `capturedtext`.
"""
while True:
char = os.read(self.pipe_out,1).decode(self.origstream.encoding, errors='replace')
if not char or self.escape_char in char:
break
self.capturedtext += char

View file

@ -8,6 +8,6 @@ p=pm3.pm3("/dev/ttyACM0")
print("Device:", p.name)
with out:
p.console("hw status")
for line in out.capturedtext.split('\n'):
for line in out.captured_output.split('\n'):
if "Unique ID" in line:
print(line)

View file

@ -16,7 +16,7 @@ class OutputGrabber(object):
if self.origstream is None:
self.origstream = sys.stdout
self.origstreamfd = self.origstream.fileno()
self.capturedtext = ""
self.captured_output = ""
def __enter__(self):
self.start()
@ -29,7 +29,7 @@ class OutputGrabber(object):
"""
Start capturing the stream data.
"""
self.capturedtext = ""
self.captured_output = ""
# Create a pipe so the stream can be captured:
self.pipe_out, self.pipe_in = os.pipe()
# Save a copy of the stream:
@ -45,7 +45,7 @@ class OutputGrabber(object):
def stop(self):
"""
Stop capturing the stream data and save the text in `capturedtext`.
Stop capturing the stream data and save the text in `captured_output`.
"""
# Print the escape character to make the readOutput method stop:
self.origstream.write(self.escape_char)
@ -69,10 +69,13 @@ class OutputGrabber(object):
def readOutput(self):
"""
Read the stream data (one byte at a time)
and save the text in `capturedtext`.
and save the text in `captured_output`.
"""
while True:
char = os.read(self.pipe_out,1).decode(self.origstream.encoding, errors='replace')
if not char or self.escape_char in char:
break
self.capturedtext += char
self.captured_output += char
if __name__ == "__main__":
print("This is a library, don't use it as a script")

View file

@ -76,5 +76,5 @@ class pm3(object):
# Register pm3 in _pm3:
_pm3.pm3_swigregister(pm3)
if __name__ == "__main__":
print("This is a library, don't use it as a script")