Spring WebFlux: how do I retry with a different URL?
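I'm building an LLM gateway. The requirements are to use WebFlux, stream the output back to the client chunk by chunk, and support failover.
Call chain: clientA -> gateway -> serverB
If the gateway -> serverB call fails, the gateway should retry against the backup server serverC instead, i.e. gateway -> serverC.
Note that the gateway -> serverB (serverC) call is made with WebFlux.
My question: how do I get the error status code of the gateway -> serverB call, and then retry against serverC?
The code for the gateway -> serverB (serverC) call is below (call it sseHttp):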
```java
Flux<Response> responseFlux = WebClient.create(url)
        .post()
        .headers(httpHeaders -> setHeaders(httpHeaders, headers))
        .contentType(MediaType.APPLICATION_JSON)
        .bodyValue(jsonBody)
        .retrieve()
        // Any non-200 status: read the error body, record it, and turn it into an error signal.
        .onStatus(status -> status.value() != HttpStatus.OK.value(), response ->
                response.bodyToMono(String.class)
                        .defaultIfEmpty("")   // an empty Mono here would make WebClient treat the response as normal
                        .map(message -> {
                            aimlGatewayFptiEvent.put(INT_ERROR_CODE, String.valueOf(response.statusCode()));
                            aimlGatewayFptiEvent.put(STATUS_CODE, String.valueOf(response.statusCode()));
                            aimlGatewayFptiEvent.put(INT_ERROR_DESC, message);
                            logger.error("url:{}, response.statusCode:{}, message:{}",
                                    url, response.statusCode(), message);
                            return new GatewayException("call upstream seldon error, code:" + response.statusCode(),
                                    INTERNAL_SERVER_ERROR);
                        }))
        .bodyToFlux(typeRef)
        // Signal the error downstream instead of throwing from inside the operator.
        .onErrorResume(WebClientResponseException.class, err -> {
            aimlGatewayFptiEvent.put(INT_ERROR_CODE, String.valueOf(err.getStatusCode()));
            aimlGatewayFptiEvent.put(INT_ERROR_DESC, err.getMessage());
            return Flux.error(new GatewayException(err.getMessage(), INTERNAL_SERVER_ERROR));
        })
        .doOnNext(event -> {
            if (chunksCount.get() == 0) {
                firstChunkReceivedTime.set(System.currentTimeMillis());
            }
            chunksCount.getAndIncrement();
            if (null != event && null != event.data() && !Boolean.TRUE.equals(bypassPayload)) {
                try {
                    aimlGatewayFptiEvent.putResp(compressStringAndBase64Encode(event.data()));
                } catch (IOException e) {
                    logger.error("Failed to compress and encode predict result.", e);
                }
            }
        })
        .map(event -> Response.ok().entity(
                buildData2PredictResponse(event.data(), request.getEndpoint())
        ).build())
        // Replaces any error above with a single error-response element and then completes
        // normally, so whoever subscribes to responseFlux never receives an error signal.
        .onErrorReturn(GatewayExceptionHandler.toStreamErrorResponse(
                new GatewayException("Error predict from upstream seldon.", INTERNAL_SERVER_ERROR)))
        .doFinally(signalType -> {
            long endTs = System.currentTimeMillis();
            if (chunksCount.get() > 0) {
                aimlGatewayFptiEvent.putIfAbsent(STATUS_CODE, "200");
            }
            aimlGatewayFptiEvent.putResponseTs(Long.toString(endTs));
            aimlGatewayFptiEvent.putApiDuration(Long.toString(endTs - beginTs));
            if (!Objects.equals(aimlGatewayFptiEvent.get(STATUS_CODE), "200")) {
                calTransaction.setStatus(EXCEPTION);
                calTransaction.addData(ERR_MSG, aimlGatewayFptiEvent.get(INT_ERROR_DESC));
                CalLogHelper.logException(calTransaction.getType() + EXCEPTION_POSTFIX,
                        new GatewayException(aimlGatewayFptiEvent.get(INT_ERROR_DESC), INTERNAL_SERVER_ERROR));
            }
            calTransaction.completed();
            // write to FPTI
            trackingUtil.send2Fpti(aimlGatewayFptiEvent, httpParams);
        });
```
Attempt 1:
Catch the exception thrown by the gateway -> serverB call, then retry.
```java
try {
    sseHttp(serverB.url); // call serverB
} catch (Exception e) {
    sseHttp(serverC.url); // call serverC
}
```
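The problem is that even when the call to serverB fails, the failure is still returned to clientA rather than thrown as an exception. So when serverB is down but serverC is up, clientA still receives an error.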
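A likely reason attempt 1 cannot work: sseHttp only assembles a lazy Flux. The HTTP call runs later, when the framework subscribes, long after the try/catch has exited, and the trailing onErrorReturn turns the failure into an ordinary element anyway. The switch to serverC has to happen inside the pipeline, for example with onErrorResume, and only works if the error signal is allowed to reach that operator. The sketch below models this with plain Reactor; callUpstream, UpstreamStatusException and the "fail over only on 5xx" policy are hypothetical stand-ins, not part of the original code.

```java
import reactor.core.publisher.Flux;

public class StatusFallbackSketch {

    // Hypothetical exception that carries the upstream HTTP status code, so
    // downstream operators can decide whether switching servers makes sense.
    static class UpstreamStatusException extends RuntimeException {
        final int status;

        UpstreamStatusException(int status) {
            super("upstream returned HTTP " + status);
            this.status = status;
        }
    }

    // Hypothetical stand-in for sseHttp(url). Flux.defer keeps it lazy:
    // nothing runs until the Flux is subscribed, just like a WebClient call.
    public static Flux<String> callUpstream(String server, int status) {
        return Flux.defer(() -> status == 200
                ? Flux.just(server + ":chunk1", server + ":chunk2")
                : Flux.<String>error(new UpstreamStatusException(status)));
    }

    // The fallback is part of the pipeline: it runs when serverB's error signal
    // arrives. Assumed policy: only 5xx triggers failover; a 4xx is passed on.
    public static Flux<String> withFallback(int serverBStatus) {
        return callUpstream("serverB", serverBStatus)
                .onErrorResume(UpstreamStatusException.class,
                        e -> e.status >= 500
                                ? callUpstream("serverC", 200)
                                : Flux.<String>error(e));
    }

    public static void main(String[] args) {
        // serverB answers 503, so the chunks printed come from serverC.
        withFallback(503).doOnNext(System.out::println).blockLast();
    }
}
```

In the gateway this would roughly become `sseHttp(serverB.url).onErrorResume(e -> sseHttp(serverC.url))`, with sseHttp letting its error through and a single onErrorReturn applied once, after the fallback.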
Attempt 2:
Use subscribe to get the status code of the gateway -> serverB call.
```java
Flux<Response> responseFlux = sseHttp(serverB.url);
responseFlux.subscribe(response -> {
    // set a flag in the map
    if (response.getStatus() != 200) {
        aimlGatewayFptiEvent.put(STATUS_CODE, String.valueOf(response.getStatus()));
    }
});
// check the flag
if (null != aimlGatewayFptiEvent.get(STATUS_CODE) && !"200".equals(aimlGatewayFptiEvent.get(STATUS_CODE))) {
    responseFlux = sseHttp(serverC.url);
}
```
Unfortunately, subscribe is non-blocking, so execution has already moved past the check and returned the failure before aimlGatewayFptiEvent is set.
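Besides the timing issue, the extra subscribe() in attempt 2 has a second effect: a Flux from WebClient is cold, so every subscription sends its own request. Subscribing once to peek at the status and then returning the same Flux (which the framework subscribes to again) calls serverB twice. The sketch below makes that visible with a counter; callUpstream is a hypothetical stand-in for sseHttp, and blockLast() stands in for the framework's subscription.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class ColdFluxSketch {

    // Counts how often the hypothetical upstream call is actually executed.
    static final AtomicInteger CALLS = new AtomicInteger();

    // Hypothetical stand-in for sseHttp(url): a cold Flux, like the one
    // WebClient returns. Each subscription runs the supplier again.
    static Flux<String> callUpstream(String server) {
        return Flux.defer(() -> {
            CALLS.incrementAndGet();   // one "HTTP request" per subscription
            return Flux.just(server + ":chunk1", server + ":chunk2");
        });
    }

    // Mirrors attempt 2: subscribe once to peek, then hand the same Flux on.
    public static int requestsSent() {
        CALLS.set(0);
        Flux<String> flux = callUpstream("serverB");
        flux.subscribe(chunk -> { /* peek at the result, as in attempt 2 */ });
        flux.blockLast();   // stands in for the framework subscribing to the returned Flux
        return CALLS.get();
    }

    public static void main(String[] args) {
        System.out.println("requests sent: " + requestsSent());   // prints "requests sent: 2"
    }
}
```

With a real WebClient the first subscribe runs asynchronously, but each subscription still triggers its own request, so it is usually safer to compose operators and let the framework subscribe exactly once.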
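How can clientA get a successful response when serverB is down but serverC is up?
I tried an approach suggested by an AI, and it does work. However, when both serverB and serverC are available, clientA gets the successful result twice. How can I make the gateway retry only after a failure?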
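One plausible cause of the duplicate result (the AI-provided code isn't shown, so this is a guess) is that both upstreams end up subscribed, e.g. via an extra subscribe() or via Flux.concat/Flux.merge over the two calls. With onErrorResume, serverC is subscribed only when serverB signals an error. For a streaming response it may also be worth failing over only if serverB failed before its first chunk; otherwise clientA would get part of serverB's answer followed by serverC's full answer. A sketch under those assumptions, with hypothetical names throughout:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class FailoverBeforeFirstChunkSketch {

    // Counts how often the backup server is actually called.
    public static final AtomicInteger BACKUP_CALLS = new AtomicInteger();

    // Hypothetical upstream: "down" fails before sending anything, "flaky" fails
    // after one chunk, any other name streams two chunks successfully.
    static Flux<String> callUpstream(String server) {
        return Flux.defer(() -> {
            if (server.equals("down")) {
                return Flux.<String>error(new IllegalStateException(server + ": HTTP 503"));
            }
            if (server.equals("flaky")) {
                return Flux.just(server + ":chunk1")
                        .concatWith(Flux.error(new IllegalStateException(server + ": broke mid-stream")));
            }
            return Flux.just(server + ":chunk1", server + ":chunk2");
        });
    }

    // The backup is subscribed only if the primary errors AND has not emitted
    // anything yet; a healthy primary never triggers a second call.
    public static Flux<String> withFailover(String primary, String backup) {
        return Flux.defer(() -> {
            // Fresh flag per subscription, i.e. per client request.
            AtomicBoolean emitted = new AtomicBoolean(false);
            return callUpstream(primary)
                    .doOnNext(chunk -> emitted.set(true))
                    .onErrorResume(err -> {
                        if (emitted.get()) {
                            return Flux.<String>error(err);   // partial answer already sent: give up
                        }
                        BACKUP_CALLS.incrementAndGet();
                        return callUpstream(backup);
                    });
        });
    }

    public static void main(String[] args) {
        withFailover("serverB", "serverC").doOnNext(System.out::println).blockLast();
        withFailover("down", "serverC").doOnNext(System.out::println).blockLast();
        System.out.println("backup calls: " + BACKUP_CALLS.get());
    }
}
```

Applied to the gateway, the same shape would wrap sseHttp(serverB.url) and sseHttp(serverC.url), with the stream-error response produced once at the very end instead of inside each call.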