现象是:服务启动以后,前几次对话都没问题,可能过一会再用,就不行了,一直收不到消息
伪代码如:
Disposable subscribe = responseFlux
.timeout(Duration.ofSeconds(60))
.doOnSubscribe(subscription -> {
System.out.println("开始订阅:" + message.getMessage());
})
.doOnNext(chunk -> {
try {
System.out.println("result:" + result);
if (result.getText() != null) {
answer.append(result.getText());
cardManager.streamUpdate(message.getCardUUID(), answer.toString());
}
} catch (Exception e) {
System.out.println(e.getMessage());
}
}).doOnError(throwable -> {
throwable.printStackTrace();
System.out.println(throwable.getMessage());
}).doOnComplete(
() -> {
System.out.println("Completed");
}
).subscribe();
也就是,服务启动一会后,一开始doOnNext里能收到消息,过一会后,doOnSubscribe确实打印订阅成功,但是doOnNext一直收不到消息,过你设定的timeout时间后,会触发doOnError
如何解决?
我这里引入了MCP依赖,其中MCP依赖有3个
| / | Server Type | Dependency | Property |
|---|---|---|---|
| STDIO | Standard Input/Output (STDIO) | spring-ai-starter-mcp-server | spring.ai.mcp.server.stdio=true |
| WebMVC | SSE WebMVC | spring-ai-starter-mcp-server-webmvc | spring.ai.mcp.server.protocol=SSE or empty |
| WebMVC | Streamable-HTTP WebMVC | spring-ai-starter-mcp-server-webmvc | spring.ai.mcp.server.protocol=STREAMABLE |
| WebMVC | Stateless Streamable-HTTP WebMVC | spring-ai-starter-mcp-server-webmvc | spring.ai.mcp.server.protocol=STATELESS |
| WebMVC (Reactive) | SSE WebFlux | spring-ai-starter-mcp-server-webflux | spring.ai.mcp.server.protocol=SSE or empty |
| WebMVC (Reactive) | Streamable-HTTP WebFlux | spring-ai-starter-mcp-server-webflux | spring.ai.mcp.server.protocol=STREAMABLE |
| WebMVC (Reactive) | Stateless Streamable-HTTP WebFlux | spring-ai-starter-mcp-server-webflux | spring.ai.mcp.server.protocol=STATELESS |
其中我引入的依赖是starter-mcp-server-webmvc
该依赖默认使用JDK HttpClient,换成spring-ai-starter-mcp-server-webflux这个依赖就好了,webflux使用的httpClient是Reactor Netty
webmvc(JDK HttpClient) vs webflux(Reactor Netty)处理背压的本质区别
| 项目 | webmvc + JDK HttpClient | webflux + Reactor Netty |
|---|---|---|
| IO 模型 | 阻塞式(Blocking) | 非阻塞式 + 背压(Reactive Streams) |
| 背压支持 | 无(JDK HttpClient 不支持 Reactive Streams) | 有(实现 org.reactivestreams 规范) |
| 数据流 | InputStream 一次性读完 | Flux 按需拉取 |
| 消费慢时 | 上游 继续推数据 → 缓冲区爆 → OOM/卡死 | 下游 请求 N 个 → 上游只发 N 个 |
| 线程 | 1 请求 = 1 线程(阻塞等待) | 少量线程 + 事件循环 |
webmvc + JDK(无背压,伪代码)
InputStream in = response.getBody(); // 阻塞读
byte[] buffer = new byte[1024];
int read;
while ((read = in.read(buffer)) != -1) { // 上游疯狂推!
process(buffer); // 你慢?它不管,继续塞!
}
webflux + Reactor(有背压,真实机制)
Flux<DataBuffer> flux = webClient.post()...; // 响应式流
flux.subscribe(
data -> process(data), // doOnNext
error -> {},
() -> {}
// 背压信号:subscribe 时指定 request(n)
);
背压信号:
subscription.request(1); // 我只想要 1 个 chunk
// 处理完后:
subscription.request(1); // 再要 1 个
结果:AI 只发 1 个 chunk,等你处理完再发下一个 → 零堆积。
为什么 webflux 换了就好了?
| 你的痛点 | webmvc 原因 | webflux 解决 |
|---|---|---|
| doOnNext 不触发 | 缓冲区满,线程卡在 read() | 背压暂停上游 |
| Connection timed out | 阻塞等待连接 | 非阻塞,超时不卡线程 |
| 系统假死 | 线程耗尽 | 事件驱动,线程复用 |
本质区别: webmvc = “推模型”(Push) → 上游不管你死活。 webflux = “拉模型”(Pull) → 你说要多少,我发多少。
总结:背压 = 响应式系统的“刹车”
| 概念 | 说明 |
|---|---|
| 背压 | 下游控制上游发送速度 |
| 无背压 | 上游狂推 → 爆内存/卡死 |
| 有背压 | 下游 request(n) → 安全流控 |