Flask框架中如何实现服务端推送?

2025-02-27 09:07:00
admin
原创
56
摘要:问题描述:我正在尝试在 Flask 微型网络框架上使用服务器推送功能构建一个小型网站,但我不知道是否有可以直接使用的框架。我使用了Juggernaut,但它似乎无法与当前版本的redis-py一起使用,并且 Juggernaut 最近已被弃用。有人对我的情况有什么建议吗?解决方案 1:看看服务器发送事件。服务...

问题描述:

我正在尝试在 Flask 微型网络框架上使用服务器推送功能构建一个小型网站,但我不知道是否有可以直接使用的框架。

我使用了Juggernaut,但它似乎无法与当前版本的redis-py一起使用,并且 Juggernaut 最近已被弃用。

有人对我的情况有什么建议吗?


解决方案 1:

看看服务器发送事件。服务器发送事件是一种浏览器 API,它允许您保持打开服务器套接字,订阅更新流。有关更多信息,请阅读 Alex MacCaw(Juggernaut 的作者)的帖子,了解他为何杀死 juggernaut以及为什么更简单的服务器发送事件在许多情况下比 Websockets 更适合这项工作。

该协议非常简单。只需将 mimetype 添加text/event-stream到您的响应中即可。浏览器将保持连接打开并监听更新。从服务器发送的事件是一行以 开头的文本data:,后面跟着换行符。

data: this is a simple message
<blank line>

如果您想交换结构化数据,只需将数据转储为 json 并通过网络发送 json。

一个优点是您可以在 Flask 中使用 SSE,而无需额外的服务器。github 上有一个简单的聊天应用程序示例,它使用 redis 作为 pub/sub 后端。

def event_stream():
    pubsub = red.pubsub()
    pubsub.subscribe('chat')
    for message in pubsub.listen():
        print message
        yield 'data: %s

' % message['data']


@app.route('/post', methods=['POST'])
def post():
    message = flask.request.form['message']
    user = flask.session.get('user', 'anonymous')
    now = datetime.datetime.now().replace(microsecond=0).time()
    red.publish('chat', u'[%s] %s: %s' % (now.isoformat(), user, message))


@app.route('/stream')
def stream():
    return flask.Response(event_stream(),
                          mimetype="text/event-stream")

您不需要使用 gunicron 来运行示例应用程序。只需确保在运行应用程序时使用线程,否则 SSE 连接将阻止您的开发服务器:

if __name__ == '__main__':
    app.debug = True
    app.run(threaded=True)

在客户端,您只需要一个 Javascript 处理程序函数,当从服务器推送新消息时它将被调用。

var source = new EventSource('/stream');
source.onmessage = function (event) {
     alert(event.data);
};

最新的 Firefox、Chrome 和 Safari 浏览器支持服务器发送事件。Internet Explorer 尚不支持服务器发送事件,但预计将在版本 10 中支持它们。有两个推荐的 Polyfill 来支持旧版浏览器

  • 事件源.js

  • jquery.事件源

解决方案 2:

Redis 有点过头了:使用服务器发送事件 (SSE)

像往常一样,我迟到了,但恕我直言,使用 Redis 可能有点过度了。

只要你使用 Python+Flask,就可以考虑使用Panisuan Joe Chasinga 的这篇优秀文章中描述的生成器函数。其要点如下:

在您的客户端 index.html 中

var targetContainer = document.getElementById("target_div");
var eventSource = new EventSource("/stream")
  eventSource.onmessage = function(e) {
  targetContainer.innerHTML = e.data;
};
...
<div id="target_div">Watch this space...</div>

在你的 Flask 服务器中:

def get_message():
    '''this could be any function that blocks until data is ready'''
    time.sleep(1.0)
    s = time.ctime(time.time())
    return s

@app.route('/')
def root():
    return render_template('index.html')

@app.route('/stream')
def stream():
    def eventStream():
        while True:
            # wait for source data to be available, then push it
            yield 'data: {}

'.format(get_message())
    return Response(eventStream(), mimetype="text/event-stream")

解决方案 3:

