使用队列的生产者/消费者线程

新手上路,请多包涵

我想创建某种 Producer/Consumer 线程应用程序。但我不确定在两者之间实现队列的最佳方式是什么。

所以我想出了两个想法(这两个想法都可能是完全错误的)。我想知道哪个更好,如果它们都很糟糕,那么实现队列的最佳方式是什么。我关心的主要是我在这些示例中对队列的实现。我正在扩展一个队列类,它是一个内部类并且是线程安全的。下面是两个示例,每个示例有 4 个类。

主要类-

 public class SomeApp
{
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        consumer = new Consumer();
        producer = new Producer();
    }
}

消费类-

 public class Consumer implements Runnable
{
    public Consumer()
    {
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = QueueHandler.dequeue();
            //do some stuff with the object
        }
    }
}

制作人班——

 public class Producer implements Runnable
{
    public Producer()
    {
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {
        while(true)
        {
            //add to the queue some sort of unique object
            QueueHandler.enqueue(new Object());
        }
    }
}

队列类-

 public class QueueHandler
{
    //This Queue class is a thread safe (written in house) class
    public static Queue<Object> readQ = new Queue<Object>(100);

    public static void enqueue(Object object)
    {
        //do some stuff
        readQ.add(object);
    }

    public static Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

或者

主要类-

 public class SomeApp
{
    Queue<Object> readQ;
    private Consumer consumer;
    private Producer producer;

    public static void main (String args[])
    {
        readQ = new Queue<Object>(100);
        consumer = new Consumer(readQ);
        producer = new Producer(readQ);
    }
}

消费类-

 public class Consumer implements Runnable
{
    Queue<Object> queue;

    public Consumer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread consumer = new Thread(this);
        consumer.start();
    }

    public void run()
    {
        while(true)
        {
            //get an object off the queue
            Object object = queue.dequeue();
            //do some stuff with the object
        }
    }
}

制作人班——

 public class Producer implements Runnable
{
    Queue<Object> queue;

    public Producer(Queue<Object> readQ)
    {
        queue = readQ;
        Thread producer = new Thread(this);
        producer.start();
    }

    public void run()
    {

        while(true)
        {
            //add to the queue some sort of unique object
            queue.enqueue(new Object());
        }
    }
}

队列类-

 //the extended Queue class is a thread safe (written in house) class
public class QueueHandler extends Queue<Object>
{
    public QueueHandler(int size)
    {
        super(size); //All I'm thinking about now is McDonalds.
    }

    public void enqueue(Object object)
    {
        //do some stuff
        readQ.add();
    }

    public Object dequeue()
    {
        //do some stuff
        return readQ.get();
    }
}

去!

原文由 Gareth 发布,翻译遵循 CC BY-SA 4.0 许可协议

阅读 478
2 个回答

Java 5+ 拥有完成此类任务所需的所有工具。你会想要:

  1. 将所有生产者放在一个 ExecutorService 中;
  2. 将所有消费者放在另一个 ExecutorService 中;
  3. 如有必要,使用 BlockingQueue 在两者之间进行通信。

我对 (3) 说“如有必要”,因为根据我的经验,这是一个不必要的步骤。您所做的就是向消费者执行程序服务提交新任务。所以:

 final ExecutorService producers = Executors.newFixedThreadPool(100);
final ExecutorService consumers = Executors.newFixedThreadPool(100);
while (/* has more work */) {
  producers.submit(...);
}
producers.shutdown();
producers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
consumers.shutdown();
consumers.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);

所以 producers 直接提交到 consumers

原文由 cletus 发布,翻译遵循 CC BY-SA 3.0 许可协议

好的,正如其他人所说,最好的办法是使用 java.util.concurrent 包。我强烈推荐“Java 并发实践”。这是一本很棒的书,几乎涵盖了您需要了解的所有内容。

至于您的特定实现,正如我在评论中指出的那样,不要从构造函数启动线程——这可能是不安全的。

撇开这一点不谈,第二个实现似乎更好。您不想将队列放在静态字段中。您可能只是无缘无故地失去了灵活性。

如果您想继续自己的实施(我猜是出于学习目的?),请至少提供一个 start() 方法。你应该构造对象(你可以实例化 Thread 对象),然后调用 start() 启动线程。

编辑: ExecutorService 有自己的队列,所以这可能会造成混淆。这里有一些可以帮助您入门的东西。

 public class Main {
    public static void main(String[] args) {
        //The numbers are just silly tune parameters. Refer to the API.
        //The important thing is, we are passing a bounded queue.
        ExecutorService consumer = new ThreadPoolExecutor(1,4,30,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(100));

        //No need to bound the queue for this executor.
        //Use utility method instead of the complicated Constructor.
        ExecutorService producer = Executors.newSingleThreadExecutor();

        Runnable produce = new Produce(consumer);
        producer.submit(produce);
    }
}

class Produce implements Runnable {
    private final ExecutorService consumer;

    public Produce(ExecutorService consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        Pancake cake = Pan.cook();
        Runnable consume = new Consume(cake);
        consumer.submit(consume);
    }
}

class Consume implements Runnable {
    private final Pancake cake;

    public Consume(Pancake cake){
        this.cake = cake;
    }

    @Override
    public void run() {
        cake.eat();
    }
}

进一步编辑:对于制作人,而不是 while(true) ,您可以执行以下操作:

 @Override
public void run(){
    while(!Thread.currentThread().isInterrupted()){
        //do stuff
    }
}

这样你就可以通过调用 .shutdownNow() 关闭执行器。如果您使用 while(true) ,它不会关闭。

另请注意, Producer 仍然容易受到 RuntimeExceptions 的攻击(即一个 RuntimeException 将停止处理)

原文由 Enno Shioji 发布,翻译遵循 CC BY-SA 3.0 许可协议

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题