哪些 Python 软件包提供独立的事件系统?[关闭]

2025-02-18 09:23:00
admin
原创
100
摘要:问题描述:我知道pydispatcher,但是 Python 中肯定还有其他与事件相关的包。有哪些图书馆可用?我对大型框架中的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型基本解决方案。解决方案 1:PyPI 软件包截至 2024 年 10 月,这些是 PyPI 上可用的与事件相关的软件包,按最近发布日期排...

问题描述:

我知道pydispatcher,但是 Python 中肯定还有其他与事件相关的包。

有哪些图书馆可用?

我对大型框架中的事件管理器不感兴趣,我宁愿使用可以轻松扩展的小型基本解决方案。


解决方案 1:

PyPI 软件包

截至 2024 年 10 月,这些是 PyPI 上可用的与事件相关的软件包,按最近发布日期排序。

  • 闪光灯 1.8.2: 2024 年 5 月

  • pymitter 0.5.1 : 2024 年 5 月

  • psygnal 0.11.1 : 2024 年 5 月

  • pluggy 1.5.0 : 2024 年 4 月

  • 路易 2.0.1:2023 年 7 月

  • 活动 0.5:2023 年 7 月 2020

  • zope.event 5.0:2023 年 6 月

  • PyDispatcher 2.0.7:2023 年 2 月

  • python-dispatch 0.2.2 : 2023 年 6 月

  • python-dispatch 0.2.2 : 2023 年 6 月

  • RxPy3 1.0.1 : 2020 年 6 月

  • PyPubSub 4.0.3:2019 年 1 月

  • pyeventdispatcher 0.2.3a0:2018

  • 公交车道 0.0.5: 2018

  • PyPyDispatcher 2.1.2 : 2017

  • 阿克塞尔 0.0.7: 2016

  • 调度员 1.0:2012

  • py-通知 0.3.1:2008

还有更多

有很多库可供选择,使用非常不同的术语(事件、信号、处理程序、方法分派、钩子等)。

我正在尝试对上述软件包以及这里的答案中提到的技术进行概述。

首先,一些术语……

观察者模式

事件系统的最基本样式是‘处理程序方法包’,它是观察者模式的简单实现。

基本上,处理程序方法(可调用方法)存储在一个数组中,并在事件“触发”时调用。

发布-订阅

观察者事件系统的缺点是,你只能在实际的事件对象(或处理程序列表)上注册处理程序。因此,在注册时,事件必须已经存在。

这就是第二种事件系统样式存在的原因:
发布-订阅模式。在这里,处理程序不在事件对象(或处理程序列表)上注册,而是在中央调度程序上注册。此外,通知程序只与调度程序对话。监听什么或发布什么由“信号”决定,它只不过是一个名称(字符串)。

中介者模式

可能还会引起兴趣:中介模式。

钩子

“钩子”系统通常用于应用程序插件环境中。应用程序包含固定的集成点(钩子),每个插件都可以连接到该钩子并执行某些操作。

其他“事件”

注意:threading.Event不是上述意义上的“事件系统”。它是一个线程同步系统,其中一个线程等待,直到另一个线程“发出信号”事件对象。

网络消息库也经常使用术语“事件”;有时它们在概念上相似;有时则不然。它们当然可以跨越线程、进程和计算机边界。例如,参见
pyzmq、pymq、
Twisted、Tornado、 gevent、eventlet。

弱引用

在 Python 中,保留对方法或对象的引用可确保它不会被垃圾收集器删除。这可能是可取的,但也可能导致内存泄漏:链接的处理程序永远不会被清理。

一些事件系统使用弱引用而不是常规引用来解决这个问题。

关于各种图书馆的一些话

观察者式事件系统:

  • psygnal具有非常清晰的接口,带有 connect() 和 emit() 方法。

  • zope.event展示了其工作原理(参见Lennart 的回答)。注意:此示例甚至不支持处理程序参数。

  • LongPoke 的“可调用列表”实现表明,这样的事件系统可以通过子类化以非常简单的方式实现list

  • Felk 的变体EventHook还可以确保被调用者和调用者的签名。

  • spasig 的 EventHook(Michael Foord 的事件模式)是一个简单的实现。

  • Josip 的 Valued Lessons Event 课程基本相同,但使用set而不是list来存放袋子,并且__call__两者都是合理的补充。

  • PyNotify在概念上类似,还提供了变量和条件的附加概念(“变量改变事件”)。主页不起作用。

  • axel基本上是一个处理程序包,具有更多与线程、错误处理相关的功能……

  • python-dispatch需要从中派生出偶数源类pydispatch.Dispatcher

  • buslane是基于类的,支持单个或多个处理程序并提供广泛的类型提示。

  • Pithikos 的Observer/Event是一种轻量级设计。

