Flask 返回响应后执行函数

2025-04-16 08:56:00
admin
原创
21
摘要:问题描述:我有一些代码需要在 Flask 返回响应后执行。我认为像 Celery 这样的任务队列不够复杂。关键要求是 Flask 必须在运行此函数之前将响应返回给客户端。它不能等待函数执行完毕。关于这一点存在一些疑问,但似乎没有一个答案解决在响应发送给客户端后运行任务的问题,它们仍然同步执行,然后返回响应。P...

问题描述:

我有一些代码需要在 Flask 返回响应执行。我认为像 Celery 这样的任务队列不够复杂。关键要求是 Flask 必须在运行此函数之前将响应返回给客户端。它不能等待函数执行完毕。

关于这一点存在一些疑问,但似乎没有一个答案解决在响应发送给客户端后运行任务的问题,它们仍然同步执行,然后返回响应。

  • Python Flask 立即发送响应

  • 在Flask中返回响应后需要执行一个函数

  • Flask 结束响应并继续处理


解决方案 1:

长话短说,Flask 没有提供任何特殊功能来实现这一点。对于简单的一次性任务,可以考虑使用如下所示的 Python 多线程。对于更复杂的配置,可以使用 RQ 或 Celery 等任务队列。

为什么?

理解 Flask 提供的功能以及它们为何无法实现预期目标至关重要。这些内容在其他情况下很有用,也值得一读,但对后台任务却没有帮助。

Flask 的after_request处理程序

Flask 的after_request处理程序(详见延迟请求回调模式和每个请求附加不同函数的代码片段)会将请求传递给回调函数。其预期用例是修改请求,例如附加 Cookie。

因此,请求将等待这些处理程序完成执行,因为预期请求本身将因此而改变。

Flask 的teardown_request处理程序

这类似于after_request,但teardown_request不接收request对象。这意味着它不会等待请求,对吗?

这看起来像是一个解决方案,正如Stack Overflow 上一个类似问题的答案所暗示的那样。而且,由于 Flask 的文档解释说,拆卸回调与实际请求无关,并且不接收请求上下文,因此您有充分的理由相信这一点。

遗憾的是,teardown_request它仍然是同步的,只是在 Flask 请求处理的后期,当请求不再可修改时才会发生。Flask仍将等待拆卸函数完成后再返回响应,正如Flask 回调和错误列表所示。

Flask 的流式响应

Flask 可以通过将生成器传递给来流式传输响应Response(),正如Stack Overflow 对类似问题的回答所建议的那样。

使用流式传输时,客户端确实会在请求结束之前开始接收响应。但是,请求仍然是同步运行的,因此处理请求的工作线程会一直处于忙碌状态,直到流式传输完成为止。

这种用于流式传输的 Flask 模式包含一些有关使用的文档stream_with_context(),其中包含请求上下文是必要的。

那么解决方案是什么?

Flask 不提供在后台运行函数的解决方案,因为这不是 Flask 的责任。

大多数情况下,解决这个问题的最佳方法是使用任务队列,例如 RQ 或 Celery。它们可以帮你管理一些棘手的事情,例如配置、调度和分配工作线程。这是这类问题最常见的答案,因为它是最正确的,并且能够迫使你以正确考虑上下文等的方式进行设置。

如果您需要在后台运行某个函数,但又不想设置队列来管理它,那么您可以使用 Python 的内置函数threadingmultiprocessing生成后台工作程序。

您无法request在后台任务中访问 Flask 的线程本地变量或其他变量,因为请求不会在后台任务中激活。相反,您需要在创建视图时将所需的数据传递到后台线程。

@app.route('/start_task')
def start_task():
    def do_work(value):
        # do something that takes a long time
        import time
        time.sleep(value)

    thread = Thread(target=do_work, kwargs={'value': request.args.get('value', 20)})
    thread.start()
    return 'started'

解决方案 2:

Flask 是一个WSGI 应用,因此它本质上无法处理响应之后的任何内容。这就是为什么不存在这样的处理程序的原因,WSGI 应用本身只负责构造发送给 WSGI 服务器的响应迭代器对象。

