java技术爱好者

java技术爱好者 查看完整档案

广州编辑广东白云学院  |  机电学院 编辑家里蹲  |  java搬砖爱好者 编辑 me.lovebilibili.com 编辑
编辑

公众号:java技术爱好者
掘金:https://juejin.im/user/448256...
个人博客:https://me.lovebilibili.com/

个人动态

java技术爱好者 发布了文章 · 2020-09-13

八种经典排序算法,给你总结了,带动画演示

思维导图

文章已收录Github精选,欢迎Starhttps://github.com/yehongzhi/learningSummary

前言

算法和数据结构是一个程序员的内功,所以经常在一些笔试中都会要求手写一些简单的排序算法,以此考验面试者的编程水平。下面我就简单介绍八种常见的排序算法,一起学习一下。

一、冒泡排序

思路:

  • 比较相邻的元素。如果第一个比第二个大,就交换它们两个;
  • 对每一对相邻元素作同样的工作,从开始第一对到结尾的最后一对,这样在最后的元素就是最大的数;
  • 排除最大的数,接着下一轮继续相同的操作,确定第二大的数...
  • 重复步骤1-3,直到排序完成。

动画演示:

实现代码:

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name BubbleSort
 * @date 2020-09-05 21:38
 **/
public class BubbleSort extends BaseSort {
    
    public static void main(String[] args) {
        BubbleSort sort = new BubbleSort();
        sort.printNums();
    }
    
    @Override
    protected void sort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        for (int i = 0; i < nums.length - 1; i++) {
            for (int j = 0; j < nums.length - i - 1; j++) {
                if (nums[j] > nums[j + 1]) {
                    int temp = nums[j];
                    nums[j] = nums[j + 1];
                    nums[j + 1] = temp;
                }
            }
        }
    }
}
//10万个数的数组,耗时:21554毫秒

平均时间复杂度:O(n²)

空间复杂度:O(1)

算法稳定性:稳定

二、插入排序

思路:

  1. 从第一个元素开始,该元素可以认为已经被排序;
  2. 取出下一个元素,在前面已排序的元素序列中,从后向前扫描;
  3. 如果该元素(已排序)大于新元素,将该元素移到下一位置;
  4. 重复步骤3,直到找到已排序的元素小于或者等于新元素的位置;
  5. 将新元素插入到该位置后;
  6. 重复步骤2~5。

动画演示:

实现代码:

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name InsertSort
 * @date 2020-09-05 22:34
 **/
public class InsertSort extends BaseSort {
    public static void main(String[] args) {
        BaseSort sort = new InsertSort();
        sort.printNums();
    }
    @Override
    protected void sort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        for (int i = 0; i < nums.length - 1; i++) {
            //当前值
            int curr = nums[i + 1];
            //上一个数的指针
            int preIndex = i;
            //在数组中找到一个比当前遍历的数小的第一个数
            while (preIndex >= 0 && curr < nums[preIndex]) {
                //把比当前遍历的数大的数字往后移动
                nums[preIndex + 1] = nums[preIndex];
                //需要插入的数的下标往前移动
                preIndex--;
            }
            //插入到这个数的后面
            nums[preIndex + 1] = curr;
        }
    }
}
//10万个数的数组,耗时:2051毫秒

平均时间复杂度:O(n²)

空间复杂度:O(1)

算法稳定性:稳定

三、选择排序

思路:

第一轮,找到最小的元素,和数组第一个数交换位置。

第二轮,找到第二小的元素,和数组第二个数交换位置...

直到最后一个元素,排序完成。

动画演示:

实现代码:

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name SelectSort
 * @date 2020-09-06 22:27
 **/
public class SelectSort extends BaseSort {
    public static void main(String[] args) {
        SelectSort sort = new SelectSort();
        sort.printNums();
    }
    @Override
    protected void sort(int[] nums) {
        for (int i = 0; i < nums.length; i++) {
            int minIndex = i;
            for (int j = i + 1; j < nums.length; j++) {
                if (nums[j] < nums[minIndex]) {
                    minIndex = j;
                }
            }
            if (minIndex != i) {
                int temp = nums[i];
                nums[minIndex] = temp;
                nums[i] = nums[minIndex];
            }
        }
    }
}
//10万个数的数组,耗时:8492毫秒

平均时间复杂度:O(n²)

算法空间复杂度:O(1)

算法稳定性:不稳定

四、希尔排序

思路:

把数组分割成若干(h)个小组(一般数组长度length/2),然后对每一个小组分别进行插入排序。每一轮分割的数组的个数逐步缩小,h/2->h/4->h/8,并且进行排序,保证有序。当h=1时,则数组排序完成。

动画演示:

实现代码:

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name SelectSort
 * @date 2020-09-06 22:27
 **/
public class ShellSort extends BaseSort {

    public static void main(String[] args) {
        ShellSort sort = new ShellSort();
        sort.printNums();
    }

    @Override
    protected void sort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        int length = nums.length;
        int temp;
        //步长
        int gap = length / 2;
        while (gap > 0) {
            for (int i = gap; i < length; i++) {
                temp = nums[i];
                int preIndex = i - gap;
                while (preIndex >= 0 && nums[preIndex] > temp) {
                    nums[preIndex + gap] = nums[preIndex];
                    preIndex -= gap;
                }
                nums[preIndex + gap] = temp;
            }
            gap /= 2;
        }
    }
}
//10万个数的数组,耗时:261毫秒

平均时间复杂度:O(nlog2n)

算法空间复杂度:O(1)

算法稳定性:稳定

五、快速排序

快排,面试最喜欢问的排序算法。这是运用分治法的一种排序算法。

思路:

  1. 从数组中选一个数做为基准值,一般选第一个数,或者最后一个数。
  2. 采用双指针(头尾两端)遍历,从左往右找到比基准值大的第一个数,从右往左找到比基准值小的第一个数,交换两数位置,直到头尾指针相等或头指针大于尾指针,把基准值与头指针的数交换。这样一轮之后,左边的数就比基准值小,右边的数就比基准值大。
  3. 对左边的数列,重复上面1,2步骤。对右边重复1,2步骤。
  4. 左右两边数列递归结束后,排序完成。

动画演示:

实现代码:

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name SelectSort
 * @date 2020-09-06 22:27
 **/
public class QuickSort extends BaseSort {

    public static void main(String[] args) {
        QuickSort sort = new QuickSort();
        sort.printNums();
    }

    @Override
    protected void sort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        quickSort(nums, 0, nums.length - 1);
    }

    private void quickSort(int[] nums, int star, int end) {
        if (star > end) {
            return;
        }
        int i = star;
        int j = end;
        int key = nums[star];
        while (i < j) {
            while (i < j && nums[j] > key) {
                j--;
            }
            while (i < j && nums[i] <= key) {
                i++;
            }
            if (i < j) {
                int temp = nums[i];
                nums[i] = nums[j];
                nums[j] = temp;
            }
        }
        nums[star] = nums[i];
        nums[i] = key;
        quickSort(nums, star, i - 1);
        quickSort(nums, i + 1, end);
    }
}
//10万个数的数组,耗时:50毫秒

平均时间复杂度:O(nlogn)

算法空间复杂度:O(1)

算法稳定性:不稳定

六、归并排序

归并排序是采用分治法的典型应用,而且是一种稳定的排序方式,不过需要使用到额外的空间。

思路:

  1. 把数组不断划分成子序列,划成长度只有2或者1的子序列。
  2. 然后利用临时数组,对子序列进行排序,合并,再把临时数组的值复制回原数组。
  3. 反复操作1~2步骤,直到排序完成。

归并排序的优点在于最好情况和最坏的情况的时间复杂度都是O(nlogn),所以是比较稳定的排序方式。

动画演示:

实现代码:

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name MergeSort
 * @date 2020-09-08 23:30
 **/
public class MergeSort extends BaseSort {

    public static void main(String[] args) {
        MergeSort sort = new MergeSort();
        sort.printNums();
    }

