如何使用 subprocess.Popen 通过管道连接多个进程?
- 2024-11-29 08:41:00
- admin 原创
- 197
问题描述:
如何使用 Python 模块执行以下 shell 命令subprocess
?
echo "input data" | awk -f script.awk | sort > outfile.txt
输入数据将来自字符串,因此我实际上不需要echo
。我已经走到这一步了,有人能解释一下我如何让它通过管道传输sort
吗?
p_awk = subprocess.Popen(["awk","-f","script.awk"],
stdin=subprocess.PIPE,
stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )
更新:请注意,虽然下面接受的答案实际上并没有回答所问的问题,但我相信 S.Lott 是正确的,最好首先避免解决这个问题!
解决方案 1:
看到下面这些,你可能会更开心一些。
import subprocess
awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data
" )
将部分工作委托给 shell。让它用管道连接两个进程。
您会更乐意将“script.awk”重写为 Python,从而消除 awk 和管道。
编辑。 一些认为 awk 没有帮助的原因。
[通过评论来回应的理由太多了。]
Awk 正在添加一个没有重要价值的步骤。awk 的处理没有什么 Python 无法处理的独特之处。
对于大型数据集,从 awk 到 sort 的流水线化可能会缩短处理时间。对于较短的数据集,它没有显著的好处。快速测量
awk >file ; sort file
和awk | sort
揭示并发性会有所帮助。对于 sort,它几乎没有帮助,因为 sort 不是一次性过滤器。“Python 进行排序”处理(而不是“Python 进行排序”)的简单性避免了在这里提出确切类型的问题。
Python——虽然比 awk 更冗长——但也更明确,而 awk 有一些隐式规则,这些规则对于新手来说不透明,并且会让非专业人士感到困惑。
Awk(就像 shell 脚本本身一样)增加了另一种编程语言。如果所有这些都可以用一种语言(Python)完成,那么消除 shell 和 awk 编程就消除了两种编程语言,使人们能够专注于任务中产生价值的部分。
底线:awk 无法增加显著的价值。在这种情况下,awk 是一种净成本;它增加了足够的复杂性,因此有必要提出这个问题。删除 awk 将是一种净收益。
侧边栏为什么建立管道(a | b
)如此困难。
当 shell 遇到这种情况时,a | b
它必须执行以下操作。
从原有的 shell 中派生出一个子进程。这最终会变成 b。
构建一个 os 管道。(不是 Python subprocess.PIPE)但调用会
os.pipe()
返回两个通过公共缓冲区连接的新文件描述符。此时,进程拥有来自其父进程的 stdin、stdout、stderr,以及一个将成为“a 的 stdout”和“b 的 stdin”的文件。派生一个子进程。子进程用新 a 的 stdout 替换其 stdout。执行该
a
进程。b 子进程关闭,用新 b 的 stdin 替换其 stdin。执行该
b
进程。b 子进程等待 a 完成。
父进程正在等待 b 完成。
我认为上述内容可以递归使用来生成a | b | c
,但您必须隐式地将长管道括起来,将它们视为a | (b | c)
。
由于 Python 有os.pipe()
、os.exec()
和os.fork()
,并且您可以替换sys.stdin
和sys.stdout
,因此有一种方法可以在纯 Python 中完成上述操作。事实上,您可以使用os.pipe()
和找到一些快捷方式subprocess.Popen
。
但是,将该操作委托给 shell 更加容易。
解决方案 2:
import subprocess
some_string = b'input_data'
sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in,
stdin=subprocess.PIPE).communicate(some_string)
解决方案 3:
要模拟 shell 管道:
from subprocess import check_call
check_call('echo "input data" | a | b > outfile.txt', shell=True)
无需调用 shell(参见17.1.4.2. 替换 shell 管道):
#!/usr/bin/env python
from subprocess import Popen, PIPE
a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
with a.stdout, open("outfile.txt", "wb") as outfile:
b = Popen(["b"], stdin=a.stdout, stdout=outfile)
a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already
plumbum
提供了一些语法糖:
#!/usr/bin/env python
from plumbum.cmd import a, b # magic
(a << "input data" | b > "outfile.txt")()
类似物:
#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt
是:
#!/usr/bin/env python
from plumbum.cmd import awk, sort
(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()
解决方案 4:
接受的答案回避了实际问题。这里有一个链接多个进程输出的代码片段:请注意,它还会打印(某种程度上)等效的 shell 命令,以便您可以运行它并确保输出是正确的。
#!/usr/bin/env python3
from subprocess import Popen, PIPE
# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")
p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)
print("Output from last process : " + (p3.communicate()[0]).decode())
# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)
解决方案 5:
http://www.python.org/doc/2.5.2/lib/node535.html对此进行了很好的介绍。其中还有你不明白的部分吗?
您的程序会非常相似,但第二个程序Popen
会将 stdout= 指向一个文件,并且您不需要它的输出.communicate()
。
解决方案 6:
受到@Cristian 回答的启发。我遇到了同样的问题,但使用了不同的命令。所以我放上我测试过的例子,我相信这可能会有所帮助:
grep_proc = subprocess.Popen(["grep", "rabbitmq"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()
这已经经过测试。
已采取的措施
声明使用管道中的 stdin 进行惰性执行。当管道被 的 stdout 填充时,
grep
此命令将在命令执行时执行。ps
`ps`调用主命令
ps
,并将 stdout 定向到该命令所使用的管道grep
。Grep 进行通信以从管道获取标准输出。
我喜欢这种方式,因为它是自然的管道概念,用subprocess
接口轻轻包裹。
解决方案 7:
前面的答案遗漏了一个要点。正如 geocar 指出的那样,替换 shell 管道基本上是正确的。在管道的最后一个元素上运行几乎就足够了。communicate
剩下的问题是将输入数据传递到管道。对于多个子进程,communicate(input_data)
最后一个元素上的简单操作不起作用 - 它会永远挂起。您需要像这样手动创建一个管道和一个子管道:
import os
import subprocess
input = """\ninput data
more input
""" * 10
rd, wr = os.pipe()
if os.fork() != 0: # parent
os.close(wr)
else: # child
os.close(rd)
os.write(wr, input)
os.close(wr)
exit()
p_awk = subprocess.Popen(["awk", "{ print $2; }"],
stdin=rd,
stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"],
stdin=p_awk.stdout,
stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())
现在子进程通过管道提供输入,父进程调用communication(),这将按预期工作。使用这种方法,您可以创建任意长的管道,而无需“将部分工作委托给shell”。不幸的是,子进程文档没有提到这一点。
有一些方法可以不使用管道来实现相同的效果:
from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)
现在使用stdin=tf
。p_awk
这取决于您的口味。
上面的代码仍然不是 100% 等同于 bash 管道,因为信号处理不同。如果添加另一个截断 输出的管道元素sort
,例如,则可以看到这一点head -n 10
。使用上面的代码,sort
将向 打印“Broken pipe”错误消息stderr
。在 shell 中运行相同的管道时,您将看不到此消息。(但这是唯一的区别, 的结果stdout
是相同的)。原因似乎是 python 的Popen
设置SIG_IGN
为SIGPIPE
,而 shell 将其保留在SIG_DFL
,并且sort
的信号处理在这两种情况下是不同的。
解决方案 8:
编辑: pipes
在 Windows 上可用,但关键的是,似乎实际上无法在 Windows 上运行。请参阅下面的评论。
Python 标准库现在包含pipes
处理此问题所需的模块:
https://docs.python.org/2/library/pipes.html,https://docs.python.org/3.4/library/pipes.html
我不确定这个模块已经存在多久了,但这种方法似乎比乱搞要简单得多subprocess
。
解决方案 9:
对我来说,下面的方法是最干净、最容易阅读的
from subprocess import Popen, PIPE
def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
with open(output_filename, 'wb') as out_f:
p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
p1.communicate(input=bytes(input_s))
p1.wait()
p2.stdin.close()
p2.wait()
可以这样调用:
string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')
扫码咨询,免费领取项目管理大礼包!