Natasha

Natasha 查看完整档案

珠海编辑北京师范大学珠海分校  |  软件工程 编辑珠海格力电器股份有限公司  |  大数据开发工程师 编辑 segmentfault.com/u/natasha 编辑
编辑
_ | |__ _ _ __ _ | '_ \| | | |/ _` | | |_) | |_| | (_| | |_.__/ \__,_|\__, | |___/ 个人简介什么都没有

个人动态

Natasha 关注了用户 · 2020-11-06

小明的数据脚印 @xiaomingdeshujujiaoyin

关注 11

Natasha 赞了文章 · 2020-11-06

HDFS主要流程

HDFS的几个典型的流程:客户端读HDFS文件流程、客户端写HDFS文件流程、客户端追加写HDFS文件流程、数据节点与名字节点交互流程以及HDFS HA切换流程等。

一、客户端读HDFS文件流程
  1. 打开HDFS文件:HDFS客户端首先调用DistributedFileSystem.open()方法打开HDFS文件,这个方法在底层会调用ClientProtocol.open()方法,该方法会返回一个HdfsDataInputStream对象用于读取数据块。HdfsDataInputStream其实是一个DFSInputStream的装饰类,真正进行数据块读取操作的是DFSInputStream对象。
  2. 从Namenode获取Datanode地址:在DFSInputStream的构造方法中,会调用ClientProtocol.getBlockLocations()方法向名字节点获取该HDFS文件起始位置数据块的位置信息。Namenode返回的数据块的存储位置是按照与客户端的距离远近排序的,所以DFSInputStream可以选择一个最优的Datanode节点,然后与这个节点建立数据连接读取数据块。
  3. 连接到Datanode读取数据块:HDFS客户端通过调用DFSInputStream.read()方法从这个最优的Datanode读取数据块,数据会以数据包(packet)为单位从数据节点通过流式接口传送到客户端。当达到一个数据块的末尾时,DFSInputStream就会再次调用ClientProtocol.getBlockLocations()获取文件下一个数据块的位置信息,并建立和这个新的数据块的最优节点之间的连接,然后HDFS客户端就可以继续读取数据块了。
  4. 关闭输入流:当客户端成功完成文件读取后,会通过HdfsDataInputStream.close()方法关闭输入流。

流程图如下:

QQ图片20200916122252.png

客户端读取数据块时,很有可能存储这个数据块的数据节点出现异常,也就是无法读取数据。出现这种情况时,DFSInputStream会切换到另一个保存了这个数据块副本的数据节点然后读取数据。同时需要注意的是,数据块的应答包中不仅包含了数据,还包含了校验值。HDFS客户端接收到数据应答包时,会对数据进行校验,如果出现校验错误,也就是数据节点上的这个数据块副本出现了损坏,HDFS客户端就会通过ClientProtocol.reportBadBlocks()向Namenode汇报这个损坏的数据块副本,同时DFSInputStream会尝试从其他的数据节点读取这个数据块。

二、客户端写HDFS文件流程
  1. 创建文件: HDFS客户端写-一个 新的文件时,会首先调用DistributedFileSystem.create()方法在HDFS文件系统中创建一个新的空文件。这个方法在底层会通过调用ClientProtocol.create()方法通知Namenode执行对应的操作, Namenode会首先在文件系统目录树中的指定路径下添加一个新的文件,然后将创建新文件的操作记录到editlog中。完成ClientProtocol.create()调用后, DistributedFileSystem.create()方法就会返回一个HdfsDataOutputStream对象,这个对象在底层包装了一个DFSOutputStream对象,真正执行写数据操作的其实是DFSOutputStream对象。
  2. 建立数据流管道:获取了DFSOutputStream对象后,HDFS客户端就可以调用 DFSOutputStream.write()方法来写数据了。由于DistributedFileSystem.create()方法只是在文件系统目录树中创建了一个空文件,并没有申请任何数据块,所以DFSOutputStream会首先调用ClientProtocol.addBlock()向Namenode申请一个新的空数据块,addBlock()方法会返回一个LocatedBlock对象,这个对象保存了存储这个数据块的所有数据节点的位置信息。获得了数据流管道中所有数据节点的信息后,DFSOutputStream就可以建立数据流管道写数据块了。
  3. 通过数据流管道写入数据: 成功地建立数据流管道后,HDFS客户端就可以向数据流管道写数据了。写入DFSOutputStream中的数据会先被缓存在数据流中,之后这些数据会被切分成一个个数据包(packet)通过数据流管道发送到所有数据节点。通过数据流管道依次写入数据节点的本地存储。每个数据包都有个确认包(ack),确认包会逆序通过数据流管道回到输出流。输出流在确认了所有数据节点已经写入这个数据包之后,就会从对应的缓存队列删除这个数据包。当客户端写满一个数据块之后,会调用addBlock()申请一个新的数据块,然后循环执行上述操作。
  4. 关闭输入流并提交文件: 当HDFS客户端完成了整个文件中所有数据块的写操作之后,就可以调用close()方法关闭输出流,并调用ClientProtocol.complete()方法通知Namenode提交这个文件中的所有数据块,也就完成了整个文件的写入流程。

流程图如下:
image.png

对于Datanode,当 Datanode成功地接受一个新的数据块时,Datanode会通过
DatanodeProtocol.blockReceivedAndDeleted()方法向Namenode汇报,Namenode会更新内存中的数据块与数据节点的对应关系。

如果客户端在写文件时,数据流管道中的数据节点出现故障,则输出流会进行如下操作来进行故障恢复。

  1. 输出流中缓存的没有确认的数据包会重新加入发送队列,这种机制确保了数据节点出现故障时不会丢失任何数据,所有的数据都是经过确认的。输出流会通过调用ClientProtocol.updateBlockForPipeline()方法为数据块申请一个新的时间戳,然后使用这个新的时间戳重新建立数据流管道。这种机制保证了故障Datanode上的数据块的时间戳会过期,然后在故障恢复之后,由于数据块的时间戳与Namenode元数据中的不匹配而被删除,保证了集群中所有数据块的正确性。
  2. 故障数据节点会从输入流管道中删除,然后输出流会通过调用ClientProtocol.getAdditionalDatanode()方法通知Namenode分配新的数据节点到数据流管道中。接下来输出流会将新分配的Datanode添加到数据流管道中,并使用新的时间戳重新建立数据流管道。由于新添加的数据节点上并没有存储这个新的数据块,这时HDFS客户端会通过DataTransferProtocol通知数据流管道中的一个Datanode复制这个数据块到新的Datanode上.
  3. 数据流管道重新建立之后,输出流会调用ClientProtocol.updatePipeline()更新Namenode中的元数据。至此,一个完整的故障恢复流程就完成了,客户端可以正常完成后续的写操作了。
三、Datanode启动、心跳以及执行名字节点指令流程

Datanode启动后与 Namenode 的交互主要包括三个部分:①握手;②注册;③块汇报以 及缓存汇报。

  1. Datanode启动时会首先通过DatanodeProtocol.versionRequest()获取Namenode的版本号以及存储信息等,然后Datanode会对Namenode的当前软件版本号和Datanode的当前软件、版本号进行比较,确保它们是一致的。
  2. 成功地完成握手操作后,Datanode会通过DatanodeProtocol.register()方法向Namenode注册。Namenode接收到注册请求后,会判断当前Datanode的配置是否属于这个集群,它们之间的版本号是否一致。
  3. 注册成功之后,Datanode就需要将本地存储的所有数据块以及缓存的数据块上报到Namenode,Namenode会利用这些信息重新建立内存中数据块与Datanode之间的对应关系。

