commands.py中的函数解析3:fix_app_site_missing等

本文使用Dify v1.4.0版本,主要解析了commands.py中的fix_app_site_missingmigrate_data_for_plugin等函数的执行逻辑。源码位置:dify\api\commands.py

一.fix_app_site_missing()函数

在 api/commands.py 文件中,fix-app-site-missing 命令的完整执行示例如下:

python -m flask fix-app-site-missing

如果使用的是 Flask CLI,可以这样:

flask fix-app-site-missing

该函数用于批量修复缺失site的app,通过事件触发 site 重新创建,异常时记录失败并继续,直到全部处理完毕。

@click.command("fix-app-site-missing", help="Fix app related site missing issue.")
deffix_app_site_missing():
"""
    Fix app related site missing issue.
    """

    click.echo(click.style("Starting fix for missing app-related sites.", fg="green"))

    failed_app_ids = []
whileTrue:
        sql = """select apps.id as id from apps left join sites on sites.app_id=apps.id
where sites.id is null limit 1000"""

with db.engine.begin() as conn:
            rs = conn.execute(db.text(sql))

            processed_count = 0
for i in rs:
                processed_count += 1
                app_id = str(i.id)

if app_id in failed_app_ids:
continue

try:
                    app = db.session.query(App).filter(App.id == app_id).first()
ifnot app:
                        print(f"App {app_id} not found")
continue

                    tenant = app.tenant
if tenant:
                        accounts = tenant.get_accounts()
ifnot accounts:
                            print("Fix failed for app {}".format(app.id))
continue

                        account = accounts[0]
                        print("Fixing missing site for app {}".format(app.id))
                        app_was_created.send(app, account=account)
except Exception:
                    failed_app_ids.append(app_id)
                    click.echo(click.style("Failed to fix missing site for app {}".format(app_id), fg="red"))
                    logging.exception(f"Failed to fix app related site missing issue, app_id: {app_id}")
continue

ifnot processed_count:
break

    click.echo(click.style("Fix for missing app-related sites completed successfully!", fg="green"))

1.命令注册与函数定义

@click.command("fix-app-site-missing", help="Fix app related site missing issue.")
deffix_app_site_missing():

通过 click 注册命令行命令 fix-app-site-missing,定义修复函数。

2.开始提示

click.echo(click.style("Starting fix for missing app-related sites.", fg="green"))

输出开始修复的提示信息。

3.初始化失败 app 列表

failed_app_ids = []

4.循环处理所有缺失 site 的 app

whileTrue:
    sql = """select apps.id as id from apps left join sites on sites.app_id=apps.id
where sites.id is null limit 1000"""

with db.engine.begin() as conn:
        rs = conn.execute(db.text(sql))

        processed_count = 0
for i in rs:
            processed_count += 1
            app_id = str(i.id)

if app_id in failed_app_ids:
continue

try:
                app = db.session.query(App).filter(App.id == app_id).first()
ifnot app:
                    print(f"App {app_id} not found")
continue

                tenant = app.tenant
if tenant:
                    accounts = tenant.get_accounts()
ifnot accounts:
                        print("Fix failed for app {}".format(app.id))
continue

                    account = accounts[0]
                    print("Fixing missing site for app {}".format(app.id))
                    app_was_created.send(app, account=account)
except Exception:
                failed_app_ids.append(app_id)
                click.echo(click.style("Failed to fix missing site for app {}".format(app_id), fg="red"))
                logging.exception(f"Failed to fix app related site missing issue, app_id: {app_id}")
continue

ifnot processed_count:
break
  • 通过 SQL 查询找出没有 site 记录的 app(每次最多 1000 条)。

  • 遍历每个 app id,跳过已失败的。

  • 查询 app 实体,若不存在则跳过。

  • 获取 app 的 tenant,若 tenant 存在,获取其 accounts,若无账号则跳过。

  • 取第一个账号,调用 app_was_created.send(app, account=account) 触发 site 创建事件。

  • 若过程中有异常,记录失败 app id 并输出错误日志。

  • 若本轮没有处理任何 app,则跳出循环。

5.结束提示

click.echo(click.style("Fix for missing app-related sites completed successfully!", fg="green"))

输出修复完成提示。

注解:site创建事件,如下所示:

