拦截 WebClient ,遇到 Only one connection receive subscriber allowed

拦截 WebClient 请求响应数据,遇到 Only one connection receive subscriber allowed。

@Test
    public void testWebClient2() {
        WebClient client = WebClient.builder()
                .filter(logResponse()).build();
        Mono<String> mono = client.get().uri("https://www.baidu.com").retrieve().bodyToMono(String.class);
        System.out.println(mono.block());
    }

    private ExchangeFilterFunction logResponse() {
        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            log.info("Response Status : " + clientResponse.statusCode());
            Mono<String> body = clientResponse.bodyToMono(String.class);
            body.subscribe(str -> System.out.println(str));
            return Mono.just(clientResponse);
        });
    }

运行结果:

java.lang.IllegalStateException: Only one connection receive subscriber allowed.

    at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:304)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ Body from GET https://www.baidu.com [DefaultClientResponse]
Stack trace:
        at reactor.netty.channel.FluxReceive.startReceiver(FluxReceive.java:304)

filter里面消费了response body, 业务代码就不能消费了,目的是收集 WebClient 的请求和响应数据,有什么方式解决吗?求解,谢谢。

阅读 5.5k
2 个回答

    @Test
    public void testWeb() throws InterruptedException {
        WebClient client = WebClient
                .builder()
                .filter(logResponse())
                .build();
          client
                .get()
                .uri("https://www.baidu.com")
                .retrieve()
                .bodyToMono(String.class)
                .subscribe(s->System.out.println(s));
          TimeUnit.SECONDS.sleep(2);
    }

    private ExchangeFilterFunction logResponse() {
        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            return clientResponse
                    .bodyToMono(String.class)
                    .doOnNext(s -> System.out.println("aaa" + s))
                    .map(s -> ClientResponse.from(clientResponse).body(s).build())
                    ;
        });
    }

你可以尝试下这个。不过这违背了流的初衷,属于非主流操作。

image.png
你消费了两次返回值


首先你可以考虑不再logResponse()上消费可以尝试是用Mono#doOnSuccess,而且你用 mono.block() 是同步的,那用Mono,Flux这种异步库其实有点画龙点睛了

 @Test
    public void testWebClient2() {
        WebClient client = WebClient.builder()
                .filter(logResponse()).build();
        Mono<String> mono = client
                .get()
                .uri("https://www.baidu.com")
                .retrieve()
                .bodyToMono(String.class)
                .doOnSuccess(it -> {
                    // 一些其他操作
                    System.out.println(it);
                });
        mono.block();
        // System.out.println(mono.block());
    }

    private ExchangeFilterFunction logResponse() {
        return ExchangeFilterFunction.ofResponseProcessor(clientResponse -> {
            log.info("Response Status : " + clientResponse.statusCode());
            return Mono.just(clientResponse);
        });
    }
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题