至此,Datanode 就完成了启动的所有操作,之后就可以正常对外服务了。

Datanode成功启动之后,需要定期向Namenode发送心跳,让Namenode知道当前Datanode 处于活动状态能够对外服务。Namenode 会在Datanode 的心跳响应中携带名字节点指令,指导Datanode进行数据块的复制、删除以及恢复等操作。

当Datanode成功地添加了一个新的数据块或者删除了-一个已有的数据块时,需要通过 DatanodeProtocol.blockReceivedAndDeleted(方法向Namenode汇报。Namenode接收到这个汇 报之后,会更新Namenode内存中数据块与数据节点之间的对应关系。

查看原文

赞 1 收藏 0 评论 0

Natasha 发布了文章 · 2020-11-06

(五)Kafka的消费者原理及使用详解

消费者和消费者群组

1. 一个消费者从一个Topic中消费数据 :

img

2. 消费者群组 :

当生产者向 Topic 写入消息的速度超过了现有消费者的处理速度,此时需要对消费者进行横向伸缩,用多个消费者从同一个主题读取消息,对消息进行分流。同一个分区不能被一个组中的多个 consumer 消费。

two

Kafka消费者代码样例

读取Kafka消息只需要创建一个KafkaConsumer,除此之外还需要使用四个基本属性,bootstrap.servers、key.deserializer、value.deserializer和group.id。

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/3 14:14
 **/

public class ConsumerDemo {
    public static void main(String[] args)  {
        Properties properties = new Properties();
        //bootstrap.servers是broker服务器列表
        properties.put("bootstrap.servers", "120.27.233.226:9092");
        //group.id是指是消费者的消费组
        properties.put("group.id", "test");
        //key.deserializer和value.deserializer是用来做反序列化的,也就是将字节数组转换成对象
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);
        //订阅主题
        //kafkaConsumer.subscribe(Collections.singletonList("test"));
        kafkaConsumer.assign(Arrays.asList(new TopicPartition("test",0)));
        try{
            //无限循环消处理数据
            while (true) {
                //不断调用poll拉取数据,如果停止拉取,那么Kafka会认为此消费者已经死亡并进行重平衡
                //参数值100是一个超时时间,指明线程如果没有数据会等待多长时间,0表示不等待立即返回
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                //每条记录包含key/value以及主题、分区、位移信息
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s\n", record.offset(), record.value());
                }
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            //此方法会提交位移,同时发送一个退出消费组的消息到Kafka的组协调者,组协调者收到消息后会立即进行重平衡而无需等待此消费者会话过期。
            kafkaConsumer.close();
        }
    }
}

提交偏移量

1. 自动提交

最简单的提交方式是让消费者自动提交偏移量,如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll() 方法接收到的最大偏移量提交上去。

可能造成的问题:数据重复读

假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s,所以在这 3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。

properties.put("enable.auto.commit", "true");

2. 手动提交

2.1 同步提交

public static void main(String[] args)  {
    Properties properties = new Properties();
    properties.put("bootstrap.servers", "120.27.233.226:9092");
    properties.put("group.id", "test");
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // 把auto.commit.offset设为false,让应用程序决定何时提交偏移量
    properties.put("auto.commit.offset", false);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Collections.singletonList("test"));
    try{
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
            }
            try{
                // 只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功
                consumer.commitSync();
            }catch(CommitFailedException e) {
                // 如果提交失败,我们也只能把异常记录到错误日志里
                System.err.println("commit  failed!" + e.getMessage());
            }
        }
    }finally {
        consumer.close();
    }
}

2.2 异步提交

手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。

这时可以使用异步提交,只管发送提交请求,无需等待 broker 的响应。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。

假设我们发出一个请求用于提交偏移量2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量3000。如果commitAsync()重新尝试提交偏移量2000,它有可能在偏移量3000之后提交成功。这个时候如果发生再均衡,就会出现重复消息。

try{
    while(true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for(ConsumerRecord<String, String> record : records) {
            System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
        }
        // 提交最后一个偏移量,然后继续做其他事情。
        consumer.commitAsync();
    }
}finally {
    consumer.close();
}

commitAsync()也支持回调,在broker作出响应时会执行回调,回调经常被用于记录提交错误或生成度量指标;如果要用它来进行重试,一定要注意提交的顺序。

try{
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {
                System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
            }
            // 支持回调函数,用来记录提交错误等
            consumer.commitAsync(new OffsetCommitCallback() {
                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if(exception != null) {
                        log.error("kafka send msg err, exception = {}, offsets = {}", exception, offsets);
                    }
                }
            });

        }
    }finally {
        consumer.close();
    }
}
可以在回调中重试失败的提交的思路:

使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。

2.3 混合同步提交与异步提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
        }
        // 如果一切正常,我们使用 commitAsync() 方法来提交
        // 这样速度更快,而且即使这次提交失败,下一次提交很可能会成功
        consumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        // 使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误
        // 确保关闭消费者之前成功提交了偏移量
        consumer.commitSync();
    }finally {
        consumer.close();
    }
}

从特定偏移量开始处理记录

不管是自动提交还是使用commitAsync()或者commitSync()来提交偏移量,提交的都是 poll() 方法返回的那批数据的最大偏移量。KafkaConsumer API 允许在调用 commitSync()commitAsync() 方法允许我们指定特定的位移参数,参数为提交的分区与偏移量的map。因为消费者可能不只读取一个分区,需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.println("value = " + record.value() + ", topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
        if (count % 1000 == 0) {
            // 这里调用的是 commitAsync(),不过调用 commitSync() 也是完全可以的
            // 当然,在提交特定偏移量时,仍然要处理可能发生的错误
            consumer.commitAsync(currentOffsets, null);
        }
        count++;
    }
}

数据库的 Exactly Once 语义的实现思路

当处理 Kafka 中的数据涉及到数据库时:假设把数据存储到数据库后,如果没有来得及提交偏移量程序就因某种原因挂掉了,那么程序再次启动后就会重复处理数据,数据库中会有重复的数据。

如果把存储到数据库和提交偏移量在一个原子操作里完成,就可以避免这样的问题,但数据存到数据库,偏移量保存到kafka是无法实现原子操作的,而如果把数据存储到数据库中,偏移量也存储到数据库中,这样就可以利用数据库的事务来把这两个操作设为一个原子操作,同时结合再均衡监听器就可以实现 Exactly Once 语义,以下为伪代码:

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections<String> topics, new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // 发生分区再均衡之前,提交事务
        commitDBTransaction();
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // 再均衡之后,从数据库获得消费偏移量
        for(TopicPartition topicPartition : partitions) {
            consumer.seek(topicPartition, getOffsetFromDB(topicPartition));
        }
    }
});

/**
 * 消费之前调用一次 poll(),让消费者加入到消费组中,并获取分配的分区
 * 然后马上调用 seek() 方法定位分区的偏移量
 * seek() 设置消费偏移量,设置的偏移量是从数据库读出来的,说明本次设置的偏移量已经被处理过
 * 下一次调用 poll() 就会在本次设置的偏移量上加1,开始处理没有处理过的数据
 * 如果seek()发生错误,比如偏移量不存在,则会抛出异常
 */