发布-订阅库:

  • blinker具有一些巧妙的功能,例如自动断开连接和基于发送者的过滤。

  • PyPubSub是一个稳定的软件包,并承诺提供“方便调试和维护主题和消息的高级功能”。

  • pymitter是 Node.js EventEmitter2 的 Python 端口,提供命名空间、通配符和 TTL。

  • PyDispatcher似乎强调多对多发布等方面的灵活性。支持弱引用。

  • louie是经过重新设计的 PyDispatcher,可以在“各种各样的环境中”工作。

  • pypydispatcher基于(你猜对了......)PyDispatcher 并且也可以在 PyPy 中运行。

  • django.dispatch是一个重写的 PyDispatcher,“具有更有限的接口,但性能更高”。

  • pyeventdispatcher基于 PHP 的 Symfony 框架的事件调度程序。

  • 调度程序是从 django.dispatch 中提取的,但已经相当老了。

  • Cristian Garcia 的EventManger是一个非常简短的实现。

其他的:

  • pluggy包含一个供插件使用的钩子系统pytest

  • RxPy3实现了 Observable 模式并允许合并事件、重试等。

  • Qt 的信号和槽可从PyQt
    或PySide2获得。它们在同一个线程中使用时可用作回调,在两个不同线程之间用作事件(使用事件循环)。信号和槽的局限性在于它们只能在派生自 的类的对象中工作QObject

解决方案 2:

我一直这样做:

class Event(list):
    """Event subscription.

    A list of callable objects. Calling an instance of this will cause a
    call to each item in the list in ascending order by index.

    Example Usage:
    >>> def f(x):
    ...     print 'f(%s)' % x
    >>> def g(x):
    ...     print 'g(%s)' % x
    >>> e = Event()
    >>> e()
    >>> e.append(f)
    >>> e(123)
    f(123)
    >>> e.remove(f)
    >>> e()
    >>> e += (f, g)
    >>> e(10)
    f(10)
    g(10)
    >>> del e[0]
    >>> e(2)
    g(2)

    """
    def __call__(self, *args, **kwargs):
        for f in self:
            f(*args, **kwargs)

    def __repr__(self):
        return "Event(%s)" % list.__repr__(self)

但是,就像我见过的其他东西一样,这个没有自动生成的 pydoc,也没有签名,这真的很糟糕。

解决方案 3:

我们使用 Michael Foord 在他的Event Pattern中所建议的 EventHook :

只需将 EventHooks 添加到你的类中即可:

class MyBroadcaster()
    def __init__():
        self.onChange = EventHook()

theBroadcaster = MyBroadcaster()

# add a listener to the event
theBroadcaster.onChange += myFunction

# remove listener from the event
theBroadcaster.onChange -= myFunction

# fire event
theBroadcaster.onChange.fire()

我们添加了从对象中删除所有监听器的功能到 Michaels 类中,最终得到如下结果:

class EventHook(object):

    def __init__(self):
        self.__handlers = []

    def __iadd__(self, handler):
        self.__handlers.append(handler)
        return self

    def __isub__(self, handler):
        self.__handlers.remove(handler)
        return self

    def fire(self, *args, **keywargs):
        for handler in self.__handlers:
            handler(*args, **keywargs)

    def clearObjectHandlers(self, inObject):
        for theHandler in self.__handlers:
            if theHandler.im_self == inObject:
                self -= theHandler

解决方案 4:

我使用zope.event。它是你能想象到的最简单的框架。:-) 事实上,这里是完整的源代码:

subscribers = []

def notify(event):
    for subscriber in subscribers:
        subscriber(event)

请注意,您不能在进程之间发送消息。它不是一个消息系统,只是一个事件系统,仅此而已。

解决方案 5:

