子进程命令的实时输出

2024-12-18 08:38:00
admin
原创
222
摘要:问题描述:我正在使用 Python 脚本作为流体动力学代码的驱动程序。当需要运行模拟时,我使用它subprocess.Popen来运行代码,从 --- 收集输出stdout并将stderr其存入subprocess.PIPE---,然后我可以打印(并保存到日志文件)输出信息并检查是否有任何错误。问题是,我不知...

问题描述:

我正在使用 Python 脚本作为流体动力学代码的驱动程序。当需要运行模拟时,我使用它subprocess.Popen来运行代码,从 --- 收集输出stdout并将stderr其存入subprocess.PIPE---,然后我可以打印(并保存到日志文件)输出信息并检查是否有任何错误。问题是,我不知道代码进展如何。如果我直接从命令行运行它,它会给我输出关于它处于什么迭代、什么时间、下一个时间步骤是什么等的信息。

有没有办法既可以存储输出(用于日志记录和错误检查),又可以产生实时流输出?

我的代码的相关部分:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed

%s

" % (errors)
    success = False

if( errors ): log_file.write("

%s

" % errors)

最初我通过管道传输,run_command以便tee副本直接进入日志文件,并且流仍然直接输出到终端 - 但这样我就无法存储任何错误(据我所知)。


我目前的临时解决方案:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while not ret_val.poll():
    log_file.flush()

然后,在另一个终端中运行tail -f log.txt(st log_file = 'log.txt')。


解决方案 1:

Python 3 的 TLDR:

import subprocess
import sys

with open("test.log", "wb") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), b""):
        sys.stdout.buffer.write(c)
        f.buffer.write(c)

您可以通过两种方式执行此操作,要么从readreadline函数创建一个迭代器并执行:

import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b'' for Python 3
    for c in iter(lambda: process.stdout.read(1), ""):
        sys.stdout.write(c)
        f.write(c)

或者

import subprocess
import sys

# replace "w" with "wb" for Python 3
with open("test.log", "w") as f:
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    # replace "" with b"" for Python 3
    for line in iter(process.stdout.readline, ""):
        sys.stdout.write(line)
        f.write(line)

或者您可以创建一个reader和一个writer文件。将传递writerPopen并从中读取reader

import io
import time
import subprocess
import sys

filename = "test.log"
with io.open(filename, "wb") as writer, io.open(filename, "rb", 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

test.log这样,您就可以在标准输出中写入数据。

文件方法的唯一优点是您的代码不会阻塞。因此,您可以同时做任何您想做的事情,并以reader非阻塞方式随时从中读取。当您使用时PIPEreadreadline函数将分别阻塞,直到将一个字符写入管道或将一行写入管道。

解决方案 2:

执行摘要(或“tl;dr”版本):当最多只有一个时,它很容易subprocess.PIPE,否则就很难。

现在可能该是解释一下subprocess.Popen其工作原理的时候了。

(警告:这是针对 Python 2.x 的,尽管 3.x 类似;而且我对 Windows 版本相当模糊。我对 POSIX 的东西了解得更透彻。)

Popen函数需要同时处理零到三个 I/O 流。照例,它们分别表示为stdinstdoutstderr

您可以提供:

  • None,表示您不想重定向流。它将照常继承这些。请注意,至少在 POSIX 系统上,这并不意味着它将使用 Python 的sys.stdout,而只是 Python 的实际stdout;请参阅最后的演示。

  • 一个int值。这是一个“原始”文件描述符(至少在 POSIX 中)。(旁注:PIPESTDOUT实际上int在内部是 ,但是是“不可能”的描述符,-1 和 -2。)

  • 流 - 实际上是任何具有fileno方法的对象。 Popen将使用 找到该流的描述符stream.fileno(),然后继续获取int值。

  • subprocess.PIPE,表明 Python 应该创建一个管道。

  • subprocess.STDOUTstderr仅限 ):告诉 Python 使用与 相同的描述符。这仅在您为 提供了(非 )值stdout时才有意义,即使如此,也仅在您设置 时才需要。(否则,您只需提供与 相同的参数,例如。)None`stdoutstdout=subprocess.PIPEstdout`Popen(..., stdout=stream, stderr=stream)

最简单的情况(无管道)