该代码监听 app 创建事件,在 app 被创建时自动生成对应的 site 记录,并将其保存到数据库中。具体流程为:当 app 创建事件触发时,获取 sender(即 app 实例)和 account 信息,若 account 存在,则根据 app 和 account 的属性创建 Site 实例,并提交到数据库。

二.migrate_data_for_plugin()函数

完整的执行命令示例,如下所示:

flask migrate-data-for-plugin

migrate_data_for_plugin 是一个 Click 命令行命令,用于迁移插件相关的数据。

@click.command("migrate-data-for-plugin", help="Migrate data for plugin.")
defmigrate_data_for_plugin():
"""
    Migrate data for plugin.
    """

    click.echo(click.style("Starting migrate data for plugin.", fg="white"))

    PluginDataMigration.migrate()

    click.echo(click.style("Migrate data for plugin completed.", fg="green"))

主要流程,如下所示:

  • 输出”开始迁移插件数据”的提示信息(白色字体)。

  • 调用 PluginDataMigration.migrate() 方法,执行实际的数据迁移逻辑。

  • 输出”插件数据迁移完成”的提示信息(绿色字体)。

该命令用于在命令行中触发插件数据的迁移操作,便于维护和升级插件相关的数据结构。

三.data_migration.py – migrate

migrate 类方法是 PluginDataMigration 的主入口,用于批量迁移数据库中与 provider 相关的字段。包括的数据表:providersprovider_modelsprovider_orderstenant_default_modelstenant_preferred_model_providersprovider_model_settingsload_balancing_model_configsdatasetsembeddingsdataset_collection_bindingstool_builtin_providers

@classmethod
def migrate(cls) -> None:
    cls.migrate_db_records("providers", "provider_name")  # large table
    cls.migrate_db_records("provider_models", "provider_name")
    cls.migrate_db_records("provider_orders", "provider_name")
    cls.migrate_db_records("tenant_default_models", "provider_name")
    cls.migrate_db_records("tenant_preferred_model_providers", "provider_name")
    cls.migrate_db_records("provider_model_settings", "provider_name")
    cls.migrate_db_records("load_balancing_model_configs", "provider_name")
    cls.migrate_datasets()
    cls.migrate_db_records("embeddings", "provider_name")  # large table
    cls.migrate_db_records("dataset_collection_bindings", "provider_name")
    cls.migrate_db_records("tool_builtin_providers", "provider")

其执行流程,如下所示:

  • 依次调用 migrate_db_records 方法,对多个表(如 providersprovider_modelsprovider_orders 等)中 provider 字段进行批量迁移。迁移逻辑是将 provider 字段值更新为 DEFAULT_PLUGIN_ID/provider_name/provider_name格式。

  • 对于 datasets 表,调用 migrate_datasets 方法,除 provider 字段外,还会处理 retrieval_model 字段中的嵌套 provider字段(如 reranking_provider_name),并做相应格式化。

  • 继续迁移 embeddingsdataset_collection_bindingstool_builtin_providers 等表的 provider 字段。

整体流程是:遍历所有需要迁移的表,批量查找未迁移的记录,更新 provider 字段为新格式,并输出迁移进度。特殊表(如 datasets)还会处理嵌套 JSON 字段。每步都包含异常处理和日志输出,确保迁移过程可追踪。

四.data_migration.py – migrate_datasets

该方法 migrate_datasets 主要用于迁移 datasets 表中的数据,将 embedding_model_provider 字段的值更新为带有插件前缀的格式,并对 retrieval_model 字段中的 reranking_provider_name 进行相应处理。

@classmethod
defmigrate_datasets(cls) -> None:
    table_name = "datasets"
    provider_column_name = "embedding_model_provider"

    click.echo(click.style(f"Migrating [{table_name}] data for plugin", fg="white"))

    processed_count = 0
    failed_ids = []
whileTrue:
        sql = f"""select id, {provider_column_name} as provider_name, retrieval_model from {table_name}
where {provider_column_name} not like '%/%' and {provider_column_name} is not null and {provider_column_name} != ''
limit 1000"""

with db.engine.begin() as conn:
            rs = conn.execute(db.text(sql))

            current_iter_count = 0
for i in rs:
                record_id = str(i.id)
                provider_name = str(i.provider_name)
                retrieval_model = i.retrieval_model
                print(type(retrieval_model))

if record_id in failed_ids:
continue

                retrieval_model_changed = False
