programing

프로세스가 실행되는 동안 하위 프로세스 출력을 지속적으로 인쇄합니다.

shortcode 2022. 11. 24. 21:15
반응형

프로세스가 실행되는 동안 하위 프로세스 출력을 지속적으로 인쇄합니다.

Python-scripts에서 프로그램을 실행하려면 다음 방법을 사용합니다.

def execute(command):
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    output = process.communicate()[0]
    exitCode = process.returncode

    if (exitCode == 0):
        return output
    else:
        raise ProcessException(command, exitCode, output)

가 '나'와 같은 할 때Process.execute("mvn clean install")내 프로그램은 프로세스가 완료될 때까지 기다린 후 내 프로그램의 완전한 출력을 얻습니다.완료하는 데 시간이 걸리는 프로세스를 실행하고 있으면 짜증납니다.

루프에서 완료되기 전에 프로세스 출력을 폴링하여 프로그램에서 프로세스 출력을 한 줄씩 쓸 수 있습니까?

관련이 있을 법한 이 기사를 발견했다.

명령어가 출력되는 즉시 반복기를 사용하여 라인을 처리할 수 있습니다.lines = iter(fd.readline, "")다음은 일반적인 사용 사례를 보여 주는 전체 예입니다(@jfs 지원 덕분에).

from __future__ import print_function # Only Python 2.x
import subprocess

def execute(cmd):
    popen = subprocess.Popen(cmd, stdout=subprocess.PIPE, universal_newlines=True)
    for stdout_line in iter(popen.stdout.readline, ""):
        yield stdout_line 
    popen.stdout.close()
    return_code = popen.wait()
    if return_code:
        raise subprocess.CalledProcessError(return_code, cmd)

# Example
for path in execute(["locate", "a"]):
    print(path, end="")

Python 3에서 stdout 버퍼가 플러시되는 즉시 하위 프로세스의 출력을 한 줄씩 인쇄하려면:

from subprocess import Popen, PIPE, CalledProcessError

with Popen(cmd, stdout=PIPE, bufsize=1, universal_newlines=True) as p:
    for line in p.stdout:
        print(line, end='') # process line here

if p.returncode != 0:
    raise CalledProcessError(p.returncode, p.args)

.p.poll()됩니다.--eof 도of -- -- -- 。 필요 요.iter(p.stdout.readline, '') 버그는 3에서 되었습니다.-- Python 3에서 되었습니다.

Python:subprocess.communicate()로부터의 스트리밍 입력을 읽어냅니다.

네, 스레드 없이 해결할 수 있었습니다(스레드를 사용하는 것이 좋은 이유는 어떤 제안이라도 좋습니다).실행 중에 서브프로세스의 stdout을 가로채기

def execute(command):
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

    # Poll process for new output until finished
    while True:
        nextline = process.stdout.readline()
        if nextline == '' and process.poll() is not None:
            break
        sys.stdout.write(nextline)
        sys.stdout.flush()

    output = process.communicate()[0]
    exitCode = process.returncode

    if (exitCode == 0):
        return output
    else:
        raise ProcessException(command, exitCode, output)

요.print★★★★

import subprocess
import sys

def execute(command):
    subprocess.check_call(command, shell=True, stdout=sys.stdout, stderr=subprocess.STDOUT)

에서는 단순히 의 하위 stdoutsuccess api(성공 api) 예외 api(예외 api)

@토크랜드

코드를 시험하여 3.4로 수정했습니다.Windows dir.dir 명령어는 cmd-file로 저장됩니다.

import subprocess
c = "dir.cmd"

def execute(command):
    popen = subprocess.Popen(command, stdout=subprocess.PIPE,bufsize=1)
    lines_iterator = iter(popen.stdout.readline, b"")
    while popen.poll() is None:
        for line in lines_iterator:
            nline = line.rstrip()
            print(nline.decode("latin"), end = "\r\n",flush =True) # yield line

execute(c)

Python 3에서는 Python > = 3.5를 합니다.subprocess.run뭇매를 맞다

import subprocess

cmd = 'echo foo; sleep 1; echo foo; sleep 2; echo foo'
subprocess.run(cmd, shell=True)

하는 경우, (실행중의 출력은)가도 동작합니다.shell=True) https://docs.python.org/3/library/subprocess.html#subprocess.run

Python 스크립트에서 stdout을 얻기 위해 이 질문에 대한 답변을 시도하는 사람은 Python이 stdout을 버퍼링하므로 stdout을 확인하는 데 시간이 걸릴 수 있습니다.

이 문제는 타깃스크립트에 stdout을 쓸 때마다 다음 사항을 추가하여 해결할 수 있습니다.

sys.stdout.flush()

번째 질문에 이 가장 .stdout로로 your your your 로 직접 이동stdout 같은 할 수 ).stderr(다음 예시와 같이)

p = Popen(cmd, stdout=sys.stdout, stderr=sys.stderr)
p.communicate()