    @Override
    protected void sort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        //归并排序
        mergeSort(0, nums.length - 1, nums, new int[nums.length]);
    }

    private void mergeSort(int star, int end, int[] nums, int[] temp) {
        //递归终止条件
        if (star >= end) {
            return;
        }
        int mid = star + (end - star) / 2;
        //左边进行归并排序
        mergeSort(star, mid, nums, temp);
        //右边进行归并排序
        mergeSort(mid + 1, end, nums, temp);
        //合并左右
        merge(star, end, mid, nums, temp);
    }

    private void merge(int star, int end, int mid, int[] nums, int[] temp) {
        int index = 0;
        int i = star;
        int j = mid + 1;
        while (i <= mid && j <= end) {
            if (nums[i] > nums[j]) {
                temp[index++] = nums[j++];
            } else {
                temp[index++] = nums[i++];
            }
        }
        while (i <= mid) {
            temp[index++] = nums[i++];
        }
        while (j <= end) {
            temp[index++] = nums[j++];
        }
        //把临时数组中已排序的数复制到nums数组中
        if (index >= 0) System.arraycopy(temp, 0, nums, star, index);
    }
}
//10万个数的数组,耗时:26毫秒

//10万个数的数组,耗时:26毫秒

平均时间复杂度:O(nlogn)

算法空间复杂度:O(n)

算法稳定性:稳定

七、堆排序

大顶堆概念:每个节点的值都大于或者等于它的左右子节点的值,所以顶点的数就是最大值。

思路:

  1. 对原数组构建成大顶堆。
  2. 交换头尾值,尾指针索引减一,固定最大值。
  3. 重新构建大顶堆。
  4. 重复步骤2~3,直到最后一个元素,排序完成。

构建大顶堆的思路,可以看代码注释。

动画演示:

实现代码:

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name HeapSort
 * @date 2020-09-08 23:34
 **/
public class HeapSort extends BaseSort {

    public static void main(String[] args) {
        HeapSort sort = new HeapSort();
        sort.printNums();
    }

    @Override
    protected void sort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        heapSort(nums);
    }

    private void heapSort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        //构建大根堆
        createTopHeap(nums);
        int size = nums.length;
        while (size > 1) {
            //大根堆的交换头尾值,固定最大值在末尾
            swap(nums, 0, size - 1);
            //末尾的索引值往左减1
            size--;
            //重新构建大根堆
            updateHeap(nums, size);
        }
    }

    private void createTopHeap(int[] nums) {
        for (int i = 0; i < nums.length; i++) {
            //当前插入的索引
            int currIndex = i;
            //父节点的索引
            int parentIndex = (currIndex - 1) / 2;
            //如果当前遍历的值比父节点大的话,就交换值。然后继续往上层比较
            while (nums[currIndex] > nums[parentIndex]) {
                //交换当前遍历的值与父节点的值
                swap(nums, currIndex, parentIndex);
                //把父节点的索引指向当前遍历的索引
                currIndex = parentIndex;
                //往上计算父节点索引
                parentIndex = (currIndex - 1) / 2;
            }
        }
    }

    private void updateHeap(int[] nums, int size) {
        int index = 0;
        //左节点索引
        int left = 2 * index + 1;
        //右节点索引
        int right = 2 * index + 2;
        while (left < size) {
            //最大值的索引
            int largestIndex;
            //如果右节点大于左节点,则最大值索引指向右子节点索引
            if (right < size && nums[left] < nums[right]) {
                largestIndex = right;
            } else {
                largestIndex = left;
            }
            //如果父节点大于最大值,则把父节点索引指向最大值索引
            if (nums[index] > nums[largestIndex]) {
                largestIndex = index;
            }
            //如果父节点索引指向最大值索引,证明已经是大根堆,退出循环
            if (largestIndex == index) {
                break;
            }
            //如果不是大根堆,则交换父节点的值
            swap(nums, largestIndex, index);
            //把最大值的索引变成父节点索引
            index = largestIndex;
            //重新计算左节点索引
            left = 2 * index + 1;
            //重新计算右节点索引
            right = 2 * index + 2;
        }
    }

    private void swap(int[] nums, int i, int j) {
        int temp = nums[i];
        nums[i] = nums[j];
        nums[j] = temp;
    }
}
//10万个数的数组,耗时:38毫秒

平均时间复杂度:O(nlogn)

算法空间复杂度:O(1)

算法稳定性:不稳定

八、桶排序

思路:

  1. 找出最大值,最小值。
  2. 根据数组的长度,创建出若干个桶。
  3. 遍历数组的元素,根据元素的值放入到对应的桶中。
  4. 对每个桶的元素进行排序(可使用快排,插入排序等)。
  5. 按顺序合并每个桶的元素,排序完成。

对于数组中的元素分布均匀的情况,排序效率较高。相反的,如果分布不均匀,则会导致大部分的数落入到同一个桶中,使效率降低。

动画演示(来源于五分钟学算法,侵删):

实现代码:

/**
 * @author Ye Hongzhi 公众号:java技术爱好者
 * @name BucketSort
 * @date 2020-09-08 23:37
 **/
public class BucketSort extends BaseSort {

    public static void main(String[] args) {
        BucketSort sort = new BucketSort();
        sort.printNums();
    }

    @Override
    protected void sort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        bucketSort(nums);
    }

    public void bucketSort(int[] nums) {
        if (nums == null || nums.length < 2) {
            return;
        }
        //找出最大值,最小值
        int max = Integer.MIN_VALUE;
        int min = Integer.MAX_VALUE;
        for (int num : nums) {
            min = Math.min(min, num);
            max = Math.max(max, num);
        }
        int length = nums.length;
        //桶的数量
        int bucketCount = (max - min) / length + 1;
        int[][] bucketArrays = new int[bucketCount][];
        //遍历数组,放入桶内
        for (int i = 0; i < length; i++) {
            //找到桶的下标
            int index = (nums[i] - min) / length;
            //添加到指定下标的桶里,并且使用插入排序排序
            bucketArrays[index] = insertSortArrays(bucketArrays[index], nums[i]);
        }
        int k = 0;
        //合并全部桶的
        for (int[] bucketArray : bucketArrays) {
            if (bucketArray == null || bucketArray.length == 0) {
                continue;
            }
            for (int i : bucketArray) {
                //把值放回到nums数组中
                nums[k++] = i;
            }
        }
    }

    //每个桶使用插入排序进行排序
    private int[] insertSortArrays(int[] arr, int num) {
        if (arr == null || arr.length == 0) {
            return new int[]{num};
        }
        //创建一个temp数组,长度是arr数组的长度+1
        int[] temp = new int[arr.length + 1];
        //把传进来的arr数组,复制到temp数组
        for (int i = 0; i < arr.length; i++) {
            temp[i] = arr[i];
        }
        //找到一个位置,插入,形成新的有序的数组
        int i;
        for (i = temp.length - 2; i >= 0 && temp[i] > num; i--) {
            temp[i + 1] = temp[i];
        }
        //插入需要添加的值
        temp[i + 1] = num;
        //返回
        return temp;
    }
}
//10万个数的数组,耗时:8750毫秒

平均时间复杂度:O(M+N)

算法空间复杂度:O(M+N)

算法稳定性:稳定(取决于桶内的排序算法,这里使用的是插入排序所以是稳定的)。

总结

动画演示来源于算法学习网站:https://visualgo.net

讲完这些排序算法后,可能有人会问学这些排序算法有什么用呢,难道就为了应付笔试面试?平时开发也没用得上这些。

我觉得我们应该换个角度来看,比如高中时我们学物理,化学,数学,那么多公式定理,现在也没怎么用得上,但是高中课本为什么要教这些呢?

我的理解是:第一,普及一些常识性的问题。第二,锻炼思维,提高解决问题的能力。第三,为了区分人才。

回到学排序算法有什么用的问题上,实际上也一样。这些最基本的排序算法就是一些常识性的问题,作为开发者应该了解掌握。同时也锻炼了编程思维,其中包含有双指针,分治,递归等等的思想。最后在面试中体现出来的就是人才的划分,懂得这些基本的排序算法当然要比不懂的人要更有竞争力。

建议大家看完之后,能找时间动手写一下,加深理解。

上面所有例子的代码都上传Github了:

https://github.com/yehongzhi/mall

觉得有用就点个赞吧,你的点赞是我创作的最大动力~

拒绝做一条咸鱼,我是一个努力让大家记住的程序员。我们下期再见!!!

能力有限,如果有什么错误或者不当之处,请大家批评指正,一起学习交流!
查看原文

赞 0 收藏 0 评论 0

java技术爱好者 发布了文章 · 2020-09-04

从秒杀聊到ZooKeeper分布式锁

思维导图

文章已收录到Github精选,欢迎Star
https://github.com/yehongzhi/learningSummary

前言

经过《ZooKeeper入门》后,我们学会了ZooKeeper的基本用法。

