检查 python 脚本是否正在运行
- 2025-03-04 08:24:00
- admin 原创
- 102
问题描述:
我的 Web 应用程序中有一个正在运行的 python 守护程序/我如何快速检查(使用 python)我的守护程序是否正在运行,如果没有,则启动它?
我想通过这种方式来修复守护进程的任何崩溃,这样脚本就不必手动运行,它会在调用时自动运行并保持运行。
我如何(使用 python)检查我的脚本是否正在运行?
解决方案 1:
Linux 系统上很方便的一种技术是使用域套接字:
import socket
import sys
import time
def get_lock(process_name):
# Without holding a reference to our socket somewhere it gets garbage
# collected when the function exits
get_lock._lock_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
# The null byte ( ) means the socket is created
# in the abstract namespace instead of being created
# on the file system itself.
# Works only in Linux
get_lock._lock_socket.bind(' ' + process_name)
print 'I got the lock'
except socket.error:
print 'lock exists'
sys.exit()
get_lock('running_test')
while True:
time.sleep(3)
它是原子的,并且避免了如果你的进程收到 SIGKILL 信号,锁文件就会四处乱放的问题
您可以在文档中读到,socket.close
垃圾收集时套接字会自动关闭。
解决方案 2:
将 pidfile 放在某个地方(例如 /tmp)。然后,您可以通过检查文件中的 PID 是否存在来检查进程是否正在运行。不要忘记在干净关闭时删除该文件,并在启动时检查它。
#/usr/bin/env python
import os
import sys
pid = str(os.getpid())
pidfile = "/tmp/mydaemon.pid"
if os.path.isfile(pidfile):
print "%s already exists, exiting" % pidfile
sys.exit()
file(pidfile, 'w').write(pid)
try:
# Do some actual work here
finally:
os.unlink(pidfile)
然后,您可以通过检查 /tmp/mydaemon.pid 的内容是否为现有进程来检查该进程是否正在运行。Monit(上面提到)可以为您执行此操作,或者您可以编写一个简单的 shell 脚本,使用 ps 的返回代码为您检查。
ps up `cat /tmp/mydaemon.pid ` >/dev/null && echo "Running" || echo "Not running"
为了获得额外的奖励,您可以使用 atexit 模块来确保您的程序在任何情况下(被杀死、出现异常等)清理其 pidfile。
解决方案 3:
pid库可以做到这一点。
from pid import PidFile
with PidFile():
do_something()
它还将自动处理 pidfile 存在但进程未运行的情况。
解决方案 4:
当然,丹的例子不会发挥应有的作用。
确实,如果脚本崩溃、出现异常或者没有清理 pid 文件,则该脚本将会运行多次。
我根据另一个网站提出以下建议:
这是为了检查是否已经存在锁定文件
#/usr/bin/env python
import os
import sys
if os.access(os.path.expanduser("~/.lockfile.vestibular.lock"), os.F_OK):
#if the lockfile is already there then check the PID number
#in the lock file
pidfile = open(os.path.expanduser("~/.lockfile.vestibular.lock"), "r")
pidfile.seek(0)
old_pid = pidfile.readline()
# Now we check the PID from lock file matches to the current
# process PID
if os.path.exists("/proc/%s" % old_pid):
print "You already have an instance of the program running"
print "It is running as process %s," % old_pid
sys.exit(1)
else:
print "File is there but the program is not running"
print "Removing lock file for the: %s as it can be there because of the program last time it was run" % old_pid
os.remove(os.path.expanduser("~/.lockfile.vestibular.lock"))
这是我们将 PID 文件放入锁文件中的代码的一部分
pidfile = open(os.path.expanduser("~/.lockfile.vestibular.lock"), "w")
pidfile.write("%s" % os.getpid())
pidfile.close()
此代码将检查 pid 的值与现有正在运行的进程进行比较,避免双重执行。
我希望这会有所帮助。
解决方案 5:
我的解决方案是检查进程和命令行参数在 windows 和 ubuntu linux 上测试
import psutil
import os
def is_running(script):
for q in psutil.process_iter():
if q.name().startswith('python'):
if len(q.cmdline())>1 and script in q.cmdline()[1] and q.pid !=os.getpid():
print("'{}' Process is already running".format(script))
return True
return False
if not is_running("test.py"):
n = input("What is Your Name? ")
print ("Hello " + n)
解决方案 6:
在 UNIX 上,有很多非常好的软件包可用于重新启动进程。monit 提供了有关如何构建和配置它的出色教程。只需进行一些调整,您就可以拥有一项坚如磐石的经过验证的技术来维护您的守护进程。
解决方案 7:
我自己遇到了这个老问题并寻求解决方案。
使用psutil:
import psutil
import sys
from subprocess import Popen
for process in psutil.process_iter():
if process.cmdline() == ['python', 'your_script.py']:
sys.exit('Process found: exiting.')
print('Process not found: starting it.')
Popen(['python', 'your_script.py'])
解决方案 8:
有很多种选择。一种方法是使用系统调用或为您执行此类调用的 Python 库。另一种方法是简单地生成一个进程,例如:
ps ax | grep processName
并解析输出。许多人选择这种方法,在我看来,这不一定是一种坏方法。
解决方案 9:
依赖于以下项的便携式解决方案multiprocessing.shared_memory
:
import atexit
from multiprocessing import shared_memory
_ensure_single_process_store = {}
def ensure_single_process(name: str):
if name in _ensure_single_process_store:
return
try:
shm = shared_memory.SharedMemory(name='ensure_single_process__' + name,
create=True,
size=1)
except FileExistsError:
print(f"{name} is already running!")
raise
_ensure_single_process_store[name] = shm
atexit.register(shm.unlink)
通常您不必使用atexit
,但有时它有助于在异常退出时进行清理。
解决方案 10:
我非常喜欢使用Supervisor来管理守护进程。它是用 Python 编写的,因此有很多如何与 Python 交互或从 Python 扩展它的示例。对于您的目的,XML-RPC 进程控制 API应该可以很好地工作。
解决方案 11:
尝试其他版本
def checkPidRunning(pid):
'''Check For the existence of a unix pid.
'''
try:
os.kill(pid, 0)
except OSError:
return False
else:
return True
# Entry point
if __name__ == '__main__':
pid = str(os.getpid())
pidfile = os.path.join("/", "tmp", __program__+".pid")
if os.path.isfile(pidfile) and checkPidRunning(int(file(pidfile,'r').readlines()[0])):
print "%s already exists, exiting" % pidfile
sys.exit()
else:
file(pidfile, 'w').write(pid)
# Do some actual work here
main()
os.unlink(pidfile)
解决方案 12:
您不需要开发自己的 PID 文件解决方案(它比您想象的更微妙,也更复杂),而是看一下Supervisord——这是一个过程控制系统,可以轻松地围绕现有 Python 脚本包装作业控制和守护进程行为。
解决方案 13:
其他答案对于 cron 作业之类的事情来说非常棒,但如果您正在运行守护进程,则应该使用daemontools之类的程序来监视它。
解决方案 14:
ps ax | grep processName
如果你的 pycharm 中的调试脚本总是退出
pydevd.py --multiproc --client 127.0.0.1 --port 33882 --file processName
解决方案 15:
试试这个:
#/usr/bin/env python
import os, sys, atexit
try:
# Set PID file
def set_pid_file():
pid = str(os.getpid())
f = open('myCode.pid', 'w')
f.write(pid)
f.close()
def goodby():
pid = str('myCode.pid')
os.remove(pid)
atexit.register(goodby)
set_pid_file()
# Place your code here
except KeyboardInterrupt:
sys.exit(0)
解决方案 16:
下面是更有用的代码(检查 python 是否确实执行了脚本):
#! /usr/bin/env python
import os
from sys import exit
def checkPidRunning(pid):
global script_name
if pid<1:
print "Incorrect pid number!"
exit()
try:
os.kill(pid, 0)
except OSError:
print "Abnormal termination of previous process."
return False
else:
ps_command = "ps -o command= %s | grep -Eq 'python .*/%s'" % (pid,script_name)
process_exist = os.system(ps_command)
if process_exist == 0:
return True
else:
print "Process with pid %s is not a Python process. Continue..." % pid
return False
if __name__ == '__main__':
script_name = os.path.basename(__file__)
pid = str(os.getpid())
pidfile = os.path.join("/", "tmp/", script_name+".pid")
if os.path.isfile(pidfile):
print "Warning! Pid file %s existing. Checking for process..." % pidfile
r_pid = int(file(pidfile,'r').readlines()[0])
if checkPidRunning(r_pid):
print "Python process with pid = %s is already running. Exit!" % r_pid
exit()
else:
file(pidfile, 'w').write(pid)
else:
file(pidfile, 'w').write(pid)
# main programm
....
....
os.unlink(pidfile)
这是字符串:
ps_command = "ps -o command= %s | grep -Eq 'python .*/%s'" % (pid,script_name)
如果“grep”成功则返回 0,并且进程“python”当前正在以脚本名称作为参数运行。
解决方案 17:
一个简单的例子,如果您只寻找一个进程名称是否存在:
import os
def pname_exists(inp):
os.system('ps -ef > /tmp/psef')
lines=open('/tmp/psef', 'r').read().split('
')
res=[i for i in lines if inp in i]
return True if res else False
Result:
In [21]: pname_exists('syslog')
Out[21]: True
In [22]: pname_exists('syslog_')
Out[22]: False
解决方案 18:
我正在寻找这个问题的答案,就我而言,我想到了一个非常简单且非常好的解决方案(因为我猜这不可能存在误报 - 如果程序不这样做,TXT 上的时间戳如何更新):
--> 只需根据需要在某个时间间隔内继续在 TXT 上写入当前时间戳(这里每半小时一次是完美的)。
如果您检查时 TXT 上的时间戳相对于当前时间戳已过时,则程序存在问题,应该重新启动或执行您希望执行的操作。
解决方案 19:
考虑以下示例来解决您的问题:
#!/usr/bin/python
# -*- coding: latin-1 -*-
import os, sys, time, signal
def termination_handler (signum,frame):
global running
global pidfile
print 'You have requested to terminate the application...'
sys.stdout.flush()
running = 0
os.unlink(pidfile)
running = 1
signal.signal(signal.SIGINT,termination_handler)
pid = str(os.getpid())
pidfile = '/tmp/'+os.path.basename(__file__).split('.')[0]+'.pid'
if os.path.isfile(pidfile):
print "%s already exists, exiting" % pidfile
sys.exit()
else:
file(pidfile, 'w').write(pid)
# Do some actual work here
while running:
time.sleep(10)
我建议使用这个脚本,因为它只能执行一次。
解决方案 20:
使用 bash 查找具有当前脚本名称的进程。没有额外的文件。
import commands
import os
import time
import sys
def stop_if_already_running():
script_name = os.path.basename(__file__)
l = commands.getstatusoutput("ps aux | grep -e '%s' | grep -v grep | awk '{print $2}'| awk '{print $2}'" % script_name)
if l[1]:
sys.exit(0);
为了测试,添加
stop_if_already_running()
print "running normally"
while True:
time.sleep(3)
解决方案 21:
这是我在 Linux 中使用的方法,以避免启动已经运行的脚本:
import os
import sys
script_name = os.path.basename(__file__)
pidfile = os.path.join("/tmp", os.path.splitext(script_name)[0]) + ".pid"
def create_pidfile():
if os.path.exists(pidfile):
with open(pidfile, "r") as _file:
last_pid = int(_file.read())
# Checking if process is still running
last_process_cmdline = "/proc/%d/cmdline" % last_pid
if os.path.exists(last_process_cmdline):
with open(last_process_cmdline, "r") as _file:
cmdline = _file.read()
if script_name in cmdline:
raise Exception("Script already running...")
with open(pidfile, "w") as _file:
pid = str(os.getpid())
_file.write(pid)
def main():
"""Your application logic goes here"""
if __name__ == "__main__":
create_pidfile()
main()
这种方法无需依赖任何外部模块即可发挥良好的效果。
扫码咨询,免费领取项目管理大礼包!