作为@peter-hoffmann 回答的后续,我编写了一个 Flask 扩展,专门用于处理服务器发送的事件。它被称为Flask-SSE,可在 PyPI 上使用。要安装它,请运行:

$ pip install flask-sse

你可以像这样使用它:

from flask import Flask
from flask_sse import sse

app = Flask(__name__)
app.config["REDIS_URL"] = "redis://localhost"
app.register_blueprint(sse, url_prefix='/stream')

@app.route('/send')
def send_message():
    sse.publish({"message": "Hello!"}, type='greeting')
    return "Message sent!"

要从 Javascript 连接到事件流,其工作原理如下:

var source = new EventSource("{{ url_for('sse.stream') }}");
source.addEventListener('greeting', function(event) {
    var data = JSON.parse(event.data);
    // do what you want with this data
}, false);

文档可在 ReadTheDocs 上找到。请注意,您需要一个正在运行的Redis服务器来处理发布/订阅。

解决方案 4:

作为https://github.com/WolfgangFahl/pyFlaskBootstrap4的提交者,我遇到了同样的需求,并为不依赖于 redis 的服务器发送事件创建了一个 flask 蓝图。

该解决方案建立在过去给出的其他答案的基础上。

https://github.com/WolfgangFahl/pyFlaskBootstrap4/blob/main/fb4/sse_bp.py有源代码(另请参阅下面的 sse_bp.py)。

单元测试位于https://github.com/WolfgangFahl/pyFlaskBootstrap4/blob/main/tests/test_sse.py

你的想法是,你可以使用不同的模式来创建你的 SSE 流:

  • 通过提供功能

  • 通过提供发电机

  • 使用 PubSub 帮助类

  • 通过使用 PubSub 帮助类并同时使用 pydispatch。

截至 2021-02-12,这仍然是 alpha 代码,但我还是想分享。请在此处评论或作为项目中的问题。

http://fb4demo.bitplan.com/events上有一个演示,http: //wiki.bitplan.com/index.php/PyFlaskBootstrap4#Server_Sent_Events上有一个示例使用说明,例如进度条或时间显示

客户端 javascript/html 代码示例

<div id="event_div">Watch this space...</div>
<script>
    function fillContainerFromSSE(id,url) {
        var targetContainer = document.getElementById(id);
        var eventSource = new EventSource(url)
        eventSource.onmessage = function(e) {
            targetContainer.innerHTML = e.data;
        };
    };
    fillContainerFromSSE("event_div","/eventfeed");
</script>

服务器端代码示例


def getTimeEvent(self):
        '''
        get the next time stamp
        '''
        time.sleep(1.0)
        s=datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
        return s   

def eventFeed(self):
        '''
        create a Server Sent Event Feed
        '''
        sse=self.sseBluePrint
        # stream from the given function
        return sse.streamFunc(self.getTimeEvent)

sse_bp.py

'''
Created on 2021-02-06
@author: wf
'''
from flask import Blueprint, Response, request, abort,stream_with_context
from queue import Queue
from pydispatch import dispatcher
import logging

class SSE_BluePrint(object):
    '''
    a blueprint for server side events 
    '''
    def __init__(self,app,name:str,template_folder:str=None,debug=False,withContext=False):
        '''
        Constructor
        '''
        self.name=name
        self.debug=debug
        self.withContext=False
        if template_folder is not None:
            self.template_folder=template_folder
        else:
            self.template_folder='templates'    
        self.blueprint=Blueprint(name,__name__,template_folder=self.template_folder)
        self.app=app
        app.register_blueprint(self.blueprint)
        
        @self.app.route('/sse/<channel>')
        def subscribe(channel):
            def events():
                PubSub.subscribe(channel)
            self.stream(events)
                
    def streamSSE(self,ssegenerator): 
        '''
        stream the Server Sent Events for the given SSE generator
        '''  
        response=None
        if self.withContext:
            if request.headers.get('accept') == 'text/event-stream':
                response=Response(stream_with_context(ssegenerator), content_type='text/event-stream')
            else:
                response=abort(404)    
        else:
            response= Response(ssegenerator, content_type='text/event-stream')
        return response
        
    def streamGen(self,gen):
        '''
        stream the results of the given generator
        '''
        ssegen=self.generateSSE(gen)
        return self.streamSSE(ssegen)   
            
    def streamFunc(self,func,limit=-1):
        '''
        stream a generator based on the given function
        Args:
            func: the function to convert to a generator
            limit (int): optional limit of how often the generator should be applied - 1 for endless
        Returns:
            an SSE Response stream
        '''
        gen=self.generate(func,limit)
        return self.streamGen(gen)
                
    def generate(self,func,limit=-1):
        '''
        create a SSE generator from a given function
        Args:
            func: the function to convert to a generator
            limit (int): optional limit of how often the generator should be applied - 1 for endless
        Returns:
            a generator for the function
        '''   
        count=0
        while limit==-1 or count<limit:
            # wait for source data to be available, then push it
            count+=1
            result=func()
            yield result
        
    def generateSSE(self,gen):
        for result in gen:
            yield 'data: {}

