Spring AI流式调用模型卡死无响应

现象是:服务启动以后,前几次对话都没问题,可能过一会再用,就不行了,一直收不到消息

伪代码如:

Disposable subscribe = responseFlux
                    .timeout(Duration.ofSeconds(60))
                    .doOnSubscribe(subscription -> {
                        System.out.println("开始订阅:" + message.getMessage());
                    })
                    .doOnNext(chunk -> {
                        try {
                            System.out.println("result:" + result);
                            if (result.getText() != null) {
                                answer.append(result.getText());
                                cardManager.streamUpdate(message.getCardUUID(), answer.toString());
                            }
                        } catch (Exception e) {
                            System.out.println(e.getMessage());
                        }
                    }).doOnError(throwable -> {
                        throwable.printStackTrace();
                        System.out.println(throwable.getMessage());
                    }).doOnComplete(
                            () -> {
                                System.out.println("Completed");
                            }
                    ).subscribe();

也就是,服务启动一会后,一开始doOnNext里能收到消息,过一会后,doOnSubscribe确实打印订阅成功,但是doOnNext一直收不到消息,过你设定的timeout时间后,会触发doOnError

如何解决?
我这里引入了MCP依赖,其中MCP依赖有3个

/ Server Type Dependency Property
STDIO Standard Input/Output (STDIO) spring-ai-starter-mcp-server spring.ai.mcp.server.stdio=true
WebMVC SSE WebMVC spring-ai-starter-mcp-server-webmvc spring.ai.mcp.server.protocol=SSE or empty
WebMVC Streamable-HTTP WebMVC spring-ai-starter-mcp-server-webmvc spring.ai.mcp.server.protocol=STREAMABLE
WebMVC Stateless Streamable-HTTP WebMVC spring-ai-starter-mcp-server-webmvc spring.ai.mcp.server.protocol=STATELESS
WebMVC (Reactive) SSE WebFlux spring-ai-starter-mcp-server-webflux spring.ai.mcp.server.protocol=SSE or empty
WebMVC (Reactive) Streamable-HTTP WebFlux spring-ai-starter-mcp-server-webflux spring.ai.mcp.server.protocol=STREAMABLE
WebMVC (Reactive) Stateless Streamable-HTTP WebFlux spring-ai-starter-mcp-server-webflux spring.ai.mcp.server.protocol=STATELESS

其中我引入的依赖是starter-mcp-server-webmvc
该依赖默认使用JDK HttpClient,换成spring-ai-starter-mcp-server-webflux这个依赖就好了,webflux使用的httpClient是Reactor Netty

webmvc(JDK HttpClient) vs webflux(Reactor Netty)处理背压的本质区别

项目 webmvc + JDK HttpClient webflux + Reactor Netty
IO 模型 阻塞式(Blocking) 非阻塞式 + 背压(Reactive Streams)
背压支持 (JDK HttpClient 不支持 Reactive Streams) (实现 org.reactivestreams 规范)
数据流 InputStream 一次性读完 Flux 按需拉取
消费慢时 上游 继续推数据 → 缓冲区爆 → OOM/卡死 下游 请求 N 个 → 上游只发 N 个
线程 1 请求 = 1 线程(阻塞等待) 少量线程 + 事件循环

webmvc + JDK(无背压,伪代码)

InputStream in = response.getBody();  // 阻塞读
byte[] buffer = new byte[1024];
int read;
while ((read = in.read(buffer)) != -1) {  // 上游疯狂推!
    process(buffer);  // 你慢?它不管,继续塞!
}

webflux + Reactor(有背压,真实机制)

Flux<DataBuffer> flux = webClient.post()...;  // 响应式流

flux.subscribe(
    data -> process(data),  // doOnNext
    error -> {},
    () -> {}
    // 背压信号:subscribe 时指定 request(n)
);

背压信号:

subscription.request(1);  // 我只想要 1 个 chunk
// 处理完后:
subscription.request(1);  // 再要 1 个

结果:AI 只发 1 个 chunk,等你处理完再发下一个 → 零堆积。

为什么 webflux 换了就好了?

你的痛点 webmvc 原因 webflux 解决
doOnNext 不触发 缓冲区满,线程卡在 read() 背压暂停上游
Connection timed out 阻塞等待连接 非阻塞,超时不卡线程
系统假死 线程耗尽 事件驱动,线程复用

本质区别webmvc = “推模型”(Push) → 上游不管你死活。 webflux = “拉模型”(Pull) → 你说要多少,我发多少。

总结:背压 = 响应式系统的“刹车”

概念 说明
背压 下游控制上游发送速度
无背压 上游狂推 → 爆内存/卡死
有背压 下游 request(n) → 安全流控