Flask 返回响应后执行函数
- 2025-04-16 08:56:00
- admin 原创
- 21
问题描述:
我有一些代码需要在 Flask 返回响应后执行。我认为像 Celery 这样的任务队列不够复杂。关键要求是 Flask 必须在运行此函数之前将响应返回给客户端。它不能等待函数执行完毕。
关于这一点存在一些疑问,但似乎没有一个答案解决在响应发送给客户端后运行任务的问题,它们仍然同步执行,然后返回响应。
Python Flask 立即发送响应
在Flask中返回响应后需要执行一个函数
Flask 结束响应并继续处理
解决方案 1:
长话短说,Flask 没有提供任何特殊功能来实现这一点。对于简单的一次性任务,可以考虑使用如下所示的 Python 多线程。对于更复杂的配置,可以使用 RQ 或 Celery 等任务队列。
为什么?
理解 Flask 提供的功能以及它们为何无法实现预期目标至关重要。这些内容在其他情况下很有用,也值得一读,但对后台任务却没有帮助。
Flask 的after_request
处理程序
Flask 的after_request
处理程序(详见延迟请求回调模式和每个请求附加不同函数的代码片段)会将请求传递给回调函数。其预期用例是修改请求,例如附加 Cookie。
因此,请求将等待这些处理程序完成执行,因为预期请求本身将因此而改变。
Flask 的teardown_request
处理程序
这类似于after_request
,但teardown_request
不接收request
对象。这意味着它不会等待请求,对吗?
这看起来像是一个解决方案,正如Stack Overflow 上一个类似问题的答案所暗示的那样。而且,由于 Flask 的文档解释说,拆卸回调与实际请求无关,并且不接收请求上下文,因此您有充分的理由相信这一点。
遗憾的是,teardown_request
它仍然是同步的,只是在 Flask 请求处理的后期,当请求不再可修改时才会发生。Flask仍将等待拆卸函数完成后再返回响应,正如Flask 回调和错误列表所示。
Flask 的流式响应
Flask 可以通过将生成器传递给来流式传输响应Response()
,正如Stack Overflow 对类似问题的回答所建议的那样。
使用流式传输时,客户端确实会在请求结束之前开始接收响应。但是,请求仍然是同步运行的,因此处理请求的工作线程会一直处于忙碌状态,直到流式传输完成为止。
这种用于流式传输的 Flask 模式包含一些有关使用的文档stream_with_context()
,其中包含请求上下文是必要的。
那么解决方案是什么?
Flask 不提供在后台运行函数的解决方案,因为这不是 Flask 的责任。
大多数情况下,解决这个问题的最佳方法是使用任务队列,例如 RQ 或 Celery。它们可以帮你管理一些棘手的事情,例如配置、调度和分配工作线程。这是这类问题最常见的答案,因为它是最正确的,并且能够迫使你以正确考虑上下文等的方式进行设置。
如果您需要在后台运行某个函数,但又不想设置队列来管理它,那么您可以使用 Python 的内置函数threading
或multiprocessing
生成后台工作程序。
您无法request
在后台任务中访问 Flask 的线程本地变量或其他变量,因为请求不会在后台任务中激活。相反,您需要在创建视图时将所需的数据传递到后台线程。
@app.route('/start_task')
def start_task():
def do_work(value):
# do something that takes a long time
import time
time.sleep(value)
thread = Thread(target=do_work, kwargs={'value': request.args.get('value', 20)})
thread.start()
return 'started'
解决方案 2:
Flask 是一个WSGI 应用,因此它本质上无法处理响应之后的任何内容。这就是为什么不存在这样的处理程序的原因,WSGI 应用本身只负责构造发送给 WSGI 服务器的响应迭代器对象。
然而, WSGI 服务器(如gunicorn)可以非常轻松地提供此功能,但出于多种原因,将应用程序绑定到服务器是一个非常糟糕的主意。
正因如此,WSGI 提供了Middleware的规范,而 Werkzeug 也提供了许多辅助类来简化常用的 Middleware 功能。其中就有一个ClosingIterator类,它允许你将方法挂接到close
响应迭代器的方法上,该迭代器会在请求关闭后执行。
下面是一个after_response
作为 Flask 扩展完成的简单实现的示例:
import traceback
from werkzeug.wsgi import ClosingIterator
class AfterResponse:
def __init__(self, app=None):
self.callbacks = []
if app:
self.init_app(app)
def __call__(self, callback):
self.callbacks.append(callback)
return callback
def init_app(self, app):
# install extension
app.after_response = self
# install middleware
app.wsgi_app = AfterResponseMiddleware(app.wsgi_app, self)
def flush(self):
for fn in self.callbacks:
try:
fn()
except Exception:
traceback.print_exc()
class AfterResponseMiddleware:
def __init__(self, application, after_response_ext):
self.application = application
self.after_response_ext = after_response_ext
def __call__(self, environ, after_response):
iterator = self.application(environ, after_response)
try:
return ClosingIterator(iterator, [self.after_response_ext.flush])
except Exception:
traceback.print_exc()
return iterator
您可以像这样使用此扩展:
import flask
app = flask.Flask("after_response")
AfterResponse(app)
@app.after_response
def say_hi():
print("hi")
@app.route("/")
def home():
return "Success!
"
当您卷曲“/”时,您会在日志中看到以下内容:
127.0.0.1 - - [24/Jun/2018 19:30:48] "GET / HTTP/1.1" 200 -
hi
这简单地解决了该问题,无需引入线程(GIL??)或安装和管理任务队列和客户端软件。
解决方案 3:
Flask 现在通过 Werkzeug 支持call_on_close
在响应对象上使用回调装饰器。使用方法如下:
@app.after_request
def response_processor(response):
# Prepare all the local variables you need since the request context
# will be gone in the callback function
@response.call_on_close
def process_after_request():
# Do whatever is necessary here
pass
return response
优点:
call_on_close
使用 WSGI 规范的方法设置在响应返回后调用的函数close
。无线程,无后台任务,无复杂设置。它在同一个线程中运行,不会阻塞请求返回。
缺点:
没有请求上下文或应用上下文。你必须保存所需的变量,以便将其传递给闭包。
没有本地堆栈,因为所有东西都将被拆除。如果需要,您必须创建自己的应用上下文。
如果您尝试写入数据库,Flask-SQLAlchemy 将会静默失败(我还没弄清楚原因,但可能是由于上下文关闭了)。session.add
(它可以工作,但如果您有一个现有对象,则必须使用或将其添加到新会话中session.merge
;这不算缺点!)
解决方案 4:
有 3 种方法可以实现这一点,均有效:
线程
@app.route('/inner')
def foo():
for i in range(10):
sleep(1)
print(i)
return
@app.route('/inner', methods=['POST'])
def run_jobs():
try:
thread = Thread(target=foo)
thread.start()
return render_template("index_inner.html", img_path=DIR_OF_PHOTOS, video_path=UPLOAD_VIDEOS_FOLDER)
AfterResponse 装饰器
app = Flask(__name__)
AfterResponse(app)
@app.route('/inner', methods=['POST'])
def save_data():
pass
@app.after_response
def foo():
for i in range(10):
sleep(1)
print(i)
return
call_on_close
from time import sleep
from flask import Flask, Response, request
app = Flask('hello')
@app.route('/')
def hello():
response = Response('hello')
@response.call_on_close
def on_close():
for i in range(10):
sleep(1)
print(i)
return response
if __name__ == '__main__':
app.run()
解决方案 5:
Flask Blueprints 的中间件解决方案
这和 Matthew Story 提出的解决方案相同(在我看来,这是完美的解决方案 - 感谢 Matthew),并针对 Flask Blueprints 进行了调整。这里的秘诀是使用 current_app 代理获取应用上下文。更多信息请阅读此处(http://flask.pocoo.org/docs/1.0/appcontext/ )
假设 AfterThisResponse 和 AfterThisResponseMiddleware 类位于 .utils.after_this_response.py 模块中
然后,在 Flask 对象创建发生的地方,您可能会有,例如......
__init__.py
from api.routes import my_blueprint
from .utils.after_this_response import AfterThisResponse
app = Flask( __name__ )
AfterThisResponse( app )
app.register_blueprint( my_blueprint.mod )
然后在你的蓝图模块中...
蓝图.py
from flask import Blueprint, current_app
mod = Blueprint( 'a_blueprint', __name__, url_prefix=URL_PREFIX )
@mod.route( "/some_resource", methods=['GET', 'POST'] )
def some_resource():
# do some stuff here if you want
@current_app.after_this_response
def post_process():
# this will occur after you finish processing the route & return (below):
time.sleep(2)
print("after_response")
# do more stuff here if you like & then return like so:
return "Success!
"
解决方案 6:
除了其他解决方案之外,您还可以通过结合 after_this_request 和 respond.call_on_close 来执行特定于路由的操作:
@app.route('/')
def index():
# Do your pre-response work here
msg = 'Hello World!'
@flask.after_this_request
def add_close_action(response):
@response.call_on_close
def process_after_request():
# Do your post-response work here
time.sleep(3.0)
print('Delayed: ' + msg)
return response
return msg
解决方案 7:
感谢Matthew Story和Paul Brackin 的帮助,但我需要修改他们的建议。所以可行的解决方案是:
.
├── __init__.py
├── blueprint.py
└── library.py
# __init__.py
from flask import Flask
from .blueprint import bp
from .library import AfterResponse
app = Flask(__name__)
with app.app_context():
app.register_blueprint(bp, url_prefix='/')
AfterResponse(app)
# blueprint.py
from flask import Blueprint, request, current_app as app
from time import sleep
bp = Blueprint('app', __name__)
@bp.route('/')
def root():
body = request.json
@app.after_response
def worker():
print(body)
sleep(5)
print('finished_after_processing')
print('returned')
return 'finished_fast', 200
# library.py
from werkzeug.wsgi import ClosingIterator
from traceback import print_exc
class AfterResponse:
def __init__(self, application=None):
self.functions = list()
if application:
self.init_app(application)
def __call__(self, function):
self.functions.append(function)
def init_app(self, application):
application.after_response = self
application.wsgi_app = AfterResponseMiddleware(application.wsgi_app, self)
def flush(self):
while self.functions:
try:
self.functions.pop()()
except Exception:
print_exc()
class AfterResponseMiddleware:
def __init__(self, application, after_response_ext):
self.application = application
self.after_response_ext = after_response_ext
def __call__(self, environ, after_response):
iterator = self.application(environ, after_response)
try:
return ClosingIterator(iterator, [self.after_response_ext.flush])
except Exception:
print_exc()
return iterator
源代码可以在这里找到
解决方案 8:
该信号request_finished
接收一个Response
实例作为参数。任何后续处理都可以通过连接到该信号进行。
来自https://flask-doc.readthedocs.io/en/latest/signals.html:
def log_response(sender, response, **extra):
sender.logger.debug('Request context is about to close down. '
'Response: %s', response)
from flask import request_finished
request_finished.connect(log_response, app)
got_request_exception
注意:如果出现错误,可以使用信号代替。
解决方案 9:
读了很多主题之后,我找到了适合我的解决方案,如果使用 Blueprint,它适用于 Python 3.8 和 SQLAlchemy。
初始化.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
import dir
import time
from flask_mail import Mail
from flask_cors import CORS
import flask_excel as excel
# init SQLAlchemy so we can use it later in our models
dbb = SQLAlchemy()
def create_app():
app = Flask(__name__)
from .bp_route_1 import auth as bp_route_1_blueprint
app.register_blueprint(bp_route_1_blueprint)
CORS(app)
return app
bp_route_1.py
from flask import Blueprint, request, redirect, Response, url_for, abort, flash, render_template, \n copy_current_request_context
from . import dbb
from .models import #Import Models
from threading import Thread
bp_route_1 = Blueprint('bp_route_1', __name__)
@bp_route_1.route('/wehooks', methods=['POST'])
def route_1_wehooks_post():
@copy_current_request_context #to copy request
def foo_main():
# insert your code here
do_long_time_webhook(request)
Thread(target=foo_main).start()
print("do Webhook by Thread")
return Response(status=200)
def do_long_time_webhook(request):
try:
data = request.get_data()
print(data)
#do long tim function for webhook data
except Exception as e:
print('Dont do webhook', e)
解决方案 10:
您可以使用此代码,我已经尝试过了。它有效。
这段代码将在3秒后打印字符串“message”。从计划时间开始。您可以根据需要自行更改时间。
import time, traceback
import threading
def every(delay,message, task):
next_time = time.time() + delay
time.sleep(max(0, next_time - time.time()))
task(message)
def foo(message):
print(message+" :foo", time.time())
def main(message):
threading.Thread(target=lambda: every(3,message, foo)).start()
main("message")
扫码咨询,免费领取项目管理大礼包!