둘 다 읽으려고 하는 경우stdout그리고.stderr동시에 스레드를 사용하여 생각해낸 것은 다음과 같습니다.

import threading
import subprocess
import Queue

class AsyncLineReader(threading.Thread):
    def __init__(self, fd, outputQueue):
        threading.Thread.__init__(self)

        assert isinstance(outputQueue, Queue.Queue)
        assert callable(fd.readline)

        self.fd = fd
        self.outputQueue = outputQueue

    def run(self):
        map(self.outputQueue.put, iter(self.fd.readline, ''))

    def eof(self):
        return not self.is_alive() and self.outputQueue.empty()

    @classmethod
    def getForFd(cls, fd, start=True):
        queue = Queue.Queue()
        reader = cls(fd, queue)

        if start:
            reader.start()

        return reader, queue


process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdoutReader, stdoutQueue) = AsyncLineReader.getForFd(process.stdout)
(stderrReader, stderrQueue) = AsyncLineReader.getForFd(process.stderr)

# Keep checking queues until there is no more output.
while not stdoutReader.eof() or not stderrReader.eof():
   # Process all available lines from the stdout Queue.
   while not stdoutQueue.empty():
       line = stdoutQueue.get()
       print 'Received stdout: ' + repr(line)

       # Do stuff with stdout line.

   # Process all available lines from the stderr Queue.
   while not stderrQueue.empty():
       line = stderrQueue.get()
       print 'Received stderr: ' + repr(line)

       # Do stuff with stderr line.

   # Sleep for a short time to avoid excessive CPU use while waiting for data.
   sleep(0.05)

print "Waiting for async readers to finish..."
stdoutReader.join()
stderrReader.join()

# Close subprocess' file descriptors.
process.stdout.close()
process.stderr.close()

print "Waiting for process to exit..."
returnCode = process.wait()

if returnCode != 0:
   raise subprocess.CalledProcessError(returnCode, command)

저는 이 질문을 하고 싶었을 뿐입니다. 비슷한 질문을 하려고 했지만, 어떤 대답도 제 문제를 해결해주지 못했습니다.그게 누군가에게 도움이 됐으면 좋겠네요!

제 사용 사례에서 외부 프로세스는 다음과 같은 프로세스를 중지합니다.Popen().

이 PoC는 프로세스의 출력을 항상 읽어내고 필요할 때 액세스할 수 있습니다.마지막 결과만 유지되고 다른 모든 출력은 폐기되므로 PIPE의 메모리 부족이 방지됩니다.

import subprocess
import time
import threading
import Queue


class FlushPipe(object):
    def __init__(self):
        self.command = ['python', './print_date.py']
        self.process = None
        self.process_output = Queue.LifoQueue(0)
        self.capture_output = threading.Thread(target=self.output_reader)

    def output_reader(self):
        for line in iter(self.process.stdout.readline, b''):
            self.process_output.put_nowait(line)

    def start_process(self):
        self.process = subprocess.Popen(self.command,
                                        stdout=subprocess.PIPE)
        self.capture_output.start()

    def get_output_for_processing(self):
        line = self.process_output.get()
        print ">>>" + line


if __name__ == "__main__":
    flush_pipe = FlushPipe()
    flush_pipe.start_process()

    now = time.time()
    while time.time() - now < 10:
        flush_pipe.get_output_for_processing()
        time.sleep(2.5)

    flush_pipe.capture_output.join(timeout=0.001)
    flush_pipe.process.kill()

print_date.화이

#!/usr/bin/env python
import time

if __name__ == "__main__":
    while True:
        print str(time.time())
        time.sleep(0.01)

output: 최대 2.5초 인터벌로부터의 출력만 있는 것을 알 수 있습니다.

>>>1520535158.51
>>>1520535161.01
>>>1520535163.51
>>>1520535166.01

적어도 Python3.4에서는 동작합니다.

import subprocess

process = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
for line in process.stdout:
    print(line.decode().strip())

여기 있는 답변 중 어느 것도 제 모든 요구를 충족시키지 못했습니다.

  1. stdout용 스레드 없음(큐도 없음 등)
  2. 다른 상황을 확인해야 하므로 비차단
  3. 필요에 따라 PIPE를 사용하여 스트림 출력, 로그 파일에 쓰기, 출력 문자열 복사본 반환 등 여러 작업을 수행합니다.

약간의 배경:스레드 풀을 관리하기 위해 ThreadPoolExecutor를 사용하고 있습니다.각각의 서브프로세스를 기동해, 동시성을 실행하고 있습니다(Python2.7에서는, 이것은 새로운 3.x에서도 동작합니다).가능한 한 많은 스레드를 출력 수집에만 사용하고 싶지 않습니다(20개의 프로세스 풀은 실행에만 40개의 스레드를 사용합니다.1개는 프로세스 스레드에, 1개는 stdout에 사용합니다).더 많은 정보를 얻을 수 있을 것 같습니다.

