kafka原理

Kafka 2.0.0。

一、简介

分布式的消息和订阅系统,高性能、高吞吐量。
内置分区(对数据做分片处理)、实现集群、有容错能力和数据复制能力。

二、产生背景

领英要对用户的行为进行统计。

三、应用场景

行为跟踪:收集用户的操作行为。可以根据爱好做推送。
日志收集。

四、术语

  • Topic
    存储消息的逻辑概念。
  • Partition
  1. 每个topic可以划分多个分区。
  2. 相同topic下的不同分区的消息是不同的。如果是集群,分区会均分在集群机子上。

    设置分区目的:减少消息容量,从而提升io性能。

五、架构


一个kafka集群会包含多个broker,它又依赖zookeeper的集群,去实现broker之间的master-slaver机制。
多个producer,多个consumer。
消费者去broker里pull消息的。而mq是主动推送给消费者。

六、消息的同步发送和异步发送

  • 异步发送:
    kafka1.0以后,默认的client使用的都是异步发送消息。消息通过kafka producer发送后,这个消息放到了后台一个消息队列里,然后通过一个线程不断的从队列里取出消息进行发送。消息发送成功后,会进行一个callback回调。
  • 同步发送:
    通过future和get。get方法是个阻塞。同步去获得结果。

七、消息分发策略

消息由key和value组成,key是可选项,producer会根据key和partition机制来判断当前这条消息应该放到哪个partition里面。默认算法是哈希取模。如果key为null,则随机分配,根据metadata.max.age.ms来,十分钟更新一次。
分区分配策略:

  • Range(范围) -> 默认
    针对于同一个topic中的多个partition而言的。首先会对这个topic中的partition进行排序,然后 partition数量除以consumer数量,加入有0-9个partition,3个consumer,那consumer1会消费前4个分区。如果是多个topic,那每次多消费的都是第一个。
  • 轮询Round-Robin
    把所有的partition和consumer数量列出来,然后按照hashcode进行排序,通过轮询算法分配partition和consumer。

也可以自定义分发规则,implements Partitioner。

什么时候会触发rebalance ?
1、对于同一个consumer group,新增consumer
2、consumer离开
3、consumer取消订阅
4、topic中新增分区

谁来执行rebalance,以及管理consumer group?
coordinator

八、消息的存储策略

  • 消息保存的路径
    默认tmp,也可以自定义。
  • 消息的写入性能
    顺序写入
    零拷贝
  • 消息的存储机制
    日志分段,方便清理和压缩。根据时间(默认保留7天)或者 大小,满足其中之一,就会被清理掉。

九、Partition副本机制

分区是对数据内容的分片,每个分区里的内容不一样,当一个分区不可用时,有一部分消息就没办法消费。所以为了提高分区的可用性,去实现冗余的备份,就是副本。如果有多个副本,一定会有个leader副本和follower副本。命令中通过--replication-factor参数去设置。
第i个分区的第j个副本,会落在 第 (i+j)% broker counts 个broker上。
查看分区状态

1
get /brokers/topics/topic_name/partitions/partition_num/state

其中 isr维护的是当前分区,所有的副本集。follower的内容必须跟leader的在一定阈值范围内保持一致,如果不一致,就会被踢出去,直到follower的内容与leader内容保持在一定阈值范围内,follower才会被加进来。
leader副本 负责接收客户端的写入和读取请求。
follower副本 负责从leader副本中读取数据。

十、消息消费原理

consumer可以指定消费哪个partition。如果不指定,会按照一定的策略进行负载,比如三个consumer、三个partition,就一个consumer消费一个partition。如果consumer数量小于partition数量,则有的consumer会多消费一些partition。如果consumer数量大于partition数量,则有的consumer会消费不到消息,会造成一定的浪费,所以不建议设置太多的consumer。因为在一个partition上是不允许并发的。consumer的数量最好是partition数量的整数倍。如果consumer从多个partition上读到消息,是不保证顺序的。

十、集群

集群的构建是基于zookeeper的。
修改config/server.properties中三个地方:

  1. zookeeper的地址,
  2. broker.id(在kafka集群中必须是唯一的)。
  3. listeners,是为了各个节点互相通信,所以需要写各个节点自己的ip。

启动集群各节点kafka后,会看到zookeeper上多了几个节点。
启动zookeeper客户端:

1
./zookeeper-3.5.4-beta/bin/zkCli.sh

查看所有kafka集群节点的id:

1
ls /brokers/ids

查看kafka主节点:

1
get /controller

写请求会进入master节点,读请求进入其他节点。
选举规则:最小的节点,也就是最早注册的节点是leader。

十二、使用api

  1. 引入kafka依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.1.0</version>
    </dependency>
  2. 消息发送端
    (1) 设置kafka配置信息(集群地址、序列化),创建KafkaProducer。
    可设置参数如:

  • ProducerConfig.ACKS_CONFIG,
    0:消息发送给broker以后,不需要确认(性能较高,但是数据会丢失)。
    1:只需要获得kafka集群中leader节点的确认,即可返回。
    all(-1):需要集群中的所有节点确认。(最安全,性能最低的)

  • batch.size(默认16kb)
    producer对于同一个分区来说,会按照batch.size的大小进行统一收集后,批量发送。

  • linger.ms
    按时间间隔,进行统一收集后,批量发送。
    如果都设置了,满足其中任意一个条件,消息就会立马发送。
    为了解决大量的小数据包频繁的发送,这个问题。

  • max.request.size(默认1M)
    控制请求的大小

(2) 通过send方法发送消息,入参为ProducerRecord(topic,message)。

  1. 消息接受端
    (1) 设置kafka配置信息(集群地址、序列化),创建KafkaConsumer。
    不同的组,只要订阅了,每个组都可以获取消息。
    同一个组内的消费者们,只能有一个消费者能获取到消息,其他成员不会获取到。
    可设置参数如:
  • AUTO_OFFSET_RESET_CONFIG:
    earliest:对于新的group id来说,它会从最早的消息开始消费。对于已经消费过消息的group id来说,它还是会从已经消费过的最大的offset里去取。
    latest:对于新的group_id来说,直接从已经消费过并且提交的最大的偏移量开始取。
    • ENABLE_AUTO_COMMIT_CONFIG:自动提交
    • AUTO_COMMIT_INTERVAL_MS_CONFIG:自动提交的间隔毫秒
      就是说每xx毫秒,对这个时间段内的所有消息,进行提交确认。
      消息消费完以后,要进行提交确认。如果设置为false的话,消费一次后,还允许再次消费。设置为true,则消费一次后,不会被再次消费到。
    • MAX_POLL_RECORDS_CONFIG:每一次调用poll,获取到的消息数。这样可以根据消费端的处理性能,来预设一个数量,减少poll的次数,提升性能。

(2) 通过subscribe方法订阅消息。

本文由 lilyssh创作。可自由转载、引用,但需署名作者且注明文章出处。


当前网速较慢或者你使用的浏览器不支持博客特定功能,请尝试刷新或换用Chrome、Firefox等现代浏览器