Skip to content

示例 2:批量数据处理断点续跑

处理 N 条数据,每条独立调用 AI。中途崩了重跑时,从上次中断的位置继续,已处理项不重做。

适用场景

  • 批量翻译:100 篇文档逐篇翻译。
  • 批量摘要:长文章列表,每篇生成摘要。
  • 批量打标:用户评论列表,每条做情感分类。

工作流结构

Input: items (待处理列表), start_index (续跑位置, 默认 0)

Loop over items
   ├── If index < start_index    → 跳过(已处理过)
   └── Else                      → 处理当前项 + 持久化结果 + 更新 start_index

Final: 输出处理统计

关键代码

输入参数

markdown
- **items** (`list`, required) — 待处理的数据项 <!-- wn:input {"id":"items","name":"items","type":"list","required":true} /-->
- **start_index** (`number`) — 续跑起始索引(首次运行为 0) <!-- wn:input {"id":"start_index","name":"start_index","type":"number","required":false} /-->
- **session_id** (`text`, required) — 会话 ID,作为持久化键 <!-- wn:input {"id":"session_id","name":"session_id","type":"text","required":true} /-->

主循环

markdown
<!-- wn:loop {"id":"process","name":"process","loopType":"list","list":{"sourceType":"variable","sourceValue":["items","__default__"]}} -->

<!-- wn:if-else {"id":"check_skip","name":"check_skip"} -->
<!-- wn:if {"id":"already_done","name":"already_done","condition":{"first":{"sourceType":"variable","sourceValue":["process","index"]},"compare":"LessThan","second":{"sourceType":"variable","sourceValue":["start_index","__default__"]}}} -->
跳过第 @{process.index} 项:已处理过
<!-- /wn:if -->
<!-- wn:else {"id":"do_work","name":"do_work"} -->
请为以下内容生成摘要:

@{process.item}

`summary`<!-- wn:generation {"id":"summary","name":"summary","modelId":"gpt-4o-mini"} /-->

```wn:javascript {"id":"persist","name":"persist","nodeModules":["axios"]}
const axios = require('axios')
await axios.post('https://your-store.example.com/results', {
  sessionId: @{session_id},
  index: @{process.index},
  item: @{process.item},
  summary: @{summary.__default__},
  completedAt: new Date().toISOString()
})
return 'ok'   // 返回值未被下游引用;外部系统通过持久化层读取已完成 index
```
<!-- /wn:else -->
<!-- /wn:if-else -->

<!-- /wn:loop -->

Final:输出处理统计

markdown
```wn:javascript {"id":"final_stats","name":"final_stats","nodeModules":["axios"]}
const axios = require('axios')
const { data } = await axios.get('https://your-store.example.com/results', {
  params: { sessionId: @{session_id} }
})
return {
  sessionId: @{session_id},
  total: Array.isArray(@{items.__default__}) ? @{items.__default__}.length : 0,
  completed: Array.isArray(data?.records) ? data.records.length : 0,
}
```

本次运行处理统计:@{final_stats.output}

final_stats.output 是工作流最终输出,由触发方读取以决定是否还需要再次续跑。

续跑机制

外部触发系统(业务后端 / 调度器)负责:

  1. 首次调用:传入 start_index=0session_id=<新生成 UUID>
  2. 轮询持久化层:定期读取 https://your-store.example.com/results?sessionId=xxx 获取最大已完成 index
  3. 崩溃后重跑:以「最大已完成 index + 1」作为新的 start_index 重新调用工作流,传入相同的 session_id 和完整 items

工作流内部不需要知道「上次跑到哪了」——这是外部触发系统的职责,工作流只负责「按指令跳过前 N 项」。

重跑行为

假设 items 有 100 项,第 30 项处理后崩溃:

重跑参数行为
start_index=0全部 100 项重新处理(不推荐,浪费 token)
start_index=30跳过前 30 项,从第 31 项继续,处理 70 项
start_index=100全部跳过,工作流空跑

测试方法

  1. 首次运行:传入 5 项数据、start_index=0,观察 5 项全部处理。
  2. 模拟崩溃续跑:保留同一 session_id,传入相同 5 项 + start_index=2,观察前 2 项跳过、后 3 项执行。
  3. 空跑验证start_index=5,观察全部跳过且不报错。

注意事项

  • start_index 必须显式传 0(首次运行):If-Else 的条件是 index LessThan start_index.__default__。若 start_index 留空,undefined 参与数值比较时 evaluateCondition 直接判 false,恰好等价于「全部处理」——但这是依赖比较语义的偶然行为,触发方应始终显式传 start_index(首次为 0,续跑为上次中断位置),避免误导后续维护者。
  • 变量作用域@{process.index} 是 Loop 的 inside 变量,仅在 Loop 内部可引用。Loop 外引用会报错。
  • Loop 性能:处理 100+ 项时考虑用 Promise Block 并行化(注意并发限制和速率)。
  • 持久化键设计session_id 必须由外部生成(如 UUID)并保持唯一,避免不同运行互相污染。
  • 持久化层:示例用了虚拟 HTTP 接口;实际项目按团队约定接入数据库 / 知识库 / 业务后端。CodeBlock 沙箱不支持 fetch,必须在 nodeModules 中声明 axios 并通过它发请求(详见 Code Block 文档)。

AI Workflow Editor