我在Valued Lessons上找到了这个小脚本。它似乎具有我所追求的恰到好处的简单性/功能比。Peter Thatcher 是以下代码的作者(未提及许可)。

class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount

class MockFileWatcher:
    def __init__(self):
        self.fileChanged = Event()

    def watchFiles(self):
        source_path = "foo"
        self.fileChanged(source_path)

def log_file_change(source_path):
    print "%r changed." % (source_path,)

def log_file_change2(source_path):
    print "%r changed!" % (source_path,)

watcher              = MockFileWatcher()
watcher.fileChanged += log_file_change2
watcher.fileChanged += log_file_change
watcher.fileChanged -= log_file_change2
watcher.watchFiles()

解决方案 6:

这是一个应该可以正常工作的最小设计。您需要做的只是继承Observer一个类,然后使用它observe(event_name, callback_fn)来监听特定事件。每当在代码中的任何地方触发该特定事件(即Event('USB connected')),就会触发相应的回调。

class Observer():
    _observers = []
    def __init__(self):
        self._observers.append(self)
        self._observed_events = []
    def observe(self, event_name, callback_fn):
        self._observed_events.append({'event_name' : event_name, 'callback_fn' : callback_fn})


class Event():
    def __init__(self, event_name, *callback_args):
        for observer in Observer._observers:
            for observable in observer._observed_events:
                if observable['event_name'] == event_name:
                    observable['callback_fn'](*callback_args)

例子:

class Room(Observer):
    def __init__(self):
        print("Room is ready.")
        Observer.__init__(self) # DON'T FORGET THIS
    def someone_arrived(self, who):
        print(who + " has arrived!")

# Observe for specific event
room = Room()
room.observe('someone arrived',  room.someone_arrived)

# Fire some events
Event('someone left',    'John')
Event('someone arrived', 'Lenard') # will output "Lenard has arrived!"
Event('someone Farted',  'Lenard')

解决方案 7:

我创建了一个EventManager类(代码在最后)。语法如下:

#Create an event with no listeners assigned to it
EventManager.addEvent( eventName = [] )

#Create an event with listeners assigned to it
EventManager.addEvent( eventName = [fun1, fun2,...] )

#Create any number event with listeners assigned to them
EventManager.addEvent( eventName1 = [e1fun1, e1fun2,...], eventName2 = [e2fun1, e2fun2,...], ... )

#Add or remove listener to an existing event
EventManager.eventName += extra_fun
EventManager.eventName -= removed_fun

#Delete an event
del EventManager.eventName

#Fire the event
EventManager.eventName()

以下是一个例子:

def hello(name):
    print "Hello {}".format(name)
    
def greetings(name):
    print "Greetings {}".format(name)

EventManager.addEvent( salute = [greetings] )
EventManager.salute += hello

print "
Initial salute"
EventManager.salute('Oscar')

print "
Now remove greetings"
EventManager.salute -= greetings
EventManager.salute('Oscar')

输出:

最初的敬礼

问候奥斯卡

你好奥斯卡

现在删除问候语

Hello Oscar

EventManger 代码:

class EventManager:
    
    class Event:
        def __init__(self,functions):
            if type(functions) is not list:
                raise ValueError("functions parameter has to be a list")
            self.functions = functions
            
        def __iadd__(self,func):
            self.functions.append(func)
            return self
            
        def __isub__(self,func):
            self.functions.remove(func)
            return self
            
        def __call__(self,*args,**kvargs):
            for func in self.functions : func(*args,**kvargs)
            
    @classmethod
    def addEvent(cls,**kvargs):
        """
        addEvent( event1 = [f1,f2,...], event2 = [g1,g2,...], ... )
        creates events using **kvargs to create any number of events. Each event recieves a list of functions,
        where every function in the list recieves the same parameters.
        
        Example:
        
        def hello(): print "Hello ",
        def world(): print "World"
        
        EventManager.addEvent( salute = [hello] )
        EventManager.salute += world
        
        EventManager.salute()
        
        Output:
        Hello World
        """
        for key in kvargs.keys():
            if type(kvargs[key]) is not list:
                raise ValueError("value has to be a list")
            else:
                kvargs[key] = cls.Event(kvargs[key])
        
        cls.__dict__.update(kvargs)

解决方案 8:

