问题描述
批量处理请求内容。程序接收一个请求,请求中的list中有1000个Map,要用这1000个Map的数据去调用同一个方法进行处理,处理完成后,把这1000个处理结果响应回去。
问题出现的环境背景及自己尝试过哪些方法
调用的处理业务的方法执行时间为1~2秒。
业务处理内容:
{
1.字段校验
2.查询数据库,根据返回结果进行校验(耗时)
3.查询数据库,根据返回结果进行校验(耗时)
4.将请求数据部分字段处理然后保存到数据库
5.使用第4步中处理完的数据调用其他系统等待结果(耗时,300~800毫秒左右)
6.根据第5步返回的结果更新第4步中存入数据库的信息(耗时)
7.返回处理结果
}
尝试方案1:使用Java8的parallelStream()进行并行处理。
结果:不指定线程数时,最多使用4个线程。指定线程数后,最多使用16个线程。且CPU使用率都很低
尝试方案2:使用线程池(定长线程池和缓冲线程池)
结果:执行需要时间仍然很长,CPU使用率很低。
相关代码
//如下为部分代码
方案1:
List<Map<String,Object>> reqMapList = reqMap.get("reqList");
List<Map> resultList = new ArrayList<>();
reqMapList.parallelStream().forEach(map -> {
Map<String, Object> map1 = doService.doService(map);
resultList.add(map1);
}))
List<Map<String,Object>> reqMapList = reqMap.get("reqList");
List<Map> resultList = new ArrayList<>();
new ForkJoinPool(parallelism).submit(() -> reqMapList.parallelStream().forEach(map -> {
Map<String, Object> map1 = doService.doService(map);
resultList.add(map1);
})).get();
//方案2
ExecutorService tpe = Executors.newCachedThreadPool();
ExecutorService tpe = Executors.newFixedThreadPool(50);
//存储线程的返回值
List<Future<Map<String, Object> >> results = new LinkedList<Future<Map<String, Object> >>();
for (Map<String, Object> map : reqMapList) {
//执行任务的线程
Task task = new Task(map);
//调用submit可以获得线程的返回值
Future<Map<String, Object>> result = tpe.submit(task);
results.add(result);
}
你期待的结果是什么?实际看到的错误信息又是什么?
想在30秒内处理完这1000笔业务。请各位给点想法建议,不胜感激~
看了下伪代码流程,感觉可以优化下流程,其实大部分应该在数据库的IO开销上(可能数据库量比较大),以及外部RPC的接口的消耗上。其实第4步数据存储和第6步再更新的逻辑,没必要第四部就存储,量不大的情况可以放内存,直接数据处理完了第6步再统一存储。以及RPC的调用,可以分批并发量小的数据去请求