实际上ZooKeeper的应用是非常广泛的,实现分布式锁只是其中一种。接下来我们就ZooKeeper实现分布式锁解决秒杀超卖问题进行展开。

一、什么是秒杀超卖问题

秒杀活动应该都不陌生,不用过多解释。

不难想象,在这种"秒杀"的场景中,实际上会出现多个用户争抢"资源"的情况,也就是多个线程同时并发,这种情况是很容易出现数据不准确,也就是超卖问题

1.1 项目演示

下面使用程序演示,我使用了SpringBoot2.0、Mybatis、Mybatis-Plus、SpringMVC搭建了一个简单的项目,github地址:

https://github.com/yehongzhi/...

创建一个商品信息表:

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
  `number` int(10) DEFAULT '0' COMMENT '商品数量',
  `description` varchar(2048) DEFAULT '' COMMENT '商品描述',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

添加一个商品[叉烧包]进去:

核心的代码逻辑是这样的:

    @Override
    public boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
        //1.先查询数据库中商品的数量
        TbCommodityInfo commodityInfo = commodityInfoMapper.selectById(commodityId);
        //2.判断商品数量是否大于0,或者购买的数量大于库存
        Integer count = commodityInfo.getNumber();
        if (count <= 0 || number > count) {
            //商品数量小于或者等于0,或者购买的数量大于库存,则返回false
            return false;
        }
        //3.如果库存数量大于0,并且购买的数量小于或者等于库存。则更新商品数量
        count -= number;
        commodityInfo.setNumber(count);
        boolean bool = commodityInfoMapper.updateById(commodityInfo) == 1;
        if (bool) {
            //如果更新成功,则打印购买商品成功
            System.out.println("购买商品[ " + commodityInfo.getCommodityName() + " ]成功,数量为:" + number);
        }
        return bool;
    }

逻辑示意图如下:

上面这个逻辑,如果单线程请求的话是没有问题的。

但是多线程的话就出现问题了。现在我就创建多个线程,通过HttpClient进行请求,看会发生什么:

    public static void main(String[] args) throws Exception {
        //请求地址
        String url = "http://localhost:8080/mall/commodity/purchase";
        //请求参数,商品ID,数量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        //创建10个线程通过HttpClient进行发送请求,测试
        for (int i = 0; i < 10; i++) {
            //这个线程的逻辑仅仅是发送请求
            CommodityThread commodityThread = new CommodityThread(url, map);
            commodityThread.start();
        }
    }

说明一下,叉烧包的数量是100,这里有10个线程同时去购买,假设都购买成功的话,库存数量应该是90。

实际上,10个线程的确都购买成功了:

但是数据库的商品库存,却不准确:

二、尝试使用本地锁

上面的场景,大概流程如下所示:

可以看出问题的关键在于两个线程"同时"去查询剩余的库存,然后更新库存导致的。要解决这个问题,其实只要保证多个线程在这段逻辑是顺序执行即可,也就是加锁

本地锁JDK提供有两种:synchronized和Lock锁。

两种方式都可以,我这里为了简便,使用synchronized:

    //使用synchronized修饰方法
    @Override
    public synchronized boolean purchaseCommodityInfo(String commodityId, Integer number) throws Exception {
        //省略...
    }

然后再测试刚刚多线程并发抢购的情况,看看结果:

问题得到解决!!!

你以为事情就这样结束了吗,看了看进度条,发现事情并不简单。

我们知道在实际项目中,往往不会只部署一台服务器,所以不妨我们启动两台服务器,端口号分别是8080、8081,模拟实际项目的场景:

写一个交替请求的测试脚本,模拟多台服务器分别处理请求,用户秒杀抢购的场景:

    public static void main(String[] args) throws Exception {
        //请求地址
        String url = "http://localhost:%s/mall/commodity/purchase";
        //请求参数,商品ID,数量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        //创建10个线程通过HttpClient进行发送请求,测试
        for (int i = 0; i < 10; i++) {
            //8080、8081交替请求,每个服务器处理5个请求
            String port = "808" + (i % 2);
            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
            commodityThread.start();
        }
    }

首先看购买的情况,肯定都是购买成功的:

关键是库存数量是否正确:

有10个请求购买成功,库存应该是90才对,这里库存是95。事实证明本地锁是不能解决多台服务器秒杀抢购出现超卖的问题

为什么会这样呢,请看示意图:

其实和多线程问题是差不多的原因,多个服务器去查询数据库,获取到相同的库存,然后更新库存,导致数据不正确。要保证库存的数量正确,关键在于多台服务器要保证只能一台服务器在执行这段逻辑,也就是要加分布式锁。

这也体现出分布式锁的作用,就是要保证多台服务器只能有一台服务器执行。

分布式锁有三种实现方式,分别是redis、ZooKeeper、数据库(比如mysql)。

三、使用ZooKeeper实现分布式锁

3.1 原理

实际上是利用ZooKeeper的临时顺序节点的特性实现分布式锁。怎么实现呢?

假设现在有一个客户端A,需要加锁,那么就在"/Lock"路径下创建一个临时顺序节点。然后获取"/Lock"下的节点列表,判断自己的序号是否是最小的,如果是最小的序号,则加锁成功!

现在又有另一个客户端,客户端B需要加锁,那么也是在"/Lock"路径下创建临时顺序节点。依然获取"/Lock"下的节点列表,判断自己的节点序号是否最小的。发现不是最小的,加锁失败,接着对自己的上一个节点进行监听。

怎么释放锁呢,其实就是把临时节点删除。假设客户端A释放锁,把节点01删除了。那就会触发节点02的监听事件,客户端就再次获取节点列表,然后判断自己是否是最小的序号,如果是最小序号则加锁。

如果多个客户端其实也是一样,一上来就会创建一个临时节点,然后开始判断自己是否是最小的序号,如果不是就监听上一个节点,形成一种排队的机制。也就形成了锁的效果,保证了多台服务器只有一台执行。

假设其中有一个客户端宕机了,根据临时节点的特点,ZooKeeper会自动删除对应的临时节点,相当于自动释放了锁。

3.2 手写代码实现分布式锁

首先加入Maven依赖

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.4</version>
</dependency>

接着按照上面分析的思路敲代码,创建ZkLock类:

public class ZkLock implements Lock {
    //计数器,用于加锁失败时,阻塞
    private static CountDownLatch cdl = new CountDownLatch(1);
    //ZooKeeper服务器的IP端口
    private static final String IP_PORT = "127.0.0.1:2181";
    //锁的根路径
    private static final String ROOT_NODE = "/Lock";
    //上一个节点的路径
    private volatile String beforePath;
    //当前上锁的节点路径
    private volatile String currPath;
    //创建ZooKeeper客户端
    private ZkClient zkClient = new ZkClient(IP_PORT);

    public ZkLock() {
        //判断是否存在根节点
        if (!zkClient.exists(ROOT_NODE)) {
            //不存在则创建
            zkClient.createPersistent(ROOT_NODE);
        }
    }
    
    //加锁
    public void lock() {
        if (tryLock()) {
            System.out.println("加锁成功!!");
        } else {
            // 尝试加锁失败,进入等待 监听
            waitForLock();
            // 再次尝试加锁
            lock();
        }

    }
    
    //尝试加锁
    public synchronized boolean tryLock() {
        // 第一次就进来创建自己的临时节点
        if (StringUtils.isBlank(currPath)) {
            currPath = zkClient.createEphemeralSequential(ROOT_NODE + "/", "lock");
        }
        // 对节点排序
        List<String> children = zkClient.getChildren(ROOT_NODE);
        Collections.sort(children);

        // 当前的是最小节点就返回加锁成功
        if (currPath.equals(ROOT_NODE + "/" + children.get(0))) {
            return true;
        } else {
            // 不是最小节点 就找到自己的前一个 依次类推 释放也是一样
            int beforePathIndex = Collections.binarySearch(children, currPath.substring(ROOT_NODE.length() + 1)) - 1;
            beforePath = ROOT_NODE + "/" + children.get(beforePathIndex);
            //返回加锁失败
            return false;
        }
    }
    
    //解锁
    public void unlock() {
        //删除节点并关闭客户端
        zkClient.delete(currPath);
        zkClient.close();
    }
    
