“ 技术上最容易犯的错就是经验主义,以及拿来主义”
最近在对接GPT实现一个功能,具体功能就不说了;主要是这个功能需要流式返回,因此踩了一些坑;所以就在此记录一下。至于什么是流式返回,不清楚的可以自己问度娘。
大模型流式返回带来的问题
自chatGPT推出以来,其一个字一个字的出现,就像一个打字机;这效果惊艳了很多人,因此在很多场景下很多人都会选择打字机的效果。而打字机效果背后的实现就是流式返回。
对技术有过了解的人应该都知道,正常情况下接口是在所有业务处理完成之后一起返回;但流式返回是分多批次返回。简单来说就是处理了一部分就返回一部分,不用等全部完成之后再返回。
如下图所示就是一个典型的流式返回:
那目前流式返回所遇到的问题是什么呢?
其实从后端的角度来说,流式返回没有任何问题;不论是使用大模型官方提供的SDK亦或者是调用他们的接口,都是正常的流式返回。但问题是,调用第三方接口的目的是为了完成业务功能,因此怎么把这个流式返回也用流式返回给前端就是一个需要思考的问题了。
从web开发的角度来说,现在前后端交互主要使用的是http协议;但http协议是前端向后端发起请求,而不能从后端向前端发起请求;为了解决这个问题,因此就有了websocket和SSE协议。
这两个协议的区别是websocket是全双工的,而SSE是半双工的;意思就是说,websocket建立连接之后,前端可以主动向后端发消息,后端也可以主动向前端发消息;而SSE是只能后端向前端发消息。
但不论是websocket还是SSE协议,本质上只是一种通讯协议,和业务没什么具体的关系;这就类似于,搞货运的目的是把货物安全的送到目的地,至于你是用汽车运,还是用火车运都可以。
那问题出在哪里呢?
刚开始我们使用的是websocket作为流式返回的通讯工具;但再实际使用中才发现一个很大的问题,那就是websocket无法在短时间内接受大量的网络传输需求;一旦过量就会导致websocket缓冲区溢出,也就是TEXT_FULL_WRITRING异常;简单来说就是,websocket为了减轻网络压力,每次发送消息都会先把缓冲区写满;然后再一次性发送。
但由于流式返回速度较快,有时候websocket上一条消息还没发送出去,下一条新的数据又进来了;因此就会导致websocket报错,即使使用的是异步发送也会报错。
public void sendText(String text) {
for(Session session : sessions.values()){
if (session.isOpen()) {
try {
//异步发送
session.getAsyncRemote().sendText(text);
} catch (Exception e) {
log.error("发送会话异常");
}
}else{
log.error("socket 在不可发送状态");
}
}
}
为了解决这个问题, 因此就在网上查了一下发现;类似于这种流式返回,大部分人的处理方式都是用SSE协议;因为SSE协议相对websocket更简单,效率更高。而在java语言中,使用SSE有两种方式,第一种就是自己手动创建SSE对象,使用SseEmitter 对象来实现。
但这种原生的实现方式存在很多问题,比如需要自己去控制sse与用户的关联关系,sse的状态判断,自动重连等等。
因此,springboot就提供了另一种方式,那就是Flux流式处理。
OpenAIClient client = new OpenAIClientBuilder().credential(new AzureKeyCredential(key)).endpoint(endPoint).buildClient();
IterableStream<ChatCompletions> stream = client.getChatCompletionsStream(modelName, new ChatCompletionsOptions(messages));
StringBuffer stringBuffer = new StringBuffer();
return Flux.<String>create(sink -> {
stream.iterator().forEachRemaining(
chatCompletions -> {
if (chatCompletions.getChoices() != null && chatCompletions.getChoices().size() > 0) {
if (chatCompletions.getChoices().get(0).getDelta() != null) {
String content = chatCompletions.getChoices().get(0).getDelta().getContent();
log.info(content);
if (content != null) {
stringBuffer.append(content);
sink.next(content);
}
}
}
}
);
sink.complete();
}).map(data -> ServerSentEvent.<String>builder().data(data).build())
// 每隔一段时间发送一个字符
.delayElements(Duration.ofMillis(10))
// 停止
.takeWhile(i -> !redisUtil.hasKey(stopKey))
// 最后执行
.doOnComplete(() -> {
//传输完成 业务处理
});
如上所示,Flux通过sink封装大模型的流式返回,然后调用next方法主动把数据返回给前端,以此达到流式效果。
虽然从操作上来说,各种技术已经逐渐成熟,我们都可以直接拿来主义,拿过来用就好了;但实际上存在的一个问题就是,当你不知道其原理,又没有经验时,你还是会踩很多坑。
(文:AI探索时代)