从企业微信流式响应中学习生产者-消费者模式

需求分析:在 FastGPT 平台,将工作流的发布渠道与企业微信智能机器人对接

实现企业微信回调 FastGPT 的地址,FastGPT 将工作流处理好的 AI 回复返回给企微

实现企微智能机器人能够使用 FastGPT 平台的功能


企业微信对接文档-流式响应的架构

企微会给回调 URL(即自己配置的后台程序)发送两种类型的请求

第一种是消息推送, 只会在第一次向后台程序发送请求时发送

第二种是轮询请求。即当企微接收到消息推送请求的响应后,开始发送轮询请求

具体文档可参考 企业微信智能机器人文档


实现难点

  1. 为什么企业微信要发送轮询请求?消息推送请求和轮询请求的本质区别是什么?需要为两种请求写不一样的逻辑处理吗?
  2. 等待 AI 输出全部结果,再返回给企微,耗时太长,如何优化
  3. HTTP 请求是无状态的, 后台程序每收到一次请求, 都不知道上一次请求响应了什么数据, 从而无法实现回复内容的追加
  4. 如果每接收一次请求, 就调用一次 AI 输出回答. 由于轮询的频率很快, 一次 AI 回答还没输出完, 就要又一次调用 AI 输出. 最后会产生“轮询次数 + 1”倍的 AI 回复
  5. AI 回复结束, 就要停止企微的轮询请求(发送特定格式的数据包), 那么要给后台程序怎样的逻辑, 才能实现 AI 回复结束,就向企微发送”结束数据包”

解决过程

方案一 一次请求对应一次完整回复

企微发送一个请求,程序接收后,会调用 AI 输出回答,AI 回复结束后,返回给企微一个响应

具体效果

首先是等待 AI 输出完成的时间太长,用户体验不佳

其次每接收一次请求都调用一次 AI,因为有轮询请求,所以会产生非常多次 AI 回复

而企微会渲染最后一次接收的回复,那么前面的回复就浪费了性能

方案二 流式回复

使用异步迭代器,异步流式处理数据

  1. 异步可迭代对象生成一个异步迭代器
  2. 异步迭代器自动调用 next()方法, 通过异步请求获取 ai 回复
  3. 判断迭代结果的 done 值是否为 true, 是则停止循环, 不是则重复第二步请求数据

具体效果

通过异步获取数据, 使得 ai 回复数据传输到 FastGPT 平台是”实时”的, 解决了等待 ai 回复时间过久的问题

但是没有解决从 FastGPT 到企微, 一段一段的流式数据如何传输的问题

方案 n 生产者 - 消费者设计模式

生产者: 程序接收消息推送请求,负责将数据写入 redis

消费者: 程序接受轮询请求,负责从 redis 中取出数据

缓存层: redis

具体效果

用户向企微发送消息后, 企微会先向后台程序发送一次消息推送请求

后台程序接收推送并按照企微的要求响应,响应后程序不会立即停止

而是等待 AI 不断输出新的回复,并不断将新的回复写入 redis 当中

写入是以追加的形式写入,即每次携带的回复都是上一次携带回复数据的追加值(累加值)

直到 AI 输出完成,这一次响应才彻底结束

在企微这边,当接收到消息推送请求的响应后, 就开始发送轮询请求(发送时间间隔 < 1s )

后台程序接收到一次轮询请求后,就从 redis 中取出数据,并返回给企微

由于程序会不断将 AI 的输出结果存入 redis,而 AI 的输出是流式的

所以每一次从 redis 中取出的数据是越来越多,逐渐完善的

从而实现返回给企微的数据是类似 AI 输出的流式效果

当后台程序发现 AI 已经回复完成后,会在文本末尾加上“[DONE]”,表示回复结束

此时程序检测到文本末尾的内容,就会向企微发送一个特殊格式的数据包,标志着回复结束

企微接收到结束数据包,便不再发送请求,轮询结束,一次 AI 对话完成

优点

后续优化方向

缓存层使用消息队列,能更加方便控制缓存的粒度

实时通信方式采用 WebSocket,而不是 HTTP 轮询

HTTP 轮询每一次请求都要重新连接,开销大,而 WebSocket 只需一次连接

HTTP 轮询的服务端只能被动回复客户端,而 WebSocket 的服务端能够主动给通信