我正在尝试使用 Webflux 将生成的文件流式传输到另一个位置,但是,如果文件的生成遇到错误,api 将返回成功,但在生成文件而不是文件本身时会使用 DTO 详细说明错误。这是使用非常陈旧且设计不佳的 api,因此请原谅使用 post 和 api 设计。
api 调用 (exchange()) 的响应是 ClientResponse。从这里我可以使用 bodyToMono 转换为 ByteArrayResource,它可以流式传输到文件,或者,如果在创建文件时出错,那么我也可以使用 bodyToMono 转换为 DTO。但是,我似乎不能做或取决于 ClientResponse 标头的内容。
在运行时我得到一个 IllegalStateException 引起的
block()/blockFirst()/blockLast()是阻塞的,thread reactor-http-client-epoll-12不支持
我认为我的问题是我不能在同一个函数链中调用 block() 两次。
我的代码片段是这样的:
webClient.post()
.uri(uriBuilder -> uriBuilder.path("/file/")
.queryParams(params).build())
.exchange()
.doOnSuccess(cr -> {
if (MediaType.APPLICATION_JSON_UTF8.equals(cr.headers().contentType().get())) {
NoPayloadResponseDto dto = cr.bodyToMono(NoPayloadResponseDto.class).block();
createErrorFile(dto);
}
else {
ByteArrayResource bAr = cr.bodyToMono(ByteArrayResource.class).block();
createSpreadsheet(bAr);
}
}
)
.block();
基本上我想根据标头中定义的 MediaType 以不同方式处理 ClientResponse。
这可能吗?
原文由 DaithiG 发布,翻译遵循 CC BY-SA 4.0 许可协议
首先,有几件事可以帮助您理解解决此用例的代码片段。
subscribe
也不是一个好主意。它或多或少类似于在单独的线程中将作业作为任务启动。完成后您会收到回调(可以为subscribe
方法提供 lambda),但实际上您正在将当前管道与该任务解耦。在这种情况下,在您有机会读取完整的响应正文并将其写入文件之前,可以关闭客户端 HTTP 响应并清理资源DataBuffer
(想想可以合并的 ByteBuffer 实例)。void
),您可以调用 block,例如在测试用例中。这是您可以用来执行此操作的代码片段:
如您所见,我们没有在任何地方阻塞,并且处理 I/O 的方法正在返回
Mono<Void>
,这是反应式等同于done(error)
回调,它在事情完成时发出信号,如果发生错误。由于我不确定
createErrorFile
方法应该做什么,我提供了一个示例createSpreadsheet
只是将正文字节写入文件。请注意,由于数据缓冲区可能会被回收/合并,因此我们需要在完成后释放它们。通过此实现,您的应用程序将在给定时间在内存中保存一些
DataBuffer
实例(出于性能原因,反应性运算符会预取值)并将以反应性方式写入字节。