Main 图灵程序设计丛书:大规模数据处理入门与实战(套装全10册 Kafka权威指南 Flink基础教程..

图灵程序设计丛书:大规模数据处理入门与实战(套装全10册 Kafka权威指南 Flink基础教程 数据科学实战 SQL反模式 SQL必知必会(第4版) Spark快速大数据分析 数据科学入门 Python数据挖掘入门与实践 Hadoop安全:大数据平台隐私保护 Hadoop数据分析)

, , , , , , , , , , , , , , , , , ,
5.0 / 5.0
How much do you like this book?
What’s the quality of the file?
Download the book for quality assessment
What’s the quality of the downloaded files?
Year:
2018
Publisher:
人民邮电出版社有限公司
Language:
chinese
ISBN 13:
9787115490063
ISBN:
2444301359
File:
EPUB, 17.65 MB
Download (epub, 17.65 MB)

You may be interested in Powered by Rec2Me

 

Most frequently terms

 
0 comments
 

You can write a book review and share your experiences. Other readers will always be interested in your opinion of the books you've read. Whether you've loved the book or not, if you give your honest and detailed thoughts then people will find new books that are right for them.
1

Algebra for JEE (Advanced)

Language:
english
File:
PDF, 35.85 MB
0 / 0
总目录


Kafka权威指南

Flink基础教程

数据科学实战

SQL反模式

SQL必知必会(第4版)

Spark快速大数据分析

数据科学入门

Python数据挖掘入门与实践

Hadoop安全:大数据平台隐私保护

Hadoop数据分析





版权信息


书名:Kafka权威指南

作者:[美] Neha Narkhede Gwen Shapira Todd Palino

译者:薛命灯

ISBN:978-7-115-47327-1

本书由北京图灵文化发展有限公司发行数字版。版权所有,侵权必究。

091507240605ToBeReplacedWithUserId





版权声明

O'Reilly Media, Inc. 介绍

业界评论

序

前言

读者对象

排版约定

使用代码示例

O'Reilly Safari

联系我们

致谢

电子书

第 1 章 初识 Kafka

1.1 发布与订阅消息系统

1.1.1 如何开始

1.1.2 独立的队列系统

1.2 Kafka登场

1.2.1 消息和批次

1.2.2 模式

1.2.3 主题和分区

1.2.4 生产者和消费者

1.2.5 broker和集群

1.2.6 多集群

1.3 为什么选择Kafka

1.3.1 多个生产者

1.3.2 多个消费者

1.3.3 基于磁盘的数据存储

1.3.4 伸缩性

1.3.5 高性能

1.4 数据生态系统

使用场景

1.5 起源故事

1.5.1 LinkedIn的问题

1.5.2 Kafka的诞生

1.5.3 走向开源

1.5.4 命名

1.6 开始Kafka之旅

第 2 章 安装 Kafka

2.1 要事先行

2.1.1 选择操作系统

2.1.2 安装Java

2.1.3 安装Zookeeper

2.2 安装Kafka Broker

2.3 broker配置

2.3.1 常规配置

2.3.2 主题的默认配置

2.4 硬件的选择

2.4.1 磁盘吞吐量

2.4.2 磁盘容量

2.4.3 内存

2.4.4 网络

2.4.5 CPU

2.5 云端的Kafka

2.6 Kafka集群

2.6.1 需要多少个broker

2.6.2 broker配置

2.6.3 操作系统调优

2.7 生产环境的注意事项

2.7.1 垃圾回收器选项

2.7.2 数据中心布局

2.7.3 共享Zookeeper

2.8 总结

第 3 章 Kafka 生产者——向 Kafka 写入数据

3.1 生产者概览

3.2 创建Kafka生产者

3.3 发送消息到Kafka

3.3.1 同步发送消息

3.3.2 异步发送消息

3.4 生产者的配置

3.5 序列化器

3.5.1 自定义序列化器

3.5.2 使用Avro序列化

3.5.3 在Kafka里使用Avro

3.6 分区

3.7 旧版的生产者API

3.8 总结

第 4 章 Kafka 消费者——从 Kafka 读取数据

4.1 KafkaConsumer概念

4.1.1 消费者和消费者群组

4.1.2 消费者群组和分区再均衡

4.2 创建Kafka消费者

4.3 订阅主题

4.4 轮询

4.5 消费者的配置

4.6 提交和偏移量

4.6.1 自动提交

4.6.2 提交当前偏移量

4.6.3 异步提交

4.6.4 同步和异步组合提交

4.6.5 提交特定的偏移量

4.7 再均衡监听器

4.8 从特定偏移量处开始处理记录

4.9 如何退出

4.10 反序列化器

4.11 独立消费者——为什么以及怎样使用没有群组的消费者

4.12 旧版的消费者API

4.13 总结

第 5 章 深入 Kafka

5.1 集群成员关系

5.2 控制器

5.3 复制

5.4 处理请求

5.4.1 生产请求

5.4.2 获取请求

5.4.3 其他请求

5.5 物理存储

5.5.1 分区分配

5.5.2 文件管理

5.5.3 文件格式

5.5.4 索引

5.5.5 清理

5.5.6 清理的工作原理

5.5.7 被删除的事件

5.5.8 何时会清理主题

5.6 总结

第 6 章 可靠的数据传递

6.1 可靠性保证

6.2 复制

6.3 broker配置

6.3.1 复制系数

6.3.2 不完全的首领选举

6.3.3 最少同步副本

6.4 在可靠的系统里使用生产者

6.4.1 发送确认

6.4.2 配置生产者的重试参数

6.4.3 额外的错误处理

6.5 在可靠的系统里使用消费者

6.5.1 消费者的可靠性配置

6.5.2 显式提交偏移量

6.6 验证系统可靠性

6; .6.1 配置验证

6.6.2 应用程序验证

6.6.3 在生产环境监控可靠性

6.7 总结

第 7 章 构建数据管道

7.1 构建数据管道时需要考虑的问题

7.1.1 及时性

7.1.2 可靠性

7.1.3 高吞吐量和动态吞吐量

7.1.4 数据格式

7.1.5 转换

7.1.6 安全性

7.1.7 故障处理能力

7.1.8 耦合性和灵活性

7.2 如何在Connect API和客户端API之间作出选择

7.3 Kafka Connect

7.3.1 运行Connect

7.3.2 连接器示例——文件数据源和文件数据池

7.3.3 连接器示例——从MySQL到ElasticSearch

7.3.4 深入理解Connect

7.4 Connect之外的选择

7.4.1 用于其他数据存储的摄入框架

7.4.2 基于图形界面的ETL工具

7.4.3 流式处理框架

7.5 总结

第 8 章 跨集群数据镜像

8.1 跨集群镜像的使用场景

8.2 多集群架构

8.2.1 跨数据中心通信的一些现实情况

8.2.2 Hub和Spoke架构

8.2.3 双活架构

8.2.4 主备架构

8.2.5 延展集群

8.3 Kafka的MirrorMaker

8.3.1 如何配置

8.3.2 在生产环境部署MirrorMaker

8.3.3 MirrorMaker调优

8.4 其他跨集群镜像方案

8.4.1 优步的uReplicator

8.4.2 Confluent的Replicator

8.5 总结

第 9 章 管理 Kafka

9.1 主题操作

9.1.1 创建主题

9.1.2 增加分区

9.1.3 删除主题

9.1.4 列出集群里的所有主题

9.1.5 列出主题详细信息

9.2 消费者群组

9.2.1 列出并描述群组

9.2.2 删除群组

9.2.3 偏移量管理

9.3 动态配置变更

9.3.1 覆盖主题的默认配置

9.3.2 覆盖客户端的默认配置

9.3.3 列出被覆盖的配置

9.3.4 移除被覆盖的配置

9.4 分区管理

9.4.1 首选的首领选举

9.4.2 修改分区副本

9.4.3 修改复制系数

9.4.4 转储日志片段

9.4.5 副本验证

9.5 消费和生产

9.5.1 控制台消费者

9.5.2 控制台生产者

9.6 客户端ACL

9.7 不安全的操作

9.7.1 移动集群控制器

9.7.2 取消分区重分配

9.7.3 移除待删除的主题

9.7.4 手动删除主题

9.8 总结

第 10 章 监控 Kafka

10.1 度量指标基础

10.1.1 度量指标在哪里

10.1.2 内部或外部度量

10.1.3 应用程序健康检测

10.1.4 度量指标的覆盖面

10.2 broker的度量指标

10.2.1 非同步分区

10.2.2 broker度量指标

10.2.3 主题和分区的度量指标

10.2.4 Java虚拟机监控

10.2.5 操作系统监控

10.2.6 日志

10.3 客户端监控

10.3.1 生产者度量指标

10.3.2 消费者度量指标

10.3.3 配额

10.4 延时监控

10.5 端到端监控

10.6 总结

第 11 章 流式处理

11.1 什么是流式处理

11.2 流式处理的一些概念

11.2.1 时间

11.2.2 状态

11.2.3 流和表的二元性

11.2.4 时间窗口

11.3 流式处理的设计模式

11.3.1 单个事件处理

11.3.2 使用本地状态

11.3.3 多阶段处理和重分区

11.3.4 使用外部查找——流和表的连接

11.3.5 流与流的连接

11.3.6 乱序的事件

11.3.7 重新处理

11.4 Streams示例

11.4.1 字数统计

11.4.2 股票市场统计

11.4.3 填充点击事件流

11.5 Kafka Streams的架构概览

11.5.1 构建拓扑

11.5.2 对拓扑进行伸缩

11.5.3 从故障中存活下来

11.6 流式处理使用场景

11.7 如何选择流式处理框架

11.8 总结

附录 A 在其他操作系统上安装 Kafka

A.1 在Windows上安装Kafka

A.1.1 使用Windows的Linux子系统

A.1.2 使用本地Java

A.2 在MacOS上安装Kafka

A.2.1 使用Homebrew

A.2.2 手动安装

作者介绍

封面介绍





版权声明


© 2017 by Neha Narkhede, Gwen Shapira, Todd Palino.

Simplified Chinese Edition, jointly published by O'Reilly Media, Inc.and Posts & Telecom Press, 2018. Authorized translation of the English edition, 2017 O'Reilly Media, Inc., the owner of all rights to publish and sell the same.

All rights reserved including the rights of reproduction in whole or in part in any form.



英文原版由 O'Reilly Media, Inc. 出版,2017。

简体中文版由人民邮电出版社出版,2018。英文原版的翻译得到 O'Reilly Media, Inc. 的授权。此简体中文版的出版和销售得到出版权和销售权的所有者—— O'Reilly Media,Inc. 的许可。

版权所有,未得书面许可,本书的任何部分和全部不得以任何形式重制。





O'Reilly Media, Inc. 介绍


O'Reilly Media 通过图书、杂志、在线服务、调查研究和会议等方式传播创新知识。自 1978 年开始,O'Reilly 一直都是前沿发展的见证者和推动者。超级极客们正在开创着未来,而我们关注真正重要的技术趋势——通过放大那些“细微的信号”来刺激社会对新科技的应用。作为技术社区中活跃的参与者,O'Reilly 的发展充满了对创新的倡导、创造和发扬光大。

O'Reilly 为软件开发人员带来革命性的“动物书”;创建第一个商业网站(GNN);组织了影响深远的开放源代码峰会,以至于开源软件运动以此命名;创立了 Make 杂志,从而成为 DIY 革命的主要先锋;公司一如既往地通过多种形式缔结信息与人的纽带。 O'Reilly 的会议和峰会集聚了众多超级极客和高瞻远瞩的商业领袖,共同描绘出开创新产业的革命性思想。作为技术人士获取信息的选择,O'Reilly 现在还将先锋专家的知识传递给普通的计算机用户。无论是通过图书出版、在线服务或者面授课程,每一项 O'Reilly 的产品都反映了公司不可动摇的理念——信息是激发创新的力量。





业界评论


“O'Reilly Radar 博客有口皆碑。”

——Wired

“O'Reilly 凭借一系列(真希望当初我也想到了)非凡想法建立了数百万美元的业务。”

——Business 2.0

“O'Reilly Conference 是聚集关键思想领袖的绝对典范。”

——CRN

“一本 O'Reilly 的书就代表一个有用、有前途、需要学习的主题。”

——Irish Times

“Tim 是位特立独行的商人,他不光放眼于最长远、最广阔的视野,并且切实地按照 Yogi Berra 的建议去做了:‘如果你在路上遇到岔路口,走小路(岔路)。’回顾过去,Tim 似乎每一次都选择了小路,而且有几次都是一闪即逝的机会,尽管大路也不错。”

——Linux Journal





序


这是一个激动人心的时刻,成千上万的企业在使用 Kafka,三分之一多的世界 500 强公司也在其中。Kafka 是成长最快的开源项目之一,它的生态系统也在蓬勃发展。Kafka 正在成为管理和处理流式数据的利器。

Kafka 从何而来?我们为什么要开发 Kafka ? Kafka 到底是什么?

Kafka 最初是 LinkedIn 的一个内部基础设施系统。我们发现,虽然有很多数据库和系统可以用来存储数据,但在我们的架构里,刚好缺一个可以帮助处理持续数据流的组件。在开发 Kafka 之前,我们实验了各种现成的解决方案,从消息系统到日志聚合系统,再到 ETL 工具,它们都无法满足我们的需求。

最后,我们决定从头开发一个系统。我们不想只是开发一个能够存储数据的系统,比如传统的关系型数据库、键值存储引擎、搜索引擎或缓存系统,我们希望能够把数据看成是持续变化和不断增长的流,并基于这样的想法构建出一个数据系统;事实上,是一个数据架构。

这个想法实现后比我们最初预想的适用性更广。Kafka 一开始被用在社交网络的实时应用和数据流当中,而现在已经成为下一代数据架构的基础。大型零售商正在基于持续数据流改造他们的基础业务流程,汽车公司正在从互联网汽车那里收集和处理实时数据流,银行也在重新思考基于 Kafka 改造他们的基础流程和系统。

那么 Kafka 在这当中充当了怎样的角色?它与现有的系统有什么区别?

我们认为 Kafka 是一个流平台:在这个平台上可以发布和订阅数据流,并把它们保存起来、进行处理,这就是构建 Kafka 的初衷。以这种方式来看待数据确实与人们习惯的想法有所不同,但它确实在构建应用和架构方面表现出了强大的抽象能力。Kafka 经常会被拿来与现有的技术作比较:企业级消息系统、大数据系统(如 Hadoop)和数据集成或 ETL 工具。这里的每一项比较都有一定的道理,但也有失偏颇。

Kafka 有点像消息系统,允许发布和订阅消息流。从这点来看,它类似于 ActiveMQ、 RabbitMQ 或 IBM 的 MQSeries 等产品。尽管看上去有些相似,但 Kafka 与这些传统的消息系统仍然存在很多重要的不同点,这些差异使它完全不同于消息系统。首先,作为一个现代的分布式系统,Kafka 以集群的方式运行,可以自由伸缩,处理公司的所有应用程序。Kafka 集群并不是一组独立运行的 broker,而是一个可以灵活伸缩的中心平台,可以处理整个公司所有的数据流。其次,Kafka 可以按照你的要求存储数据,保存多久都可以。作为数据连接层,Kafka 提供了数据传递保证——可复制、持久化,保留多长时间完全可以由你来决定。最后,流式处理将数据处理的层次提升到了新高度。消息系统只会传递消息,而 Kafka 的流式处理能力让你只用很少的代码就能够动态地处理派生流和数据集。 Kafka 的这些独到之处足以让你刮目相看,它不只是“另一个消息队列”。

从另一个角度来看 Kafka,我们会把它看成实时版的 Hadoop——这也是我们设计和构建 Kafka 的原始动机之一。Hadoop 可以存储和定期处理大量的数据文件,而 Kafka 可以存储和持续处理大型的数据流。从技术角度来看,它们有着惊人的相似之处,很多人将新兴的流式处理看成批处理的超集。它们之间的最大不同体现在持续的低延迟处理和批处理之间的差异上。Hadoop 和大数据主要应用在数据分析上,而 Kafka 因其低延迟的特点更适合用在核心的业务应用上。业务事件时刻在发生,Kafka 能够及时对这些事件作出响应,基于 Kafka 构建的服务直接为业务运营提供支撑,提升用户体验。

Kafka 与 ETL 工具或其他数据集成工具之间也可以进行一番比较。Kafka 和这些工具都擅长移动数据,但我想它们最大的不同在于 Kafka 颠覆了传统的思维。Kafka 并非只是把数据从一个系统拆解出来再塞进另一个系统,它其实是一个面向实时数据流的平台。也就是说,它不仅可以将现有的应用程序和数据系统连接起来,它还能用于加强这些触发相同数据流的应用。我们认为这种以数据流为中心的架构是非常重要的。在某种程度上说,这些数据流是现代数字科技公司的核心,与他们的现金流一样重要。

将上述的三个领域聚合在一起,将所有的数据流整合到一起,流平台因此变得极具吸引力。

当然,除了这些不同点之外,对于那些习惯了开发请求与响应风格应用和关系型数据库的人来说,要学会基于持续数据流构建应用程序也着实是一个巨大的思维转变。借助这本书来学习 Kafka 再好不过了,从内部架构到 API,都是由对 Kafka 最了解的人亲手呈现的。我希望你们能够像我一样喜欢这本书!

—— Jay Kreps,Confluent 联合创始人兼 CEO





前言


给予一个技术书籍作者最好的赞赏莫过于这句话——“如果在一开始接触这门技术时能看到这本书就好了”。在开始写这本书的时候,我们就是以这句话作为写作目标。我们开发 Kafka,在生产环境运行 Kafka,帮助很多公司构建基于 Kafka 的系统,帮助他们管理数据管道,积累了很多经验,但也困惑:“应该把哪些东西分享给 Kafka 新用户,让他们从新手变成专家?”这本书就是我们日常工作最好的写照:运行 Kafka 并帮助其他人更好地使用 Kafka。

我们相信,书中提供的这些内容能够帮助 Kafka 用户在生产环境运行 Kafka 以及基于 Kafka 构建健壮的高性能应用程序。我们列举了一些非常流行的应用场景:用于事件驱动微服务系统的消息总线、流式应用和大规模数据管道。这本书通俗易懂,能够帮助每一个 Kafka 用户在任意的架构或应用场景里使用好 Kafka。书中介绍了如何安装和配置 Kafka、如何使用 Kafka API、Kafka 的设计原则和可靠性保证,以及 Kafka 的一些架构细节,如复制协议、控制器和存储层。我们相信,Kafka 的设计原理和内部架构不仅会成为分布式系统构建者的兴趣所在,对于那些在生产环境部署 Kafka 或使用 Kafka 构建应用程序的人来说也是非常有用的。越是了解 Kafka,就越是能够更好地作出权衡。

在软件工程里,条条道路通罗马,每一个问题都有多种解决方案。Kafka 为专家级别的用户提供了巨大的灵活性,而新手则需要克服陡峭的学习曲线才能成为专家。Kafka 通常会告诉你如何使用某个功能特性,但不会告诉你为什么要用它或者为什么不该用它。我们会尽可能地解释我们的设计决策和权衡背后的缘由,以及用户在哪些情况下应该或不应该使用 Kafka 提供的特性。





读者对象


这本书是为使用 Kafka API 开发应用程序的工程师和在生产环境安装、配置、调优、监控 Kafka 的运维工程师(也可以叫作 SRE、运维人员或系统管理员)而写的。我们也考虑到了数据架构师和数据工程师,他们负责设计和构建整个组织的数据基础架构。某些章节(特别是第 3 章、第 4 章和第 11 章)主要面向 Java 开发人员,并假设读者已经熟悉基本的 Java 语言编程,比如异常处理和并发编程。其他章节(特别是第 2 章、第 8 章、第 9 章和第 10 章)则假设读者在 Linux 的运行、存储和网络配置方面有一定的经验。本书的其余部分则讨论了一般性的软件架构,不要求读者具备特定的知识。

另一类可能对本书感兴趣的人是那些经理或架构师,他们不直接使用 Kafka,但会与使用 Kafka 的工程师打交道。他们有必要了解 Kafka 所能提供的保证机制,以及他们的同事在构建基于 Kafka 的系统时所作出的权衡。这本书可以成为企业管理人员的利器,确保他们的工程师在 Kafka 方面训练有素,让他们的团队了解他们本该知道的知识。





排版约定


本书使用了下列排版约定。

黑体

表示新术语或重点强调的内容。



等宽字体(constant width)

表示程序片段,以及正文中出现的变量、函数名、数据库、数据类型、环境变量、语句和关键字等。



加粗等宽字体(constant width bold)

表示应该由用户输入的命令或其他文本。



等宽斜体(constant width italic)

表示应该由用户输入的值或根据上下文确定的值替换的文本。





 该图标表示提示或建议。



 该图标表示一般注记。



 该图标表示警告或警示。





使用代码示例


本书是要帮你完成工作的。一般来说,如果本书提供了示例代码,你可以把它用在你的程序或文档中。除非你使用了很大一部分代码,否则无需联系我们获得许可。比如,用本书的几个代码片段写一个程序就无需获得许可,销售或分发 O'Reilly 图书的示例光盘则需要获得许可;引用本书中的示例代码回答问题无需获得许可,将书中大量的代码放到你的产品文档中则需要获得许可。

我们很希望但并不强制要求你在引用本书内容时加上引用说明。引用说明一般包括书名、作者、出版社和 ISBN。例如“Kafka 权威指南,作者 Neha Narkhede、Gwen Shapira 和 Todd Palino(O'Reilly),版权归 Neha Narkhede、Gwen Shapira 和 Todd Palino 所有,978-1-4919-3616-0”。

如果你觉得自己对示例代码的用法超出了上述许可的范围,欢迎你通过 permissions@ oreilly.com 与我们联系。





O'Reilly Safari




Safari(原来叫 Safari Books Online)是面向企业、政府、教育从业者和个人的会员制培训和参考咨询平台。

我们向会员开放成千上万本图书以及培训视频、学习路线、交互式教程和专业视频。这些资源来自 250 多家出版机构,其中包括 O'Reilly Media、Harvard Business Review、Prentice Hall Professional、Addison-Wesley Professional、Microsoft Press、Sams、Que、Peachpit Press、Adobe、Focal Press、Cisco Press、John Wiley & Sons、Syngress、Morgan Kaufmann、IBM Redbooks、Packt、Adobe Press、FT Press、Apress、Manning、New Riders、McGraw-Hill、Jones & Bartlett 和 Course Technology。

更多信息,请访问 http://oreilly.com/safari。





联系我们


请把对本书的评价和问题发给出版社。

美国:

  O'Reilly Media, Inc.

  1005 Gravenstein Highway North

  Sebastopol, CA 95472

中国:

  北京市西城区西直门南大街 2 号成铭大厦 C 座 807 室(100035)

  奥莱利技术咨询(北京)有限公司

O'Reilly 的每一本书都有专属网页,你可以在那儿找到本书的相关信息,包括勘误表、示例代码以及其他信息。本书的网站地址是 http://oreil.ly/2tVmYjk。

对于本书的评论和技术性问题,请发送电子邮件到:bookquestions@oreilly.com

要了解更多 O'Reilly 图书、培训课程、会议和新闻的信息,请访问以下网站:

  http://www.oreilly.com

我们在 Facebook 的地址如下:http://facebook.com/oreilly

请关注我们的 Twitter 动态:http://twitter.com/oreillymedia

我们的 YouTube 视频地址如下:http://www.youtube.com/oreillymedia





致谢


我们想感谢众多为 Kafka 和它的生态系统做出贡献的人。如果没有他们艰辛的工作,就不会有这本书的问世。特别感谢 Jay Kreps、Neha Narkhede 和 Jun Rao,以及他们在 LinkedIn 的同事和领导,他们创造了 Kafka,并把它捐献给了 Apache 软件基金会。

很多人在早前为本书提供了很多有价值的反馈,我们非常感激他们为此付出的时间,也很钦佩他们的专业能力,这些人包括:Apurva Mehta、Arseniy Tashoyan、Dylan Scott、Ewen Cheslack-Postava、Grant Henke、Ismael Juma、James Cheng、Jason Gustafson、Jeff Holoman、Joel Koshy、Jonathan Seidman、Matthias Sax、Michael Noll、Paolo Castagna。我们还想感谢众多在网站上留下评论和反馈的读者。

很多审稿人提供了有价值的意见,极大改进了本书的质量。书中的遗留错误理应由我们作者负责。

我们要感谢 O'Reilly 编辑 Shannon Cutt 的鼓励、耐心和深谋远虑。对于一个作者来说,与 O'Reilly 一起合作是一段非凡的经历——他们所提供的支持,从工具到签名售书,都是无可匹敌的。我们感谢每一个参与本书相关工作的人,很感激他们愿意与我们一起工作。

另外,我们也想感谢我们的领导和同事,感谢他们在我们写作这本书的过程中给予的帮助和鼓励。

Gwen 要感谢她的丈夫 Omer Shapira,在她写书的几个月时间里,他一直给予她支持和耐心。还有她的父亲 Lior Shapira,让她学会了如何在困难面前不轻言放弃,尽管这种生活哲学总是让她麻烦不断。

Todd 要感谢他的妻子 Marcy 和女儿 Bella 及 Kaylee,她们一直在背后默默地支持他。因为有了她们的支持,他才有更多的时间写作,才能厘清思路,坚持到最后。





电子书


扫描如下二维码,即可购买本书电子版。





第 1 章 初识 Kafka


数据为企业的发展提供动力。我们从数据中获取信息,对它们进行分析处理,然后生成更多的数据。每个应用程序都会产生数据,包括日志消息、度量指标、用户活动记录、响应消息等。数据的点点滴滴都在暗示一些重要的事情,比如下一步行动的方向。我们把数据从源头移动到可以对它们进行分析处理的地方,然后把得到的结果应用到实际场景中,这样才能够确切地知道这些数据要告诉我们什么。例如,我们每天在 Amazon 网站上浏览感兴趣的商品,浏览信息被转化成商品推荐,并在稍后展示给我们。

这个过程完成得越快,组织的反应就越敏捷。花费越少的精力在数据移动上,就越能专注于核心业务。这就是为什么在一个以数据为驱动的企业里,数据管道会成为关键性组件。如何移动数据,几乎变得与数据本身一样重要。

每一次科学家们发生分歧,都是因为掌握的数据不够充分。所以我们可以先就获取哪一类数据达成一致。只要获取了数据,问题也就迎刃而解了。要么我是对的,要么你是对的,要么我们都是错的。然后我们继续研究。

——Neil deGrasse Tyson





1.1 发布与订阅消息系统


在正式讨论 Apache Kafka(以下简称 Kafka)之前,先来了解发布与订阅消息系统的概念,并认识这个系统的重要性。数据(消息)的发送者(发布者)不会直接把消息发送给接收者,这是发布与订阅消息系统的一个特点。发布者以某种方式对消息进行分类,接收者(订阅者)订阅它们,以便接收特定类型的消息。发布与订阅系统一般会有一个 broker,也就是发布消息的中心点。





1.1.1 如何开始


发布与订阅消息系统的大部分应用场景都是从一个简单的消息队列或一个进程间通道开始的。例如,你的应用程序需要往别处发送监控信息,可以直接在你的应用程序和另一个可以在仪表盘上显示度量指标的应用程序之间建立连接,然后通过这个连接推送度量指标,如图 1-1 所示。



图 1-1:单个直连的度量指标发布者

这是刚接触监控系统时简单问题的应对方案。过了不久,你需要分析更长时间片段的度量指标,而此时的仪表盘程序满足不了需求,于是,你启动了一个新的服务来接收度量指标。该服务把度量指标保存起来,然后进行分析。与此同时,你修改了原来的应用程序,把度量指标同时发送到两个仪表盘系统上。现在,你又多了 3 个可以生成度量指标的应用程序,它们都与这两个服务直接相连。而你的同事认为最好可以对这些服务进行轮询以便获得告警功能,于是你为每一个应用程序增加了一个服务器,用于提供度量指标。再过一阵子,有更多的应用程序出于各自的目的,都从这些服务器获取度量指标。这时的架构看起来就像图 1-2 所示的那样,节点间的连接一团糟。



图 1-2:多个直连的度量指标发布者

这时,技术债务开始凸显出来,于是你决定偿还掉一些。你创建了一个独立的应用程序,用于接收来自其他应用程序的度量指标,并为其他系统提供了一个查询服务器。这样,之前架构的复杂度被降低到图 1-3 所示的那样。那么恭喜你,你已经创建了一个基于发布与订阅的消息系统。



图 1-3:度量指标发布与订阅系统





1.1.2 独立的队列系统


在你跟度量指标打得不可开交的时候,你的一个同事也正在跟日志消息奋战。还有另一个同事正在跟踪网站用户的行为,为负责机器学习开发的同事提供信息,同时为管理团队生成报告。你和同事们使用相同的方式创建这些系统,解耦信息的发布者和订阅者。图 1-4 所示的架构包含了 3 个独立的发布与订阅系统。



图 1-4:多个发布与订阅系统

这种方式比直接使用点对点的连接(图 1-2)要好得多,但这里有太多重复的地方。你的公司因此要为数据队列维护多个系统,每个系统又有各自的缺陷和不足。而且,接下来可能会有更多的场景需要用到消息系统。此时,你真正需要的是一个单一的集中式系统,它可以用来发布通用类型的数据,其规模可以随着公司业务的增长而增长。





1.2 Kafka登场


Kafka 就是为了解决上述问题而设计的一款基于发布与订阅的消息系统。它一般被称为“分布式提交日志”或者“分布式流平台”。文件系统或数据库提交日志用来提供所有事务的持久记录,通过重放这些日志可以重建系统的状态。同样地,Kafka 的数据是按照一定顺序持久化保存的,可以按需读取。此外,Kafka 的数据分布在整个系统里,具备数据故障保护和性能伸缩能力。





1.2.1 消息和批次


Kafka 的数据单元被称为消息。如果你在使用 Kafka 之前已经有数据库使用经验,那么可以把消息看成是数据库里的一个“数据行”或一条“记录”。消息由字节数组组成,所以对于 Kafka 来说,消息里的数据没有特别的格式或含义。消息可以有一个可选的元数据,也就是键。键也是一个字节数组,与消息一样,对于 Kafka 来说也没有特殊的含义。当消息以一种可控的方式写入不同的分区时,会用到键。最简单的例子就是为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区。这样可以保证具有相同键的消息总是被写到相同的分区上。第 3 章将详细介绍键的用法。

为了提高效率,消息被分批次写入 Kafka。批次就是一组消息,这些消息属于同一个主题和分区。如果每一个消息都单独穿行于网络,会导致大量的网络开销,把消息分成批次传输可以减少网络开销。不过,这要在时间延迟和吞吐量之间作出权衡:批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。批次数据会被压缩,这样可以提升数据的传输和存储能力,但要做更多的计算处理。





1.2.2 模式


对于 Kafka 来说,消息不过是晦涩难懂的字节数组,所以有人建议用一些额外的结构来定义消息内容,让它们更易于理解。根据应用程序的需求,消息模式(schema)有许多可用的选项。像 JSON 和 XML 这些简单的系统,不仅易用,而且可读性好。不过,它们缺乏强类型处理能力,不同版本之间的兼容性也不是很好。Kafka 的许多开发者喜欢使用 Apache Avro,它最初是为 Hadoop 开发的一款序列化框架。Avro 提供了一种紧凑的序列化格式,模式和消息体是分开的,当模式发生变化时,不需要重新生成代码;它还支持强类型和模式进化,其版本既向前兼容,也向后兼容。

数据格式的一致性对于 Kafka 来说很重要,它消除了消息读写操作之间的耦合性。如果读写操作紧密地耦合在一起,消息订阅者需要升级应用程序才能同时处理新旧两种数据格式。在消息订阅者升级了之后,消息发布者才能跟着升级,以便使用新的数据格式。新的应用程序如果需要使用数据,就要与消息发布者发生耦合,导致开发者需要做很多繁杂的工作。定义良好的模式,并把它们存放在公共仓库,可以方便我们理解 Kafka 的消息结构。第 3 章将详细讨论模式和序列化。





1.2.3 主题和分区


Kafka 的消息通过主题进行分类。主题就好比数据库的表,或者文件系统里的文件夹。主题可以被分为若干个分区,一个分区就是一个提交日志。消息以追加的方式写入分区,然后以先入先出的顺序读取。要注意,由于一个主题一般包含几个分区,因此无法在整个主题范围内保证消息的顺序,但可以保证消息在单个分区内的顺序。图 1-5 所示的主题有 4 个分区,消息被追加写入每个分区的尾部。Kafka 通过分区来实现数据冗余和伸缩性。分区可以分布在不同的服务器上,也就是说,一个主题可以横跨多个服务器,以此来提供比单个服务器更强大的性能。



图 1-5:包含多个分区的主题表示

我们通常会使用流这个词来描述 Kafka 这类系统的数据。很多时候,人们把一个主题的数据看成一个流,不管它有多少个分区。流是一组从生产者移动到消费者的数据。当我们讨论流式处理时,一般都是这样描述消息的。Kafka Streams、Apache Samza 和 Storm 这些框架以实时的方式处理消息,也就是所谓的流式处理。我们可以将流式处理与离线处理进行比较,比如 Hadoop 就是被设计用于在稍后某个时刻处理大量的数据。第 11 章将会介绍流式处理。





1.2.4 生产者和消费者


Kafka 的客户端就是 Kafka 系统的用户,它们被分为两种基本类型:生产者和消费者。除此之外,还有其他高级客户端 API——用于数据集成的 Kafka Connect API 和用于流式处理的 Kafka Streams。这些高级客户端 API 使用生产者和消费者作为内部组件,提供了高级的功能。

生产者创建消息。在其他发布与订阅系统中,生产者可能被称为发布者或写入者。一般情况下,一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。这通常是通过消息键和分区器来实现的,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器,根据不同的业务规则将消息映射到分区。第 3 章将详细介绍生产者。

消费者读取消息。在其他发布与订阅系统中,消费者可能被称为订阅者或读者。消费者订阅一个或多个主题,并按照消息生成的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,它是一个不断递增的整数值,在创建消息时,Kafka 会把它添加到消息里。在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的消息偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或重启,它的读取状态不会丢失。

消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。群组保证每个分区只能被一个消费者使用。图 1-6 所示的群组中,有 3 个消费者同时读取一个主题。其中的两个消费者各自读取一个分区,另外一个消费者读取其他两个分区。消费者与分区之间的映射通常被称为消费者对分区的所有权关系。

通过这种方式,消费者可以消费包含大量消息的主题。而且,如果一个消费者失效,群组里的其他消费者可以接管失效消费者的工作。第 4 章将详细介绍消费者和消费者群组。



图 1-6:消费者群组从主题读取消息





1.2.5 broker和集群


一个独立的 Kafka 服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出响应,返回已经提交到磁盘上的消息。根据特定的硬件及其性能特征,单个 broker 可以轻松处理数千个分区以及每秒百万级的消息量。

broker 是集群的组成部分。每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。控制器负责管理工作,包括将分区分配给 broker 和监控 broker。在集群中,一个分区从属于一个 broker,该 broker 被称为分区的首领。一个分区可以分配给多个 broker,这个时候会发生分区复制(见图 1-7)。这种复制机制为分区提供了消息冗余,如果有一个 broker 失效,其他 broker 可以接管领导权。不过,相关的消费者和生产者都要重新连接到新的首领。第 6 章将详细介绍集群的操作,包括分区复制。



图 1-7:集群里的分区复制

保留消息(在一定期限内)是 Kafka 的一个重要特性。Kafka broker 默认的消息保留策略是这样的:要么保留一段时间(比如 7 天),要么保留到消息达到一定大小的字节数(比如 1GB)。当消息数量达到这些上限时,旧消息就会过期并被删除,所以在任何时刻,可用消息的总量都不会超过配置参数所指定的大小。主题可以配置自己的保留策略,可以将消息保留到不再使用它们为止。例如,用于跟踪用户活动的数据可能需要保留几天,而应用程序的度量指标可能只需要保留几个小时。可以通过配置把主题当作紧凑型日志,只有最后一个带有特定键的消息会被保留下来。这种情况对于变更日志类型的数据来说比较适用,因为人们只关心最后时刻发生的那个变更。





1.2.6 多集群


随着 Kafka 部署数量的增加,基于以下几点原因,最好使用多个集群。

数据类型分离

安全需求隔离

多数据中心(灾难恢复)



如果使用多个数据中心,就需要在它们之间复制消息。这样,在线应用程序才可以访问到多个站点的用户活动信息。例如,如果一个用户修改了他们的资料信息,不管从哪个数据中心都应该能看到这些改动。或者多个站点的监控数据可以被聚集到一个部署了分析程序和告警系统的中心位置。不过,Kafka 的消息复制机制只能在单个集群里进行,不能在多个集群之间进行。

Kafka 提供了一个叫作 MirrorMaker 的工具,可以用它来实现集群间的消息复制。MirrorMaker 的核心组件包含了一个生产者和一个消费者,两者之间通过一个队列相连。

消费者从一个集群读取消息,生产者把消息发送到另一个集群上。图 1-8 展示了一个使用 MirrorMaker 的例子,两个“本地”集群的消息被聚集到一个“聚合”集群上,然后将该集群复制到其他数据中心。不过,这种方式在创建复杂的数据管道方面显得有点力不从心。第 7 章将详细讨论这些案例。



图 1-8:多数据中心架构





1.3 为什么选择Kafka


基于发布与订阅的消息系统那么多,为什么 Kafka 会是一个更好的选择呢?





1.3.1 多个生产者


Kafka 可以无缝地支持多个生产者,不管客户端在使用单个主题还是多个主题。所以它很适合用来从多个前端系统收集数据,并以统一的格式对外提供数据。例如,一个包含了多个微服务的网站,可以为页面视图创建一个单独的主题,所有服务都以相同的消息格式向该主题写入数据。消费者应用程序会获得统一的页面视图,而无需协调来自不同生产者的数据流。





1.3.2 多个消费者


除了支持多个生产者外,Kafka 也支持多个消费者从一个单独的消息流上读取数据,而且消费者之间互不影响。这与其他队列系统不同,其他队列系统的消息一旦被一个客户端读取,其他客户端就无法再读取它。另外,多个消费者可以组成一个群组,它们共享一个消息流,并保证整个群组对每个给定的消息只处理一次。





1.3.3 基于磁盘的数据存储


Kafka 不仅支持多个消费者,还允许消费者非实时地读取消息,这要归功于 Kafka 的数据保留特性。消息被提交到磁盘,根据设置的保留规则进行保存。每个主题可以设置单独的保留规则,以便满足不同消费者的需求,各个主题可以保留不同数量的消息。消费者可能会因为处理速度慢或突发的流量高峰导致无法及时读取消息,而持久化数据可以保证数据不会丢失。消费者可以在进行应用程序维护时离线一小段时间,而无需担心消息丢失或堵塞在生产者端。消费者可以被关闭,但消息会继续保留在 Kafka 里。消费者可以从上次中断的地方继续处理消息。





1.3.4 伸缩性


为了能够轻松处理大量数据,Kafka 从一开始就被设计成一个具有灵活伸缩性的系统。用户在开发阶段可以先使用单个 broker,再扩展到包含 3 个 broker 的小型开发集群,然后随着数据量不断增长,部署到生产环境的集群可能包含上百个 broker。对在线集群进行扩展丝毫不影响整体系统的可用性。也就是说,一个包含多个 broker 的集群,即使个别 broker 失效,仍然可以持续地为客户提供服务。要提高集群的容错能力,需要配置较高的复制系数。第 6 章将讨论关于复制的更多细节。





1.3.5 高性能


上面提到的所有特性,让 Kafka 成为了一个高性能的发布与订阅消息系统。通过横向扩展生产者、消费者和 broker,Kafka 可以轻松处理巨大的消息流。在处理大量数据的同时,它还能保证亚秒级的消息延迟。





1.4 数据生态系统


已经有很多应用程序加入到了数据处理的大军中。我们定义了输入和应用程序,负责生成数据或者把数据引入系统。我们定义了输出,它们可以是度量指标、报告或者其他类型的数据。我们创建了一些循环,使用一些组件从系统读取数据,对读取的数据进行处理,然后把它们导到数据基础设施上,以备不时之需。数据类型可以多种多样,每一种数据类型可以有不同的内容、大小和用途。

Kafka 为数据生态系统带来了循环系统,如图 1-9 所示。它在基础设施的各个组件之间传递消息,为所有客户端提供一致的接口。当与提供消息模式的系统集成时,生产者和消费者之间不再有紧密的耦合,也不需要在它们之间建立任何类型的直连。我们可以根据业务需要添加或移除组件,因为生产者不再关心谁在使用数据,也不关心有多少个消费者。



图 1-9:大数据生态系统





使用场景


活动跟踪

Kafka 最初的使用场景是跟踪用户的活动。网站用户与前端应用程序发生交互,前端应用程序生成用户活动相关的消息。这些消息可以是一些静态的信息,比如页面访问次数和点击量,也可以是一些复杂的操作,比如添加用户资料。这些消息被发布到一个或多个主题上,由后端应用程序负责读取。这样,我们就可以生成报告,为机器学习系统提供数据,更新搜索结果,或者实现其他更多的功能。



传递消息

Kafka 的另一个基本用途是传递消息。应用程序向用户发送通知(比如邮件)就是通过传递消息来实现的。这些应用程序组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何被发送的。一个公共应用程序会读取这些消息,对它们进行处理:

格式化消息(也就是所谓的装饰);

将多个消息放在同一个通知里发送;

根据用户配置的首选项来发送数据。



使用公共组件的好处在于,不需要在多个应用程序上开发重复的功能,而且可以在公共组件上做一些有趣的转换,比如把多个消息聚合成一个单独的通知,而这些工作是无法在其他地方完成的。



度量指标和日志记录

Kafka 也可以用于收集应用程序和系统度量指标以及日志。Kafka 支持多个生产者的特性在这个时候就可以派上用场。应用程序定期把度量指标发布到 Kafka 主题上,监控系统或告警系统读取这些消息。Kafka 也可以用在像 Hadoop 这样的离线系统上,进行较长时间片段的数据分析,比如年度增长走势预测。日志消息也可以被发布到 Kafka 主题上,然后被路由到专门的日志搜索系统(比如 Elasticsearch)或安全分析应用程序。更改目标系统(比如日志存储系统)不会影响到前端应用或聚合方法,这是 Kafka 的另一个优点。



提交日志

Kafka 的基本概念来源于提交日志,所以使用 Kafka 作为提交日志是件顺理成章的事。我们可以把数据库的更新发布到 Kafka 上,应用程序通过监控事件流来接收数据库的实时更新。这种变更日志流也可以用于把数据库的更新复制到远程系统上,或者合并多个应用程序的更新到一个单独的数据库视图上。数据持久化为变更日志提供了缓冲区,也就是说,如果消费者应用程序发生故障,可以通过重放这些日志来恢复系统状态。另外,紧凑型日志主题只为每个键保留一个变更数据,所以可以长时间使用,不需要担心消息过期问题。



流处理

流处理是又一个能提供多种类型应用程序的领域。可以说,它们提供的功能与 Hadoop 里的 map 和 reduce 有点类似,只不过它们操作的是实时数据流,而 Hadoop 则处理更长时间片段的数据,可能是几个小时或者几天,Hadoop 会对这些数据进行批处理。通过使用流式处理框架,用户可以编写小型应用程序来操作 Kafka 消息,比如计算度量指标,为其他应用程序有效地处理消息分区,或者对来自多个数据源的消息进行转换。第 11 章将通过其他案例介绍流处理。





1.5 起源故事


Kafka 是为了解决 LinkedIn 数据管道问题应运而生的。它的设计目的是提供一个高性能的消息系统,可以处理多种数据类型,并能够实时提供纯净且结构化的用户活动数据和系统度量指标。

数据为我们所做的每一件事提供了动力。

——Jeff Weiner,LinkedIn CEO





1.5.1 LinkedIn的问题


本章开头提到过,LinkedIn 有一个数据收集系统和应用程序指标,它使用自定义的收集器和一些开源工具来保存和展示内部数据。除了跟踪 CPU 使用率和应用性能这些一般性指标外,LinkedIn 还有一个比较复杂的用户请求跟踪功能。它使用了监控系统,可以跟踪单个用户的请求是如何在内部应用间传播的。不过监控系统存在很多不足。它使用的是轮询拉取度量指标的方式,指标之间的时间间隔较长,而且没有自助服务能力。它使用起来不太方便,很多简单的任务需要人工介入才能完成,而且一致性较差,同一个度量指标的名字在不同系统里的叫法不一样。

与此同时,我们还创建了另一个用于收集用户活动信息的系统。这是一个 HTTP 服务,前端的服务器会定期连接进来,在上面发布一些消息(XML 格式)。这些消息文件被转移到线下进行解析和校对。同样,这个系统也存在很多不足。XML 文件的格式无法保持一致,而且解析 XML 文件非常耗费计算资源。要想更改所创建的活动类型,需要在前端应用和离线处理程序之间做大量的协调工作。即使是这样,在更改数据结构时,仍然经常出现系统崩溃现象。而且批处理时间以小时计算,无法用它完成实时的任务。

监控和用户活动跟踪无法使用同一个后端服务。监控服务太过笨重,数据格式不适用于活动跟踪,而且无法在活动跟踪中使用轮询拉取模型。另一方面,把跟踪服务用在度量指标上也过于脆弱,批处理模型不适用于实时的监控和告警。不过,好在数据间存在很多共性,信息(比如特定类型的用户活动对应用程序性能的影响)之间的关联度还是很高的。特定类型用户活动数量的下降说明相关应用程序存在问题,不过批处理的长时间延迟意味着无法对这类问题作出及时的反馈。

最开始,我们调研了一些现成的开源解决方案,希望能够找到一个系统,可以实时访问数据,并通过横向扩展来处理大量的消息。我们使用 ActiveMQ 创建了一个原型系统,但它当时还无法满足横向扩展的需求。LinkedIn 不得不使用这种脆弱的解决方案,虽然 ActiveMQ 有很多缺陷会导致 broker 暂停服务。客户端的连接因此被阻塞,处理用户请求的能力也受到影响。于是我们最后决定构建自己的基础设施。





1.5.2 Kafka的诞生


LinkedIn 的开发团队由 Jay Kreps 领导。Jay Kreps 是 LinkedIn 的首席工程师,之前负责分布式键值存储系统 Voldemort 的开发。初建团队成员还包括 Neha Narkhede,不久之后, Jun Rao 也加入了进来。他们一起着手创建一个消息系统,可以同时满足上述的两种需求,并且可以在未来进行横向扩展。他们的主要目标如下:

使用推送和拉取模型解耦生产者和消费者;

为消息传递系统中的消息提供数据持久化,以便支持多个消费者;

通过系统优化实现高吞吐量;

系统可以随着数据流的增长进行横向扩展。



最后我们看到的这个发布与订阅消息系统具有典型的消息系统接口,但从存储层来看,它更像是一个日志聚合系统。Kafka 使用 Avro 作为消息序列化框架,每天高效地处理数十亿级别的度量指标和用户活动跟踪信息。LinkedIn 已经拥有超过万亿级别的消息使用量(截止到 2015 年 8 月),而且每天仍然需要处理超过千万亿字节的数据。





1.5.3 走向开源


2010 年底,Kafka 作为开源项目在 GitHub 上发布。2011 年 7 月,因为倍受开源社区的关注,它成为 Apache 软件基金会的孵化器项目。2012 年 10 月,Kafka 从孵化器项目毕业。从那时起,来自 LinkedIn 内部的开发团队一直为 Kafka 提供大力支持,而且吸引了大批来自 LinkedIn 外部的贡献者和参与者。现在,Kafka 被很多组织用在一些大型的数据管道上。2014 年秋天,Jay Kreps、Neha Narkhede 和 Jun Rao 离开 LinkedIn,创办了 Confluent。 Confluent 是一个致力于为企业开发提供支持、为 Kafka 提供培训的公司。这两家公司连同来自开源社区持续增长的贡献力量,一直在开发和维护 Kafka,让 Kafka 成为大数据管道的不二之选。





1.5.4 命名


关于 Kafka 的历史,人们经常会问到的一个问题就是,Kafka 这个名字是怎么想出来的,以及这个名字和这个项目之间有着怎样的联系。对于这个问题,Jay Kreps 解释如下:

我想既然 Kafka 是为了写数据而产生的,那么用作家的名字来命名会显得更有意义。我在大学时期上过很多文学课程,很喜欢 Franz Kafka。况且,对于开源项目来说,这个名字听起来很酷。因此,名字和应用本身基本没有太多联系。





1.6 开始Kafka之旅


现在我们对 Kafka 已经有了一个大体的了解,还知道了一些常见的术语,接下来可以开始使用 Kafka 来创建数据管道了。在下一章,我们将探究如何安装和配置 Kafka,还会讨论如何选择合适的硬件来运行 Kafka,以及把 Kafka 应用到生产环境需要注意的事项。





第 2 章 安装 Kafka


这一章将介绍如何安装和运行 Kafka,包括如何设置 Zookeeper(Kafka 使用 Zookeeper 保存 Broker 的元数据),还会介绍 Kafka 的基本配置,以及如何为 Kafka 选择合适的硬件,最后介绍如何在一个集群中安装多个 Kafka broker,以及把 Kafka 应用到生产环境需要注意的事项。





2.1 要事先行


在使用 Kafka 之前需要先做一些事情,接下来介绍怎样做。





2.1.1 选择操作系统


Kafka 是使用 Java 开发的应用程序,所以它可以运行在 Windows、MacOS 和 Linux 等多种操作系统上。本章将着重介绍如何在 Linux 上安装和使用 Kafka,因为把 Kafka 安装在 Linux 系统上是最为常见的。即使只是把 Kafka 作为一般性用途,仍然推荐使用 Linux 系统。关于如何在 Windows 和 MacOS 上安装 Kafka,请参考附录 A。





2.1.2 安装Java


在安装 Zookeeper 和 Kafka 之前,需要先安装 Java 环境。这里推荐安装 Java 8,可以使用系统自带的安装包,也可以直接从 java.com 网站下载。虽然运行 Zookeeper 和 Kafka 只需要 Java 运行时版本,但也可以安装完整的 JDK,以备不时之需。假设 JDK 8 update 51 已经安装在 /usr/java/jdk1.8.0_51 目录下,其他软件的安装都是基于这个前提进行的。





2.1.3 安装Zookeeper


Kafka 使用 Zookeeper 保存集群的元数据信息和消费者信息。Kafka 发行版自带了 Zookeeper,可以直接从脚本启动,不过安装一个完整版的 Zookeeper 也并不费劲。



图 2-1:Kafka 和 Zookeeper

Zookeeper 的 3.4.6 稳定版已经在 Kafka 上做过全面测试,可以从 apache.org 下载该版本的 Zookeeper:http://bit.ly/2sDWSgJ。

单机服务

下面的例子演示了如何使用基本的配置安装 Zookeeper,安装目录为 /usr/local/zookeeper,数据目录为 /var/lib/zookeeper。

# tar -zxf zookeeper-3.4.6.tar.gz # mv zookeeper-3.4.6 /usr/local/zookeeper # mkdir -p /var/lib/zookeeper # cat > /usr/local/zookeeper/conf/zoo.cfg << EOF > tickTime=2000 > dataDir=/var/lib/zookeeper > clientPort=2181 > EOF # export JAVA_HOME=/usr/java/jdk1.8.0_51 # /usr/local/zookeeper/bin/zkServer.sh start JMX enabled by default Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... STARTED #

现在可以连到 Zookeeper 端口上,通过发送四字命令 srvr 来验证 Zookeeper 是否安装正确。

# telnet localhost 2181 Trying ::1... Connected to localhost. Escape character is '^]'. srvr Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT Latency min/avg/max: 0/0/0 Received: 1 Sent: 0 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: standalone Node count: 4 Connection closed by foreign host. #



Zookeeper 群组(Ensemble)

Zookeeper 集群被称为群组。Zookeeper 使用的是一致性协议,所以建议每个群组里应该包含奇数个节点(比如 3 个、5 个等),因为只有当群组里的大多数节点(也就是法定人数)处于可用状态,Zookeeper 才能处理外部的请求。也就是说,如果你有一个包含 3 个节点的群组,那么它允许一个节点失效。如果群组包含 5 个节点,那么它允许 2 个节点失效。

 群组节点个数的选择

假设有一个包含 5 个节点的群组,如果要对群组做一些包括更换节点在内的配置更改,需要依次重启每一个节点。如果你的群组无法容忍多个节点失效,那么在进行群组维护时就会存在风险。不过,也不建议一个群组包含超过 7 个节点,因为 Zookeeper 使用了一致性协议,节点过多会降低整个群组的性能。



群组需要有一些公共配置,上面列出了所有服务器的清单,并且每个服务器还要在数据目录中创建一个 myid 文件,用于指明自己的 ID。如果群组里服务器的机器名是 zoo1.example.com、zoo2.example.com、zoo3.example.com,那么配置文件可能是这样的:

tickTime=2000 dataDir=/var/lib/zookeeper clientPort=2181 initLimit=20 syncLimit=5 server.1=zoo1.example.com:2888:3888 server.2=zoo2.example.com:2888:3888 server.3=zoo3.example.com:2888:3888

在这个配置中,initLimit 表示用于在从节点与主节点之间建立初始化连接的时间上限,syncLimit 表示允许从节点与主节点处于不同步状态的时间上限。这两个值都是 tickTime 的倍数,所以 initLimit 是 20*2000ms,也就是 40s。配置里还列出了群组中所有服务器的地址。服务器地址遵循 server.X=hostname:peerPort:leaderPort 格式,各个参数说明如下:

X

  服务器的 ID,它必须是一个整数,不过不一定要从 0 开始,也不要求是连续的;

hostname

  服务器的机器名或 IP 地址;

peerPort

  用于节点间通信的 TCP 端口;

leaderPort

  用于首领选举的 TCP 端口。

客户端只需要通过 clientPort 就能连接到群组,而群组节点间的通信则需要同时用到这 3 个端口(peerPort、leaderPort、clientPort)。

除了公共的配置文件外,每个服务器都必须在 data Dir 目录中创建一个叫作 myid 的文件,文件里要包含服务器 ID,这个 ID 要与配置文件里配置的 ID 保持一致。完成这些步骤后,就可以启动服务器,让它们彼此间进行通信了。





2.2 安装Kafka Broker


配置好 Java 和 Zookeeper 之后,接下来就可以安装 Kafka 了。可以从 http://kafka.apache.org/downloads.html 下载最新版本的 Kafka。截至本书写作时,Kafka 的版本是 0.9.0.1,对应的 Scala 版本是 2.11.0。

下面的例子将 Kafka 安装在 /usr/local/kafka 目录下,使用之前配置好的 Zookeeper,并把消息日志保存在 /tmp/kafka-logs 目录下。

# tar -zxf kafka_2.11-0.9.0.1.tgz # mv kafka_2.11-0.9.0.1 /usr/local/kafka # mkdir /tmp/kafka-logs # export JAVA_HOME=/usr/java/jdk1.8.0_51 # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties #

一旦 Kafka 创建完毕,就可以对这个集群做一些简单的操作来验证它是否安装正确,比如创建一个测试主题,发布一些消息,然后读取它们。

创建并验证主题:

# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test". # /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 #

往测试主题上发布消息:

# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test Test Message 1 Test Message 2 ^D #

从测试主题上读取消息:

# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning Test Message 1 Test Message 2 ^C Consumed 2 messages #





2.3 broker配置


Kafka 发行包里自带的配置样本可以用来安装单机服务,但并不能满足大多数安装场景的要求。Kafka 有很多配置选项,涉及安装和调优的方方面面。不过大多数调优选项可以使用默认配置,除非你对调优有特别的要求。





2.3.1 常规配置


有一些配置选项,在单机安装时可以直接使用默认值,但在部署到其他环境时要格外小心。这些参数是单个服务器最基本的配置,它们中的大部分需要经过修改后才能用在集群里。

broker.id

每个 broker 都需要有一个标识符,使用 broker.id 来表示。它的默认值是 0,也可以被设置成其他任意整数。这个值在整个 Kafka 集群里必须是唯一的。这个值可以任意选定,如果出于维护的需要,可以在服务器节点间交换使用这些 ID。建议把它们设置成与机器名具有相关性的整数,这样在进行维护时,将 ID 号映射到机器名就没那么麻烦了。例如,如果机器名包含唯一性的数字(比如 host1.example.com、host2.example.com),那么用这些数字来设置 broker.id 就再好不过了。



port

如果使用配置样本来启动 Kafka,它会监听 9092 端口。修改 port 配置参数可以把它设置成其他任意可用的端口。要注意,如果使用 1024 以下的端口,需要使用 root 权限启动 Kafka,不过不建议这么做。



zookeeper.connect

用于保存 broker 元数据的 Zookeeper 地址是通过 zookeeper.connect 来指定的。 localhost:2181 表示这个 Zookeeper 是运行在本地的 2181 端口上。该配置参数是用冒号分隔的一组 hostname:port/path 列表,每一部分的含义如下:

hostname 是 Zookeeper 服务器的机器名或 IP 地址;

port 是 Zookeeper 的客户端连接端口;

/path 是可选的 Zookeeper 路径,作为 Kafka 集群的 chroot 环境。如果不指定,默认使用根路径。



如果指定的 chroot 路径不存在,broker 会在启动的时候创建它。

 为什么使用 chroot 路径

在 Kafka 集群里使用 chroot 路径是一种最佳实践。Zookeeper 群组可以共享给其他应用程序,即使还有其他 Kafka 集群存在,也不会产生冲突。最好是在配置文件里指定一组 Zookeeper 服务器,用分号把它们隔开。一旦有一个 Zookeeper 服务器宕机,broker 可以连接到 Zookeeper 群组的另一个节点上。





log.dirs

Kafka 把所有消息都保存在磁盘上,存放这些日志片段的目录是通过 log.dirs 指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。要注意,broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。



num.recovery.threads.per.data.dir

对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段:

服务器正常启动,用于打开每个分区的日志片段;

服务器崩溃后重启,用于检查和截短每个分区的日志片段;

服务器正常关闭,用于关闭日志片段。



默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩溃,在进行恢复时使用并行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。



auto.create.topics.enable

默认情况下,Kafka 会在如下几种情形下自动创建主题:

当一个生产者开始往主题写入消息时;

当一个消费者开始从主题读取消息时;

当任意一个客户端向主题发送元数据请求时。



很多时候,这些行为都是非预期的。而且,根据 Kafka 协议,如果一个主题不先被创建,根本无法知道它是否已经存在。如果显式地创建主题,不管是手动创建还是通过其他配置系统来创建,都可以把 auto.create.topics.enable 设为 false。





2.3.2 主题的默认配置


Kafka 为新创建的主题提供了很多默认配置参数。可以通过管理工具(将在第 9 章介绍)为每个主题单独配置一部分参数,比如分区个数和数据保留策略。服务器提供的默认配置可以作为基准,它们适用于大部分主题。

 使用主题配置覆盖(override)

之前的 Kafka 版本允许主题覆盖服务器的默认配置,包括 log.retention.hours.per.topic、log.retention.bytes.per.topic 和 log.seg ment.bytes.per.topic 这几个参数。新版本不再支持这些参数,而且如果要对参数进行覆盖,需要使用管理工具。



num.partitions

num.partitions 参数指定了新创建的主题将包含多少个分区。如果启用了主题自动创建功能(该功能默认是启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。所以,如果要让一个主题的分区个数少于 num.partitions 指定的值,需要手动创建该主题(将在第 9 章讨论)。

第 1 章里已经提到,Kafka 集群通过分区对主题进行横向扩展,所以当有新的 broker 加入集群时,可以通过分区个数来实现集群的负载均衡。当然,这并不是说,在存在多个主题的情况下(它们分布在多个 broker 上),为了能让分区分布到所有 broker 上,主题分区的个数必须要大于 broker 的个数。不过,拥有大量消息的主题如果要进行负载分散,就需要大量的分区。

 如何选定分区数量

为主题选定分区数量并不是一件可有可无的事情,在进行数量选择时,需要考虑如下几个因素。

主题需要达到多大的吞吐量?例如,是希望每秒钟写入 100KB 还是 1GB ?

从单个分区读取数据的最大吞吐量是多少?每个分区一般都会有一个消费者,如果你知道消费者将数据写入数据库的速度不会超过每秒 50MB,那么你也该知道,从一个分区读取数据的吞吐量不需要超过每秒 50MB。

可以通过类似的方法估算生产者向单个分区写入数据的吞吐量,不过生产者的速度一般比消费者快得多,所以最好为生产者多估算一些吞吐量。

每个 broker 包含的分区个数、可用的磁盘空间和网络带宽。

如果消息是按照不同的键来写入分区的,那么为已有的主题新增分区就会很困难。

单个 broker 对分区个数是有限制的,因为分区越多,占用的内存越多,完成首领选举需要的时间也越长。





很显然,综合考虑以上几个因素,你需要很多分区,但不能太多。如果你估算出主题的吞吐量和消费者吞吐量,可以用主题吞吐量除以消费者吞吐量算出分区的个数。也就是说,如果每秒钟要从主题上写入和读取 1GB 的数据,并且每个消费者每秒钟可以处理 50MB 的数据,那么至少需要 20 个分区。这样就可以让 20 个消费者同时读取这些分区,从而达到每秒钟 1GB 的吞吐量。

如果不知道这些信息,那么根据经验,把分区的大小限制在 25GB 以内可以得到比较理想的效果。



log.retention.ms

Kafka 通常根据时间来决定数据可以被保留多久。默认使用 log.retention.hours 参数来配置时间,默认值为 168 小时,也就是一周。除此以外,还有其他两个参数 log.retention.minutes 和 log.retention.ms。这 3 个参数的作用是一样的,都是决定消息多久以后会被删除,不过还是推荐使用 log.retention.ms。如果指定了不止一个参数,Kafka 会优先使用具有最小值的那个参数。

 根据时间保留数据和最后修改时间

根据时间保留数据是通过检查磁盘上日志片段文件的最后修改时间来实现的。一般来说,最后修改时间指的就是日志片段的关闭时间,也就是文件里最后一个消息的时间戳。不过,如果使用管理工具在服务器间移动分区,最后修改时间就不准确了。时间误差可能导致这些分区过多地保留数据。在第 9 章讨论分区移动时会提到更多这方面的内容。





log.retention.bytes

另一种方式是通过保留的消息字节数来判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设为 1GB,那么这个主题最多可以保留 8GB 的数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。

 根据字节大小和时间保留数据

如果同时指定了 log.retention.bytes 和 log.retention.ms(或者另一个时间参数),只要任意一个条件得到满足,消息就会被删除。例如,假设 log.retention.ms 设置为 86 400 000(也就是 1 天),log.retention.bytes 设置为 1 000 000 000(也就是 1GB),如果消息字节总数在不到一天的时间就超过了 1GB,那么多出来的部分就会被删除。相反,如果消息字节总数小于 1GB,那么一天之后这些消息也会被删除,尽管分区的数据总量小于 1GB。





log.segment.bytes

以上的设置都作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上。当日志片段大小达到 log.segment.bytes 指定的上限(默认是 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就会越频繁地关闭和分配新文件,从而降低磁盘写入的整体效率。

如果主题的消息量不大,那么如何调整这个参数的大小就变得尤为重要。例如,如果一个主题每天只接收 100MB 的消息,而 log.segment.bytes 使用默认设置,那么需要 10 天时间才能填满一个日志片段。因为在日志片段被关闭之前消息是不会过期的,所以如果 log.retention.ms 被设为 604 800 000(也就是 1 周),那么日志片段最多需要 17 天才会过期。

这是因为关闭日志片段需要 10 天的时间,而根据配置的过期时间,还需要再保留 7 天时间(要等到日志片段里的最后一个消息过期才能被删除)。

 使用时间戳获取偏移量

日志片段的大小会影响使用时间戳获取偏移量。在使用时间戳获取日志偏移量时,Kafka 会检查分区里最后修改时间大于指定时间戳的日志片段(已经被关闭的),该日志片段的前一个文件的最后修改时间小于指定时间戳。然后,Kafka 返回该日志片段(也就是文件名)开头的偏移量。对于使用时间戳获取偏移量的操作来说,日志片段越小,结果越准确。





log.segment.ms

另一个可以控制日志片段关闭时间的参数是 log.segment.ms,它指定了多长时间之后日志片段会被关闭。就像 log.retention.bytes 和 log.retention.ms 这两个参数一样,log.segment.bytes 和 log.retention.ms 这两个参数之间也不存在互斥问题。日志片段会在大小或时间达到上限时被关闭,就看哪个条件先得到满足。默认情况下,log.segment.ms 没有设定值,所以只根据大小来关闭日志片段。

 基于时间的日志片段对磁盘性能的影响

在使用基于时间的日志片段时,要着重考虑并行关闭多个日志片段对磁盘性能的影响。如果多个分区的日志片段永远不能达到大小的上限,就会发生这种情况,因为 broker 在启动之后就开始计算日志片段的过期时间,对于那些数据量小的分区来说,日志片段的关闭操作总是同时发生。





message.max.bytes

broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认值是 1 000 000,也就是 1MB。如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误信息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 message.max.bytes 指定的值,消息的实际大小可以远大于这个值。

这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。

 在服务端和客户端之间协调消息大小的配置

消费者客户端设置的 fetch.message.max.bytes 必须与服务器端设置的消息大小进行协调。如果这个值比 message.max.bytes 小,那么消费者就无法读取比较大的消息,导致出现消费者被阻塞的情况。在为集群里的 broker 配置 replica.fetch.max.bytes 参数时,也遵循同样的原则。





2.4 硬件的选择


为 Kafka 选择合适的硬件更像是一门艺术。Kafka 本身对硬件没有特别的要求,它可以运行在任何系统上。不过,如果比较关注性能,那么就需要考虑几个会影响整体性能的因素:磁盘吞吐量和容量、内存、网络和 CPU。在确定了性能关注点之后,就可以在预算范围内选择最优化的硬件配置。





2.4.1 磁盘吞吐量


生产者客户端的性能直接受到服务器端磁盘吞吐量的影响。生产者生成的消息必须被提交到服务器保存,大多数客户端在发送消息之后会一直等待,直到至少有一个服务器确认消息已经成功提交为止。也就是说,磁盘写入速度越快,生成消息的延迟就越低。

在考虑硬盘类型对磁盘吞吐量的影响时,是选择传统的机械硬盘(HDD)还是固态硬盘(SSD),我们可以很容易地作出决定。固态硬盘的查找和访问速度都很快,提供了最好的性能。机械硬盘更便宜,单块硬盘容量也更大。在同一个服务器上使用多个机械硬盘,可以设置多个数据目录,或者把它们设置成磁盘阵列,这样可以提升机械硬盘的性能。其他方面的因素,比如磁盘特定的技术(串行连接存储技术或 SATA),或者磁盘控制器的质量,都会影响吞吐量。





2.4.2 磁盘容量


磁盘容量是另一个值得讨论的话题。需要多大的磁盘容量取决于需要保留的消息数量。如果服务器每天会收到 1TB 消息,并且保留 7 天,那么就需要 7TB 的存储空间,而且还要为其他文件提供至少 10% 的额外空间。除此之外,还需要提供缓冲区,用于应付消息流量的增长和波动。

在决定扩展 Kafka 集群规模时,存储容量是一个需要考虑的因素。通过让主题拥有多个分区,集群的总流量可以被均衡到整个集群,而且如果单个 broker 无法支撑全部容量,可以让其他 broker 提供可用的容量。存储容量的选择同时受到集群复制策略的影响(将在第 6 章讨论更多的细节)。





2.4.3 内存


除了磁盘性能外,服务器端可用的内存容量是影响客户端性能的主要因素。磁盘性能影响生产者,而内存影响消费者。消费者一般从分区尾部读取消息,如果有生产者存在,就紧跟在生产者后面。在这种情况下,消费者读取的消息会直接存放在系统的页面缓存里,这比从磁盘上重新读取要快得多。

运行 Kafka 的 JVM 不需要太大的内存,剩余的系统内存可以用作页面缓存,或者用来缓存正在使用中的日志片段。这也就是为什么不建议把 Kafka 同其他重要的应用程序部署在一起的原因,它们需要共享页面缓存,最终会降低 Kafka 消费者的性能。





2.4.4 网络


网络吞吐量决定了 Kafka 能够处理的最大数据流量。它和磁盘存储是制约 Kafka 扩展规模的主要因素。Kafka 支持多个消费者,造成流入和流出的网络流量不平衡,从而让情况变得更加复杂。对于给定的主题,一个生产者可能每秒钟写入 1MB 数据,但可能同时有多个消费者瓜分网络流量。其他的操作,如集群复制(在第 6 章介绍)和镜像(在第 8 章介绍)也会占用网络流量。如果网络接口出现饱和,那么集群的复制出现延时就在所难免,从而让集群不堪一击。





2.4.5 CPU


与磁盘和内存相比,Kafka 对计算处理能力的要求相对较低,不过它在一定程度上还是会影响整体的性能。客户端为了优化网络和磁盘空间,会对消息进行压缩。服务器需要对消息进行批量解压,设置偏移量,然后重新进行批量压缩,再保存到磁盘上。这就是 Kafka 对计算处理能力有所要求的地方。不过不管怎样,这都不应该成为选择硬件的主要考虑因素。





2.5 云端的Kafka


Kafka 一般被安装在云端,比如亚马逊网络服务(Amazon Web Services,AWS)。AWS 提供了很多不同配置的实例,我们要根据 Kafka 的性能优先级来选择合适的实例。可以先从要保留数据的大小开始考虑,然后考虑生产者方面的性能。如果要求低延迟,那么就需要专门为 I/O 优化过的使用固态硬盘的实例,否则,使用配备了临时存储的实例就可以了。选好存储类型之后,再选择 CPU 和内存就容易得多。

实际上,如果使用 AWS,一般会选择 m4 实例或 r3 实例。m4 实例允许较长时间地保留数据,不过磁盘吞吐量会小一些,因为它使用的是弹性块存储。r3 实例使用固态硬盘,具有较高的吞吐量,但保留的数据量会有所限制。如果想两者兼顾,那么需要升级成 i2 实例或 d2 实例,不过它们的成本要高得多。





2.6 Kafka集群


单个 Kafka 服务器足以满足本地开发或 POC 要求,不过集群也有它的强大之处。使用集群最大的好处是可以跨服务器进行负载均衡,再则就是可以使用复制功能来避免因单点故障造成的数据丢失。在维护 Kafka 或底层系统时,使用集群可以确保为客户端提供高可用性。本节只是介绍如何配置 Kafka 集群,第 6 章将介绍更多关于数据复制的内容。



图 2-2:一个简单的 Kafka 集群





2.6.1 需要多少个broker


一个 Kafka 集群需要多少个 broker 取决于以下几个因素。首先,需要多少磁盘空间来保留数据,以及单个 broker 有多少空间可用。如果整个集群需要保留 10TB 的数据,每个 broker 可以存储 2TB,那么至少需要 5 个 broker。如果启用了数据复制,那么至少还需要一倍的空间,不过这要取决于配置的复制系数是多少(将在第 6 章介绍)。也就是说,如果启用了数据复制,那么这个集群至少需要 10 个 broker。

第二个要考虑的因素是集群处理请求的能力。这通常与网络接口处理客户端流量的能力有关,特别是当有多个消费者存在或者在数据保留期间流量发生波动(比如高峰时段的流量爆发)时。如果单个 broker 的网络接口在高峰时段可以达到 80% 的使用量,并且有两个消费者,那么消费者就无法保持峰值,除非有两个 broker。如果集群启用了复制功能,则要把这个额外的消费者考虑在内。因磁盘吞吐量低和系统内存不足造成的性能问题,也可以通过扩展多个 broker 来解决。





2.6.2 broker配置


要把一个 broker 加入到集群里,只需要修改两个配置参数。首先,所有 broker 都必须配置相同的 zookeeper.connect,该参数指定了用于保存元数据的 Zookeeper 群组和路径。其次,每个 broker 都必须为 broker.id 参数设置唯一的值。如果两个 broker 使用相同的 broker.id,那么第二个 broker 就无法启动。在运行集群时,还可以配置其他一些参数,特别是那些用于控制数据复制的参数,这些将在后续的章节介绍。





2.6.3 操作系统调优


大部分 Linux 发行版默认的内核调优参数配置已经能够满足大多数应用程序的运行需求,不过还是可以通过调整一些参数来进一步提升 Kafka 的性能。这些参数主要与虚拟内存、网络子系统和用来存储日志片段的磁盘挂载点有关。这些参数一般配置在 /etc/sysctl.conf 文件里,不过在对内核参数进行调整时,最好参考操作系统的文档。

虚拟内存

一般来说,Linux 的虚拟内存会根据系统的工作负荷进行自动调整。我们可以对交换分区的处理方式和内存脏页进行调整,从而让 Kafka 更好地处理工作负载。

对于大多数依赖吞吐量的应用程序来说,要尽量避免内存交换。内存页和磁盘之间的交换对 Kafka 各方面的性能都有重大影响。Kafka 大量地使用系统页面缓存,如果虚拟内存被交换到磁盘,说明已经没有多余内存可以分配给页面缓存了。

一种避免内存交换的方法是不设置任何交换分区。内存交换不是必需的,不过它确实能够在系统发生灾难性错误时提供一些帮助。进行内存交换可以防止操作系统由于内存不足而突然终止进程。基于上述原因,建议把 vm.swappiness 参数的值设置得小一点,比如 1。该参数指明了虚拟机的子系统将如何使用交换分区,而不是只把内存页从页面缓存里移除。要优先考虑减小页面缓存,而不是进行内存交换。

 为什么不把 vm.swappiness 设为零

先前,人们建议尽量把 vm.swapiness 设为 0,它意味着“除非发生内存溢出,否则不要进行内存交换”。直到 Linux 内核 3.5-rc1 版本发布,这个值的意义才发生了变化。这个变化被移植到其他的发行版上,包括 Red Hat 企业版内核 2.6.32-303。在发生变化之后,0 意味着“在任何情况下都不要发生交换”。所以现在建议把这个值设为 1。



脏页会被冲刷到磁盘上,调整内核对脏页的处理方式可以让我们从中获益。Kafka 依赖 I/O 性能为生产者提供快速的响应。这就是为什么日志片段一般要保存在快速磁盘上,不管是单个快速磁盘(如 SSD)还是具有 NVRAM 缓存的磁盘子系统(如 RAID)。这样一来,在后台刷新进程将脏页写入磁盘之前,可以减少脏页的数量,这个可以通过将 vm.dirty_background_ratio 设为小于 10 的值来实现。该值指的是系统内存的百分比,大部分情况下设为 5 就可以了。它不应该被设为 0,因为那样会促使内核频繁地刷新页面,从而降低内核为底层设备的磁盘写入提供缓冲的能力。

通过设置 vm.dirty_ratio 参数可以增加被内核进程刷新到磁盘之前的脏页数量,可以将它设为大于 20 的值(这也是系统内存的百分比)。这个值可设置的范围很广,60~80 是个比较合理的区间。不过调整这个参数会带来一些风险,包括未刷新磁盘操作的数量和同步刷新引起的长时间 I/O 等待。如果该参数设置了较高的值,建议启用 Kafka 的复制功能,避免因系统崩溃造成数据丢失。

为了给这些参数设置合适的值,最好是在 Kafka 集群运行期间检查脏页的数量,不管是在生存环境还是模拟环境。可以在 /proc/vmstat 文件里查看当前脏页数量。

# cat /proc/vmstat | egrep "dirty|writeback" nr_dirty 3875 nr_writeback 29 nr_writeback_temp 0 #



磁盘

除了选择合适的磁盘硬件设备和使用 RAID 外,文件系统是影响性能的另一个重要因素。有很多种文件系统可供选择,不过对于本地文件系统来说,EXT4(第四代可扩展文件系统)和 XFS 最为常见。近来,XFS 成为很多 Linux 发行版默认的文件系统,因为它只需要做少量调优就可以承担大部分的工作负荷,比 EXT4 具有更好的表现。EXT4 也可以做得很好,但需要做更多的调优,存在较大的风险。其中就包括设置更长的提交间隔(默认是 5),以便降低刷新的频率。EXT4 还引入了块分配延迟,一旦系统崩溃,更容易造成数据丢失和文件系统毁坏。XFS 也使用了分配延迟算法,不过比 EXT4 的要安全些。XFS 为 Kafka 提供了更好的性能,除了由文件系统提供的自动调优之外,无需额外的调优。批量磁盘写入具有更高的效率,可以提升整体的 I/O 吞吐量。

不管使用哪一种文件系统来存储日志片段,最好要对挂载点的 noatime 参数进行合理的设置。文件元数据包含 3 个时间戳:创建时间(ctime)、最后修改时间(mtime)以及最后访问时间(atime)。默认情况下,每次文件被读取后都会更新 atime,这会导致大量的磁盘写操作,而且 atime 属性的用处不大,除非某些应用程序想要知道某个文件在最近一次修改后有没有被访问过(这种情况可以使用 realtime)。Kafka 用不到该属性,所以完全可以把它禁用掉。为挂载点设置 noatime 参数可以防止更新 atime,但不会影响 ctime 和 mtime。



网络

默认情况下,系统内核没有针对快速的大流量网络传输进行优化,所以对于应用程序来说,一般需要对 Linux 系统的网络栈进行调优,以实现对大流量的支持。实际上,调整 Kafka 的网络配置与调整其他大部分 Web 服务器和网络应用程序的网络配置是一样的。首先可以对分配给 socket 读写缓冲区的内存大小作出调整,这样可以显著提升网络的传输性能。socket 读写缓冲区对应的参数分别是 net.core.wmem_default 和 net.core.rmem_default,合理的值是 131 072(也就是 128KB)。读写缓冲区最大值对应的参数分别是 net.core.wmem_max 和 net.core.rmem_max,合理的值是 2 097 152(也就是 2MB)。要注意,最大值并不意味着每个 socket 一定要有这么大的缓冲空间,只是说在必要的情况下才会达到这个值。

除了设置 socket 外,还需要设置 TCP socket 的读写缓冲区,它们的参数分别是 net.ipv4.tcp_wmem 和 net.ipv4.tcp_rmem。这些参数的值由 3 个整数组成,它们使用空格分隔,分别表示最小值、默认值和最大值。最大值不能大于 net.core.wmem_max 和 net.core.rmem_max 指定的大小。例如,“4096 65536 2048000”表示最小值是 4KB、默认值是 64KB、最大值是 2MB。根据 Kafka 服务器接收流量的实际情况,可能需要设置更高的最大值,为网络连接提供更大的缓冲空间。

还有其他一些有用的网络参数。例如,把 net.ipv4.tcp_window_scaling 设为 1,启用 TCP 时间窗扩展,可以提升客户端传输数据的效率,传输的数据可以在服务器端进行缓冲。把 net.ipv4.tcp_max_syn_backlog 设为比默认值 1024 更大的值,可以接受更多的并发连接。把 net.core.netdev_max_backlog 设为比默认值 1000 更大的值,有助于应对网络流量的爆发,特别是在使用千兆网络的情况下,允许更多的数据包排队等待内核处理。





2.7 生产环境的注意事项


当你准备把 Kafka 从测试环境部署到生产环境时,需要注意一些事项,以便创建更可靠的消息服务。





2.7.1 垃圾回收器选项


为应用程序调整 Java 垃圾回收参数就像是一门艺术,我们需要知道应用程序是如何使用内存的,还需要大量的观察和试错。幸运的是,Java 7 为我们带来了 G1 垃圾回收器,让这种状况有所改观。在应用程序的整个生命周期,G1 会自动根据工作负载情况进行自我调节,而且它的停顿时间是恒定的。它可以轻松地处理大块的堆内存,把堆内存分为若干小块的区域,每次停顿时并不会对整个堆空间进行回收。

正常情况下,G1 只需要很少的配置就能完成这些工作。以下是 G1 的两个调整参数。

MaxGCPauseMillis:

  该参数指定每次垃圾回收默认的停顿时间。该值不是固定的,G1 可以根据需要使用更长的时间。它的默认值是 200ms。也就是说,G1 会决定垃圾回收的频率以及每一轮需要回收多少个区域,这样算下来,每一轮垃圾回收大概需要 200ms 的时间。

InitiatingHeapOccupancyPercent:

  该参数指定了在 G1 启动新一轮垃圾回收之前可以使用的堆内存百分比,默认值是 45。也就是说,在堆内存的使用率达到 45% 之前,G1 不会启动垃圾回收。这个百分比包括新生代和老年代的内存。

Kafka 对堆内存的使用率非常高,容易产生垃圾对象,所以可以把这些值设得小一些。如果一台服务器有 64GB 内存,并且使用 5GB 堆内存来运行 Kafka,那么可以参考以下的配置:MaxGCPauseMillis 可以设为 20ms;InitiatingHeapOccupancyPercent 可以设为 35,这样可以让垃圾回收比默认的要早一些启动。

Kafka 的启动脚本并没有启用 G1 回收器,而是使用了 Parallel New 和 CMS( Concurrent Mark-Sweep,并发标记和清除)垃圾回收器。不过它可以通过环境变量来修改。本章前面的内容使用 start 命令来修改它:

# export JAVA_HOME=/usr/java/jdk1.8.0_51 # export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+DisableExplicitGC -Djava.awt.headless=true" # /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties #





2.7.2 数据中心布局


在开发阶段,人们并不会太关心 Kafka 服务器在数据中心所处的物理位置,因为即使集群在短时间内出现局部或完全不可用,也不会造成太大影响。但是,在生产环境,服务不可用意味着金钱的损失,具体表现为无法为用户提供服务或者不知道用户正在做什么。这个时候,使用 Kafka 集群的复制功能就变得尤为重要(请参考第 6 章),而服务器在数据中心所处的物理位置也变得重要起来。如果在部署 Kafka 之前没有考虑好这个问题,那么在后续的维护过程中,移动服务器需要耗费更高的成本。

在为 broker 增加新的分区时,broker 并无法获知机架的信息。也就是说,两个 broker 有可能是在同一个机架上,或者在同一个可用区域里(如果运行在像 AWS 这样的的云服务上),所以,在为分区添加副本的时候,这些副本很可能被分配给同一个机架上的 broker,它们使用相同的电源和网络连接。如果该机架出了问题,这些分区就会离线,客户端就无法访问到它们。更糟糕的是,如果发生不完整的主节点选举,那么在恢复时就有可能丢失数据(第 6 章将介绍更多细节)。

所以,最好把集群的 broker 安装在不同的机架上,至少不要让它们共享可能出现单点故障的基础设施,比如电源和网络。也就是说,部署服务器需要至少两个电源连接(两个不同的回路)和两个网络交换器(保证可以进行无缝的故障切换)。除了这些以外,最好还要把 broker 安放在不同的机架上。因为随着时间的推移,机架也需要进行维护,而这会导致机器离线(比如移动机器或者重新连接电源)。





2.7.3 共享Zookeeper


Kafka 使用 Zookeeper 来保存 broker、主题和分区的元数据信息。对于一个包含多个节点的 Zookeeper 群组来说,Kafka 集群的这些流量并不算多,那些写操作只是用于构造消费者群组或集群本身。实际上,在很多部署环境里,会让多个 Kafka 集群共享一个 Zookeeper 群组(每个集群使用一个 chroot 路径)。

 Kafka 消费者和 Zookeeper

在 Kafka 0.9.0.0 版本之前,除了 broker 之外,消费者也会使用 Zookeeper 来保存一些信息,比如消费者群组的信息、主题信息、消费分区的偏移量(在消费者群组里发生失效转移时会用到)。到了 0.9.0.0 版本,Kafka 引入了一个新的消费者接口,允许 broker 直接维护这些信息。这个新的消费者接口将在第 4 章介绍。



不过,消费者和 Zookeeper 之间还是有一个值得注意的地方,消费者可以选择将偏移量提交到 Zookeeper 或 Kafka,还可以选择提交偏移量的时间间隔。如果消费者将偏移量提交到 Zookeeper,那么在每个提交时间点上,消费者将会为每一个消费的分区往 Zookeeper 写入一次偏移量。合理的提交间隔是 1 分钟,因为这刚好是消费者群组的某个消费者发生失效时能够读取到重复消息的时间。值得注意的是,这些提交对于 Zookeeper 来说流量不算小,特别是当集群里有多个消费者的时候。如果 Zookeeper 群组无法处理太大的流量,就有必要使用长一点的提交时间间隔。不过不管怎样,还是建议使用最新版本的 Kafka,让消费者把偏移量提交到 Kafka 服务器上,消除对 Zookeeper 的依赖。

虽然多个 Kafka 集群可以共享一个 Zookeeper 群组,但如果有可能的话,不建议把 Zookeeper 共享给其他应用程序。Kafka 对 Zookeeper 的延迟和超时比较敏感,与 Zookeeper 群组之间的一个通信异常就可能导致 Kafka 服务器出现无法预测的行为。这样很容易让多个 broker 同时离线,如果它们与 Zookeeper 之间断开连接,也会导致分区离线。这也会给集群控制器带来压力,在服务器离线一段时间之后,当控制器尝试关闭一个服务器时,会表现出一些细小的错误。其他的应用程序因重度使用或进行不恰当的操作给 Zookeeper 群组带来压力,所以最好让它们使用自己的 Zookeeper 群组。





2.8 总结


在这一章,我们学习了如何运行 Kafka,同时也讨论了如何为 Kafka 选择合适的硬件,以及在生产环境中使用 Kafka 需要注意的事项。有了 Kafka 集群之后,接下来要介绍基本的客户端应用程序。后面两章将介绍如何创建客户端,并用它们向 Kafka 生产消息(第 3 章)以及从 Kafka 读取这些消息(第 4 章)。





第 3 章 Kafka 生产者——向 Kafka 写入数据


不管是把 Kafka 作为消息队列、消息总线还是数据存储平台来使用,总是需要有一个可以往 Kafka 写入数据的生产者和一个可以从 Kafka 读取数据的消费者,或者一个兼具两种角色的应用程序。

例如,在一个信用卡事务处理系统里,有一个客户端应用程序,它可能是一个在线商店,每当有支付行为发生时,它负责把事务发送到 Kafka 上。另一个应用程序根据规则引擎检查这个事务,决定是批准还是拒绝。批准或拒绝的响应消息被写回 Kafka,然后发送给发起事务的在线商店。第三个应用程序从 Kafka 上读取事务和审核状态,把它们保存到数据库,随后分析师可以对这些结果进行分析,或许还能借此改进规则引擎。

开发者们可以使用 Kafka 内置的客户端 API 开发 Kafka 应用程序。

在这一章,我们将从 Kafka 生产者的设计和组件讲起,学习如何使用 Kafka 生产者。我们将演示如何创建 KafkaProducer 和 ProducerRecords 对象、如何将记录发送给 Kafka,以及如何处理从 Kafka 返回的错误,然后介绍用于控制生产者行为的重要配置选项,最后深入探讨如何使用不同的分区方法和序列化器,以及如何自定义序列化器和分区器。

在第 4 章,我们将会介绍 Kafka 的消费者客户端,以及如何从 Kafka 读取消息。

 第三方客户端

除了内置的客户端外,Kafka 还提供了二进制连接协议,也就是说,我们直接向 Kafka 网络端口发送适当的字节序列,就可以实现从 Kafka 读取消息或往 Kafka 写入消息。还有很多用其他语言实现的 Kafka 客户端,比如 C++、 Python、Go 语言等,它们都实现了 Kafka 的连接协议,使得 Kafka 不仅仅局限于在 Java 里使用。这些客户端不属于 Kafka 项目,不过 Kafka 项目 wiki 上提供了一个清单,列出了所有可用的客户端。连接协议和第三方客户端超出了本章的讨论范围。





3.1 生产者概览


一个应用程序在很多情况下需要往 Kafka 写入消息:记录用户的活动(用于审计和分析)、记录度量指标、保存日志消息、记录智能家电的信息、与其他应用程序进行异步通信、缓冲即将写入到数据库的数据,等等。

多样的使用场景意味着多样的需求:是否每个消息都很重要?是否允许丢失一小部分消息?偶尔出现重复消息是否可以接受?是否有严格的延迟和吞吐量要求?

在之前提到的信用卡事务处理系统里,消息丢失或消息重复是不允许的,可以接受的延迟最大为 500ms,对吞吐量要求较高——我们希望每秒钟可以处理一百万个消息。

保存网站的点击信息是另一种使用场景。在这个场景里,允许丢失少量的消息或出现少量的消息重复,延迟可以高一些,只要不影响用户体验就行。换句话说,只要用户点击链接后可以马上加载页面,那么我们并不介意消息要在几秒钟之后才能到达 Kafka 服务器。吞吐量则取决于网站用户使用网站的频度。

不同的使用场景对生产者 API 的使用和配置会有直接的影响。

尽管生产者 API 使用起来很简单,但消息的发送过程还是有点复杂的。图 3-1 展示了向 Kafka 发送消息的主要步骤。



图 3-1:Kafka 生产者组件图

我们从创建一个 ProducerRecord 对象开始,ProducerRecord 对象需要包含目标主题和要发送的内容。我们还可以指定键或分区。在发送 ProducerRecord 对象时,生产者要先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。

接下来,数据被传给分区器。如果之前在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情,直接把指定的分区返回。如果没有指定分区,那么分区器会根据 ProducerRecord 对象的键来选择一个分区。选好分区以后,生产者就知道该往哪个主题和分区发送这条记录了。紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。

服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,几次之后如果还是失败,就返回错误信息。





3.2 创建Kafka生产者


要往 Kafka 写入消息,首先要创建一个生产者对象,并设置一些属性。Kafka 生产者有 3 个必选的属性。

bootstrap.servers

  该属性指定 broker 的地址清单,地址的格式为 host:port。清单里不需要包含所有的 broker 地址,生产者会从给定的 broker 里查找到其他 broker 的信息。不过建议至少要提供两个 broker 的信息,一旦其中一个宕机,生产者仍然能够连接到集群上。

key.serializer

  broker 希望接收到的消息的键和值都是字节数组。生产者接口允许使用参数化类型,因此可以把 Java 对象作为键和值发送给 broker。这样的代码具有良好的可读性,不过生产者需要知道如何把这些 Java 对象转换成字节数组。key.serializer 必须被设置为一个实现了 org.apache.kafka.common.serialization.Serializer 接口的类,生产者会使用这个类把键对象序列化成字节数组。Kafka 客户端默认提供了 ByteArraySerializer(这个只做很少的事情)、StringSerializer 和 IntegerSerializer,因此,如果你只使用常见的几种 Java 对象类型,那么就没必要实现自己的序列化器。要注意,key.serializer 是必须设置的,就算你打算只发送值内容。

value.serializer

  与 key.serializer 一样,value.serializer 指定的类会将值序列化。如果键和值都是字符串,可以使用与 key.serializer 一样的序列化器。如果键是整数类型而值是字符串,那么需要使用不同的序列化器。

下面的代码片段演示了如何创建一个新的生产者,这里只指定了必要的属性,其他使用默认设置。

private Properties kafkaProps = new Properties(); ➊ kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092"); kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ➋ kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(kafkaProps); ➌

❶ 新建一个 Properties 对象。

❷ 因为我们打算把键和值定义成字符串类型,所以使用内置的 StringSerializer。

❸ 在这里我们创建了一个新的生产者对象,并为键和值设置了恰当的类型,然后把 Properties 对象传给它。

这个接口很简单,通过配置生产者的不同属性就可以很大程度地控制它的行为。Kafka 的文档涵盖了所有的配置参数,我们将在这一章的后面部分介绍其中几个比较重要的参数。

实例化生产者对象后,接下来就可以开始发送消息了。发送消息主要有以下 3 种方式。

发送并忘记(fire-and-forget)

  我们把消息发送给服务器,但并不关心它是否正常到达。大多数情况下,消息会正常到达,因为 Kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。

同步发送

  我们使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待,就可以知道消息是否发送成功。

异步发送

  我们调用 send() 方法,并指定一个回调函数,服务器在返回响应时调用该函数。

在下面的几个例子中,我们会介绍如何使用上述几种方式来发送消息,以及如何处理可能发生的异常情况。

本章的所有例子都使用单线程,但其实生产者是可以使用多线程来发送消息的。刚开始的时候可以使用单个消费者和单个线程。如果需要更高的吞吐量,可以在生产者数量不变的前提下增加线程数量。如果这样做还不够,可以增加生产者数量。





3.3 发送消息到Kafka


最简单的消息发送方式如下所示。

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); ➊ try { producer.send(record); ➋ } catch (Exception e) { e.printStackTrace(); ➌ }

❶ 生产者的 send() 方法将 ProducerRecord 对象作为参数,所以我们要先创建一个 ProducerRecord 对象。ProducerRecord 有多个构造函数,稍后我们会详细讨论。这里使用其中一个构造函数,它需要目标主题的名字和要发送的键和值对象,它们都是字符串。键和值对象的类型必须与序列化器和生产者对象相匹配。

❷ 我们使用生产者的 send() 方法发送 ProducerRecord 对象。从生产者的架构图里可以看到,消息先是被放进缓冲区,然后使用单独的线程发送到服务器端。send() 方法会返回一个包含 RecordMetadata 的 Future 对象,不过因为我们会忽略返回值,所以无法知道消息是否发送成功。如果不关心发送结果,那么可以使用这种发送方式。比如,记录 Twitter 消息日志,或记录不太重要的应用程序日志。

❸ 我们可以忽略发送消息时可能发生的错误或在服务器端可能发生的错误,但在发送消息之前,生产者还是有可能发生其他的异常。这些异常有可能是 SerializationException(说明序列化消息失败)、BufferExhaustedException 或 TimeoutException(说明缓冲区已满),又或者是 InterruptException(说明发送线程被中断)。





3.3.1 同步发送消息


最简单的同步发送消息方式如下所示。

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); try { producer.send(record).get(); ➊ } catch (Exception e) { e.printStackTrace(); ➋ }

❶ 在这里,producer.send() 方法先返回一个 Future 对象,然后调用 Future 对象的 get() 方法等待 Kafka 响应。如果服务器返回错误,get() 方法会抛出异常。如果没有发生错误,我们会得到一个 RecordMetadata 对象,可以用它获取消息的偏移量。

❷ 如果在发送数据之前或者在发送过程中发生了任何错误,比如 broker 返回了一个不允许重发消息的异常或者已经超过了重发的次数,那么就会抛出异常。我们只是简单地把异常信息打印出来。

KafkaProducer 一般会发生两类错误。其中一类是可重试错误,这类错误可以通过重发消息来解决。比如对于连接错误,可以通过再次建立连接来解决,“无主(no leader)”错误则可以通过重新为分区选举首领来解决。KafkaProducer 可以被配置成自动重试,如果在多次重试后仍无法解决问题,应用程序会收到一个重试异常。另一类错误无法通过重试解决,比如“消息太大”异常。对于这类错误,KafkaProducer 不会进行任何重试,直接抛出异常。





3.3.2 异步发送消息


假设消息在应用程序和 Kafka 集群之间一个来回需要 10ms。如果在发送完每个消息后都等待回应,那么发送 100 个消息需要 1 秒。但如果只发送消息而不等待响应,那么发送 100 个消息所需要的时间会少很多。大多数时候,我们并不需要等待响应——尽管 Kafka 会把目标主题、分区信息和消息的偏移量发送回来,但对于发送端的应用程序来说不是必需的。不过在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。

为了在异步发送消息的同时能够对异常情况进行处理,生产者提供了回调支持。下面是使用回调的一个例子。

private class DemoProducerCallback implements Callback {➊ @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if (e != null) { e.printStackTrace(); ➋ } } } ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA"); ➌ producer.send(record, new DemoProducerCallback()); ➍

❶ 为了使用回调,需要一个实现了 org.apache.kafka.clients.producer.Callback 接口的类,这个接口只有一个 onCompletion 方法。

❷ 如果 Kafka 返回一个错误,onCompletion 方法会抛出一个非空(non null)异常。这里我们只是简单地把它打印出来,但是在生产环境应该有更好的处理方式。

❸ 记录与之前的一样。

❹ 在发送消息时传进去一个回调对象。





3.4 生产者的配置


到目前为止,我们只介绍了生产者的几个必要配置参数——bootstrap.servers API 以及序列化器。

生产者还有很多可配置的参数,在 Kafka 文档里都有说明,它们大部分都有合理的默认值,所以没有必要去修改它们。不过有几个参数在内存使用、性能和可靠性方面对生产者影响比较大,接下来我们会一一说明。

acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的。这个参数对消息丢失的可能性有重要影响。该参数有如下选项。

如果 acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。也就是说,如果当中出现了问题,导致服务器没有收到消息,那么生产者就无从得知,消息也就丢失了。不过,因为生产者不需要等待服务器的响应,所以它可以以网络能够支持的最大速度发送消息,从而达到很高的吞吐量。

如果 acks=1,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。如果消息无法到达首领节点(比如首领节点崩溃,新的首领还没有被选举出来),生产者会收到一个错误响应,为了避免数据丢失,生产者会重发消息。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用 Future 对象的 get() 方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。

如果 acks=all,只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行(第 5 章将讨论更多的细节)。不过,它的延迟比 acks=1 时更高,因为我们要等待不只一个服务器节点接收消息。





buffer.memory

该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。这个时候, send() 方法调用要么被阻塞,要么抛出异常,取决于如何设置 block.on.buffer.full 参数(在 0.9.0.0 版本里被替换成了 max.block.ms,表示在抛出异常之前可以阻塞一段时间)。



compression.type

默认情况下,消息发送时不会被压缩。该参数可以设置为 snappy、gzip 或 lz4,它指定了消息被发送给 broker 之前使用哪一种压缩算法进行压缩。snappy 压缩算法由 Google 发明,它占用较少的 CPU,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。gzip 压缩算法一般会占用较多的 CPU,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。



retries

生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下,retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。默认情况下,生产者会在每次重试之间等待 100ms,不过可以通过 retry.backoff.ms 参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间),让总的重试时间比 Kafka 集群从崩溃中恢复的时间长,否则生产者会过早地放弃重试。不过有些错误不是临时性错误,没办法通过重试来解决(比如“消息太大”错误)。一般情况下,因为生产者会自动进行重试,所以就没必要在代码逻辑里处理那些可重试的错误。你只需要处理那些不可重试的错误或重试次数超出上限的情况。



batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。当批次被填满,批次里的所有消息会被发送出去。不过生产者并不一定都会等到批次被填满才发送,半满的批次,甚至只包含一个消息的批次也有可能被发送。所以就算把批次大小设置得很大,也不会造成延迟,只是会占用更多的内存而已。但如果设置得太小,因为生产者需要更频繁地发送消息,会增加一些额外的开销。



linger.ms

该参数指定了生产者在发送批次之前等待更多消息加入批次的时间。KafkaProducer 会在批次填满或 linger.ms 达到上限时把批次发送出去。默认情况下,只要有可用的线程,生产者就会把消息发送出去,就算批次里只有一个消息。把 linger.ms 设置成比 0 大的数,让生产者在发送批次之前等待一会儿,使更多的消息加入到这个批次。虽然这样会增加延迟,但也会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了)。



client.id

该参数可以是任意的字符串,服务器会用它来识别消息的来源,还可以用在日志和配额指标里。



max.in.flight.requests.per.connection

该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。



timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms

request.timeout.ms 指定了生产者在发送数据时等待服务器返回响应的时间,metadata.fetch.timeout.ms 指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回一个错误(抛出异常或执行回调)。timeout.ms 指定了 broker 等待同步副本返回消息确认的时间,与 asks 的配置相匹配——如果在指定时间内没有收到同步副本的确认,那么 broker 就会返回一个错误。



