Java Spring WebFlux怎么重试另外的服务?

Spring WebFlux怎么重试另一个url?
我正在做一个LLM gateway的需求,要求用webFlux,逐字吐出数据。同时支持容灾。
调用链路:clientA -> gateway -> serverB
如果gateway -> serverB这一步失败,则采用备选机serverC,即在gateway做重试,gateway -> serverC.
请注意,gateway -> serverB(serverC)这一步是采用webFlux的。
现在的问题是,我如何拿到gateway -> serverB的错误返回码,并且去重试serverC?

gateway -> serverB(serverC)的代码如下(取名为sseHttp):

Flux<Response> responseFlux = WebClient.create(url)
                .post().headers(httpHeaders -> setHeaders(httpHeaders, headers))
                .contentType(MediaType.APPLICATION_JSON).bodyValue(jsonBody)
                .retrieve().onStatus(status -> status != HttpStatus.OK, response -> {
                    String message = String.valueOf(response.bodyToMono(String.class));
                    aimlGatewayFptiEvent.put(INT_ERROR_CODE, String.valueOf(response.statusCode()));
                    aimlGatewayFptiEvent.put(STATUS_CODE, String.valueOf(response.statusCode()));
                    aimlGatewayFptiEvent.put(INT_ERROR_DESC, message);
                    logger.error("url:{}, response.statusCode:{}, message:{}",
                            url, response.statusCode(), message);
                    throw new GatewayException("call upstream seldon error, code:" + response.statusCode(),
                            INTERNAL_SERVER_ERROR);
                })
                .bodyToFlux(typeRef)
                .onErrorResume(WebClientResponseException.class, err -> {
                    aimlGatewayFptiEvent.put(INT_ERROR_CODE, String.valueOf(err.getStatusCode()));
                    aimlGatewayFptiEvent.put(INT_ERROR_DESC, err.getMessage());
                    throw new GatewayException(err.getMessage(), INTERNAL_SERVER_ERROR);
                })
                .doOnNext(event -> {
                    if(chunksCount.get() == 0) {
                        firstChunkReceivedTime.set(System.currentTimeMillis());
                    }
                    chunksCount.getAndIncrement();
                    if(null != event && null != event.data() && (null == bypassPayload || !bypassPayload)) {
                        try {
                            aimlGatewayFptiEvent.putResp(compressStringAndBase64Encode(event.data()));
                        } catch (IOException e) {
                            logger.error("Failed to compress and encode predict result.", e);
                        }
                    }
                })
                .map(event -> Response.ok().entity(
                        buildData2PredictResponse(event.data(), request.getEndpoint())
                ).build())
                .onErrorReturn(GatewayExceptionHandler.toStreamErrorResponse(
                        new GatewayException("Error predict from upstream seldon.", INTERNAL_SERVER_ERROR)))
                .doFinally((signalType) -> {
                    if (chunksCount.get() > 0) {
                        aimlGatewayFptiEvent.putIfAbsent(STATUS_CODE, "200");
                    }
                    aimlGatewayFptiEvent.putResponseTs(Long.toString(System.currentTimeMillis()));
                    aimlGatewayFptiEvent.putApiDuration(Long.toString(System.currentTimeMillis() - beginTs));
                    if(!Objects.equals(aimlGatewayFptiEvent.get(STATUS_CODE), "200")) {
                        calTransaction.setStatus(EXCEPTION);
                        calTransaction.addData(ERR_MSG, aimlGatewayFptiEvent.get(INT_ERROR_DESC));
                        CalLogHelper.logException(calTransaction.getType() + EXCEPTION_POSTFIX,
                                new GatewayException(aimlGatewayFptiEvent.get(INT_ERROR_DESC), INTERNAL_SERVER_ERROR));
                    }
                    calTransaction.completed();
                    // write to fpti
                    trackingUtil.send2Fpti(aimlGatewayFptiEvent, httpParams);

                })

尝试1:
尝试抓取gateway -> serverB抛出的异常,然后重试。

try {
   sseHttp(serverB.url); // call serverB
} catch(Exception e) {
   sseHttp(serverC.url); // call serverC
}

但问题是,call serverB即使失败了,也会先返回给clientA,而不是抛出异常。当serverB不可用,但是serverC可用时候,在clientA仍然是出错的。

