运行命令并像在终端中一样近乎实时地分别获取其 stdout 和 stderr
- 2025-03-05 09:16:00
- admin 原创
- 63
问题描述:
我正在尝试在 Python 中找到一种运行其他程序的方法:
正在运行的程序的 stdout 和 stderr 可以分别记录。
可以近乎实时地查看正在运行的程序的 stdout 和 stderr,这样如果子进程挂起,用户就可以看到。(即我们不等待执行完成后才将 stdout/stderr 打印给用户)
附加标准:正在运行的程序不知道它是通过 python 运行的,因此不会做意外的事情(例如将其输出分块而不是实时打印,或者因为它需要终端来查看其输出而退出)。我认为这个小标准几乎意味着我们需要使用 pty。
这是我目前得到的...方法 1:
def method1(command):
## subprocess.communicate() will give us the stdout and stderr sepurately,
## but we will have to wait until the end of command execution to print anything.
## This means if the child process hangs, we will never know....
proc=subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable='/bin/bash')
stdout, stderr = proc.communicate() # record both, but no way to print stdout/stderr in real-time
print ' ######### REAL-TIME ######### '
######## Not Possible
print ' ########## RESULTS ########## '
print 'STDOUT:'
print stdout
print 'STDOUT:'
print stderr
方法 2
def method2(command):
## Using pexpect to run our command in a pty, we can see the child's stdout in real-time,
## however we cannot see the stderr from "curl google.com", presumably because it is not connected to a pty?
## Furthermore, I do not know how to log it beyond writing out to a file (p.logfile). I need the stdout and stderr
## as strings, not files on disk! On the upside, pexpect would give alot of extra functionality (if it worked!)
proc = pexpect.spawn('/bin/bash', ['-c', command])
print ' ######### REAL-TIME ######### '
proc.interact()
print ' ########## RESULTS ########## '
######## Not Possible
方法 3:
def method3(command):
## This method is very much like method1, and would work exactly as desired
## if only proc.xxx.read(1) wouldn't block waiting for something. Which it does. So this is useless.
proc=subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable='/bin/bash')
print ' ######### REAL-TIME ######### '
out,err,outbuf,errbuf = '','','',''
firstToSpeak = None
while proc.poll() == None:
stdout = proc.stdout.read(1) # blocks
stderr = proc.stderr.read(1) # also blocks
if firstToSpeak == None:
if stdout != '': firstToSpeak = 'stdout'; outbuf,errbuf = stdout,stderr
elif stderr != '': firstToSpeak = 'stderr'; outbuf,errbuf = stdout,stderr
else:
if (stdout != '') or (stderr != ''): outbuf += stdout; errbuf += stderr
else:
out += outbuf; err += errbuf;
if firstToSpeak == 'stdout': sys.stdout.write(outbuf+errbuf);sys.stdout.flush()
else: sys.stdout.write(errbuf+outbuf);sys.stdout.flush()
firstToSpeak = None
print ''
print ' ########## RESULTS ########## '
print 'STDOUT:'
print out
print 'STDERR:'
print err
要尝试这些方法,您需要import sys,subprocess,pexpect
pexpect 是纯 Python 的,可以使用
sudo pip 安装 pexpect
我认为解决方案将涉及 python 的 pty 模块 - 这有点像黑魔法,我找不到任何知道如何使用的人。也许 SO 知道 :) 提醒一下,我建议您使用“curl www.google.com”作为测试命令,因为它出于某种原因将其状态打印在 stderr 上 :D
更新 1:
好的,所以 pty 库不适合人类使用。文档本质上是源代码。任何提出的阻塞和非异步解决方案在这里都不起作用。Padraic Cunningham 的 Threads/Queue 方法效果很好,尽管无法添加 pty 支持 - 而且它很“肮脏”(引用 Freenode 的 #python)。似乎唯一适合生产标准代码的解决方案是使用 Twisted 框架,它甚至支持 pty 作为布尔开关来运行进程,就像它们是从 shell 调用的一样。但将 Twisted 添加到项目中需要完全重写所有代码。这真是太糟糕了 :/
更新2:
提供了两个答案,其中一个解决了前两个标准,并且在您只需要使用 stdout 和 stderr 的情况下效果很好
Threads and Queue
。另一个答案使用select
,一种读取文件描述符的非阻塞方法,以及 pty ,一种“欺骗”生成的进程相信它在真实终端中运行的方法,就像它是直接从 Bash 运行一样 - 但可能有副作用,也可能没有副作用。我希望我可以接受这两个答案,因为“正确”的方法实际上取决于情况以及您首先进行子处理的原因,但遗憾的是,我只能接受一个。
解决方案 1:
正在运行的程序的 stdout 和 stderr 可以分别记录。
您不能使用pexpect
,因为 stdout 和 stderr 都指向同一个方向pty
,并且之后没有办法将它们分开。
可以近乎实时地查看正在运行的程序的 stdout 和 stderr,这样如果子进程挂起,用户就可以看到。(即我们不等待执行完成后才将 stdout/stderr 打印给用户)
如果子进程的输出不是 tty,那么它很可能使用块缓冲,因此如果它不产生太多输出,那么它就不会是“实时”的,例如,如果缓冲区是 4K,那么父 Python 进程将看不到任何内容,直到子进程打印 4K 个字符并且缓冲区溢出或它被明确刷新(在子进程内部)。此缓冲区位于子进程内部,没有标准方法从外部管理它。以下图片显示了 stdio 缓冲区和command 1 | command2
shell 管道的管道缓冲区:
正在运行的程序不知道它是通过 python 运行的,因此不会做意外的事情(比如分块其输出而不是实时打印,或者因为它需要终端来查看其输出而退出)。
看来,您的意思恰恰相反,即如果输出重定向到管道(stdout=PIPE
在 Python 中使用时),您的子进程很可能会将其输出分块,而不是尽快刷新每个输出行。这意味着默认线程或asyncio 解决方案不会像您那样工作。
有几种方法可以解决这个问题:
该命令可以接受命令行参数(例如
grep --line-buffered
或python -u
)以禁用块缓冲。stdbuf
适用于某些程序,即,您可以使用上面的线程或 asyncio 解决方案运行['stdbuf', '-oL', '-eL'] + command
,并且您应该分别获得 stdout、stderr,并且行应该近乎实时地出现:
#!/usr/bin/env python3
import os
import sys
from select import select
from subprocess import Popen, PIPE
with Popen(['stdbuf', '-oL', '-e0', 'curl', 'www.google.com'],
stdout=PIPE, stderr=PIPE) as p:
readable = {
p.stdout.fileno(): sys.stdout.buffer, # log separately
p.stderr.fileno(): sys.stderr.buffer,
}
while readable:
for fd in select(readable, [], [])[0]:
data = os.read(fd, 1024) # read available
if not data: # EOF
del readable[fd]
else:
readable[fd].write(data)
readable[fd].flush()
最后,你可以尝试使用两个s 的
pty
+解决方案:select
`pty`
#!/usr/bin/env python3
import errno
import os
import pty
import sys
from select import select
from subprocess import Popen
masters, slaves = zip(pty.openpty(), pty.openpty())
with Popen([sys.executable, '-c', r'''import sys, time
print('stdout', 1) # no explicit flush
time.sleep(.5)
print('stderr', 2, file=sys.stderr)
time.sleep(.5)
print('stdout', 3)
time.sleep(.5)
print('stderr', 4, file=sys.stderr)
'''],
stdin=slaves[0], stdout=slaves[0], stderr=slaves[1]):
for fd in slaves:
os.close(fd) # no input
readable = {
masters[0]: sys.stdout.buffer, # log separately
masters[1]: sys.stderr.buffer,
}
while readable:
for fd in select(readable, [], [])[0]:
try:
data = os.read(fd, 1024) # read available
except OSError as e:
if e.errno != errno.EIO:
raise #XXX cleanup
del readable[fd] # EIO means EOF on some systems
else:
if not data: # EOF
del readable[fd]
else:
readable[fd].write(data)
readable[fd].flush()
for fd in masters:
os.close(fd)
我不知道对 stdout、stderr 使用不同的 s 有什么副作用pty
。您可以尝试单个 pty 是否足够,例如,设置stderr=PIPE
并使用p.stderr.fileno()
而不是masters[1]
。源代码中的评论sh
表明,如果stderr not in {STDOUT, pipe}
解决方案 2:
如果您想从 stderr 和 stdout 读取并分别获取输出,则可以使用带有队列的线程,虽然没有经过过度测试,但类似于以下内容:
import threading
import queue
def run(fd, q):
for line in iter(fd.readline, ''):
q.put(line)
q.put(None)
def create(fd):
q = queue.Queue()
t = threading.Thread(target=run, args=(fd, q))
t.daemon = True
t.start()
return q, t
process = Popen(["curl","www.google.com"], stdout=PIPE, stderr=PIPE,
universal_newlines=True)
std_q, std_out = create(process.stdout)
err_q, err_read = create(process.stderr)
while std_out.is_alive() or err_read.is_alive():
for line in iter(std_q.get, None):
print(line)
for line in iter(err_q.get, None):
print(line)
解决方案 3:
虽然 JF Sebastian 的答案确实解决了问题的核心,但我运行的是 python 2.7(这不在原始标准中),所以我只是把它抛给其他只想剪切/粘贴一些代码的疲惫旅行者。我还没有彻底测试过,但在我尝试过的所有命令上,它似乎都运行良好 :) 您可能需要将 .decode('ascii') 更改为 .decode('utf-8') - 我仍在测试这一点。
#!/usr/bin/env python2.7
import errno
import os
import pty
import sys
from select import select
import subprocess
stdout = ''
stderr = ''
command = 'curl google.com ; sleep 5 ; echo "hey"'
masters, slaves = zip(pty.openpty(), pty.openpty())
p = subprocess.Popen(command, stdin=slaves[0], stdout=slaves[0], stderr=slaves[1], shell=True, executable='/bin/bash')
for fd in slaves: os.close(fd)
readable = { masters[0]: sys.stdout, masters[1]: sys.stderr }
try:
print ' ######### REAL-TIME ######### '
while readable:
for fd in select(readable, [], [])[0]:
try: data = os.read(fd, 1024)
except OSError as e:
if e.errno != errno.EIO: raise
del readable[fd]
finally:
if not data: del readable[fd]
else:
if fd == masters[0]: stdout += data.decode('ascii')
else: stderr += data.decode('ascii')
readable[fd].write(data)
readable[fd].flush()
except:
print "Unexpected error:", sys.exc_info()[0]
raise
finally:
p.wait()
for fd in masters: os.close(fd)
print ''
print ' ########## RESULTS ########## '
print 'STDOUT:'
print stdout
print 'STDERR:'
print stderr
扫码咨询,免费领取项目管理大礼包!