max.block.ms

该参数指定了在调用 send() 方法或使用 partitionsFor() 方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法就会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。



max.request.size

该参数用于控制生产者发送的请求大小。它可以指能发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1MB,那么可以发送的单个最大消息为 1MB,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1KB。另外,broker 对可接收的消息最大值也有自己的限制(message.max.bytes),所以两边的配置最好可以匹配,避免生产者发送的消息被 broker 拒绝。



receive.buffer.bytes 和 send.buffer.bytes

这两个参数分别指定了 TCP socket 接收和发送数据包的缓冲区大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心,那么可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。





 顺序保证

Kafka 可以保证同一个分区里的消息是有序的。也就是说,如果生产者按照一定的顺序发送消息,broker 就会按照这个顺序把它们写入分区,消费者也会按照同样的顺序读取它们。在某些情况下,顺序是非常重要的。例如,往一个账户存入 100 元再取出来,这个与先取钱再存钱是截然不同的!不过,有些场景对顺序不是很敏感。

如果把 retries 设为非零整数,同时把 max.in.flight.requests.per.connection 设为比 1 大的数,那么,如果第一个批次消息写入失败,而第二个批次写入成功,broker 会重试写入第一个批次。如果此时第一个批次也写入成功,那么两个批次的顺序就反过来了。

