示例 2:批量数据处理断点续跑
处理 N 条数据,每条独立调用 AI。中途崩了重跑时,从上次中断的位置继续,已处理项不重做。
适用场景
- 批量翻译:100 篇文档逐篇翻译。
- 批量摘要:长文章列表,每篇生成摘要。
- 批量打标:用户评论列表,每条做情感分类。
工作流结构
Input: items (待处理列表), start_index (续跑位置, 默认 0)
↓
Loop over items
├── If index < start_index → 跳过(已处理过)
└── Else → 处理当前项 + 持久化结果 + 更新 start_index
↓
Final: 输出处理统计关键代码
输入参数
markdown
- **items** (`list`, required) — 待处理的数据项 <!-- wn:input {"id":"items","name":"items","type":"list","required":true} /-->
- **start_index** (`number`) — 续跑起始索引(首次运行为 0) <!-- wn:input {"id":"start_index","name":"start_index","type":"number","required":false} /-->
- **session_id** (`text`, required) — 会话 ID,作为持久化键 <!-- wn:input {"id":"session_id","name":"session_id","type":"text","required":true} /-->主循环
markdown
<!-- wn:loop {"id":"process","name":"process","loopType":"list","list":{"sourceType":"variable","sourceValue":["items","__default__"]}} -->
<!-- wn:if-else {"id":"check_skip","name":"check_skip"} -->
<!-- wn:if {"id":"already_done","name":"already_done","condition":{"first":{"sourceType":"variable","sourceValue":["process","index"]},"compare":"LessThan","second":{"sourceType":"variable","sourceValue":["start_index","__default__"]}}} -->
跳过第 @{process.index} 项:已处理过
<!-- /wn:if -->
<!-- wn:else {"id":"do_work","name":"do_work"} -->
请为以下内容生成摘要:
@{process.item}
`summary`<!-- wn:generation {"id":"summary","name":"summary","modelId":"gpt-4o-mini"} /-->
```wn:javascript {"id":"persist","name":"persist","nodeModules":["axios"]}
const axios = require('axios')
await axios.post('https://your-store.example.com/results', {
sessionId: @{session_id},
index: @{process.index},
item: @{process.item},
summary: @{summary.__default__},
completedAt: new Date().toISOString()
})
return 'ok' // 返回值未被下游引用;外部系统通过持久化层读取已完成 index
```
<!-- /wn:else -->
<!-- /wn:if-else -->
<!-- /wn:loop -->Final:输出处理统计
markdown
```wn:javascript {"id":"final_stats","name":"final_stats","nodeModules":["axios"]}
const axios = require('axios')
const { data } = await axios.get('https://your-store.example.com/results', {
params: { sessionId: @{session_id} }
})
return {
sessionId: @{session_id},
total: Array.isArray(@{items.__default__}) ? @{items.__default__}.length : 0,
completed: Array.isArray(data?.records) ? data.records.length : 0,
}
```
本次运行处理统计:@{final_stats.output}final_stats.output 是工作流最终输出,由触发方读取以决定是否还需要再次续跑。
续跑机制
外部触发系统(业务后端 / 调度器)负责:
- 首次调用:传入
start_index=0、session_id=<新生成 UUID>。 - 轮询持久化层:定期读取
https://your-store.example.com/results?sessionId=xxx获取最大已完成index。 - 崩溃后重跑:以「最大已完成
index+ 1」作为新的start_index重新调用工作流,传入相同的session_id和完整items。
工作流内部不需要知道「上次跑到哪了」——这是外部触发系统的职责,工作流只负责「按指令跳过前 N 项」。
重跑行为
假设 items 有 100 项,第 30 项处理后崩溃:
| 重跑参数 | 行为 |
|---|---|
start_index=0 | 全部 100 项重新处理(不推荐,浪费 token) |
start_index=30 | 跳过前 30 项,从第 31 项继续,处理 70 项 |
start_index=100 | 全部跳过,工作流空跑 |
测试方法
- 首次运行:传入 5 项数据、
start_index=0,观察 5 项全部处理。 - 模拟崩溃续跑:保留同一
session_id,传入相同 5 项 +start_index=2,观察前 2 项跳过、后 3 项执行。 - 空跑验证:
start_index=5,观察全部跳过且不报错。
注意事项
start_index必须显式传 0(首次运行):If-Else 的条件是index LessThan start_index.__default__。若start_index留空,undefined参与数值比较时evaluateCondition直接判 false,恰好等价于「全部处理」——但这是依赖比较语义的偶然行为,触发方应始终显式传start_index(首次为 0,续跑为上次中断位置),避免误导后续维护者。- 变量作用域:
@{process.index}是 Loop 的inside变量,仅在 Loop 内部可引用。Loop 外引用会报错。 - Loop 性能:处理 100+ 项时考虑用 Promise Block 并行化(注意并发限制和速率)。
- 持久化键设计:
session_id必须由外部生成(如 UUID)并保持唯一,避免不同运行互相污染。 - 持久化层:示例用了虚拟 HTTP 接口;实际项目按团队约定接入数据库 / 知识库 / 业务后端。CodeBlock 沙箱不支持
fetch,必须在nodeModules中声明axios并通过它发请求(详见 Code Block 文档)。