如何使用 subprocess.Popen 通过管道连接多个进程?

2024-11-29 08:41:00
admin
原创
196
摘要:问题描述:如何使用 Python 模块执行以下 shell 命令subprocess?echo "input data" | awk -f script.awk | sort > outfile.txt 输入数据将来自字符串,因此我实际上不需要echo。我已经走到这一步了,有人能解释...

问题描述:

如何使用 Python 模块执行以下 shell 命令subprocess

echo "input data" | awk -f script.awk | sort > outfile.txt

输入数据将来自字符串,因此我实际上不需要echo。我已经走到这一步了,有人能解释一下我如何让它通过管道传输sort吗?

p_awk = subprocess.Popen(["awk","-f","script.awk"],
                          stdin=subprocess.PIPE,
                          stdout=file("outfile.txt", "w"))
p_awk.communicate( "input data" )

更新:请注意,虽然下面接受的答案实际上并没有回答所问的问题,但我相信 S.Lott 是正确的,最好首先避免解决这个问题!


解决方案 1:

看到下面这些,你可能会更开心一些。

import subprocess

awk_sort = subprocess.Popen( "awk -f script.awk | sort > outfile.txt",
    stdin=subprocess.PIPE, shell=True )
awk_sort.communicate( b"input data
" )

将部分工作委托给 shell。让它用管道连接两个进程。

您会更乐意将“script.awk”重写为 Python,从而消除 awk 和管道。

编辑。 一些认为 awk 没有帮助的原因。

[通过评论来回应的理由太多了。]

  1. Awk 正在添加一个没有重要价值的步骤。awk 的处理没有什么 Python 无法处理的独特之处。

  2. 对于大型数据集,从 awk 到 sort 的流水线化可能会缩短处理时间。对于较短的数据集,它没有显著的好处。快速测量awk >file ; sort fileawk | sort揭示并发性会有所帮助。对于 sort,它几乎没有帮助,因为 sort 不是一次性过滤器。

  3. “Python 进行排序”处理(而不是“Python 进行排序”)的简单性避免了在这里提出确切类型的问题。

  4. Python——虽然比 awk 更冗长——但也更明确,而 awk 有一些隐式规则,这些规则对于新手来说不透明,并且会让非专业人士感到困惑。

  5. Awk(就像 shell 脚本本身一样)增加了另一种编程语言。如果所有这些都可以用一种语言(Python)完成,那么消除 shell 和 awk 编程就消除了两种编程语言,使人们能够专注于任务中产生价值的部分。

底线:awk 无法增加显著的价值。在这种情况下,awk 是一种净成本;它增加了足够的复杂性,因此有必要提出这个问题。删除 awk 将是一种净收益。

侧边栏为什么建立管道(a | b)如此困难。

当 shell 遇到这种情况时,a | b它必须执行以下操作。

  1. 从原有的 shell 中派生出一个子进程。这最终会变成 b。

  2. 构建一个 os 管道。(不是 Python subprocess.PIPE)但调用会os.pipe()返回两个通过公共缓冲区连接的新文件描述符。此时,进程拥有来自其父进程的 stdin、stdout、stderr,以及一个将成为“a 的 stdout”和“b 的 stdin”的文件。

  3. 派生一个子进程。子进程用新 a 的 stdout 替换其 stdout。执行该a进程。

  4. b 子进程关闭,用新 b 的 stdin 替换其 stdin。执行该b进程。

  5. b 子进程等待 a 完成。

  6. 父进程正在等待 b 完成。

我认为上述内容可以递归使用来生成a | b | c,但您必须隐式地将长管道括起来,将它们视为a | (b | c)

由于 Python 有os.pipe()os.exec()os.fork(),并且您可以替换sys.stdinsys.stdout,因此有一种方法可以在纯 Python 中完成上述操作。事实上,您可以使用os.pipe()和找到一些快捷方式subprocess.Popen

但是,将该操作委托给 shell 更加容易。

解决方案 2:

import subprocess

some_string = b'input_data'

sort_out = open('outfile.txt', 'wb', 0)
sort_in = subprocess.Popen('sort', stdin=subprocess.PIPE, stdout=sort_out).stdin
subprocess.Popen(['awk', '-f', 'script.awk'], stdout=sort_in, 
                 stdin=subprocess.PIPE).communicate(some_string)

解决方案 3:

要模拟 shell 管道:

from subprocess import check_call

check_call('echo "input data" | a | b > outfile.txt', shell=True)

无需调用 shell(参见17.1.4.2. 替换 shell 管道):

#!/usr/bin/env python
from subprocess import Popen, PIPE

a = Popen(["a"], stdin=PIPE, stdout=PIPE)
with a.stdin:
    with a.stdout, open("outfile.txt", "wb") as outfile:
        b = Popen(["b"], stdin=a.stdout, stdout=outfile)
    a.stdin.write(b"input data")
statuses = [a.wait(), b.wait()] # both a.stdin/stdout are closed already

plumbum提供了一些语法糖:

#!/usr/bin/env python
from plumbum.cmd import a, b # magic

(a << "input data" | b > "outfile.txt")()

类似物:

#!/bin/sh
echo "input data" | awk -f script.awk | sort > outfile.txt

是:

#!/usr/bin/env python
from plumbum.cmd import awk, sort

(awk["-f", "script.awk"] << "input data" | sort > "outfile.txt")()

解决方案 4:

接受的答案回避了实际问题。这里有一个链接多个进程输出的代码片段:请注意,它还会打印(某种程度上)等效的 shell 命令,以便您可以运行它并确保输出是正确的。

#!/usr/bin/env python3

from subprocess import Popen, PIPE

# cmd1 : dd if=/dev/zero bs=1m count=100
# cmd2 : gzip
# cmd3 : wc -c
cmd1 = ['dd', 'if=/dev/zero', 'bs=1M', 'count=100']
cmd2 = ['tee']
cmd3 = ['wc', '-c']
print(f"Shell style : {' '.join(cmd1)} | {' '.join(cmd2)} | {' '.join(cmd3)}")

p1 = Popen(cmd1, stdout=PIPE, stderr=PIPE) # stderr=PIPE optional, dd is chatty
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE)
p3 = Popen(cmd3, stdin=p2.stdout, stdout=PIPE)