一般来说,如果某些场景要求消息是有序的,那么消息是否写入成功也是很关键的,所以不建议把 retries 设为 0。可以把 max.in.flight.requests.per.connection 设为 1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给 broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。





3.5 序列化器


我们已经在之前的例子里看到,创建一个生产者对象必须指定序列化器。我们已经知道如何使用默认的字符串序列化器,Kafka 还提供了整型和字节数组序列化器,不过它们还不足以满足大部分场景的需求。到最后,我们需要序列化的记录类型会越来越多。

接下来演示如何开发自己的序列化器,并介绍 Avro 序列化器作为推荐的备选方案。





3.5.1 自定义序列化器


如果发送到 Kafka 的对象不是简单的字符串或整型,那么可以使用序列化框架来创建消息记录,如 Avro、Thrift 或 Protobuf,或者使用自定义序列化器。我们强烈建议使用通用的序列化框架。不过,为了了解序列化器的工作原理,也为了说明为什么要使用序列化框架,让我们一起来看看如何自定义一个序列化器。

假设你创建了一个简单的类来表示一个客户:

public class Customer { private int customerID; private String customerName; public Customer(int ID, String name) { this.customerID = ID; this.customerName = name; } public int getID() { return customerID; } public String getName() { return customerName; } }

现在我们要为这个类创建一个序列化器,它看起来可能是这样的:

