FastGPT实习:生产者-消费者模式实现企微流式响应
从企业微信流式响应中学习生产者-消费者模式
需求分析:在 FastGPT 平台,将工作流的发布渠道与企业微信智能机器人对接
实现企业微信回调 FastGPT 的地址,FastGPT 将工作流处理好的 AI 回复返回给企微
实现企微智能机器人能够使用 FastGPT 平台的功能
企业微信对接文档-流式响应的架构
企微会给回调 URL(即自己配置的后台程序)发送两种类型的请求
第一种是消息推送, 只会在第一次向后台程序发送请求时发送
第二种是轮询请求。即当企微接收到消息推送请求的响应后,开始发送轮询请求
具体文档可参考 企业微信智能机器人文档
实现难点
- 为什么企业微信要发送轮询请求?消息推送请求和轮询请求的本质区别是什么?需要为两种请求写不一样的逻辑处理吗?
- 等待 AI 输出全部结果,再返回给企微,耗时太长,如何优化
- HTTP 请求是无状态的, 后台程序每收到一次请求, 都不知道上一次请求响应了什么数据, 从而无法实现回复内容的追加
- 如果每接收一次请求, 就调用一次 AI 输出回答. 由于轮询的频率很快, 一次 AI 回答还没输出完, 就要又一次调用 AI 输出. 最后会产生“轮询次数 + 1”倍的 AI 回复
- AI 回复结束, 就要停止企微的轮询请求(发送特定格式的数据包), 那么要给后台程序怎样的逻辑, 才能实现 AI 回复结束,就向企微发送”结束数据包”
解决过程
方案一 一次请求对应一次完整回复
企微发送一个请求,程序接收后,会调用 AI 输出回答,AI 回复结束后,返回给企微一个响应
具体效果
首先是等待 AI 输出完成的时间太长,用户体验不佳
其次每接收一次请求都调用一次 AI,因为有轮询请求,所以会产生非常多次 AI 回复
而企微会渲染最后一次接收的回复,那么前面的回复就浪费了性能
方案二 流式回复
使用异步迭代器,异步流式处理数据
- 异步可迭代对象生成一个异步迭代器
- 异步迭代器自动调用 next()方法, 通过异步请求获取 ai 回复
- 判断迭代结果的 done 值是否为 true, 是则停止循环, 不是则重复第二步请求数据
具体效果
通过异步获取数据, 使得 ai 回复数据传输到 FastGPT 平台是”实时”的, 解决了等待 ai 回复时间过久的问题
但是没有解决从 FastGPT 到企微, 一段一段的流式数据如何传输的问题
方案 n 生产者 - 消费者设计模式
生产者: 程序接收消息推送请求,负责将数据写入 redis
消费者: 程序接受轮询请求,负责从 redis 中取出数据
缓存层: redis
具体效果
用户向企微发送消息后, 企微会先向后台程序发送一次消息推送请求
后台程序接收推送并按照企微的要求响应,响应后程序不会立即停止
而是等待 AI 不断输出新的回复,并不断将新的回复写入 redis 当中
写入是以追加的形式写入,即每次携带的回复都是上一次携带回复数据的追加值(累加值)
直到 AI 输出完成,这一次响应才彻底结束
在企微这边,当接收到消息推送请求的响应后, 就开始发送轮询请求(发送时间间隔 < 1s )
后台程序接收到一次轮询请求后,就从 redis 中取出数据,并返回给企微
由于程序会不断将 AI 的输出结果存入 redis,而 AI 的输出是流式的
所以每一次从 redis 中取出的数据是越来越多,逐渐完善的
从而实现返回给企微的数据是类似 AI 输出的流式效果
当后台程序发现 AI 已经回复完成后,会在文本末尾加上“[DONE]”,表示回复结束
此时程序检测到文本末尾的内容,就会向企微发送一个特殊格式的数据包,标志着回复结束
企微接收到结束数据包,便不再发送请求,轮询结束,一次 AI 对话完成
优点
后续优化方向
缓存层使用消息队列,能更加方便控制缓存的粒度
实时通信方式采用 WebSocket,而不是 HTTP 轮询
HTTP 轮询每一次请求都要重新连接,开销大,而 WebSocket 只需一次连接
HTTP 轮询的服务端只能被动回复客户端,而 WebSocket 的服务端能够主动给通信