然而, WSGI 服务器(如gunicorn)可以非常轻松地提供此功能,但出于多种原因,将应用程序绑定到服务器是一个非常糟糕的主意。

正因如此,WSGI 提供了Middleware的规范,而 Werkzeug 也提供了许多辅助类来简化常用的 Middleware 功能。其中就有一个ClosingIterator类,它允许你将方法挂接到close响应迭代器的方法上,该迭代器会在请求关闭后执行。

下面是一个after_response作为 Flask 扩展完成的简单实现的示例:

import traceback
from werkzeug.wsgi import ClosingIterator

class AfterResponse:
    def __init__(self, app=None):
        self.callbacks = []
        if app:
            self.init_app(app)

    def __call__(self, callback):
        self.callbacks.append(callback)
        return callback

    def init_app(self, app):
        # install extension
        app.after_response = self

        # install middleware
        app.wsgi_app = AfterResponseMiddleware(app.wsgi_app, self)

    def flush(self):
        for fn in self.callbacks:
            try:
                fn()
            except Exception:
                traceback.print_exc()

class AfterResponseMiddleware:
    def __init__(self, application, after_response_ext):
        self.application = application
        self.after_response_ext = after_response_ext

    def __call__(self, environ, after_response):
        iterator = self.application(environ, after_response)
        try:
            return ClosingIterator(iterator, [self.after_response_ext.flush])
        except Exception:
            traceback.print_exc()
            return iterator

您可以像这样使用此扩展:

import flask
app = flask.Flask("after_response")
AfterResponse(app)

@app.after_response
def say_hi():
    print("hi")

@app.route("/")
def home():
    return "Success!
"

当您卷曲“/”时,您会在日志中看到以下内容:

127.0.0.1 - - [24/Jun/2018 19:30:48] "GET / HTTP/1.1" 200 -
hi

这简单地解决了该问题,无需引入线程(GIL??)或安装和管理任务队列和客户端软件。

解决方案 3:

Flask 现在通过 Werkzeug 支持call_on_close在响应对象上使用回调装饰器。使用方法如下:

@app.after_request
def response_processor(response):
    # Prepare all the local variables you need since the request context
    # will be gone in the callback function

    @response.call_on_close
    def process_after_request():
        # Do whatever is necessary here
        pass

    return response

优点:

  1. call_on_close使用 WSGI 规范的方法设置在响应返回后调用的函数close

  2. 无线程,无后台任务,无复杂设置。它在同一个线程中运行,不会阻塞请求返回。

缺点:

  1. 没有请求上下文或应用上下文。你必须保存所需的变量,以便将其传递给闭包。

  2. 没有本地堆栈,因为所有东西都将被拆除。如果需要,您必须创建自己的应用上下文。

  3. 如果您尝试写入数据库,Flask-SQLAlchemy 将会静默失败(我还没弄清楚原因,但可能是由于上下文关闭了)。session.add (它可以工作,但如果您有一个现有对象,则必须使用或将其添加到新会话中session.merge;这不算缺点!)

解决方案 4:

有 3 种方法可以实现这一点,均有效:

  1. 线程

@app.route('/inner')
def foo():
    for i in range(10):
        sleep(1)
        print(i)
    return 

@app.route('/inner', methods=['POST'])
def run_jobs():
    try:
        thread = Thread(target=foo)
        thread.start()
        return render_template("index_inner.html", img_path=DIR_OF_PHOTOS, video_path=UPLOAD_VIDEOS_FOLDER)
                            
  1. AfterResponse 装饰器

app = Flask(__name__)
AfterResponse(app)

@app.route('/inner', methods=['POST'])
def save_data():
    pass

@app.after_response
def foo():
    for i in range(10):
        sleep(1)
        print(i)
    return 
  1. call_on_close

from time import sleep

from flask import Flask, Response, request


app = Flask('hello')


@app.route('/')
def hello():

    response = Response('hello')

    @response.call_on_close
    def on_close():
        for i in range(10):
            sleep(1)
            print(i)

    return response


if __name__ == '__main__':
    app.run()

解决方案 5:

Flask Blueprints 的中间件解决方案