    //等待上锁,加锁失败进入阻塞,监听上一个节点
    private void waitForLock() {
        IZkDataListener listener = new IZkDataListener() {
            //监听节点更新事件
            public void handleDataChange(String s, Object o) throws Exception {
            }

            //监听节点被删除事件
            public void handleDataDeleted(String s) throws Exception {
                //解除阻塞
                cdl.countDown();
            }
        };
        // 监听上一个节点
        this.zkClient.subscribeDataChanges(beforePath, listener);
        //判断上一个节点是否存在
        if (zkClient.exists(beforePath)) {
            //上一个节点存在
            try {
                System.out.println("加锁失败 等待");
                //加锁失败,阻塞等待
                cdl.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 释放监听
        zkClient.unsubscribeDataChanges(beforePath, listener);
    }

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return false;
    }

    public void lockInterruptibly() throws InterruptedException {
    }

    public Condition newCondition() {
        return null;
    }
}

在Controller层加上锁:

    @PostMapping("/purchase")
    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId, @RequestParam(name = "number") Integer number) throws Exception {
        boolean bool;
        //获取ZooKeeper分布式锁
        ZkLock zkLock = new ZkLock();
        try {
            //上锁
            zkLock.lock();
            //调用秒杀抢购的service方法
            bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
        } catch (Exception e) {
            e.printStackTrace();
            bool = false;
        } finally {
            //解锁
            zkLock.unlock();
        }
        return bool;
    }

测试,依然起两台服务器,8080、8081。然后跑测试脚本:

    public static void main(String[] args) throws Exception {
        //请求地址
        String url = "http://localhost:%s/mall/commodity/purchase";
        //请求参数,商品ID,数量
        Map<String, String> map = new HashMap<>();
        map.put("commodityId", "4f863bb5266b9508e0c1f28c61ea8de1");
        map.put("number", "1");
        //创建10个线程通过HttpClient进行发送请求,测试
        for (int i = 0; i < 10; i++) {
            //8080、8081交替请求
            String port = "808" + (i % 2);
            CommodityThread commodityThread = new CommodityThread(String.format(url, port), map);
            commodityThread.start();
        }
    }

结果正确:

3.3 造好的轮子

Curator是Apache开源的一个操作ZooKeeper的框架。其中就有实现ZooKeeper分布式锁的功能。

当然分布式锁的实现只是这个框架的其中一个很小的部分,除此之外还有很多用途,大家可以到官网去学习。

首先添加Maven依赖:

    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>4.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>4.3.0</version>
    </dependency>

还是一样在需要加锁的地方进行加锁:

    @PostMapping("/purchase")
    public boolean purchaseCommodityInfo(@RequestParam(name = "commodityId") String commodityId,
                                         @RequestParam(name = "number") Integer number) throws Exception {
        boolean bool = false;
        //设置重试策略
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", retryPolicy);
        // 启动客户端
        client.start();
        InterProcessMutex mutex = new InterProcessMutex(client, "/locks");
        try {
            //加锁
            if (mutex.acquire(3, TimeUnit.SECONDS)) {
                //调用抢购秒杀service方法
                bool = commodityInfoService.purchaseCommodityInfo(commodityId, number);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //解锁
            mutex.release();
            client.close();
        }
        return bool;
    }

四、遇到的坑

我尝试用原生的ZooKeeper写分布式锁,有点炸裂。遇到不少坑,最终放弃了,用zkclient的API。可能我太菜了不太会用。

下面我分享我遇到的一些问题,希望你们在遇到同类型的异常时能迅速定位问题。

4.1 Session expired

这个错误是使用原生ZooKeeper的API出现的错误。主要是我在进入debug模式进行调试出现的。

因为原生的ZooKeeper需要设定一个会话超时时间,一般debug模式我们都会卡在一个地方去调试,肯定就超出了设置的会话时间~

4.2 KeeperErrorCode = ConnectionLoss

这个也是原生ZooKeeper的API的错误,怎么出现的呢?

主要是创建的ZooKeeper客户端连接服务器时是异步的,由于连接需要时间,还没连接成功,代码已经开始执行create()或者exists(),然后就报这个错误。

解决方法:使用CountDownLatch计数器阻塞,连接成功后再停止阻塞,然后执行create()或者exists()等操作。

4.3 并发查询更新出现数据不一致

这个错误真的太炸裂了~

一开始我是把分布式锁加在service层,然后以为搞定了。接着启动8080、8081进行并发测试。10个线程都是购买成功,结果居然是不正确!

第一反应觉得自己实现的代码有问题,于是换成curator框架实现的分布式锁,开源框架应该没问题了吧。没想到还是不行~

既然不是锁本身的问题,是不是事务问题。上一个事务更新库存的操作还没提交,然后下一个请求就进来查询。于是我就把加锁的范围放大一点,放在Controller层。居然成功了!

你可能已经注意到,我在上面的例子就是把分布式锁加在Controller层,其实我不太喜欢在Controller层写太多代码。

也许有更加优雅的方式,可惜本人能力不足,如果你有更好的实现方式,可以分享一下~

补充:下面评论有位大佬说,在原来的方法外再包裹一层,亲测是可以的。这应该是事务的问题。

上面放在Controller层可以成功是不是因为Controller层没有事务,原来写在service我是写了一个@Transactional注解在类上,所以整个类里面的都有事务,所以解锁后还没提交事务去更新数据库,然后下一个请求进来就查到了没更新的数据。

为了优雅一点,就把@Transactional注解放在抢购的service方法上

然后再包裹一个没有事务的方法,用于上锁。

五、总结

最后,我们回顾总结一下吧:

  • 首先我们模拟单机多线程的秒杀场景,单机的话可以使用本地锁解决问题。
  • 接着模拟多服务器多线程的场景,思路是使用ZooKeeper实现分布式锁解决。
  • 图解ZooKeeper实现分布式锁的原理。
  • 然后动手写代码,实现分布式锁。
  • 最后总结遇到的坑。

希望这篇文章对你有用,觉得有用就点个赞吧~

查看原文

赞 3 收藏 2 评论 1

java技术爱好者 发布了文章 · 2020-09-01

ZooKeeper入门,看这篇就够了

思维导图

前言

在很多时候,我们都可以在各种框架应用中看到ZooKeeper的身影,比如Kafka中间件,Dubbo框架,Hadoop等等。为什么到处都看到ZooKeeper?

一、什么是ZooKeeper

ZooKeeper是一个分布式服务协调框架,提供了分布式数据一致性的解决方案,基于ZooKeeper的数据结构,Watcher,选举机制等特点,可以实现数据的发布/订阅,软负载均衡,命名服务,统一配置管理,分布式锁,集群管理等等。

二、为什么使用ZooKeeper

ZooKeeper能保证:

  • 更新请求顺序进行。来自同一个client的更新请求按其发送顺序依次执行
  • 数据更新原子性。一次数据更新要么成功,要么失败
  • 全局唯一数据视图。client无论连接到哪个server,数据视图都是一致的
  • 实时性。在一定时间范围内,client读到的数据是最新的

三、数据结构

ZooKeeper的数据结构和Unix文件系统很类似,总体上可以看做是一棵树,每一个节点称之为一个ZNode,每一个ZNode默认能存储1M的数据。每一个ZNode可通过唯一的路径标识。如下图所示:

创建ZNode时,可以指定以下四种类型,包括:

  • PERSISTENT,持久性ZNode。创建后,即使客户端与服务端断开连接也不会删除,只有客户端主动删除才会消失。
  • PERSISTENT_SEQUENTIAL,持久性顺序编号ZNode。和持久性节点一样不会因为断开连接后而删除,并且ZNode的编号会自动增加。
  • EPHEMERAL,临时性ZNode。客户端与服务端断开连接,该ZNode会被删除。
  • EPEMERAL_SEQUENTIAL,临时性顺序编号ZNode。和临时性节点一样,断开连接会被删除,并且ZNode的编号会自动增加。

四、监听通知机制

Watcher是基于观察者模式实现的一种机制。如果我们需要实现当某个ZNode节点发生变化时收到通知,就可以使用Watcher监听器。

客户端通过设置监视点(watcher)向 ZooKeeper 注册需要接收通知的 znode,在 znode 发生变化时 ZooKeeper 就会向客户端发送消息

这种通知机制是一次性的。一旦watcher被触发,ZooKeeper就会从相应的存储中删除。如果需要不断监听ZNode的变化,可以在收到通知后再设置新的watcher注册到ZooKeeper。

监视点的类型有很多,如监控ZNode数据变化、监控ZNode子节点变化、监控ZNode 创建或删除

五、选举机制

ZooKeeper是一个高可用的应用框架,因为ZooKeeper是支持集群的。ZooKeeper在集群状态下,配置文件是不会指定Master和Slave,而是在ZooKeeper服务器初始化时就在内部进行选举,产生一台做为Leader,多台做为Follower,并且遵守半数可用原则。

由于遵守半数可用原则,所以5台服务器和6台服务器,实际上最大允许宕机数量都是3台,所以为了节约成本,集群的服务器数量一般设置为奇数

如果在运行时,如果长时间无法和Leader保持连接的话,则会再次进行选举,产生新的Leader,以保证服务的可用

六、初の体验

首先在官网下载ZooKeeper,我这里用的是3.3.6版本。

然后解压,复制一下/conf目录下的zoo_sample.cfg文件,重命名为zoo.cfg。

修改zoo.cfg中dataDir的值,并创建对应的目录:

最后到/bin目录下启动,我用的是window系统,所以启动zkServer.cmd,双击即可:

启动成功的话就可以看到这个对话框:

可视化界面的话,我推荐使用ZooInspector,操作比较简便:

6.1 使用java连接ZooKeeper

首先引入Maven依赖:

<dependency>
    <groupId>org.apache.zookeeper</groupId>
    <artifactId>zookeeper</artifactId>
    <version>3.4.6</version>
</dependency>

接着我们写一个Main方法,进行操作:

    //连接地址及端口号
    private static final String SERVER_HOST = "127.0.0.1:2181";

    //会话超时时间
    private static final int SESSION_TIME_OUT = 2000;

    public static void main(String[] args) throws Exception {
        //参数一:服务端地址及端口号
        //参数二:超时时间
        //参数三:监听器
        ZooKeeper zooKeeper = new ZooKeeper(SERVER_HOST, SESSION_TIME_OUT, new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                //获取事件的状态
                Event.KeeperState state = watchedEvent.getState();
                //判断是否是连接事件
                if (Event.KeeperState.SyncConnected == state) {
                    Event.EventType type = watchedEvent.getType();
                    if (Event.EventType.None == type) {
                        System.out.println("zk客户端已连接...");
                    }
                }
            }
        });
        zooKeeper.create("/java", "Hello World".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("新增ZNode成功");
        zooKeeper.close();
    }