여기서 많은 예외를 제거하고 있기 때문에, 이것은 실가동중의 코드에 근거하고 있습니다.내가 복사해서 붙여넣어서 망치지 않았으면 좋겠어.또, 피드백은 매우 환영합니다!

import time
import fcntl
import subprocess
import time

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

# Make stdout non-blocking when using read/readline
proc_stdout = proc.stdout
fl = fcntl.fcntl(proc_stdout, fcntl.F_GETFL)
fcntl.fcntl(proc_stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)

def handle_stdout(proc_stream, my_buffer, echo_streams=True, log_file=None):
    """A little inline function to handle the stdout business. """
    # fcntl makes readline non-blocking so it raises an IOError when empty
    try:
        for s in iter(proc_stream.readline, ''):   # replace '' with b'' for Python 3
            my_buffer.append(s)

            if echo_streams:
                sys.stdout.write(s)

            if log_file:
                log_file.write(s)
    except IOError:
        pass

# The main loop while subprocess is running
stdout_parts = []
while proc.poll() is None:
    handle_stdout(proc_stdout, stdout_parts)

    # ...Check for other things here...
    # For example, check a multiprocessor.Value('b') to proc.kill()

    time.sleep(0.01)

# Not sure if this is needed, but run it again just to be sure we got it all?
handle_stdout(proc_stdout, stdout_parts)

stdout_str = "".join(stdout_parts)  # Just to demo

여기에 추가되는 오버헤드는 확실하지만 내 경우에는 걱정할 필요가 없습니다.기능적으로는 내가 필요한 것을 한다.유일하게 해결되지 않은 것은 이것이 로그 메시지에서 완벽하게 동작하는 이유인데 몇 가지 문제가 있습니다.print메시지는 나중에 한꺼번에 표시됩니다.

import time
import sys
import subprocess
import threading
import queue

cmd='esptool.py --chip esp8266 write_flash -z 0x1000 /home/pi/zero2/fw/base/boot_40m.bin'
cmd2='esptool.py --chip esp32 -b 115200 write_flash -z 0x1000 /home/pi/zero2/fw/test.bin'
cmd3='esptool.py --chip esp32 -b 115200 erase_flash'

class ExecutorFlushSTDOUT(object):
    def __init__(self,timeout=15):
        self.process = None
        self.process_output = queue.Queue(0)
        self.capture_output = threading.Thread(target=self.output_reader)
        self.timeout=timeout
        self.result=False
        self.validator=None
        
    def output_reader(self):
        start=time.time()
        while self.process.poll() is None and (time.time() - start) < self.timeout:
            try:
                if not self.process_output.full():
                    line=self.process.stdout.readline()
                    if line:
                        line=line.decode().rstrip("\n")
                        start=time.time()
                        self.process_output.put(line)
                        if self.validator:
                            if self.validator in line: print("Valid");self.result=True

            except:pass
        self.process.kill()
        return
            
    def start_process(self,cmd_list,callback=None,validator=None,timeout=None):
        if timeout: self.timeout=timeout
        self.validator=validator
        self.process = subprocess.Popen(cmd_list,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
        self.capture_output.start()
        line=None
        self.result=False
        while self.process.poll() is None:
            try:
                if not self.process_output.empty():
                    line = self.process_output.get()
                if line:
                    if callback:callback(line)
                    #print(line)
                    line=None
            except:pass                
        error = self.process.returncode
        if error:
            print("Error Found",str(error))
            raise RuntimeError(error)
        return self.result

execute = ExecutorFlushSTDOUT()

def liveOUTPUT(line):
    print("liveOUTPUT",line)
    try:
        if "Writing" in line:
            line=''.join([n for n in line.split(' ')[3] if n.isdigit()])
            print("percent={}".format(line))
    except Exception as e:
        pass
    


result=execute.start_process(cmd2,callback=liveOUTPUT,validator="Hash of data verified.")

print("Finish",result)

@jfs의 훌륭한 답변을 바탕으로 플레이를 위한 완전한 작업 예를 제시하겠습니다.Python 3.7 이상이 필요합니다.

sub.py

import time

for i in range(10):
    print(i, flush=True)
    time.sleep(1)

main.py

from subprocess import PIPE, Popen
import sys

with Popen([sys.executable, 'sub.py'], bufsize=1, stdout=PIPE, text=True) as sub:
    for line in sub.stdout:
        print(line, end='')

자스크립트에서 사용되는 인수에 주의해 주세요.

단순한 것이 복잡한 것보다 낫다.

os에는 모듈 'Module'이되어 있습니다.system코드를 실행하여 출력을 확인해야 합니다.

import os
os.system("python --version")
# Output
"""
Python 3.8.6
0
"""

후에는 됩니다.0.

Python 3.6에서는 다음을 사용했습니다.

import subprocess

cmd = "command"
output = subprocess.call(cmd, shell=True)
print(process)

언급URL : https://stackoverflow.com/questions/4417546/constantly-print-subprocess-output-while-process-is-running

반응형