您可以看看pymitter ( pypi )。它是一种小型单文件 (~250 loc) 方法,“提供命名空间、通配符和 TTL”。

这是一个基本的例子:

from pymitter import EventEmitter

ee = EventEmitter()

# decorator usage
@ee.on("myevent")
def handler1(arg):
   print "handler1 called with", arg

# callback usage
def handler2(arg):
    print "handler2 called with", arg
ee.on("myotherevent", handler2)

# emit
ee.emit("myevent", "foo")
# -> "handler1 called with foo"

ee.emit("myotherevent", "bar")
# -> "handler2 called with bar"

解决方案 9:

我对 Longpoke 的简约方法进行了改进,同时确保了被调用者和调用者的签名:

class EventHook(object):
    '''
    A simple implementation of the Observer-Pattern.
    The user can specify an event signature upon inizializazion,
    defined by kwargs in the form of argumentname=class (e.g. id=int).
    The arguments' types are not checked in this implementation though.
    Callables with a fitting signature can be added with += or removed with -=.
    All listeners can be notified by calling the EventHook class with fitting
    arguments.

    >>> event = EventHook(id=int, data=dict)
    >>> event += lambda id, data: print("%d %s" % (id, data))
    >>> event(id=5, data={"foo": "bar"})
    5 {'foo': 'bar'}

    >>> event = EventHook(id=int)
    >>> event += lambda wrong_name: None
    Traceback (most recent call last):
        ...
    ValueError: Listener must have these arguments: (id=int)

    >>> event = EventHook(id=int)
    >>> event += lambda id: None
    >>> event(wrong_name=0)
    Traceback (most recent call last):
        ...
    ValueError: This EventHook must be called with these arguments: (id=int)
    '''
    def __init__(self, **signature):
        self._signature = signature
        self._argnames = set(signature.keys())
        self._handlers = []

    def _kwargs_str(self):
        return ", ".join(k+"="+v.__name__ for k, v in self._signature.items())

    def __iadd__(self, handler):
        params = inspect.signature(handler).parameters
        valid = True
        argnames = set(n for n in params.keys())
        if argnames != self._argnames:
            valid = False
        for p in params.values():
            if p.kind == p.VAR_KEYWORD:
                valid = True
                break
            if p.kind not in (p.POSITIONAL_OR_KEYWORD, p.KEYWORD_ONLY):
                valid = False
                break
        if not valid:
            raise ValueError("Listener must have these arguments: (%s)"
                             % self._kwargs_str())
        self._handlers.append(handler)
        return self

    def __isub__(self, handler):
        self._handlers.remove(handler)
        return self

    def __call__(self, *args, **kwargs):
        if args or set(kwargs.keys()) != self._argnames:
            raise ValueError("This EventHook must be called with these " +
                             "keyword arguments: (%s)" % self._kwargs_str())
        for handler in self._handlers[:]:
            handler(**kwargs)

    def __repr__(self):
        return "EventHook(%s)" % self._kwargs_str()

解决方案 10:

如果我在 pyQt 中编写代码,我会使用 QT 套接字/信号范例,对于 django 也是如此

如果我正在进行异步 I/OI,请使用本机选择模块

如果我使用 SAX Python 解析器,我会使用 SAX 提供的事件 API。所以看起来我是底层 API 的受害者 :-)

也许你应该问问自己,你对事件框架/模块有什么期望。我个人倾向于使用 QT 的 Socket/Signal 范例。有关更多信息,请参见此处

解决方案 11:

如果您想要执行更复杂的事情(如合并事件或重试),则可以使用 Observable 模式和实现该模式的成熟库。https ://github.com/ReactiveX/RxPY。Observable在 Javascript 和 Java 中非常常见,并且非常方便用于某些异步任务。

from rx import Observable, Observer


def push_five_strings(observer):
        observer.on_next("Alpha")
        observer.on_next("Beta")
        observer.on_next("Gamma")
        observer.on_next("Delta")
        observer.on_next("Epsilon")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

source = Observable.create(push_five_strings)

source.subscribe(PrintObserver())

输出

Received Alpha
Received Beta
Received Gamma
Received Delta
Received Epsilon
Done!

解决方案 12:

这是另一个可供考虑的模块。对于要求更高的应用程序来说,这似乎是可行的选择。