创建一个持久性ZNode,路径是/java,值为"Hello World":

七、API概述

7.1 创建

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)

参数解释:

  • path ZNode路径
  • data ZNode存储的数据
  • acl ACL权限控制
  • createMode ZNode类型

ACL权限控制,有三个是ZooKeeper定义的常用权限,在ZooDefs.Ids类中:

/**
 * This is a completely open ACL.
 * 完全开放的ACL,任何连接的客户端都可以操作该属性znode
 */
public final ArrayList<ACL> OPEN_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, ANYONE_ID_UNSAFE)));

/**
 * This ACL gives the creators authentication id's all permissions.
 * 只有创建者才有ACL权限
 */
public final ArrayList<ACL> CREATOR_ALL_ACL = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.ALL, AUTH_IDS)));

/**
 * This ACL gives the world the ability to read.
 * 只能读取ACL
 */
public final ArrayList<ACL> READ_ACL_UNSAFE = new ArrayList<ACL>(Collections.singletonList(new ACL(Perms.READ, ANYONE_ID_UNSAFE)));

createMode就是前面讲过的四种ZNode类型:

public enum CreateMode {
    /**
     * 持久性ZNode
     */
    PERSISTENT (0, false, false),
    /**
     * 持久性自动增加顺序号ZNode
     */
    PERSISTENT_SEQUENTIAL (2, false, true),
    /**
     * 临时性ZNode
     */
    EPHEMERAL (1, true, false),
    /**
     * 临时性自动增加顺序号ZNode
     */
    EPHEMERAL_SEQUENTIAL (3, true, true);
}

7.2 查询

//同步获取节点数据
public byte[] getData(String path, boolean watch, Stat stat){
    ...
}

//异步获取节点数据
public void getData(final String path, Watcher watcher, DataCallback cb, Object ctx){
    ...
}

同步getData()方法中的stat参数是用于接收返回的节点描述信息:

public byte[] getData(final String path, Watcher watcher, Stat stat){
    //省略...
    GetDataResponse response = new GetDataResponse();
    //发送请求到ZooKeeper服务器,获取到response
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (stat != null) {
        //把response的Stat赋值到传入的stat中
        DataTree.copyStat(response.getStat(), stat);
    }
}

使用同步getData()获取数据:

    //数据的描述信息,包括版本号,ACL权限,子节点信息等等
    Stat stat = new Stat();
    //返回结果是byte[]数据,getData()方法底层会把描述信息复制到stat对象中
    byte[] bytes = zooKeeper.getData("/java", false, stat);
    //打印结果
    System.out.println("ZNode的数据data:" + new String(bytes));//Hello World
    System.out.println("获取到dataVersion版本号:" + stat.getVersion());//默认数据版本号是0

7.3 更新

public Stat setData(final String path, byte data[], int version){
    ...
}

值得注意的是第三个参数version,使用CAS机制,这是为了防止多个客户端同时更新节点数据,所以需要在更新时传入版本号,每次更新都会使版本号+1,如果服务端接收到版本号,对比发现不一致的话,则会抛出异常。

所以,在更新前需要先查询获取到版本号,否则你不知道当前版本号是多少,就没法更新:

    //获取节点描述信息
    Stat stat = new Stat();
    zooKeeper.getData("/java", false, stat);
    System.out.println("更新ZNode数据...");
    //更新操作,传入路径,更新值,版本号三个参数,返回结果是新的描述信息
    Stat setData = zooKeeper.setData("/java", "fly!!!".getBytes(), stat.getVersion());
    System.out.println("更新后的版本号为:" + setData.getVersion());//更新后的版本号为:1

更新后,版本号增加了:

如果传入的版本参数是"-1",就是告诉zookeeper服务器,客户端需要基于数据的最新版本进行更新操作。但是-1并不是一个合法的版本号,而是一个标识符。

7.4 删除

public void delete(final String path, int version){
    ...
}
  • path 删除节点的路径
  • version 版本号

这里也需要传入版本号,调用getData()方法即可获取到版本号,很简单:

Stat stat = new Stat();
zooKeeper.getData("/java", false, stat);
//删除ZNode
zooKeeper.delete("/java", stat.getVersion());

7.5 watcher机制

在上面第三点提到,ZooKeeper是可以使用通知监听机制,当ZNode发生变化会收到通知消息,进行处理。基于watcher机制,ZooKeeper能玩出很多花样。怎么使用?

ZooKeeper的通知监听机制,总的来说可以分为三个过程:

①客户端注册 Watcher
②服务器处理 Watcher
③客户端回调 Watcher客户端。

注册 watcher 有 4 种方法,new ZooKeeper()、getData()、exists()、getChildren()。下面演示一下使用exists()方法注册watcher:

首先需要实现Watcher接口,新建一个监听器:

public class MyWatcher implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        //获取事件类型
        Event.EventType eventType = event.getType();
        //通知状态
        Event.KeeperState eventState = event.getState();
        //节点路径
        String eventPath = event.getPath();
        System.out.println("监听到的事件类型:" + eventType.name());
        System.out.println("监听到的通知状态:" + eventState.name());
        System.out.println("监听到的ZNode路径:" + eventPath);
    }
}

然后调用exists()方法,注册监听器:

zooKeeper.exists("/java", new MyWatcher());
//对ZNode进行更新数据的操作,触发监听器
zooKeeper.setData("/java", "fly".getBytes(), -1);

然后在控制台就可以看到打印的信息:

这里我们可以看到Event.EventType对象就是事件类型,我们可以对事件类型进行判断,再配合Event.KeeperState通知状态,做相关的业务处理,事件类型有哪些?

打开EventType、KeeperState的源码查看:

//事件类型是一个枚举
public enum EventType {
    None (-1),//无
    NodeCreated (1),//Watcher监听的数据节点被创建
    NodeDeleted (2),//Watcher监听的数据节点被删除
    NodeDataChanged (3),//Watcher监听的数据节点内容发生变更
    NodeChildrenChanged (4);//Watcher监听的数据节点的子节点列表发生变更
}

