使用 ResponseEntity 进行流式传输并确保 InputStream 关闭的正确方式

新手上路,请多包涵

我们的一个应用程序泄漏了文件句柄,我们还没有找到原因。

在代码中,我可以看到几个类似的函数:

 public ResponseEntity<InputStreamResource> getFoo( ... ) {
    InputStream content = getContent(...)
    InputStreamResource isr = new InputStreamResource(content);
    return ResponseEntity.status(HttpServletResponse.SC_OK).body(isr);
}

if 检查和 try / catch 为简洁起见删除)

我确信这部分会导致问题,因为当我使用 JMeter 对这个特定代码进行负载测试时,我可以看到 getContent() 在这个阶段失败:

 is = Files.newInputStream(f.toPath());

通常我会关闭 InputStream 但因为这个简短而简单的代码我无法在 return 或调用 body 之前关闭流。

当我运行 lsof (代码在 Linux 上运行)时,我可以看到数千个文件以读取模式打开。所以我确信这个问题是由流没有关闭引起的。

是否有我应该交易的最佳实践代码?

原文由 Marged 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 4.5k
2 个回答

你可以尝试使用 StreamingResponseBody

流响应体

用于异步请求处理的控制器方法返回值类型,其中应用程序可以直接写入响应 OutputStream 而不会阻塞 Servlet 容器线程。

因为您正在处理一个单独的线程,直接写入响应,所以您在 close() return 问题已解决。

也许您可以从以下示例开始

public ResponseEntity<StreamingResponseBody> export(...) throws FileNotFoundException {
    //...

    InputStream inputStream = new FileInputStream(new File("/path/to/example/file"));

    StreamingResponseBody responseBody = outputStream -> {

        int numberOfBytesToWrite;
        byte[] data = new byte[1024];
        while ((numberOfBytesToWrite = inputStream.read(data, 0, data.length)) != -1) {
            System.out.println("Writing some bytes..");
            outputStream.write(data, 0, numberOfBytesToWrite);
        }

        inputStream.close();
    };

    return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=generic_file_name.bin")
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(responseBody);
}

您也可以尝试使用 Files (从java 7开始)

所以你不必管理 InputStream

     File file = new File("/path/to/example/file");

    StreamingResponseBody responseBody = outputStream -> {
        Files.copy(file.toPath(), outputStream);
    };

正如@Stackee007 在评论中描述的那样,在生产环境中的重负载下,为 — 定义一个 @Configuration TaskExecutor 来调整参数和管理 Async 过程。

 @Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

    private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final TaskExecutionProperties taskExecutionProperties;

    public AsyncConfiguration(TaskExecutionProperties taskExecutionProperties) {
        this.taskExecutionProperties = taskExecutionProperties;
    }

    //  ---------------> Tune parameters here
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(taskExecutionProperties.getPool().getCoreSize());
        executor.setMaxPoolSize(taskExecutionProperties.getPool().getMaxSize());
        executor.setQueueCapacity(taskExecutionProperties.getPool().getQueueCapacity());
        executor.setThreadNamePrefix(taskExecutionProperties.getThreadNamePrefix());
        return executor;
    }

    //  ---------------> Use this task executor also for async rest methods
    @Bean
    protected WebMvcConfigurer webMvcConfigurer() {
        return new WebMvcConfigurer() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setTaskExecutor(getTaskExecutor());
            }
        };
    }

    @Bean
    protected ConcurrentTaskExecutor getTaskExecutor() {
        return new ConcurrentTaskExecutor(this.getAsyncExecutor());
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

如何使用 mockMvc 进行测试

您可以在集成测试中简单地按照以下示例代码进行操作:

     .andExpect(request().asyncStarted())
    .andDo(MvcResult::getAsyncResult)
    .andExpect(status().isOk()).getResponse().getContentAsByteArray();

ResponseEntity<StreamingResponseBody> 的内容类型是 MediaType.APPLICATION_OCTET_STREAM 在这个例子中,你可以得到 byte[] ( .getContentAsByteArray() )但是你可以得到所有内容的字符串/Json您的身体响应内容类型。

原文由 ValerioMC 发布,翻译遵循 CC BY-SA 4.0 许可协议

假设您使用的是 Spring,您的方法可以返回一个 Resource 并让 Spring 处理其余部分(包括关闭底层流)。 Spring API 中几乎没有 Resource 的实现, 否则您需要实现自己的。最后,您的方法会变得简单,并且会像下面这样

public ResponseEntity<Resource> getFo0(...) {
    return new InputStreamResource(<Your input stream>);
}

原文由 Stackee007 发布,翻译遵循 CC BY-SA 4.0 许可协议

推荐问题