如果您不重定向任何内容(保留所有三个作为默认None值或提供显式None),Pipe则相当容易。它只需要分离子进程并让其运行。或者,如果您重定向到非PIPE- -int或流的fileno()- - 这仍然很容易,因为操作系统会完成所有工作。Python 只需要分离子进程,将其 stdin、stdout 和/或 stderr 连接到提供的文件描述符。

仍然简单的情况:一个管道

如果您只重定向一个流,Pipe事情仍然很简单。让我们一次选择一个流并观看。

假设您想要提供一些stdin,但释放stdoutstderr取消重定向,或转到文件描述符。作为父进程,您的 Python 程序只需使用write()将数据发送到管道即可。您可以自己执行此操作,例如:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data
') # etc

或者您可以将 stdin 数据传递给proc.communicate(),然后它会执行stdin.write上面显示的操作。没有输出返回,因此communicate()只有另一个实际工作:它还会为您关闭管道。(如果您不调用,proc.communicate()则必须调用proc.stdin.close()来关闭管道,以便子进程知道没有更多数据通过。)

假设您想捕获stdout但不要stdin管它stderr。同样,这很容易:只需调用proc.stdout.read()(或等效)直到没有更多输出。由于proc.stdout()是普通的 Python I/O 流,您可以在其上使用所有常规构造,例如:

for line in proc.stdout:

或者,您也可以使用proc.communicate(),它只会为您完成操作read()

如果仅想捕获stderr,其工作原理与 相同stdout

在事情变得困难之前,还有一个技巧。假设您想要捕获stdout,并且还捕获stderr,但在与 stdout 相同的管道上:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

在这种情况下,这是subprocess“作弊”!好吧,它必须这样做,所以它并不是真正的作弊:它启动子进程,并将其 stdout 和 stderr 定向到(单个)管道描述符中,该管道描述符会反馈给其父进程(Python)。在父进程方面,同样只有一个管道描述符用于读取输出。所有“stderr”输出都显示在 中proc.stdout,如果您调用proc.communicate(),则 stderr 结果(元组中的第二个值)将是None,而不是字符串。

困难的情况:两个或多个管道

当你想使用至少两个管道时,问题就出现了。事实上,代码subprocess本身有这一点:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

但是,唉,这里我们至少制作了两个,也许三个不同的管道,所以count(None)返回 1 或 0。我们必须用困难的方式做事。

在 Windows 上,这用于累积和的threading.Thread结果,并让父线程传递输入数据(然后关闭管道)。self.stdout`self.stderr`self.stdin

在 POSIX 上,如果可用则使用poll,否则使用select,以累积输出并传递 stdin 输入。所有这些都在(单个)父进程/线程中运行。

这里需要线程或轮询/选择来避免死锁。例如,假设我们将所有三个流重定向到三个单独的管道。进一步假设在写入过程暂停之前,可以塞入管道的数据量有一个小限制,等待读取过程从另一端“清理”管道。让我们将这个小限制设置为一个字节,仅供说明。(事实上这就是工作原理,只是限制远大于一个字节。)