//通知状态也是一个枚举
public enum KeeperState {
    Unknown (-1),//属性过期
    Disconnected (0),//客户端与服务端断开连接
    NoSyncConnected (1),//属性过期
    SyncConnected (3),//客户端与服务端正常连接
    AuthFailed (4),//身份认证失败
    ConnectedReadOnly (5),//返回这个状态给客户端,客户端只能处理读请求
    SaslAuthenticated(6),//服务器采用SASL做校验时
    Expired (-112);//会话session失效
}

7.5.1 watcher的特性

  • 一次性。一旦watcher被触发,ZK都会从相应的存储中移除。
    zooKeeper.exists("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("我是exists()方法的监听器");
        }
    });
    //对ZNode进行更新数据的操作,触发监听器
    zooKeeper.setData("/java", "fly".getBytes(), -1);
    //企图第二次触发监听器
    zooKeeper.setData("/java", "spring".getBytes(), -1);

  • 串行执行。客户端Watcher回调的过程是一个串行同步的过程,这是为了保证顺序。
    zooKeeper.exists("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("我是exists()方法的监听器");
        }
    });
    Stat stat = new Stat();
    zooKeeper.getData("/java", new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("我是getData()方法的监听器");
        }
    }, stat);
    //对ZNode进行更新数据的操作,触发监听器
    zooKeeper.setData("/java", "fly".getBytes(), stat.getVersion());

打印结果,说明先调用exists()方法的监听器,再调用getData()方法的监听器。因为exists()方法先注册了。

  • 轻量级。WatchedEvent是ZK整个Watcher通知机制的最小通知单元。WatchedEvent包含三部分:通知状态,事件类型,节点路径。Watcher通知仅仅告诉客户端发生了什么事情,而不会说明事件的具体内容。
查看原文

赞 0 收藏 0 评论 0

java技术爱好者 发布了文章 · 2020-09-01

超详细Netty入门,看这篇就够了!

思维导图

本文已收录到Github精选,欢迎Starhttps://github.com/yehongzhi/...

前言

本文主要讲述Netty框架的一些特性以及重要组件,希望看完之后能对Netty框架有一个比较直观的感受,希望能帮助读者快速入门Netty,减少一些弯路。

一、Netty概述

官方的介绍:

Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.

Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端

二、为什么使用Netty

从官网上介绍,Netty是一个网络应用程序框架,开发服务器和客户端。也就是用于网络编程的一个框架。既然是网络编程,Socket就不谈了,为什么不用NIO呢?

2.1 NIO的缺点

对于这个问题,之前我写了一篇文章《NIO入门》对NIO有比较详细的介绍,NIO的主要问题是:

  • NIO的类库和API繁杂,学习成本高,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等。
  • 需要熟悉Java多线程编程。这是因为NIO编程涉及到Reactor模式,你必须对多线程和网络编程非常熟悉,才能写出高质量的NIO程序。
  • 臭名昭著的epoll bug。它会导致Selector空轮询,最终导致CPU 100%。直到JDK1.7版本依然没得到根本性的解决。

2.2 Netty的优点

相对地,Netty的优点有很多:

  • API使用简单,学习成本低。
  • 功能强大,内置了多种解码编码器,支持多种协议。
  • 性能高,对比其他主流的NIO框架,Netty的性能最优。
  • 社区活跃,发现BUG会及时修复,迭代版本周期短,不断加入新的功能。
  • Dubbo、Elasticsearch都采用了Netty,质量得到验证。

三、架构图

上面这张图就是在官网首页的架构图,我们从上到下分析一下。

绿色的部分Core核心模块,包括零拷贝、API库、可扩展的事件模型。

橙色部分Protocol Support协议支持,包括Http协议、webSocket、SSL(安全套接字协议)、谷歌Protobuf协议、zlib/gzip压缩与解压缩、Large File Transfer大文件传输等等。

红色的部分Transport Services传输服务,包括Socket、Datagram、Http Tunnel等等。

以上可看出Netty的功能、协议、传输方式都比较全,比较强大。

四、永远的Hello Word

首先搭建一个HelloWord工程,先熟悉一下API,还有为后面的学习做铺垫。以下面这张图为依据:

4.1 引入Maven依赖

使用的版本是4.1.20,相对比较稳定的一个版本。

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.20.Final</version>
</dependency>

4.2 创建服务端启动类

public class MyServer {
    public static void main(String[] args) throws Exception {
        //创建两个线程组 boosGroup、workerGroup
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            //创建服务端的启动对象,设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            //设置两个线程组boosGroup和workerGroup
            bootstrap.group(bossGroup, workerGroup)
                //设置服务端通道实现类型    
                .channel(NioServerSocketChannel.class)
                //设置线程队列得到连接个数    
                .option(ChannelOption.SO_BACKLOG, 128)
                //设置保持活动连接状态    
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                //使用匿名内部类的形式初始化通道对象    
                .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            //给pipeline管道设置处理器
                            socketChannel.pipeline().addLast(new MyServerHandler());
                        }
                    });//给workerGroup的EventLoop对应的管道设置处理器
            System.out.println("java技术爱好者的服务端已经准备就绪...");
            //绑定端口号,启动服务端
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            //对关闭通道进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

4.3 创建服务端处理器

/**
 * 自定义的Handler需要继承Netty规定好的HandlerAdapter
 * 才能被Netty框架所关联,有点类似SpringMVC的适配器模式
 **/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取客户端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到客户端" + ctx.channel().remoteAddress() + "发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //发送消息给客户端
        ctx.writeAndFlush(Unpooled.copiedBuffer("服务端已收到消息,并给你发送一个问号?", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //发生异常,关闭通道
        ctx.close();
    }
}

4.4 创建客户端启动类

public class MyClient {

    public static void main(String[] args) throws Exception {
        NioEventLoopGroup eventExecutors = new NioEventLoopGroup();
        try {
            //创建bootstrap对象,配置参数
            Bootstrap bootstrap = new Bootstrap();
            //设置线程组
            bootstrap.group(eventExecutors)
                //设置客户端的通道实现类型    
                .channel(NioSocketChannel.class)
                //使用匿名内部类初始化通道
                .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //添加客户端通道的处理器
                            ch.pipeline().addLast(new MyClientHandler());
                        }
                    });
            System.out.println("客户端准备就绪,随时可以起飞~");
            //连接服务端
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
            //对通道关闭进行监听
            channelFuture.channel().closeFuture().sync();
        } finally {
            //关闭线程组
            eventExecutors.shutdownGracefully();
        }
    }
}

4.5 创建客户端处理器

public class MyClientHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //发送消息到服务端
        ctx.writeAndFlush(Unpooled.copiedBuffer("歪比巴卜~茉莉~Are you good~马来西亚~", CharsetUtil.UTF_8));
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //接收服务端发送过来的消息
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("收到服务端" + ctx.channel().remoteAddress() + "的消息:" + byteBuf.toString(CharsetUtil.UTF_8));
    }
}

4.6 测试

先启动服务端,再启动客户端,就可以看到结果:

MyServer打印结果:

MyClient打印结果:

五、Netty的特性与重要组件

5.1 taskQueue任务队列

如果Handler处理器有一些长时间的业务处理,可以交给taskQueue异步处理。怎么用呢,请看代码演示:

public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //获取到线程池eventLoop,添加线程,执行
        ctx.channel().eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //长时间操作,不至于长时间的业务操作导致Handler阻塞
                    Thread.sleep(1000);
                    System.out.println("长时间的业务处理");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

我们打一个debug调试,是可以看到添加进去的taskQueue有一个任务。

5.2 scheduleTaskQueue延时任务队列

延时任务队列和上面介绍的任务队列非常相似,只是多了一个可延迟一定时间再执行的设置,请看代码演示:

ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
        try {
            //长时间操作,不至于长时间的业务操作导致Handler阻塞
            Thread.sleep(1000);
            System.out.println("长时间的业务处理");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
},5, TimeUnit.SECONDS);//5秒后执行

依然打开debug进行调试查看,我们可以有一个scheduleTaskQueue任务待执行中

5.3 Future异步机制

在搭建HelloWord工程的时候,我们看到有一行这样的代码:

ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666);

很多操作都返回这个ChannelFuture对象,究竟这个ChannelFuture对象是用来做什么的呢?

ChannelFuture提供操作完成时一种异步通知的方式。一般在Socket编程中,等待响应结果都是同步阻塞的,而Netty则不会造成阻塞,因为ChannelFuture是采取类似观察者模式的形式进行获取结果。请看一段代码演示:

