关于Java无锁RingBuffer的线程安全问题

一个极简RingBuffer的实现。

不考虑多线程读/多线程写。

public class RingBuffer {
    private final static int BUFFER_SIZE = 1024;
    private String[] buffer = new String[BUFFER_SIZE];
    private int tail = 0;
    private int head = 0;

    public boolean put(String v) {
        if ((head + 1) % BUFFER_SIZE == tail) {
            return false;
        }
        buffer[head] = v;
        head = (head + 1) % BUFFER_SIZE;
        return true;
    }

    public String get() {
        if (tail == head) {
            return null;
        }
        String result = buffer[tail];
        tail = (tail + 1) % BUFFER_SIZE;
        return result;
    }
}

我的测试用例如下

import org.junit.Before;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RingBufferTest {

    private RingBuffer ringbuffer;

    @Before
    public void setup() {
        ringbuffer = new RingBuffer();
    }

    @Test
    public void test_put() throws InterruptedException {
        Thread read = new Thread(new RingBufferTestRunnable("read"));
        Thread write = new Thread(new RingBufferTestRunnable("write"));
        read.start();
        write.start();
        read.join();
        write.join();
    }

    class RingBufferTestRunnable implements Runnable {
        private String type;
        private static final int MAX = 100000;

        RingBufferTestRunnable(String type) {
            this.type = type;
        }

        @Override
        public void run() {
            if (type.equals("write")) {
                int counter = 0;
                while (true) {
                    if (ringbuffer.put(String.valueOf(counter))) {
                        counter++;
                    }
                    if (counter == MAX) {
                        break;
                    }
                }

            } else {
                int counter = 0;
                List<String> list = new ArrayList<>();
                while (true) {
                    String temp;
                    if ((temp = ringbuffer.get()) != null) {
                        counter++;
                        list.add(temp);
                    }
                    if (counter == MAX) {
                        break;
                    }
                }
                checkDuplication(list);
            }
        }

        private void checkDuplication(List<String> list) {
            Set<String> set = new HashSet<>();
            for (String s : list) {
                if (!set.add(s)) {
                    throw new IllegalStateException("found duplication");
                }
            }
            System.out.println("duplication not found");
        }
    }
}

我们不期望一个线程的修改对于另一个线程立即可见,所以生产线程观察到的tail在没有任何同步措施的情况下有可能是滞后的,所以对于生产线程来说,有可能在RingBuffer没有满的情况下观察到(head + 1) % BUFFER_SIZE == tail,同理消费线程也有可能在RingBuffer不为空的情况下观察到tail == head
另外理论上没有任何同步措施的前提下,一个线程观察buffer这个变量是应该有可能观察不到另一个线程的的修改。所以上面的测试用例里会抛出found duplication的异常。请求如何用最小的损耗修改Ringbuffer,使上面的代码不会抛出异常

阅读 3k
1 个回答

RingBuffer在对只有一个线程写和一个线程读的情况下才不需要进行同步保证,但必须保证head和tail的可见性,将head和tail声明为volitale即可。

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题