if retrieval_model:
if (
"reranking_model"in retrieval_model
and"reranking_provider_name"in retrieval_model["reranking_model"]
and retrieval_model["reranking_model"]["reranking_provider_name"]
and"/"notin retrieval_model["reranking_model"]["reranking_provider_name"]
                    ):
                        click.echo(
                            click.style(
f"[{processed_count}] Migrating {table_name}{record_id} "
f"(reranking_provider_name: "
f"{retrieval_model['reranking_model']['reranking_provider_name']})",
                                fg="white",
                            )
                        )
                        retrieval_model["reranking_model"]["reranking_provider_name"] = (
f"{DEFAULT_PLUGIN_ID}/{retrieval_model['reranking_model']['reranking_provider_name']}/{retrieval_model['reranking_model']['reranking_provider_name']}"
                        )
                        retrieval_model_changed = True

                click.echo(
                    click.style(
f"[{processed_count}] Migrating [{table_name}{record_id} ({provider_name})",
                        fg="white",
                    )
                )

try:
# update provider name append with "langgenius/{provider_name}/{provider_name}"
                    params = {"record_id": record_id}
                    update_retrieval_model_sql = ""
if retrieval_model and retrieval_model_changed:
                        update_retrieval_model_sql = ", retrieval_model = :retrieval_model"
                        params["retrieval_model"] = json.dumps(retrieval_model)

                    sql = f"""update {table_name}
                    set {provider_column_name} =
                    concat('{DEFAULT_PLUGIN_ID}/', {provider_column_name}, '/', {provider_column_name})
{update_retrieval_model_sql}
                    where id = :record_id"""

                    conn.execute(db.text(sql), params)
                    click.echo(
                        click.style(
f"[{processed_count}] Migrated [{table_name}{record_id} ({provider_name})",
                            fg="green",
                        )
                    )
except Exception:
                    failed_ids.append(record_id)
                    click.echo(
                        click.style(
f"[{processed_count}] Failed to migrate [{table_name}{record_id} ({provider_name})",
                            fg="red",
                        )
                    )
                    logger.exception(
f"[{processed_count}] Failed to migrate [{table_name}{record_id} ({provider_name})"
                    )
continue

                current_iter_count += 1
                processed_count += 1

ifnot current_iter_count:
break

    click.echo(
        click.style(f"Migrate [{table_name}] data for plugin completed, total: {processed_count}", fg="green")
    )

1.初始化与准备

  • 设置表名、字段名。

  • 输出迁移开始信息。

  • 初始化计数器和失败列表。

table_name = "datasets"
provider_column_name = "embedding_model_provider"

click.echo(click.style(f"Migrating [{table_name}] data for plugin", fg="white"))

processed_count = 0
failed_ids = []

2.主循环:批量处理未迁移的数据

  • 构造 SQL 查询,查找 provider 字段未带 / 且不为空的记录,每次最多处理 1000 条。

  • 使用数据库连接执行查询。

whileTrue:
    sql = f"""select id, {provider_column_name} as provider_name, retrieval_model from {table_name}
where {provider_column_name} not like '%/%' and {provider_column_name} is not null and {provider_column_name} != ''
limit 1000"""

with db.engine.begin() as conn:
        rs = conn.execute(db.text(sql))

3.遍历结果集,处理每条记录

  • 取出 id、provider_name、retrieval_model。

  • 跳过已失败的 id。

  • 检查并处理 retrieval_model 字段中的 reranking_provider_name,如果需要则加前缀。

  • 输出迁移中信息。

current_iter_count = 0
for i in rs:
    record_id = str(i.id)
    provider_name = str(i.provider_name)
    retrieval_model = i.retrieval_model
    print(type(retrieval_model))

if record_id in failed_ids:
continue

    retrieval_model_changed = False
if retrieval_model:
if (
"reranking_model"in retrieval_model
and"reranking_provider_name"in retrieval_model["reranking_model"]
and retrieval_model["reranking_model"]["reranking_provider_name"]
and"/"notin retrieval_model["reranking_model"]["reranking_provider_name"]
        ):
            click.echo(
                click.style(
f"[{processed_count}] Migrating {table_name}{record_id} "
f"(reranking_provider_name: "
f"{retrieval_model['reranking_model']['reranking_provider_name']})",
                    fg="white",
                )
            )
            retrieval_model["reranking_model"]["reranking_provider_name"] = (
f"{DEFAULT_PLUGIN_ID}/{retrieval_model['reranking_model']['reranking_provider_name']}/{retrieval_model['reranking_model']['reranking_provider_name']}"
            )
            retrieval_model_changed = True

    click.echo(
        click.style(
f"[{processed_count}] Migrating [{table_name}{record_id} ({provider_name})",
            fg="white",
        )
    )