//添加监听器
channelFuture.addListener(new ChannelFutureListener() {
    //使用匿名内部类,ChannelFutureListener接口
    //重写operationComplete方法
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        //判断是否操作成功    
        if (future.isSuccess()) {
            System.out.println("连接成功");
        } else {
            System.out.println("连接失败");
        }
    }
});

5.4 Bootstrap与ServerBootStrap

Bootstrap和ServerBootStrap是Netty提供的一个创建客户端和服务端启动器的工厂类,使用这个工厂类非常便利地创建启动类,根据上面的一些例子,其实也看得出来能大大地减少了开发的难度。首先看一个类图:

可以看出都是继承于AbstractBootStrap抽象类,所以大致上的配置方法都相同。

一般来说,使用Bootstrap创建启动器的步骤可分为以下几步:

5.4.1 group()

在上一篇文章《Reactor模式》中,我们就讲过服务端要使用两个线程组:

  • bossGroup 用于监听客户端连接,专门负责与客户端创建连接,并把连接注册到workerGroup的Selector中。
  • workerGroup用于处理每一个连接发生的读写事件。

一般创建线程组直接使用以下new就完事了:

EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

有点好奇的是,既然是线程组,那线程数默认是多少呢?深入源码:

    //使用一个常量保存
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    static {
        //NettyRuntime.availableProcessors() * 2,cpu核数的两倍赋值给常量
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        //如果不传入,则使用常量的值,也就是cpu核数的两倍
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

通过源码可以看到,默认的线程数是cpu核数的两倍。假设想自定义线程数,可以使用有参构造器:

//设置bossGroup线程数为1
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
//设置workerGroup线程数为16
EventLoopGroup workerGroup = new NioEventLoopGroup(16);

5.4.2 channel()

这个方法用于设置通道类型,当建立连接后,会根据这个设置创建对应的Channel实例。

使用debug模式可以看到

通道类型有以下:

NioSocketChannel: 异步非阻塞的客户端 TCP Socket 连接。

NioServerSocketChannel: 异步非阻塞的服务器端 TCP Socket 连接。

常用的就是这两个通道类型,因为是异步非阻塞的。所以是首选。

OioSocketChannel: 同步阻塞的客户端 TCP Socket 连接。

OioServerSocketChannel: 同步阻塞的服务器端 TCP Socket 连接。

稍微在本地调试过,用起来和Nio有一些不同,是阻塞的,所以API调用也不一样。因为是阻塞的IO,几乎没什么人会选择使用Oio,所以也很难找到例子。我稍微琢磨了一下,经过几次报错之后,总算调通了。代码如下:
//server端代码,跟上面几乎一样,只需改三个地方
//这个地方使用的是OioEventLoopGroup
EventLoopGroup bossGroup = new OioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup)//只需要设置一个线程组boosGroup
        .channel(OioServerSocketChannel.class)//设置服务端通道实现类型

//client端代码,只需改两个地方
//使用的是OioEventLoopGroup
EventLoopGroup eventExecutors = new OioEventLoopGroup();
//通道类型设置为OioSocketChannel
bootstrap.group(eventExecutors)//设置线程组
        .channel(OioSocketChannel.class)//设置客户端的通道实现类型

NioSctpChannel: 异步的客户端 Sctp(Stream Control Transmission Protocol,流控制传输协议)连接。
NioSctpServerChannel: 异步的 Sctp 服务器端连接。

本地没启动成功,网上看了一些网友的评论,说是只能在linux环境下才可以启动。从报错信息看:SCTP not supported on this platform,不支持这个平台。因为我电脑是window系统,所以网友说的有点道理。

5.4.3 option()与childOption()

首先说一下这两个的区别。

option()设置的是服务端用于接收进来的连接,也就是boosGroup线程。

childOption()是提供给父管道接收到的连接,也就是workerGroup线程。

搞清楚了之后,我们看一下常用的一些设置有哪些:

SocketChannel参数,也就是childOption()常用的参数:

SO_RCVBUF Socket参数,TCP数据接收缓冲区大小。
TCP_NODELAY TCP参数,立即发送数据,默认值为Ture。
SO_KEEPALIVE Socket参数,连接保活,默认值为False。启用该功能时,TCP会主动探测空闲连接的有效性。

ServerSocketChannel参数,也就是option()常用参数:

SO_BACKLOG Socket参数,服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝。默认值,Windows为200,其他为128。

由于篇幅限制,其他就不列举了,大家可以去网上找资料看看,了解一下。

5.4.4 设置流水线(重点)

ChannelPipeline是Netty处理请求的责任链,ChannelHandler则是具体处理请求的处理器。实际上每一个channel都有一个处理器的流水线。

在Bootstrap中childHandler()方法需要初始化通道,实例化一个ChannelInitializer,这时候需要重写initChannel()初始化通道的方法,装配流水线就是在这个地方进行。代码演示如下:

//使用匿名内部类的形式初始化通道对象
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        //给pipeline管道设置自定义的处理器
        socketChannel.pipeline().addLast(new MyServerHandler());
    }
});

处理器Handler主要分为两种:

ChannelInboundHandlerAdapter(入站处理器)、ChannelOutboundHandler(出站处理器)

入站指的是数据从底层java NIO Channel到Netty的Channel。

出站指的是通过Netty的Channel来操作底层的java NIO Channel。

ChannelInboundHandlerAdapter处理器常用的事件有

  1. 注册事件 fireChannelRegistered。
  2. 连接建立事件 fireChannelActive。
  3. 读事件和读完成事件 fireChannelRead、fireChannelReadComplete。
  4. 异常通知事件 fireExceptionCaught。
  5. 用户自定义事件 fireUserEventTriggered。
  6. Channel 可写状态变化事件 fireChannelWritabilityChanged。
  7. 连接关闭事件 fireChannelInactive。

ChannelOutboundHandler处理器常用的事件有

  1. 端口绑定 bind。
  2. 连接服务端 connect。
  3. 写事件 write。
  4. 刷新时间 flush。
  5. 读事件 read。
  6. 主动断开连接 disconnect。
  7. 关闭 channel 事件 close。
还有一个类似的handler(),主要用于装配parent通道,也就是bossGroup线程。一般情况下,都用不上这个方法。

5.4.5 bind()

提供用于服务端或者客户端绑定服务器地址和端口号,默认是异步启动。如果加上sync()方法则是同步。

有五个同名的重载方法,作用都是用于绑定地址端口号。不一一介绍了。

5.4.6 优雅地关闭EventLoopGroup

//释放掉所有的资源,包括创建的线程
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();

会关闭所有的child Channel。关闭之后,释放掉底层的资源。

5.5 Channel

Channel是什么?不妨看一下官方文档的说明:

A nexus to a network socket or a component which is capable of I/O operations such as read, write, connect, and bind

翻译大意:一种连接到网络套接字或能进行读、写、连接和绑定等I/O操作的组件。

如果上面这段说明比较抽象,下面还有一段说明:

A channel provides a user:

the current state of the channel (e.g. is it open? is it connected?),
the configuration parameters of the channel (e.g. receive buffer size),
the I/O operations that the channel supports (e.g. read, write, connect, and bind), and
the ChannelPipeline which handles all I/O events and requests associated with the channel.

翻译大意:

channel为用户提供:

  1. 通道当前的状态(例如它是打开?还是已连接?)
  2. channel的配置参数(例如接收缓冲区的大小)
  3. channel支持的IO操作(例如读、写、连接和绑定),以及处理与channel相关联的所有IO事件和请求的ChannelPipeline。

5.5.1 获取channel的状态

boolean isOpen(); //如果通道打开,则返回true
boolean isRegistered();//如果通道注册到EventLoop,则返回true
boolean isActive();//如果通道处于活动状态并且已连接,则返回true
boolean isWritable();//当且仅当I/O线程将立即执行请求的写入操作时,返回true。

以上就是获取channel的四种状态的方法。

5.5.2 获取channel的配置参数

获取单条配置信息,使用getOption(),代码演示:

ChannelConfig config = channel.config();//获取配置参数
//获取ChannelOption.SO_BACKLOG参数,
Integer soBackLogConfig = config.getOption(ChannelOption.SO_BACKLOG);
//因为我启动器配置的是128,所以我这里获取的soBackLogConfig=128

获取多条配置信息,使用getOptions(),代码演示:

