Kafka Java生产者如何管理TCP连接
Java生产者如何管理TCP连接
为何是TCP
Apache Kafka 的所有通信都是基于 TCP 的,而不是基于 HTTP 或其他协议。无论是生产者、消费者,还是 Broker 之间的通信都是如此。为什么 Kafka 不使用 HTTP 作为底层的通信协议呢?其实这里面的原因有很多, 但最主要的原因在于 TCP 和 HTTP 之间的区别。
从社区的角度来看,在开发客户端时,人们能够利用 TCP 本身提供的一些高级功能,比如多路复用请求以及同时轮询 多个连接的能力。
所谓的多路复用请求,即 multiplexing request,是指将两 个或多个数据流合并到底层单一物理连接中的过程。TCP 的 多路复用请求会在一条物理连接上创建若干个虚拟连接,每 个虚拟连接负责流转各自对应的数据流。其实严格来说, TCP 并不能多路复用,它只是提供可靠的消息交付语义保证,比如自动重传丢失的报文。更严谨地说,作为一个基于报文的协议,TCP 能够被用于多 路复用连接场景的前提是,上层的应用协议(比如 HTTP) 允许发送多条消息。
生产者程序概览
第 1 步:构造生产者对象所需的参数对象。
第 2 步:利用第 1 步的参数对象,创建 KafkaProducer 对 象实例。
第 3 步:使用 KafkaProducer 的 send 方法发送消息。
第 4 步:调用 KafkaProducer 的 close 方法关闭生产者并 释放各种系统资源。
1 | Properties props = new Properties (); |
这段代码使用了 Java 7 提供的 try-with-resource 特性, 所以并没有显式调用 producer.close() 方法。无论是否显 式调用 close 方法,所有生产者程序大致都是这个路数。kafka的producer客户端是如何管理这些TCP连接的?
何时创建TCP连接
就上面的那段代码而言,可能创建 TCP 连接的地方有两处:Producer producer = new KafkaProducer(props)
和 producer.send(msg, callback)
。那么打底是前者还是后者?
首先,生产者应用在创建 KafkaProducer 实例时是会建立 与 Broker 的 TCP 连接的。其实这种表述也不是很准确,应 该这样说:在创建 KafkaProducer 实例时,生产者应用会在后台创建并启动一个名为 Sender 的线程,该 Sender 线 程开始运行时首先会创建与 Broker 的连接。也就是说,会连接bootstrap.servers参数中指定的所有broker。它是 Producer 的核心参数之一,指定了这个 Producer 启动时 要连接的 Broker 地址。请注意,这里的“启动时”,代表 的是 Producer 启动时会发起与这些 Broker 的连接。因 此,如果你为这个参数指定了 1000 个 Broker 连接信息,你的 Producer 启动时会首先创建与这 1000 个 Broker 的 TCP 连接。
因此,可以知道TCP 连接是在创建 KafkaProducer 实例时建立的。
但是!
TCP 连接还可能在两个地方被创建:一个是在更新元数据后,另一个是在消息发送时。为什么说是可能?因 为这两个地方并非总是创建 TCP 连接。当 Producer 更新了集群的元数据信息之后,如果发现与某些 Broker 当前没 有连接,那么它就会创建一个 TCP 连接。同样地,当要发送消息时,Producer 发现尚不存在与目标 Broker 的连 接,也会创建一个。例如下面的两个场景
场景一:当 Producer 尝试给一个不存在的主题发送消息 时,Broker 会告诉 Producer 说这个主题不存在。此时 Producer 会发送 METADATA 请求给 Kafka 集群,去尝试 获取最新的元数据信息。
场景二:Producer 通过 metadata.max.age.ms 参数定期 地去更新元数据信息。该参数的默认值是 300000,即 5 分 钟,也就是说不管集群那边是否有变化,Producer 每 5 分 钟都会强制刷新一次元数据以保证它是最及时的数据。
何时关闭TCP连接
Producer 端关闭 TCP 连接的方式有两种:一种是用户主动 关闭;一种是 Kafka 自动关闭。
第一种。这里的主动关闭实际上是广义的主动关 闭,甚至包括用户调用 kill -9 主动“杀掉”Producer 应 用。当然最推荐的方式还是调用 producer.close() 方法来 关闭。
第二种是 Kafka 帮你关闭,这与 Producer 端参数 connections.max.idle.ms 的值有关。默认情况下该参数值 是 9 分钟,即如果在 9 分钟内没有任何请求“流过”某个 TCP 连接,那么 Kafka 会主动帮你把该 TCP 连接关闭。用 户可以在 Producer 端设置 connections.max.idle.ms=-1 禁掉这种机制。一旦被设置成 -1,TCP 连接将成为永久长 连接。当然这只是软件层面的“长连接”机制,由于 Kafka创建的这些 Socket 连接都开启了 keepalive,因此 keepalive 探活机制还是会遵守的。
Author: corn1ng
Link: https://corn1ng.github.io/2019/10/24/kafka/4Java生产者如何管理TCP连接/
License: 知识共享署名-非商业性使用 4.0 国际许可协议