consumer.poll(0);
for(TopicPartition topicPartition : consumer.assignment()) {
    consumer.seek(topicPartition, getOffsetFromDB(topicPartition));
}

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1);
            // 处理数据
            processRecord(record);
            // 把数据存储到数据库中
            storeRecordInDB(record);
            // 提交偏移量
            consumer.commitAsync(currentOffsets, null);
        }
    }
} finally {
    consumer.close();
}

把偏移量和记录保存到用一个外部系统来实现 Exactly Once 有很多方法,但核心思想都是:结合 ConsumerRebalanceListener 和 seek() 方法来确保能够及时保存偏移量,并保证消费者总是能够从正确的位置开始读取消息。

再均衡

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡(Rebalance)。再均衡非常重要,为消费者组带来了高可用性和伸缩性,可以放心的增加或移除消费者。以下是触发再均衡的三种行为:

  1. 当一个 消费者 加入组时,读取了原本由其他消费者读取的分区,会触发再均衡。
  2. 当一个 消费者 离开组时(被关闭或发生崩溃),原本由它读取的分区将被组里的其他 消费者 来读取,会触发再均衡。
  3. 当 Topic 发生变化时,比如添加了新的分区,会发生分区重分配,会触发再均衡。

消费者通过向作为组协调器的 broker发送心跳来维持和群组以及分区的关系。心跳的意思是表明消费者在读取分区里的消息。消费者会在轮询消息或提交偏移量时发送心跳。如果消费者超过一定时间没有发送心跳,会话就会过期,组协调器认为该消费者宕机,会触发再均衡。可以看到,从消费者会话过期到宕机是有一定时间的,这段时间内该消费者的分区都不能进行消息消费。

在 Kafka 0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响

再均衡监听器

在分区重平衡前,如果消费者知道它即将不再负责某个分区,那么它需要将它已经处理过的消息位移提交。Kafka的API允许我们在消费者新增分区或者失去分区时进行处理,我们只需要在调用subscribe()方法时传入ConsumerRebalanceListener对象,该对象有两个方法:

public void onPartitionRevoked(Collection partitions):此方法会在消费者停止消费后,在重平衡开始前调用。
public void onPartitionAssigned(Collection partitions):此方法在分区分配给消费者后,在消费者开始读取消息前调用。

相关的实战项目具体代码可以看 (四)Kafka 再均衡监听器 实战小例子 ,可以把代码拷下来在本地运行。

序列化和反序列化

Kafka 生产者将对象序列化成字节数组并发送到服务器,消费者需要将字节数组转换成对象(反序列化)。序列化与反序列化需要匹配,与生产者类似,推荐使用 Avro 序列化方式。(关于生产者的序列化可以参考 (三)Kafka的生产者原理及使用详解

Properties props = new Properties();
props.put("bootstrap.servers", "120.27.233.226:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "test"

KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));

while (true) {
    // 这里使用之前生产者使用的Avro生成的Customer类
    ConsumerRecords<String, Customer> records = consumer.poll(1000);
    for (ConsumerRecord<String, Customer> record: records) {
        System.out.println("Current customer name is: " + record.value().getName());
    }
    consumer.commitSync();
}

Kafka的分区分配过程

  1. 确定群组协调器(GroupCoordinator),每当我们创建一个消费组,kafka 会为我们分配一个 broker 作为该消费组的 coordinator。
  2. 注册消费者 并选出 leader consumer,当我们的有了 coordinator 之后,消费者将会开始往该 coordinator上进行注册,第一个注册的消费者将成为该消费组的 leader,其他的为 follower.
  3. 当 leader 选出来后,他会从coordinator那里实时获取分区 和 consumer信息,并根据分区策略给每个consumer分配分区,并将分配结果告诉 coordinator。
  4. follower 消费者将从 coordinator 那里获取到自己相关的分区信息进行消费,对于所有的 follower 消费者而言, 他们只知道自己消费的分区,并不知道其他消费者的存在。
  5. 至此,消费者都知道自己的消费的分区,分区过程结束,当发生 分区再均衡 的时候,leader 将会重复分配过程。

消费者分区分配策略

1. Range

将partitions的个数除于消费者线程的总数来决定每个消费者线程消费几个分区。如果除不尽,那么前面几个消费者线程将会多消费一个分区。

我们有10个分区,3个消费者线程, 10 / 3 = 3,而且除不尽,那么消费者线程 C1-0 将会多消费一个分区:

  • C1-0 将消费 0, 1, 2, 3 分区
  • C2-0 将消费 4, 5, 6 分区
  • C2-1 将消费 7, 8, 9 分区

2. RoundRobin

RoundRobin策略的工作原理:将所有主题的分区组成 TopicAndPartition 列表,然后对 TopicAndPartition 列表按照 hashCode 进行排序,然后通过轮询方式逐个将分区以此分配给每个消费者。

如按照 hashCode 排序完的topic-partitions组依次为T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我们的消费者线程排序为C1-0, C1-1, C2-0, C2-1,最后分区分配的结果为:

  • C1-0 将消费 T1-5, T1-2, T1-6 分区;
  • C1-1 将消费 T1-3, T1-1, T1-9 分区;
  • C2-0 将消费 T1-0, T1-4 分区;
  • C2-1 将消费 T1-8, T1-7 分区;

消费者拦截器

消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作,只需要实现ConsumerInterceptor类中的方法就可以:

 public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
 public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
 public void close();

它会在poll方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作,比如修改返回的消息内容,按照某些规则进行过滤数据。

它会在提交完消费位移之后调用拦截器的onCommit()方法,可以使用这个方法来记录跟踪所提交的位移信息。

properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
package interceptor;

import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/4 14:32
 **/
public class MyConsumerInterceptor implements ConsumerInterceptor<String,String> {
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> recs = records.records(partition);
            List<ConsumerRecord<String, String>> newRecs = new ArrayList<>();
            for (ConsumerRecord<String, String> rec : recs) {
                String newValue = "interceptor-" + rec.value();
                ConsumerRecord<String, String> newRec = new ConsumerRecord<>(rec.topic(), rec.partition(), rec.offset(), rec.key(), newValue);
                newRecs.add(newRec);
            }
            newRecords.put(partition, newRecs);
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        offsets.forEach((tp, offsetAndMetadata) -> {
            System.out.println(tp + " : " + offsetAndMetadata.offset());
        });
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

独立的消费者

一般情况下我们都是使用消费组(即便只有一个消费者)来消费消息的,因为这样可以在增加或减少消费者时自动进行分区重平衡。这种方式是推荐的方式。

如果在知道主题和分区的情况下,我们也可以使用单个消费者来进行消费。对于这种情况,我们需要的是给消费者分配消费分区,而不是让消费者订阅(成为消费组)主题。

List<PartitionInfo> partitionInfos = null;
List<TopicPartition> partitions = new ArrayList<>();
//主动获取主题下所有的分区。如果你知道所指定的分区,可以跳过这一步
partitionInfos = kafkaConsumer.partitionsFor("rebalance-topic-three-part");

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos){
        partitions.add(new TopicPartition(partition.topic(), partition.partition()));
    }
    //为消费者指定分区
    kafkaConsumer.assign(partitions);
    while (true) {
        ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
        for (ConsumerRecord<String, String> record: records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
        }
        kafkaConsumer.commitSync();
    }
}