import org.apache.kafka.common.errors.SerializationException; import java.nio.ByteBuffer; import java.util.Map; public class CustomerSerializer implements Serializer<Customer> { @Override public void configure(Map configs, boolean isKey) { // 不做任何配置 } @Override /** Customer对象被序列化成: 表示customerID的4字节整数 表示customerName长度的4字节整数(如果customerName为空,则长度为0) 表示customerName的N个字节 */ public byte[] serialize(String topic, Customer data) { try { byte[] serializedName; int stringSize; if (data == null) return null; else { if (data.getName() != null) { serializedName = data.getName().getBytes("UTF-8"); stringSize = serializedName.length; } else { serializedName = new byte[0]; stringSize = 0; } } ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize); buffer.putInt(data.getID()); buffer.putInt(stringSize); buffer.put(serializedName); return buffer.array(); } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } } @Override public void close() { // 不需要关闭任何东西 } }

只要使用这个 CustomerSerializer,就可以把消息记录定义成 ProducerRecord<String, Customer>,并且可以直接把 Customer 对象传给生产者。这个例子很简单,不过代码看起来太脆弱了——如果我们有多种类型的消费者,可能需要把 customerID 字段变成长整型,或者为 Customer 添加 startDate 字段,这样就会出现新旧消息的兼容性问题。在不同版本的序列化器和反序列化器之间调试兼容性问题着实是个挑战——你需要比较原始的字节数组。更糟糕的是,如果同一个公司的不同团队都需要往 Kafka 写入 Customer 数据,那么他们就需要使用相同的序列化器,如果序列化器发生改动,他们几乎要在同一时间修改代码。

基于以上几点原因,我们不建议使用自定义序列化器,而是使用已有的序列化器和反序列化器,比如 JSON、Avro、Thrift 或 Protobuf。下面我们将会介绍 Avro,然后演示如何序列化 Avro 记录并发送给 Kafka。





3.5.2 使用Avro序列化


Apache Avro(以下简称 Avro)是一种与编程语言无关的序列化格式。Doug Cutting 创建了这个项目,目的是提供一种共享数据文件的方式。

Avro 数据通过与语言无关的 schema 来定义。schema 通过 JSON 来描述,数据被序列化成二进制文件或 JSON 文件,不过一般会使用二进制文件。Avro 在读写文件时需要用到 schema,schema 一般会被内嵌在数据文件里。

Avro 有一个很有意思的特性是,当负责写消息的应用程序使用了新的 schema,负责读消息的应用程序可以继续处理消息而无需做任何改动,这个特性使得它特别适合用在像 Kafka 这样的消息系统上。

假设最初的 schema 是这样的:

{"namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "faxNumber", "type": ["null", "string"], "default": "null"} ➊ ] }

➊ id 和 name 字段是必需的,faxNumber 是可选的,默认为 null。

假设我们已经使用了这个 schema 几个月的时间,并用它生成了几个太字节的数据。现在,我们决定在新版本里做一些修改。因为在 21 世纪不再需要 faxNumber 字段,需要用 email 字段来代替它。

新的 schema 如下:

{"namespace": "customerManagement.avro", "type": "record", "name": "Customer", "fields": [ {"name": "id", "type": "int"}, {"name": "name", "type": "string"}, {"name": "email", "type": ["null", "string"], "default": "null"} ] }

更新到新版的 schema 后,旧记录仍然包含 faxNumber 字段,而新记录则包含 email 字段。部分负责读取数据的应用程序进行了升级,那么它们是如何处理这些变化的呢?

在应用程序升级之前,它们会调用类似 getName()、getId() 和 getFaxNumber() 这样的方法。如果碰到使用新 schema 构建的消息,getName() 和 getId() 方法仍然能够正常返回,但 getFaxNumber() 方法会返回 null,因为消息里不包含传真号码。

在应用程序升级之后,getEmail() 方法取代了 getFaxNumber() 方法。如果碰到一个使用旧 schema 构建的消息,那么 getEmail() 方法会返回 null,因为旧消息不包含邮件地址。

现在可以看出使用 Avro 的好处了:我们修改了消息的 schema,但并没有更新所有负责读取数据的应用程序,而这样仍然不会出现异常或阻断性错误,也不需要对现有数据进行大幅更新。

不过这里有以下两个需要注意的地方。

用于写入数据和读取数据的 schema 必须是相互兼容的。Avro 文档提到了一些兼容性原则。

反序列化器需要用到用于写入数据的 schema,即使它可能与用于读取数据的 schema 不一样。Avro 数据文件里就包含了用于写入数据的 schema,不过在 Kafka 里有一种更好的处理方式,下一小节我们会介绍它。





3.5.3 在Kafka里使用Avro


Avro 的数据文件里包含了整个 schema,不过这样的开销是可接受的。但是如果在每条 Kafka 记录里都嵌入 schema,会让记录的大小成倍地增加。不过不管怎样,在读取记录时仍然需要用到整个 schema,所以要先找到 schema。我们遵循通用的结构模式并使用“schema 注册表”来达到目的。schema 注册表并不属于 Kafka,现在已经有一些开源的 schema 注册表实现。在这个例子里,我们使用的是 Confluent Schema Registry。该注册表的代码可以在 GitHub 上找到,你也可以把它作为 Confluent 平台的一部分进行安装。如果你决定使用这个注册表,可以参考它的文档。

我们把所有写入数据需要用到的 schema 保存在注册表里,然后在记录里引用 schema 的标识符。负责读取数据的应用程序使用标识符从注册表里拉取 schema 来反序列化记录。序列化器和反序列化器分别负责处理 schema 的注册和拉取。Avro 序列化器的使用方法与其他序列化器是一样的。



图 3-2:Avro 记录的序列化和反序列化流程图

下面的例子演示了如何把生成的 Avro 对象发送到 Kafka(关于如何使用 Avro 生成代码请参考 Avro 文档):

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); ➊ props.put("schema.registry.url", schemaUrl); ➋ String topic = "customerContacts"; Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props); ➌ // 不断生成事件,直到有人按下Ctrl+C组合键 while (true) { Customer customer = CustomerGenerator.getNext(); System.out.println("Generated customer " + customer.toString()); ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer); ➍ producer.send(record); ➎ }

❶ 使用 Avro 的 KafkaAvroSerializer 来序列化对象。注意,AvroSerializer 也可以处理原语,这就是我们以后可以使用字符串作为记录键、使用客户对象作为值的原因。

❷ schema.registry.url 是一个新的参数,指向 schema 的存储位置。

❸ Customer 是生成的对象。我们会告诉生产者 Customer 对象就是记录的值。

❹ 实例化一个 ProducerRecord 对象,并指定 Customer 为值的类型,然后再传给它一个 Customer 对象。

❺ 把 Customer 对象作为记录发送出去,KafkaAvroSerializer 会处理剩下的事情。

如果你选择使用一般的 Avro 对象而非生成的 Avro 对象该怎么办?不用担心,这个时候你只需提供 schema 就可以了:

Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); ➊ props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); props.put("schema.registry.url", url); ➋ String schemaString = "{\"namespace\": \"customerManagement.avro\", \"type\": \"record\", " + ➌ "\"name\": \"Customer\"," + "\"fields\": [" + "{\"name\": \"id\", \"type\": \"int\"}," + "{\"name\": \"name\", \"type\": \"string\"}," + "{\"name\": \"email\", \"type\": [\"null\",\"string \"], \"default\":\"null\" }" + "]}"; Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props); ➍ Schema.Parser parser = new Schema.Parser(); Schema schema = parser.parse(schemaString); for (int nCustomers = 0; nCustomers < customers; nCustomers++) { String name = "exampleCustomer" + nCustomers; String email = "example" + nCustomers + "@example.com"; GenericRecord customer = new GenericData.Record(schema); ➎ customer.put("id", nCustomers); customer.put("name", name); customer.put("email", email); ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>("customerContacts", name, customer); producer.send(data); } }

