프로세스가 실행되는 동안 하위 프로세스 출력을 지속적으로 인쇄합니다.
Python-scripts에서 프로그램을 실행하려면 다음 방법을 사용합니다.
def execute(command):
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
output = process.communicate()[0]
exitCode = process.returncode
if (exitCode == 0):
return output
else:
raise ProcessException(command, exitCode, output)
가 '나'와 같은 할 때Process.execute("mvn clean install")
내 프로그램은 프로세스가 완료될 때까지 기다린 후 내 프로그램의 완전한 출력을 얻습니다.완료하는 데 시간이 걸리는 프로세스를 실행하고 있으면 짜증납니다.
루프에서 완료되기 전에 프로세스 출력을 폴링하여 프로그램에서 프로세스 출력을 한 줄씩 쓸 수 있습니까?
관련이 있을 법한 이 기사를 발견했다.
명령어가 출력되는 즉시 반복기를 사용하여 라인을 처리할 수 있습니다.lines = iter(fd.readline, "")
다음은 일반적인 사용 사례를 보여 주는 전체 예입니다(@jfs 지원 덕분에).
from __future__ import print_function # Only Python 2.x
import subprocess
def execute(cmd):
popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
for stdout_line in iter(popen.stdout.readline, ""):
yield stdout_line
popen.stdout.close()
return_code = popen.wait()
if return_code:
raise subprocess.CalledProcessError(return_code, cmd)
# Example
for path in execute(["locate", "a"]):
print(path, end="")
Python 3에서 stdout 버퍼가 플러시되는 즉시 하위 프로세스의 출력을 한 줄씩 인쇄하려면:
from subprocess import Popen, PIPE, CalledProcessError
with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
for line in p.stdout:
print(line, end='') # process line here
if p.returncode != 0:
raise CalledProcessError(p.returncode, p.args)
.p.poll()
됩니다.--eof 도of -- -- -- 。 필요 요.iter(p.stdout.readline, '')
버그는 3에서 되었습니다.-- Python 3에서 되었습니다.
「Python:subprocess.communicate()로부터의 스트리밍 입력을 읽어냅니다.
네, 스레드 없이 해결할 수 있었습니다(스레드를 사용하는 것이 좋은 이유는 어떤 제안이라도 좋습니다).실행 중에 서브프로세스의 stdout을 가로채기
def execute(command):
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Poll process for new output until finished
while True:
nextline = process.stdout.readline()
if nextline == '' and process.poll() is not None:
break
sys.stdout.write(nextline)
sys.stdout.flush()
output = process.communicate()[0]
exitCode = process.returncode
if (exitCode == 0):
return output
else:
raise ProcessException(command, exitCode, output)
요.print
★★★★
import subprocess
import sys
def execute(command):
subprocess.check_call(command, shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)
에서는 단순히 의 하위 stdout
success api(성공 api) 예외 api(예외 api)
@토크랜드
코드를 시험하여 3.4로 수정했습니다.Windows dir.dir 명령어는 cmd-file로 저장됩니다.
import subprocess
c = "dir.cmd"
def execute(command):
popen = subprocess.Popen(command, stdout=subprocess.PIPE,bufsize=1)
lines_iterator = iter(popen.stdout.readline, b"")
while popen.poll() is None:
for line in lines_iterator:
nline = line.rstrip()
print(nline.decode("latin"), end = "\r\n",flush =True) # yield line
execute(c)
Python 3에서는 Python > = 3.5를 합니다.subprocess.run
뭇매를 맞다
import subprocess
cmd = 'echo foo; sleep 1; echo foo; sleep 2; echo foo'
subprocess.run(cmd, shell=True)
하는 경우, (실행중의 출력은)가도 동작합니다.shell=True
) https://docs.python.org/3/library/subprocess.html#subprocess.run
Python 스크립트에서 stdout을 얻기 위해 이 질문에 대한 답변을 시도하는 사람은 Python이 stdout을 버퍼링하므로 stdout을 확인하는 데 시간이 걸릴 수 있습니다.
이 문제는 타깃스크립트에 stdout을 쓸 때마다 다음 사항을 추가하여 해결할 수 있습니다.
sys.stdout.flush()
번째 질문에 이 가장 .stdout
로로 your your your 로 직접 이동stdout
같은 할 수 ).stderr
(다음 예시와 같이)
p = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
p.communicate()
둘 다 읽으려고 하는 경우stdout
그리고.stderr
동시에 스레드를 사용하여 생각해낸 것은 다음과 같습니다.
import threading
import subprocess
import Queue
class AsyncLineReader(threading.Thread):
def __init__(self, fd, outputQueue):
threading.Thread.__init__(self)
assert isinstance(outputQueue, Queue.Queue)
assert callable(fd.readline)
self.fd = fd
self.outputQueue = outputQueue
def run(self):
map(self.outputQueue.put, iter(self.fd.readline, ''))
def eof(self):
return not self.is_alive() and self.outputQueue.empty()
@classmethod
def getForFd(cls, fd, start=True):
queue = Queue.Queue()
reader = cls(fd, queue)
if start:
reader.start()
return reader, queue
process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdoutReader, stdoutQueue) = AsyncLineReader.getForFd(process.stdout)
(stderrReader, stderrQueue) = AsyncLineReader.getForFd(process.stderr)
# Keep checking queues until there is no more output.
while not stdoutReader.eof() or not stderrReader.eof():
# Process all available lines from the stdout Queue.
while not stdoutQueue.empty():
line = stdoutQueue.get()
print 'Received stdout: ' + repr(line)
# Do stuff with stdout line.
# Process all available lines from the stderr Queue.
while not stderrQueue.empty():
line = stderrQueue.get()
print 'Received stderr: ' + repr(line)
# Do stuff with stderr line.
# Sleep for a short time to avoid excessive CPU use while waiting for data.
sleep(0.05)
print "Waiting for async readers to finish..."
stdoutReader.join()
stderrReader.join()
# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()
print "Waiting for process to exit..."
returnCode = process.wait()
if returnCode != 0:
raise subprocess.CalledProcessError(returnCode, command)
저는 이 질문을 하고 싶었을 뿐입니다. 비슷한 질문을 하려고 했지만, 어떤 대답도 제 문제를 해결해주지 못했습니다.그게 누군가에게 도움이 됐으면 좋겠네요!
제 사용 사례에서 외부 프로세스는 다음과 같은 프로세스를 중지합니다.Popen()
.
이 PoC는 프로세스의 출력을 항상 읽어내고 필요할 때 액세스할 수 있습니다.마지막 결과만 유지되고 다른 모든 출력은 폐기되므로 PIPE의 메모리 부족이 방지됩니다.
import subprocess
import time
import threading
import Queue
class FlushPipe(object):
def __init__(self):
self.command = ['python', './print_date.py']
self.process = None
self.process_output = Queue.LifoQueue(0)
self.capture_output = threading.Thread(target=self.output_reader)
def output_reader(self):
for line in iter(self.process.stdout.readline, b''):
self.process_output.put_nowait(line)
def start_process(self):
self.process = subprocess.Popen(self.command,
stdout=subprocess.PIPE)
self.capture_output.start()
def get_output_for_processing(self):
line = self.process_output.get()
print ">>>" + line
if __name__ == "__main__":
flush_pipe = FlushPipe()
flush_pipe.start_process()
now = time.time()
while time.time() - now < 10:
flush_pipe.get_output_for_processing()
time.sleep(2.5)
flush_pipe.capture_output.join(timeout=0.001)
flush_pipe.process.kill()
print_date.화이
#!/usr/bin/env python
import time
if __name__ == "__main__":
while True:
print str(time.time())
time.sleep(0.01)
output: 최대 2.5초 인터벌로부터의 출력만 있는 것을 알 수 있습니다.
>>>1520535158.51
>>>1520535161.01
>>>1520535163.51
>>>1520535166.01
적어도 Python3.4에서는 동작합니다.
import subprocess
process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
for line in process.stdout:
print(line.decode().strip())
여기 있는 답변 중 어느 것도 제 모든 요구를 충족시키지 못했습니다.
- stdout용 스레드 없음(큐도 없음 등)
- 다른 상황을 확인해야 하므로 비차단
- 필요에 따라 PIPE를 사용하여 스트림 출력, 로그 파일에 쓰기, 출력 문자열 복사본 반환 등 여러 작업을 수행합니다.
약간의 배경:스레드 풀을 관리하기 위해 ThreadPoolExecutor를 사용하고 있습니다.각각의 서브프로세스를 기동해, 동시성을 실행하고 있습니다(Python2.7에서는, 이것은 새로운 3.x에서도 동작합니다).가능한 한 많은 스레드를 출력 수집에만 사용하고 싶지 않습니다(20개의 프로세스 풀은 실행에만 40개의 스레드를 사용합니다.1개는 프로세스 스레드에, 1개는 stdout에 사용합니다).더 많은 정보를 얻을 수 있을 것 같습니다.
여기서 많은 예외를 제거하고 있기 때문에, 이것은 실가동중의 코드에 근거하고 있습니다.내가 복사해서 붙여넣어서 망치지 않았으면 좋겠어.또, 피드백은 매우 환영합니다!
import time
import fcntl
import subprocess
import time
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Make stdout non-blocking when using read/readline
proc_stdout = proc.stdout
fl = fcntl.fcntl(proc_stdout, fcntl.F_GETFL)
fcntl.fcntl(proc_stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
def handle_stdout(proc_stream, my_buffer, echo_streams=True, log_file=None):
"""A little inline function to handle the stdout business. """
# fcntl makes readline non-blocking so it raises an IOError when empty
try:
for s in iter(proc_stream.readline, ''): # replace '' with b'' for Python 3
my_buffer.append(s)
if echo_streams:
sys.stdout.write(s)
if log_file:
log_file.write(s)
except IOError:
pass
# The main loop while subprocess is running
stdout_parts = []
while proc.poll() is None:
handle_stdout(proc_stdout, stdout_parts)
# ...Check for other things here...
# For example, check a multiprocessor.Value('b') to proc.kill()
time.sleep(0.01)
# Not sure if this is needed, but run it again just to be sure we got it all?
handle_stdout(proc_stdout, stdout_parts)
stdout_str = "".join(stdout_parts) # Just to demo
여기에 추가되는 오버헤드는 확실하지만 내 경우에는 걱정할 필요가 없습니다.기능적으로는 내가 필요한 것을 한다.유일하게 해결되지 않은 것은 이것이 로그 메시지에서 완벽하게 동작하는 이유인데 몇 가지 문제가 있습니다.print
메시지는 나중에 한꺼번에 표시됩니다.
import time
import sys
import subprocess
import threading
import queue
cmd='esptool.py --chip esp8266 write_flash -z 0x1000 /home/pi/zero2/fw/base/boot_40m.bin'
cmd2='esptool.py --chip esp32 -b 115200 write_flash -z 0x1000 /home/pi/zero2/fw/test.bin'
cmd3='esptool.py --chip esp32 -b 115200 erase_flash'
class ExecutorFlushSTDOUT(object):
def __init__(self,timeout=15):
self.process = None
self.process_output = queue.Queue(0)
self.capture_output = threading.Thread(target=self.output_reader)
self.timeout=timeout
self.result=False
self.validator=None
def output_reader(self):
start=time.time()
while self.process.poll() is None and (time.time() - start) < self.timeout:
try:
if not self.process_output.full():
line=self.process.stdout.readline()
if line:
line=line.decode().rstrip("\n")
start=time.time()
self.process_output.put(line)
if self.validator:
if self.validator in line: print("Valid");self.result=True
except:pass
self.process.kill()
return
def start_process(self,cmd_list,callback=None,validator=None,timeout=None):
if timeout: self.timeout=timeout
self.validator=validator
self.process = subprocess.Popen(cmd_list,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
self.capture_output.start()
line=None
self.result=False
while self.process.poll() is None:
try:
if not self.process_output.empty():
line = self.process_output.get()
if line:
if callback:callback(line)
#print(line)
line=None
except:pass
error = self.process.returncode
if error:
print("Error Found",str(error))
raise RuntimeError(error)
return self.result
execute = ExecutorFlushSTDOUT()
def liveOUTPUT(line):
print("liveOUTPUT",line)
try:
if "Writing" in line:
line=''.join([n for n in line.split(' ')[3] if n.isdigit()])
print("percent={}".format(line))
except Exception as e:
pass
result=execute.start_process(cmd2,callback=liveOUTPUT,validator="Hash of data verified.")
print("Finish",result)
@jfs의 훌륭한 답변을 바탕으로 플레이를 위한 완전한 작업 예를 제시하겠습니다.Python 3.7 이상이 필요합니다.
sub.py
import time
for i in range(10):
print(i, flush=True)
time.sleep(1)
main.py
from subprocess import PIPE, Popen
import sys
with Popen([sys.executable, 'sub.py'], bufsize=1, stdout=PIPE, text=True) as sub:
for line in sub.stdout:
print(line, end='')
자스크립트에서 사용되는 인수에 주의해 주세요.
단순한 것이 복잡한 것보다 낫다.
os
에는 모듈 'Module'이되어 있습니다.system
코드를 실행하여 출력을 확인해야 합니다.
import os
os.system("python --version")
# Output
"""
Python 3.8.6
0
"""
후에는 됩니다.0
.
Python 3.6에서는 다음을 사용했습니다.
import subprocess
cmd = "command"
output = subprocess.call(cmd, shell=True)
print(process)
언급URL : https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running
'programing' 카테고리의 다른 글
Python / NumPy에서 메시 그리드의 목적은 무엇입니까? (0) | 2022.11.24 |
---|---|
인덱스로 두 데이터 프레임 병합 (0) | 2022.11.24 |
JDBC에서 접속 풀을 확립하려면 어떻게 해야 합니까? (0) | 2022.11.24 |
Django가 있는 AngularJS - 템플릿 태그가 충돌합니다. (0) | 2022.11.24 |
Vue.js - 키와 값을 사용하여 객체를 객체 배열로 변환 (0) | 2022.11.24 |