ChannelConfig config = channel.config();
Map<ChannelOption<?>, Object> options = config.getOptions();
for (Map.Entry<ChannelOption<?>, Object> entry : options.entrySet()) {
    System.out.println(entry.getKey() + " : " + entry.getValue());
}
/**
SO_REUSEADDR : false
WRITE_BUFFER_LOW_WATER_MARK : 32768
WRITE_BUFFER_WATER_MARK : WriteBufferWaterMark(low: 32768, high: 65536)
SO_BACKLOG : 128
以下省略...
*/

5.5.3 channel支持的IO操作

写操作,这里演示从服务端写消息发送到客户端:

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.channel().writeAndFlush(Unpooled.copiedBuffer("这波啊,这波是肉蛋葱鸡~", CharsetUtil.UTF_8));
}

客户端控制台:

//收到服务端/127.0.0.1:6666的消息:这波啊,这波是肉蛋葱鸡~

连接操作,代码演示:

ChannelFuture connect = channelFuture.channel().connect(new InetSocketAddress("127.0.0.1", 6666));//一般使用启动器,这种方式不常用

通过channel获取ChannelPipeline,并做相关的处理:

//获取ChannelPipeline对象
ChannelPipeline pipeline = ctx.channel().pipeline();
//往pipeline中添加ChannelHandler处理器,装配流水线
pipeline.addLast(new MyServerHandler());

5.6 Selector

在NioEventLoop中,有一个成员变量selector,这是nio包的Selector,在之前《NIO入门》中,我已经讲过Selector了。

Netty中的Selector也和NIO的Selector是一样的,就是用于监听事件,管理注册到Selector中的channel,实现多路复用器。

5.7 PiPeline与ChannelPipeline

在前面介绍Channel时,我们知道可以在channel中装配ChannelHandler流水线处理器,那一个channel不可能只有一个channelHandler处理器,肯定是有很多的,既然是很多channelHandler在一个流水线工作,肯定是有顺序的。

于是pipeline就出现了,pipeline相当于处理器的容器。初始化channel时,把channelHandler按顺序装在pipeline中,就可以实现按序执行channelHandler了。

在一个Channel中,只有一个ChannelPipeline。该pipeline在Channel被创建的时候创建。ChannelPipeline包含了一个ChannelHander形成的列表,且所有ChannelHandler都会注册到ChannelPipeline中。

5.8 ChannelHandlerContext

在Netty中,Handler处理器是有我们定义的,上面讲过通过集成入站处理器或者出站处理器实现。这时如果我们想在Handler中获取pipeline对象,或者channel对象,怎么获取呢。

于是Netty设计了这个ChannelHandlerContext上下文对象,就可以拿到channel、pipeline等对象,就可以进行读写等操作。

通过类图,ChannelHandlerContext是一个接口,下面有三个实现类。

实际上ChannelHandlerContext在pipeline中是一个链表的形式。看一段源码就明白了:

//ChannelPipeline实现类DefaultChannelPipeline的构造器方法
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    //设置头结点head,尾结点tail
    tail = new TailContext(this);
    head = new HeadContext(this);
    
    head.next = tail;
    tail.prev = head;
}

下面我用一张图来表示,会更加清晰一点:

5.9 EventLoopGroup

我们先看一下EventLoopGroup的类图:

其中包括了常用的实现类NioEventLoopGroup。OioEventLoopGroup在前面的例子中也有使用过。

从Netty的架构图中,可以知道服务器是需要两个线程组进行配合工作的,而这个线程组的接口就是EventLoopGroup。

每个EventLoopGroup里包括一个或多个EventLoop,每个EventLoop中维护一个Selector实例。

5.9.1 轮询机制的实现原理

我们不妨看一段DefaultEventExecutorChooserFactory的源码:

private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;

@Override
public EventExecutor next() {
    //idx.getAndIncrement()相当于idx++,然后对任务长度取模
    return executors[idx.getAndIncrement() & executors.length - 1];
}

这段代码可以确定执行的方式是轮询机制,接下来debug调试一下:

它这里还有一个判断,如果线程数不是2的N次方,则采用取模算法实现。

@Override
public EventExecutor next() {
    return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

写在最后

参考Netty官网文档:API文档

创作不易,觉得有用就点个赞吧。

本文已收录到Github精选,欢迎Starhttps://github.com/yehongzhi/...
查看原文

赞 0 收藏 0 评论 0

java技术爱好者 关注了专栏 · 2020-08-16

SegmentFault 思否观察

SegmentFault 思否对开发者行业的洞见、观察与报道

关注 26737

java技术爱好者 关注了用户 · 2020-08-16

飞狐 @feihu

专注AI量化,Julia、Flux.jl、终端TensorFlow.js

关注 1044

java技术爱好者 关注了专栏 · 2020-08-16

力扣加加

努力做西湖区最好的算法题解

关注 3244

java技术爱好者 关注了用户 · 2020-08-16

why技术 @whyjishu

欢迎关注公众号【why技术】。在这里我会分享一些技术相关的东西,主攻java方向,用匠心敲代码,对每一行代码负责。偶尔也会荒腔走板的聊一聊生活,写一写书评,影评。愿你我共同进步。

关注 4754

java技术爱好者 关注了用户 · 2020-08-16

江南一点雨 @lenve

《Spring Boot+Vue全栈开发实战》作者
公众号:江南一点雨
微信:a_java_boy
专注于Spring Boot、Spring Cloud、前端Vue等技术

关注 2790

java技术爱好者 关注了标签 · 2020-08-16

程序员

一种近几十年来出现的新物种,是工业革命的产物。英文(Programmer Monkey)是一种非常特殊的、可以从事程序开发、维护的动物。一般分为程序设计猿和程序编码猿,但两者的界限并不非常清楚,都可以进行开发、维护工作,特别是在中国,而且最重要的一点,二者都是一种非常悲剧的存在。

国外的程序员节

国外的程序员节,(英语:Programmer Day,俄语:День программи́ста)是一个俄罗斯官方节日,日期是每年的第 256(0x100) 天,也就是平年的 9 月 13 日和闰年的 9 月 12 日,选择 256 是因为它是 2 的 8 次方,比 365 少的 2 的最大幂。

1024程序员节,中国程序员节

1024是2的十次方,二进制计数的基本计量单位之一。程序员(英文Programmer)是从事程序开发、维护的专业人员。程序员就像是一个个1024,以最低调、踏实、核心的功能模块搭建起这个科技世界。1GB=1024M,而1GB与1级谐音,也有一级棒的意思。

从2012年,SegmentFault 创办开始我们就从网络上引导社区的开发者,发展成中国程序员的节日 :) 计划以后每年10月24日定义为程序员节。以一个节日的形式,向通过Coding 改变世界,也以实际行动在浮躁的世界里,固执地坚持自己对于知识、技术和创新追求的程序员们表示致敬。并于之后的最为临近的周末为程序员们举行了一个盛大的狂欢派对。

2015的10月24日,我们SegmentFault 也在5个城市同时举办黑客马拉松这个特殊的形式,聚集开发者开一个编程大爬梯。

特别推荐:

【SF 黑客马拉松】:http://segmentfault.com/hacka...
【1024程序员闯关秀】小游戏,欢迎来挑战 http://segmentfault.com/game/

  • SF 开发者交流群:206236214
  • 黑客马拉松交流群:280915731
  • 开源硬件交流群:372308136
  • Android 开发者交流群:207895295
  • iOS 开发者交流群:372279630
  • 前端开发者群:174851511

欢迎开发者加入~

交流群信息


程序员相关问题集锦:

  1. 《程序员如何选择自己的第二语言》
  2. 《如何成为一名专业的程序员?》
  3. 《如何用各种编程语言书写hello world》
  4. 《程序员们最常说的谎话是什么?》
  5. 《怎么加入一个开源项目?》
  6. 《是要精于单挑,还是要善于合作?》
  7. 《来秀一下你屎一般的代码...》
  8. 《如何区分 IT 青年的“普通/文艺/二逼”属性?》
  9. 程序员必读书籍有哪些?
  10. 你经常访问的技术社区或者技术博客(IT类)有哪些?
  11. 如何一行代码弄崩你的程序?我先来一发
  12. 编程基础指的是什么?
  13. 后端零起步:学哪一种比较好?
  14. 大家都用什么键盘写代码的?

爱因斯坦

程序猿崛起

关注 145711

认证与成就

  • 获得 3 次点赞
  • 获得 0 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 0 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

注册于 2020-08-16
个人主页被 275 人浏览