App Runner和Task Pipeline中的数据库连接管理指南

由于 App Runner 中存在一些需要长时间执行的任务,如大语言模型(LLM)生成和外部请求,Flask-Sqlalchemy 的数据库连接池策略是每个请求分配一个连接(事务)。这种方式会导致在执行非数据库任务时,连接仍然被占用,进而在高并发请求时,由于多个长时间运行的任务,无法获取新的连接。

因此,App Runner 和任务管道中的数据库操作必须确保在使用后立即关闭连接,建议传递 ID 而不是 Model 对象,以避免分离(deattach)错误。

一.Flask-SQLAlchemy数据库连接池策略

Flask-SQLAlchemy 的默认行为是每个请求分配一个数据库连接,连接在请求期间保持打开状态,并在请求结束时关闭。这个设计在大多数情况下是合理的,因为 HTTP 请求通常是短时间的操作。然而,当涉及到长时间运行的任务(例如 LLM 生成或外部 API 请求)时,如果数据库连接一直保持打开状态,这就会导致资源浪费,尤其是在高并发的情况下,可能会耗尽连接池,进而引发连接不可用的问题。

1.每个请求分配一个连接

每个请求到来时,Flask-SQLAlchemy 会从连接池中分配一个数据库连接。这意味着当 Flask 处理一个 HTTP 请求时,一个连接会被分配给该请求,直到请求处理完成才会释放。

2.连接在长时间任务中被占用

如果在一个请求中,有一部分任务是非数据库相关的(如大语言模型生成、等待外部 API 响应等),数据库连接会在整个任务执行期间都被占用,尽管这些任务并不需要使用数据库。

3.高并发下连接耗尽

在高并发情况下,如果多个请求同时执行长时间任务,数据库连接池的连接可能会耗尽,导致新的请求无法获取连接,最终出现数据库连接失败的情况。

二.创建新记录

1.默认行为

假设有一个 HTTP 请求需要创建一个新记录并执行一个长时间运行的任务:

@app.route('/create', methods=['POST'])
defcreate_app():
    app = App(id=1)
    db.session.add(app)
    db.session.commit()
    db.session.refresh(app)  # 获取创建时数据库的默认值

# 模拟一个长时间任务,例如 LLM 生成或外部 API 请求
    time.sleep(30)

# 完成后返回结果
return app.id

在这个代码中:

  • 数据库连接在请求开始时被分配,插入了一条新记录,并提交到数据库。

  • 然后进入一个长时间运行的任务,使用 time.sleep(30) 来模拟实际场景中的等待。

  • 即便在这段时间内不需要数据库操作,数据库连接仍然保持打开状态,直到整个请求处理结束。

问题: 如果同时有多个类似请求(例如高并发下多个用户发起类似操作),那么数据库连接池中的连接会被迅速耗尽,因为每个请求的连接在长时间任务期间无法释放,新的请求无法获取连接。

2.解决方案

为了避免这种情况,应该在完成数据库操作后立即关闭连接,将长时间运行的任务与数据库操作隔离开。这可以通过显式关闭 db.session 来实现:

@app.route('/create', methods=['POST'])
defcreate_app():
# 创建新记录并提交
    app = App(id=1)
    db.session.add(app)
    db.session.commit()
    db.session.refresh(app)  # 获取数据库中的默认值
    db.session.close()  # 立即关闭连接

# 模拟一个长时间任务,例如 LLM 生成或外部 API 请求
    time.sleep(30)

# 完成后返回结果
return app.id

在这个代码中:

  • 数据库的操作完成后立即调用 db.session.close(),释放连接。

  • 后续的长时间任务在连接已经关闭的情况下执行,不会占用数据库连接资源。

  • 即使在高并发情况下,也不会因为长时间任务而导致连接池耗尽。

二.从表中获取记录

1.默认行为

对于查询操作,问题类似。假设要从数据库中查询记录,然后执行一个长时间的任务:

@app.route('/fetch', methods=['GET'])
deffetch_app():
    app = db.session.query(App).filter(App.id == 1).first()

# 在查询到记录后进行长时间任务
    time.sleep(30)

return app.created_at

在这个例子中,数据库连接在 db.session.query 之后仍然保持打开,直到请求结束。

2.解决方案

可在完成查询操作后立即关闭连接,再处理长时间任务:

@app.route('/fetch', methods=['GET'])
deffetch_app():
    app = db.session.query(App).filter(App.id == 1).first()
    created_at = app.created_at
    db.session.close()  # 立即关闭连接

# 处理长时间任务
    time.sleep(30)

return created_at

通过这样做,数据库连接仅在需要的时候被占用,其他长时间任务不会占用连接池资源,确保高并发时请求能顺利执行。

三.更新表字段

1.默认行为

以下是一个常见的更新表字段的操作,连接池可能会在不必要的时间段被占用:

@app.route('/update', methods=['POST'])
defupdate_app():
# 根据 id 获取记录
    app = db.session.query(App).filter(App.id == 1).first()

# 更新字段
    app.updated_at = time.utcnow()
    db.session.commit()  # 提交更新

# 模拟长时间任务
    time.sleep(30)

return app.id

在这个例子中:

  • 首先从数据库中查询出一个 App 记录。

  • 然后更新 updated_at 字段,并提交更新操作。

  • 之后再执行一个长时间的任务(用 time.sleep(30) 来模拟)。

问题: 数据库连接在整个请求期间被占用,直到请求结束时才释放。尤其是长时间任务的存在,可能导致其他请求无法及时获取连接,出现连接池耗尽的问题。

2.解决方案

通过在更新数据库字段后立即关闭数据库连接,可以避免连接池资源被不必要地占用。

@app.route('/update', methods=['POST'])
defupdate_app():
# 根据 id 获取记录
    app = db.session.query(App).filter(App.id == 1).first()

# 更新字段
    app.updated_at = time.utcnow()
    db.session.commit()  # 提交更新

# 在更新后立即关闭数据库连接
    db.session.close()

# 模拟长时间任务
    time.sleep(30)

return app.id

在这个改进的代码中:

  • 在更新 updated_at 字段后,立即调用 db.session.close() 关闭数据库连接。

  • 后续的长时间任务与数据库连接无关,因此不会占用连接池资源。

四.注意事项

Flask-SQLAlchemy 的默认行为会在长时间任务中保留数据库连接,从而导致连接池资源紧张。通过在数据库操作后立即关闭连接,能够避免这种问题,确保在高并发请求时,数据库连接池的资源被合理使用。通过这种方式,可以确保数据库连接池的资源得到合理利用,在高并发的情况下依然能够正常处理请求。

1.及时关闭连接

数据库操作(包括更新、插入和查询)结束后,尽量及时关闭连接,以便连接池中的连接能被其他请求使用。

2.长时间任务处理

长时间任务(如外部 API 请求、文件处理等)最好在数据库操作完成并关闭连接后再进行,以避免不必要的资源占用。

参考文献

[1] dify-0.7.2\api\core\app\apps\README.md  

[2] App Runner和Task Pipeline中的数据库连接管理指南:https://z0yrmerhgi8.feishu.cn/wiki/AUNDwcmuKihmRjk9O1YcZrAdnZy



知识星球服务内容:Dify源码剖析及答疑,Dify对话系统源码,NLP电子书籍报告下载,公众号所有付费资料。加微信buxingtianxia21进NLP工程化资料群

(文:NLP工程化)

发表评论

×

下载每时AI手机APP

 

和大家一起交流AI最新资讯!

立即前往