🌊 流式输出
# 开启模型流式模式
如果希望模型请求本身使用流式传输,需要在 Agent 的 model() 中显式设置 .stream(true):
@Override
protected ModelSpec<?> model() {
return OpenAICompatible.custom("gateway", modelName)
.stream(true);
}
ModelSpec.stream(true) 负责开启底层模型请求的流式模式;ExecuteOption.eventListener(...) 负责把 Agent 执行过程中的事件转发给调用方。实际使用流式输出时,建议两者同时配置。
提示
如果覆写了 buildModel() 自行构造模型,则 model().resolve(agentConfig()) 不会被调用,需要在底层模型 builder 上自行设置 .stream(true)。
# 注册事件监听器
LiteFlow 的 execute2Resp(...) 本身仍然是阻塞调用——整条 chain 执行完成后才返回。如果希望在 Agent 执行过程中实时拿到输出,可以通过 ExecuteOption.eventListener(...) 注册事件监听器:
List<FlowEvent> events = new CopyOnWriteArrayList<>();
LiteflowResponse response = flowExecutor.execute2Resp(
"deepseekChain",
"用一句话介绍 LiteFlow",
ExecuteOption.of()
.conversationId("chat-user-1-conv-1")
.eventListener(event -> {
if ("agent.reasoning".equals(event.getType()) && event.getText() != null) {
// 转发到 SSE、WebSocket 或命令行
System.out.print(event.getText());
}
events.add(event);
})
);
# 事件类型
FlowEvent#getType() | 含义 | 典型内容 |
|---|---|---|
agent.reasoning | 模型推理/回复过程 | 增量文本、最终 assistant 消息、工具调用请求 |
agent.tool_result | 工具执行结果 | 工具输出或增量片段 |
agent.summary | 达到最大迭代次数后的总结(见 迭代次数与 Summary) | summary 生成过程或最终 summary |
agent.result | 本轮 Agent 最终结果 | 与最终 handleReply(reply) 使用的消息一致 |
# FlowEvent 字段
每个 FlowEvent 携带以下信息:
| 字段 | 说明 |
|---|---|
chainId | 链路 ID |
nodeId | 节点 ID(多 Agent 链路中用于区分来源) |
requestId | 请求 ID |
conversationId | 会话 ID |
text | 文本内容 |
last | 是否为该类型最后一个事件 |
timestamp | 时间戳 |
data | 原始数据 |
# 多 Agent 并发的流式处理
WHEN(agentA, agentB) 并发执行时,多个 Agent 的流式事件可能交错到达。调用方应按 nodeId 或 conversationId 分组展示:
.eventListener(event -> {
String key = event.getNodeId(); // 用 nodeId 区分不同 Agent
// 按 key 分组推送到不同的 SSE 连接
})
# 重要说明
- 没有注册 listener 时,Agent 会走阻塞调用路径,不会产生额外事件开销
- 注册 listener 后,Agent 会使用
agent.stream(...)执行,流结束后照常调用handleReply(reply) - 需要底层模型流式传输时,请在
model()返回的ModelSpec上显式调用.stream(true) - listener 是同步回调,在 chain 执行线程中运行。生产环境建议在 listener 中只做轻量入队,不要执行耗时 I/O
- listener 抛出异常会向上传播,可能导致链路失败
帮助我们改善此文档 (opens new window)
上次更新: 2026/06/02, 00:29:19