除了需要主动获取分区以及没有分区重平衡,其他的处理逻辑都是一样的。需要注意的是,如果添加了新的分区,这个消费者是感知不到的,需要通过consumer.partitionsFor()来重新获取分区。

消费者配置

Kafka 与消费者相关的配置大部分参数都有合理的默认值,一般不需要修改,不过有一些参数与消费者的性能和可用性有很大关系。接下来介绍这些重要的属性。

  1. fetch.min.bytes

指定消费者从服务器获取记录的最小字节数。如果服务器在收到消费者的数据小于 fetch.min.bytes,那么会等到有足够的可用数据时才返回给消费者。

合理的设置可以降低消费者和 broker 的工作负载,在 Topic 消息生产不活跃时,减少处理消息次数。

如果没有很多可用数据,但消费者的 CPU 使用率却很高,需要调高该属性的值。

如果消费者的数量比较多,调高该属性的值也可以降低 broker 的工作负载。

  1. fetch.max.wait.ms

指定在 broker 中的等待时间,默认是500ms。如果没有足够的数据流入 Kafka,即数据量没有达到 fetch.min.bytes,500ms后会返回数据给消费者。

fetch.max.wait.msfetch.min.bytes 有一个满足条件就会返回数据。

  1. max.parition.fetch.bytes

指定了服务器从每个分区里的数据返回给消费者的最大字节数,默认值是1MB。

如果一个主题有20个分区和5个消费者(同一个组内),那么每个消费者需要至少4MB 的可用内存(每个消费者读取4个分区)来接收记录。如果组内有消费者发生崩溃,剩下的消费者需要处理更多的分区。

max.parition.fetch.bytes 必须比 broker 能够接收的最大消息的字节数(max.message.size)大,否则消费者可能无法读取这些消息,导致消费者一直重试。

另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.parition.fetch.bytes 值改小或者延长会话过期时间。
  1. session.timeout.ms

指定了消费者与服务器断开连接的最大时间,默认是3s。如果消费者没有在指定的时间内发送心跳给 GroupCoordinator,就被认为已经死亡,会触发再均衡,把它的分区分配给其他消费者。

该属性与 heartbeat.interval.ms 紧密相关,heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms 指定了消费者最长多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一,如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。
  1. auto.offset.reset

指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下默认是 latest,另一个值是 earliest,消费者将从起始位置读取分区的记录。

  1. enable.auto.commit

指定了消费者是否自动提交偏移量,默认值是 true,自动提交。如果设为 true,需要通过配置 auto.commit.interval.ms 属性来控制提交的频率。设为 false 可以程序自己控制何时提交偏移量。

  1. partition.assignment.strategy

决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略:

  • Range,把 Topic 的若干个连续的分区分配给消费者。
  • RoundRobin,把所有分区逐个分配给消费者。

默认值是 org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略。

  1. receive.buffer.bytessend.buffer.bytes

分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果设为-1就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

优雅退出

package graceexit;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/**
 * @Author Natasha
 * @Description
 * @Date 2020/11/4 14:21
 **/
