CompletableFuture 的使用?

有没有这样一种办法

  1. 让两个或多个任务并行执行
  2. 谁先执行完, 获取结果, 判断结果是否符合要求
  3. 如果不符合, 等待下一个, 直到某一个异步任务结果符合要求时, 响应结果
阅读 2.3k
2 个回答

anyOf方法

    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            // 执行任务1
            System.out.println("Task 1 result");
            return "Task 1 result";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            // 执行任务2
            //这里故意睡眠,这样一定不会执行后续方法.
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            System.out.println("Task 2 result");
            return "Task 2 result";
        });

        CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future1, future2);

        anyFuture.thenApply(result -> {
            if (result.equals("Task 1 result")) {
                // 响应结果
                return "success";

            } else {
                // 等待下一个完成的任务
                return anyFuture.join();
            }
        });
        Object join = anyFuture.join();
        System.out.println(join);

    }

理解错了。不好意思,再写一个demo

List<CompletableFuture<String>> futures = new ArrayList<>();

// 创建并执行多个异步任务
for (int i = 0; i < taskCount; i++) {
    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
        // 执行任务
        return result;
    });
    futures.add(future);
}

// 等待任务完成并处理结果
for (CompletableFuture<String> future : futures) {
    String result = future.get();
    if (result.equals(expectedResult)) {
        // 响应结果
        break;
    }
}

没必要使用CompletableFuture

  public static void main(String[] args) {
        int taskCount = 3;
        List<String> dataObj = new ArrayList<>();
        Callback<String> callback = new CallbackImpl(dataObj, Thread.currentThread(), taskCount);
        ExecutorService executorService = new ThreadPoolExecutor(1, 1, 100, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
        for (int i = 0; i < taskCount; i++) {
            executorService.execute(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                callback.callback("result");
            });
        }
        LockSupport.park();
        handleResult(dataObj);
        executorService.shutdown();

    }

Callback 如下:

class CallbackImpl implements Callback<String> {
    private final List<String> list;
    private final Thread thread;

    private final int taskCount;

    private final AtomicInteger callTimes = new AtomicInteger(0);
    private final AtomicBoolean atomicBoolean=new AtomicBoolean(false);

    public CallbackImpl(List<String> list, Thread thread, int taskCount) {
        this.list = list;
        this.taskCount = taskCount;
        this.thread = thread;

    }

    @Override
    public  void callback(String s) {
        if ("result".equals(s)&& !atomicBoolean.get()) {
            System.out.println(s);

            list.add(s);
            atomicBoolean.set(true);
            LockSupport.unpark(thread);
        } else if(!atomicBoolean.get()) {
            System.out.println(s);
            if (callTimes.incrementAndGet() == taskCount) {

                LockSupport.unpark(thread);
            }
        }
    }
}
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题