'.format(result)
            
    def enableDebug(self,debug:bool):
        '''
        set my debugging
        
        Args:
            debug(bool): True if debugging should be switched on
        '''
        self.debug=debug
        if self.debug:
            logging.basicConfig(level=logging.DEBUG, format='%(asctime)s.%(msecs)03d %(levelname)s:    %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
            
    def publish(self, message:str, channel:str='sse', debug=False):
        """
        Publish data as a server-sent event.
        
        Args:
            message(str): the message to send
            channel(str): If you want to direct different events to different
                clients, you may specify a channel for this event to go to.
                Only clients listening to the same channel will receive this event.
                Defaults to "sse".
            debug(bool): if True  enable debugging
        """
        return PubSub.publish(channel=channel, message=message,debug=debug)

    def subscribe(self,channel,limit=-1,debug=False):
        def stream():
            for message in PubSub.subscribe(channel,limit,debug=debug):
                yield str(message)
                
        return self.streamGen(stream)
    
class PubSub:
    '''
    redis pubsub duck replacement
    '''
    pubSubByChannel={}
    
    def __init__(self,channel:str='sse',maxsize:int=15, debug=False,dispatch=False):
        '''
        Args:
            channel(string): the channel name
            maxsize(int): the maximum size of the queue
            debug(bool): whether debugging should be switched on
            dispatch(bool): if true use the pydispatch library - otherwise only a queue
        '''
        self.channel=channel
        self.queue=Queue(maxsize=maxsize)
        self.debug=debug
        self.receiveCount=0
        self.dispatch=False
        if dispatch:
            dispatcher.connect(self.receive,signal=channel,sender=dispatcher.Any)
        
    @staticmethod
    def reinit():
        '''
        reinitialize the pubSubByChannel dict
        '''
        PubSub.pubSubByChannel={}
        
    @staticmethod
    def forChannel(channel):    
        '''
        return a PubSub for the given channel
        
        Args:
            channel(str): the id of the channel
        Returns:
            PubSub: the PubSub for the given channel
        '''
        if channel in PubSub.pubSubByChannel:
            pubsub=PubSub.pubSubByChannel[channel]
        else:
            pubsub=PubSub(channel)
            PubSub.pubSubByChannel[channel]=pubsub
        return pubsub
    
    @staticmethod    
    def publish(channel:str,message:str,debug=False):
        '''
        publish a message via the given channel
        
        Args:
            channel(str): the id of the channel to use
            message(str): the message to publish/send
        Returns:
            PubSub: the pub sub for the channel
            
        '''
        pubsub=PubSub.forChannel(channel)
        pubsub.debug=debug
        pubsub.send(message)
        return pubsub
        
    @staticmethod    
    def subscribe(channel,limit=-1,debug=False): 
        '''
        subscribe to the given channel
        
        Args:
            channel(str): the id of the channel to use
            limit(int): limit the maximum amount of messages to be received        
            debug(bool): if True debugging info is printed
        '''  
        pubsub=PubSub.forChannel(channel)
        pubsub.debug=debug
        return pubsub.listen(limit)
    
    def send(self,message):
        '''
        send the given message
        '''
        sender=object();
        if self.dispatch:
            dispatcher.send(signal=self.channel,sender=sender,msg=message)
        else:
            self.receive(sender,message)
        
    def receive(self,sender,message):
        '''
        receive a message
        '''
        if sender is not None:
            self.receiveCount+=1;
            if self.debug:
                logging.debug("received %d:%s" % (self.receiveCount,message))
            self.queue.put(message)
        
    def listen(self,limit=-1):
        '''
        listen to my channel
        
        this is a generator for the queue content of received messages
        
        Args:
            limit(int): limit the maximum amount of messages to be received
        
        Return:
            generator: received messages to be yielded
        '''
        if limit>0 and self.receiveCount>limit:
            return
        yield self.queue.get()
    
    def unsubscribe(self):
        '''
        unsubscribe me
        '''
        if self.dispatch:
            dispatcher.disconnect(self.receive, signal=self.channel)
        pass

解决方案 5:

我打算使用 gunicorn 在 flask 中将更新的事件推送到客户端,而不是使用 websocket。阅读完其他答案后,我注意到:

  1. 当 gunicorn 运行多个 woker 时,我必须为 gunicorn 派生的所有进程共享事件变量。这意味着需要 redis 或外部存储,我使用 redis pubsub。

  2. 服务器不能先关闭连接,因为客户端会重新连接。

  3. 当客户端关闭连接时,服务器在调用阻塞函数时无法检测到它,直到产生某些结果。服务器可以使用以下代码来检测连接是否关闭(不是实时检测)。

  4. 异常GeneratorExit不起作用,对我来说,它没有以某种方式引发。

  5. Nginx 配置在使用缓存时会破坏 sse,需要使用response.headers['X-Accel-Buffering'] = 'no'禁用缓存。

  6. gunicorn 可能会因为超时后没有活动而重新启动 worker(参见https://docs.gunicorn.org/en/stable/settings.html#timeout),需要使用gunicorn --timeout 0,注意,如果你的 flask 项目非常复杂并且需要自动重启,它可能会破坏服务器,我的超时秒数设置是 30,让客户端在服务器重启后重新连接。

  7. 如果没有活动的话,Nginx 会断开连接,但是客户端会重新连接,服务端只需要处理请求头即可Last-Event-ID

  8. gunicorn 需要 gevent 供多个用户使用,否则 sse 连接将阻塞其他连接。

  9. 单台服务器socket最大连接数为65536,也就是说如果单机的话,同时访问网站的用户数不能超过65536个。

如何检测客户端关闭的连接

def iter():
    try:
        while True:
            events = get_update_events(timeout=1)  # note that this is blocking function
            if not event:
                continue
            yield events
    finally:
        connection_closed()

return Response(iter(), mimetype="text/event-stream")

或者使用response.call_on_close替代方案:

response = Response(iter(), mimetype="text/event-stream")
response.call_on_close(connection_closed)
return response
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2974  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1836  
  PLM(产品生命周期管理)系统在企业的产品研发、生产与管理过程中扮演着至关重要的角色。然而,在实际运行中,资源冲突是经常会遇到的难题。资源冲突可能导致项目进度延迟、成本增加以及产品质量下降等一系列问题,严重影响企业的效益与竞争力。因此,如何有效应对PLM系统中的资源冲突,成为众多企业关注的焦点。接下来,我们将详细探讨5...
plm项目管理系统   47  
  敏捷项目管理与产品生命周期管理(PLM)的融合,正成为企业在复杂多变的市场环境中提升研发效率、增强竞争力的关键举措。随着技术的飞速发展和市场需求的快速更迭,传统的研发流程面临着诸多挑战,而将敏捷项目管理理念融入PLM,有望在2025年实现研发流程的深度优化,为企业创造更大的价值。理解敏捷项目管理与PLM的核心概念敏捷项...
plm项目   47  
  模块化设计在现代产品开发中扮演着至关重要的角色,它能够提升产品开发效率、降低成本、增强产品的可维护性与可扩展性。而产品生命周期管理(PLM)系统作为整合产品全生命周期信息的关键平台,对模块化设计有着强大的支持能力。随着技术的不断发展,到 2025 年,PLM 系统在支持模块化设计方面将有一系列令人瞩目的技术实践。数字化...
plm软件   48  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用