public class QuitConsumer {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "120.27.233.226:9092");
        props.put("group.id", "test");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        consumer.subscribe(Arrays.asList("test"));
        final Thread mainThread = Thread.currentThread();

        /*
         * 注册 JVM 关闭时的回调,当 JVM 关闭时调用
         * 退出循环需要通过另一个线程调用consumer.wakeup()方法
         * 调用consumer.wakeup()可以退出poll(),并抛出WakeupException异常
         * 我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式
         * consumer.wakeup()是消费者唯一一个可以从其他线程里安全调用的方法
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                System.out.println("Starting exit...");
                // 调用消费者的 wakeup 方法通知主线程退出
                consumer.wakeup();
                try {
                    // 主线程继续执行,以便可以关闭consumer,提交偏移量
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition() + ", offset = " + record.offset());
                }
                consumer.commitAsync();
            }
        }catch (WakeupException e) {
            // 不处理异常
        } finally {
            // 在退出线程之前调用consumer.close()是很有必要的,它会提交任何还没有提交的东西,并向组协调器发送消息,告知自己要离开群组。
            // 接下来就会触发再均衡,而不需要等待会话超时。
            consumer.commitSync();
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }
    }
}

github

本文章中相关代码样例已上传 github :https://github.com/ShawnVanorGit/hello_kafka

查看原文

赞 0 收藏 0 评论 0

Natasha 发布了文章 · 2020-11-02

(三)Kafka的生产者原理及使用详解

建议先看上一篇(二)Kafka之集群架构原理,可以让我们更清楚的认识到Kafka的面貌,避免管中窥豹。
今天这篇我们分析Kafka的核心模块——生产者。

Kafka生产者的流程概览

img

1、Kafka生产者会将消息封装成一个 ProducerRecord 向 kafka集群中的某个 topic 发送消息;

2、发送的消息首先会经过序列化器进行序列化,以便在网络中传输;

3、发送的消息需要经过分区器来决定该消息会分发到 topic 对应的 partition,当然如果指定了分区,那么就不需要分区器了;

4、这个时候消息离开生产者开始往kafka集群指定的 topic 和 partition 发送;

5、如果写入成功,kafka集群会回应 生产者一个 RecordMetaData 的消息,如果失败会根据配置的允许失败次数进行重试,如果还是失败,那么消息写入失败,并告诉生产者。

Kafka生产者的详细流程

2

步骤一:一条消息过来首先会被封装成为一个ProducerRecord 对象

步骤二:接下来要对这个对象进行序列化,因为 Kafka 的消息需要从客户端传到服务端,涉及到网络传输,所以需要实现序列。Kafka 提供了默认的序列化机制,也支持自定义序列化(这种设计也值得我们积累,提高项目的扩展性)。

步骤三:消息序列化完了以后,对消息要进行分区,分区的时候需要获取集群的元数据。分区的这个过程很关键,因为这个时候就决定了,我们的这条消息会被发送到 Kafka 服务端到哪个主题的哪个分区了。

步骤四:分好区的消息不是直接被发送到服务端,而是放入了生产者的一个缓存里面。在这个缓存里面,多条消息会被封装成为一个批次(batch),默认一个批次的大小是 16K。

步骤五:Sender 线程启动以后会从缓存里面去获取可以发送的批次。

步骤六:Sender 线程把一个一个批次发送到服务端。大家要注意这个设计,在 Kafka0.8 版本以前,Kafka 生产者的设计是来一条数据,就往服务端发送一条数据,频繁的发生网络请求,结果性能很差。后面的版本再次架构演进的时候把这儿改成了批处理的方式,性能指数级的提升,这个设计值得我们积累。 生产者细节深度剖析。

Kafka生产者中重难点分析

ProducerRecord

生产者需要往集群发送消息前,要先把每一条消息封装成ProducerRecord对象,这是生产者内部完成的。

序列化器

在创建ProducerRecord时,必须指定序列化器,除了默认提供的序列化器之外推荐使用序列化框架Avro、Thrift、ProtoBuf等,不推荐自己创建序列化器。因为如果一旦需要修改,那么在维护新旧消息代码的兼容性时会遇到不同程度的问题。

Apache Avro
Apache Avro是一个数据序列化系统,它支持丰富的数据结构,提供了紧凑的,快速的,二进制的数据格式。
在使用 Avro 之前,需要先定义模式(schema),模式通常使用 JSON 来编写。

(1)创建一个类代表客户,作为消息的value

class Custom {
    private int customID;
    privat String customerName;
    public Custom(int customID, String customerName) {
        super();
        this.customID = customID;
        this.customerName = customerName;
    }
    public int getCustomID() {
        return customID;
    }
    public String getCustomerName() {
        return customerName;
    }
}

(2)定义schema

{  
  "namespace": "customerManagement.avro",  
   "type": "record",  
   "name": "Customer",  
   "fields":[  
       {  
          "name": "id", "type": "string"  
       },  
       {  
          "name": "name",  "type": "string"  
       },  
   ]  
}

(3)生成Avro对象发送到Kafka。当使用Avro读取消息时,需要先读取整个的schema。为了实现这一点,可以使用了一个名为Schema Registry的架构。

Properties props = new Properties();  
props.put("bootstrap", "loacalhost:9092");  
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");  
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");  
props.put("schema.registry.url", schemaUrl);//schema.registry.url指向射麻的存储位置
String topic = "CustomerContacts";
Producer<String, Customer> produer = new KafkaProducer<String, Customer>(props);
//不断生成消息并发送
while (true) {
  Customer customer = CustomerGenerator.getNext();
  ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
  producer.send(record);//将customer作为消息的值发送出去,KafkaAvroSerializer会处理剩下的事情
}

分区

当key为空且使用默认的分区器时,消息会被随机发送到指定topic的其中一个可用分区,会使用round-robin算法均衡分区间的消息。

当key不为空且使用默认的分区器时,Kafka会计算该key的hash值,并使用得到的hash值把消息映射到特定的分区,把一个key始终映射到同一分区是非常重要的。

只要一个topic的分区数量不变,key与分区的映射关系就能保证一致。但是如果你添加一个新的分区到一个topic时,虽然存在的数据仍然会保存在原来的分区里,但具有相同key的新消息不能保证还会写入到原来的分区。所以在创建topic时最好预先定义好需要的分区数量,避免后期添加新的分区造成映射关系的不一致。

缓冲区

一个消息被分区以后,消息首先会被放到一个缓存里面,我们看一下里面具体的细节。
默认缓存块的大小是 32M,这个缓存块里面有一个重要的数据结构:batches,这个数据结构是 key-value 的数据结构。key 就是消息主题的分区,value 是一个队列,里面存的是发送到对应分区的批次。
3

生产者高级设计之自定义数据结构

生产者把批次信息用 batches 这个对象进行存储。如果是大家,大家会考虑用什么数据结构去存储批次信息?

Kafka 这儿采取的方式是自定义了一个数据结构:CopyOnWriteMap。熟悉 Java 的同学都知道,JUC 下面是有一个 CopyOnWriteArrayList 的数据结构的,但是没有 CopyOnWriteMap,我这儿给大家解释一下 Kafka 为什么要设计这样的一个数据结构。

1.他们存储的信息的是 key-value 的结构,key 是分区,value 是要存到这个分区的对应批次(批次可能有多个,所以用的是队列),故因为是 key-value 的数据结构,所以锁定用 Map 数据结构。

2.这个 Kafka 生产者面临的是一个高并发的场景,大量的消息会涌入这个这个数据结构,所以这个数据结构需要保证线程安全,这样我们就不能使用 HashMap 这样的数据结构了。

3.这个数据结构需要支持的是读多写少的场景。读多是因为每条消息过来都会根据 key 读取 value 的信息,假如有 1000 万条消息,那么就会读取 batches 对象 1000 万次。写少是因为,比如我们生产者发送数据需要往一个主题里面去发送数据,假设这个主题有 50 个分区,那么这个 batches 里面就需要写 50 个 key-value 数据就可以了(大家要搞清楚我们虽然要写 1000 万条数据,但是这 1000 万条是写入 queue 队列的 batch 里的,并不是直接写入 batches,所以就我们刚刚说的这个场景,batches 里只需要最多写 50 条数据就可以了)。

根据第二和第三个场景我们总结出来,Kafka 这儿需要一个能保证线程安全的,支持读多写少的 Map 数据结构。但是 Java 里面并没有提供出来的这样的一个数据,唯一跟这个需求比较接近的是 CopyOnWriteArrayList,但是偏偏它又不是 Map 结构,所以 Kafka 这儿模仿 CopyOnWriteArrayList 设计了 CopyOnWriteMap。采用了读写分离的思想解决了线程安全且支持读多写少等问题。

高效的数据结构保证了生产者的性能。(CopyOnWriteArrayList 不熟悉的同学,可以尝试百度学习)。这儿笔者建议大家可以去看看 Kafka 生产者往 batches 里插入数据的源码,生产者为了保证插入数据的高性能,采用了多线程,又为了线程安全,使用了分段加锁等多种手段,源码非常精彩。
生产者高级设计之内存池设计

刚刚我们看到 batches 里面存储的是批次,批次默认的大小是 16K,整个缓存的大小是 32M,生产者每封装一个批次都需要去申请内存,正常情况下如果一个批次发送出去了以后,那么这 16K 的内存就等着 GC 来回收了。但是如果是这样的话,就可能会频繁的引发 FullGC,故而影响生产者的性能,所以在缓存里面设计了一个内存池(类似于我们平时用的数据库的连接池),一个 16K 的内存用完了以后,把数据清空,放入到内存池里,下个批次用的时候直接从里面获取就可以。这样大大的减少了 GC 的频率,保证了生产者的稳定和高效(Java 的 GC 问题是一个头疼的问题,所以这种设计也非常值得我们去积累)。

Sender

把消息放进缓冲区之后,与此同时会有一个独立线程Sender去把一个个Batch发送给对应的主机。

生产者代码

//1. 设置参数
Properties properties = new Properties();
properties.put("bootstrap.servers", "120.27.233.226:9092");

properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

properties.put("acks", "-1");
properties.put("retries", 3);
properties.put("batch.size", 16384);
properties.put("linger.ms", 10);
properties.put("buffer.memory", 33554432);
properties.put("max.block.ms", 3000);
properties.put("max.request.size", 1048576);
properties.put("request.timeout.ms", 30000);
//2.创建Producer实例,跟broker建立socket
Producer<String, String> producer = null;
try {
    producer = new KafkaProducer<String, String>(properties);
    for (int i = 0; i < 100; i++) {
        String msg = "This is Message " + i;
        //3. 创建消息
        ProducerRecord<String, String> record = new ProducerRecord("test_topic", "test", msg);
        //4. 发送消息
        producer.send(record);
        System.out.println("Sent:" + msg);

        Thread.sleep(1000);
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    //5. 关闭连接
    producer.close();
}

关于消息发送

3.1 Fire-and-forget

发送消息后不需要关心是否发送成功。因为Kafka是高可用的,而且生产者会自动重新发送,所以大多数情况都会成功,但是有时也会失败。

ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry",
        "Precision Products", "France");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}

在发送消息之前有可能会发生异常,例如是,序列化消息失败的SerializationException,缓冲区满的BufferExhaustedException,发送超时的TimeoutException或者发送的线程被中断的InterruptException。

3.2 Synchronous send

同步发送,调用send()方法后返回一个Future对象,再调用get()方法会等待直到结果返回,根据返回的结果可以判断是否发送成功。

简单的使用下面的代码替换上面try里面的一行代码:

producer.send(record).get();

在调用send()方法后再调用get()方法等待结果返回。如果发送失败会抛出异常,如果发送成功会返回一个RecordMetadata对象,然后可以调用offset()方法获取该消息在当前分区的偏移量。

KafkaProducer有两种类型的异常,第一种是可以重试的Retriable,该类异常可以通过重新发送消息解决。例如是连接异常后重新连接、“no leader”异常后重新选取新的leader。KafkaProducer可以配置为遇到该类异常后自动重新发送消息直到超过重试次数。第二类是不可重试的,例如是“message size too large”(消息太大),该类异常会马上返回错误。

3.3 Asynchronous send

异步发送,在调用send()方法的时候指定一个callback函数,当broker接收到返回的时候,该callback函数会被触发执行。

class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}
producer.send(record, new DemoProducerCallback());

要使用callback函数,先要实现org.apache.kafka.clients.producer.Callback接口,该接口只有一个onCompletion方法。如果发送异常,onCompletion的参数Exception e会为非空。

Kafka中的异常

1)LeaderNotAvailableException:这个就是如果某台机器挂了,此时leader副本不可用,会导致你写入失败。需要等待其他follower副本切换为leader副本之后,才能继续写入,此时可以重试发送即可。如果说你平时重启kafka的broker进程,肯定会导致leader切换,会报LeaderNotAvailableException异常。

2)NotControllerException:这个也是同理,如果说Controller所在Broker挂了,那么此时会有问题,需要等待Controller重新选举,此时也是一样就是重试即可

3)NetworkException:网络异常,重试即可 我们之前配置了一个参数,retries,他会自动重试的,但是如果重试几次之后还是不行,就会提供Exception给我们来处理了。

代码参数调优

① acks 消息验证
acks消息发送成功判断
-1leader & all follower接收
1leader接收
0消息发送即可
② retries 重试次数(重要)
props.put("retries", 3);

在kafka中可能会遇到各种各样的异常,特别是网络突然出现问题,但是集群不可能每次出现异常都抛出,因为可能下一秒网络就恢复了,所以我们要设置重试机制。

③ batch.size 批次大小
props.put("batch.size", 32384);

批次的大小默认是16K,这里设置了32K,设置大一点可以稍微提高一下吞吐量,设置这个批次的大小还和消息的大小有关,假设一条消息的大小为16K,一个批次也是16K,这样的话批次就失去意义了。

④ linger.ms 发送时间限制
props.put("linger.ms", 100);

比如我现在设置了批次大小为32K,而一条消息是2K,此时已经有了3条消息发送过来,总大小为6K,而生产者这边就没有消息过来了,那在没够32K的情况下就不发送过去集群了吗?显然不是,linger.ms就是设置了固定多长时间,就算没塞满Batch,也会发送,上面我设置了100毫秒,所以就算我的Batch迟迟没有满32K,100毫秒过后都会向集群发送Batch。

⑤ buffer.memory 缓冲区大小
props.put("buffer.memory", 33554432);

当我们的Sender线程处理非常缓慢,而生产数据的速度很快时,我们中间的缓冲区如果容量不够,生产者就无法再继续生产数据了,所以我们有必要把缓冲区的内存调大一点,缓冲区默认大小为32M,其实基本也是合理的。

⑥ max.request.size 最大消息大小
props.put("max.request.size", 1048576);    

max.request.size:这个参数用来控制发送出去的消息的大小,默认是1048576字节,也就1M,这个一般太小了,很多消息可能都会超过1mb的大小,所以需要自己优化调整,把它设置更大一些(企业一般设置成10M),不然程序跑的好好的突然来了一条2M的消息,系统就报错了,那就得不偿失

⑦ request.timeout.ms 请求超时
props.put("request.timeout.ms", 30000); 

request.timeout.ms:这个就是说发送一个请求出去之后,他有一个超时的时间限制,默认是30秒,如果30秒都收不到响应(也就是上面的回调函数没有返回),那么就会认为异常,会抛出一个TimeoutException来让我们进行处理。如果公司网络不好,要适当调整此参数。

查看原文

赞 1 收藏 1 评论 0

Natasha 发布了文章 · 2020-11-02

程序媛和程序猿的爱情

2020-11-2
舍友总是说我给你迷得团团转,其实她不知道呀,你的温柔,把我这个一直以来硬邦邦的糖融成了棉花糖。跟你在一起,我真的觉得好幸福!
一起为未来努力呀!大猫~

查看原文

赞 0 收藏 0 评论 0

Natasha 发布了文章 · 2020-11-02

程序员,你和搬砖的区别?

一直以来,程序员总自嘲「搬砖」,时间久了已经成了习惯,言语聊天间都会说一句「搬砖去了」。将编程与搬砖类比,聊到了一个略显刺激却相对客观的事实 ———— 重复简单的工作,干了一年又一年,都叫搬砖,并且,即使在同一搬砖起跑线上,不同的努力会造成几年后的差距越来越大。而在编程方面,要往上做到资深工程师、专家、CTO 等岗位," 其实大部分人还真是在搬砖 "。

何为成长?成长是指自我提升,一方面是本身的个人能力,另一方面是社会对你的认可度。最终,程序员的职位和薪水都能在成长中得以体现。

很多人对成长有误解,在他们眼中,随着工作年限的提高,成长是理所当然的事情,这其实是一个误区。两个程序员同时工作 3 年,难道他们两个的成长就完全一样吗?其实是不一样的。很多岗位在招聘的时候都要求 3 年以上工作经验,这个 3 年工作经验是指持续成长的三年,而不是指浑浑噩噩混日子的三年。下面举个通俗易懂的例子,大家一定能理解。

这里拿盖大楼举例,比如某大公司发布了如下一个招聘需求,招聘资深建筑工程师,提供具有行业竞争力的薪酬,要求如下:

5 年房屋建设工作经验;

对房屋建设的各个阶段有深刻了解;

熟悉各种类型的砖头、钢筋、水泥等原材料的使用方式;

要求持有国家二级以上建造师证书;

吃苦耐劳优先。

看到这个JD后,只要是有 5 年经验的建造师都跃跃欲试,都觉得自己可以。

其实这个岗位是干什么的呢?这家大公司想盖一栋 50 层的高端商业写字楼,需要招一个资深工程师来完成整个高楼的地基、框架和外形的设计,还需要考虑容灾和抗震等级。

而来应聘的一大部分建筑工程师,他们虽然工作了 5 年,可是他们平时都在做什么呢?大概是这样的:

搬砖、和水泥、砌墙等;

建过的房子不超过 6 层;

从来没有考虑过地基的搭建、框架和外形的设计,因为他们不需要考虑,只要按照已有的方案来干就行。

就这样工作了 5 年,在自己的工作领域(搬砖、和水泥、砌墙)驾轻就熟,觉得建房子不就这么简单嘛,觉得自己已经精通了建房子。

但事实上,他们真的能够胜任 50 层高楼的建筑工作吗?很显然,不能!

什么样的人能够胜任这类工作呢?他们也工作了 5 年,他们平时所做的事情大概是这样的:

第一年:搬砖、和水泥、砌墙等,参与建造一些 6 层小楼;

第二年:搬砖、和水泥、砌墙等,同时研究各种砖头、水泥、钢筋等原材料的特性,知道不同原材料的适用场景,可以独立建造 6 层小楼了;

第三年:跑到另一个建筑队,建造 10-20 层的普通住宅,开始跟着工头一起参与地基、框架和外形的设计,业余时间阅读一些建筑书籍,了解一些国内外知名建筑的设计思想和理念,并时常在建筑论坛发表自己对建筑设计的理解;

第四年:开始独立负责 10-20 层普通住宅的建造了,在业余时间开始研究 30-40 层商业建筑的设计和建造,考虑设计理念、建筑容灾等问题,一心想自己设计一栋更高的大楼;

第五年:一个合适的契机,加入这家大公司,主导 50 层高端商业写字楼的设计和建造。

到这里,我想大家都明白了。搬 5 年砖也还只是一个搬砖的,无论如何也设计不了摩天大楼。

对于程序员来说,待在同一个岗位重复着搬砖的工作,是无法有很大成长的,只有不断地挑战自我才是正确的成长姿势。

查看原文

赞 0 收藏 0 评论 0

Natasha 发布了文章 · 2020-10-31

(二)Kafka之集群架构原理

原理至关重要,面试的时候不可能问你命令的,都是问原理,懂了原理线上如果使用kafka出了问题才可能快速定位,而不是一脸蒙圈。必须要明白原理,如果不说原理直接实战,就真成搬砖了。

Topic

创建一个TopicA的主题,3个分区分别存储在不同的服务器,注意Topic是一个逻辑上的概念。

img

Partition & Partition副本

Kafka的topic可以划分成一个或多个partition,Partition 是物理上的概念。如果一个topic的副本数设为3,那么每个partition对应还会有3个相同的副本。下图我们对TopicA的分区0,1,2分别设置了3个副本,再分别存储在broker0,1,2。

img

日志分段存储

img

由于生产者生产的消息会不断追加到 log 文件末尾,为防止 log 文件过大导致数据定位效率低下,Kafka 采取了分片和索引机制。

它将每个 Partition 分为多个 Segment,每个 Segment 对应两个文件:“.index” 索引文件和 “.log” 数据文件。

Leader & Follow

而且每个副本都是有角色之分的,它们会选举一个副本作为leader,其余的为follower。生产者在发送数据的时候,是直接发送到 leader partition,然后follower partition自行去leader进行数据同步,消费者消费数据的时候,也是从leader中消费数据。(下图在TopicA-partition-0在broker0是leader,同理其他TopicA-partition-N也有leader)

img

Consumer & Consumer group

一个消费组由一个或多个消费者实例组成,便于扩容与容错。一个分区不会让同一个消费者组里面的多个消费者去消费,一个消费者是可以去消费多个分区的数据的。

img

img

Kafka的网络设计

img

  1. 客户端将请求发送给Acceptor,broker里有3个processor的线程(默认是3),Acceptor不会对客户端的请求做任何的处理,而是封装成socketChannel,然后发送给3个processor线程,形成一个队列。发送的方式是轮询,就是发送给第一个processor,然后是第二个,第三个...
  2. 消费者线程会以request请求去消费这些socketChannel;
  3. 线程池里面默认有8个ReaderThreadPool线程,这些线程是用来处理request的,解析请求,返回响应结果response;
  4. processor会从response中读取响应数据,然后再返回给客户端。

所以如果我们需要对kafka进行增强调优,增加processor并增加线程池里面的处理线程,就可以达到效果。request和response那一块部分其实就是起到了一个缓存的效果,是考虑到processor们生成请求太快,线程数不够不能及时处理的问题。
所以这就是一个加强版的reactor网络线程模型。

Kafka零拷贝

传统IO:

img

//读取文件,再用socket发送出去
buffer = File.read 
Socket.send(buffer)

1、第一次:将磁盘文件,读取到操作系统内核缓冲区;
2、第二次:将内核缓冲区的数据,copy到application应用程序的buffer;
3、第三步:将application应用程序buffer中的数据,copy到socket网络发送缓冲区(属于操作系统内核的缓冲区);
4、第四次:将socket buffer的数据,copy到网卡,由网卡进行网络传输。

传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的。实际IO读写,需要进行IO中断,需要CPU响应中断(带来上下文切换),尽管后来引入DMA来接管CPU的中断请求,但四次copy是存在“不必要的拷贝”的。

零拷贝:

img

Kafka使用的zero-copy的应用程序要求内核直接将数据从磁盘文件拷贝到套接字,而无需通过应用程序。零拷贝不仅大大地提高了应用程序的性能,而且还减少了内核与用户模式间的上下文切换。

zookeeper在kafka集群中的作用

1、Broker注册

Broker是分布式部署并且相互独立,但是需要有一个注册系统能够将整个集群中的Broker管理起来,此时就使用到了Zookeeper。在Zookeeper上会有一个专门用来进行Broker服务器列表记录的节点:/brokers/ids

每个Broker在启动时,都会到Zookeeper上进行注册,即到/brokers/ids下创建属于自己的节点,如/brokers/ids/[0...N]。

Kafka使用了全局唯一的数字ID来指代每个Broker服务器,创建完节点后,每个Broker就会将自己的IP地址和端口信息记录到该节点中去。其中,Broker创建的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。

2、Topic注册

在Kafka中,Topic的消息分区与Broker的对应关系也都是由Zookeeper在维护,由专门的节点来记录,如:/borkers/topics

Kafka中每个Topic都会以/brokers/topics/[topic]的形式被记录,如 /brokers/topics/login 和 /brokers/topics/search 等。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册自己的Broker ID,并写入该Topic的分区总数,如/brokers/topics/login/3->2,这表示Broker ID为3的节点对"login"这个Topic提供了2个分区进行消息存储。同样,这个分区节点也是临时节点。

3、消费者注册

①、注册节点到消费者分组。每个消费者服务器启动时,都会到Zookeeper的指定节点下创建一个属于自己的消费者节点,例如/consumers/[group_id]/ids/[consumer_id],完成节点的创建后,消费者就会将自己订阅的Topic信息写入该临时节点。

②、对消费者分组中的消费者的变化注册监听。每个 消费者都需要关注所属消费者分组 其他消费者服务器的变化情况,即对/consumers/[group_id]/ids节点注册子节点变化的Watcher监听,一旦发现消费者新增或减少,就触发消费者的负载均衡。

4、分区 与 消费者 的关系

在Kafka中,规定了每个消息分区只能被同组的一个消费者进行消费,因此,需要在 Zookeeper 上记录 消息分区 与 Consumer 之间的关系,每个消费者一旦确定了对一个消息分区的消费权力,需要将其Consumer ID 写入到 Zookeeper 对应消息分区的临时节点上,例如:

/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]

其中,[broker_id-partition_id]就是一个 消息分区 的标识,节点内容就是该 消息分区 上 消费者的Consumer ID。

5、消息消费进度Offset 记录

在消费者对指定消息分区进行消费中,需要定时地将分区消息的消费进度Offset记录到Zookeeper上,以便在该消费者进行重启或者其他消费者重新接管该消息分区的消息消费后,能够从之前的进度开始继续进行消息消费。Offset在Zookeeper中由一个专门节点进行记录,其节点路径为:

/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]

节点内容就是Offset的值。

6、生产者负载均衡

由于同一个Topic消息会被分区,并被分布在多个Broker上,因此,生产者需要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。

(1) 四层负载均衡,通常,一个生产者只会对应单个Broker,然后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每个生产者不需要同其他系统建立额外的TCP连接,只需要和Broker维护单个TCP连接即可。但是,其无法做到真正的负载均衡,因为实际系统中的每个生产者产生的消息量及每个Broker的消息存储量都是不一样的,如果有些生产者产生的消息远多于其他生产者的话,那么会导致不同的Broker接收到的消息总数差异巨大,同时,生产者也无法实时感知到Broker的新增和删除。

(2) 使用Zookeeper进行负载均衡,由于每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更,这样就可以实现动态的负载均衡机制。

7、消费者负载均衡

与生产者类似,Kafka中的消费者同样需要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息。

查看原文

赞 1 收藏 1 评论 0

Natasha 发布了文章 · 2020-10-31

(五)Windows下Hadoop的启动,HDFS上传文件等命令

PS:一定要在管理员窗口打开,不要打开powershell!
PS:一定要在管理员窗口打开,不要打开powershell!
PS:一定要在管理员窗口打开,不要打开powershell!

PS D:\open_source\hadoop-2.5.2\bin> hdfs namenode -format
PS D:\open_source\hadoop-2.5.2\bin> start-all.cmd

PS D:\open_source\hadoop-2.5.2\bin> hadoop fs -mkdir hdfs://localhost:9000/input
PS D:\open_source\hadoop-2.5.2\bin> hadoop fs -mkdir hdfs://localhost:9000/output
PS D:\open_source\hadoop-2.5.2\bin> hadoop fs -put D:\open_source\hadoop-2.5.2\AvgTemperature.txt hdfs://localhost:9000/input/ 
1、停止集群(切换到/sbin目录下)
stop-all.sh
2、重新格式化namenode(切换到hadoop目录下的bin目录下)
hdfs namenode -format
3、重新启动hadoop集群(切换到hadoop目录下的sbin目录下)
start-all.sh
4、
hadoop dfsadmin -report
D:\open_source\hadoop-2.5.2\bin>hadoop fs -ls /input/
D:\open_source\hadoop-2.5.2\bin>hadoop fs -cat /output/part-00000
查看原文

赞 0 收藏 0 评论 0

Natasha 发布了文章 · 2020-10-31

(四)Hadoop之MapReduce实战小例子

输入数据文件 AvgTemperature.txt

DATE,HOUR,COND,PRES,HUM,TMP,AQI,PM2.5,PM10
20160602,00,霾,1984,130,9,390,348,300
20160802,01,霾,1163,81,8,393,368,302
20160706,02,霾,1079,108,17,360,394,306
20160706,03,霾,1116,79,6,339,387,303
20160502,04,霾,1198,98,16,357,325,307 
20160602,05,霾,1762,126,9,324,316,301
20160408,06,霾,1996,131,3,349,344,301
20160604,07,霾,1952,119,26,347,300,309
20160105,08,霾,1410,81,8,350,395,307
20160104,09,霾,1718,130,4,352,335,308
20160501,10,霾,1714,119,27,310,336,307
20160601,11,霾,1660,130,23,311,364,302
20160606,12,霾,1598,96,12,369,346,309
20160602,13,霾,1673,127,2,343,346,303
20160706,14,霾,1578,122,8,360,323,307
20160707,15,霾,1237,118,12,384,384,301
20160205,16,霾,1231,78,9,361,357,302
20160605,17,霾,1166,86,30,350,388,307
20160506,18,霾,1426,94,2,378,372,305
20160805,19,霾,1874,144,20,376,327,302
20160405,20,霾,1778,94,22,360,335,304
20160104,21,霾,1055,64,22,376,361,305
20160304,22,霾,1349,78,15,367,384,308
20160203,23,霾,2004,110,2,359,371,304
20160603,24,霾,1375,115,19,308,301,308
20160402,25,霾,1201,69,5,387,342,305
20160707,26,霾,1272,112,23,348,333,307
20160702,27,霾,1738,60,12,393,300,303
20160301,28,霾,1752,107,12,364,331,301
20160704,29,霾,1442,65,9,332,369,308

第一题:编写月平均气温统计程序

image.png

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class AvgTemperature {

    public static class StatMapper extends Mapper<Object, Text, Text, IntWritable> {
        private IntWritable intValue = new IntWritable();
        private Text dateKey = new Text();
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] items = value.toString().split(",");
            String date = items[0];
            String tmp = items[5];
            if(!"DATE".equals(date) && !"N/A".equals(tmp)){//排除第一行说明以及未取到数据的行
                dateKey.set(date.substring(0, 6));
                intValue.set(Integer.parseInt(tmp));
                context.write(dateKey, intValue);
            }
        }
    }

    public static class StatReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int tmp_sum = 0;
            int count = 0;
            for(IntWritable val : values){
                tmp_sum += val.get();
                count++;
            }
            int tmp_avg = tmp_sum/count;
            result.set(tmp_avg);
            context.write(key, result);
        }
    }

    public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "AvgTemperature");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setJarByClass(AvgTemperature.class);

        job.setMapperClass(StatMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(StatReducer.class);
        job.setPartitionerClass(HashPartitioner.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        TextOutputFormat.setOutputPath(job, new Path(args[1]));
        TextInputFormat.setInputPaths(job, args[0]);
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

image.png

运行结果:

201601    11
201602    5
201603    13
201604    10
201605    15
201606    16
201607    12
201608    14

第二题:编写每日空气质量统计程序

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

import java.io.IOException;

/**
 * @Author Natasha
 * @Description
 * @Date 2020/10/30 20:37
 **/
