logo资料库

图解 Kafka 之实战指南.pdf

第1页 / 共71页
第2页 / 共71页
第3页 / 共71页
第4页 / 共71页
第5页 / 共71页
第6页 / 共71页
第7页 / 共71页
第8页 / 共71页
资料共71页,剩余部分请下载后查看
初识Kafka Kafka 起初是 由 LinkedIn 公司采⽤ Scala 语⾔开发的⼀个多分区、多副本且基于 ZooKeeper 协调的分布式消息系统,现已被捐献给 Apache 基⾦会。⽬前 Kafka 已经定位为⼀个分布式流式处理平台,它以 ⾼吞吐、可持久化、可⽔平扩展、⽀持流数据处理等多种特性⽽被⼴泛使⽤。⽬前越来越多的开源分布式处理系统如 Cloudera、Storm、Spark、Flink 等都⽀持与 Kafka 集成。 Kafka 之所以受到越来越多的青睐,与它所“扮演”的三⼤⾓⾊是分不开的: 消息系统: Kafka 和传统的消息系统(也称作消息中间件)都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了⼤多数消息系统难以实 现的消息顺序性保障及回溯消费的功能。 存储系统: Kafka 把消息持久化到磁盘,相⽐于其他基于内存存储的系统⽽⾔,有效地降低了数据丢失的风险。也正是得益于 Kafka 的消息持久化功能和多副本机制,我们可以把 Kafka 作为长期的 数据存储系统来使⽤,只需要把对应的数据保留策略设置为“永久”或启⽤主题的⽇志压缩功能即可。 流式处理平台: Kafka 不仅为每个流⾏的流式处理框架提供了可靠的数据来源,还提供了⼀个完整的流式处理类库,⽐如窗⼜、连接、变换和聚合等各类操作。 基本概念 ⼀个典型的 Kafka 体系架构包括若⼲ Producer、若⼲ Broker、若⼲ Consumer,以及⼀个 ZooKeeper 集群,如下图所⽰。其中 ZooKeeper 是 Kafka ⽤来负责集群元数据的管理、控制器的选举等操作的。 Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,⽽ Consumer 负责从 Broker 订阅并消费消息。 整个 Kafka 体系结构中引⼊了以下3个术语: 1. Producer: ⽣产者,也就是发送消息的⼀⽅。⽣产者负责创建消息,然后将其投递到 Kafka 中。 2. Consumer: 消费者,也就是接收消息的⼀⽅。消费者连接到 Kafka 上并接收消息,进⽽进⾏相应的业务逻辑处理。 3. Broker: 服务代理节点。对于 Kafka ⽽⾔,Broker 可以简单地看作⼀个独⽴的 Kafka 服务节点或 Kafka 服务实例。⼤多数情况下也可以将 Broker 看作⼀台 Kafka 服务器,前提是这台服务器上只部 署了⼀个 Kafka 实例。⼀个或多个 Broker 组成了⼀个 Kafka 集群。⼀般⽽⾔,我们更习惯使⽤⾸字母⼩写的 broker 来表⽰服务代理节点。 在 Kafka 中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka 中的消息以主题为单位进⾏归类,⽣产者负责将消息发送到特定的主题(发送到 Kafka 集群中的每⼀条消息都要指定⼀个 主题),⽽消费者负责订阅主题并进⾏消费。 主题是⼀个逻辑上的概念,它还可以细分为多个分区,⼀个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同⼀主题下的不同分区包含的消息是不同的,分区在存储层⾯可 以看作⼀个可追加的⽇志(Log)⽂件,消息在被追加到分区⽇志⽂件的时候都会分配⼀个特定的偏移量(offset)。 offset 是消息在分区中的唯⼀标识,Kafka 通过它来保证消息在分区内的顺序性,不过 offset 并不跨越分区,也就是说,Kafka 保证的是分区有序⽽不是主题有序。
如上图所⽰,主题中有4个分区,消息被顺序追加到每个分区⽇志⽂件的尾部。Kafka 中的分区可以分布在不同的服务器(broker)上,也就是说,⼀个主题可以横跨多个 broker,以此来提供⽐单个 broker 更强⼤的性能。 每⼀条消息被发送到 broker 之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果⼀个主题只对应⼀个⽂件,那么这个⽂件 所在的机器I/O将会成为这个主题的性能瓶颈,⽽分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数 量可以实现⽔平扩展。 Kafka 为分区引⼊了多副本(Replica)机制,通过增加副本数量可以提升容灾能⼒。 同⼀分区的不同副本中保存的是相同的消息(在同⼀时刻,副本之间并⾮完全⼀样),副本之间是“⼀主多从”的关系,其中 leader 副本负责处理读写请求,follower 副本只负责与 leader 副本的消息同步。 副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的⾃动转移,当 Kafka 集群中某个 broker 失效时仍然能 保证服务可⽤。 如上图所⽰,Kafka 集群中有4个 broker,某个主题中有3个分区,且副本因⼦(即副本个数)也为3,如此每个分区便有1个 leader 副本和2个 follower 副本。⽣产者和消费者只与 leader 副本进⾏交互,⽽ follower 副本只负责消息的同步,很多时候 follower 副本中的消息相对 leader 副本⽽⾔会有⼀定的滞后。 Kafka 消费端也具备⼀定的容灾能⼒。Consumer 使⽤拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进⾏消 费,这样就不会造成消息丢失。 分区中的所有副本统称为 AR(Assigned Replicas)。所有与 leader 副本保持⼀定程度同步的副本(包括 leader 副本在内)组成ISR(In-Sync Replicas),ISR 集合是 AR 集合中的⼀个⼦集。消息会先发送 到 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进⾏同步,同步期间内 follower 副本相对于 leader 副本⽽⾔会有⼀定程度的滞后。
前⾯所说的“⼀定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进⾏配置。与 leader 副本同步滞后过多的副本(不包括 leader 副本)组成 OSR(Out-of-Sync Replicas),由此可 见,AR=ISR+OSR。在正常情况下,所有的 follower 副本都应该与 leader 副本保持⼀定程度的同步,即 AR=ISR,OSR 集合为空。 leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态,当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上”了 leader 副本,那 么 leader 副本会把它从 OSR 集合转移⾄ ISR 集合。默认情况下,当 leader 副本发⽣故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,⽽在 OSR 集合中的副本则没有任何机会(不过这个原 则也可以通过修改相应的参数配置来改变)。 ISR 与 HW 和 LEO 也有紧密的关系。HW 是 High Watermark 的缩写,俗称⾼⽔位,它标识了⼀个特定的消息偏移量(offset),消费者只能拉取到这个 offset 之前的消息。 如上图所⽰,它代表⼀个⽇志⽂件,这个⽇志⽂件中有9条消息,第⼀条消息的 offset(LogStartOffset)为0,最后⼀条消息的 offset 为8,offset 为9的消息⽤虚线框表⽰,代表下⼀条待写⼊的消息。⽇志 ⽂件的 HW 为6,表⽰消费者只能拉取到 offset 在0⾄5之间的消息,⽽ offset 为6的消息对消费者⽽⾔是不可见的。 LEO 是 Log End Offset 的缩写,它标识当前⽇志⽂件中下⼀条待写⼊消息的 offset,上图中 offset 为9的位置即为当前⽇志⽂件的 LEO,LEO 的⼤⼩相当于当前⽇志分区中最后⼀条消息的 offset 值加1。分 区 ISR 集合中的每个副本都会维护⾃⾝的 LEO,⽽ ISR 集合中最⼩的 LEO 即为分区的 HW,对消费者⽽⾔只能消费 HW 之前的消息。 注意要点:很多资料中误将上图中的 offset 为5的位置看作 HW,⽽把 offset 为8的位置看作 LEO,这显然是不对的。 为了让读者更好地理解 ISR 集合,以及 HW 和 LEO 之间的关系,下⾯通过⼀个简单的⽰例来进⾏相关的说明。如上图所⽰,假设某个分区的 ISR 集合中有3个副本,即⼀个 leader 副本和2个 follower 副 本,此时分区的 LEO 和 HW 都为3。消息3和消息4从⽣产者发出之后会被先存⼊ leader 副本,如下图所⽰。 在消息写⼊ leader 副本之后,follower 副本会发送拉取请求来拉取消息3和消息4以进⾏消息同步。
在同步过程中,不同的 follower 副本的同步效率也不尽相同。如上图所⽰,在某⼀时刻 follower1 完全跟上了 leader 副本⽽ follower2 只同步了消息3,如此 leader 副本的 LEO 为5,follower1 的 LEO 为 5,follower2 的 LEO 为4,那么当前分区的 HW 取最⼩值4,此时消费者可以消费到 offset 为0⾄3之间的消息。 写⼊消息(情形4)如下图所⽰,所有的副本都成功写⼊了消息3和消息4,整个分区的 HW 和 LEO 都变为5,因此消费者可以消费到 offset 为4的消息了。 由此可见,Kafka 的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能⼯作的 follower 副本都复制完,这条消息才会被确认为已成功提交,这种复制⽅式极⼤地影响 了性能。⽽在异步复制⽅式下,follower 副本异步地从 leader 副本中复制数据,数据只要被 leader 副本写⼊就被认为已经成功提交。在这种情况下,如果 follower 副本都还没有复制完⽽落后于 leader 副 本,突然 leader 副本宕机,则会造成数据丢失。Kafka 使⽤的这种 ISR 的⽅式则有效地权衡了数据可靠性和性能之间的关系。 安装与配置 本节详细介绍 Kafka 运⾏环境的搭建,为了节省篇幅,本节的内容以 Linux CentOS 作为安装演⽰的操作系统,其他 Linux 系列的操作系统也可以参考本节的内容。具体的操作系统的信息如下: [root@node1 ~]# uname -a Linux node1 2.6.32-504.23.4.el6.x86_64 #1 SMP Tue Jun 9 20:57:37 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux [root@node1 ~]# cat /etc/issue CentOS release 6.6 (Final) Kernel \r on an \m 由第1节中第1张图可知,搭建 Kafka 运⾏环境还需要涉及 ZooKeeper,Kafka 和 ZooKeeper 都是运⾏在 JVM 之上的服务,所以还需要安装 JDK。Kafka 从2.0.0版本开始就不再⽀持 JDK7 及以下版本,本节 就以 JDK8 为例来进⾏演⽰。 1. JDK的安装与配置 很多学习 Kafka 的读者也都是 JVM 系语⾔的⽀持者,如果你的操作系统中已经安装了 JDK8 及以上版本则可以跳过这段内容。 安装 JDK 的第⼀步就是下载 JDK 1.8的安装包,可以进⼊ Oracle 官⽹页⾯进⾏下载。⽰例中选择的安装包是 jdk-8u181-linux-x64.tar.gz,我们这⾥将其先复制⾄/opt ⽬录下,本书所有与安装有关的操作都 在这个⽬录下进⾏。 其次将/opt ⽬录下的安装包解压,相关信息如下: [root@node1 opt]# ll jdk-8u181-linux-x64.tar.gz -rw-r--r-- 1 root root 185646832 Aug 31 14:48 jdk-8u181-linux-x64.tar.gz [root@node1 opt]# tar zxvf jdk-8u181-linux-x64.tar.gz # 解压之后当前/opt⽬录下⽣成⼀个名为jdk1.8.0_181的⽂件夹 [root@node1 opt]# cd jdk1.8.0_181/ [root@node1 jdk1.8.0_181]# pwd /opt/jdk1.8.0_181 # 上⾯这个就是当前JDK8的安装⽬录 然后配置 JDK 的环境变量。修改/etc/profile ⽂件并向其中添加如下配置: export JAVA_HOME=/opt/jdk1.8.0_181 export JRE_HOME=$JAVA_HOME/jre export PATH=$PATH:$JAVA_HOME/bin export CLASSPATH=./://$JAVA_HOME/lib:$JRE_HOME/lib 再执⾏ source /etc/profile 命令使配置⽣效,最后可以通过 java –version命令验证 JDK 是否已经安装配置成功。如果安装配置成功,则会正确显⽰出 JDK 的版本信息,参考如下: [root@node1 ~]# java -version java version "1.8.0_181" Java(TM) SE Runtime Environment (build 1.8.0_181-b13) Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode) 2. ZooKeeper安装与配置 ZooKeeper 是安装 Kafka 集群的必要组件,Kafka 通过 ZooKeeper 来实施对元数据信息的管理,包括集群、broker、主题、分区等内容。 ZooKeeper 是⼀个开源的分布式协调服务,是 Google Chubby的⼀个开源实现。分布式应⽤程序可以基于 ZooKeeper 实现诸如数据发布/订阅、负载均衡、命名服务、分布式协调/通知、集群管理、Master 选举、配置维护等功能。 在 ZooKeeper 中共有3个⾓⾊:leader、follower 和 observer,同⼀时刻 ZooKeeper 集群中只会有⼀个 leader,其他的都是 follower 和 observer。observer 不参与投票,默认情况下 ZooKeeper 中只有 leader 和 follower 两个⾓⾊。更多相关知识可以查阅 ZooKeeper 官⽅⽹站来获得。 安装 ZooKeeper 的第⼀步也是下载相应的安装包,安装包可以从官⽹中获得,⽰例中使⽤的安装包是 zookeeper-3.4.12.tar.gz,同样将其复制到/opt ⽬录下,然后解压缩,参考如下: [root@node1 opt]# ll zookeeper-3.4.12.tar.gz -rw-r--r-- 1 root root 36667596 Aug 31 15:55 zookeeper-3.4.12.tar.gz [root@node1 opt]# tar zxvf zookeeper-3.4.12.tar.gz # 解压之后当前/opt⽬录下⽣成⼀个名为zookeeper-3.4.12的⽂件夹 [root@node1 opt]# cd zookeeper-3.4.12 [root@node1 zookeeper-3.4.12]# pwd /opt/zookeeper-3.4.12 第⼆步,向/etc/profile 配置⽂件中添加如下内容,并执⾏ source /etc/profile 命令使配置⽣效: export ZOOKEEPER_HOME=/opt/zookeeper-3.4.12 export PATH=$PATH:$ZOOKEEPER_HOME/bin 第三步,修改 ZooKeeper 的配置⽂件。⾸先进⼊$ZOOKEEPER_HOME/conf⽬录,并将zoo_sample.cfg ⽂件修改为 zoo.cfg:
[root@node1 zookeeper-3.4.12]# cd conf [root@node1 conf]# cp zoo_sample.cfg zoo.cfg 然后修改 zoo.cfg 配置⽂件,zoo.cfg ⽂件的内容参考如下: # ZooKeeper服务器⼼跳时间,单位为ms tickTime=2000 # 投票选举新leader的初始化时间 initLimit=10 # leader与follower⼼跳检测最⼤容忍时间,响应超过syncLimit*tickTime,leader认为 # follower“死掉”,从服务器列表中删除follower syncLimit=5 # 数据⽬录 dataDir=/tmp/zookeeper/data # ⽇志⽬录 dataLogDir=/tmp/zookeeper/log # ZooKeeper对外服务端⼝ clientPort=2181 默认情况下,Linux 系统中没有/tmp/zookeeper/data 和/tmp/zookeeper/log 这两个⽬录,所以接下来还要创建这两个⽬录: [root@node1 conf]# mkdir -p /tmp/zookeeper/data [root@node1 conf]# mkdir -p /tmp/zookeeper/log 第四步,在${dataDir}⽬录(也就是/tmp/zookeeper/data)下创建⼀个 myid ⽂件,并写⼊⼀个数值,⽐如0。myid ⽂件⾥存放的是服务器的编号。 第五步,启动 Zookeeper 服务,详情如下: [root@node1 conf]# zkServer.sh start JMX enabled by default Using config: /opt/zookeeper-3.4.6/bin/../conf/zoo.cfg Starting zookeeper ... STARTED 可以通过 zkServer.sh status 命令查看 Zookeeper 服务状态,⽰例如下: [root@node1 ]# zkServer.sh status JMX enabled by default Using config: /opt/zookeeper-3.4.12/bin/../conf/zoo.cfg Mode: Standalone 以上是关于 ZooKeeper 单机模式的安装与配置,⼀般在⽣产环境中使⽤的都是集群模式,集群模式的配置也⽐较简单,相⽐单机模式⽽⾔只需要修改⼀些配置即可。下⾯以3台机器为例来配置⼀个 ZooKeeper 集群。⾸先在这3台机器的 /etc/hosts ⽂件中添加3台集群的IP地址与机器域名的映射,⽰例如下(3个IP地址分别对应3台机器): 192.168.0.2 node1 192.168.0.3 node2 192.168.0.4 node3 然后在这3台机器的 zoo.cfg ⽂件中添加以下配置: server.0=192.168.0.2:2888:3888 server.1=192.168.0.3:2888:3888 server.2=192.168.0.4:2888:3888 为了便于讲解上⾯的配置,这⾥抽象出⼀个公式,即 server.A=B:C:D。其中A是⼀个数字,代表服务器的编号,就是前⾯所说的 myid ⽂件⾥⾯的值。集群中每台服务器的编号都必须唯⼀,所以要保证每 台服务器中的 myid ⽂件中的值不同。B代表服务器的IP地址。C表⽰服务器与集群中的 leader 服务器交换信息的端⼜。D表⽰选举时服务器相互通信的端⼜。如此,集群模式的配置就告⼀段落,可以在这 3台机器上各⾃执⾏ zkServer.sh start 命令来启动服务。 3. Kafka的安装与配置 在安装完 JDK 和 ZooKeeper 之后,就可以执⾏ Kafka broker 的安装了,⾸先也是从官⽹中下载安装包,⽰例中选⽤按照包的是 kafka_2.11-2.0.0.tgz,将其复制⾄/opt ⽬录下并进⾏解压缩,⽰例如下: [root@node1 opt]# ll kafka_2.11-2.0.0.tgz -rw-r--r-- 1 root root 55751827 Jul 31 10:45 kafka_2.11-2.0.0.tgz [root@node1 opt]# tar zxvf kafka_2.11-2.0.0.tgz # 解压之后当前/opt⽬录下⽣成⼀个名为kafka_2.11-2.0.0的⽂件夹 [root@node1 opt]# cd kafka_2.11-2.0.0 [root@node1 kafka_2.11-2.0.0]# # Kafka的根⽬录$KAFKA_HOME即为/opt/kafka_2.11-2.0.0,可以将Kafka_HOME添加到/etc/profile⽂件中,具体做法可以参考前⾯JDK和ZooKeeper的安装示例 接下来需要修改 broker 的配置⽂件 $KAFKA_HOME/conf/server.properties。主要关注以下⼏个配置参数即可: # broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同 broker.id=0 # broker对外提供的服务⼊⼝地址 listeners=PLAINTEXT://localhost:9092 # 存放消息⽇志⽂件的地址 log.dirs=/tmp/kafka-logs # Kafka所需的ZooKeeper集群地址,为了⽅便演示,我们假设Kafka和ZooKeeper都安装在本机 zookeeper.connect=localhost:2181/kafka 如果是单机模式,那么修改完上述配置参数之后就可以启动服务。如果是集群模式,那么只需要对单机模式的配置⽂件做相应的修改即可:确保集群中每个 broker 的 broker.id 配置参数的值不⼀样,以及 listeners 配置参数也需要修改为与 broker 对应的IP地址或域名,之后就可以各⾃启动服务。注意,在启动 Kafka 服务之前同样需要确保 zookeeper.connect 参数所配置的 ZooKeeper 服务已经正确启动。 启动 Kafka 服务的⽅式⽐较简单,在$KAFKA_HOME ⽬录下执⾏下⾯的命令即可: bin/kafka-server-start.sh config/server.properties 如果要在后台运⾏ Kafka 服务,那么可以在启动命令中加⼊ -daemon 参数或&字符,⽰例如下: bin/kafka-server-start.sh –daemon config/server.properties # 或者bin/kafka-server-start.sh config/server.properties & 可以通过 jps 命令查看 Kafka 服务进程是否已经启动,⽰例如下: [root@node1 kafka_2.11-2.0.0]# jps -l 23152 sun.tools.jps.Jps 16052 org.apache.zookeeper.server.quorum.QuorumPeerMain 22807 kafka.Kafka # 这个就是Kafka服务端的进程 jps 命令只是⽤来确认 Kafka 服务的进程已经正常启动。它是否能够正确地对外提供服务,还需要通过发送和消费消息来进⾏验证,验证的过程可以参考下⾯的内容。 ⽣产与消费 由第1节的内容可知,⽣产者将消息发送⾄ Kafka 的主题中,或者更加确切地说应该是主题的分区中,⽽消费者也是通过订阅主题从⽽消费消息的。在演⽰⽣产与消费消息之前,需要创建⼀个主题作为消 息的载体。 Kafka 提供了许多实⽤的脚本⼯具,存放在 $KAFKA_HOME 的 bin ⽬录下,其中与主题有关的就是 kafka-topics.sh 脚本,下⾯我们⽤它演⽰创建⼀个分区数为4、副本因⼦为3的主题 topic-demo,⽰例如 下(Kafka集群模式下,broker数为3): [root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost: 2181/kafka --create --topic topic-demo --replication-factor 3 --partitions 4 Created topic "topic-demo". 其中 --zookeeper 指定了 Kafka 所连接的 ZooKeeper 服务地址,--topic 指定了所要创建主题的名称,--replication-factor 指定了副本因⼦,--partitions 指定了分区个数,--create 是创建主题的动作指令。 还可以通过 --describe 展⽰主题的更多具体信息,⽰例如下: [root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost: 2181/kafka --describe --topic topic-demo Topic:topic-demo PartitionCount:4 ReplicationFactor:3 Configs: Topic: topic-demo Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0 Topic: topic-demo Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1 Topic: topic-demo Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2 Topic: topic-demo Partition: 3 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1 创建主题 topic-demo 之后我们再来检测⼀下 Kafka 集群是否可以正常地发送和消费消息。$KAFKA_HOME/bin ⽬录下还提供了两个脚本 kafka-console-producer.sh 和 kafka-console- consumer.sh,通过控制 台收发消息。⾸先我们打开⼀个 shell 终端,通过 kafka-console-consumer.sh 脚本来订阅主题 topic-demo,⽰例如下: [root@node1 kafka_2.11-2.0.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo 其中 --bootstrap-server 指定了连接的 Kafka 集群地址,--topic 指定了消费者订阅的主题。⽬前主题 topic-demo 尚未有任何消息存⼊,所以此脚本还不能消费任何消息。 我们再打开⼀个 shell 终端,然后使⽤ kafka-console-producer.sh 脚本发送⼀条消息“Hello, Kafka!”⾄主题 topic-demo,⽰例如下: [root@node1 kafka_2.11-2.0.0]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-demo >Hello, Kafka! >
其中 --broker-list 指定了连接的 Kafka 集群地址,--topic 指定了发送消息时的主题。⽰例中的第⼆⾏是通过⼈⼯键⼊的⽅式输⼊的,按下回车键后会跳到第三⾏,即“>”字符处。此时原先执⾏ kafka- console-consumer.sh 脚本的 shell 终端中出现了刚刚输⼊的消息“Hello, Kafka!”,⽰例如下: [root@node1 kafka_2.11-2.0.0]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-demo Hello, Kafka! 读者也可以通过输⼊⼀些其他⾃定义的消息来熟悉消息的收发及这两个脚本的⽤法。不过这两个脚本⼀般⽤来做⼀些测试类的⼯作,在实际应⽤中,不会只是简单地使⽤这两个脚本来做复杂的与业务逻 辑相关的消息⽣产与消费的⼯作,具体的⼯作还需要通过编程的⼿段来实施。下⾯就以 Kafka ⾃⾝提供的 Java 客户端来演⽰消息的收发,与 Kafka 的 Java 客户端相关的 Maven 依赖如下: org.apache.kafka kafka-clients 2.0.0 要往 Kafka 中写⼊消息,⾸先要创建⼀个⽣产者客户端实例并设置⼀些配置参数,然后构建消息的 ProducerRecord 对象,其中必须包含所要发往的主题及消息的消息体,进⽽再通过⽣产者客户端实例将 消息发出,最后可以通过 close()⽅法来关闭⽣产者客户端实例并回收相应的资源。 具体的⽰例如代码清单2-1 所⽰,与脚本演⽰时⼀样,⽰例中仅发送⼀条内容为“Hello, Kafka!”的消息到主题 topic-demo。 //代码清单2-1 ⽣产者客户端示例代码 import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class ProducerFastStart { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("bootstrap.servers", brokerList); KafkaProducer producer = new KafkaProducer<>(properties); ProducerRecord record = new ProducerRecord<>(topic, "hello, Kafka!"); try { producer.send(record); } catch (Exception e) { e.printStackTrace(); } producer.close(); } } 对应的消费消息也⽐较简单,⾸先创建⼀个消费者客户端实例并配置相应的参数,然后订阅主题并消费即可,具体的⽰例代码如代码清单2-2所⽰。 //代码清单2-2 消费者客户端示例代码 import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.time.Duration; import java.util.Arrays; import java.util.Properties; public class ConsumerFastStart { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static final String groupId = "group.demo"; public static void main(String[] args) { Properties properties = new Properties(); properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); properties.put("bootstrap.servers", brokerList); //设置消费组的名称,具体的释义可以参⻅第3章 properties.put("group.id", groupId); //创建⼀个消费者客户端实例 KafkaConsumer consumer = new KafkaConsumer<>(properties); //订阅主题 consumer.subscribe(Collections.singletonList(topic)); //循环消费消息 while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord record : records) { System.out.println(record.value()); } } } } 通过这些⽰例,相信各位读者对 Kafka 应该有了初步的认识。这仅仅是⼀个开始,要正确、灵活地运⽤好 Kafka 还需要对它进⾏深⼊探索,包括⽣产者和消费者客户端的使⽤细节及原理、服务端的使⽤ 细节及原理、运维、监控等,每⼀个⽅⾯都等着读者去⼀⼀攻破。 服务端参数配置 前⾯的 Kafka 安装与配置的说明中只是简单地表述了⼏个必要的服务端参数⽽没有对其进⾏详细的介绍,并且 Kafka 服务端参数(broker configs)也并⾮只有这⼏个。Kafka 服务端还有很多参数配置,涉 及使⽤、调优的各个⽅⾯,虽然这些参数在⼤多数情况下不需要更改,但了解这些参数,以及在特殊应⽤需求的情况下进⾏有针对性的调优,可以更好地利⽤ Kafka 为我们⼯作。 下⾯挑选⼀些重要的服务端参数来做细致的说明,这些参数都配置在$KAFKA_HOME/config/server.properties ⽂件中。 1. zookeeper.connect 该参数指明 broker 要连接的 ZooKeeper 集群的服务地址(包含端⼜号),没有默认值,且此参数为必填项。可以配置为 localhost:2181,如果 ZooKeeper 集群中有多个节点,则可以⽤逗号将每个节点隔 开,类似于 localhost1:2181,localhost2:2181,localhost3:2181 这种格式。最佳的实践⽅式是再加⼀个 chroot 路径,这样既可以明确指明该 chroot 路径下的节点是为 Kafka 所⽤的,也可以实现多个 Kafka 集群 复⽤⼀套 ZooKeeper 集群,这样可以节省更多的硬件资源。包含 chroot 路径的配置类似于 localhost1:2181,localhost2:2181,localhost3:2181/kafka 这种,如果不指定 chroot,那么默认使⽤ ZooKeeper 的根路 径。 2. listeners 该参数指明 broker 监听客户端连接的地址列表,即为客户端要连接 broker 的⼊⼜地址列表,配置格式为 protocol1://hostname1:port1,protocol2://hostname2:port2,其中 protocol 代表协议类型,Kafka 当前⽀ 持的协议类型有 PLAINTEXT、SSL、SASL_SSL 等,如果未开启安全认证,则使⽤简单的 PLAINTEXT 即可。hostname 代表主机名,port 代表服务端⼜,此参数的默认值为 null。⽐如此参数配置为 PLAINTEXT://198.162.0.2:9092,如果有多个地址,则中间以逗号隔开。如果不指定主机名,则表⽰绑定默认⽹卡,注意有可能会绑定到127.0.0.1,这样⽆法对外提供服务,所以主机名最好不要为空;如 果主机名是0.0.0.0,则表⽰绑定所有的⽹卡。 与此参数关联的还有 advertised.listeners,作⽤和 listeners 类似,默认值也为 null。不过 advertised.listeners 主要⽤于 IaaS(Infrastructure as a Service)环境,⽐如公有云上的机器通常配备有多块⽹卡,即包 含私⽹⽹卡和公⽹⽹卡,对于这种情况⽽⾔,可以设置 advertised.listeners 参数绑定公⽹IP供外部客户端使⽤,⽽配置 listeners 参数来绑定私⽹IP地址供 broker 间通信使⽤。 3. broker.id 该参数⽤来指定 Kafka 集群中 broker 的唯⼀标识,默认值为-1。如果没有设置,那么 Kafka 会⾃动⽣成⼀个。这个参数还和 meta.properties ⽂件及服务端参数 broker.id.generation. enable 和 reserved.broker.max.id 有关,相关深度解析可以参考《图解Kafka之核⼼原理》的相关内容。 4. log.dir和log.dirs Kafka 把所有的消息都保存在磁盘上,⽽这两个参数⽤来配置 Kafka ⽇志⽂件存放的根⽬录。⼀般情况下,log.dir ⽤来配置单个根⽬录,⽽ log.dirs ⽤来配置多个根⽬录(以逗号分隔),但是 Kafka 并没 有对此做强制性限制,也就是说,log.dir 和 log.dirs 都可以⽤来配置单个或多个根⽬录。log.dirs 的优先级⽐ log.dir ⾼,但是如果没有配置 log.dirs,则会以 log.dir 配置为准。默认情况下只配置了 log.dir 参 数,其默认值为 /tmp/kafka-logs。 5. message.max.bytes 该参数⽤来指定 broker 所能接收消息的最⼤值,默认值为1000012(B),约等于976.6KB。如果 Producer 发送的消息⼤于这个参数所设置的值,那么(Producer)就会报出 RecordTooLargeException 的异 常。如果需要修改这个参数,那么还要考虑 max.request.size(客户端参数)、max.message.bytes(topic端参数)等参数的影响。为了避免修改此参数⽽引起级联的影响,建议在修改此参数之前考虑分拆消 息的可⾏性。
还有⼀些服务端参数在本节没有提及,这些参数同样⾮常重要,它们需要⽤单独的章节或者场景来描述,⽐如 unclean.leader.election.enable、log.segment.bytes 等参数都会在后⾯的章节中提及。 总结 通过前⾯这2个章节的内容介绍,相信读者对 Kafka 已经有了初步的了解,接下来我们就可以正式开始研究如何正确、有效地使⽤ Kafka 了。 ⽣产者-客户端开发 从编程的⾓度⽽⾔,⽣产者就是负责向 Kafka 发送消息的应⽤程序。在 Kafka 的历史变迁中,⼀共有两个⼤版本的⽣产者客户端:第⼀个是于 Kafka 开源之初使⽤ Scala 语⾔编写的客户端,我们可以称之 为旧⽣产者客户端(Old Producer)或 Scala 版⽣产者客户端;第⼆个是从 Kafka 0.9.x 版本开始推出的使⽤ Java 语⾔编写的客户端,我们可以称之为新⽣产者客户端(New Producer)或 Java 版⽣产者客户 端,它弥补了旧版客户端中存在的诸多设计缺陷。 虽然 Kafka 是⽤ Java/Scala 语⾔编写的,但这并不妨碍它对于多语⾔的⽀持,在 Kafka 官⽹中,“CLIENTS”的⼊⼜提供了⼀份多语⾔的⽀持列表,其中包括常⽤的 C/C++、Python、Go 等语⾔,不过这些 其他类语⾔的客户端并⾮由 Kafka 社区维护,如果使⽤则需要另⾏下载。本章主要针对现下流⾏的新⽣产者(Java 语⾔编写的)客户端做详细介绍,⽽旧⽣产者客户端已被淘汰,故不再做相应的介绍 了。 客户端开发 ⼀个正常的⽣产逻辑需要具备以下⼏个步骤: 1. 配置⽣产者客户端参数及创建相应的⽣产者实例。 2. 构建待发送的消息。 3. 发送消息。 4. 关闭⽣产者实例。 代码清单2-1中已经简单对⽣产者客户端的编码做了⼀个基本演⽰,本节对其修改以做具体的分析,如代码清单3-1所⽰。 //代码清单3-1 ⽣产者客户端示例代码 public class KafkaProducerAnalysis { public static final String brokerList = "localhost:9092"; public static final String topic = "topic-demo"; public static Properties initConfig(){ Properties props = new Properties(); props.put("bootstrap.servers", brokerList); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("client.id", "producer.client.id.demo"); return props; } public static void main(String[] args) { Properties props = initConfig(); KafkaProducer producer = new KafkaProducer<>(props); ProducerRecord record = new ProducerRecord<>(topic, "Hello, Kafka!"); try { producer.send(record); } catch (Exception e) { e.printStackTrace(); } } } 相⽐代码清单2-1⽽⾔,这⾥仅仅是让编码的逻辑显得更加“正统”⼀些,也更加⽅便下⾯内容的陈述。 这⾥有必要单独说明的是构建的消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的⼀个 value 属性,⽐如“Hello, Kafka!”只是 ProducerRecord 对象中的⼀个属性。ProducerRecord 类的定义如下(只截取成员变量): public class ProducerRecord { private final String topic; //主题 private final Integer partition; //分区号 private final Headers headers; //消息头部 private final K key; //键 private final V value; //值 private final Long timestamp; //消息的时间戳 //省略其他成员⽅法和构造⽅法 } 其中 topic 和 partition 字段分别代表消息要发往的主题和分区号。headers 字段是消息的头部,Kafka 0.11.x 版本才引⼊这个属性,它⼤多⽤来设定⼀些与应⽤相关的信息,如⽆需要也可以不⽤设置。key 是⽤来指定消息的键,它不仅是消息的附加信息,还可以⽤来计算分区号进⽽可以让消息发往特定的分区。前⾯提及消息以主题为单位进⾏归类,⽽这个 key 可以让消息再进⾏⼆次归类,同⼀个 key 的 消息会被划分到同⼀个分区中,详情参见第4节中的分区器。 有 key 的消息还可以⽀持⽇志压缩的功能。value 是指消息体,⼀般不为空,如果为空则表⽰特定的消息—墓碑消息。timestamp 是指消息的时间戳,它有 CreateTime 和 LogAppendTime 两种类型,前者表 ⽰消息创建的时间,后者表⽰消息追加到⽇志⽂件的时间。以上这些深⼊原理性的东西都会在《图解Kafka之核⼼原理》中呈现给⼤家。 接下来我们将按照⽣产逻辑的各个步骤来⼀⼀做相应分析。 必要的参数配置 在创建真正的⽣产者实例前需要配置相应的参数,⽐如需要连接的 Kafka 集群地址。参照代码清单3-1中的 initConfig()⽅法,在 Kafka ⽣产者客户端 KafkaProducer 中有3个参数是必填的。 bootstrap.servers:该参数⽤来指定⽣产者客户端连接 Kafka 集群所需的 broker 地址清单,具体的内容格式为 host1:port1,host2:port2,可以设置⼀个或多个地址,中间以逗号隔开,此参数的默认值 为“”。注意这⾥并⾮需要所有的 broker 地址,因为⽣产者会从给定的 broker ⾥查找到其他 broker 的信息。不过建议⾄少要设置两个以上的 broker 地址信息,当其中任意⼀个宕机时,⽣产者仍然可 以连接到 Kafka 集群上。 key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。代码清单3-1中⽣产者使⽤的 KafkaProducer和 ProducerRecord 中的泛型 对应的就是消息中 key 和 value 的类型,⽣产者客户端使⽤这种⽅式可以让代码具有良好的可读性,不过在发往 broker 之前需要将消息中对应的 key 和 value 做相应的序列化操作来转 换成字节数组。key.serializer 和 value.serializer 这两个参数分别⽤来指定 key 和 value 序列化操作的序列化器,这两个参数⽆默认值。注意这⾥必须填写序列化器的全限定名,如代码清单3-1中的 org.apache.kafka.common.serialization.StringSerializer,单单指定 StringSerializer 是错误的,更多有关序列化的内容可以参考第4节。 注意到代码清单3-1中的 initConfig() ⽅法⾥还设置了⼀个参数 client.id,这个参数⽤来设定 KafkaProducer 对应的客户端id,默认值为“”。如果客户端不设置,则 KafkaProducer 会⾃动⽣成⼀个⾮空字符 串,内容形式如“producer-1”、“producer-2”,即字符串“producer-”与数字的拼接。 KafkaProducer 中的参数众多,远⾮⽰例 initConfig()⽅法中的那样只有4个,开发⼈员可以根据业务应⽤的实际需求来修改这些参数的默认值,以达到灵活调配的⽬的。⼀般情况下,普通开发⼈员⽆法记 住所有的参数名称,只能有个⼤致的印象。 在实际使⽤过程中,诸如“key.serializer”、“max.request.size”、“interceptor.classes”之类的字符串经常由于⼈为因素⽽书写错误。为此,我们可以直接使⽤客户端中的 org.apache.kafka.clients.producer.ProducerConfig 类来做⼀定程度上的预防措施,每个参数在 ProducerConfig 类中都有对应的名称,以代码清单3-1中的 initConfig()⽅法为例,引⼊ ProducerConfig 后的修改结 果如下: public static Properties initConfig(){ Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer.client.id.demo"); return props; } 注意到上⾯的代码中 key.serializer 和 value.serializer 参数对应类的全限定名⽐较长,也⽐较容易写错,这⾥通过 Java 中的技巧来做进⼀步的改进,相关代码如下: props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 如此代码便简洁了许多,同时进⼀步降低了⼈为出错的可能性。在配置完参数之后,我们就可以使⽤它来创建⼀个⽣产者实例,⽰例如下: KafkaProducer producer = new KafkaProducer<>(props); KafkaProducer 是线程安全的,可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 实例进⾏池化来供其他线程调⽤。 KafkaProducer 中有多个构造⽅法,⽐如在创建 KafkaProducer 实例时并没有设定 key.serializer 和 value.serializer 这两个配置参数,那么就需要在构造⽅法中添加对应的序列化器,⽰例如下: KafkaProducer producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); 其内部原理和⽆序列化器的构造⽅法⼀样,不过就实际应⽤⽽⾔,⼀般都选⽤ public KafkaProducer(Properties properties)这个构造⽅法来创建 KafkaProducer 实例。
消息的发送 在创建完⽣产者实例之后,接下来的⼯作就是构建消息,即创建 ProducerRecord 对象。通过代码清单3-1中我们已经了解了 ProducerRecord 的属性结构,其中 topic 属性和 value 属性是必填项,其余属性是 选填项,对应的 ProducerRecord 的构造⽅法也有多种,参考如下: public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable
headers) public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) public ProducerRecord(String topic, Integer partition, K key, V value, Iterable
headers) public ProducerRecord(String topic, Integer partition, K key, V value) public ProducerRecord(String topic, K key, V value) public ProducerRecord(String topic, V value) 代码清单3-1中使⽤的是最后⼀种构造⽅法,也是最简单的⼀种,这种⽅式相当于将 ProducerRecord 中除 topic 和 value 外的属性全部值设置为 null。在实际的应⽤中,还会⽤到其他构造⽅法,⽐如要指定 key,或者添加 headers 等。有可能会遇到这些构造⽅法都不满⾜需求的情况,需要⾃⾏添加更多的构造⽅法,⽐如下⾯的⽰例: public ProducerRecord(String topic, Long timestamp, V value, Iterable
headers) 注意,针对不同的消息,需要构建不同的 ProducerRecord 对象,在实际应⽤中创建 ProducerRecord 对象是⼀个⾮常频繁的动作。 创建⽣产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。 代码清单3-1中的这种发送⽅式就是发后即忘,它只管往 Kafka 中发送消息⽽并不关⼼消息是否正确到达。在⼤多数情况下,这种发送⽅式没有什么问题,不过在某些时候(⽐如发⽣不可重试异常时)会 造成消息的丢失。这种发送⽅式的性能最⾼,可靠性也最差。 KafkaProducer 的 send()⽅法并⾮是 void 类型,⽽是 Future类型,send()⽅法有2个重载⽅法,具体定义如下: public Future send(ProducerRecord record) public Future send(ProducerRecord record, Callback callback) 要实现同步的发送⽅式,可以利⽤返回的 Future 对象实现,⽰例如下: try { producer.send(record).get(); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } 实际上 send() ⽅法本⾝就是异步的,send() ⽅法返回的 Future 对象可以使调⽤⽅稍后获得发送的结果。⽰例中在执⾏ send() ⽅法之后直接链式调⽤了 get() ⽅法来阻塞等待 Kafka 的响应,直到消息发送成 功,或者发⽣异常。如果发⽣异常,那么就需要捕获异常并交由外层逻辑处理。 也可以在执⾏完 send() ⽅法之后不直接调⽤ get() ⽅法,⽐如下⾯的⼀种同步发送⽅式的实现: try { Future future = producer.send(record); RecordMetadata metadata = future.get(); System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset()); } catch (ExecutionException | InterruptedException e) { e.printStackTrace(); } 这样可以获取⼀个 RecordMetadata 对象,在 RecordMetadata 对象⾥包含了消息的⼀些元数据信息,⽐如当前消息的主题、分区号、分区中的偏移量(offset)、时间戳等。如果在应⽤代码中需要这些信 息,则可以使⽤这个⽅式。如果不需要,则直接采⽤ producer.send(record).get() 的⽅式更省事。 Future 表⽰⼀个任务的⽣命周期,并提供了相应的⽅法来判断任务是否已经完成或取消,以及获取任务的结果和取消任务等。既然 KafkaProducer.send() ⽅法的返回值是⼀个 Future 类型的对象,那么完全 可以⽤ Java 语⾔层⾯的技巧来丰富应⽤的实现,⽐如使⽤ Future 中的 get(long timeout, TimeUnit unit) ⽅法实现可超时的阻塞。 KafkaProducer 中⼀般会发⽣两种类型的异常:可重试的异常和不可重试的异常。常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、 NotEnoughReplicasException、NotCoordinatorException 等。⽐如 NetworkException 表⽰⽹络异常,这个有可能是由于⽹络瞬时故障⽽导致的异常,可以通过重试解决;又⽐如 LeaderNotAvailableException 表⽰分区的 leader 副本不可⽤,这个异常通常发⽣在 leader 副本下线⽽新的 leader 副本选举完成之前,重试之后可以重新恢复。不可重试的异常,⽐如第2节中提及的 RecordTooLargeException 异常,暗 ⽰了所发送的消息太⼤,KafkaProducer 对此不会进⾏任何重试,直接抛出异常。 对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内⾃⾏恢复了,就不会抛出异常。retries 参数的默认值为0,配置⽅式参考如下: props.put(ProducerConfig.RETRIES_CONFIG, 10); ⽰例中配置了10次重试。如果重试了10次之后还没有恢复,那么仍会抛出异常,进⽽发送的外层逻辑就要处理这些异常了。 同步发送的⽅式可靠性⾼,要么消息被发送成功,要么发⽣异常。如果发⽣异常,则可以捕获并进⾏相应的处理,⽽不会像“发后即忘”的⽅式直接造成消息的丢失。不过同步发送的⽅式的性能会差很 多,需要阻塞等待⼀条消息发送完之后才能发送下⼀条。 我们再来了解⼀下异步发送的⽅式,⼀般是在 send() ⽅法⾥指定⼀个 Callback 的回调函数,Kafka 在返回响应时调⽤该函数来实现异步的发送确认。 有读者或许会有疑问,send() ⽅法的返回值类型就是 Future,⽽ Future 本⾝就可以⽤作异步的逻辑处理。这样做不是不⾏,只不过 Future ⾥的 get() ⽅法在何时调⽤,以及怎么调⽤都是需要⾯对的问题, 消息不停地发送,那么诸多消息对应的 Future 对象的处理难免会引起代码处理逻辑的混乱。使⽤ Callback 的⽅式⾮常简洁明了,Kafka 有响应时就会回调,要么发送成功,要么抛出异常。 异步发送⽅式的⽰例如下: producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null) { exception.printStackTrace(); } else { System.out.println(metadata.topic() + "-" + metadata.partition() + ":" + metadata.offset()); } } }); ⽰例代码中遇到异常时(exception!=null)只是做了简单的打印操作,在实际应⽤中应该使⽤更加稳妥的⽅式来处理,⽐如可以将异常记录以便⽇后分析,也可以做⼀定的处理来进⾏消息重发。 onCompletion() ⽅法的两个参数是互斥的,消息发送成功时,metadata 不为 null ⽽ exception 为 null;消息发送异常时,metadata 为 null ⽽ exception 不为 null。 producer.send(record1, callback1); producer.send(record2, callback2); 对于同⼀个分区⽽⾔,如果消息 record1 于 record2 之前先发送(参考上⾯的⽰例代码),那么 KafkaProducer 就可以保证对应的 callback1 在 callback2 之前调⽤,也就是说,回调函数的调⽤也可以保证分 区有序。 通常,⼀个 KafkaProducer 不会只负责发送单条消息,更多的是发送多条消息,在发送完这些消息之后,需要调⽤ KafkaProducer 的 close() ⽅法来回收资源。下⾯的⽰例中发送了100条消息,之后就调⽤ 了 close() ⽅法来回收所占⽤的资源: int i = 0; while (i < 100) { ProducerRecord record = new ProducerRecord<>(topic, "msg"+i++); try { producer.send(record).get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } producer.close(); close() ⽅法会阻塞等待之前所有的发送请求完成后再关闭 KafkaProducer。与此同时,KafkaProducer 还提供了⼀个带超时时间的 close() ⽅法,具体定义如下: public void close(long timeout, TimeUnit timeUnit) 如果调⽤了带超时时间 timeout 的 close() ⽅法,那么只会在等待 timeout 时间内来完成所有尚未完成的请求处理,然后强⾏退出。在实际应⽤中,⼀般使⽤的都是⽆参的 close() ⽅法。 序列化 ⽣产者需要⽤序列化器(Serializer)把对象转换成字节数组才能通过⽹络发送给 Kafka。⽽在对侧,消费者需要⽤反序列化器(Deserializer)把从 Kafka 中收到的字节数组转换成相应的对象。在代码清单 3-1中,为了⽅便,消息的 key 和 value 都使⽤了字符串,对应程序中的序列化器也使⽤了客户端⾃带的 org.apache.kafka.common.serialization.StringSerializer,除了⽤于 String 类型的序列化器,还有 ByteArray、ByteBuffer、Bytes、Double、Integer、Long 这⼏种类型,它们都实现了 org.apache.kafka.common.serialization.Serializer 接⼜,此接⼜有3个⽅法: public void configure(Map configs, boolean isKey) public byte[] serialize(String topic, T data) public void close()
分享到:
收藏