如何正确读取 Flux<DataBuffer> 并将其转换为单个 inputStream

新手上路,请多包涵

我正在为我的 spring-boot 应用程序使用 WebClient 和自定义 BodyExtractor

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })

BodyExtractor.java

 @Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}

上面的代码适用于小有效载荷但不适用于大有效载荷,我认为这是因为我只读取一个通量值 next 我不确定如何组合和读取所有 dataBuffer

我是反应堆的新手,所以我不知道通量/单声道的很多技巧。

原文由 Bk Santiago 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 3.5k
2 个回答

我能够通过使用 Flux#collectSequenceInputStream 使其工作

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
    .map(inputStream -> {
      try {
        JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
        Unmarshaller unmarshaller = jc.createUnmarshaller();

        return (T) unmarshaller.unmarshal(inputStream);
      } catch(Exception e){
        return null;
      }
  }).next();
}

InputStreamCollector.java

 public class InputStreamCollector {
  private InputStream is;

  public void collectInputStream(InputStream is) {
    if (this.is == null) this.is = is;
    this.is = new SequenceInputStream(this.is, is);
  }

  public InputStream getInputStream() {
    return this.is;
  }
}

原文由 Bk Santiago 发布,翻译遵循 CC BY-SA 3.0 许可协议

这真的不像其他答案所暗示的那么复杂。

正如@jin-kwon 所建议的那样,流式传输数据而不将其全部缓冲在内存中的唯一方法是使用管道。然而,它可以通过使用 Spring 的 BodyExtractorsDataBufferUtils 实用程序类非常简单地完成。

例子:

 private InputStream readAsInputStream(String url) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
        .accept(MediaType.APPLICATION.XML)
        .exchange()
        .block();
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
        .doOnError(t -> {
            log.error("Error reading body.", t);
            // close pipe to force InputStream to error,
            // otherwise the returned InputStream will hang forever if an error occurs
            try(isPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        })
        .doFinally(s -> {
            try(osPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        });

    DataBufferUtils.write(body, osPipe)
        .subscribe(DataBufferUtils.releaseConsumer());

    return isPipe;
}

如果您不关心检查响应代码或抛出失败状态代码的异常,您可以跳过 block() 调用和中间 ClientResponse 变量通过使用

flatMap(r -> r.body(BodyExtractors.toDataBuffers()))

反而。

原文由 user1585916 发布,翻译遵循 CC BY-SA 4.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题