❶ 仍然使用同样的 KafkaAvroSerializer。

❷ 提供同样的 schema 注册表 URI。

❸ 这里需要提供 Avro schema,因为我们没有使用 Avro 生成的对象。

❹ 对象类型是 Avro GenericRecord,我们通过 schema 和需要写入的数据来初始化它。

❺ ProducerRecord 的值就是一个 GenericRecord 对象,它包含了 schema 和数据。序列化器知道如何从记录里获取 schema,把它保存到注册表里,并用它序列化对象数据。





3.6 分区


在之前的例子里,ProducerRecord 对象包含了目标主题、键和值。Kafka 的消息是一个个键值对,ProducerRecord 对象可以只包含目标主题和值,键可以设置为默认的 null,不过大多数应用程序会用到键。键有两个用途:可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区。拥有相同键的消息将被写到同一个分区。也就是说,如果一个进程只从一个主题的分区读取数据(第 4 章会介绍更多细节),那么具有相同键的所有记录都会被该进程读取。要创建一个包含键值的记录,只需像下面这样创建 ProducerRecord 对象:

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");

如果要创建键为 null 的消息,不指定键就可以了:

ProducerRecord<Integer, String> record = new ProducerRecord<>("CustomerCountry", "USA"); ➊

➊ 这里的键被设为 null。

如果键值为 null,并且使用了默认的分区器,那么记录将被随机地发送到主题内各个可用的分区上。分区器使用轮询(Round Robin)算法将消息均衡地分布到各个分区上。

如果键不为空,并且使用了默认的分区器,那么 Kafka 会对键进行散列(使用 Kafka 自己的散列算法,即使升级 Java 版本,散列值也不会发生变化),然后根据散列值把消息映射到特定的分区上。这里的关键之处在于,同一个键总是被映射到同一个分区上,所以在进行映射时,我们会使用主题所有的分区,而不仅仅是可用的分区。这也意味着,如果写入数据的分区是不可用的,那么就会发生错误。但这种情况很少发生。我们将在第 6 章讨论 Kafka 的复制功能和可用性。

只有在不改变主题分区数量的情况下,键与分区之间的映射才能保持不变。举个例子,在分区数量保持不变的情况下,可以保证用户 045189 的记录总是被写到分区 34。在从分区读取数据时,可以进行各种优化。不过,一旦主题增加了新的分区,这些就无法保证了——旧数据仍然留在分区 34,但新的记录可能被写到其他分区上。如果要使用键来映射分区,那么最好在创建主题的时候就把分区规划好(第 2 章介绍了如何确定合适的分区数量),而且永远不要增加新分区。

实现自定义分区策略

我们已经讨论了默认分区器的特点,它是使用次数最多的分区器。不过,除了散列分区之外,有时候也需要对数据进行不一样的分区。假设你是一个 B2B 供应商,你有一个大客户,它是手持设备 Banana 的制造商。Banana 占据了你整体业务 10% 的份额。如果使用默认的散列分区算法,Banana 的账号记录将和其他账号记录一起被分配给相同的分区,导致这个分区比其他分区要大一些。服务器可能因此出现存储空间不足、处理缓慢等问题。我们需要给 Banana 分配单独的分区,然后使用散列分区算法处理其他账号。

下面是一个自定义分区器的例子:

import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.record.InvalidRecordException; import org.apache.kafka.common.utils.Utils; public class BananaPartitioner implements Partitioner { public void configure(Map<String, ?> configs) {} ➊ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if ((keyBytes == null) || (!(key instanceOf String))) ➋ throw new InvalidRecordException("We expect all messages to have customer name as key") if (((String) key).equals("Banana")) return numPartitions; // Banana总是被分配到最后一个分区 // 其他记录被散列到其他分区 return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1)) } public void close() {} }

❶ Partitioner 接口包含了 configure、partition 和 close 这 3 个方法。这里我们只实现 partition 方法,不过我们真不应该在 partition 方法里硬编码客户的名字,而应该通过 configure 方法传进来。

❷ 我们只接受字符串作为键,如果不是字符串,就抛出异常。





3.7 旧版的生产者API


在这一章,我们讨论了生产者的 Java 客户端,它是 org.apache.kafka.clients 包的一部分。在写到这一章的时候,Kafka 还有两个旧版的 Scala 客户端,它们是 Kafka.producer 包的一部分,同时也是 Kafka 的核心模块,它们是 SyncProducer(根据 acks 参数的具体配置情况,在发送更多的消息之前,它会等待服务器对已发消息或批次进行确认)和 AsyncProducer(在后台将消息分为不同的批次,使用单独的线程发送这些批次,不为客户端提供发送结果)。

因为当前版本的生产者 API 同时支持上述两种发送方式,而且为开发者提供了更高的可靠性和灵活性,所以我们不再讨论旧版的 API。如果你想使用它们,那么在使用之前请再三考虑,如果确定要使用,可以从 Kafka 文档中了解更多的信息。





3.8 总结


我们以一个生产者示例开始了本章的内容——使用 10 行代码将消息发送到 Kafka。然后我们在代码中加入错误处理逻辑,并介绍了同步和异步两种发送方式。接下来,我们介绍了生产者的一些重要配置参数以及它们对生产者行为的影响。我们还讨论了用于控制消息格式的序列化器,并深入探讨了 Avro——一种在 Kafka 中得到广泛应用的序列化方式。最后,我们讨论了 Kafka 的分区机制,并给出了一个自定义分区的例子。

现在我们已经知道如何向 Kafka 写入消息,在第 4 章,我们将学习如何从 Kafka 读取消息。





第 4 章 Kafka 消费者——从 Kafka 读取数据


应用程序使用 KafkaConsumer 向 Kafka 订阅主题,并从订阅的主题上接收消息。从 Kafka 读取数据不同于从其他消息系统读取数据,它涉及一些独特的概念和想法。如果不先理解这些概念,就难以理解如何使用消费者 API。所以我们接下来先解释这些重要的概念,然后再举几个例子,演示如何使用消费者 API 实现不同的应用程序。





4.1 KafkaConsumer概念


要想知道如何从 Kafka 读取消息,需要先了解消费者和消费者群组的概念。以下章节将解释这些概念。





4.1.1 消费者和消费者群组


假设我们有一个应用程序需要从一个 Kafka 主题读取消息并验证这些消息,然后再把它们保存起来。应用程序需要创建一个消费者对象,订阅主题并开始接收消息,然后验证消息并保存结果。过了一阵子,生产者往主题写入消息的速度超过了应用程序验证数据的速度,这个时候该怎么办?如果只使用单个消费者处理消息,应用程序会远跟不上消息生成的速度。显然,此时很有必要对消费者进行横向伸缩。就像多个生产者可以向相同的主题写入消息一样,我们也可以使用多个消费者从同一个主题读取消息,对消息进行分流。

Kafka 消费者从属于消费者群组。一个群组里的消费者订阅的是同一个主题,每个消费者接收主题一部分分区的消息。

假设主题 T1 有 4 个分区,我们创建了消费者 C1,它是群组 G1 里唯一的消费者,我们用它订阅主题 T1。消费者 C1 将收到主题 T1 全部 4 个分区的消息,如图 4-1 所示。



图 4-1:1 个消费者收到 4 个分区的消息

如果在群组 G1 里新增一个消费者 C2,那么每个消费者将分别从两个分区接收消息。我们假设消费者 C1 接收分区 0 和分区 2 的消息,消费者 C2 接收分区 1 和分区 3 的消息,如图 4-2 所示。



图 4-2:2 个消费者收到 4 个分区的消息

如果群组 G1 有 4 个消费者,那么每个消费者可以分配到一个分区,如图 4-3 所示。



图 4-3:4 个消费者收到 4 个分区的消息

如果我们往群组里添加更多的消费者,超过主题的分区数量,那么有一部分消费者就会被闲置,不会接收到任何消息,如图 4-4 所示。



图 4-4:5 个消费者收到 4 个分区的消息

往群组里增加消费者是横向伸缩消费能力的主要方式。Kafka 消费者经常会做一些高延迟的操作,比如把数据写到数据库或 HDFS,或者使用数据进行比较耗时的计算。在这些情况下,单个消费者无法跟上数据生成的速度,所以可以增加更多的消费者,让它们分担负载,每个消费者只处理部分分区的消息,这就是横向伸缩的主要手段。我们有必要为主题创建大量的分区,在负载增长时可以加入更多的消费者。不过要注意,不要让消费者的数量超过主题分区的数量,多余的消费者只会被闲置。第 2 章介绍了如何为主题选择合适的分区数量。

除了通过增加消费者来横向伸缩单个应用程序外,还经常出现多个应用程序从同一个主题读取数据的情况。实际上,Kafka 设计的主要目标之一,就是要让 Kafka 主题里的数据能够满足企业各种应用场景的需求。在这些场景里,每个应用程序可以获取到所有的消息,而不只是其中的一部分。只要保证每个应用程序有自己的消费者群组,就可以让它们获取到主题所有的消息。不同于传统的消息系统,横向伸缩 Kafka 消费者和消费者群组并不会对性能造成负面影响。

在上面的例子里,如果新增一个只包含一个消费者的群组 G2,那么这个消费者将从主题 T1 上接收所有的消息,与群组 G1 之间互不影响。群组 G2 可以增加更多的消费者,每个消费者可以消费若干个分区,就像群组 G1 那样,如图 4-5 所示。总的来说,群组 G2 还是会接收到所有消息,不管有没有其他群组存在。

简而言之,为每一个需要获取一个或多个主题全部消息的应用程序创建一个消费者群组,然后往群组里添加消费者来伸缩读取能力和处理能力,群组里的每个消费者只处理一部分消息。



图 4-5:两个消费者群组对应一个主题





4.1.2 消费者群组和分区再均衡


我们已经从上一个小节了解到,群组里的消费者共同读取主题的分区。一个新的消费者加入群组时,它读取的是原本由其他消费者读取的消息。当一个消费者被关闭或发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者来读取。在主题发生变化时,比如管理员添加了新的分区,会发生分区重分配。

分区的所有权从一个消费者转移到另一个消费者,这样的行为被称为再均衡。再均衡非常重要,它为消费者群组带来了高可用性和伸缩性(我们可以放心地添加或移除消费者),不过在正常情况下,我们并不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。我们将在本章讨论如何进行安全的再均衡,以及如何避免不必要的再均衡。

消费者通过向被指派为群组协调器的 broker(不同的群组可以有不同的协调器)发送心跳来维持它们和群组的从属关系以及它们对分区的所有权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区里的消息。消费者会在轮询消息(为了获取消息)或提交偏移量时发送心跳。如果消费者停止发送心跳的时间足够长,会话就会过期,群组协调器认为它已经死亡,就会触发一次再均衡。

如果一个消费者发生崩溃,并停止读取消息,群组协调器会等待几秒钟,确认它死亡了才会触发再均衡。在这几秒钟时间里,死掉的消费者不会读取分区里的消息。在清理消费者时,消费者会通知协调器它将要离开群组,协调器会立即触发一次再均衡,尽量降低处理停顿。在本章的后续部分,我们将讨论一些用于控制发送心跳频率和会话过期时间的配置参数,以及如何根据实际需要来配置这些参数。

心跳行为在最近版本中的变化

在 0.10.1 版本里,Kafka 社区引入了一个独立的心跳线程,可以在轮询消息的空档发送心跳。这样一来,发送心跳的频率(也就是消费者群组用于检测发生崩溃的消费者或不再发送心跳的消费者的时间)与消息轮询的频率(由处理消息所花费的时间来确定)之间就是相互独立的。在新版本的 Kafka 里,可以指定消费者在离开群组并触发再均衡之前可以有多长时间不进行消息轮询,这样可以避免出现活锁(livelock),比如有时候应用程序并没有崩溃,只是由于某些原因导致无法正常运行。这个配置与 session.timeout.ms 是相互独立的,后者用于控制检测消费者发生崩溃的时间和停止发送心跳的时间。

本章的剩余部分将会讨论使用旧版本 Kafka 会面临的一些问题,以及如何解决这些问题。本章还包括如何应对需要较长时间来处理消息的情况的讨论,这些与 0.10.1 或更高版本的 Kafka 没有太大关系。如果你使用的是较新版本的Kafka,并且需要处理耗费较长时间的消息,只需要加大 max.poll.interval.ms 的值来增加轮询间隔的时长。





 分配分区是怎样的一个过程

当消费者要加入群组时,它会向群组协调器发送一个 JoinGroup 请求。第一个加入群组的消费者将成为“群主”。群主从协调器那里获得群组的成员列表(列表中包含了所有最近发送过心跳的消费者,它们被认为是活跃的),并负责给每一个消费者分配分区。它使用一个实现了 PartitionAssignor 接口的类来决定哪些分区应该被分配给哪个消费者。

Kafka 内置了两种分配策略,在后面的配置参数小节我们将深入讨论。分配完毕之后,群主把分配情况列表发送给群组协调器,协调器再把这些信息发送给所有消费者。每个消费者只能看到自己的分配信息,只有群主知道群组里所有消费者的分配信息。这个过程会在每次再均衡时重复发生。





4.2 创建Kafka消费者


在读取消息之前,需要先创建一个 KafkaConsumer 对象。创建 KafkaConsumer 对象与创建 KafkaProducer 对象非常相似——把想要传给消费者的属性放在 Properties 对象里。本章后续部分会深入讨论所有的属性。在这里,我们只需要使用 3 个必要的属性:bootstrap.servers、key.deserializer 和 value.deserializer。

第 1 个属性 bootstrap.servers 指定了 Kafka 集群的连接字符串。它的用途与在 KafkaProducer 中的用途是一样的,可以参考第 3 章了解它的详细定义。另外两个属性 key.deserializer 和 value.deserializer 与生产者的 serializer 定义也很类似,不过它们不是使用指定的类把 Java 对象转成字节数组,而是使用指定的类把字节数组转成 Java 对象。

第 4 个属性 group.id 不是必需的,不过我们现在姑且认为它是必需的。它指定了 KafkaConsumer 属于哪一个消费者群组。创建不属于任何一个群组的消费者也是可以的,只是这样做不太常见,在本书的大部分章节,我们都假设消费者是属于某个群组的。

下面的代码片段演示了如何创建一个 KafkaConsumer 对象:

Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

如果在第 3 章看过如何创建生产者,就应该很熟悉上面的这段代码。我们假设消费的键和值都是字符串类型,所以使用的是内置的 StringDeserializer,并且使用字符串类型创建了 KafkaConsumer 对象。唯一不同的是新增了 group.id 属性,它指定了消费者所属群组的名字。





4.3 订阅主题


创建好消费者之后,下一步可以开始订阅主题了。subscribe() 方法接受一个主题列表作为参数,使用起来很简单:

consumer.subscribe(Collections.singletonList("customerCountries")); ➊

➊ 为了简单起见,我们创建了一个只包含单个元素的列表,主题的名字叫作“customer Countries”。

我们也可以在调用 subscribe() 方法时传入一个正则表达式。正则表达式可以匹配多个主题,如果有人创建了新的主题,并且主题的名字与正则表达式匹配,那么会立即触发一次再均衡,消费者就可以读取新添加的主题。如果应用程序需要读取多个主题,并且可以处理不同类型的数据,那么这种订阅方式就很管用。在 Kafka 和其他系统之间复制数据时,使用正则表达式的方式订阅多个主题是很常见的做法。

要订阅所有与 test 相关的主题,可以这样做:

consumer.subscribe("test.*");





4.4 轮询


消息轮询是消费者 API 的核心,通过一个简单的轮询向服务器请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,开发者只需要使用一组简单的 API 来处理从分区返回的数据。消费者代码的主要部分如下所示:

try { while (true) { ➊ ConsumerRecords<String, String> records = consumer.poll(100); ➋ for (ConsumerRecord<String, String> record : records) ➌ { log.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); int updatedCount = 1; if (custCountryMap.countainsValue(record.value())) { updatedCount = custCountryMap.get(record.value()) + 1; } custCountryMap.put(record.value(), updatedCount) JSONObject json = new JSONObject(custCountryMap); System.out.println(json.toString(4)) ➍ } } } finally { consumer.close(); ➎ }

❶ 这是一个无限循环。消费者实际上是一个长期运行的应用程序,它通过持续轮询向 Kafka 请求数据。稍后我们会介绍如何退出循环,并关闭消费者。

❷ 这一行代码非常重要。就像鲨鱼停止移动就会死掉一样,消费者必须持续对 Kafka 进行轮询,否则会被认为已经死亡,它的分区会被移交给群组里的其他消费者。传给 poll() 方法的参数是一个超时时间,用于控制 poll() 方法的阻塞时间(在消费者的缓冲区里没有可用数据时会发生阻塞)。如果该参数被设为 0,poll() 会立即返回,否则它会在指定的毫秒数内一直等待 broker 返回数据。

❸ poll() 方法返回一个记录列表。每条记录都包含了记录所属主题的信息、记录所在分区的信息、记录在分区里的偏移量,以及记录的键值对。我们一般会遍历这个列表,逐条处理这些记录。poll() 方法有一个超时参数,它指定了方法在多久之后可以返回,不管有没有可用的数据都要返回。超时时间的设置取决于应用程序对响应速度的要求,比如要在多长时间内把控制权归还给执行轮询的线程。

