向kafka发送消息的send流程

 2024-03-06 01:08:51  阅读 0

向kafka发送消息的send流程

创建对象的主要步骤

当我们通过 send 方法发送消息时,首先会在 send 方法内部创建一个对象。

该对象主要包含两个必需参数:目标主题(topic)和消息内容(value),以及可选参数()、key(key)、()等。

每条消息都有一个时间戳。 如果我们在发送消息时不指定时间戳,生产者将在消息记录上标记其当前时间。 Kafka最终使用的时间戳取决于为主题配置的时间戳类型:

如果主题配置为使用 mon...,则将使用生产者记录中的时间戳。

如果主题配置为使用 mon...,则将消息附加到日志文件时,生产者记录中的时间戳将被本地时间覆盖。 无论上述情况如何,实际使用的时间戳都会在 中返回给客户端。

拦截器

0.10版本引入,主要是针对末端的定制逻辑控制。

它可能运行在多个线程中,开发者在具体实现时需要保证线程安全。

序列化键和值

发送对象时,生产者通过序列化器(如 Avro)将键和值序列化为字节数组,以便可以通过网络传输。

分区器根据key选择分区

如果我们在发送消息时指定了分区,那么分区器不会再做任何事情,直接返回指定的分区。

如果我们在发送消息时没有指定分区,分区器将根据对象键的哈希值选择分区。

如果发送消息时未指定分区且未配置key,则采用轮询方式分配分区。

发送消息给卡夫卡

根据上一步中选择的分区,生产者知道将该消息发送到哪个主题和分区。 消息将被添加到一个消息批次中,缓存在缓冲区中,同一批次中的所有消息将被发送到同一主题和分区。 一个单独的线程会将这些批次发送到相应的 Kafka。

返回响应

当kafka收到消息时,它会返回响应。 如果消息成功写入Kafka,会返回一个对象,其中包含主题和分区信息,以及分区中记录的偏移量、时间戳等。 如果写入失败,会返回错误信息。 收到错误后,生产者将尝试重试发送消息。 如果一定次数后仍然失败,则会返回错误消息。

基本线程 主线程

负责消息创建、拦截器、序列化器、分区器等操作,并将消息追加到消息收集器中。

消息收集器为每个分区维护一个 Deque 类型的双端队列。 可以理解为一个集合。 批量发送有助于提高吞吐量,减少网络影响; 因为生产者客户端使用java.io。 在发送消息之前保存消息并保持实现的重用; 缓存池仅针对特定大小(由batch.size指定)进行管理。 太大的消息缓存无法重用。 每次追加消息时,都会找到/创建对应的双端队列,从其尾部取出一个,并判断当前消息的大小是否可以写入批量。 能写就写;能写就写;能写就写。 如果无法写入,则新建一条,并判断消息大小是否超过客户端参数配置batch.size的值。 如果没有超过batch.size的值,就新建一个batch.size,这样方便缓存。 重复使用; 如果超过,将建立相应的消息大小。 缺点是内存不能重复使用。 该线程从消息收集器获取缓存的消息,并将其处理成 Node 的形式,代表集群的节点。 它将进一步转换为表格,然后将数据发送到服务器。 在发送之前,线程将消息以Map的形式保存起来进行缓存。 通过它可以获取到,即当前Node中负载压力最小的那个,这样消息就可以尽快发送出去,同步发送

使用send()方法发送消息,它会返回一个或多个对象,调用get()方法等待,就可以知道消息是否发送成功。

如果业务要求消息必须按顺序发送,可以使用同步发送,并且只能发送到一个分区。 通过该参数,可以在发送失败时重试发送。

设置=1可以控制生产者在收到服务器响应之前只发送一条消息,并在消息发送成功后立即刷新它,从而控制消息发送的顺序。

有两种类型的异常。 一种是可重试的异常,可以通过重新发送消息来解决。 例如,连接异常后重新连接、“无”异常后重新选择新的等。可以配置为遇到此类异常后自动重新发送消息,直到超过重试次数。

另一种是不可重试的异常,比如“size Too Large”(消息太大),会立即返回错误。

异步发送

调用 send() 方法并指定服务器返回响应时将调用的回调函数。

如果业务需要知道消息是否发送成功,并且不关心消息的顺序,那么可以使用异步+回调的方式发送消息,参数=0,失败的消息会记录在日志中文件。

要使用该功能,必须首先实现 org..kafka... 接口,该接口只有一个方法。

如果发送异常,则参数 e 将非空。

当异步发送消息时,Kafka首先会将消息存储在缓冲池中。 当满足以下设定条件时,将触发缓冲池消息发送操作:

消息缓存达到batch.size指定的大小。 自上次发送消息以来的时间间隔已达到 .ms 指定的值。 调用flush()方法会立即触发发送并阻塞,直到当前缓冲区发送完毕。 调用close()触发发送,发送后关闭。

标签: 发送 消息 分区

如本站内容信息有侵犯到您的权益请联系我们删除,谢谢!!


Copyright © 2020 All Rights Reserved 京ICP5741267-1号 统计代码