示例 1:记忆驱动的 Agent Pipeline
多个 Agent 串行调用,每个 Agent 跑完后把结果存入持久化层。重跑时已跑过的 Agent 自动跳过,节省 token。
适用场景
- 长链路调研:先抓数据 → 再清洗 → 再分析 → 最后输出报告,每个 Agent 都很贵。
- 多轮分析:相同的输入文档,反复调整后半段 prompt,前半段不希望重复消费。
- 调试迭代:开发阶段反复跑工作流,希望前置步骤的结果保持稳定。
工作流结构
Input: query, run_id ← 外部传入查询和运行 ID(用于持久化键)
↓
[Step 1] CodeBlock: 检查 step1 是否有缓存
↓
[If-Else]
if cache hit → 跳过 Step 1 Agent,直接使用 @{cache_step1.value}
else → 运行 Step 1 Agent,结果写入持久化层
↓
[Step 2] CodeBlock: 检查 step2 是否有缓存
↓
[If-Else]
if cache hit → 跳过 Step 2 Agent
else → 运行 Step 2 Agent,结果写入持久化层
↓
[Final] Generation: 综合 step1 + step2 输出最终报告关键代码
输入参数
markdown
- **query** (`text`, required) — 用户查询 <!-- wn:input {"id":"query","name":"query","type":"text","required":true} /-->
- **run_id** (`text`, required) — 本次运行 ID(作为缓存键的命名空间) <!-- wn:input {"id":"run_id","name":"run_id","type":"text","required":true} /-->
- **cache_step1** (`object`) — 上次 Step 1 的结果(外部系统重跑时回传) <!-- wn:input {"id":"cache_step1","name":"cache_step1","type":"object","required":false} /-->
- **cache_step2** (`object`) — 上次 Step 2 的结果 <!-- wn:input {"id":"cache_step2","name":"cache_step2","type":"object","required":false} /-->缓存检查(Step 1)
markdown
```wn:javascript {"id":"check_step1","name":"check_step1"}
const cached = @{cache_step1}
return cached !== undefined
&& cached !== null
&& typeof cached === 'object'
&& typeof cached.value === 'string'
&& cached.value.length > 0
```分支执行(Step 1)
markdown
<!-- wn:if-else {"id":"branch_step1","name":"branch_step1"} -->
<!-- wn:if {"id":"step1_hit","name":"step1_hit","condition":{"first":{"sourceType":"variable","sourceValue":["check_step1","output"]},"compare":"Equals","second":{"sourceType":"boolean","sourceValue":true}}} -->
Step 1 缓存命中:@{cache_step1.value}
<!-- /wn:if -->
<!-- wn:else {"id":"step1_miss","name":"step1_miss"} -->
请基于查询「@{query}」做初步资料抓取。
`step1_result`<!-- wn:generation {"id":"step1_result","name":"step1_result","modelId":"gpt-4o"} /-->
```wn:javascript {"id":"persist_step1","name":"persist_step1","nodeModules":["axios"]}
// 把 step1_result 写入持久化层(示例:写入业务后端的缓存接口)
// 实际项目根据团队约定接入 Redis / 数据库 / 知识库
const axios = require('axios')
await axios.post('https://your-cache.example.com/cache', {
key: `pipeline:${@{run_id}}:step1`,
value: { value: @{step1_result.__default__}, fetchedAt: new Date().toISOString() }
})
return 'persisted'
```
<!-- /wn:else -->
<!-- /wn:if-else -->统一 Step 1 输出(缓存命中 / Agent 输出二选一)
If-Else 两个分支的变量互斥(参见 If-Else Block 文档),需要先用一个 CodeBlock 把两个分支的输出统一成一个变量,下游才能稳定引用。
markdown
```wn:javascript {"id":"step1_value","name":"step1_value"}
const fromCache = @{cache_step1}
const fromAgent = @{step1_result.__default__}
return fromCache?.value || fromAgent || ''
```之后下游统一引用 @{step1_value.output}。
Step 2(与 Step 1 对称)
markdown
```wn:javascript {"id":"check_step2","name":"check_step2"}
const cached = @{cache_step2}
return cached !== undefined
&& cached !== null
&& typeof cached === 'object'
&& typeof cached.value === 'string'
&& cached.value.length > 0
```
<!-- wn:if-else {"id":"branch_step2","name":"branch_step2"} -->
<!-- wn:if {"id":"step2_hit","name":"step2_hit","condition":{"first":{"sourceType":"variable","sourceValue":["check_step2","output"]},"compare":"Equals","second":{"sourceType":"boolean","sourceValue":true}}} -->
Step 2 缓存命中:@{cache_step2.value}
<!-- /wn:if -->
<!-- wn:else {"id":"step2_miss","name":"step2_miss"} -->
基于 Step 1 的结果做下一步分析:
@{step1_value.output}
`step2_result`<!-- wn:generation {"id":"step2_result","name":"step2_result","modelId":"gpt-4o"} /-->
```wn:javascript {"id":"persist_step2","name":"persist_step2","nodeModules":["axios"]}
const axios = require('axios')
await axios.post('https://your-cache.example.com/cache', {
key: `pipeline:${@{run_id}}:step2`,
value: { value: @{step2_result.__default__}, fetchedAt: new Date().toISOString() }
})
return 'persisted'
```
<!-- /wn:else -->
<!-- /wn:if-else -->
```wn:javascript {"id":"step2_value","name":"step2_value"}
const fromCache = @{cache_step2}
const fromAgent = @{step2_result.__default__}
return fromCache?.value || fromAgent || ''
```Final:综合输出报告
markdown
请综合下面两步的产出,写一份最终报告:
Step 1 产出:
@{step1_value.output}
Step 2 产出:
@{step2_value.output}
`final_report`<!-- wn:generation {"id":"final_report","name":"final_report","modelId":"gpt-4o"} /-->final_report.__default__ 是工作流的最终输出。
重跑行为
| 场景 | cache_step1 / cache_step2 值 | 实际执行 |
|---|---|---|
| 首次运行 | 都为 undefined | Step 1、Step 2、Final 全部执行 |
| Step 1 跑完崩溃 | cache_step1 = 上次结果,cache_step2 = undefined | 跳过 Step 1,执行 Step 2、Final |
| 都跑完了再调整 prompt 重跑 | 都有值 | 跳过 Step 1、Step 2,只跑 Final |
外部触发系统(业务后端 / 调度器)负责记录每次运行后哪些缓存可用,下次重跑时把对应缓存作为 input 传入。
测试方法
- 首次运行:把
cache_step1和cache_step2留空,观察 Step 1、Step 2 都执行。 - 第二次运行:把首次运行的
step1_result输出包成{ value, fetchedAt }作为cache_step1传入,观察 Step 1 被跳过。 - 第三次运行:两个缓存都传入,观察只跑 Final。
每次运行的 token 消耗对比,可以验证幂等行为是否生效。