由于 App Runner 中存在一些需要长时间执行的任务,如大语言模型(LLM)生成和外部请求,Flask-Sqlalchemy 的数据库连接池策略是每个请求分配一个连接(事务)。这种方式会导致在执行非数据库任务时,连接仍然被占用,进而在高并发请求时,由于多个长时间运行的任务,无法获取新的连接。
因此,App Runner 和任务管道中的数据库操作必须确保在使用后立即关闭连接,建议传递 ID 而不是 Model 对象,以避免分离(deattach)错误。
一.Flask-SQLAlchemy数据库连接池策略
Flask-SQLAlchemy 的默认行为是每个请求分配一个数据库连接,连接在请求期间保持打开状态,并在请求结束时关闭。这个设计在大多数情况下是合理的,因为 HTTP 请求通常是短时间的操作。然而,当涉及到长时间运行的任务(例如 LLM 生成或外部 API 请求)时,如果数据库连接一直保持打开状态,这就会导致资源浪费,尤其是在高并发的情况下,可能会耗尽连接池,进而引发连接不可用的问题。
1.每个请求分配一个连接
每个请求到来时,Flask-SQLAlchemy 会从连接池中分配一个数据库连接。这意味着当 Flask 处理一个 HTTP 请求时,一个连接会被分配给该请求,直到请求处理完成才会释放。
2.连接在长时间任务中被占用
如果在一个请求中,有一部分任务是非数据库相关的(如大语言模型生成、等待外部 API 响应等),数据库连接会在整个任务执行期间都被占用,尽管这些任务并不需要使用数据库。
3.高并发下连接耗尽
在高并发情况下,如果多个请求同时执行长时间任务,数据库连接池的连接可能会耗尽,导致新的请求无法获取连接,最终出现数据库连接失败的情况。
二.创建新记录
1.默认行为
假设有一个 HTTP 请求需要创建一个新记录并执行一个长时间运行的任务:
@app.route('/create', methods=['POST'])
defcreate_app():
app = App(id=1)
db.session.add(app)
db.session.commit()
db.session.refresh(app) # 获取创建时数据库的默认值
# 模拟一个长时间任务,例如 LLM 生成或外部 API 请求
time.sleep(30)
# 完成后返回结果
return app.id
在这个代码中:
-
数据库连接在请求开始时被分配,插入了一条新记录,并提交到数据库。
-
然后进入一个长时间运行的任务,使用
time.sleep(30)
来模拟实际场景中的等待。 -
即便在这段时间内不需要数据库操作,数据库连接仍然保持打开状态,直到整个请求处理结束。
问题: 如果同时有多个类似请求(例如高并发下多个用户发起类似操作),那么数据库连接池中的连接会被迅速耗尽,因为每个请求的连接在长时间任务期间无法释放,新的请求无法获取连接。
2.解决方案
为了避免这种情况,应该在完成数据库操作后立即关闭连接,将长时间运行的任务与数据库操作隔离开。这可以通过显式关闭 db.session
来实现:
@app.route('/create', methods=['POST'])
defcreate_app():
# 创建新记录并提交
app = App(id=1)
db.session.add(app)
db.session.commit()
db.session.refresh(app) # 获取数据库中的默认值
db.session.close() # 立即关闭连接
# 模拟一个长时间任务,例如 LLM 生成或外部 API 请求
time.sleep(30)
# 完成后返回结果
return app.id
在这个代码中:
-
数据库的操作完成后立即调用
db.session.close()
,释放连接。 -
后续的长时间任务在连接已经关闭的情况下执行,不会占用数据库连接资源。
-
即使在高并发情况下,也不会因为长时间任务而导致连接池耗尽。
二.从表中获取记录
1.默认行为
对于查询操作,问题类似。假设要从数据库中查询记录,然后执行一个长时间的任务:
@app.route('/fetch', methods=['GET'])
deffetch_app():
app = db.session.query(App).filter(App.id == 1).first()
# 在查询到记录后进行长时间任务
time.sleep(30)
return app.created_at
在这个例子中,数据库连接在 db.session.query
之后仍然保持打开,直到请求结束。
2.解决方案
可在完成查询操作后立即关闭连接,再处理长时间任务:
@app.route('/fetch', methods=['GET'])
deffetch_app():
app = db.session.query(App).filter(App.id == 1).first()
created_at = app.created_at
db.session.close() # 立即关闭连接
# 处理长时间任务
time.sleep(30)
return created_at
通过这样做,数据库连接仅在需要的时候被占用,其他长时间任务不会占用连接池资源,确保高并发时请求能顺利执行。
三.更新表字段
1.默认行为
以下是一个常见的更新表字段的操作,连接池可能会在不必要的时间段被占用:
@app.route('/update', methods=['POST'])
defupdate_app():
# 根据 id 获取记录
app = db.session.query(App).filter(App.id == 1).first()
# 更新字段
app.updated_at = time.utcnow()
db.session.commit() # 提交更新
# 模拟长时间任务
time.sleep(30)
return app.id
在这个例子中:
-
首先从数据库中查询出一个
App
记录。 -
然后更新
updated_at
字段,并提交更新操作。 -
之后再执行一个长时间的任务(用
time.sleep(30)
来模拟)。
问题: 数据库连接在整个请求期间被占用,直到请求结束时才释放。尤其是长时间任务的存在,可能导致其他请求无法及时获取连接,出现连接池耗尽的问题。
2.解决方案
通过在更新数据库字段后立即关闭数据库连接,可以避免连接池资源被不必要地占用。
@app.route('/update', methods=['POST'])
defupdate_app():
# 根据 id 获取记录
app = db.session.query(App).filter(App.id == 1).first()
# 更新字段
app.updated_at = time.utcnow()
db.session.commit() # 提交更新
# 在更新后立即关闭数据库连接
db.session.close()
# 模拟长时间任务
time.sleep(30)
return app.id
在这个改进的代码中:
-
在更新
updated_at
字段后,立即调用db.session.close()
关闭数据库连接。 -
后续的长时间任务与数据库连接无关,因此不会占用连接池资源。
四.注意事项
Flask-SQLAlchemy 的默认行为会在长时间任务中保留数据库连接,从而导致连接池资源紧张。通过在数据库操作后立即关闭连接,能够避免这种问题,确保在高并发请求时,数据库连接池的资源被合理使用。通过这种方式,可以确保数据库连接池的资源得到合理利用,在高并发的情况下依然能够正常处理请求。
1.及时关闭连接
数据库操作(包括更新、插入和查询)结束后,尽量及时关闭连接,以便连接池中的连接能被其他请求使用。
2.长时间任务处理
长时间任务(如外部 API 请求、文件处理等)最好在数据库操作完成并关闭连接后再进行,以避免不必要的资源占用。
参考文献
[1] dify-0.7.2\api\core\app\apps\README.md
[2] App Runner和Task Pipeline中的数据库连接管理指南:https://z0yrmerhgi8.feishu.cn/wiki/AUNDwcmuKihmRjk9O1YcZrAdnZy
知识星球服务内容:Dify源码剖析及答疑,Dify对话系统源码,NLP电子书籍报告下载,公众号所有付费资料。加微信buxingtianxia21进NLP工程化资料群。
(文:NLP工程化)