public class AirQuality {
    public static class AirQualityMapprer extends Mapper<Object, Text, Text, IntWritable>{

        private Text text = new Text();
        private IntWritable intWritable = new IntWritable();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String[] item = value.toString().split(",");

            String date = item[0];
            String kongqi = item[6];
            if(!"DATE".equals(date) && !"N/A".equals(kongqi)){//排除第一行说明以及未取到数据的行
                text.set(date.substring(0, 6));
                intWritable.set(Integer.parseInt(kongqi));
                context.write(text, intWritable);
            }
        }
    }

    public static class AirQualityReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
        private IntWritable res = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> value, Context context) throws IOException, InterruptedException {
            int aqi = 0;
            int cnt = 0;
            for(IntWritable iw : value){
                aqi += iw.get();
                cnt++;
            }
            int aqi_avg = aqi/cnt;
            res.set(aqi_avg);
            context.write(key, res);
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "AirQuality");

        job.setJarByClass(AirQuality.class);
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(AirQualityMapprer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(HashPartitioner.class);

        job.setReducerClass(AirQualityReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        TextInputFormat.setInputPaths(job, args[0]);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

运行结果:

201601    359
201602    360
201603    365
201604    365
201605    348
201606    342
201607    359
201608    384
查看原文

赞 0 收藏 0 评论 0

Natasha 关注了用户 · 2020-10-30

codecraft @codecraft

当一个代码的工匠回首往事时,不因虚度年华而悔恨,也不因碌碌无为而羞愧,这样,当他老的时候,可以很自豪告诉世人,我曾经将代码注入生命去打造互联网的浪潮之巅,那是个很疯狂的时代,我在一波波的浪潮上留下了或重如泰山或轻如鸿毛的几笔。

关注 1108

认证与成就

  • 获得 3 次点赞
  • 获得 3 枚徽章 获得 0 枚金徽章, 获得 0 枚银徽章, 获得 3 枚铜徽章

擅长技能
编辑

开源项目 & 著作
编辑

(゚∀゚ )
暂时没有

注册于 2019-04-26
个人主页被 1.5k 人浏览