这和 Matthew Story 提出的解决方案相同(在我看来,这是完美的解决方案 - 感谢 Matthew),并针对 Flask Blueprints 进行了调整。这里的秘诀是使用 current_app 代理获取应用上下文。更多信息请阅读此处(http://flask.pocoo.org/docs/1.0/appcontext/ )

假设 AfterThisResponse 和 AfterThisResponseMiddleware 类位于 .utils.after_this_response.py 模块中

然后,在 Flask 对象创建发生的地方,您可能会有,例如......

__init__.py

from api.routes import my_blueprint
from .utils.after_this_response import AfterThisResponse

app = Flask( __name__ )
AfterThisResponse( app )
app.register_blueprint( my_blueprint.mod )

然后在你的蓝图模块中...

蓝图.py

from flask import Blueprint, current_app

mod = Blueprint( 'a_blueprint', __name__, url_prefix=URL_PREFIX )

@mod.route( "/some_resource", methods=['GET', 'POST'] )
def some_resource():
    # do some stuff here if you want

    @current_app.after_this_response
    def post_process():
        # this will occur after you finish processing the route & return (below):
        time.sleep(2)
        print("after_response")

    # do more stuff here if you like & then return like so:
    return "Success!
"

解决方案 6:

除了其他解决方案之外,您还可以通过结合 after_this_request 和 respond.call_on_close 来执行特定于路由的操作:

@app.route('/')
def index():
    # Do your pre-response work here
    msg = 'Hello World!'
    @flask.after_this_request
    def add_close_action(response):
        @response.call_on_close
        def process_after_request():
            # Do your post-response work here
            time.sleep(3.0)
            print('Delayed: ' + msg)
        return response
    return msg

解决方案 7:

感谢Matthew StoryPaul Brackin 的帮助,但我需要修改他们的建议。所以可行的解决方案是:

.
├── __init__.py
├── blueprint.py
└── library.py
# __init__.py
from flask import Flask
from .blueprint import bp
from .library import AfterResponse

app = Flask(__name__)

with app.app_context():
    app.register_blueprint(bp, url_prefix='/')
    AfterResponse(app)
# blueprint.py
from flask import Blueprint, request, current_app as app
from time import sleep

bp = Blueprint('app', __name__)


@bp.route('/')
def root():
    body = request.json

    @app.after_response
    def worker():
        print(body)
        sleep(5)
        print('finished_after_processing')

    print('returned')
    return 'finished_fast', 200
# library.py
from werkzeug.wsgi import ClosingIterator
from traceback import print_exc


class AfterResponse:
    def __init__(self, application=None):
        self.functions = list()
        if application:
            self.init_app(application)    

    def __call__(self, function):
        self.functions.append(function)

    def init_app(self, application):
        application.after_response = self
        application.wsgi_app = AfterResponseMiddleware(application.wsgi_app, self)

    def flush(self):
        while self.functions:
            try:
                self.functions.pop()()
            except Exception:
                print_exc()


class AfterResponseMiddleware:
    def __init__(self, application, after_response_ext):
        self.application = application
        self.after_response_ext = after_response_ext

    def __call__(self, environ, after_response):
        iterator = self.application(environ, after_response)
        try:
            return ClosingIterator(iterator, [self.after_response_ext.flush])
        except Exception:
            print_exc()
            return iterator

源代码可以在这里找到

解决方案 8:

该信号request_finished接收一个Response实例作为参数。任何后续处理都可以通过连接到该信号进行。

来自https://flask-doc.readthedocs.io/en/latest/signals.html

def log_response(sender, response, **extra):
    sender.logger.debug('Request context is about to close down.  '
                        'Response: %s', response)

from flask import request_finished
request_finished.connect(log_response, app)

got_request_exception注意:如果出现错误,可以使用信号代替。

解决方案 9:

读了很多主题之后,我找到了适合我的解决方案,如果使用 Blueprint,它适用于 Python 3.8 和 SQLAlchemy。

初始化.py

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
import dir
import time
from flask_mail import Mail
from flask_cors import CORS
import flask_excel as excel
    # init SQLAlchemy so we can use it later in our models
dbb = SQLAlchemy()
def create_app():
     app = Flask(__name__)
     from .bp_route_1 import auth as bp_route_1_blueprint
     app.register_blueprint(bp_route_1_blueprint)
     CORS(app)
     return app

bp_route_1.py

from flask import Blueprint, request, redirect, Response, url_for, abort, flash, render_template, \n    copy_current_request_context
from . import dbb 
from .models import #Import Models
from threading import Thread
bp_route_1 = Blueprint('bp_route_1', __name__)

@bp_route_1.route('/wehooks', methods=['POST'])
def route_1_wehooks_post():
    @copy_current_request_context #to copy request
    def foo_main():
        # insert your code here
        do_long_time_webhook(request)
    Thread(target=foo_main).start()
    print("do Webhook by Thread")
    return Response(status=200)
def do_long_time_webhook(request):
    try:
        data = request.get_data()
        print(data)
        #do long tim function for webhook data

    except Exception as e:
        print('Dont do webhook', e)

解决方案 10:

您可以使用此代码,我已经尝试过了。它有效。

这段代码将在3秒后打印字符串“message”。从计划时间开始。您可以根据需要自行更改时间。

import time, traceback
import threading

def every(delay,message, task):
  next_time = time.time() + delay
  time.sleep(max(0, next_time - time.time()))
  task(message)

def foo(message):
  print(message+"  :foo", time.time())



def main(message):
    threading.Thread(target=lambda: every(3,message, foo)).start()

main("message")
相关推荐
  政府信创国产化的10大政策解读一、信创国产化的背景与意义信创国产化,即信息技术应用创新国产化,是当前中国信息技术领域的一个重要发展方向。其核心在于通过自主研发和创新,实现信息技术应用的自主可控,减少对外部技术的依赖,并规避潜在的技术制裁和风险。随着全球信息技术竞争的加剧,以及某些国家对中国在科技领域的打压,信创国产化显...
工程项目管理   2482  
  为什么项目管理通常仍然耗时且低效?您是否还在反复更新电子表格、淹没在便利贴中并参加每周更新会议?这确实是耗费时间和精力。借助软件工具的帮助,您可以一目了然地全面了解您的项目。如今,国内外有足够多优秀的项目管理软件可以帮助您掌控每个项目。什么是项目管理软件?项目管理软件是广泛行业用于项目规划、资源分配和调度的软件。它使项...
项目管理软件   1533  
  PLM(产品生命周期管理)项目对于企业优化产品研发流程、提升产品质量以及增强市场竞争力具有至关重要的意义。然而,在项目推进过程中,范围蔓延是一个常见且棘手的问题,它可能导致项目进度延迟、成本超支以及质量下降等一系列不良后果。因此,有效避免PLM项目范围蔓延成为项目成功的关键因素之一。以下将详细阐述三大管控策略,助力企业...
plm系统   0  
  PLM(产品生命周期管理)项目管理在企业产品研发与管理过程中扮演着至关重要的角色。随着市场竞争的加剧和产品复杂度的提升,PLM项目面临着诸多风险。准确量化风险优先级并采取有效措施应对,是确保项目成功的关键。五维评估矩阵作为一种有效的风险评估工具,能帮助项目管理者全面、系统地评估风险,为决策提供有力支持。五维评估矩阵概述...
免费plm软件   0  
  引言PLM(产品生命周期管理)开发流程对于企业产品的全生命周期管控至关重要。它涵盖了从产品概念设计到退役的各个阶段,直接影响着产品质量、开发周期以及企业的市场竞争力。在当今快速发展的科技环境下,客户对产品质量的要求日益提高,市场竞争也愈发激烈,这就使得优化PLM开发流程成为企业的必然选择。缺陷管理工具和六西格玛方法作为...
plm产品全生命周期管理   0  
热门文章
项目管理软件有哪些?
曾咪二维码

扫码咨询,免费领取项目管理大礼包!

云禅道AD
禅道项目管理软件

云端的项目管理软件

尊享禅道项目软件收费版功能

无需维护,随时随地协同办公

内置subversion和git源码管理

每天备份,随时转为私有部署

免费试用