关于生产者消费者中,如何让消费者不销毁一直处理队列中的消息呢

最忙碌的语言
  • 68

现在我有个生产者-消费者模型,我想在服务器端开启消费者线程,不断去接收队列中的消息;

代码:
全局变量:
private final int NUMBER=6;

private final Executor exec= Executors.newFixedThreadPool(NUMBER);//创建线程池
private volatile BlockingQueue<KaiFaBan> queue=new LinkedBlockingQueue<>();//生成一个全局的无界队列blockingQueue
private volatile int KAFABANNUMBER1=0;//第一个开发版存储的数据
private volatile int KAFABANNUMBER2=0;//第二个开发版存储的数据
private volatile int KAFABANNUMBER3=0;//第三个开发版存储的数据
private volatile Object myLock=new Object();

//开发板发来的的信息(生产者,不断的请求浏览器)

@RequestMapping("/produce")
public void produce(HttpServletRequest request, HttpServletResponse response, KaiFaBan kaiFaBan){
    System.out.println("生产者的信息是:"+kaiFaBan.getName()+",获得的数据是:"+kaiFaBan.getCount());
    Runnable producter=new Runnable() {
        @Override
        public synchronized void run() {
            //单个长耗时任务
            try {
                if(kaiFaBan != null){
                    queue.put(kaiFaBan);//阻塞
                    System.out.println("已存入队列中");
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    };
    exec.execute(producter);
}

//消费者
@RequestMapping("/consumer")
public void consumer(HttpServletRequest request,HttpServletResponse response){
    //遍历队列中的信息
    Iterator<KaiFaBan> iterator = queue.iterator();
    while(iterator.hasNext()){
        KaiFaBan next = iterator.next();
    }
    while (true){
        Runnable consumer=new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("开启消费者");
                    KaiFaBan takeValue = queue.take();//阻塞
                    System.out.println("解开阻塞,开始从队列中获取信息");
                    synchronized (myLock){
                        if(takeValue != null){
                            if(takeValue.getName().equals("开发板1")){
                                if(KAFABANNUMBER1 < takeValue.getCount()){
                                    KAFABANNUMBER1=takeValue.getCount();
                                    System.out.println("KAFABANNUMBER1:"+KAFABANNUMBER1);
                                }
                            }else if(takeValue.getName().equals("开发板2")){
                                if(KAFABANNUMBER2 < takeValue.getCount()){
                                    KAFABANNUMBER2=takeValue.getCount();
                                    System.out.println("KAFABANNUMBER2:"+KAFABANNUMBER2);
                                }
                            }else if(takeValue.getName().equals("开发板3")){
                                if(KAFABANNUMBER3 < takeValue.getCount()){
                                    KAFABANNUMBER3=takeValue.getCount();
                                    System.out.println("KAFABANNUMBER3:"+KAFABANNUMBER3);
                                }
                            }
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        exec.execute(consumer);
    }
}

现在的问题是,如何保证消费者一直在线,不断的去处理队列中的消息呢?
我先调用localhost:8080/consumer,让消费者起来;然后在调用生产者:localhost:8080/produce;消息者进入阻塞后;完全没反应??我本来想的是,此时消息者会离开阻塞,然后去处理队列中消息。。。。

控制台的消息是:
 开启消费者
 开启消费者
 开启消费者
 开启消费者
 开启消费者
 开启消费者

 生产者的信息是:开发板1,获得的数据是:5
  
  


回复
阅读 363
1 个回答

消费者启动线程,不退出

    @RequestMapping("/consumer")
    public void consumer(HttpServletRequest request, HttpServletResponse response) {
        //遍历队列中的信息
        Iterator<KaiFaBan> iterator = queue.iterator();
        while (iterator.hasNext()) {
            KaiFaBan next = iterator.next();
        }
        Runnable consumer = new Runnable() {
            @Override
            public void run() {
                System.out.println("开启消费者");
                while (true) {
                    try {
                        KaiFaBan takeValue = queue.take();//阻塞
                        System.out.println("解开阻塞,开始从队列中获取信息");
                        synchronized (myLock) {
                            if (takeValue != null) {
                                if (takeValue.getName().equals("开发板1")) {
                                    if (KAFABANNUMBER1 < takeValue.getCount()) {
                                        KAFABANNUMBER1 = takeValue.getCount();
                                        System.out.println("KAFABANNUMBER1:" + KAFABANNUMBER1);
                                    }
                                } else if (takeValue.getName().equals("开发板2")) {
                                    if (KAFABANNUMBER2 < takeValue.getCount()) {
                                        KAFABANNUMBER2 = takeValue.getCount();
                                        System.out.println("KAFABANNUMBER2:" + KAFABANNUMBER2);
                                    }
                                } else if (takeValue.getName().equals("开发板3")) {
                                    if (KAFABANNUMBER3 < takeValue.getCount()) {
                                        KAFABANNUMBER3 = takeValue.getCount();
                                        System.out.println("KAFABANNUMBER3:" + KAFABANNUMBER3);
                                    }
                                }
                            }
                        }
                    }
                } catch(InterruptedException e){
                    e.printStackTrace();
                }
            }
        };
        exec.execute(consumer);
    }
撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
你知道吗?

宣传栏