现在我有个生产者-消费者模型,我想在服务器端开启消费者线程,不断去接收队列中的消息;
代码:
全局变量:
private final int NUMBER=6;
private final Executor exec= Executors.newFixedThreadPool(NUMBER);//创建线程池
private volatile BlockingQueue<KaiFaBan> queue=new LinkedBlockingQueue<>();//生成一个全局的无界队列blockingQueue
private volatile int KAFABANNUMBER1=0;//第一个开发版存储的数据
private volatile int KAFABANNUMBER2=0;//第二个开发版存储的数据
private volatile int KAFABANNUMBER3=0;//第三个开发版存储的数据
private volatile Object myLock=new Object();
//开发板发来的的信息(生产者,不断的请求浏览器)
@RequestMapping("/produce")
public void produce(HttpServletRequest request, HttpServletResponse response, KaiFaBan kaiFaBan){
System.out.println("生产者的信息是:"+kaiFaBan.getName()+",获得的数据是:"+kaiFaBan.getCount());
Runnable producter=new Runnable() {
@Override
public synchronized void run() {
//单个长耗时任务
try {
if(kaiFaBan != null){
queue.put(kaiFaBan);//阻塞
System.out.println("已存入队列中");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(producter);
}
//消费者
@RequestMapping("/consumer")
public void consumer(HttpServletRequest request,HttpServletResponse response){
//遍历队列中的信息
Iterator<KaiFaBan> iterator = queue.iterator();
while(iterator.hasNext()){
KaiFaBan next = iterator.next();
}
while (true){
Runnable consumer=new Runnable() {
@Override
public void run() {
try {
System.out.println("开启消费者");
KaiFaBan takeValue = queue.take();//阻塞
System.out.println("解开阻塞,开始从队列中获取信息");
synchronized (myLock){
if(takeValue != null){
if(takeValue.getName().equals("开发板1")){
if(KAFABANNUMBER1 < takeValue.getCount()){
KAFABANNUMBER1=takeValue.getCount();
System.out.println("KAFABANNUMBER1:"+KAFABANNUMBER1);
}
}else if(takeValue.getName().equals("开发板2")){
if(KAFABANNUMBER2 < takeValue.getCount()){
KAFABANNUMBER2=takeValue.getCount();
System.out.println("KAFABANNUMBER2:"+KAFABANNUMBER2);
}
}else if(takeValue.getName().equals("开发板3")){
if(KAFABANNUMBER3 < takeValue.getCount()){
KAFABANNUMBER3=takeValue.getCount();
System.out.println("KAFABANNUMBER3:"+KAFABANNUMBER3);
}
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(consumer);
}
}
现在的问题是,如何保证消费者一直在线,不断的去处理队列中的消息呢?
我先调用localhost:8080/consumer,让消费者起来;然后在调用生产者:localhost:8080/produce;消息者进入阻塞后;完全没反应??我本来想的是,此时消息者会离开阻塞,然后去处理队列中消息。。。。
控制台的消息是:
开启消费者
开启消费者
开启消费者
开启消费者
开启消费者
开启消费者
生产者的信息是:开发板1,获得的数据是:5
消费者启动线程,不退出