Py-notify 是一个 Python 包,提供用于实现观察者编程模式的工具。这些工具包括信号、条件和变量。

信号是发出信号时调用的处理程序列表。条件基本上是布尔变量加上条件状态改变时发出的信号。它们可以使用标准逻辑运算符(非、与等)组合成复合条件。变量与条件不同,可以保存任何 Python 对象,而不仅仅是布尔值,但它们不能组合。

解决方案 13:

如果您需要跨进程或网络边界工作的事件总线,您可以尝试PyMQ。它目前支持发布/订阅、消息队列和同步 RPC。默认版本在 Redis 后端上运行,因此您需要一个正在运行的 Redis 服务器。还有一个内存后端用于测试。您也可以编写自己的后端。

import pymq

# common code
class MyEvent:
    pass

# subscribe code
@pymq.subscriber
def on_event(event: MyEvent):
    print('event received')

# publisher code
pymq.publish(MyEvent())

# you can also customize channels
pymq.subscribe(on_event, channel='my_channel')
pymq.publish(MyEvent(), channel='my_channel')

初始化系统:

from pymq.provider.redis import RedisConfig

# starts a new thread with a Redis event loop
pymq.init(RedisConfig())

# main application control loop

pymq.shutdown()

免责声明:我是这个库的作者

解决方案 14:

另一个方便的包是events。它封装了事件订阅和事件触发的核心,感觉就像语言的“自然”部分。它看起来类似于 C# 语言,它提供了一种方便的方式来声明、订阅和触发事件。从技术上讲,事件是一个可以附加回调函数(事件处理程序)的“插槽”,这个过程称为订阅事件。

# Define a callback function
def something_changed(reason):
    print "something changed because %s" % reason

# Use events module to create an event and register one or more callback functions
from events import Events
events = Events()
events.on_change += something_changed

触发事件时,将按顺序调用所有附加的事件处理程序。要触发事件,请在插槽上执行调用:

events.on_change('it had to happen')

这将输出:

'something changed because it had to happen'

可以在github repo或文档中找到更多文档。

解决方案 15:

你可以尝试buslane模块。

该库使基于消息的系统的实现更加容易。它支持命令(单个处理程序)和事件(0 个或多个处理程序)方法。Buslane 使用 Python 类型注释来正确注册处理程序。

简单示例:

from dataclasses import dataclass

from buslane.commands import Command, CommandHandler, CommandBus


@dataclass(frozen=True)
class RegisterUserCommand(Command):
    email: str
    password: str


class RegisterUserCommandHandler(CommandHandler[RegisterUserCommand]):

    def handle(self, command: RegisterUserCommand) -> None:
        assert command == RegisterUserCommand(
            email='john@lennon.com',
            password='secret',
        )


command_bus = CommandBus()
command_bus.register(handler=RegisterUserCommandHandler())
command_bus.execute(command=RegisterUserCommand(
    email='john@lennon.com',
    password='secret',
))

要安装 buslane,只需使用 pip:

$ pip install buslane

解决方案 16:

前段时间我写了一个库,可能对你有用。它允许你拥有本地和全局监听器、注册它们的多种不同方式、执行优先级等等。

from pyeventdispatcher import register

register("foo.bar", lambda event: print("second"))
register("foo.bar", lambda event: print("first "), -100)

dispatch(Event("foo.bar", {"id": 1}))
# first second

看看pyeventdispatcher

相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   3998  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   2749  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Freshdesk、ClickUp、nTask、Hubstaff、Plutio、Productive、Targa、Bonsai、Wrike。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在项目管理过程中面临着诸多痛点,如任务分配不...
项目管理系统   85  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Monday、TeamGantt、Filestage、Chanty、Visor、Smartsheet、Productive、Quire、Planview。在当今快速变化的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多项目经理和团队在管理复杂项目时,常...
开源项目管理工具   96  
  本文介绍了以下10款项目管理软件工具:禅道项目管理软件、Smartsheet、GanttPRO、Backlog、Visor、ResourceGuru、Productive、Xebrio、Hive、Quire。在当今快节奏的商业环境中,项目管理已成为企业成功的关键因素之一。然而,许多企业在选择项目管理工具时常常面临困惑:...
项目管理系统   83  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用