Java 执行者:如何在任务完成时得到通知而不阻塞?

新手上路,请多包涵

假设我有一个队列,里面装满了我需要提交给执行程序服务的任务。我希望他们一次处理一个。我能想到的最简单的方法是:

  1. 从队列中取出一个任务
  2. 提交给执行人
  3. 在返回的 Future 上调用 .get 并阻塞直到结果可用
  4. 从队列中取出另一个任务…

但是,我试图避免完全阻塞。如果我有 10,000 个这样的队列,它们需要一次处理一个任务,我就会用完堆栈空间,因为它们中的大多数都将占用阻塞的线程。

我想要的是提交任务并提供任务完成时调用的回调。我将使用该回调通知作为发送下一个任务的标志。 (functionaljava和jetlang显然使用了这样的非阻塞算法,但我看不懂他们的代码)

我如何使用 JDK 的 java.util.concurrent 来做到这一点,而不是编写我自己的执行程序服务?

(为我提供这些任务的队列本身可能会阻塞,但这是一个稍后要解决的问题)

原文由 Shahbaz 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 377
2 个回答

定义一个回调接口来接收您想要在完成通知中传递的任何参数。然后在任务结束时调用它。

您甚至可以为 Runnable 任务编写通用包装器,并将它们提交给 ExecutorService 。或者,请参阅下面的 Java 8 内置机制。

 class CallbackTask implements Runnable {

  private final Runnable task;

  private final Callback callback;

  CallbackTask(Runnable task, Callback callback) {
    this.task = task;
    this.callback = callback;
  }

  public void run() {
    task.run();
    callback.complete();
  }

}


借助 CompletableFuture ,Java 8 包含了一种更精细的方法来组合可以异步和有条件地完成进程的管道。这是一个人为但完整的通知示例。

 import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class GetTaskNotificationWithoutBlocking {

  public static void main(String... argv) throws Exception {
    ExampleService svc = new ExampleService();
    GetTaskNotificationWithoutBlocking listener = new GetTaskNotificationWithoutBlocking();
    CompletableFuture<String> f = CompletableFuture.supplyAsync(svc::work);
    f.thenAccept(listener::notify);
    System.out.println("Exiting main()");
  }

  void notify(String msg) {
    System.out.println("Received message: " + msg);
  }

}

class ExampleService {

  String work() {
    sleep(7000, TimeUnit.MILLISECONDS); /* Pretend to be busy... */
    char[] str = new char[5];
    ThreadLocalRandom current = ThreadLocalRandom.current();
    for (int idx = 0; idx < str.length; ++idx)
      str[idx] = (char) ('A' + current.nextInt(26));
    String msg = new String(str);
    System.out.println("Generated message: " + msg);
    return msg;
  }

  public static void sleep(long average, TimeUnit unit) {
    String name = Thread.currentThread().getName();
    long timeout = Math.min(exponential(average), Math.multiplyExact(10, average));
    System.out.printf("%s sleeping %d %s...%n", name, timeout, unit);
    try {
      unit.sleep(timeout);
      System.out.println(name + " awoke.");
    } catch (InterruptedException abort) {
      Thread.currentThread().interrupt();
      System.out.println(name + " interrupted.");
    }
  }

  public static long exponential(long avg) {
    return (long) (avg * -Math.log(1 - ThreadLocalRandom.current().nextDouble()));
  }

}

原文由 erickson 发布,翻译遵循 CC BY-SA 3.0 许可协议

在 Java 8 中,您可以使用 CompletableFuture 。这是我的代码中的一个示例,我使用它从我的用户服务中获取用户,将它们映射到我的视图对象,然后更新我的视图或显示错误对话框(这是一个 GUI 应用程序):

     CompletableFuture.supplyAsync(
            userService::listUsers
    ).thenApply(
            this::mapUsersToUserViews
    ).thenAccept(
            this::updateView
    ).exceptionally(
            throwable -> { showErrorDialogFor(throwable); return null; }
    );

它异步执行。我正在使用两种私有方法: mapUsersToUserViewsupdateView

原文由 Matt 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题