❹ 把结果保存起来或者对已有的记录进行更新,处理过程也随之结束。在这里,我们的目的是统计来自各个地方的客户数量,所以使用了一个散列表来保存结果,并以 JSON 的格式打印结果。在真实场景里,结果一般会被保存到数据存储系统里。

❺ 在退出应用程序之前使用 close() 方法关闭消费者。网络连接和 socket 也会随之关闭,并立即触发一次再均衡,而不是等待群组协调器发现它不再发送心跳并认定它已死亡,因为那样需要更长的时间,导致整个群组在一段时间内无法读取消息。

轮询不只是获取数据那么简单。在第一次调用新消费者的 poll() 方法时,它会负责查找 GroupCoordinator,然后加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是从轮询里发送出去的。所以,我们要确保在轮询期间所做的任何处理工作都应该尽快完成。

 线程安全

在同一个群组里,我们无法让一个线程运行多个消费者,也无法让多个线程安全地共享一个消费者。按照规则,一个消费者使用一个线程。如果要在同一个消费者群组里运行多个消费者,需要让每个消费者运行在自己的线程里。最好是把消费者的逻辑封装在自己的对象里,然后使用 Java 的 ExecutorService 启动多个线程,使每个消费者运行在自己的线程上。Confluent 的博客(https://www.confluent.io/blog/)上有一个教程介绍如何处理这种情况。





4.5 消费者的配置


到目前为止,我们学习了如何使用消费者 API,不过只介绍了几个配置属性——bootstrap.servers、group.id、key.deserializer 和 value.deserializer。Kafka 的文档列出了所有与消费者相关的配置说明。大部分参数都有合理的默认值,一般不需要修改它们,不过有一些参数与消费者的性能和可用性有很大关系。接下来介绍这些重要的属性。

fetch.min.bytes

该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题不是很活跃的时候(或者一天里的低谷时段)就不需要来来回回地处理消息。如果没有很多可用数据,但消费者的 CPU 使用率却很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值设置得大一点可以降低 broker 的工作负载。



fetch.max.wait.ms

我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 feth.max.wait.ms 则用于指定 broker 的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致 500ms 的延迟。如果要降低潜在的延迟(为了满足 SLA),可以把该参数值设置得小一些。如果 fetch.max.wait.ms 被设为 100ms,并且 fetch.min.bytes 被设为 1MB,那么 Kafka 在收到消费者的请求后,要么返回 1MB 数据,要么在 100ms 后返回所有可用的数据,就看哪个条件先得到满足。



max.partition.fetch.bytes

该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也就是说,KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。如果一个主题有 20 个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置)大,否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。



session.timeout.ms

该属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。该属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll() 方法向协调器发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,一般需要同时修改这两个属性,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把 session.timeout.ms 值设得比默认值小,可以更快地检测和恢复崩溃的节点,不过长时间的轮询或垃圾收集可能导致非预期的再均衡。把该属性的值设置得大一些,可以减少意外的再均衡,不过检测节点崩溃需要更长的时间。



auto.offset.reset

该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。它的默认值是 latest,意思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从起始位置读取分区的记录。



enable.auto.commit

我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是 true。为了尽量避免出现重复数据和数据丢失,可以把它设为 false,由自己控制何时提交偏移量。如果把它设为 true,还可以通过配置 auto.commit.interval.ms 属性来控制提交的频率。



partition.assignment.strategy

我们知道,分区会被分配给群组里的消费者。PartitionAssignor 根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者。Kafka 有两个默认的分配策略。

Range

  该策略会把主题的若干个连续的分区分配给消费者。假设消费者 C1 和消费者 C2 同时订阅了主题 T1 和主题 T2,并且每个主题有 3 个分区。那么消费者 C1 有可能分配到这两个主题的分区 0 和分区 1,而消费者 C2 分配到这两个主题的分区 2。因为每个主题拥有奇数个分区,而分配是在主题内独立完成的,第一个消费者最后分配到比第二个消费者更多的分区。只要使用了 Range 策略,而且分区数量无法被消费者数量整除,就会出现这种情况。

RoundRobin

  该策略把主题的所有分区逐个分配给消费者。如果使用 RoundRobin 策略来给消费者 C1 和消费者 C2 分配分区,那么消费者 C1 将分到主题 T1 的分区 0 和分区 2 以及主题 T2 的分区 1,消费者 C2 将分配到主题 T1 的分区 1 以及主题 T2 的分区 0 和分区 2。一般来说,如果所有消费者都订阅相同的主题(这种情况很常见),RoundRobin 策略会给所有消费者分配相同数量的分区(或最多就差一个分区)。

可以通过设置 partition.assignment.strategy 来选择分区策略。默认使用的是 org.apache.kafka.clients.consumer.RangeAssignor,这个类实现了 Range 策略,不过也可以把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我们还可以使用自定义策略,在这种情况下,partition.assignment.strategy 属性的值就是自定义类的名字。



client.id

该属性可以是任意字符串,broker 用它来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额里。



max.poll.records

该属性用于控制单次调用 call() 方法能够返回的记录数量,可以帮你控制在轮询里需要处理的数据量。



receive.buffer.bytes 和 send.buffer.bytes

socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设为 -1,就使用操作系统的默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。





4.6 提交和偏移量


每次调用 poll() 方法,它总是返回由生产者写入 Kafka 但还没有被消费者读取过的记录,我们因此可以追踪到哪些记录是被群组里的哪个消费者读取的。之前已经讨论过,Kafka 不会像其他 JMS 队列那样需要得到消费者的确认,这是 Kafka 的一个独特之处。相反,消费者可以使用 Kafka 来追踪消息在分区里的位置(偏移量)。

我们把更新分区当前位置的操作叫作提交。

那么消费者是如何提交偏移量的呢?消费者往一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含每个分区的偏移量。如果消费者一直处于运行状态,那么偏移量就没有什么用处。不过,如果消费者发生崩溃或者有新的消费者加入群组,就会触发再均衡,完成再均衡之后,每个消费者可能分配到新的分区,而不是之前处理的那个。为了能够继续之前的工作,消费者需要读取每个分区最后一次提交的偏移量,然后从偏移量指定的地方继续处理。

如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理,如图 4-6 所示。



图 4-6:提交的偏移量小于客户端处理的最后一个消息的偏移量

如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失,如图 4-7 所示。



图 4-7:提交的偏移量大于客户端处理的最后一个消息的偏移量

所以,处理偏移量的方式对客户端会有很大的影响。

KafkaConsumer API 提供了很多种方式来提交偏移量。





4.6.1 自动提交


最简单的提交方式是让消费者自动提交偏移量。如果 enable.auto.commit 被设为 true,那么每过 5s,消费者会自动把从 poll()方法接收到的最大偏移量提交上去。提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。

假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无法完全避免的。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(在调用 close() 方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。

自动提交虽然方便,不过并没有为开发者留有余地来避免重复处理消息。





4.6.2 提交当前偏移量


大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,并在发生再均衡时减少重复消息的数量。消费者 API 提供了另一种提交偏移量的方式,开发者可以在必要的时候提交当前偏移量,而不是基于时间间隔。

把 auto.commit.offset 设为 false,让应用程序决定何时提交偏移量。使用 commitSync() 提交偏移量最简单也最可靠。这个 API 会提交由 poll() 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

要记住,commitSync() 将会提交由 poll() 返回的最新偏移量,所以在处理完所有记录后要确保调用了 commitSync(),否则还是会有丢失消息的风险。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都将被重复处理。

下面是我们在处理完最近一批消息后使用 commitSync() 方法提交偏移量的例子。

while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); ➊ } try { consumer.commitSync(); ➋ } catch (CommitFailedException e) { log.error("commit failed", e) ➌ } }

❶ 我们假设把记录内容打印出来就算处理完毕,这个是由应用程序根据具体的使用场景来决定的。

❷ 处理完当前批次的消息,在轮询更多的消息之前,调用 commitSync() 方法提交当前批次最新的偏移量。

❸ 只要没有发生不可恢复的错误,commitSync() 方法会一直尝试直至提交成功。如果提交失败,我们也只能把异常记录到错误日志里。





4.6.3 异步提交


手动提交有一个不足之处,在 broker 对提交请求作出回应之前,应用程序会一直阻塞,这样会限制应用程序的吞吐量。我们可以通过降低提交频率来提升吞吐量,但如果发生了再均衡,会增加重复消息的数量。

这个时候可以使用异步提交 API。我们只管发送提交请求,无需等待 broker 的响应。

while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(); ➊ }

➊ 提交最后一个偏移量,然后继续做其他事情。

在成功提交或碰到无法恢复的错误之前,commitSync() 会一直重试,但是 commitAsync() 不会,这也是 commitAsync() 不好的一个地方。它之所以不进行重试,是因为在它收到服务器响应的时候,可能有一个更大的偏移量已经提交成功。假设我们发出一个请求用于提交偏移量 2000,这个时候发生了短暂的通信问题,服务器收不到请求,自然也不会作出任何响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量 3000。如果 commitAsync() 重新尝试提交偏移量 2000,它有可能在偏移量 3000 之后提交成功。这个时候如果发生再均衡,就会出现重复消息。

我们之所以提到这个问题的复杂性和提交顺序的重要性,是因为 commitAsync() 也支持回调,在 broker 作出响应时会执行回调。回调经常被用于记录提交错误或生成度量指标,不过如果你要用它来进行重试,一定要注意提交的顺序。

while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(new OffsetCommitCallback() { public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) { if (e != null) log.error("Commit failed for offsets {}", offsets, e); } }); ➊ }

➊ 发送提交请求然后继续做其他事情,如果提交失败,错误信息和偏移量会被记录下来。

 重试异步提交

我们可以使用一个单调递增的序列号来维护异步提交的顺序。在每次提交偏移量之后或在回调里提交偏移量时递增序列号。在进行重试前,先检查回调的序列号和即将提交的偏移量是否相等,如果相等,说明没有新的提交,那么可以安全地进行重试。如果序列号比较大,说明有一个新的提交已经发送出去了,应该停止重试。





4.6.4 同步和异步组合提交


一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。

因此,在消费者关闭前一般会组合使用 commitAsync() 和 commitSync()。它们的工作原理如下(后面讲到再均衡监听器时,我们会讨论如何在发生再均衡前提交偏移量):

try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitAsync(); ➊ } } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync(); ➋ } finally { consumer.close(); } }

❶ 如果一切正常,我们使用 commitAsync() 方法来提交。这样速度更快,而且即使这次提交失败,下一次提交很可能会成功。

❷ 如果直接关闭消费者,就没有所谓的“下一次提交”了。使用 commitSync() 方法会一直重试,直到提交成功或发生无法恢复的错误。





4.6.5 提交特定的偏移量


提交偏移量的频率与处理消息批次的频率是一样的。但如果想要更频繁地提交该怎么办?如果 poll() 方法返回一大批数据,为了避免因再均衡引起的重复处理整批消息,想要在批次中间提交偏移量该怎么办?这种情况无法通过调用 commitSync() 或 commitAsync() 来实现,因为它们只会提交最后一个偏移量,而此时该批次里的消息还没有处理完。

幸运的是,消费者 API 允许在调用 commitSync() 和 commitAsync() 方法时传进去希望提交的分区和偏移量的 map。假设你处理了半个批次的消息,最后一个来自主题“customers”分区 3 的消息的偏移量是 5000,你可以调用 commitSync() 方法来提交它。不过,因为消费者可能不只读取一个分区,你需要跟踪所有分区的偏移量,所以在这个层面上控制偏移量的提交会让代码变复杂。

下面是提交特定偏移量的例子:

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); ➊ int count = 0; ... while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); ➋ currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata")); ➌ if (count % 1000 == 0) ➍ consumer.commitAsync(currentOffsets,null); ➎ count++; } }

❶ 用于跟踪偏移量的 map。

❷ 记住,printf 只是处理消息的临时方案。

❸ 在读取每条记录之后,使用期望处理的下一个消息的偏移量更新 map 里的偏移量。下一次就从这里开始读取消息。

❹ 我们决定每处理 1000 条记录就提交一次偏移量。在实际应用中,你可以根据时间或记录的内容进行提交。

❺ 这里调用的是 commitAsync(),不过调用commitSync()也是完全可以的。当然,在提交特定偏移量时,仍然要处理可能发生的错误。





4.7 再均衡监听器


在提交偏移量一节中提到过,消费者在退出和进行分区再均衡之前,会做一些清理工作。

你会在消费者失去对一个分区的所有权之前提交最后一个已处理记录的偏移量。如果消费者准备了一个缓冲区用于处理偶发的事件,那么在失去分区所有权之前,需要处理在缓冲区累积下来的记录。你可能还需要关闭文件句柄、数据库连接等。

在为消费者分配新分区或移除旧分区时,可以通过消费者 API 执行一些应用程序代码,在调用 subscribe() 方法时传进去一个 ConsumerRebalanceListener 实例就可以了。 ConsumerRebalanceListener 有两个需要实现的方法。

(1) public void onPartitionsRevoked(Collection<TopicPartition> partitions) 方法会在再均衡开始之前和消费者停止读取消息之后被调用。如果在这里提交偏移量,下一个接管分区的消费者就知道该从哪里开始读取了。

(2) public void onPartitionsAssigned(Collection<TopicPartition> partitions) 方法会在重新分配分区之后和消费者开始读取消息之前被调用。

下面的例子将演示如何在失去分区所有权之前通过 onPartitionsRevoked() 方法来提交偏移量。在下一节,我们会演示另一个同时使用了 onPartitionsAssigned() 方法的例子。

private Map<TopicPartition, OffsetAndMetadata> currentOffsets= new HashMap<>(); private class HandleRebalance implements ConsumerRebalanceListener { ➊ public void onPartitionsAssigned(Collection<TopicPartition> partitions) { ➋ } public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets); consumer.commitSync(currentOffsets); ➌ } } try { consumer.subscribe(topics, new HandleRebalance()); ➍ while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata")); } consumer.commitAsync(currentOffsets, null); } } catch (WakeupException e) { // 忽略异常,正在关闭消费者 } catch (Exception e) { log.error("Unexpected error", e); } finally { try { consumer.commitSync(currentOffsets); } finally { consumer.close(); System.out.println("Closed consumer and we are done"); } }

❶ 首先实现 ConsumerRebalanceListener 接口。

❷ 在获得新分区后开始读取消息,不需要做其他事情。

❸ 如果发生再均衡,我们要在即将失去分区所有权时提交偏移量。要注意,提交的是最近处理过的偏移量,而不是批次中还在处理的最后一个偏移量。因为分区有可能在我们还在处理消息的时候被撤回。我们要提交所有分区的偏移量,而不只是那些即将失去所有权的分区的偏移量——因为提交的偏移量是已经处理过的,所以不会有什么问题。调用 commitSync() 方法,确保在再均衡发生之前提交偏移量。

❹ 把 ConsumerRebalanceListener 对象传给 subscribe() 方法,这是最重要的一步。





4.8 从特定偏移量处开始处理记录


到目前为止,我们知道了如何使用 poll() 方法从各个分区的最新偏移量处开始处理消息。不过,有时候我们也需要从特定的偏移量处开始读取消息。

如果你想从分区的起始位置开始读取消息,或者直接跳到分区的末尾开始读取消息,可以使用 seekToBeginning(Collection<TopicPartition> tp) 和 seekToEnd(Collection<TopicPartition> tp) 这两个方法。

不过,Kafka 也为我们提供了用于查找特定偏移量的 API。它有很多用途,比如向后回退几个消息或者向前跳过几个消息(对时间比较敏感的应用程序在处理滞后的情况下希望能够向前跳过若干个消息)。在使用 Kafka 以外的系统来存储偏移量时,它将给我们带来更大的惊喜。

试想一下这样的场景:应用程序从 Kafka 读取事件(可能是网站的用户点击事件流),对它们进行处理(可能是使用自动程序清理点击操作并添加会话信息),然后把结果保存到数据库、NoSQL 存储引擎或 Hadoop。假设我们真的不想丢失任何数据,也不想在数据库里多次保存相同的结果。

这种情况下,消费者的代码可能是这样的:

while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1); processRecord(record); storeRecordInDB(record); consumer.commitAsync(currentOffsets); } }

在这个例子里,每处理一条记录就提交一次偏移量。尽管如此,在记录被保存到数据库之后以及偏移量被提交之前,应用程序仍然有可能发生崩溃,导致重复处理数据,数据库里就会出现重复记录。

如果保存记录和偏移量可以在一个原子操作里完成,就可以避免出现上述情况。记录和偏移量要么都被成功提交,要么都不提交。如果记录是保存在数据库里而偏移量是提交到 Kafka 上,那么就无法实现原子操作。

不过,如果在同一个事务里把记录和偏移量都写到数据库里会怎样呢?那么我们就会知道记录和偏移量要么都成功提交,要么都没有,然后重新处理记录。

现在的问题是:如果偏移量是保存在数据库里而不是 Kafka 里,那么消费者在得到新分区时怎么知道该从哪里开始读取?这个时候可以使用 seek() 方法。在消费者启动或分配到新分区时,可以使用 seek() 方法查找保存在数据库里的偏移量。

下面的例子大致说明了如何使用这个 API。使用 ConsumerRebalanceListener 和 seek() 方法确保我们是从数据库里保存的偏移量所指定的位置开始处理消息的。

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener { public void onPartitionsRevoked(Collection<TopicPartition> partitions) { commitDBTransaction(); ➊ } public void onPartitionsAssigned(Collection<TopicPartition> partitions) { for(TopicPartition partition: partitions) consumer.seek(partition, getOffsetFromDB(partition)); ➋ } } } consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer)); consumer.poll(0); for (TopicPartition partition: consumer.assignment()) consumer.seek(partition, getOffsetFromDB(partition)); ➌ while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { processRecord(record); storeRecordInDB(record); storeOffsetInDB(record.topic(), record.partition(), record.offset()); ➍ } commitDBTransaction(); }

❶ 使用一个虚构的方法来提交数据库事务。大致想法是这样的:在处理完记录之后,将记录和偏移量插入数据库,然后在即将失去分区所有权之前提交事务,确保成功保存了这些信息。

❷ 使用另一个虚构的方法来从数据库获取偏移量,在分配到新分区的时候,使用 seek() 方法定位到那些记录。

❸ 订阅主题之后,开始启动消费者,我们调用一次 poll() 方法,让消费者加入到消费者群组里,并获取分配到的分区,然后马上调用 seek() 方法定位分区的偏移量。要记住, seek() 方法只更新我们正在使用的位置,在下一次调用 poll() 时就可以获得正确的消息。如果 seek() 发生错误(比如偏移量不存在),poll() 就会抛出异常。

❹ 另一个虚构的方法,这次要更新的是数据库里用于保存偏移量的表。假设更新记录的速度非常快,所以每条记录都需要更新一次数据库,但提交的速度比较慢,所以只在每个批次末尾提交一次。这里可以通过很多种方式进行优化。

通过把偏移量和记录保存到同一个外部系统来实现单次语义可以有很多种方式,不过它们都需要结合使用 ConsumerRebalanceListener 和 seek() 方法来确保能够及时保存偏移量,并保证消费者总是能够从正确的位置开始读取消息。





4.9 如何退出


在之前讨论轮询时就说过,不需要担心消费者会在一个无限循环里轮询消息,我们会告诉消费者如何优雅地退出循环。

如果确定要退出循环,需要通过另一个线程调用 consumer.wakeup() 方法。如果循环运行在主线程里,可以在 ShutdownHook 里调用该方法。要记住,consumer.wakeup() 是消费者唯一一个可以从其他线程里安全调用的方法。调用 consumer.wakeup() 可以退出 poll(),并抛出 WakeupException 异常,或者如果调用 consumer.wakeup() 时线程没有等待轮询,那么异常将在下一轮调用 poll() 时抛出。我们不需要处理 WakeupException,因为它只是用于跳出循环的一种方式。不过,在退出线程之前调用consumer.close()是很有必要的,它会提交任何还没有提交的东西,并向群组协调器发送消息,告知自己要离开群组,接下来就会触发再均衡,而不需要等待会话超时。