如果父进程(Python)尝试写入几个字节(例如,`'go
'写入)proc.stdin`,则第一个字节进入,然后第二个字节导致 Python 进程挂起,等待子进程读取第一个字节,清空管道。

同时,假设子进程决定打印友好的“你好!不要惊慌!”问候语。H进入其标准输出管道,但e导致其暂停,等待其父进程读取H,清空标准输出管道。

现在我们陷入了困境:Python 进程处于睡眠状态,等待说完“go”,子进程也处于睡眠状态,等待说完“Hello! Don't Panic!”。

代码subprocess.Popen通过线程或 select/poll 避免了这个问题。当字节可以通过管道时,它们就可以通过。当它们不能通过时,只有一个线程(而不是整个进程)必须休眠 — 或者,在 select/poll 的情况下,Python 进程同时等待“可以写入”或“数据可用”,只有当有空间时才写入进程的标准输入,只有当数据准备好时才读取其标准输出和/或标准错误。一旦所有标准输入数据(如果有)都已发送并且所有标准输出和/或标准错误数据都已累积,proc.communicate()代码(实际上_communicate是处理棘手情况的地方)就会返回。

如果您想要在两个不同的管道上读取stdoutstderr(无论是否有任何stdin重定向),您也需要避免死锁。这里的死锁场景有所不同——stderr当您从中提取数据时,子进程向写入一些长数据stdout,或反之亦然——但它仍然存在。


演示

我承诺要证明,在未重定向的情况下,Python subprocesses 会写入底层 stdout,而不是sys.stdout。因此,以下是一些代码:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
   print 'start show1'
   save = sys.stdout
   sys.stdout = StringIO()
   print 'sys.stdout being buffered'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   in_stdout = sys.stdout.getvalue()
   sys.stdout = save
   print 'in buffer:', in_stdout

def show2():
   print 'start show2'
   save = sys.stdout
   sys.stdout = open(os.devnull, 'w')
   print 'after redirect sys.stdout'
   proc = subprocess.Popen(['echo', 'hello'])
   proc.wait()
   sys.stdout = save

show1()
show2()

运行时:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

请注意,如果添加 ,第一个例程将失败stdout=sys.stdout,因为StringIO对象没有。如果添加 ,fileno第二个例程将省略 ,因为已重定向到。hello`stdout=sys.stdoutsys.stdoutos.devnull`

(如果重定向 Python 的文件描述符 1,则子进程遵循该重定向。该调用将生成一个大于 2open(os.devnull, 'w')的流。)fileno()

解决方案 3:

我们还可以使用默认文件迭代器来读取 stdout,而不是使用 iter 构造readline()

import subprocess
import sys

process = subprocess.Popen(
    your_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
for line in process.stdout:
    sys.stdout.write(line)

解决方案 4:

除了所有这些答案之外,还有一个简单的方法如下:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

只要可读,就循环遍历可读流,如果得到空结果,则停止。

这里的关键是,只要有输出,readline()就返回一行(`
`末尾带有),如果真的在末尾,则返回空。

希望这对某人有帮助。

解决方案 5:

如果您需要的只是输出在控制台上可见,那么对我来说最简单的解决方案是将以下参数传递给Popen

with Popen(cmd, stdout=sys.stdout, stderr=sys.stderr) as proc:

它将使用你的python脚本的stdio文件句柄

解决方案 6:

如果您能够使用第三方库,您可能能够使用类似的东西sarge(披露:我是它的维护者)。此库允许非阻塞地访问来自子进程的输出流 - 它位于subprocess模块之上。

解决方案 7:

解决方案 1:实时并发stdout记录stderr

一个简单的解决方案,将 stdout 和 stderr 同时逐行实时记录日志文件中。

import subprocess as sp
from concurrent.futures import ThreadPoolExecutor


def log_popen_pipe(p, stdfile):

    with open("mylog.txt", "w") as f:

        while p.poll() is None:
            f.write(stdfile.readline())
            f.flush()

        # Write the rest from the buffer
        f.write(stdfile.read())


with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    with ThreadPoolExecutor(2) as pool:
        r1 = pool.submit(log_popen_pipe, p, p.stdout)
        r2 = pool.submit(log_popen_pipe, p, p.stderr)
        r1.result()
        r2.result()

解决方案 2:read_popen_pipes()允许你同时实时迭代两个管道(stdout/stderr)的函数

import subprocess as sp
from queue import Queue, Empty
from concurrent.futures import ThreadPoolExecutor


def enqueue_output(file, queue):
    for line in iter(file.readline, ''):
        queue.put(line)
    file.close()


def read_popen_pipes(p):

    with ThreadPoolExecutor(2) as pool:
        q_stdout, q_stderr = Queue(), Queue()

        pool.submit(enqueue_output, p.stdout, q_stdout)
        pool.submit(enqueue_output, p.stderr, q_stderr)

        while True:

            if p.poll() is not None and q_stdout.empty() and q_stderr.empty():
                break

            out_line = err_line = ''

            try:
                out_line = q_stdout.get_nowait()
                err_line = q_stderr.get_nowait()
            except Empty:
                pass

            yield (out_line, err_line)

# The function in use:

with sp.Popen(["ls"], stdout=sp.PIPE, stderr=sp.PIPE, text=True) as p:

    for out_line, err_line in read_popen_pipes(p):
        print(out_line, end='')
        print(err_line, end='')

    p.poll()

解决方案 8:

与之前的答案类似,但以下解决方案在 Windows 上对我有用,使用 Python3 提供了一种常用的方法来实时打印和登录(来源)

def print_and_log(command, logFile):
    with open(logFile, 'wb') as f:
        command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

        while True:
            output = command.stdout.readline()
            if not output and command.poll() is not None:
                f.close()
                break
            if output:
                f.write(output)
                print(str(output.strip(), 'utf-8'), flush=True)
        return command.poll()

解决方案 9:

一个好但“重量级”的解决方案是使用 Twisted - 见底部。

如果您愿意只使用 stdout,那么类似的方法应该可行:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

(如果使用 read(),它会尝试读取整个“文件”,这是没用的,我们真正可以使用的是读取管道中当前所有数据的东西)

人们也可以尝试用线程来解决这个问题,例如:

import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

现在我们可以通过两个线程来添加 stderr。

但请注意,子进程文档不鼓励直接使用这些文件并建议使用communicate()(主要关注死锁,我认为这不是上述问题)并且解决方案有点笨拙,所以看起来子进程模块真的不能完全胜任这项工作(另请参阅:http: //www.python.org/dev/peps/pep-3145/)我们需要考虑其他事情。

更复杂的解决方案是使用Twisted,如下所示: https: //twistedmatrix.com/documents/11.1.0/core/howto/process.html

使用Twisted执行此操作的方法是使用reactor.spawnprocess()并提供一个ProcessProtocol,然后异步处理输出。Twisted 示例 Python 代码在此处:https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py

解决方案 10:

为什么不直接设置stdoutsys.stdout?如果你还需要输出到日志,那么你可以简单地重写 f 的 write 方法。

import sys
import subprocess

class SuperFile(open.__class__):

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt","w+")       
process = subprocess.Popen(command, stdout=f, stderr=f)

解决方案 11:

基于以上所有内容,我建议稍微修改一下版本(python3):

  • while 循环调用 readline (建议的 iter 解决方案似乎对我来说永远阻塞 - Python 3,Windows 7)

  • 结构化,因此在轮询返回后不需要重复处理读取数据None

  • stderr 通过管道传输到 stdout,因此两个输出都被读取

  • 添加代码来获取 cmd 的退出值。

代码:

import subprocess
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, universal_newlines=True)
while True:
    rd = proc.stdout.readline()
    print(rd, end='')  # and whatever you want to do...
    if not rd:  # EOF
        returncode = proc.poll()
        if returncode is not None:
            break
        time.sleep(0.1)  # cmd closed stdout, but not exited yet

# You may want to check on ReturnCode here

解决方案 12:

没有一个 Pythonic 解决方案对我有用。事实证明proc.stdout.read()或类似方法可能会永远阻塞。

因此,我tee这样使用:

subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')

如果您已经在使用,那么这个解决方案很方便shell=True

${PIPESTATUS}捕获整个命令链的成功状态(仅在 Bash 中可用)。如果我省略了&& exit ${PIPESTATUS},那么这将始终返回零,因为tee永远不会失败。

unbuffer可能需要立即将每行打印到终端中,而不是等待太长时间直到“管道缓冲区”填满。但是,unbuffer 会吞掉 assert (SIG Abort) 的退出状态...

2>&1还将 stderror 记录到文件中。

解决方案 13:

我找到了一个解决非常复杂问题的简单方法。

  1. stdout 和 stderr 都需要流式传输。

  2. 两者都需要非阻塞:当没有输出时和当输出太多时。

  3. 不想使用Threading或者multiprocessing,也不愿意使用pexpect。

该解决方案使用了我在这里找到的要点

import subprocess as sbp
import fcntl
import os

def non_block_read(output):
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        return output.readline()
    except:
        return ""

with sbp.Popen('find / -name fdsfjdlsjf',
                shell=True,
                universal_newlines=True,
                encoding='utf-8',
                bufsize=1,
                stdout=sbp.PIPE,
                stderr=sbp.PIPE) as p:
    while True:
        out = non_block_read(p.stdout)
        err = non_block_read(p.stderr)
        if out:
            print(out, end='')
        if err:
            print('E: ' + err, end='')
        if p.poll() is not None:
            break

解决方案 14:

看起来行缓冲输出对你有用,在这种情况下,类似下面的方法可能适合。(警告:未经测试。)这将仅实时提供子进程的标准输出。如果你想实时同时拥有标准错误和标准输出,你必须对 做一些更复杂的事情select

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
    line = proc.stdout.readline()
    print line
    log_file.write(line + '
')
# Might still be data on stdout at this point.  Grab any
# remainder.
for line in proc.stdout.read().split('
'):
    print line
    log_file.write(line + '
')
# Do whatever you want with proc.stderr here...

解决方案 15:

我尝试过的所有上述解决方案要么无法分离 stderr 和 stdout 输出(多个管道),要么当 OS 管道缓冲区已满时永久阻塞,这种情况发生在您正在运行的命令输出太快时(子进程的 python poll() 手册上有关于此问题的警告)。我发现唯一可靠的方法是通过 select,但这是一个仅限 posix 的解决方案:

import subprocess
import sys
import os
import select
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()

解决方案 16:

我认为该subprocess.communicate方法有点误导:它实际上填充了您在中指定的stdoutstderrsubprocess.Popen

subprocess.PIPE但是,从您可以提供给subprocess.Popenstdoutstderr参数的读取最终会填满操作系统管道缓冲区并使您的应用程序死锁(特别是如果您有多个必须使用的进程/线程subprocess)。

我提出的解决方案是向stdoutstderr提供文件 - 并读取文件的内容,而不是从死锁中读取PIPE。这些文件可以tempfile.NamedTemporaryFile()- 在被写入时也可以访问并读取subprocess.communicate

以下是一个示例用法:

try:
    with ProcessRunner(
        ("python", "task.py"), env=os.environ.copy(), seconds_to_wait=0.01
    ) as process_runner:
        for out in process_runner:
            print(out)
except ProcessError as e:
    print(e.error_message)
    raise

这是可以直接使用的源代码,我提供了尽可能多的注释来解释它的作用:

如果您使用的是 python 2,请确保首先从 pypi 安装最新版本的subprocess32包。

import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing 

                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode



解决方案 17:

这是我在其中一个项目中使用的类。它将子进程的输出重定向到日志。起初,我尝试简单地覆盖写入方法,但这并不奏效,因为子进程永远不会调用它(重定向发生在文件描述符级别)。所以我使用自己的管道,类似于在子进程模块中执行的方式。这样做的好处是将所有日志记录/打印逻辑封装在适配器中,您只需将记录器的实例传递给Popensubprocess.Popen("/path/to/binary", stderr = LogAdapter("foo"))

class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARN: self.log.warn,
            logging.ERROR: self.log.warn,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
       """If the write-filedescriptor is not closed this thread will
       prevent the whole program from exiting. You can use this method
       to clean up after the subprocess has terminated."""
       os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

如果您不需要日志记录而只是想使用,print()您显然可以删除大部分代码并缩短类。您还可以通过__enter__and__exit__方法对其进行扩展并调用,finished以便__exit__您可以轻松地将其用作上下文。

解决方案 18:

import os

def execute(cmd, callback):
    for line in iter(os.popen(cmd).readline, ''): 
            callback(line[:-1])

execute('ls -a', print)

解决方案 19:

遇到了同样的问题,并制定了一个简单而干净的解决方案process.sdtout.read1(),它完全满足我在 python3 中的需求。

以下是使用该命令的演示ping(需要互联网连接):

from subprocess import Popen, PIPE

cmd = "ping 8.8.8.8"
proc = Popen([cmd], shell=True, stdout=PIPE)
while True:
    print(proc.stdout.read1())

由于 ping 命令会实时报告其数据,因此每隔一秒左右,python 控制台中就会打印一行新行。

解决方案 20:

我对 @Rotareti 的这个答案做了一点小修改,它对于仅生成 std​​err 的进程来说工作得很好。原始答案使用了一个 try-catch 块,当 stdout 为空时阻止获取 stderr 输出。我还在这里添加了对超时的支持。

# https://stackoverflow.com/a/57084403
from typing import List, Tuple
import sys, io, queue, psutil
import subprocess
from concurrent.futures import ThreadPoolExecutor

    
class Shell:
    def __init__(
        self,
        shell_exec: bool = True,
        print_out: bool = True,
        print_cmd: bool = True,
        print_file: io.TextIOWrapper | None = None,
        return_list: bool = False,
    ) -> None:
        self.shell_exec = shell_exec
        self.print_out = print_out
        self.print_cmd = print_cmd
        self.print_file = print_file
        self.return_list = return_list


    def _read_popen_pipes(self, p: subprocess.Popen, timeout_sec: float|None = None):

        def _enqueue_output(file: io.TextIOWrapper, q: queue.Queue):
            for line in iter(file.readline, ''):
                q.put(line)
            file.close()

        def _timeout():
            try:
                p.wait(timeout=timeout_sec)
            except subprocess.TimeoutExpired:
                parent = psutil.Process(p.pid)
                for child in parent.children(recursive=True):
                    child.terminate()
                parent.terminate()

        with ThreadPoolExecutor(3) as pool:
            q_stdout, q_stderr = queue.Queue(), queue.Queue()

            if timeout_sec is not None:
                pool.submit(_timeout)
            pool.submit(_enqueue_output, p.stdout, q_stdout)
            pool.submit(_enqueue_output, p.stderr, q_stderr)

            while p.poll() is None or not q_stdout.empty() or not q_stderr.empty():
                out_line = err_line = ''

                try:
                    out_line = q_stdout.get_nowait()
                except queue.Empty:
                    pass

                try:
                    err_line = q_stderr.get_nowait()
                except queue.Empty:
                    pass

                yield (out_line, err_line)
    

    def run(self, cmd: str | List[str], timeout: float|None = None) -> Tuple[str|List[str], str|List[str], int]:
        with subprocess.Popen(
            cmd, shell=self.shell_exec, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
        ) as p:
            if self.print_cmd:
                if self.print_out:
                    print(f'+ {cmd}', file=sys.stderr, flush=True)
                if self.print_file:
                    print(f'+ {cmd}', file=self.print_file, flush=True)
            out: List[str] = []
            err: List[str] = []
            for out_line, err_line in self._read_popen_pipes(p, timeout):
                out.append(out_line)
                err.append(err_line)
                if self.print_out:
                    print(out_line, end='', flush=True)
                    print(err_line, end='', file=sys.stderr, flush=True)
                if self.print_file:
                    print(out_line, end='', flush=True, file=self.print_file)
                    print(err_line, end='', flush=True, file=self.print_file)
            # end for
            if self.return_list:
                return out, err, p.returncode
            else:
                return ''.join(out), ''.join(err), p.returncode


if __name__ == '__main__':
    Shell().run('''echo '#!/bin/bash

for i in {1..10}
do
    echo "Sleep $i to stdout" >> /dev/stdout
    echo "Sleep $i to stderr" >> /dev/stderr
    sleep 1
done
' > sleep.sh && chmod +x sleep.sh''')

    out, err, code = Shell().run('./sleep.sh', timeout=2)
    print(f'{out = }')
    print(f'{err = }')
    print(f'{code = }')

    Shell().run('rm sleep.sh')

解决方案 21:

stdout通过在运行时进行迭代可以处理命令的实时输出流subprocess.Popen

此实现:

  • 使用 with 语句关闭标准文件描述符,并等待进程

  • 将关键字参数传播到子进程构造函数

  • 默认text=True自动将字节串解码为字符串

  • CalledProcessError如果check=True失败subprocess.run则引发

  • 成功CompletedProcess后返回subprocess.run

  • 使用两个线程同时处理 stdout 和 stderr(有关将 stdout 重定向到 stderr 而不使用线程的版本,请参阅我的简化答案)

import logging
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from subprocess import PIPE, CalledProcessError, CompletedProcess, Popen


def stream_command(
    args,
    *,
    stdout_handler=logging.info,
    stderr_handler=logging.error,
    check=True,
    text=True,
    stdout=PIPE,
    stderr=PIPE,
    **kwargs,
):
    """Mimic subprocess.run, while processing the command output in real time."""
    with (
        Popen(args, text=text, stdout=stdout, stderr=stderr, **kwargs) as process,
        ThreadPoolExecutor(2) as pool,  # two threads to handle the (live) streams separately
    ):
        exhaust = partial(deque, maxlen=0)  # collections recipe: exhaust an iterable at C-speed
        exhaust_async = partial(pool.submit, exhaust)  # exhaust non-blocking in a background thread
        exhaust_async(stdout_handler(line[:-1]) for line in process.stdout)
        exhaust_async(stderr_handler(line[:-1]) for line in process.stderr)
    retcode = process.poll()  # block until both iterables are exhausted (process finished)
    if check and retcode:
        raise CalledProcessError(retcode, process.args)
    return CompletedProcess(process.args, retcode)

然后,记录到文件就变得像设置一样简单logging

logging.basicConfig(
    level=logging.INFO,
    filename="./capture.log",
    filemode="w",
    encoding="utf-8",
)
logging.info("test from python")
stream_command(["echo", "test from subprocess"])

使用生成的文件:

$ cat ./capture.log
INFO:root:test from python
INFO:root:test from subprocess

可以根据偏好调整行为(print而不是logging.info,或两者等):

stream_command(["echo", "test"])
# INFO:root:test
stream_command("cat ./nonexist", shell=True, check=False)
# ERROR:root:cat: ./nonexist: No such file or directory
stream_command(["echo", "test"], stdout_handler=print)
# test
stdout_lines = []
def handler(line):
    print(line)
    logging.info(line)
    stdout_lines.append(line)
stream_command(["echo", "test"], stdout_handler=handler)
# test
# INFO:root:test
print(stdout_lines)
# ['test']

解决方案 22:

在我看来,“子进程命令的实时输出”意味着 stdout 和 stderr 都应该是实时的。并且 stdin 也应该传送到子进程。

以下片段在 stdout 和 stderr 上产生实时输出,并将它们捕获为 output.{stdout,stderr} 中的字节。

这个技巧涉及正确使用 select 和 poll。

对我来说,在 Python 3.9 上运行良好。


        if self.log == 1:
            print(f"** cmnd= {fullCmndStr}")

        self.outcome.stdcmnd = fullCmndStr
        try:
            process = subprocess.Popen(
                fullCmndStr,
                shell=True,
                encoding='utf8',
                executable="/bin/bash",
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except OSError:
            self.outcome.error = OSError
        else:
            process.stdin.write(stdin)
            process.stdin.close() # type: ignore

        stdoutStrFile = io.StringIO("")
        stderrStrFile = io.StringIO("")

        pollStdout = select.poll()
        pollStderr = select.poll()

        pollStdout.register(process.stdout, select.POLLIN)
        pollStderr.register(process.stderr, select.POLLIN)

        stdoutEOF = False
        stderrEOF = False

        while True:
            stdoutActivity = pollStdout.poll(0)
            if stdoutActivity:
                c= process.stdout.read(1)
                if c:
                    stdoutStrFile.write(c)
                    if self.log == 1:
                        sys.stdout.write(c)
                else:
                   stdoutEOF = True

            stderrActivity = pollStderr.poll(0)
            if stderrActivity:
                c= process.stderr.read(1)
                if c:
                    stderrStrFile.write(c)
                    if self.log == 1:
                        sys.stderr.write(c)
                else:
                   stderrEOF = True
            if stdoutEOF and stderrEOF:
                break

        if self.log == 1:
            print(f"** cmnd={fullCmndStr}")

        process.wait() # type: ignore

        self.outcome.stdout = stdoutStrFile.getvalue()
        self.outcome.stderr = stderrStrFile.getvalue()
        self.outcome.error = process.returncode # type: ignore

解决方案 23:

我发现在 Python 中以流式方式读取子进程的输出(同时将其捕获到变量中)(对于多个输出流,即和stdout)的唯一方法stderr是向子进程传递一个名为临时文件的写入,然后在单独的读取句柄中打开同一个临时文件。

注意:这是针对 Python 3 的

    stdout_write = tempfile.NamedTemporaryFile()
    stdout_read = io.open(stdout_write.name, "r")
    stderr_write = tempfile.NamedTemporaryFile()
    stderr_read = io.open(stderr_write.name, "r")

    stdout_captured = ""
    stderr_captured = ""

    proc = subprocess.Popen(["command"], stdout=stdout_write, stderr=stderr_write)
    while True:
        proc_done: bool = cli_process.poll() is not None

        while True:
            content = stdout_read.read(1024)
            sys.stdout.write(content)
            stdout_captured += content
            if len(content) < 1024:
                break

        while True:
            content = stderr_read.read(1024)
            sys.stderr.write(content)
            stdout_captured += content
            if len(content) < 1024:
                break

        if proc_done:
            break

        time.sleep(0.1)

    stdout_write.close()
    stdout_read.close()
    stderr_write.close()
    stderr_read.close()

但是,如果您不需要捕获输出,那么您可以简单地将sys.stdoutPythonsys.stderr脚本中的流传递到被调用的子进程,正如 xaav在他的回答中所建议的那样:

subprocess.Popen(["command"], stdout=sys.stdout, stderr=sys.stderr)

解决方案 24:

这是一篇旧帖子,但在Python 3中——在Python 3.11中测试——以下代码对我有用,可以使用模块流式传输实时或“实时”输出subprocess

import sys
from os import fdopen
from subprocess import Popen, PIPE, STDOUT


with Popen(command,
           shell=True,
           stdout=PIPE,
           stderr=STDOUT) as sp:

    with fdopen(sys.stdout.fileno(), 'wb', closefd=False) as stdout:

        for line in sp.stdout:
            stdout.write(line)
            stdout.flush()

便利功能

因为它是惯用的,所以我通常会创建一个便利函数run来链接终端中的命令列表并实时流式传输输出。

请注意,我&&在这里使用分隔符,但您可以轻松地使用另一个,例如,;如果您不想因错误而提前失败,甚至&改为使用。

import sys
from os import fdopen
from subprocess import Popen, PIPE, STDOUT

def run(cmds, join='&&'):
    with Popen(join.join(cmds),
               shell=True,
               stdout=PIPE,
               stderr=STDOUT) as sp:
        with fdopen(sys.stdout.fileno(), 'wb', closefd=False) as stdout:
            for line in sp.stdout:
                stdout.write(line)
                stdout.flush()

使用方式如下:

commands = [
    'echo hello',
    'sleep 3',
    'echo world',
    'sleep 2',
    'echo !',
]
run(commands)

解决方案 25:

它们对我都不起作用。Python3.11 / Python3.12 Linux/Windows11

如果命令已完成,我首先获取 stdout。对于 Linux,我发现:

p=pexpect.spawn(cmd) / p.read_nonblocking()

但不幸的是,这不适用于 Windows。我仍在寻找适用于 Windows 的解决方案。

解决方案 26:

根本不使用任何管道:

ret_val = subprocess.Popen(run_command, stderr=subprocess.PIPE, bufsize = 1, universal_newlines = True, shell=True)

查看文档:

stdin、stdout 和 stderr 分别指定执行程序的标准输入、标准输出和标准错误文件句柄。有效值为 None、PIPE、DEVNULL、现有文件描述符(正整数)和具有有效文件描述符的现有文件对象。默认设置为 None,则不会发生重定向

https://docs.python.org/3/library/subprocess.html#subprocess.PIPE

解决方案 27:

蟒蛇3:

def init_realtime_console():
    try:
        command = f'cmd /c "<YOUR_COMMAND>"'

        # Run in the current working directory
        process = subprocess.Popen(
            f'{command}',
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            cwd=os.getcwd()
        )

        # Reading output in real-time
        try:
            while True:
                stdout_line = process.stdout.readline()

                # Print lines if available
                if stdout_line:
                    print(stdout_line, end='')

                # Break when the process ends and buffers are empty
                if not stdout_line and process.poll() is not None:
                    break
        except KeyboardInterrupt:
            process.terminate()
            raise

        # Wait for the process to finish
        process.stdout.close()
        process.wait()

        # Check if the command was successful
        if process.returncode == 0:
            print("Successfully!")
        else:
            print("Failed")
            
    except Exception as e:
        print(f"An unexpected error occurred: {e}")

解决方案 28:

对我有用(Python 3.12):

import subprocess
from time import sleep

def run_shell(command):

    def printShellOut(process):
        out = process.stdout.readline()
        if out:
            print(out.decode(), flush = True, end='')
            return True
    
    def printShellErr(process):
        err = process.stderr.readline()
        if err:
            print('Error: ' + err.decode(), flush = True, end='')
            return True
    
    def printShell(process):
        while printShellOut(process):
            pass
        
        while printShellErr(process):
            pass
            
    
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    while process.poll() is None:
        printShell(process)
        sleep(0.5)
    
    printShell(process)
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2579  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1553  
  IPD(Integrated Product Development)流程作为一种先进的产品开发管理模式,在众多企业中得到了广泛应用。其中,技术评审与决策评审是IPD流程中至关重要的环节,它们既有明显的区别,又存在紧密的协同关系。深入理解这两者的区别与协同,对于企业有效实施IPD流程,提升产品开发效率与质量具有重要意义...
IPD管理流程   27  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、ClickUp、Freshdesk、GanttPRO、Planview、Smartsheet、Asana、Nifty、HubPlanner、Teamwork。在当今快速变化的商业环境中,项目管理软件已成为企业提升效率、优化资源分配和确保项目按时交付的关键工具。然而...
项目管理系统   22  
  建设工程项目质量关乎社会公众的生命财产安全,也影响着企业的声誉和可持续发展。高质量的建设工程不仅能为使用者提供舒适、安全的环境,还能提升城市形象,推动经济的健康发展。在实际的项目操作中,诸多因素会对工程质量产生影响,从规划设计到施工建设,再到后期的验收维护,每一个环节都至关重要。因此,探寻并运用有效的方法来提升建设工程...
工程项目管理制度   19  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用