4.执行更新操作

  • 构造更新 SQL,将 provider 字段加前缀,必要时更新 retrieval_model 字段。

  • 执行更新,输出成功或失败信息,记录失败 id 并打印异常。

try:
# update provider name append with "langgenius/{provider_name}/{provider_name}"
    params = {"record_id": record_id}
    update_retrieval_model_sql = ""
if retrieval_model and retrieval_model_changed:
        update_retrieval_model_sql = ", retrieval_model = :retrieval_model"
        params["retrieval_model"] = json.dumps(retrieval_model)

    sql = f"""update {table_name}
    set {provider_column_name} =
    concat('{DEFAULT_PLUGIN_ID}/', {provider_column_name}, '/', {provider_column_name})
{update_retrieval_model_sql}
    where id = :record_id"""

    conn.execute(db.text(sql), params)
    click.echo(
        click.style(
f"[{processed_count}] Migrated [{table_name}{record_id} ({provider_name})",
            fg="green",
        )
    )
except Exception:
    failed_ids.append(record_id)
    click.echo(
        click.style(
f"[{processed_count}] Failed to migrate [{table_name}{record_id} ({provider_name})",
            fg="red",
        )
    )
    logger.exception(
f"[{processed_count}] Failed to migrate [{table_name}{record_id} ({provider_name})"
    )
continue

current_iter_count += 1
processed_count += 1

5.判断是否还有未处理的数据

如果本轮没有处理任何数据,则跳出循环。

ifnot current_iter_count:
break

6.输出迁移完成信息

click.echo(
    click.style(f"Migrate [{table_name}] data for plugin completed, total: {processed_count}", fg="green")
)

整体流程就是:循环批量查询未迁移的数据,处理每条记录的 provider 字段和 retrieval_model 字段,更新数据库,记录失败项,直到全部迁移完成。

五.data_migration.py – migrate_db_records

该方法 migrate_db_records 主要用于批量迁移数据库表中 provider 字段的数据格式。整体流程就是分页查询、批量处理、批量更新、异常处理和进度输出,直到所有数据迁移完成。

@classmethod
defmigrate_db_records(cls, table_name: str, provider_column_name: str) -> None:
    click.echo(click.style(f"Migrating [{table_name}] data for plugin", fg="white"))

    processed_count = 0
    failed_ids = []
    last_id = "00000000-0000-0000-0000-000000000000"

whileTrue:
        sql = f"""
            SELECT id, {provider_column_name} AS provider_name
            FROM {table_name}
            WHERE {provider_column_name} NOT LIKE '%/%'
                AND {provider_column_name} IS NOT NULL
                AND {provider_column_name} != ''
                AND id > :last_id
            ORDER BY id ASC
            LIMIT 5000
        """

        params = {"last_id": last_id or""}

with db.engine.begin() as conn:
            rs = conn.execute(db.text(sql), params)

            current_iter_count = 0
            batch_updates = []

for i in rs:
                current_iter_count += 1
                processed_count += 1
                record_id = str(i.id)
                last_id = record_id
                provider_name = str(i.provider_name)

if record_id in failed_ids:
continue

                click.echo(
                    click.style(
f"[{processed_count}] Migrating [{table_name}{record_id} ({provider_name})",
                        fg="white",
                    )
                )

try:
                    updated_value = f"{DEFAULT_PLUGIN_ID}/{provider_name}/{provider_name}"
                    batch_updates.append((updated_value, record_id))
except Exception as e:
                    failed_ids.append(record_id)
                    click.echo(
                        click.style(
f"[{processed_count}] Failed to migrate [{table_name}{record_id} ({provider_name})",
                            fg="red",
                        )
                    )
                    logger.exception(
f"[{processed_count}] Failed to migrate [{table_name}{record_id} ({provider_name})"
                    )
continue

