并发请求业务场景实现

1.开发语言java,框架springboot等。
2.业务场景:以电子卡激活为例,服务B接收到用户A(也可能是第三方程序)提交的激活电子卡请求,这一笔订单中需要激活n张卡(1<=n<=200),每次激活一张卡都需要向C发起一次HTTP请求,而且C每次激活都需要一定的时间(时间在几十毫秒到几秒间),而且B给A返回此次订单中所有激活卡的信息(从C获取到卡密或者激活失败);
3.主要问题在从B到C之间的设计,如何并发的发送这n次请求,而且将返回的信息归档在同一个订单号中返回给
A,并且使用较少的时间。
4.之前使用了akka,但是对akka不大熟悉,正在努力。

阅读 7.1k
5 个回答

这么好的问题为什么没有关注度?
上面的回答中的方式是使用同步的 HTTP 请求,这样的好处是简单不容易出错,缺点肯定就是效率会偏低和占用不必要的 CPU 资源。

讲下我的思路:

  1. 首先,A->B 这块,既然使用的是 SpringBoot,了解下 SpringMVC 的异步模式(关键类:DefferedResultWebAsyncTask
  2. B->C 这块,既然 B->C 是发送 HTTP 请求,那么可以考虑异步的 HTTP 操作(可以简单了解下 okhttp 或者 httpclient 中的异步操作)。使用一个线程专门发送异步 HTTP 请求,在异步 HTTP 的回调中,可以将请求的结果写入到消息队列(如果业务不复杂可以直接使用 BlockingQueue);另一个线程不停的(或者每隔一小段时间)从队列中取出消息并进行消费:即根据订单号将结果进行归档,通过某个订单号对应的已归档请求的个数来判断是否将结果返回给 A。
  3. 至于线程的管理,如果将异步 HTTP 请求交给了 okhttp 或者 httpclient,那么之外涉及的线程不多,可能没有必要使用 akka —— 可以考虑下 SpringMVC 内置的线程池。

不能让C提供一个批量激活的接口吗?
当然估计是不能,那我说下我的想法,其实业务场景不复杂,可以直接用线程处理,在所有线程处理完之前阻塞住主线程,大概代码如下:

//n张卡
List<Card> cardList = new ArrayList<>();
//线程池处理请求
ExecutorService executorService = Executors.newFixedThreadPool(8);
final CountDownLatch cdl = new CountDownLatch(cardList.size());
final List<String> result = new ArrayList<>();
for (Object card : cardList) {
    executorService.execute(new Runnable() {
                
        @Override
        public void run() {
            //请求C并拿到响应
            result.add(post(card));
            cdl.countDown();
        }
    });
}
cdl.await();
    public static void main(String[] args) {
        try {
            TestMain testMain = new TestMain();
            testMain.exec1();
            testMain.exec2();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    void exec1() throws ExecutionException, InterruptedException {
        System.out.println("FutureTask========== begin");
        int quantity = 200; // 激活卡数量
        final long facePrice = 5000;  //相同面值
        //创建异步任务列表
        ArrayList<FutureTask<Card>> futureTasks = new ArrayList<>();
        //初始化线程池
        ExecutorService executorService = Executors.newFixedThreadPool(quantity);
        long start = System.currentTimeMillis();
        Callable<Card> callable = new Callable<Card>() {
            @Override
            public Card call() throws Exception {
                Integer res = new Random().nextInt(100);
                Thread.sleep(res);
                Card card = new Card();
                card.setCardNo(Thread.currentThread().getId());
                card.setCardPass(res);
                card.setFacePrice(facePrice);
                return card;
            }
        };
        for (int i=0;i<quantity;i++){
            //创建一个异步任务
            FutureTask<Card> cardFutureTask = new FutureTask<>(callable);
            futureTasks.add(cardFutureTask);
            //提交异步任务到线程池,让线程池管理任务,由于是异步并行任务,所以不会发生阻塞
            executorService.submit(cardFutureTask);
        }
        ArrayList<Card> result = new ArrayList<>();
        for (FutureTask<Card> futureTask : futureTasks){
            //futureTask.get() 得到任务执行的结果
            Card card = futureTask.get();
            result.add(card);
        }
        long end = System.currentTimeMillis();
        int count = 0;
        for (Card card : result){
//            System.out.println("获取结果数据"+card);
            count+=card.getCardPass();
        }
        System.out.println("线程池的任务全部完成:结果为:"+count+",main线程关闭,进行线程的清理");
        System.out.println("使用时间end:"+(end-start)+"ms");
        //清理线程池
        executorService.shutdown();
        System.out.println("FutureTask========== end");
    }
    void exec2() throws InterruptedException {
        System.out.println("使用阻塞线程=-===========begin");
        int quantity = 200; // 激活卡数量
        final long facePrice = 5000;  //相同面值
        long start = System.currentTimeMillis();
        //线程池处理请求
        ExecutorService executorService = Executors.newFixedThreadPool(8);
        final CountDownLatch cdl = new CountDownLatch(quantity);
        final List<Card> result = new ArrayList<>();
        for (int i=0;i<quantity;i++) {
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    //请求C并拿到响应
                    Integer res = new Random().nextInt(100);
                    try {
                        Thread.sleep(res);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Card card = new Card();
                    card.setCardNo(Thread.currentThread().getId());
                    card.setCardPass(res);
                    card.setFacePrice(facePrice);
                    result.add(card);
                    cdl.countDown();
                }
            });
        }
        cdl.await();
        long end = System.currentTimeMillis();
        int count = 0;
        for (Card card : result){
            count += card.getCardPass();
//            System.out.println(card);
        }
        System.out.println("线程池的任务全部完成:结果为:"+count+",main线程关闭,进行线程的清理");
        System.out.println("使用时间end:"+(end-start)+"ms");
        //清理线程池
        executorService.shutdown();
        System.out.println("使用阻塞线程=-===========end");
    }
    class Card{
        long cardNo;
        long cardPass;
        long facePrice; 
        //getter setter 
        

改造业务流程啊:
1:用户提交激活请求后,后台把所有卡送到任务池等待激活,立刻返回“你的卡正在激活中”
2:然后后台任务池逐一执行激活动作
3:用户终端逐一获取每张卡的激活状态(也可以由后台push状态)

你所说的“使用最少时间”这个要求没太大意义:因为时间可能不受你的控制,C服务的处理时长和处理能力可能你控制不了(比如银行的接口),你只能主动向C妥协。

业务改造之后,通过任务池可以方便的控制调用C的频次,可以更好的适应C的处理能力。

最好不要并发去多次发送请求,这样服务器性能会下降,另一个重要原因是数据会紊乱,做一次请求就好,后面可以轮询你的数据是否ok了

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题