concurrentHashMap 并发场景下写入数据后丢失

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Test {
    public static void main(String[] args) throws Exception{
        while (true) {
            AtomicInteger atomicInteger = new AtomicInteger(0) ;
            ConcurrentHashMap<String, ConcurrentHashMap<String, String>> concurrentHashMapConcurrentHashMap = new ConcurrentHashMap<String, ConcurrentHashMap<String, String>>();
            Lock lock = new ReentrantLock() ;
            Condition condition = lock.newCondition() ;

            ExecutorService executor = Executors.newCachedThreadPool();
            for (int i = 0; i < 10; i++) {
                executor.execute(new PutThread(concurrentHashMapConcurrentHashMap ,atomicInteger , lock , condition));
            }

            lock.lock();
            while (atomicInteger.get() != 10){
                condition.await();
            }
            lock.unlock();

            System.out.println(concurrentHashMapConcurrentHashMap.get("topic").size());
            if (concurrentHashMapConcurrentHashMap.get("topic").size() != 500) {
                System.out.println("not ...");
            }

            concurrentHashMapConcurrentHashMap.clear();

            executor.shutdownNow() ;
        }
    }

    public static void put(String topic , String producerKey , String value , ConcurrentHashMap<String , ConcurrentHashMap<String , String>> concurrentHashMapConcurrentHashMap){
        synchronized (concurrentHashMapConcurrentHashMap) {
            if (concurrentHashMapConcurrentHashMap.containsKey(topic)) {
                concurrentHashMapConcurrentHashMap.get(topic).put(producerKey, value);
            } else {
                concurrentHashMapConcurrentHashMap.put(topic, new ConcurrentHashMap<String, String>());
                concurrentHashMapConcurrentHashMap.get(topic).put(producerKey, value);
            }
        }
    }
}

class PutThread implements Runnable{
    private volatile ConcurrentHashMap<String , ConcurrentHashMap<String , String>> concurrentHashMapConcurrentHashMap ;
    private volatile AtomicInteger atomicInteger ;
    private Lock lock ;
    private Condition condition ;

    public PutThread(ConcurrentHashMap<String, ConcurrentHashMap<String, String>> concurrentHashMapConcurrentHashMap, AtomicInteger atomicInteger, Lock lock, Condition condition) {
        this.concurrentHashMapConcurrentHashMap = concurrentHashMapConcurrentHashMap;
        this.atomicInteger = atomicInteger;
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        for(int i=0 ; i<50 ; i++){
            String id = Thread.currentThread().getName()+i ;
            Test.put("topic" , id , "xx" , concurrentHashMapConcurrentHashMap);
        }
        atomicInteger.addAndGet(1) ;

        lock.lock();
        condition.signal();
        lock.unlock();
    }
}

这段代码有什么问题吗?
为什么会有not ...打印呢?高并发场景下必显
java 版本是:java version "1.7.0_76"
clipboard.png

阅读 8.5k
2 个回答

线程池复用导致key 重复了。。。

新手上路,请多包涵
executor.execute(new PutThread(concurrentHashMapConcurrentHashMap ,atomicInteger , lock , condition));

这个并不能保证执行用的是不同线程,比如线程A将第一个PutThread已经执行完毕,第二个PutThread还是被分配到线程A执行
那么:

String id = Thread.currentThread().getName()+i ;
Test.put("topic" , id , "xx" , concurrentHashMapConcurrentHashMap);

此时两次执行id均为A1-A50
也就是说,第二个PutThread把第一个PutThread覆盖掉了
Map.put(A1,xx)执行两次它的size是1,这个应该懂吧

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题