if batch_updates:
                update_sql = f"""
                    UPDATE {table_name}
                    SET {provider_column_name} = :updated_value
                    WHERE id = :record_id
                """

                conn.execute(db.text(update_sql), [{"updated_value": u, "record_id": r} for u, r in batch_updates])
                click.echo(
                    click.style(
f"[{processed_count}] Batch migrated [{len(batch_updates)}] records from [{table_name}]",
                        fg="green",
                    )
                )

ifnot current_iter_count:
break

    click.echo(
        click.style(f"Migrate [{table_name}] data for plugin completed, total: {processed_count}", fg="green")
    )

1.初始化与准备

  • 输出迁移开始信息。

  • 初始化计数器、失败ID列表、last_id(用于分页)。

@click.echo(click.style(f"Migrating [{table_name}] data for plugin", fg="white"))

processed_count = 0
failed_ids = []
last_id = "00000000-0000-0000-0000-000000000000"

2.循环分页处理

  • 构造 SQL 查询,查找 provider 字段未迁移(不含 /)的记录,按 id 升序分页(每次 5000 条)。

  • 用 last_id 控制分页,避免遗漏或重复。

whileTrue:
    sql = f"""
        SELECT id, {provider_column_name} AS provider_name
        FROM {table_name}
        WHERE {provider_column_name} NOT LIKE '%/%'
            AND {provider_column_name} IS NOT NULL
            AND {provider_column_name} != ''
            AND id > :last_id
        ORDER BY id ASC
        LIMIT 5000
    """

    params = {"last_id": last_id or""}

3.处理每一批数据

with db.engine.begin() as conn:
    rs = conn.execute(db.text(sql), params)

    current_iter_count = 0
    batch_updates = []

for i in rs:
        current_iter_count += 1
        processed_count += 1
        record_id = str(i.id)
        last_id = record_id
        provider_name = str(i.provider_name)

if record_id in failed_ids:
continue

        click.echo(
            click.style(
f"[{processed_count}] Migrating [{table_name}{record_id} ({provider_name})",
                fg="white",
            )
        )

try:
            updated_value = f"{DEFAULT_PLUGIN_ID}/{provider_name}/{provider_name}"
            batch_updates.append((updated_value, record_id))
except Exception as e:
            failed_ids.append(record_id)
            click.echo(
                click.style(
f"[{processed_count}] Failed to migrate [{table_name}{record_id} ({provider_name})",
                    fg="red",
                )
            )
            logger.exception(
f"[{processed_count}] Failed to migrate [{table_name}{record_id} ({provider_name})"
            )
continue

遍历查询结果过程,如下所示:

  • 计数,更新 last_id。

  • 跳过失败过的记录。

  • 输出迁移进度。

  • 构造迁移后的 provider 字段值(格式为 DEFAULT_PLUGIN_ID/provider_name/provider_name)。

  • 加入批量更新列表。

  • 异常时记录失败ID并输出错误日志。

4.批量更新

  • 如果有需要更新的记录,批量执行 UPDATE 语句。

  • 输出批量迁移成功信息。

if batch_updates:
    update_sql = f"""
        UPDATE {table_name}
        SET {provider_column_name} = :updated_value
        WHERE id = :record_id
    """

    conn.execute(db.text(update_sql), [{"updated_value": u, "record_id": r} for u, r in batch_updates])
    click.echo(
        click.style(
f"[{processed_count}] Batch migrated [{len(batch_updates)}] records from [{table_name}]",
            fg="green",
        )
    )

5.终止条件

如果本批没有数据,跳出循环。

ifnot current_iter_count:
break

输出迁移完成信息。

6.结束输出

click.echo(
    click.style(f"Migrate [{table_name}] data for plugin completed, total: {processed_count}", fg="green")
)

参考文献

[0] commands.py中的函数解析3:fix_app_site_missing等:https://z0yrmerhgi8.feishu.cn/wiki/SOMfwSDdBiEf3CkukxackjQfnQd

[1] click github:https://github.com/pallets/click

[2] click官方文档:https://click.palletsprojects.com/en/stable/

[3] click-extra github:https://github.com/kdeldycke/click-extra

[4] click-extra官方文档:https://kdeldycke.github.io/click-extra/


知识星球服务内容:Dify源码剖析及答疑,Dify对话系统源码,NLP电子书籍报告下载,公众号所有付费资料。加微信buxingtianxia21进NLP工程化资料群

(文:NLP工程化)

发表评论