尝试2:
使用subscribe获取gateway -> serverB的返回码。

Flux<Response> responseFlux = sseHttp(serverB.url);
responseFlux.subscribe(response -> {
            //设置map标记
            if (response.getStatus() != 200) {
                aimlGatewayFptiEvent.put(STATUS_CODE, String.valueOf(response.getStatus()));
            }
        });
//检查标记
if(null != aimlGatewayFptiEvent.get(STATUS_CODE) && !"200".equals(aimlGatewayFptiEvent.get(STATUS_CODE))) {
            responseFlux = sseHttp(serverC.url);
        }

很遗憾,由于subscribe是非阻塞的,因此在aimlGatewayFptiEvent被设置前,代码已经走到下面,并且返回了失败。
我如何能在clientA中得到成功的返回,当serverB不可用但是serverC可用?

我使用了AI提供的算法,它确实是有效的。然后,当serverB和serverC都可用的时候,会返回2次成功结果。如何才能失败后再重试?

阅读 515
avatarAI BotBETA

在WebFlux中,你可以使用retryWhen操作符来重试失败的请求。retryWhen接受一个Publisher,当原始Flux发出错误信号时,它将这个错误信号发送给这个Publisher。你可以在这个Publisher中定义你的重试逻辑。

在你的情况下,你可以在retryWhen中检查错误的HTTP状态码,如果状态码表示请求失败(例如500错误),那么你可以重试另一个URL。如果新的URL也失败,你可以继续重试,直到达到最大重试次数。

以下是一个可能的实现:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;

// ...

Flux<Response> responseFlux = Flux.just(serverB.url, serverC.url) // 创建一个包含两个URL的Flux
        .concatMap(url -> WebClient.create(url)
                .post()
                .headers(httpHeaders -> setHeaders(httpHeaders, headers))
                .contentType(MediaType.APPLICATION_JSON)
                .bodyValue(jsonBody)
                .retrieve()
                .onStatus(HttpStatus::isError, clientResponse -> {
                    // 在这里处理错误状态码
                    String message = String.valueOf(clientResponse.bodyToMono(String.class));
                    // 记录错误日志
                    logger.error("url:{}, response.statusCode:{}, message:{}",
                            url, clientResponse.statusCode(), message);
                    throw new GatewayException("call upstream error, code:" + clientResponse.statusCode(),
                            INTERNAL_SERVER_ERROR);
                })
                .bodyToFlux(typeRef)
                .onErrorResume(throwable -> {
                    // 在这里处理错误并返回一个新的Flux,或者抛出一个异常
                    if (throwable instanceof WebClientResponseException) {
                        WebClientResponseException exception = (WebClientResponseException) throwable;
                        if (exception.getStatusCode() == HttpStatus.INTERNAL_SERVER_ERROR) {
                            // 如果是500错误,重试下一个URL
                            return Flux.error(new RuntimeException("Retry with another URL"));
                        }
                    }
                    // 如果不是500错误,直接抛出异常
                    return Flux.error(throwable);
                })
        )
        .retryWhen(retries -> retries.delayElements(Duration.ofMillis(1000)) // 每次重试间隔1秒
                .take(2) // 最大重试次数为2(即重试serverB和serverC各一次)
        );

// 订阅并处理响应
responseFlux.subscribe(response -> {
    // 处理响应
}, throwable -> {
    // 处理所有重试失败后的错误
    logger.error("Failed to call both servers", throwable);
    throw new GatewayException("Failed to call both servers", INTERNAL_SERVER_ERROR);
});

在这个例子中,我们创建了一个包含两个URL的Flux,然后使用concatMap对每个URL发起请求。如果请求失败并抛出WebClientResponseException,我们检查错误状态码是否为500。如果是500错误,我们返回一个包含错误消息的新Flux,这将触发retryWhen操作符进行重试。如果不是500错误,我们直接抛出异常。

retryWhen中,我们使用delayElements来设置每次重试之间的延迟,并使用take来限制最大重试次数。最后,我们订阅responseFlux并处理响应或错误。

这种方式可以确保在serverB不可用但serverC可用时,客户端A能够收到成功的返回。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题