mirror of
https://github.com/RfidResearchGroup/proxmark3.git
synced 2025-01-12 19:23:54 +08:00
79 lines
2.5 KiB
Python
79 lines
2.5 KiB
Python
|
import os
|
||
|
import sys
|
||
|
import threading
|
||
|
import time
|
||
|
|
||
|
# From https://stackoverflow.com/a/29834357
|
||
|
class OutputGrabber(object):
|
||
|
"""
|
||
|
Class used to grab standard output or another stream.
|
||
|
"""
|
||
|
escape_char = "\b"
|
||
|
|
||
|
def __init__(self, stream=None, threaded=False):
|
||
|
self.origstream = stream
|
||
|
self.threaded = threaded
|
||
|
if self.origstream is None:
|
||
|
self.origstream = sys.stdout
|
||
|
self.origstreamfd = self.origstream.fileno()
|
||
|
self.capturedtext = ""
|
||
|
# Create a pipe so the stream can be captured:
|
||
|
self.pipe_out, self.pipe_in = os.pipe()
|
||
|
|
||
|
def __enter__(self):
|
||
|
self.start()
|
||
|
return self
|
||
|
|
||
|
def __exit__(self, type, value, traceback):
|
||
|
self.stop()
|
||
|
|
||
|
def start(self):
|
||
|
"""
|
||
|
Start capturing the stream data.
|
||
|
"""
|
||
|
self.capturedtext = ""
|
||
|
# Save a copy of the stream:
|
||
|
self.streamfd = os.dup(self.origstreamfd)
|
||
|
# Replace the original stream with our write pipe:
|
||
|
os.dup2(self.pipe_in, self.origstreamfd)
|
||
|
if self.threaded:
|
||
|
# Start thread that will read the stream:
|
||
|
self.workerThread = threading.Thread(target=self.readOutput)
|
||
|
self.workerThread.start()
|
||
|
# Make sure that the thread is running and os.read() has executed:
|
||
|
time.sleep(0.01)
|
||
|
|
||
|
def stop(self):
|
||
|
"""
|
||
|
Stop capturing the stream data and save the text in `capturedtext`.
|
||
|
"""
|
||
|
# Print the escape character to make the readOutput method stop:
|
||
|
self.origstream.write(self.escape_char)
|
||
|
# Flush the stream to make sure all our data goes in before
|
||
|
# the escape character:
|
||
|
self.origstream.flush()
|
||
|
if self.threaded:
|
||
|
# wait until the thread finishes so we are sure that
|
||
|
# we have until the last character:
|
||
|
self.workerThread.join()
|
||
|
else:
|
||
|
self.readOutput()
|
||
|
# Close the pipe:
|
||
|
os.close(self.pipe_in)
|
||
|
os.close(self.pipe_out)
|
||
|
# Restore the original stream:
|
||
|
os.dup2(self.streamfd, self.origstreamfd)
|
||
|
# Close the duplicate stream:
|
||
|
os.close(self.streamfd)
|
||
|
|
||
|
def readOutput(self):
|
||
|
"""
|
||
|
Read the stream data (one byte at a time)
|
||
|
and save the text in `capturedtext`.
|
||
|
"""
|
||
|
while True:
|
||
|
char = os.read(self.pipe_out,1).decode(self.origstream.encoding)
|
||
|
if not char or self.escape_char in char:
|
||
|
break
|
||
|
self.capturedtext += char
|