print("Output from last process : " + (p3.communicate()[0]).decode())

# thoretically p1 and p2 may still be running, this ensures we are collecting their return codes
p1.wait()
p2.wait()
print("p1 return: ", p1.returncode)
print("p2 return: ", p2.returncode)
print("p3 return: ", p3.returncode)

解决方案 5:

http://www.python.org/doc/2.5.2/lib/node535.html对此进行了很好的介绍。其中还有你不明白的部分吗?

您的程序会非常相似,但第二个程序Popen会将 stdout= 指向一个文件,并且您不需要它的输出.communicate()

解决方案 6:

受到@Cristian 回答的启发。我遇到了同样的问题,但使用了不同的命令。所以我放上我测试过的例子,我相信这可能会有所帮助:

grep_proc = subprocess.Popen(["grep", "rabbitmq"],
                             stdin=subprocess.PIPE, 
                             stdout=subprocess.PIPE)
subprocess.Popen(["ps", "aux"], stdout=grep_proc.stdin)
out, err = grep_proc.communicate()

这已经经过测试。

已采取的措施

  • 声明使用管道中的 stdin 进行惰性执行。当管道被 的 stdout 填充时,grep此命令将在命令执行时执行。ps`ps`

  • 调用主命令ps,并将 stdout 定向到该命令所使用的管道grep

  • Grep 进行通信以从管道获取标准输出。

我喜欢这种方式,因为它是自然的管道概念,用subprocess接口轻轻包裹。

解决方案 7:

前面的答案遗漏了一个要点。正如 geocar 指出的那样,替换 shell 管道基本上是正确的。在管道的最后一个元素上运行几乎就足够了。communicate

剩下的问题是将输入数据传递到管道。对于多个子进程,communicate(input_data)最后一个元素上的简单操作不起作用 - 它会永远挂起。您需要像这样手动创建一个管道和一个子管道:

import os
import subprocess

input = """\ninput data
more input
""" * 10

rd, wr = os.pipe()
if os.fork() != 0: # parent
    os.close(wr)
else:              # child
    os.close(rd)
    os.write(wr, input)
    os.close(wr)
    exit()

p_awk = subprocess.Popen(["awk", "{ print $2; }"],
                         stdin=rd,
                         stdout=subprocess.PIPE)
p_sort = subprocess.Popen(["sort"], 
                          stdin=p_awk.stdout,
                          stdout=subprocess.PIPE)
p_awk.stdout.close()
out, err = p_sort.communicate()
print (out.rstrip())

现在子进程通过管道提供输入,父进程调用communication(),这将按预期工作。使用这种方法,您可以创建任意长的管道,而无需“将部分工作委托给shell”。不幸的是,子进程文档没有提到这一点。

有一些方法可以不使用管道来实现相同的效果:

from tempfile import TemporaryFile
tf = TemporaryFile()
tf.write(input)
tf.seek(0, 0)

现在使用stdin=tfp_awk这取决于您的口味。

上面的代码仍然不是 100% 等同于 bash 管道,因为信号处理不同。如果添加另一个截断 输出的管道元素sort,例如,则可以看到这一点head -n 10。使用上面的代码,sort将向 打印“Broken pipe”错误消息stderr。在 shell 中运行相同的管道时,您将看不到此消息。(但这是唯一的区别, 的结果stdout是相同的)。原因似乎是 python 的Popen设置SIG_IGNSIGPIPE,而 shell 将其保留在SIG_DFL,并且sort的信号处理在这两种情况下是不同的。

解决方案 8:

编辑: pipes在 Windows 上可用,但关键的是,似乎实际上无法在 Windows 上运行。请参阅下面的评论。

Python 标准库现在包含pipes处理此问题所需的模块:

https://docs.python.org/2/library/pipes.htmlhttps://docs.python.org/3.4/library/pipes.html​​

我不确定这个模块已经存在多久了,但这种方法似乎比乱搞要简单得多subprocess

解决方案 9:

对我来说,下面的方法是最干净、最容易阅读的

from subprocess import Popen, PIPE

def string_to_2_procs_to_file(input_s, first_cmd, second_cmd, output_filename):
    with open(output_filename, 'wb') as out_f:
        p2 = Popen(second_cmd, stdin=PIPE, stdout=out_f)
        p1 = Popen(first_cmd, stdout=p2.stdin, stdin=PIPE)
        p1.communicate(input=bytes(input_s))
        p1.wait()
        p2.stdin.close()
        p2.wait()

可以这样调用:

string_to_2_procs_to_file('input data', ['awk', '-f', 'script.awk'], ['sort'], 'output.txt')
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   3998  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   2749  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Freshdesk、ClickUp、nTask、Hubstaff、Plutio、Productive、Targa、Bonsai、Wrike。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在项目管理过程中面临着诸多痛点,如任务分配不...
项目管理系统   85  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Monday、TeamGantt、Filestage、Chanty、Visor、Smartsheet、Productive、Quire、Planview。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多项目经理和团队在管理复杂项目时,常...
开源项目管理工具   96  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Smartsheet、GanttPRO、Backlog、Visor、ResourceGuru、Productive、Xebrio、Hive、Quire。在当今快节奏的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在选择项目管理工具时常常面临困惑:...
项目管理系统   83  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用