下面是运行在主线程上的消费者退出线程的代码。这些代码经过了简化,你可以在这里查看完整的代码:http://bit.ly/2u47e9A。

Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { System.out.println("Starting exit..."); consumer.wakeup(); ➊ try { mainThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }); ... try { // 循环,直到按下Ctrl+C键,关闭的钩子会在退出时进行清理 while (true) { ConsumerRecords<String, String> records = movingAvg.consumer.poll(1000); System.out.println(System.currentTimeMillis() + " -- waiting for data..."); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } for (TopicPartition tp: consumer.assignment()) System.out.println("Committing offset at position:" + consumer.position(tp)); movingAvg.consumer.commitSync(); } } catch (WakeupException e) { // 忽略关闭异常 ➋ } finally { consumer.close(); ➌ System.out.println("Closed consumer and we are done"); } }

❶ ShutdownHook 运行在单独的线程里,所以退出循环最安全的方式只能是调用 wakeup() 方法。

❷ 在另一个线程里调用 wakeup() 方法,导致 poll() 抛出 WakeupException。你可能想捕获异常以确保应用不会意外终止,但实际上这不是必需的。

❸ 在退出之前,确保彻底关闭了消费者。





4.10 反序列化器


在之前的章节里提到过,生产者需要用序列化器把对象转换成字节数组再发送给 Kafka。类似地,消费者需要用反序列化器把从 Kafka 接收到的字节数组转换成 Java 对象。在前面的例子里,我们假设每个消息的键值对都是字符串,所以我们使用了默认的 String Deserializer。

在上一章讲解 Kafka 生产者的时候,我们已经介绍了如何序列化自定义对象类型,以及如何使用 Avro 和 AvroSerializer 根据定义好的 schema 生成 Avro 对象。现在来看看如何为对象自定义反序列化器,以及如何使用 Avro 和 Avro 反序列化器。

很显然,生成消息使用的序列化器与读取消息使用的反序列化器应该是一一对应的。使用 IntSerializer 序列化,然后使用 StringDeserializer 进行反序列化,会出现不可预测的结果。对于开发者来说,必须知道写入主题的消息使用的是哪一种序列化器,并确保每个主题里只包含能够被反序列化器解析的数据。使用 Avro 和 schema 注册表进行序列化和反序列化的优势在于:AvroSerializer 可以保证写入主题的数据与主题的 schema 是兼容的,也就是说,可以使用相应的反序列化器和 schema 来反序列化数据。另外,在生产者或消费者里出现的任何一个与兼容性有关的错误都会被捕捉到,它们都带有消息描述,也就是说,在出现序列化错误时,就没必要再去调试字节数组了。

尽管不建议使用自定义的反序列化器,我们仍然会简单地演示如何自定义反序列化器,然后再举例演示如何使用 Avro 来反序列化消息的键和值。

自定义反序列化器

我们以在第 3 章使用过的自定义对象为例,为它写一个反序列化器。

public class Customer { private int customerID; private String customerName;   public Customer(int ID, String name) { this.customerID = ID; this.customerName = name; }   public int getID() { return customerID; }   public String getName() { return customerName; } }

自定义反序列化器看起来是这样的:

import org.apache.kafka.common.errors.SerializationException;   import java.nio.ByteBuffer; import java.util.Map;   public class CustomerDeserializer implements Deserializer<Customer> { ➊   @Override public void configure(Map configs, boolean isKey) { // 不需要做任何配置 }   @Override public Customer deserialize(String topic, byte[] data) {   int id; int nameSize; String name;   try { if (data == null) return null; if (data.length < 8) throw new SerializationException("Size of data received by IntegerDeserializer is shorter than expected");   ByteBuffer buffer = ByteBuffer.wrap(data); id = buffer.getInt(); nameSize = buffer.getInt();   byte[] nameBytes = new byte[nameSize]; buffer.get(nameBytes); name = new String(nameBytes, "UTF-8");   return new Customer(id, name); ➋   } catch (Exception e) { throw new SerializationException("Error when serializing Customer to byte[] " + e); } }   @Override public void close() { // 不需要关闭任何东西 } }

❶ 消费者也需要使用 Customer 类,这个类和序列化器在生产者和消费者应用程序里要相互匹配。在一个大型的企业里,会有很多消费者和生产者共享这些数据,这对于企业来说算是一个挑战。

❷ 我们把序列化器的逻辑反过来,从字节数组里获取 customer ID 和 name,再用它们构建需要的对象。

使用反序列化器的消费者代码看起来是这样的:

Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.CustomerDeserializer");   KafkaConsumer<String, Customer> consumer = new KafkaConsumer<>(props);   consumer.subscribe(Collections.singletonList("customerCountries"))   while (true) { ConsumerRecords<String, Customer> records = consumer.poll(100); for (ConsumerRecord<String, Customer> record : records) { System.out.println("current customer Id: " + record.value().getID() + " and current customer name: " + record.value().getName()); } }

再强调一次,我们并不建议使用自定义序列化器和自定义反序列化器。它们把生产者和消费者紧紧地耦合在一起,并且很脆弱,容易出错。我们建议使用标准的消息格式,比如 JSON、Thrift、Protobuf 或 Avro。接下来,我们会看到如何在消费者里使用 Avro 反序列化器。可以回到第 3 章查看有关 Avro 的项目背景、schema 和 schema 的兼容能力。



在消费者里进行 Avro 反序列化

我们在 Avro 里使用第 3 章出现过的 Customer 类,为了读取这些对象,需要实现一个类似这样的消费者应用程序:

Properties props = new Properties(); props.put("bootstrap.servers", "broker1:9092,broker2:9092"); props.put("group.id", "CountryCounter"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"); ➊ props.put("schema.registry.url", schemaUrl); ➋ String topic = "customerContacts"   KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers,groupId, url)); consumer.subscribe(Collections.singletonList(topic));   System.out.println("Reading topic:" + topic);   while (true) { ConsumerRecords<String, Customer> records = consumer.poll(1000); ➌   for (ConsumerRecord<String, Customer> record: records) { System.out.println("Current customer name is: " + record.value().getName()); ➍ } consumer.commitSync(); }

❶ 使用 KafkaAvroDeserializer 来反序列化 Avro 消息。

❷ schema.registry.url 是一个新的参数,它指向 schema 的存放位置。消费者可以使用由生产者注册的 schema 来反序列化消息。

❸ 将生成的类 Customer 作为值的类型。

❹ record.value() 返回的是一个 Customer 实例,接下来就可以使用它了。





4.11 独立消费者——为什么以及怎样使用没有群组的消费者


到目前为止,我们讨论了消费者群组,分区被自动分配给群组里的消费者,在群组里新增或移除消费者时自动触发再均衡。通常情况下,这些行为刚好是你所需要的,不过有时候你需要一些更简单的东西。比如,你可能只需要一个消费者从一个主题的所有分区或者某个特定的分区读取数据。这个时候就不需要消费者群组和再均衡了,只需要把主题或者分区分配给消费者,然后开始读取消息并提交偏移量。

如果是这样的话,就不需要订阅主题,取而代之的是为自己分配分区。一个消费者可以订阅主题(并加入消费者群组),或者为自己分配分区,但不能同时做这两件事情。

下面的例子演示了一个消费者是如何为自己分配分区并从分区里读取消息的:

List<PartitionInfo> partitionInfos = null; partitionInfos = consumer.partitionsFor("topic"); ➊ if (partitionInfos != null) { for (PartitionInfo partition : partitionInfos) partitions.add(new TopicPartition(partition.topic(), partition.partition())); consumer.assign(partitions); ➋ while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record: records) { System.out.println("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value()); } consumer.commitSync(); } }

❶ 向集群请求主题可用的分区。如果只打算读取特定分区,可以跳过这一步。

❷ 知道需要哪些分区之后,调用 assign() 方法。

除了不会发生再均衡,也不需要手动查找分区,其他的看起来一切正常。不过要记住,如果主题增加了新的分区,消费者并不会收到通知。所以,要么周期性地调用 consumer.partitionsFor() 方法来检查是否有新分区加入,要么在添加新分区后重启应用程序。





4.12 旧版的消费者API


我们在这一章讨论的 Java KafkaConsumer 客户端是 org.apache.kafka.clients 包的一部分。在本书写到这一章的时候,Kafka 还有两个旧版本的 Scala 消费者客户端,它们是 kafka.consumer 包的一部分,属于 Kafka 核心模块。它们分别被叫作 SimpleConsumer(简单消费者,实际上也不是那么简单,它们是对 Kafka API 的轻度包装,可以用于从特定的分区和偏移量开始读取消息)和高级消费者。高级消费者指的就是 ZookeeperConsumerConnector,它有点像现在的消费者,有消费者群组,有分区再均衡,不过它使用 Zookeeper 来管理消费者群组,并不具备提交偏移量和再均衡的可操控性。

因为现在的消费者同时支持以上两种行为,并且为开发人员提供了更高的可靠性和可操控性,所以我们不打算讨论旧版 API。如果你想使用它们,那么请三思,如果确定要使用,可以从 Kafka 文档中了解更多的信息。





4.13 总结


我们在本章开头解释了 Kafka 消费者群组概念,消费者群组支持多个消费者从主题上读取消息。在介绍完概念之后,我们又给出了一个消费订阅主题并持续读取消息的例子。然后介绍了一些重要的消费者配置参数以及它们对消费者行为的影响。我们用一大部分内容解释偏移量以及消费者是如何管理偏移量的。了解消费者提交偏移量的方式有助更好地使用消费者客户端,所以我们介绍了几种不同的偏移量提交方式。然后又探讨了消费者 API 的其他话题,比如如何处理再均衡以及如何关闭消费者。

最后,我们介绍了反序列化器,消费者客户端使用反序列化器将保存在 Kafka 里的字节转换成应用程序可以处理的 Java 对象。我们主要介绍了 Avro 反序列化器,虽然有很多可用的反序列化器,但在 Kafka 里,Avro 是最为常用的一个。

现在,我们已经知道如何生成和读取 Kafka 消息,下一章将介绍 Kafka 内部的实现细节。





第 5 章 深入 Kafka


如果只是为了开发 Kafka 应用程序,或者只是在生产环境使用 Kafka,那么了解 Kafka 的内部工作原理不是必需的。不过,了解 Kafka 的内部工作原理有助于理解 Kafka 的行为,也有助于诊断问题。本章并不会涵盖 Kafka 的每一个设计和实现细节,而是集中讨论以下 3 个有意思的话题:

Kafka 如何进行复制;

Kafka 如何处理来自生产者和消费者的请求;

Kafka 的存储细节,比如文件格式和索引。



在对 Kafka 进行调优时,深入理解这些问题是很有必要的。了解了内部机制,可以更有目的性地进行深入的调优,而不只是停留在表面,隔靴搔痒。





5.1 集群成员关系


Kafka 使用 Zookeeper 来维护集群成员的信息。每个 broker 都有一个唯一标识符,这个标识符可以在配置文件里指定,也可以自动生成。在 broker 启动的时候,它通过创建临时节点把自己的 ID 注册到 Zookeeper。Kafka 组件订阅 Zookeeper 的 /brokers/ids 路径(broker 在 Zookeeper 上的注册路径),当有 broker 加入集群或退出集群时,这些组件就可以获得通知。

如果你要启动另一个具有相同 ID 的 broker,会得到一个错误——新 broker 会试着进行注册,但不会成功,因为 Zookeeper 里已经有一个具有相同 ID 的 broker。

在 broker 停机、出现网络分区或长时间垃圾回收停顿时,broker 会从 Zookeeper 上断开连接,此时 broker 在启动时创建的临时节点会自动从 Zookeeper 上移除。监听 broker 列表的 Kafka 组件会被告知该 broker 已移除。

在关闭 broker 时,它对应的节点也会消失,不过它的 ID 会继续存在于其他数据结构中。例如,主题的副本列表(下面会介绍)里就可能包含这些 ID。在完全关闭一个 broker 之后,如果使用相同的 ID 启动另一个全新的 broker,它会立即加入集群,并拥有与旧 broker 相同的分区和主题。





5.2 控制器


控制器其实就是一个 broker,只不过它除了具有一般 broker 的功能之外,还负责分区首领的选举(我们将在 5.3 节讨论分区首领选举)。集群里第一个启动的 broker 通过在 Zookeeper 里创建一个临时节点 /controller 让自己成为控制器。其他 broker 在启动时也会尝试创建这个节点,不过它们会收到一个“节点已存在”的异常,然后“意识”到控制器节点已存在,也就是说集群里已经有一个控制器了。其他 broker 在控制器节点上创建 Zookeeper watch 对象,这样它们就可以收到这个节点的变更通知。这种方式可以确保集群里一次只有一个控制器存在。

如果控制器被关闭或者与 Zookeeper 断开连接,Zookeeper 上的临时节点就会消失。集群里的其他 broker 通过 watch 对象得到控制器节点消失的通知,它们会尝试让自己成为新的控制器。第一个在 Zookeeper 里成功创建控制器节点的 broker 就会成为新的控制器,其他节点会收到“节点已存在”的异常,然后在新的控制器节点上再次创建 watch 对象。每个新选出的控制器通过 Zookeeper 的条件递增操作获得一个全新的、数值更大的 controller epoch。其他 broker 在知道当前 controller epoch 后,如果收到由控制器发出的包含较旧 epoch 的消息,就会忽略它们。

当控制器发现一个 broker 已经离开集群(通过观察相关的 Zookeeper 路径),它就知道,那些失去首领的分区需要一个新首领(这些分区的首领刚好是在这个 broker 上)。控制器遍历这些分区,并确定谁应该成为新首领(简单来说就是分区副本列表里的下一个副本),然后向所有包含新首领或现有跟随者的 broker 发送请求。该请求消息包含了谁是新首领以及谁是分区跟随者的信息。随后,新首领开始处理来自生产者和消费者的请求,而跟随者开始从新首领那里复制消息。

当控制器发现一个 broker 加入集群时,它会使用 broker ID 来检查新加入的 broker 是否包含现有分区的副本。如果有,控制器就把变更通知发送给新加入的 broker 和其他 broker,新 broker 上的副本开始从首领那里复制消息。

简而言之,Kafka 使用 Zookeeper 的临时节点来选举控制器,并在节点加入集群或退出集群时通知控制器。控制器负责在节点加入或离开集群时进行分区首领选举。控制器使用 epoch 来避免“脑裂”。“脑裂”是指两个节点同时认为自己是当前的控制器。





5.3 复制


复制功能是 Kafka 架构的核心。在 Kafka 的文档里,Kafka 把自己描述成“一个分布式的、可分区的、可复制的提交日志服务”。复制之所以这么关键,是因为它可以在个别节点失效时仍能保证 Kafka 的可用性和持久性。

Kafka 使用主题来组织数据,每个主题被分为若干个分区,每个分区有多个副本。那些副本被保存在 broker 上,每个 broker 可以保存成百上千个属于不同主题和分区的副本。

副本有以下两种类型。

首领副本

  每个分区都有一个首领副本。为了保证一致性,所有生产者请求和消费者请求都会经过这个副本。

跟随者副本

  首领以外的副本都是跟随者副本。跟随者副本不处理来自客户端的请求,它们唯一的任务就是从首领那里复制消息,保持与首领一致的状态。如果首领发生崩溃,其中的一个跟随者会被提升为新首领。

首领的另一个任务是搞清楚哪个跟随者的状态与自己是一致的。跟随者为了保持与首领的状态一致,在有新消息到达时尝试从首领那里复制消息,不过有各种原因会导致同步失败。例如,网络拥塞导致复制变慢,broker 发生崩溃导致复制滞后,直到重启 broker 后复制才会继续。

为了与首领保持同步,跟随者向首领发送获取数据的请求,这种请求与消费者为了读取消息而发送的请求是一样的。首领将响应消息发给跟随者。请求消息里包含了跟随者想要获取消息的偏移量,而且这些偏移量总是有序的。

一个跟随者副本先请求消息 1,接着请求消息 2,然后请求消息 3,在收到这 3 个请求的响应之前,它是不会发送第 4 个请求消息的。如果跟随者发送了请求消息 4,那么首领就知道它已经收到了前面 3 个请求的响应。通过查看每个跟随者请求的最新偏移量,首领就会知道每个跟随者复制的进度。如果跟随者在 10s 内没有请求任何消息,或者虽然在请求消息,但在 10s 内没有请求最新的数据,那么它就会被认为是不同步的。如果一个副本无法与首领保持一致,在首领发生失效时,它就不可能成为新首领——毕竟它没有包含全部的消息。

相反,持续请求得到的最新消息副本被称为同步的副本。在首领发生失效时,只有同步副本才有可能被选为新首领。

跟随者的正常不活跃时间或在成为不同步副本之前的时间是通过 replica.lag.time.max.ms 参数来配置的。这个时间间隔直接影响着首领选举期间的客户端行为和数据保留机制。我们将在第 6 章讨论可靠性保证,到时候会深入讨论这个问题。

除了当前首领之外,每个分区都有一个首选首领——创建主题时选定的首领就是分区的首选首领。之所以把它叫作首选首领,是因为在创建分区时,需要在 broker 之间均衡首领(后面会介绍在 broker 间分布副本和首领的算法)。因此,我们希望首选首领在成为真正的首领时,broker 间的负载最终会得到均衡。默认情况下,Kafka 的 auto.leader.rebalance.enable 被设为 true,它会检查首选首领是不是当前首领,如果不是,并且该副本是同步的,那么就会触发首领选举,让首选首领成为当前首领。

 找到首选首领

从分区的副本清单里可以很容易找到首选首领(可以使用 kafka.topics.sh 工具查看副本和分区的详细信息,我们将在第 10 章介绍管理工具)。清单里的第一个副本一般就是首选首领。不管当前首领是哪一个副本,都不会改变这个事实,即使使用副本分配工具将副本重新分配给其他 broker。要记住,如果你手动进行副本分配,第一个指定的副本就是首选首领,所以要确保首选首领被传播到其他 broker 上,避免让包含了首领的 broker 负载过重,而其他 broker 却无法为它们分担负载。





5.4 处理请求


broker 的大部分工作是处理客户端、分区副本和控制器发送给分区首领的请求。Kafka 提供了一个二进制协议(基于 TCP),指定了请求消息的格式以及 broker 如何对请求作出响应——包括成功处理请求或在处理请求过程中遇到错误。客户端发起连接并发送请求, broker 处理请求并作出响应。broker 按照请求到达的顺序来处理它们——这种顺序保证让 Kafka 具有了消息队列的特性,同时保证保存的消息也是有序的。

所有的请求消息都包含一个标准消息头:

Request type(也就是 API key)

Request version(broker 可以处理不同版本的客户端请求,并根据客户端版本作出不同的响应)

Correlation ID——一个具有唯一性的数字,用于标识请求消息,同时也会出现在响应消息和错误日志里(用于诊断问题)

Client ID——用于标识发送请求的客户端



我们不打算在这里描述该协议,因为在 Kafka 文档里已经有很详细的说明。不过,了解 broker 如何处理请求还是有必要的——后面在我们讨论 Kafka 监控和各种配置选项时,你就会了解到那些与队列和线程有关的度量指标和配置参数。

broker 会在它所监听的每一个端口上运行一个 Acceptor 线程,这个线程会创建一个连接,并把它交给 Processor 线程去处理。Processor 线程(也被叫作“网络线程”)的数量是可配置的。网络线程负责从客户端获取请求消息,把它们放进请求队列,然后从响应队列获取响应消息,把它们发送给客户端。图 5-1 为 Kafka 处理请求的内部流程。

请求消息被放到请求队列后,IO 线程会负责处理它们。下面是几种最常见的请求类型。

生产请求

  生产者发送的请求,它包含客户端要写入 broker 的消息。

获取请求

  在消费者和跟随者副本需要从 broker 读取消息时发送的请求。



图 5-1:Kafka 处理请求的内部流程

生产请求和获取请求都必须发送给分区的首领副本。如果 broker 收到一个针对特定分区的请求,而该分区的首领在另一个 broker 上,那么发送请求的客户端会收到一个“非分区首领”的错误响应。当针对特定分区的获取请求被发送到一个不含有该分区首领的 broker 上,也会出现同样的错误。Kafka 客户端要自己负责把生产请求和获取请求发送到正确的 broker 上。

那么客户端怎么知道该往哪里发送请求呢?客户端使用了另一种请求类型,也就是元数据请求。这种请求包含了客户端感兴趣的主题列表。服务器端的响应消息里指明了这些主题所包含的分区、每个分区都有哪些副本,以及哪个副本是首领。元数据请求可以发送给任意一个 broker,因为所有 broker 都缓存了这些信息。

一般情况下,客户端会把这些信息缓存起来,并直接往目标 broker 上发送生产请求和获取请求。它们需要时不时地通过发送元数据请求来刷新这些信息(刷新的时间间隔通过 metadata.max.age.ms 参数来配置),从而知道元数据是否发生了变更——比如,在新 broker 加入集群时,部分副本会被移动到新的 broker 上(如图 5-2 所示)。另外,如果客户端收到“非首领”错误,它会在尝试重发请求之前先刷新元数据,因为这个错误说明了客户端正在使用过期的元数据信息,之前的请求被发到了错误的 broker 上。



图 5-2:客户端路由请求





5.4.1 生产请求


我们在第 3 章讨论如何配置生产者的时候,提到过 acks 这个配置参数——该参数指定了需要多少个 broker 确认才可以认为一个消息写入是成功的。不同的配置对“写入成功”的界定是不一样的,如果 acks=1,那么只要首领收到消息就认为写入成功;如果 acks=all,那么需要所有同步副本收到消息才算写入成功;如果 acks=0,那么生产者在把消息发出去之后,完全不需要等待 broker 的响应。

包含首领副本的 broker 在收到生产请求时,会对请求做一些验证。

发送数据的用户是否有主题写入权限?

请求里包含的 acks 值是否有效(只允许出现 0、1 或 all)?

如果 acks=all,是否有足够多的同步副本保证消息已经被安全写入?(我们可以对 broker 进行配置,如果同步副本的数量不足,broker 可以拒绝处理新消息。在第 6 章介绍 Kafka 持久性和可靠性保证时,我们会讨论更多这方面的细节。)



之后,消息被写入本地磁盘。在 Linux 系统上,消息会被写到文件系统缓存里,并不保证它们何时会被刷新到磁盘上。Kafka 不会一直等待数据被写到磁盘上——它依赖复制功能来保证消息的持久性。

在消息被写入分区的首领之后,broker 开始检查 acks 配置参数——如果 acks 被设为 0 或 1,那么 broker 立即返回响应;如果 acks 被设为 all,那么请求会被保存在一个叫作炼狱的缓冲区里,直到首领发现所有跟随者副本都复制了消息,响应才会被返回给