单生产者多消费者模型中,如果一个消费者出现异常,怎么通知其他消费者停止处理任务?

如题,在处理海量数据的时候,生产者生产数据的速度远大于消费者,故为了平衡处理能力,增加多个消费者进行处理,但是这里有个问题就是,如果这批数据一旦有那个消费者处理出现问题,其他消费者要停止处理,等待下一批数据,有什么办法能做到吗?

阅读 5.5k
3 个回答

@边城 帮忙review下,我这里写了个demo,用的是观察者模式实现的。看看是否合适,是否有更好的方法。多谢,多谢。

生产者类DataProducer

import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.BlockingQueue;

public class DataProducer implements Observer, Runnable {

    private BlockingQueue<String> blockingQueue;

    private boolean handling = true;

    public DataProducer(BlockingQueue<String> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        while (handling) {
            try
            {
                String str =  handleData();
                blockingQueue.offer(str);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private String handleData() {
        return null;
    }

    @Override
    public void update(Observable o, Object arg) {
        if (arg.toString().equals("stopHandling")) {
            System.out.println("stopHandling data");
            handling = false;
            blockingQueue.clear();
        }
    }
}

消费者类DataConsumer

import java.util.Observable;
import java.util.concurrent.BlockingQueue;

public class DataConsumer extends Observable implements Runnable {

    private BlockingQueue<String> blockingQueue;

    public DataConsumer(BlockingQueue<String> blockingDeque) {
        this.blockingQueue = blockingDeque;
    }

    @Override
    public void run() {
        while (true) {
            try {
                String str = blockingQueue.take();
                handleData(str);
            } catch (Exception e) {
                notifyObservers("stopHandling");
            }
        }

    }

    private void handleData(String str) {
    }

    @Override
    public void notifyObservers(Object arg) {
        super.setChanged();

        super.notifyObservers(arg);

    }
}

主线程启动任务,测试程序

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;

public class TestData {
    private static final ExecutorService executor = Executors.newFixedThreadPool(5);

    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new LinkedTransferQueue<>();
        DataProducer dataProducer = new DataProducer(blockingQueue);
        executor.execute(dataProducer);
        for (int i = 0; i < 4; i++) {
            executor.execute(new DataConsumer(blockingQueue));
        }
    }
}

在传递给消费者的参数里加上状态,一但某个消费者处理出问题,就修改其状态标记问题,其它消费者检查到问题标记之后就不继续处理了(但是已经在处理过程中的仍然会继续,除非过程中多次检查状态)

新手上路,请多包涵

可以在队列里面塞一个‘毒丸’ 元素 ,消费者检测到该对象就停止消费

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题