4.0.4
参考指南
本指南介绍了 Spring Cloud Stream Binder 的 Apache Kafka 实现。它包含有关其设计、使用和配置选项的信息,以及有关 Stream Cloud Stream 概念如何映射到 Apache Kafka 特定构造的信息。此外,本指南还介绍了 Spring Cloud Stream 的 Kafka Streams 绑定功能。
1.Apache Kafka 绑定器
1.1. 用法
要使用 Apache Kafka Binder,您需要将spring-cloud-stream-binder-kafka依赖项添加到 Spring Cloud Stream 应用程序,如以下 Maven 示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
或者,您也可以使用 Spring Cloud Stream Kafka Starter,如以下 Maven 示例所示:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
1.2. 概述
下图显示了 Apache Kafka Binder 操作方式的简化图:
Apache Kafka Binder 实现将每个目标映射到 Apache Kafka 主题。消费者组直接映射到相同的 Apache Kafka 概念。分区也直接映射到 Apache Kafka 分区。
Binder当前使用Apache Kafkakafka-clients版本3.1.0。该客户端可以与较旧的代理进行通信(请参阅 Kafka 文档),但某些功能可能不可用。例如,对于 0.11.xx 之前的版本,不支持本机标头。此外,0.11.xx 不支持该autoAddPartitions属性。
1.3. 配置选项
本节包含 Apache Kafka 绑定器使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档中的绑定属性。
1.3.1. Kafka 绑定器属性
- spring.cloud.stream.kafka.binder.brokers
-
Kafka Binder 连接的代理列表。
默认值:
localhost. - spring.cloud.stream.kafka.binder.defaultBrokerPort
-
brokers允许指定带有或不带有端口信息的主机(例如,host1,host2:port2)。当代理列表中未配置端口时,这将设置默认端口。默认值:
9092. - spring.cloud.stream.kafka.binder.configuration
-
传递给绑定器创建的所有客户端的客户端属性(生产者和消费者)的键/值映射。由于这些属性由生产者和消费者同时使用,因此使用应仅限于公共属性,例如安全设置。通过此配置提供的未知 Kafka 生产者或消费者属性将被过滤掉,并且不允许传播。这里的属性取代 boot.properties 中设置的任何属性。
默认:空地图。
- spring.cloud.stream.kafka.binder.consumerProperties
-
任意 Kafka 客户端消费者属性的键/值映射。除了支持已知的 Kafka 消费者属性之外,这里还允许未知的消费者属性。此处的属性将取代启动中和上面的属性中设置的任何属性
configuration。默认:空地图。
- spring.cloud.stream.kafka.binder.headers
-
由活页夹传输的自定义标头的列表。
kafka-clients仅当与版本 < 0.11.0.0 的旧应用程序 (⇐ 1.3.x) 通信时才需要。较新的版本本身支持标头。默认值:空。
- spring.cloud.stream.kafka.binder.healthTimeout
-
等待获取分区信息的时间,以秒为单位。如果此计时器到期,运行状况报告为关闭。
默认值:10。
- spring.cloud.stream.kafka.binder.requiredAcks
-
代理上所需的确认数量。有关生产者
acks属性,请参阅 Kafka 文档。默认值:
1. - spring.cloud.stream.kafka.binder.minPartitionCount
-
仅当设置
autoCreateTopics或时才有效。autoAddPartitions绑定器在其生成或消耗数据的主题上配置的全局最小分区数。它可以被生产者的设置或生产者的设置partitionCount值(如果其中一个较大)取代。instanceCount * concurrency默认值:
1. - spring.cloud.stream.kafka.binder.producerProperties
-
任意 Kafka 客户端生产者属性的键/值映射。除了支持已知的 Kafka 生产者属性之外,这里还允许未知的生产者属性。此处的属性将取代启动中和上面的属性中设置的任何属性
configuration。默认:空地图。
- spring.cloud.stream.kafka.binder.replicationFactor
-
autoCreateTopics如果处于活动状态,则自动创建主题的复制因子。可以在每个绑定上被覆盖。如果您使用 2.4 之前的 Kafka 代理版本,则该值应至少设置为 1。从版本 3.0.8 开始,binder 使用-1默认值,这表明代理的“default.replication.factor”属性将用于确定副本的数量。请咨询您的 Kafka 代理管理员,看看是否存在需要最小复制因子的策略,如果是这种情况,通常情况下,将default.replication.factor匹配该值并且-1应该使用,除非您需要大于最小值的复制因子。默认值:
-1. - spring.cloud.stream.kafka.binder.autoCreateTopics
-
如果设置为
true,活页夹会自动创建新主题。如果设置为false,活页夹依赖于已配置的主题。在后一种情况下,如果主题不存在,活页夹将无法启动。该设置独立于 auto.create.topics.enable经纪商的设置,不会对其产生影响。如果服务器设置为自动创建主题,则可以使用默认代理设置将它们创建为元数据检索请求的一部分。默认值:
true. - spring.cloud.stream.kafka.binder.autoAddPartitions
-
如果设置为
true,活页夹会根据需要创建新分区。如果设置为false,活页夹依赖于已配置的主题的分区大小。如果目标主题的分区数小于预期值,则 Binder 启动失败。默认值:
false. - spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
-
在活页夹中启用事务。请参阅
transaction.idKafka 文档和文档中的事务spring-kafka。启用事务后,各个producer属性将被忽略,所有生产者都使用这些spring.cloud.stream.kafka.binder.transaction.producer.*属性。默认
null(无交易) - spring.cloud.stream.kafka.binder.transaction.producer.*
-
交易活页夹中生产者的全局生产者属性。请参阅
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefixKafka Producer Properties以及所有绑定器支持的一般生产者属性。默认值:请参阅各个生产者属性。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
-
KafkaHeaderMapper用于将标头映射到 Kafka 标头或从 Kafka 标头映射出的 bean 名称spring-messaging。例如,如果您希望在BinderHeaderMapper对标头使用 JSON 反序列化的 bean 中自定义受信任的包,请使用此选项。如果使用此属性的绑定器无法使用此自定义bean,则绑定器将在回退到绑定器创建的默认值之前BinderHeaderMapper查找名称kafkaBinderHeaderMapper为 type 的标头映射器 bean。BinderHeaderMapperBinderHeaderMapper默认值:无。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
-
down当主题上的任何分区(无论从哪个消费者接收数据)都没有领导者时,将绑定器运行状况设置为 的标志。默认值:
false. - spring.cloud.stream.kafka.binder.certificateStoreDirectory
-
当信任库或密钥库证书位置以类路径 URL ( ) 形式给出时
classpath:…,绑定器会将资源从 JAR 文件内的类路径位置复制到文件系统上的某个位置。ssl.truststore.location对于代理级证书 (和ssl.keystore.location) 以及用于架构注册表的证书 (schema.registry.ssl.truststore.location和schema.registry.ssl.keystore.location)都是如此。请记住,必须在 下提供信任库和密钥库类路径位置spring.cloud.stream.kafka.binder.configuration…。例如,spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location等。文件将移动到指定为该属性值的位置,该位置必须是文件系统上运行应用程序的进程可写入的现有目录。如果未设置此值并且证书文件是类路径资源,则它将被移至系统的临时目录,如System.getProperty("java.io.tmpdir"). 如果该值存在,但在文件系统上找不到该目录或该目录不可写,情况也是如此。默认值:无。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
-
当设置为 true 时,每当访问指标时就会计算每个消费者主题的偏移滞后指标。当设置为 false 时,仅使用定期计算的偏移滞后。
默认值:true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
-
计算每个消费者主题的偏移滞后的时间间隔。
metrics.defaultOffsetLagMetricsEnabled只要禁用或其计算时间过长,就会使用该值。默认值:60 秒
- spring.cloud.stream.kafka.binder.enableObservation
-
在此活页夹中的所有绑定上启用微米观察注册表。
默认值:假
1.3.2. 卡夫卡消费者属性
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.consumer.<property>=<value>.
|
以下属性仅适用于 Kafka 消费者,并且必须以 为前缀spring.cloud.stream.kafka.bindings.<channelName>.consumer.。
- admin.configuration
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties,并且在未来的版本中将删除对它的支持。 - admin.replicas-assignment
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment,并且在未来的版本中将删除对它的支持。 - admin.replication-factor
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor,并且在未来的版本中将删除对它的支持。 - autoRebalanceEnabled
-
当 时
true,主题分区会在消费者组的成员之间自动重新平衡。当 时false,每个消费者都会根据spring.cloud.stream.instanceCount和分配一组固定的分区spring.cloud.stream.instanceIndex。这需要在每个启动的实例上适当设置spring.cloud.stream.instanceCount和属性。spring.cloud.stream.instanceIndex在这种情况下,属性的值spring.cloud.stream.instanceCount通常必须大于 1。默认值:
true. - ackEachRecord
-
当
autoCommitOffsetis 时true,此设置指示是否在处理每个记录后提交偏移量。consumer.poll()默认情况下,在处理完返回的一批记录中的所有记录后提交偏移量。轮询返回的记录数可以使用max.poll.recordsKafka 属性进行控制,该属性通过 Consumerconfiguration属性进行设置。将其设置为true可能会导致性能下降,但这样做会降低发生故障时重新传送记录的可能性。另外,请参阅绑定器requiredAcks属性,它也会影响提交偏移量的性能。从 3.1 开始,此属性已被弃用,转而使用ackMode. 如果ackMode未设置且未启用批处理模式,RECORD则将使用 ackMode。默认值:
false. - autoCommitOffset
-
从版本 3.1 开始,此属性已被弃用。有关替代方案的更多详细信息,请参阅
ackMode参考资料。处理消息后是否自动提交偏移量。如果设置为,则入站消息中会出现带有类型标头false键的标头。应用程序可以使用此标头来确认消息。有关详细信息,请参阅示例部分。当该属性设置为 时,Kafka Binder 将 ack 模式设置为,应用程序负责确认记录。另见。kafka_acknowledgmentorg.springframework.kafka.support.Acknowledgmentfalseorg.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUALackEachRecord默认值:
true. - ackMode
-
指定容器确认模式。这是基于 Spring Kafka 中定义的 AckMode 枚举。如果
ackEachRecord属性设置为true并且消费者不处于批处理模式,那么这将使用 的 ack 模式RECORD,否则,使用该属性提供的 ack 模式。 - autoCommitOnError
-
在可轮询的消费者中,如果设置为
true,它总是在出错时自动提交。如果未设置(默认值)或 false,则不会在可轮询消费者中自动提交。请注意,此属性仅适用于可轮询的消费者。默认值:未设置。
- resetOffsets
-
是否将消费者上的偏移量重置为 startOffset 提供的值。
KafkaBindingRebalanceListener如果提供a 则必须为 false ;请参阅使用 KafkaBindingRebalanceListener。有关此属性的更多信息,请参阅重置偏移。默认值:
false. - startOffset
-
新组的起始偏移量。允许值:
earliest和latest。如果为消费者“绑定”显式设置消费者组(通过spring.cloud.stream.bindings.<channelName>.group),则“startOffset”设置为earliest。否则,它被设置为latest针对anonymous消费者组。有关此属性的更多信息,请参阅重置偏移。默认值:null(相当于
earliest)。 - enableDlq
-
当设置为 true 时,它会为消费者启用 DLQ 行为。默认情况下,导致错误的消息将转发到名为 的主题
error.<destination>.<group>。dlqName可以通过设置属性或定义 of@Bean类型来配置 DLQ 主题名称DlqDestinationResolver。这为更常见的 Kafka 重放场景提供了另一种选择,适用于错误数量相对较少且重放整个原始主题可能过于麻烦的情况。有关更多信息,请参阅死信主题处理。从版本 2.0 开始,发送到 DLQ 主题的消息通过以下标头进行了增强:x-original-topic、x-exception-message和x-exception-stacktraceasbyte[]。默认情况下,失败的记录将发送到 DLQ 主题中与原始记录相同的分区号。有关如何更改该行为, 请参阅死信主题分区选择。不允许的时候destinationIsPattern是true。默认值:
false. - dlqPartitions
-
当
enableDlq为 true 并且未设置此属性时,将创建一个与主主题具有相同分区数的死信主题。通常,死信记录会被发送到死信主题中与原始记录相同的分区。这种行为是可以改变的;请参阅死信主题分区选择。如果此属性设置为1并且没有DqlPartitionFunctionbean,则所有死信记录都将写入分区0。如果此属性大于1,您必须提供一个DlqPartitionFunctionbean。请注意,实际分区计数受绑定器minPartitionCount属性的影响。默认:
none - configuration
-
映射包含通用 Kafka 消费者属性的键/值对。除了具有 Kafka 消费者属性之外,还可以在此处传递其他配置属性。例如应用程序所需的一些属性,例如
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar.bootstrap.servers此处无法设置该属性;如果需要连接到多个集群,请使用多绑定程序支持。默认:空地图。
- dlqName
-
用于接收错误消息的 DLQ 主题的名称。
默认值:null(如果未指定,导致错误的消息将转发到名为 的主题
error.<destination>.<group>)。 - dlqProducerProperties
-
使用它,可以设置 DLQ 特定的生产者属性。通过 kafka 生产者属性可用的所有属性都可以通过该属性进行设置。当消费者启用本机解码(即 useNativeDecoding: true)时,应用程序必须为 DLQ 提供相应的键/值序列化器。
dlqProducerProperties.configuration.key.serializer这必须以和的形式提供dlqProducerProperties.configuration.value.serializer。默认值:默认 Kafka 生产者属性。
- standardHeaders
-
指示入站通道适配器填充哪些标准标头。允许的值:
none、id、timestamp或both。如果使用本机反序列化并且接收消息的第一个组件需要id(例如配置为使用 JDBC 消息存储的聚合器),则很有用。默认:
none - converterBeanName
-
实现 的 bean 的名称
RecordMessageConverter。在入站通道适配器中使用来替换默认的MessagingMessageConverter.默认:
null - idleEventInterval
-
指示最近未收到消息的事件之间的时间间隔(以毫秒为单位)。使用 an
ApplicationListener<ListenerContainerIdleEvent>来接收这些事件。有关使用示例,请参阅示例:暂停和恢复 Consumer 。默认:
30000 - destinationIsPattern
-
如果为 true,则目标将被
Pattern代理视为用于匹配主题名称的正则表达式。如果为 true,则不会配置主题,并且enableDlq不允许主题,因为绑定器在配置阶段不知道主题名称。请注意,检测与模式匹配的新主题所需的时间由消费者属性控制metadata.max.age.ms,(在撰写本文时)默认为 300,000 毫秒(5 分钟)。这可以使用configuration上面的属性进行配置。默认:
false - topic.properties
-
供应新主题时使用的 Kafka 主题属性
Map- 例如,spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0默认值:无。
- topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。在配置新主题时使用。请参阅jar
NewTopic中的 Javadocskafka-clients。默认值:无。
- topic.replication-factor
-
配置主题时使用的复制因子。覆盖活页夹范围的设置。
replicas-assignments如果存在则忽略。默认值:无(使用活页夹范围内的默认值 -1)。
- pollTimeout
-
用于轮询消费者的超时。
默认值:5 秒。
- transactionManager
-
KafkaAwareTransactionManager用于覆盖此绑定的绑定器事务管理器的 Bean 名称。如果您想使用 Kafka 事务同步另一个事务,通常需要使用ChainedKafkaTransactionManaager. 为了实现记录的一次性消费和生产,消费者和生产者绑定必须全部配置有相同的事务管理器。默认值:无。
- txCommitRecovered
-
当使用事务绑定器时,默认情况下,恢复记录的偏移量(例如,当重试次数耗尽并且记录被发送到死信主题时)将通过新事务提交。设置此属性以
false禁止提交已恢复记录的偏移量。默认值:true。
- commonErrorHandlerBeanName
-
CommonErrorHandler每个消费者绑定使用的 bean 名称。当存在时,该用户提供的CommonErrorHandler优先级高于绑定器定义的任何其他错误处理程序。如果应用程序不想使用ListenerContainerCustomizer然后检查目标/组组合来设置错误处理程序,那么这是表达错误处理程序的便捷方法。默认值:无。
1.3.3. 重置偏移
当应用程序启动时,每个分配的分区中的初始位置取决于两个属性startOffset和resetOffsets。如果resetOffsets是false,则应用正常的 Kafka 消费者auto.offset.reset语义。即,如果绑定的消费者组的分区没有提交的偏移量,则位置为earliest或latest。默认情况下,具有显式groupuse的绑定earliest和匿名绑定(不具有 no group) use latest。可以通过设置绑定属性来覆盖这些默认值startOffset。第一次使用特定的绑定启动时,不会有提交的偏移量group。不存在已提交偏移量的另一个条件是偏移量是否已过期。对于现代经纪人(自 2.1 起)和默认经纪人属性,偏移量将在最后一个成员离开组后 7 天过期。offsets.retention.minutes有关详细信息,请参阅经纪人属性。
当resetOffsetsis 时true,绑定器应用与代理上没有提交偏移量时应用的语义相似的语义,就好像此绑定从未从主题中消耗过一样;即忽略任何当前提交的偏移量。
以下是可能使用此功能的两个用例。
-
从包含键/值对的压缩主题中进行消费。设置
resetOffsets为true和;startOffset_earliest绑定将seekToBeginning在所有新分配的分区上执行。 -
从包含事件的主题中使用,您只对此绑定运行时发生的事件感兴趣。设置
resetOffsets为true和;startOffset_latest绑定将seekToEnd在所有新分配的分区上执行。
| 如果在初始分配后发生重新平衡,则只会在初始分配期间未分配的任何新分配的分区上执行查找。 |
有关主题偏移的更多控制,请参阅使用 KafkaBindingRebalanceListener;当提供监听器时,resetOffsets不应设置为true,否则会导致错误。
1.3.4. 消耗批次
从版本 3.0 开始,当spring.cloud.stream.binding.<name>.consumer.batch-mode设置为时true,通过轮询 Kafka 收到的所有记录都Consumer将作为 呈现给List<?>侦听器方法。否则,该方法将一次调用一条记录。批次的大小由 Kafka 消费者属性max.poll.records, fetch.min.bytes,控制fetch.max.wait.ms;请参阅 Kafka 文档以获取更多信息。
从 version 开始4.0.2,binder 在以批处理模式消费时支持 DLQ 功能。请记住,在批处理模式下对使用者绑定使用 DLQ 时,从先前轮询收到的所有记录都将传递到 DLQ 主题。
使用批处理模式时,不支持在活页夹内重试,因此maxAttempts将被覆盖为 1。您可以配置 a DefaultErrorHandler(使用 a ListenerContainerCustomizer)来实现在活页夹中重试的类似功能。您还可以使用手册AckMode和调用Ackowledgment.nack(index, sleep)来提交部分批次的偏移量并重新交付剩余的记录。有关这些技术的更多信息,
请参阅Spring for Apache Kafka 文档。 |
1.3.5。卡夫卡生产者属性
为了避免重复,Spring Cloud Stream 支持为所有通道设置值,格式为spring.cloud.stream.kafka.default.producer.<property>=<value>.
|
以下属性仅适用于 Kafka 生产者,并且必须以 为前缀spring.cloud.stream.kafka.bindings.<channelName>.producer.。
- admin.configuration
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.properties,并且在未来的版本中将删除对它的支持。 - admin.replicas-assignment
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replicas-assignment,并且在未来的版本中将删除对它的支持。 - admin.replication-factor
-
从版本 2.1.1 开始,此属性已被弃用,取而代之的是
topic.replication-factor,并且在未来的版本中将删除对它的支持。 - bufferSize
-
Kafka 生产者在发送之前尝试批处理的数据量上限(以字节为单位)。
默认值:
16384. - sync
-
生产者是否同步。
默认值:
false. - sendTimeoutExpression
-
根据传出消息计算的 SpEL 表达式用于计算启用同步发布时等待确认的时间 - 例如,
headers['mySendTimeout']. 超时值以毫秒为单位。对于 3.0 之前的版本,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]. 现在,在转换有效负载之前先计算表达式。默认值:
none. - batchTimeout
-
生产者在发送消息之前等待多长时间以允许在同一批次中累积更多消息。(通常情况下,生产者根本不等待,只是发送上一次发送过程中累积的所有消息。)非零值可能会增加吞吐量,但会增加延迟。
默认值:
0. - messageKeyExpression
-
根据传出消息评估的 SpEL 表达式用于填充生成的 Kafka 消息的键 - 例如,
headers['myKey']。对于 3.0 之前的版本,除非使用本机编码,否则无法使用有效负载,因为在计算此表达式时,有效负载已经采用byte[]. 现在,在转换有效负载之前先计算表达式。Function<String, String>在常规处理器(或)的情况下Function<Message<?>, Message<?>,如果生成的密钥需要与来自主题的传入密钥相同,则可以如下设置此属性。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']对于反应函数,有一个重要的警告需要记住。在这种情况下,由应用程序手动将标头从传入消息复制到出站消息。您可以设置标头,例如myKey并按照上面的建议使用headers['myKey'],或者为了方便起见,只需设置KafkaHeaders.MESSAGE_KEY标头,并且根本不需要设置此属性。默认值:
none. - headerPatterns
-
以逗号分隔的简单模式列表,用于匹配要映射
Headers到ProducerRecord. 模式可以以通配符(星号)开始或结束。可以通过添加前缀来否定模式!。匹配在第一个匹配(正或负)后停止。例如!ask,as*会通过ash但不会ask。id并且timestamp从未被映射。默认值:(
*所有标头 - 除了id和timestamp) - configuration
-
具有包含通用 Kafka 生产者属性的键/值对的映射。
bootstrap.servers此处无法设置该属性;如果需要连接到多个集群,请使用多绑定程序支持。默认:空地图。
- topic.properties
-
供应新主题时使用的 Kafka 主题属性
Map- 例如,spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0 - topic.replicas-assignment
-
副本分配的 Map<Integer, List<Integer>>,键是分区,值是分配。在配置新主题时使用。请参阅jar
NewTopic中的 Javadocskafka-clients。默认值:无。
- topic.replication-factor
-
配置主题时使用的复制因子。覆盖活页夹范围的设置。
replicas-assignments如果存在则忽略。默认值:无(使用活页夹范围内的默认值 -1)。
- useTopicHeader
-
设置为以使用出站消息中的消息标头
true的值覆盖默认绑定目标(主题名称) 。KafkaHeaders.TOPIC如果标头不存在,则使用默认绑定目标。默认值:
false. - recordMetadataChannel
-
MessageChannel成功发送结果应发送到的bean 名称;bean 必须存在于应用程序上下文中。发送到通道的消息是带有附加 header 的发送消息(转换后,如果有的话)KafkaHeaders.RECORD_METADATA。header包含RecordMetadataKafka客户端提供的对象;它包括主题中写入记录的分区和偏移量。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)失败的发送进入生产者错误通道(如果配置);请参阅错误通道。
默认值:空。
Kafka 绑定器使用partitionCount生产者的设置作为提示来创建具有给定分区计数的主题(与 结合使用minPartitionCount,两者中的最大值为所使用的值)。配置minPartitionCount活页夹和partitionCount应用程序时请务必小心,因为使用较大的值。如果已存在分区数较小的主题并且autoAddPartitions被禁用(默认),则绑定器将无法启动。如果某个主题已存在且分区数量较少且autoAddPartitions已启用,则会添加新分区。如果已存在主题的分区数量大于 (minPartitionCount或partitionCount) 的最大值,则使用现有分区计数。
|
- compression
-
设置
compression.type生产者属性。支持的值为none、gzip、snappy和lz4。zstd如果您将kafka-clientsjar 覆盖到 2.1.0(或更高版本)(如Spring for Apache Kafka 文档中所述),并且希望使用zstd压缩,请使用spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd.默认值:
none. - transactionManager
-
KafkaAwareTransactionManager用于覆盖此绑定的绑定器事务管理器的 Bean 名称。如果您想使用 Kafka 事务同步另一个事务,通常需要使用ChainedKafkaTransactionManaager. 为了实现记录的一次性消费和生产,消费者和生产者绑定必须全部配置有相同的事务管理器。默认值:无。
- closeTimeout
-
关闭生产者时等待的超时秒数。
默认:
30 - allowNonTransactional
-
通常,与事务绑定器关联的所有输出绑定都将在新事务中发布(如果尚未在处理中)。此属性允许您覆盖该行为。如果设置为 true,发布到此输出绑定的记录将不会在事务中运行,除非事务已在处理中。
默认:
false
1.3.6。使用示例
在本节中,我们将展示上述属性在特定场景中的使用。
示例:设置ackMode并MANUAL依赖手动确认
此示例说明了如何在消费者应用程序中手动确认偏移量。
本示例要求spring.cloud.stream.kafka.bindings.input.consumer.ackMode将其设置为MANUAL。对于您的示例,请使用相应的输入通道名称。
@SpringBootApplication
public class ManuallyAcknowdledgingConsumer {
public static void main(String[] args) {
SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
}
@Bean
public Consumer<Message<?>> process() {
return message -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
示例:安全配置
Apache Kafka 0.9 支持客户端和代理之间的安全连接。要利用此功能,请遵循Apache Kafka 文档中的指南以及Confluence 文档中的 Kafka 0.9 安全指南。使用该spring.cloud.stream.kafka.binder.configuration选项为活页夹创建的所有客户端设置安全属性。
例如,要设置security.protocol为SASL_SSL,请设置以下属性:
spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_SSL
所有其他安全属性都可以以类似的方式设置。
使用 Kerberos 时,请按照参考文档中的说明创建和引用 JAAS 配置。
Spring Cloud Stream 支持使用 JAAS 配置文件和 Spring Boot 属性将 JAAS 配置信息传递到应用程序。
使用 JAAS 配置文件
可以使用系统属性为 Spring Cloud Stream 应用程序设置 JAAS 和(可选)krb5 文件位置。以下示例演示如何使用 JAAS 配置文件启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:
java -Djava.security.auth.login.config=/path.to/kafka_client_jaas.conf -jar log.jar \
--spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT
使用 Spring Boot 属性
作为 JAAS 配置文件的替代方案,Spring Cloud Stream 提供了一种使用 Spring Boot 属性为 Spring Cloud Stream 应用程序设置 JAAS 配置的机制。
以下属性可用于配置 Kafka 客户端的登录上下文:
- spring.cloud.stream.kafka.binder.jaas.loginModule
-
登录模块名称。一般情况下无需设置。
默认值:
com.sun.security.auth.module.Krb5LoginModule. - spring.cloud.stream.kafka.binder.jaas.controlFlag
-
登录模块的控制标志。
默认值:
required. - spring.cloud.stream.kafka.binder.jaas.options
-
具有包含登录模块选项的键/值对的映射。
默认:空地图。
以下示例演示如何使用 Spring Boot 配置属性启动具有 SASL 和 Kerberos 的 Spring Cloud Stream 应用程序:
java --spring.cloud.stream.kafka.binder.brokers=secure.server:9092 \
--spring.cloud.stream.bindings.input.destination=stream.ticktock \
--spring.cloud.stream.kafka.binder.autoCreateTopics=false \
--spring.cloud.stream.kafka.binder.configuration.security.protocol=SASL_PLAINTEXT \
--spring.cloud.stream.kafka.binder.jaas.options.useKeyTab=true \
--spring.cloud.stream.kafka.binder.jaas.options.storeKey=true \
--spring.cloud.stream.kafka.binder.jaas.options.keyTab=/etc/security/keytabs/kafka_client.keytab \
--spring.cloud.stream.kafka.binder.jaas.options.principal=kafka-client-1@EXAMPLE.COM
前面的示例相当于以下 JAAS 文件:
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
keyTab="/etc/security/keytabs/kafka_client.keytab"
principal="kafka-client-1@EXAMPLE.COM";
};
如果所需的主题已存在于代理上或将由管理员创建,则可以关闭自动创建,并且仅需要发送客户端 JAAS 属性。
不要在同一应用程序中混合使用 JAAS 配置文件和 Spring Boot 属性。如果-Djava.security.auth.login.config系统属性已经存在,Spring Cloud Stream 会忽略 Spring Boot 属性。
|
autoCreateTopics将和与 Kerberos 一起
使用时要小心autoAddPartitions。通常,应用程序可能使用在 Kafka 和 Zookeeper 中没有管理权限的主体。因此,依赖 Spring Cloud Stream 创建/修改主题可能会失败。在安全环境中,我们强烈建议使用 Kafka 工具创建主题并管理 ACL。
|
多绑定器配置和 JAAS
当连接到多个集群时,其中每个集群都需要单独的 JAAS 配置,请使用 属性设置 JAAS 配置sasl.jaas.config。当应用程序中存在此属性时,它优先于上述其他策略。有关更多详细信息,请参阅KIP-85 。
例如,如果您的应用程序中有两个具有单独 JAAS 配置的集群,则您可以使用以下模板:
spring.cloud.stream:
binders:
kafka1:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\";"
kafka2:
type: kafka
environment:
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9093
configuration.sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user1\" password=\"user1-secret\";"
kafka.binder:
configuration:
security.protocol: SASL_PLAINTEXT
sasl.mechanism: PLAIN
请注意,上述配置中的两个 Kafka 集群以及sasl.jaas.config每个集群的值都不同。
有关如何设置和运行此类应用程序的更多详细信息,请参阅此示例应用程序。
示例:暂停和恢复消费者
如果您希望暂停消费但不导致分区重新平衡,则可以暂停并恢复消费。这是通过管理绑定生命周期来实现的,如Spring Cloud Stream 文档中的绑定可视化和控制所示,使用State.PAUSED和State.RESUMED。
要恢复,您可以使用ApplicationListener(或@EventListener方法)来接收ListenerContainerIdleEvent实例。事件发布的频率由idleEventInterval酒店控制。
1.4. 交易活页夹
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix通过设置为非空值来启用事务,例如tx-。当在处理器应用程序中使用时,消费者启动交易;在消费者线程上发送的任何记录都参与同一事务。当监听器正常退出时,监听器容器会将偏移量发送到事务并提交。通用生产者工厂用于使用spring.cloud.stream.kafka.binder.transaction.producer.*属性配置的所有生产者绑定;单个绑定 Kafka 生产者属性将被忽略。
事务不支持正常的活页夹重试(和死信),因为重试将在原始事务中运行,原始事务可能会回滚,并且任何已发布的记录也将回滚。当启用重试时(公共属性maxAttempts大于零),重试属性用于配置DefaultAfterRollbackProcessor以在容器级别启用重试。类似地,该功能不是在事务中发布死信记录,而是再次通过DefaultAfterRollbackProcessor主事务回滚后运行的侦听器容器移至侦听器容器。
|
如果您希望在源应用程序中使用事务,或者从某个任意线程进行仅生产者事务(例如@Scheduled方法),则必须获取对事务生产者工厂的引用并使用KafkaTransactionManager它定义一个 bean。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
请注意,我们使用 ; 获取对活页夹的引用BinderFactory。null当仅配置一个绑定器时,在第一个参数中使用。如果配置了多个活页夹,请使用活页夹名称来获取引用。一旦我们有了对绑定器的引用,我们就可以获得对绑定器的引用ProducerFactory并创建一个事务管理器。
然后你将使用普通的 Spring 事务支持,例如TransactionTemplate或@Transactional,例如:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
如果您希望将仅生产者事务与其他事务管理器的事务同步,请使用ChainedTransactionManager.
如果您部署应用程序的多个实例,则每个实例都需要一个唯一的transactionIdPrefix.
|
1.5. 错误通道
从版本 1.3 开始,绑定器无条件地将异常发送到每个消费者目标的错误通道,并且还可以配置为将异步生产者发送失败发送到错误通道。有关详细信息,请参阅有关错误处理的部分。
ErrorMessage发送失败的负载是KafkaSendFailureException具有以下属性的:
-
failedMessage:Spring消息Message<?>发送失败。 -
recordProducerRecord:从创建的原始failedMessage
没有自动处理生产者异常(例如发送到死信队列)。您可以使用自己的 Spring Integration 流程来使用这些异常。
1.6. 卡夫卡指标
Kafka Binder 模块公开以下指标:
spring.cloud.stream.binder.kafka.offset:该指标指示给定消费者组尚未从给定活页夹主题中消费多少消息。提供的指标基于 Micrometer 库。KafkaBinderMetrics如果 Micrometer 位于类路径上并且应用程序没有提供其他此类 bean,则绑定器将创建该bean。该指标包含消费者组信息、主题以及提交偏移量与主题上最新偏移量的实际滞后。该指标对于向 PaaS 平台提供自动缩放反馈特别有用。
可以通过在命名空间中设置属性来配置指标收集行为spring.cloud.stream.kafka.binder.metrics,请参阅kafka活页夹属性部分以获取更多信息。
您可以排除KafkaBinderMetrics创建必要的基础设施(例如消费者),然后通过在应用程序中提供以下组件来报告指标。
@Component
class NoOpBindingMeters {
NoOpBindingMeters(MeterRegistry registry) {
registry.config().meterFilter(
MeterFilter.denyNameStartsWith(KafkaBinderMetrics.OFFSET_LAG_METRIC_NAME));
}
}
有关如何有选择地抑制仪表的更多详细信息,请参阅此处。
1.7. 墓碑记录(空记录值)
当使用压缩主题时,带有值的记录null(也称为逻辑删除记录)代表键的删除。要在 Spring Cloud Stream 函数中接收此类消息,您可以使用以下策略。
@Bean
public Function<Message<Person>, String> myFunction() {
return value -> {
Object v = value.getPayload();
String className = v.getClass().getName();
if (className.isEqualTo("org.springframework.kafka.support.KafkaNull")) {
// this is a tombstone record
}
else {
// continue with processing
}
};
}
1.8. 使用 KafkaBindingRebalanceListener
应用程序可能希望在最初分配分区时寻找任意偏移量的主题/分区,或者对使用者执行其他操作。从版本 2.1 开始,如果您在应用程序上下文中提供单个KafkaBindingRebalanceListenerbean,它将连接到所有 Kafka 使用者绑定中。
public interface KafkaBindingRebalanceListener {
/**
* Invoked by the container before any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedBeforeCommit(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
/**
* Invoked by the container after any pending offsets are committed.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
*/
default void onPartitionsRevokedAfterCommit(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
/**
* Invoked when partitions are initially assigned or after a rebalance.
* Applications might only want to perform seek operations on an initial assignment.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions,
boolean initial) {
}
}
当您提供重新平衡侦听器时,您无法将resetOffsets消费者属性设置为。true
1.9. 重试和死信处理
默认情况下,当您配置重试(例如maxAttemts)并enableDlq在消费者绑定中时,这些功能将在绑定器中执行,侦听器容器或 Kafka 消费者不会参与。
在某些情况下,最好将此功能移至侦听器容器,例如:
-
重试和延迟的总和将超出消费者的
max.poll.interval.ms财产,可能导致分区重新平衡。 -
您希望将死信发布到不同的 Kafka 集群。
-
您希望向错误处理程序添加重试侦听器。
-
...
要配置将此功能从活页夹移动到容器,请定义 of@Bean类型ListenerContainerWithDlqAndRetryCustomizer。该接口有以下方法:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
目标解析器BackOff是根据绑定属性(如果已配置)创建的。使用属性KafkaTemplate中的配置spring.kafka….。然后,您可以使用它们来创建自定义错误处理程序和死信发布者;例如:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
现在,只需单次重试延迟需要大于消费者的max.poll.interval.ms财产。
当使用多个绑定器时,“ListenerContainerWithDlqAndRetryCustomizer”bean 会被“DefaultBinderFactory”覆盖。对于要应用的 bean,您需要使用“BinderCustomizer”来设置容器定制器(请参阅[binder-customizer]):
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}
1.10. 自定义消费者和生产者配置
如果您想要对用于创建ConsumerFactory和ProducerFactory在 Kafka 中使用的消费者和生产者配置进行高级定制,您可以实现以下定制器。
-
消费者配置定制器
-
生产者配置定制器
这两个接口都提供了一种配置用于消费者和生产者属性的配置映射的方法。例如,如果您想要访问在应用程序级别定义的 bean,您可以将其注入到方法的实现中configure。当绑定器发现这些定制器可用作 bean 时,它将configure在创建消费者和生产者工厂之前调用该方法。
这两个接口还提供对绑定名称和目标名称的访问,以便在自定义生产者和消费者属性时可以访问它们。
1.11. 自定义 AdminClient 配置
与上面的消费者和生产者配置自定义一样,应用程序还可以通过提供AdminClientConfigCustomizer. AdminClientConfigCustomizer 的配置方法提供对管理客户端属性的访问,您可以使用它来定义进一步的自定义。Binder 的 Kafka 主题配置器为通过此定制器给出的属性提供最高优先级。下面是提供此定制器 bean 的示例。
@Bean
public AdminClientConfigCustomizer adminClientConfigCustomizer() {
return props -> {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
};
}
1.12. 自定义 Kafka Binder 健康状况指示器
当 Spring Boot 执行器位于类路径上时,Kafka 绑定器会激活默认的运行状况指示器。该运行状况指示器检查活页夹的运行状况以及与 Kafka 代理的任何通信问题。如果应用程序想要禁用此默认运行状况检查实现并包含自定义实现,那么它可以提供接口的实现KafkaBinderHealth。
KafkaBinderHealth是一个从 扩展而来的标记接口HealthIndicator。在自定义实现中,它必须提供该health()方法的实现。自定义实现必须作为 bean 存在于应用程序配置中。当绑定器发现自定义实现时,它将使用该自定义实现而不是默认实现。下面是应用程序中此类自定义实现 bean 的示例。
@Bean
public KafkaBinderHealth kafkaBinderHealthIndicator() {
return new KafkaBinderHealth() {
@Override
public Health health() {
// custom implementation details.
}
};
}
1.13。死信主题处理
1.13.1. 死信主题分区选择
默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请将实现添加DlqPartitionFunction到@Bean应用程序上下文中。只能存在一颗这样的 bean。该功能提供了消费者组、失败者ConsumerRecord和异常。例如,如果您总是想路由到分区 0,您可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions属性设置为 1(并且绑定器的属性minPartitionCount等于1),则无需提供DlqPartitionFunction; 框架将始终使用分区 0。如果将使用者绑定的dlqPartitions属性设置为大于1(或绑定器的minPartitionCount属性大于1)的值,则必须提供一个DlqPartitionFunctionbean,即使分区计数与原始主题的相同。
|
还可以为 DLQ 主题定义自定义名称。为此,请创建应用程序上下文DlqDestinationResolver的实现。@Bean当绑定器检测到这样的 bean 时,它优先,否则它将使用该dlqName属性。如果这些都没有找到,则默认为error.<destination>.<group>. DlqDestinationResolver这是一个as a的示例@Bean。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在提供实现时要记住的一件重要事情DlqDestinationResolver是活页夹中的配置程序不会自动为应用程序创建主题。这是因为绑定器无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,则应用程序有责任确保预先创建这些主题。
1.13.2. 处理死信主题中的记录
由于该框架无法预测用户希望如何处理死信消息,因此它不提供任何标准机制来处理它们。如果死信的原因是暂时的,您可能希望将消息路由回原始主题。但是,如果问题是永久性问题,则可能会导致无限循环。本主题中的示例 Spring Boot 应用程序是如何将这些消息路由回原始主题的示例,但在三次尝试后将它们移至“停车场”主题。该应用程序是另一个从死信主题读取的 spring-cloud-stream 应用程序。当5秒内没有收到消息时退出。
这些示例假设原始目的地是so8400out,消费者组是so8400。
有几个策略需要考虑:
-
仅当主应用程序未运行时才考虑运行重新路由。否则,短暂错误的重试很快就会用完。
-
或者,使用两阶段方法:使用此应用程序路由到第三个主题,使用另一个应用程序从那里路由回主要主题。
以下代码清单显示了示例应用程序:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}
1.14. 使用 Kafka Binder 进行分区
Apache Kafka 本身支持主题分区。
有时,将数据发送到特定分区是有利的 - 例如,当您想要严格排序消息处理时(特定客户的所有消息都应发送到同一分区)。
下面的例子展示了如何配置生产者和消费者端:
@SpringBootApplication
public class KafkaPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"foo1", "bar1", "qux1",
"foo2", "bar2", "qux2",
"foo3", "bar3", "qux3",
"foo4", "bar4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.topic
producer:
partition-key-expression: headers['partitionKey']
partition-count: 12
请务必记住,由于 Apache Kafka 本身支持分区,因此无需依赖如上所述的绑定器分区,除非您使用示例中的自定义分区键或涉及有效负载本身的表达式。绑定器提供的分区选择适用于不支持本机分区的中间件技术。请注意,我们使用的是partitionKey上面示例中调用的自定义键,这将是分区的决定因素,因此在这种情况下,适合使用活页夹分区。当使用本机 Kafka 分区时,即当您不提供 时partition-key-expression,Apache Kafka 将选择一个分区,默认情况下,该分区将是记录键在可用分区数量上的哈希值。要将键添加到出站记录,请将KafkaHeaders.KEY标头设置为 spring-messaging 中所需的键值Message<?>。默认情况下,当没有提供记录键时,Apache Kafka 将根据Apache Kafka 文档中描述的逻辑选择分区。
|
必须配置主题以具有足够的分区,以便为所有使用者组实现所需的并发性。上述配置最多支持 12 个消费者实例(如果并发数concurrency为 2,则为 6 个;如果并发数为 3,则为 4 个,依此类推)。通常最好“过度配置”分区,以允许未来消费者或并发的增加。
|
上述配置使用默认分区 ( key.hashCode() % partitionCount)。这可能会也可能不会提供适当的平衡算法,具体取决于键值。特别要注意的是,这种分区策略与独立 Kafka 生产者使用的默认策略不同 - 例如 Kafka Streams 使用的策略,这意味着当由这些客户端生成时,相同的键值可能会在分区之间进行不同的平衡。您可以使用partitionSelectorExpression或partitionSelectorClass属性覆盖此默认值。
|
由于分区是由 Kafka 原生处理的,因此消费者端不需要特殊配置。Kafka 在实例之间分配分区。
| kafka主题的partitionCount可能在运行时发生变化(例如由于管理任务)。此后计算的分区将有所不同(例如,随后将使用新的分区)。从 Spring Cloud Stream 4.0.3 开始,将支持分区计数运行时更改。另请参阅参数“spring.kafka. Producer.properties.metadata.max.age.ms”来配置更新间隔。由于某些限制,无法使用引用消息“有效负载”的“分区键表达式”,在这种情况下该机制将被禁用。默认情况下,整体行为处于禁用状态,可以使用配置参数“ Producer.dynamicPartitionUpdatesEnabled = true”来启用。 |
以下 Spring Boot 应用程序侦听 Kafka 流并打印(到控制台)每条消息所在的分区 ID:
@SpringBootApplication
public class KafkaPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
.web(WebApplicationType.NONE)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
System.out.println(message + " received from partition " + partition);
};
}
}
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.topic
group: myGroup
您可以根据需要添加实例。Kafka 重新平衡分区分配。如果实例计数(或instance count * concurrency)超过分区数量,则一些消费者处于空闲状态。
2.反应式Kafka Binder
Spring Cloud Stream 中的 Kafka Binder 提供了基于Reactor Kafka项目的专用反应式 Binder。这种反应式 Kafka 绑定器在基于 Apache Kafka 的应用程序中实现了完整的端到端反应功能,例如反压、反应流等。Flux当您的 Spring Cloud Stream Kafka 应用程序使用响应式类型(等)编写时Mono,建议使用此响应式 Kafka 绑定器,而不是基于常规消息通道的 Kafka 绑定器。
2.1. Maven坐标
以下是反应式 Kafka 绑定器的 Maven 坐标。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-reactive</artifactId>
</dependency>
2.2. 使用 Reactive Kafka Binder 的基本示例
在本节中,我们将展示一些使用反应式绑定器编写反应式 Kafka 应用程序的基本代码片段及其相关详细信息。
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return s -> s.map(String::toUpperCase);
}
您可以将上述upppercase函数与基于消息通道的 Kafka 绑定器 ( spring-cloud-stream-binder-kafka) 以及反应式 Kafka 绑定器 ( spring-cloud-stream-binder-kafka-reactive) 一起使用,这是本节讨论的主题。当将此函数与常规 Kafka 绑定器一起使用时,尽管您在应用程序中(即在函数中uppercase)使用响应式类型,但您只能在函数执行过程中获得响应式流。在函数的执行上下文之外,没有反应性优势,因为底层绑定器不基于反应性堆栈。因此,尽管这看起来像是带来了完整的端到端反应式堆栈,但该应用程序只是部分反应式的。
现在假设您正在使用适用于 Kafka 的反应式绑定器 -spring-cloud-stream-binder-kafka-reactive以及上述函数的应用程序。这种绑定器实现将提供从顶端的消费到链底端的发布的全部反应性优势。这是因为底层绑定器是构建在Reactor Kafka的核心 API之上的。在消费者方面,它使用 KafkaReceiver ,它是 Kafka 消费者的反应式实现。同样,在生产者方面,它使用KafkaSender API,这是 Kafka 生产者的反应式实现。由于反应式 Kafka 绑定器的基础是建立在适当的反应式 Kafka API 之上的,因此应用程序可以获得使用反应式技术的全部好处。使用此反应式 Kafka 绑定器时,应用程序会内置自动背压等反应功能。
从版本 4.0.2 开始,您可以通过分别提供一个或多个或beans来自定义ReceiverOptions和。它们接收绑定名称和初始选项,返回自定义选项。接口经过扩展,因此当存在多个定制器时,将按照所需的顺序应用定制器。SenderOptionsReceiverOptionsCustomizerSenderOptionsCustomizerBiFunctionOrdered
默认情况下,绑定器不提交偏移量。从版本 4.0.2 开始,KafkaHeaders.ACKNOWLEDGMENT标头包含一个对象,该对象允许您通过调用其或方法ReceiverOffset来提交偏移量。
acknowledge()commit() |
@Bean
public Consumer<Flux<Message<String>> consume() {
return msg -> {
process(msg.getPayload());
msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, ReceiverOffset.class).acknowledge();
}
}
reactor-kafka有关详细信息,请参阅文档和 javadocs。
此外,从版本4.0.3开始,可以将Kafka Consumer属性reactiveAtmostOnce设置为true,并且Binder将在处理每个轮询返回的记录之前自动提交偏移量。另外,从版本4.0.3开始,您可以将consumer属性设置reactiveAutoCommit为true,并且binder将在处理每个poll返回的记录后自动提交偏移量。在这些情况下,确认标头不存在。
4.0.2 也提供了reactiveAutoCommit,但实现不正确,它的行为与reactiveAtMostOnce.
|
以下是如何使用的示例reaciveAutoCommit。
@Bean
Consumer<Flux<Flux<ConsumerRecord<?, String>>>> input() {
return flux -> flux
.doOnNext(inner -> inner
.doOnNext(val -> {
log.info(val.value());
})
.subscribe())
.subscribe();
}
请注意,使用自动提交时reactor-kafka返回 a 。Flux<Flux<ConsumerRecord<?, ?>>>鉴于 Spring 无法访问内部 Flux 的内容,应用程序必须处理 Native ConsumerRecord;没有对内容应用消息转换或转换服务。这需要使用本机解码(通过Deserializer在配置中指定适当的类型)来返回所需类型的记录键/值。
2.3. 使用原始格式的记录
在上面的upppercase函数中,我们使用记录 as Flux<String>,然后将其生成为Flux<String>。有时您可能需要以原始接收格式接收记录 - ReceiverRecord. 这是这样一个函数。
@Bean
public Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<String>> lowercase() {
return s -> s.map(rec -> new String(rec.value()).toLowerCase());
}
在此函数中,请注意,我们将记录消费为Flux<ReceiverRecord<byte[], byte[]>>,然后将其生成为Flux<String>。
ReceiverRecord是基本接收记录,是ConsumerRecordReactor Kafka中的一个专门的Kafka。使用反应式 Kafka 绑定器时,上述函数将使您能够访问ReceiverRecord每个传入记录的类型。但是,在这种情况下,您需要为RecordMessageConverter提供自定义实现。默认情况下,反应式 Kafka 绑定器使用MessagingMessageConverter将有效负载和标头从ConsumerRecord. 因此,当您的处理程序方法接收到它时,有效负载已经从接收到的记录中提取出来并传递给该方法,就像我们上面看到的第一个函数的情况一样。通过RecordMessageConverter在应用程序中提供自定义实现,您可以覆盖默认行为。例如,如果您想将记录作为原始数据使用Flux<ReceiverRecord<byte[], byte[]>>,那么您可以在应用程序中提供以下 bean 定义。
@Bean
RecordMessageConverter fullRawReceivedRecord() {
return new RecordMessageConverter() {
private final RecordMessageConverter converter = new MessagingMessageConverter();
@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type payloadType) {
return MessageBuilder.withPayload(record).build();
}
@Override
public ProducerRecord<?, ?> fromMessage(Message<?> message, String defaultTopic) {
return this.converter.fromMessage(message, defaultTopic);
}
};
}
然后,您需要指示框架使用此转换器进行所需的绑定。这是一个基于我们的lowercase函数的示例。
spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRawReceivedRecord"
lowercase-in-0是我们函数的输入绑定名称lowercase。对于出站( lowecase-out-0),我们仍然使用常规的MessagingMessageConverter.
在toMessage上面的实现中,我们接收原始数据ConsumerRecord(ReceiverRecord因为我们处于反应式绑定器上下文中),然后将其包装在Message. 然后将该消息有效负载ReceiverRecord提供给用户方法。
如果reactiveAutoCommit是false(默认),则调用rec.receiverOffset().acknowledge()(或commit()) 来提交偏移量;如果reactiveAutoCommit是,则true磁通供应s 。有关详细信息,ConsumerRecord请参阅文档和 javadocs。reactor-kafka
2.4. 并发性
当将反应式函数与反应式 Kafka 绑定器一起使用时,如果您在使用者绑定上设置并发性,则绑定器将创建KafkaReceiver与并发值提供的数量一样多的专用对象。换句话说,这会创建多个具有单独Flux实现的反应流。当您使用分区主题中的记录时,这可能很有用。
例如,假设传入主题至少有三个分区。然后您可以设置以下属性。
spring.cloud.stream.bindings.lowercase-in-0.consumer.concurrency=3
这将创建三个专用KafkaReceiver对象,它们生成三个单独的Flux实现,然后将它们流式传输到处理程序方法。
2.5. 多路复用
multiplex从版本 4.0.3 开始,反应式绑定器现在支持公共消费者属性,其中单个绑定可以从多个主题进行消费。当false(默认)时,将为公共属性中逗号分隔列表中指定的每个主题创建单独的绑定destination。
2.6。目的地是模式
从版本 4.0.3 开始,destination-is-pattern现在支持 Kafka 绑定消费者属性。接收器选项使用 regex 配置Pattern,允许绑定从与模式匹配的任何主题进行消费。
2.7. 发送结果通道
从版本 4.0.3 开始,您可以配置resultMetadataChannelto receiveSenderResult<?>来确定发送的成功/失败。
包含SenderResult允许correlationMetadata您将结果与发送关联起来;它还包含RecordMetadata,表示TopicPartition发送记录的 和 偏移量。
必须resultMetadataChannel 是一个FluxMessageChannel实例。
以下是如何使用此功能的示例,其中相关元数据类型为Integer:
@Bean
FluxMessageChannel sendResults() {
return new FluxMessageChannel();
}
@ServiceActivator(inputChannel = "sendResults")
void handleResults(SenderResult<Integer> result) {
if (result.exception() != null) {
failureFor(result);
}
else {
successFor(result);
}
}
要在输出记录上设置相关元数据,请设置CORRELATION_ID标头:
streamBridge.send("words1", MessageBuilder.withPayload("foobar")
.setCorrelationId(42)
.build());
当与 a 一起使用该功能时Function,函数输出类型必须是 a ,Message<?>并且相关 id 标头设置为所需的值。
元数据应该是唯一的,至少在发送期间如此。
3.Kafka Streams Binder
3.1. 用法
要使用 Kafka Streams 绑定器,您只需使用以下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
为 Kafka Streams 绑定器引导新项目的快速方法是使用Spring Initializr,然后选择“Cloud Streams”和“Spring for Kafka Streams”,如下所示
3.2. 概述
Spring Cloud Stream 包含一个专门为Apache Kafka Streams绑定设计的绑定器实现。通过这种本机集成,Spring Cloud Stream“处理器”应用程序可以直接在核心业务逻辑中使用 Apache Kafka Streams API。
Kafka Streams Binder 为 Kafka Streams 中的三种主要类型提供了绑定功能 - KStream、KTable和GlobalKTable。
Kafka Streams 应用程序通常遵循以下模型:从入站主题读取记录,应用业务逻辑,然后将转换后的记录写入出站主题。或者,也可以定义没有出站目的地的处理器应用程序。
在下面的部分中,我们将详细了解 Spring Cloud Stream 与 Kafka Streams 的集成。
3.3. 编程模型
当使用 Kafka Streams binder 提供的编程模型时,高级Streams DSL以及高级和低级Processor-API的混合都可以用作选项。当混合较高级别和较低级别的 API 时,这通常是通过调用transform或process上的 API 方法来实现的KStream。
3.3.1. 功能风格
从 Spring Cloud Stream 开始3.0.0,Kafka Streams 绑定器允许使用 Java 8 中提供的函数式编程风格来设计和开发应用程序。这意味着应用程序可以简洁地表示为类型 或 的 lambdajava.util.function.Function表达式java.util.function.Consumer。
让我们举一个非常基本的例子。
@SpringBootApplication
public class SimpleConsumerApplication {
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input ->
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
虽然很简单,但这是一个完整的独立 Spring Boot 应用程序,它利用 Kafka Streams 进行流处理。这是一个消费者应用程序,没有出站绑定,只有一个入站绑定。应用程序使用数据,它只是将键和值的信息记录KStream在标准输出上。该应用程序包含SpringBootApplication注释和标记为 的方法Bean。bean 方法的类型是java.util.function.Consumer用 参数化的KStream。然后在实现中,我们返回一个 Consumer 对象,它本质上是一个 lambda 表达式。在 lambda 表达式内部,提供了处理数据的代码。
在此应用程序中,有一个类型为 的输入绑定KStream。绑定器使用 name 为应用程序创建此绑定process-in-0,即函数 bean 名称的名称,后跟破折号字符 ( -) 和文字,in后跟另一个破折号,然后是参数的序号位置。您可以使用此绑定名称来设置其他属性,例如目标。例如,spring.cloud.stream.bindings.process-in-0.destination=my-topic.
| 如果未在绑定上设置目标属性,则会创建一个与绑定同名的主题(如果应用程序有足够的权限),或者该主题预计已可用。 |
一旦构建为 uber-jar(例如,kstream-consumer-app.jar),您就可以像下面这样运行上面的示例。
如果应用程序选择使用 Spring 的Component注释来定义功能 bean,则绑定器也支持该模型。上面的功能bean可以重写如下。
@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {
@Override
public void accept(KStream<Object, String> input) {
input.foreach((key, value) -> {
System.out.println("Key: " + key + " Value: " + value);
});
}
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic
这是另一个示例,它是一个具有输入和输出绑定的完整处理器。这是经典的字数统计示例,其中应用程序从主题接收数据,然后在滚动时间窗口中计算每个单词出现的次数。
@SpringBootApplication
public class WordCountProcessorApplication {
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(5000))
.count(Materialized.as("word-counts-state-store"))
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))));
}
public static void main(String[] args) {
SpringApplication.run(WordCountProcessorApplication.class, args);
}
}
再次强调,这是一个完整的 Spring Boot 应用程序。这里与第一个应用程序的区别在于 bean 方法的类型为java.util.function.Function。第一个参数化类型用于Function输入KStream,第二个参数化类型用于输出。在方法主体中,提供了一个类型的 lambda 表达式,Function并且作为实现,给出了实际的业务逻辑。与前面讨论的基于消费者的应用程序类似,这里的输入绑定process-in-0默认命名为。对于输出,绑定名称也自动设置为process-out-0。
一旦构建为 uber-jar(例如wordcount-processor.jar),您就可以像下面这样运行上面的示例。
java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts
该应用程序将使用来自 Kafka 主题的消息words,并将计算结果发布到输出主题counts。
Spring Cloud Stream 将确保来自传入和传出主题的消息自动绑定为 KStream 对象。作为开发人员,您可以专注于代码的业务方面,即编写处理器中所需的逻辑。设置 Kafka Streams 基础设施所需的特定配置由框架自动处理。
我们上面看到的两个示例都有一个KStream输入绑定。在这两种情况下,绑定都从单个主题接收记录。如果您想将多个主题复用到一个KStream绑定中,您可以提供逗号分隔的 Kafka 主题作为下面的目标。
spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3
此外,如果您想要将主题与正则表达式进行匹配,您还可以提供主题模式作为目标。
spring.cloud.stream.bindings.process-in-0.destination=input.*
多个输入绑定
许多重要的 Kafka Streams 应用程序通常通过多个绑定使用来自多个主题的数据。例如,一个主题被使用为Kstream,另一个主题被使用为KTable或GlobalKTable。应用程序可能希望接收表类型数据的原因有很多。考虑一个用例,其中基础主题是通过数据库中的变更数据捕获 (CDC) 机制填充的,或者应用程序可能只关心下游处理的最新更新。如果应用程序指定数据需要绑定为KTable或GlobalKTable,那么 Kafka Streams 绑定器将正确地将目标绑定到KTable或GlobalKTable,并使它们可供应用程序进行操作。我们将研究如何在 Kafka Streams 绑定器中处理多个输入绑定的几种不同场景。
Kafka Streams Binder 中的 BiFunction
这是一个示例,其中我们有两个输入和一个输出。在这种情况下,应用程序可以利用java.util.function.BiFunction.
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.toStream());
}
同样,基本主题与前面的示例相同,但这里我们有两个输入。Java 的BiFunction支持用于将输入绑定到所需的目的地。绑定器为输入生成的默认绑定名称分别为process-in-0和process-in-1。默认输出绑定是process-out-0. 在此示例中,第一个参数BiFunction绑定为KStream第一个输入的 a ,第二个参数绑定为KTable第二个输入的 a 。
Kafka Streams Binder 中的 BiConsumer
如果有两个输入,但没有输出,在这种情况下我们可以使用java.util.function.BiConsumer如下所示。
@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
return (userClicksStream, userRegionsTable) -> {}
}
超过两个输入
如果您有两个以上的输入怎么办?在某些情况下,您需要两个以上的输入。在这种情况下,绑定器允许您链接部分函数。在函数式编程术语中,这种技术通常称为柯里化(currying)。随着 Java 8 中添加的函数式编程支持,Java 现在允许您编写柯里化函数。Spring Cloud Stream Kafka Streams 绑定器可以利用此功能来启用多个输入绑定。
让我们看一个例子。
@Bean
public Function<KStream<Long, Order>,
Function<GlobalKTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
return orders -> (
customers -> (
products -> (
orders.join(customers,
(orderId, order) -> order.getCustomerId(),
(order, customer) -> new CustomerOrder(customer, order))
.join(products,
(orderId, customerOrder) -> customerOrder
.productId(),
(customerOrder, product) -> {
EnrichedOrder enrichedOrder = new EnrichedOrder();
enrichedOrder.setProduct(product);
enrichedOrder.setCustomer(customerOrder.customer);
enrichedOrder.setOrder(customerOrder.order);
return enrichedOrder;
})
)
)
);
}
让我们看看上面介绍的绑定模型的详细信息。在此模型中,我们在入站上有 3 个部分应用的功能。我们将它们称为f(x)、f(y)和f(z)。如果我们将这些函数扩展为真正的数学函数,它将看起来像这样:f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder>。变量x代表KStream<Long, Order>,y变量代表GlobalKTable<Long, Customer>,z变量代表GlobalKTable<Long, Product>。第一个函数f(x)具有应用程序的第一个输入绑定 ( KStream<Long, Order>),其输出是函数 f(y)。该函数f(y)具有应用程序的第二个输入绑定 ( GlobalKTable<Long, Customer>),其输出是另一个函数f(z)。该函数的输入f(z)是应用程序的第三个输入 ( GlobalKTable<Long, Product>),其输出是KStream<Long, EnrichedOrder>应用程序的最终输出绑定。KStream三个部分函数(分别为、GlobalKTable、 )的输入GlobalKTable可在方法主体中使用,以将业务逻辑实现为 lambda 表达式的一部分。
输入绑定分别命名为enrichOrder-in-0、enrichOrder-in-1和enrichOrder-in-2。输出绑定命名为enrichOrder-out-0.
使用柯里化函数,您几乎可以拥有任意数量的输入。但是,请记住,除了 Java 中的上述少量输入和部分应用的函数之外,任何其他情况都可能导致代码不可读。因此,如果您的 Kafka Streams 应用程序需要相当少量的输入绑定,并且您想要使用此功能模型,那么您可能需要重新考虑您的设计并适当地分解应用程序。
输出绑定
KStreamKafka Streams 绑定器允许或类型KTable作为输出绑定。在幕后,绑定器使用toon 方法KStream将结果记录发送到输出主题。如果应用程序KTable在函数中提供 a 作为输出,绑定器仍然通过委托给to的方法来使用此技术KStream。
例如,下面的两个函数都可以工作:
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
多个输出绑定
Kafka Streams 允许将出站数据写入多个主题。此功能在 Kafka Streams 中称为分支。当使用多个输出绑定时,您需要提供一个 KStream ( ) 数组KStream[]作为出站返回类型。
这是一个例子:
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {
Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");
return input -> {
final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, value) -> value)
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.as("WordCounts-branch"))
.toStream()
.map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
new Date(key.window().start()), new Date(key.window().end()))))
.split()
.branch(isEnglish)
.branch(isFrench)
.branch(isSpanish)
.noDefaultBranch();
return stringKStreamMap.values().toArray(new KStream[0]);
};
}
编程模型保持不变,但出站参数化类型为KStream[]。上述函数的默认输出绑定名称分别为、process-out-0。绑定器之所以生成三个输出绑定,是因为它检测到返回数组的长度为 3。请注意,在此示例中,我们提供了一个; 如果我们改为使用,则需要额外的输出绑定,本质上返回一个长度为 4 的数组。process-out-1process-out-2KStreamnoDefaultBranch()defaultBranch()KStream
Kafka Streams 基于函数的编程风格总结
总之,下表显示了可以在功能范例中使用的各种选项。
| 输入数量 | 输出数量 | 使用的组件 |
|---|---|---|
1 |
0 |
java.util.function.Consumer |
2 |
0 |
java.util.function.BiConsumer |
1 |
1..n |
java.util.function.Function |
2 |
1..n |
java.util.function.BiFunction |
>= 3 |
0..n |
使用柯里化函数 |
-
如果该表中有多个输出,则类型将变为
KStream[]。
Kafka Streams Binder 中的函数组合
Kafka Streams 绑定器支持线性拓扑的最小形式的功能组合。使用 Java 函数式 API 支持,您可以编写多个函数,然后使用该方法自行组合它们andThen。例如,假设您有以下两个函数。
@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
return input -> input.peek((s, s2) -> {});
}
@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
return input -> input.peek((s, s2) -> {});
}
即使粘合剂中没有功能组合支持,您也可以按如下方式组合这两个功能。
@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
foo().andThen(bar());
}
然后您可以提供表单的定义spring.cloud.function.definition=foo;bar;composed。通过绑定器中的函数组合支持,您无需编写要在其中进行显式函数组合的第三个函数。
你可以简单地这样做:
spring.cloud.function.definition=foo|bar
你甚至可以这样做:
spring.cloud.function.definition=foo|bar;foo;bar
在此示例中,组合函数的默认绑定名称变为foobar-in-0和foobar-out-0。
Kafka Streams bincer 中功能组合的局限性
当你有java.util.function.Functionbean 时,它可以与另一个函数或多个函数组合。java.util.function.Consumer相同的功能bean也可以由a组成。在这种情况下,消费者是最后组成的组件。一个函数可以由多个函数组成,然后java.util.function.Consumer也以一个 bean 结尾。
当组合 类型的 beans 时java.util.function.BiFunction,BiFunction必须是定义中的第一个函数。组成的实体必须是 或java.util.function.Function类型java.util.funciton.Consumer。换句话说,你不能拿一个BiFunctionbean然后与另一个bean组合BiFunction。
您不能使用第一个组件的类型BiConsumer或定义进行组合。Consumer您也不能使用输出为数组(KStream[]用于分支)的函数进行组合,除非这是定义中的最后一个组件。
函数定义中的第一个 也可以使用柯里化Function形式。BiFunction例如,以下是可能的。
@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
return a -> b ->
a.join(b, (value1, value2) -> value1 + value2);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}
函数定义可以是curriedFoo|bar. 在幕后,绑定器将为柯里化函数创建两个输入绑定,并根据定义中的最终函数创建一个输出绑定。在这种情况下,默认输入绑定将是curriedFoobar-in-0和curriedFoobar-in-1。此示例的默认输出绑定变为curriedFoobar-out-0.
KTable在函数组合中用作输出的特别注意事项
假设您有以下两个功能。
@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
return KStream::toTable;
};
}
@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
return KTable::toStream;
}
您可以将它们组成为foo|bar,但请记住,第二个函数(bar在本例中)必须具有KTable作为输入,因为第一个函数 ( foo) 具有KTable作为输出。
3.4. 编程模型的辅助工具
3.4.1. 单个应用程序中的多个 Kafka Streams 处理器
Binder 允许在单个 Spring Cloud Stream 应用程序中拥有多个 Kafka Streams 处理器。您可以按如下方式申请。
@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
...
}
在这种情况下,绑定器将创建 3 个具有不同应用程序 ID 的独立 Kafka Streams 对象(更多内容见下文)。但是,如果应用程序中有多个处理器,则必须告诉 Spring Cloud Stream,需要激活哪些功能。以下是激活功能的方法。
spring.cloud.function.definition: process;anotherProcess;yetAnotherProcess
如果您不希望立即激活某些功能,您可以从此列表中删除该功能。
当您在同一应用程序中拥有单个 Kafka Streams 处理器和Function通过不同绑定器处理的其他类型的 Bean(例如,基于常规 Kafka 消息通道绑定器的函数 Bean)时,情况也是如此。
3.4.2. Kafka 流应用程序 ID
应用程序 ID 是您需要为 Kafka Streams 应用程序提供的强制属性。Spring Cloud Stream Kafka Streams 绑定器允许您以多种方式配置此应用程序 ID。
如果应用程序中只有一个处理器,则可以使用以下属性在绑定器级别进行设置:
spring.cloud.stream.kafka.streams.binder.applicationId。
为方便起见,如果您只有一个处理器,您还可以使用spring.application.name属性来委托应用程序 ID。
如果应用程序中有多个 Kafka Streams 处理器,则需要为每个处理器设置应用程序 ID。对于功能模型,您可以将其作为属性附加到每个功能。
例如,假设您具有以下功能。
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
...
}
和
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
...
}
然后,您可以使用以下活页夹级别属性为每个设置应用程序 ID。
spring.cloud.stream.kafka.streams.binder.functions.process.applicationId
和
spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationId
对于基于函数的模型,这种在绑定级别设置应用程序 ID 的方法也将起作用。但是,如果您使用函数模型,则如我们上面所见,在绑定器级别设置每个函数会容易得多。
对于生产部署,强烈建议通过配置显式指定应用程序 ID。如果您要自动扩展应用程序,这一点尤其重要,在这种情况下,您需要确保使用相同的应用程序 ID 部署每个实例。
如果应用程序未提供应用程序 ID,那么在这种情况下,绑定程序将自动为您生成静态应用程序 ID。这在开发场景中很方便,因为它避免了显式提供应用程序 ID 的需要。以这种方式生成的应用程序 ID 在应用程序重新启动时将是静态的。在功能模型的情况下,生成的应用程序 ID 将是函数 bean 名称后跟文字applicationID,例如process-applicationIDif processif 函数 bean 名称。
设置应用程序 ID 的摘要
-
默认情况下,binder 会根据函数方法自动生成应用程序 ID。
-
如果您有单个处理器,则可以使用
spring.kafka.streams.applicationId,spring.application.name或spring.cloud.stream.kafka.streams.binder.applicationId。 -
如果您有多个处理器,则可以使用属性 - 为每个函数设置应用程序 ID
spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId。
3.4.3. 使用函数样式覆盖绑定器生成的默认绑定名称
默认情况下,在使用函数式风格时,绑定器使用上面讨论的策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n],例如 process-in -0、process-out-0 等。如果要覆盖这些绑定名称,可以通过指定以下属性来实现。
spring.cloud.stream.function.bindings.<default binding name>。默认绑定名称是活页夹生成的原始绑定名称。
例如,可以说,你有这个功能。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
Binder 将生成名称为 、process-in-0和process-in-1的绑定process-out-0。现在,如果您想将它们完全更改为其他名称,也许是更多特定于域的绑定名称,那么您可以按如下方式执行此操作。
spring.cloud.stream.function.bindings.process-in-0=users
spring.cloud.stream.function.bindings.process-in-0=regions
和
spring.cloud.stream.function.bindings.process-out-0=clicks
之后,您必须在这些新绑定名称上设置所有绑定级别属性。
请记住,使用上述函数式编程模型,在大多数情况下遵守默认绑定名称是有意义的。您可能仍然想要执行此覆盖的唯一原因是当您有大量配置属性并且您希望将绑定映射到更适合域的内容时。
3.4.4. 设置引导服务器配置
运行 Kafka Streams 应用程序时,您必须提供 Kafka Broker 服务器信息。如果您不提供此信息,绑定器预计您以默认值运行代理localhost:9092。如果情况并非如此,那么您需要覆盖它。有几种方法可以做到这一点。
-
使用启动属性 -
spring.kafka.bootstrapServers -
活页夹级属性 -
spring.cloud.stream.kafka.streams.binder.brokers
当涉及到 Binder 级别属性时,是否使用通过常规 Kafka Binder 提供的 Broker 属性并不重要spring.cloud.stream.kafka.binder.brokers。Kafka Streams Binder 将首先检查 Kafka Streams Binder 特定代理属性是否已设置 ( spring.cloud.stream.kafka.streams.binder.brokers),如果未找到,则查找spring.cloud.stream.kafka.binder.brokers.
3.5. 记录序列化和反序列化
Kafka Streams 绑定器允许您以两种方式序列化和反序列化记录。一是Kafka提供的原生序列化和反序列化设施,二是Spring Cloud Stream框架的消息转换能力。让我们看一些细节。
3.5.1. 入站反序列化
密钥始终使用本机 Serdes 进行反序列化。
对于值,默认情况下,入站反序列化由 Kafka 本机执行。请注意,这是对以前版本的 Kafka Streams binder 的默认行为的重大更改,其中反序列化是由框架完成的。
SerdeKafka Streams 绑定器将尝试通过查看 的类型签名来推断匹配类型java.util.function.Function|Consumer。这是它与 Serdes 匹配的顺序。
-
如果应用程序提供 type 的 bean
Serde,并且返回类型使用传入键或值类型的实际类型进行参数化,则它将使用该类型Serde进行入站反序列化。例如,如果应用程序中有以下内容,绑定器会检测到传入值类型与KStreambean 上参数化的类型相匹配Serde。它将用于入站反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它查看类型并查看它们是否是 Kafka Streams 公开的类型之一。如果是这样,请使用它们。以下是绑定器将尝试从 Kafka Streams 匹配的 Serde 类型。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果Kafka Streams提供的Serdes中没有一个与类型不匹配,那么它将使用Spring Kafka提供的JsonSerde。在这种情况下,绑定器假定类型是 JSON 友好的。如果您有多个值对象作为输入,这非常有用,因为绑定器将在内部将它们推断为正确的 Java 类型。在回退到之前
JsonSerde,绑定器会检查SerdeKafka Streams 配置中设置的默认值,看看它是否Serde可以与传入的 KStream 类型匹配。
如果上述策略均不起作用,则应用程序必须Serde通过配置提供 s。这可以通过两种方式进行配置 - 绑定或默认。
首先,绑定器将查看是否Serde在绑定级别提供了 a。例如,如果您有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
Serde然后,您可以使用以下内容提供绑定级别:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果您Serde为每个输入绑定提供上述内容,那么这将具有更高的优先级,并且绑定器将远离任何Serde推理。
|
如果您希望使用默认键/值 Serdes 进行入站反序列化,您可以在绑定器级别执行此操作。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您不想要 Kafka 提供的本机解码,您可以依赖 Spring Cloud Stream 提供的消息转换功能。由于本机解码是默认的,为了让 Spring Cloud Stream 反序列化入站值对象,您需要显式禁用本机解码。
例如,如果您具有与上述相同的 BiFunction 处理器,那么spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false
您需要单独禁用所有输入的本机解码。否则,本机解码仍将应用于您未禁用的解码。
默认情况下,Spring Cloud Stream 将用作application/json内容类型并使用适当的 json 消息转换器。您可以通过使用以下属性和适当的MessageConverterbean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-in-0.contentType
3.5.2. 出站序列化
出站序列化几乎遵循与上述入站反序列化相同的规则。与入站反序列化一样,与之前版本的 Spring Cloud Stream 相比,一个主要变化是出站序列化由 Kafka 本地处理。在 Binder 3.0 版本之前,这是由框架本身完成的。
Serde出站的键始终由 Kafka 使用绑定器推断的匹配进行序列化。如果它无法推断密钥的类型,则需要使用配置来指定。
使用与入站反序列化相同的规则来推断值 Serdes。首先,它匹配以查看出站类型是否来自应用程序中提供的 bean。如果没有,它会检查它是否与 Kafka 公开的匹配,例如Serde- Integer、Long、Short、Double、Float、byte[]和UUID。String如果这不起作用,那么它会回退到JsonSerdeSpring Kafka 项目提供的,但首先查看默认Serde配置以查看是否有匹配。请记住,所有这些对应用程序来说都是透明的。如果这些都不起作用,那么用户必须提供Serde配置来使用。
假设您使用的是BiFunction与上面相同的处理器。然后您可以按如下方式配置出站键/值 Serdes。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果 Serde 推理失败,并且未提供绑定级别 Serdes,则绑定程序将回退到JsonSerde,但查看默认 Serdes 是否匹配。
默认 Serdes 的配置方式与上面反序列化下描述的方式相同。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果您的应用程序使用分支功能并具有多个输出绑定,则必须针对每个绑定进行配置。再次强调,如果绑定器能够推断Serde类型,则不需要执行此配置。
如果您不想要 Kafka 提供的本机编码,但想要使用框架提供的消息转换,那么您需要显式禁用本机编码,因为本机编码是默认的。例如,如果您具有与上述相同的 BiFunction 处理器,那么spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false
您需要在分支的情况下单独禁用所有输出的本机编码。否则,本机编码仍将应用于您未禁用的编码。
当 Spring Cloud Stream 完成转换时,默认情况下,它将用作application/json内容类型并使用适当的 json 消息转换器。您可以通过使用以下属性和相应的MessageConverterbean 来使用自定义消息转换器。
spring.cloud.stream.bindings.process-out-0.contentType
当禁用本机编码/解码时,binder 将不会像本机 Serdes 那样进行任何推理。应用程序需要显式提供所有配置选项。因此,在编写 Spring Cloud Stream Kafka Streams 应用程序时,通常建议保留反/序列化的默认选项,并坚持使用 Kafka Streams 提供的本机反/序列化。您必须使用框架提供的消息转换功能的一种情况是当您的上游生产者使用特定的序列化策略时。在这种情况下,您需要使用匹配的反序列化策略,因为本机机制可能会失败。当依赖默认Serde机制时,应用程序必须确保活页夹能够正确映射入站和出站Serde,否则可能会失败。
值得一提的是,上面概述的数据反/序列化方法仅适用于处理器的边缘,即入站和出站。您的业务逻辑可能仍然需要调用明确需要对象的 Kafka Streams API Serde。这些仍然是应用程序的责任,并且必须由开发人员进行相应处理。
3.6. 错误处理
Apache Kafka Streams 提供本机处理反序列化错误异常的功能。有关此支持的详细信息,请参阅此。Apache Kafka Streams 开箱即用,提供两种反序列化异常处理程序 -LogAndContinueExceptionHandler和LogAndFailExceptionHandler。顾名思义,前者将记录错误并继续处理下一条记录,后者将记录错误并失败。LogAndFailExceptionHandler是默认的反序列化异常处理程序。
3.6.1. 处理 Binder 中的反序列化异常
Kafka Streams 绑定器允许使用以下属性指定上面的反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或者
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两个反序列化异常处理程序之外,绑定器还提供了第三个异常处理程序,用于将错误记录(毒丸)发送到 DLQ(死信队列)主题。以下是启用此 DLQ 异常处理程序的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
设置上述属性后,所有反序列化错误的记录都会自动发送到DLQ主题。
您可以设置发布 DLQ 消息的主题名称,如下所示。
您可以提供DlqDestinationResolver一个函数式接口的实现。
DlqDestinationResolver将ConsumerRecord异常作为输入,然后允许指定主题名称作为输出。通过访问 Kafka ConsumerRecord,可以在BiFunction.
以下是提供 的实现的示例DlqDestinationResolver。
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在提供实现时要记住的一件重要事情DlqDestinationResolver是活页夹中的配置程序不会自动为应用程序创建主题。这是因为绑定器无法推断实现可能发送到的所有 DLQ 主题的名称。因此,如果您使用此策略提供 DLQ 名称,则应用程序有责任确保预先创建这些主题。
如果DlqDestinationResolver作为 bean 存在于应用程序中,则具有更高的优先级。如果您不想遵循此方法而是使用配置提供静态 DLQ 名称,则可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了此选项,则错误记录将发送到主题custom-dlq。如果应用程序未使用上述任一策略,那么它将创建一个名为 的 DLQ 主题error.<input-topic-name>.<application-id>。例如,如果您的绑定的目标主题是inputTopic且应用程序 ID 是process-applicationId,则默认 DLQ 主题是error.inputTopic.process-applicationId。如果您打算启用 DLQ,则始终建议为每个输入绑定显式创建一个 DLQ 主题。
3.6.2. 每个输入消费者绑定的 DLQ
该属性spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。这意味着如果同一应用程序中有多个函数,则此属性将应用于所有函数。但是,如果您有多个处理器或单个处理器内有多个输入绑定,则可以使用绑定器为每个输入使用者绑定提供的更细粒度的 DLQ 控制。
如果您有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
并且您只想在第一个输入绑定上启用 DLQ 并在第二个输入绑定上启用skipAndContinue,然后您可以在使用者上执行以下操作。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以这种方式设置反序列化异常处理程序比在绑定器级别设置具有更高的优先级。
3.6.3. DLQ 分区
默认情况下,记录使用与原始记录相同的分区发布到死信主题。这意味着死信主题必须至少具有与原始记录一样多的分区。
要更改此行为,请将实现添加DlqPartitionFunction到@Bean应用程序上下文中。只能存在一颗这样的 bean。该函数提供了消费者组(大多数情况下与应用程序ID相同)、失败ConsumerRecord和异常。例如,如果您总是想路由到分区 0,您可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果将使用者绑定的dlqPartitions属性设置为 1(并且绑定器的属性minPartitionCount等于1),则无需提供DlqPartitionFunction; 框架将始终使用分区 0。如果将使用者绑定的dlqPartitions属性设置为大于1(或绑定器的minPartitionCount属性大于1)的值,则必须提供一个DlqPartitionFunctionbean,即使分区计数与原始主题的相同。
|
在 Kafka Streams 绑定器中使用异常处理功能时需要记住以下几点。
-
该属性
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用程序。这意味着如果同一应用程序中有多个函数,则此属性将应用于所有函数。 -
反序列化的异常处理与本机反序列化和框架提供的消息转换一致。
3.6.4. 在 Binder 中处理生产异常
与上述对反序列化异常处理程序的支持不同,绑定器不提供用于处理生产异常的第一类机制。但是,您仍然可以使用StreamsBuilderFactoryBean定制器配置生产异常处理程序,您可以在下面的后续部分中找到更多详细信息。
3.7. 重试关键业务逻辑
在某些情况下,您可能希望重试对应用程序至关重要的部分业务逻辑。可能存在对关系数据库的外部调用或从 Kafka Streams 处理器调用 REST 端点。这些调用可能会因各种原因而失败,例如网络问题或远程服务不可用。更常见的是,如果您可以重试,这些故障可能会自行解决。默认情况下,Kafka Streams 绑定器RetryTemplate为所有输入绑定创建 bean。
如果该函数具有以下签名,
@Bean
public java.util.function.Consumer<KStream<Object, String>> process()
并且使用默认绑定名称,RetryTemplate将注册为process-in-0-RetryTemplate. process-in-0这遵循绑定名称 ( ) 后跟文字的约定-RetryTemplate。在多个输入绑定的情况下,RetryTemplate每个绑定将有一个单独的 bean 可用。RetryTemplate如果应用程序中有可用的自定义bean 并通过 提供spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName,则该自定义 bean 优先于任何输入绑定级别重试模板配置属性。
一旦RetryTemplate绑定被注入到应用程序中,它就可以用于重试应用程序的任何关键部分。这是一个例子:
@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
retryTemplate.execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
或者您可以使用RetryTemplate如下自定义。
@EnableAutoConfiguration
public static class CustomRetryTemplateApp {
@Bean
@StreamRetryTemplate
RetryTemplate fooRetryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(1);
retryTemplate.setBackOffPolicy(backOffPolicy);
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
return input -> input
.process(() -> new Processor<Object, String>() {
@Override
public void init(ProcessorContext processorContext) {
}
@Override
public void process(Object o, String s) {
fooRetryTemplate().execute(context -> {
//Critical business logic goes here.
});
}
@Override
public void close() {
}
});
}
}
请注意,当重试次数耗尽时,默认情况下将抛出最后一个异常,导致处理器终止。如果您希望处理异常并继续处理,您可以向该方法添加 RecoveryCallback execute: 这是一个示例。
retryTemplate.execute(context -> {
//Critical business logic goes here.
}, context -> {
//Recovery logic goes here.
return null;
));
有关 RetryTemplate、重试策略、退避策略等的更多信息,请参阅Spring Retry项目。
3.8. 国营商店
当使用高级 DSL 并进行适当的调用来触发状态存储时,Kafka Streams 会自动创建状态存储。
如果您希望将传入KTable绑定具体化为命名状态存储,则可以使用以下策略来实现。
假设您有以下功能。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
然后通过设置以下属性,传入的KTable数据将被具体化到指定的状态存储中。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
您可以将自定义状态存储定义为应用程序中的 bean,这些状态存储将由活页夹检测并添加到 Kafka Streams 构建器中。特别是当使用处理器API时,需要手动注册一个状态存储。为此,您可以在应用程序中将 StateStore 创建为 bean。以下是定义此类 Bean 的示例。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
然后应用程序可以直接访问这些状态存储。
在引导期间,上述 bean 将由绑定器处理并传递给 Streams 构建器对象。
访问状态存储:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
当涉及到注册全局状态存储时,这将不起作用。要注册全局状态存储,请参阅下面有关自定义的部分StreamsBuilderFactoryBean。
3.9. 交互式查询
Kafka Streams Binder API 公开了一个类,用于InteractiveQueryService交互式查询状态存储。您可以在应用程序中将其作为 Spring bean 进行访问。从应用程序访问此 bean 的一种简单方法是访问autowirebean。
@Autowired
private InteractiveQueryService interactiveQueryService;
一旦获得此 bean 的访问权限,您就可以查询您感兴趣的特定状态存储。见下文。
ReadOnlyKeyValueStore<Object, Object> keyValueStore =
interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());
在启动期间,上述检索存储的方法调用可能会失败。例如,它可能仍在初始化状态存储中。在这种情况下,重试此操作会很有用。Kafka Streams 绑定器提供了一个简单的重试机制来适应这种情况。
以下是可用于控制重试的两个属性。
-
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - 默认为
1。 -
spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - 默认为
1000毫秒。
如果有多个 kafka 流应用程序实例正在运行,那么在以交互方式查询它们之前,您需要确定哪个应用程序实例托管您正在查询的特定密钥。
InteractiveQueryServiceAPI提供了识别主机信息的方法。
为了使其工作,您必须配置该属性,application.server如下所示:
spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>
以下是一些代码片段:
org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
key, keySerializer);
if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {
//query from the store that is locally available
}
else {
//query from the remote host
}
有关这些主机查找方法的更多信息,请参阅有关这些方法的 Javadoc。同样对于这些方法,在启动期间,如果底层 KafkaStreams 对象未准备好,它们可能会抛出异常。上述重试属性也适用于这些方法。
3.9.1. 通过 InteractiveQueryService 可用的其他 API 方法
使用以下 API 方法检索KeyQueryMetadata与给定存储和密钥的组合关联的对象。
public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)
使用以下 API 方法检索KakfaStreams与给定存储和密钥的组合关联的对象。
public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)
3.9.2. 自定义存储查询参数
有时需要在通过 查询商店之前对商店查询参数进行微调InteractiveQueryService。为此,从4.0.1绑定器的版本开始,您可以提供一个 bean,StoreQueryParametersCustomizer该 bean 是一个带有以customizea 作为StoreQueryParameter参数的方法的函数接口。这是它的方法签名。
StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);
使用这种方法,应用程序可以进一步定制,StoreQueryParameters例如启用过时的存储。
当此 bean 出现在该应用程序中时,将在查询状态存储之前InteractiveQueryService调用其方法。customize
请记住,StoreQueryParametersCustomizer应用程序中必须有一个可用的唯一 bean。
|
3.10. 健康指标
健康指标需要依赖性spring-boot-starter-actuator。对于行家使用:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Spring Cloud Stream Kafka Streams Binder 提供了一个健康指示器来检查底层流线程的状态。Spring Cloud Stream定义了一个属性management.health.binders.enabled来启用健康指示器。请参阅
Spring Cloud Stream 文档。
运行状况指示器为每个流线程的元数据提供以下详细信息:
-
线程名称
-
线程状态:
CREATED、RUNNING、PARTITIONS_REVOKED、PARTITIONS_ASSIGNED、PENDING_SHUTDOWN或DEAD -
活动任务:任务 ID 和分区
-
备用任务:任务ID和分区
默认情况下,只有全局状态可见(UP或DOWN)。要显示详细信息,该属性management.endpoint.health.show-details必须设置为ALWAYS或WHEN_AUTHORIZED。有关健康信息的更多详细信息,请参阅
Spring Boot Actuator 文档。
健康指标的状态是UP所有注册的Kafka线程是否都处于该RUNNING状态。
|
由于 Kafka Streams 绑定器中有 3 个单独的绑定器(KStream、KTable和GlobalKTable),因此它们都会报告健康状态。启用时show-details,报告的某些信息可能是多余的。
当同一应用程序中存在多个 Kafka Streams 处理器时,将报告所有这些处理器的运行状况检查,并按 Kafka Streams 的应用程序 ID 进行分类。
3.11. 访问 Kafka Streams 指标
Spring Cloud Stream Kafka Streams 绑定器提供了可以通过 Micrometer 导出的 Kafka Streams 指标MeterRegistry。
对于 Spring Boot 2.2.x 版本,指标支持是通过绑定器通过自定义 Micrometer 指标实现提供的。对于 Spring Boot 2.3.x 版本,Kafka Streams 指标支持是通过 Micrometer 原生提供的。
通过启动执行器端点访问指标时,请确保添加metrics到属性management.endpoints.web.exposure.include。然后,您可以访问/acutator/metrics以获取所有可用指标的列表,然后可以通过相同的 URI ( /actuator/metrics/<metric-name>) 单独访问这些指标。
3.12. 混合高级 DSL 和低级处理器 API
Kafka Streams 提供两种 API 变体。它具有类似于 API 的更高级别 DSL,您可以在其中链接许多函数式程序员可能熟悉的各种操作。Kafka Streams 还提供对低级处理器 API 的访问。处理器 API 虽然非常强大并且能够在较低级别上控制事物,但本质上是必不可少的。Spring Cloud Stream 的 Kafka Streams 绑定器允许您使用高级 DSL 或混合 DSL 和处理器 API。混合这两种变体为您提供了很多选项来控制应用程序中的各种用例。应用程序可以使用transform或process方法 API 调用来访问处理器 API。
下面介绍了如何使用 API 在 Spring Cloud Stream 应用程序中组合 DSL 和处理器 API process。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
这是使用 API 的示例transform。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
APIprocess方法调用是终端操作,而transformAPI 是非终端操作,它为您提供了潜在的转换KStream,您可以使用 DSL 或处理器 API 继续进一步处理。
3.13. 出站分区支持
Kafka Streams 处理器通常将处理后的输出发送到出站 Kafka 主题中。如果出站主题已分区并且处理器需要将传出数据发送到特定分区,则应用程序需要提供类型为 的 bean StreamPartitioner。有关更多详细信息,请参阅StreamPartitioner 。让我们看一些例子。
这是我们已经多次看到的同一个处理器,
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
这是输出绑定目标:
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
如果主题outputTopic有 4 个分区,如果您不提供分区策略,Kafka Streams 将使用默认分区策略,根据特定用例,这可能不是您想要的结果。比方说,您想要将匹配的任何密钥发送到spring分区 0、cloud分区 1、stream分区 2,并将其他所有内容发送到分区 3。这就是您需要在应用程序中执行的操作。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
这是一个基本的实现,但是,您可以访问记录的键/值、主题名称和分区总数。因此,如果需要,您可以实施复杂的分区策略。
您还需要提供此 bean 名称以及应用程序配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
应用程序中的每个输出主题都需要像这样单独配置。
3.14。StreamsBuilderFactoryBean 定制器
通常需要自定义StreamsBuilderFactoryBean创建KafkaStreams对象的方法。基于 Spring Kafka 提供的底层支持,binder 允许您自定义StreamsBuilderFactoryBean. 您可以使用 来自StreamsBuilderFactoryBeanCustomizer定义其StreamsBuilderFactoryBean本身。StreamsBuilderFactoryBean然后,一旦您通过此定制器访问了,您就可以KafkaStreams使用 来定制相应的KafkaStreamsCustomzier. 这两个定制器都是 Spring for Apache Kafka 项目的一部分。
下面是一个使用 的示例StreamsBuilderFactoryBeanCustomizer。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return sfb -> sfb.setStateListener((newState, oldState) -> {
//Do some action here!
});
}
上图展示了您可以执行哪些操作来自定义StreamsBuilderFactoryBean. 您基本上可以调用任何可用的突变操作来自StreamsBuilderFactoryBean定义它。该定制器将在工厂 bean 启动之前由绑定器调用。
一旦您访问了StreamsBuilderFactoryBean,您还可以自定义底层KafkaStreams对象。这是这样做的蓝图。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
};
}
KafkaStreamsCustomizerStreamsBuilderFactoryBeabn将在底层KafkaStreams启动之前被右侧调用。
StreamsBuilderFactoryBeanCustomizer整个应用程序中只能有一个。那么我们如何考虑多个 Kafka Streams 处理器,因为每个处理器都由单独的对象备份StreamsBuilderFactoryBean?在这种情况下,如果这些处理器的定制需要不同,那么应用程序需要根据应用程序 ID 应用一些过滤器。
例如,
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
.equals("processor1-application-id")) {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setUncaughtExceptionHandler((t, e) -> {
});
}
});
}
};
3.14.1. 使用定制器注册全局状态存储
如上所述,绑定器不提供将全局状态存储注册为功能的一流方法。为此,您需要使用定制器。以下是如何做到这一点。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
try {
final StreamsBuilder streamsBuilder = fb.getObject();
streamsBuilder.addGlobalStore(...);
}
catch (Exception e) {
}
};
}
同样,如果您有多个处理器,您希望通过使用如上所述的应用程序 IDStreamsBuilder过滤掉其他对象,将全局状态存储附加到右侧。StreamsBuilderFactoryBean
3.14.2. 使用定制器注册生产异常处理程序
在错误处理部分,我们指出绑定器没有提供一流的方法来处理生产异常。尽管如此,您仍然可以使用StreamsBuilderFacotryBean定制器来注册生产异常处理程序。见下文。
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
CustomProductionExceptionHandler.class);
};
}
再说一遍,如果您有多个处理器,您可能需要根据正确的StreamsBuilderFactoryBean. 您还可以使用配置属性添加此类生产异常处理程序(有关更多信息,请参阅下文),但如果您选择采用编程方法,则这是一个选项。
3.15。时间戳提取器
Kafka Streams 允许您基于各种时间戳概念来控制消费者记录的处理。默认情况下,Kafka Streams 提取嵌入在消费者记录中的时间戳元数据。TimestampExtractor您可以通过为每个输入绑定提供不同的实现来更改此默认行为。以下是有关如何完成此操作的一些详细信息。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
TimestampExtractor然后为每个消费者绑定设置上述bean 名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果您跳过输入使用者绑定来设置自定义时间戳提取器,则该使用者将使用默认设置。
3.16。具有基于 Kafka Streams 的绑定器和常规 Kafka Binder 的多绑定器
您可以拥有一个应用程序,其中既具有基于常规 Kafka 绑定器的函数/消费者/供应商,又具有基于 Kafka Streams 的处理器。但是,您不能将它们混合在一个函数或消费者中。
这是一个示例,其中同一应用程序中同时具有基于活页夹的组件。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
这是配置中的相关部分:
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
如果您有与上面相同的应用程序,但正在处理两个不同的 Kafka 集群,那么事情会变得更加复杂,例如常规程序同时作用于processKafka 集群 1 和集群 2(从 cluster-1 接收数据并将数据发送到 cluster-) 2)并且 Kafka Streams 处理器作用于 Kafka 集群 2。然后您必须使用Spring Cloud Stream 提供的多绑定器设施。
以下是您的配置在这种情况下可能发生的变化。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
注意上面的配置。我们有两种绑定器,但总共有 3 个绑定器,第一个是基于集群 1 ( kafka1) 的常规 Kafka 绑定器,然后是另一个基于集群 2 ( kafka2) 的 Kafka 绑定器,最后是kstream一个 ( kafka3)。应用程序中的第一个处理器从两个绑定程序接收数据 kafka1并将数据发布到kafka2两个绑定程序基于常规 Kafka 绑定程序但不同的集群。第二个处理器是 Kafka Streams 处理器,它使用与kafka3相同的集群kafka2但不同的绑定器类型的数据。
由于 Kafka Streams 绑定器系列中提供了三种不同的绑定器类型 -kstream和ktable-globalktable如果您的应用程序具有基于任何这些绑定器的多个绑定,则需要将其显式提供为绑定器类型。
例如,如果您有如下处理器,
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
然后,必须在多绑定器场景中进行如下配置。请注意,只有当您有真正的多绑定程序场景时才需要这样做,其中有多个处理器在单个应用程序中处理多个集群。在这种情况下,需要显式地为绑定器提供绑定,以区别于其他处理器的绑定器类型和簇。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.
3.17。状态清理
默认情况下,当绑定停止时,不会清除任何本地状态。这与 Spring Kafka 2.7 版中有效的行为相同。有关更多详细信息,请参阅Spring Kafka 文档。要修改此行为,只需向应用程序上下文添加一个CleanupConfig @Bean(配置为在启动、停止或两者都不清除时进行清理)即可;该 bean 将被检测并连接到工厂 bean 中。
3.18。Kafka Streams拓扑可视化
Kafka Streams 绑定器提供以下执行器端点来检索拓扑描述,您可以使用外部工具可视化拓扑。
/actuator/kafkastreamstopology
/actuator/kafkastreamstopology/<application-id of the processor>
您需要包含 Spring Boot 中的执行器和 Web 依赖项才能访问这些端点。此外,您还需要添加kafkastreamstopology到management.endpoints.web.exposure.include属性。默认情况下,kafkastreamstopology端点处于禁用状态。
3.19。Kafka Streams 应用程序中基于事件类型的路由
Kafka Streams 绑定器不支持基于常规消息通道的绑定器中可用的路由功能。但是,Kafka Streams Binder 仍然通过入站记录上的事件类型记录标头提供路由功能。
要启用基于事件类型的路由,应用程序必须提供以下属性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes。
这可以是逗号分隔值。
例如,假设我们有这个函数:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
foo我们还假设,如果传入记录的事件类型为或,我们只希望执行此函数中的业务逻辑bar。eventTypes这可以使用绑定上的属性表达如下。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
现在,当应用程序运行时,绑定器会检查每个传入记录的标头event_type,并查看其值是否设置为foo或bar。如果没有找到其中任何一个,则将跳过函数执行。
默认情况下,绑定器期望记录头键为event_type,但可以根据绑定进行更改。例如,如果我们想要更改此绑定上的标头键而my_event不是默认值,则可以按如下方式进行更改。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event。
在 Kafkfa Streams 绑定器中使用事件路由功能时,它使用字节数组Serde来反序列化所有传入记录。如果记录标头与事件类型匹配,则只有它使用实际的Serde配置或推断的来进行正确的反序列化Serde。如果您在绑定上设置反序列化异常处理程序,则会出现问题,因为预期的反序列化仅发生在堆栈中,从而导致意外错误。为了解决此问题,您可以在绑定上设置以下属性,以强制绑定器使用配置的或推断的Serde而不是字节数组Serde。
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
这样,应用程序可以在使用事件路由功能时立即检测到反序列化问题,并可以采取适当的处理决策。
3.20。Kafka Streams 绑定器中的绑定可视化和控制
从3.1.2版本开始,Kafka Streams Binder支持绑定可视化和控制。唯一支持的两个生命周期阶段是STOPPED和STARTED。生命周期阶段PAUSED和RESUMED在 Kafka Streams 绑定器中不可用。
为了激活绑定可视化和控制,应用程序需要包含以下两个依赖项。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
如果您更喜欢使用 webflux,则可以包含spring-boot-starter-webflux而不是标准的 Web 依赖项。
此外,还需要设置以下属性:
management.endpoints.web.exposure.include=bindings
为了进一步说明此功能,让我们使用以下应用程序作为指导:
@SpringBootApplication
public class KafkaStreamsApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaStreamsApplication.class, args);
}
@Bean
public Consumer<KStream<String, String>> consumer() {
return s -> s.foreach((key, value) -> System.out.println(value));
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> function() {
return ks -> ks;
}
}
正如我们所看到的,该应用程序有两个 Kafka Streams 函数 - 一个是消费者,另一个是函数。消费者绑定默认命名为consumer-in-0。同样,对于该函数,输入绑定是function-in-0,输出绑定是function-out-0。
应用程序启动后,我们可以使用以下绑定端点查找有关绑定的详细信息。
curl http://localhost:8080/actuator/bindings | jq .
[
{
"bindingName": "consumer-in-0",
"name": "consumer-in-0",
"group": "consumer-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-in-0",
"name": "function-in-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": true,
"extendedInfo": {}
},
{
"bindingName": "function-out-0",
"name": "function-out-0",
"group": "function-applicationId",
"pausable": false,
"state": "running",
"paused": false,
"input": false,
"extendedInfo": {}
}
]
有关所有三个绑定的详细信息可以在上面找到。
现在让我们停止consumer-in-0 绑定。
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
此时,通过此绑定将不会收到任何记录。
重新开始绑定。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0
当单个函数上存在多个绑定时,对任何这些绑定调用这些操作都将起作用。这是因为单个函数上的所有绑定都由相同的StreamsBuilderFactoryBean. 因此,对于上面的函数,或者function-in-0或 都function-out-0可以。
3.21。手动启动 Kafka Streams 处理器
Spring Cloud Stream Kafka Streams 绑定器提供了一个基于StreamsBuilderFactoryManagerSpring StreamsBuilderFactoryBeanfor Apache Kafka 的抽象。该管理器 API 用于控制StreamsBuilderFactoryBean基于 Binder 的应用程序中每个处理器的倍数。因此,在使用binder时,如果你想手动控制StreamsBuilderFactoryBean应用程序中各个对象的自动启动,则需要使用StreamsBuilderFactoryManager. 您可以使用该属性spring.kafka.streams.auto-startup并将其设置false为 以关闭处理器的自动启动。然后,在应用程序中,您可以使用如下所示的内容来启动处理器StreamsBuilderFactoryManager。
@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
return args -> {
sbfm.start();
};
}
当您希望应用程序在主线程中启动并让 Kafka Streams 处理器单独启动时,此功能非常方便。例如,当您有需要恢复的大型状态存储时,如果处理器按照默认情况正常启动,则可能会阻止您的应用程序启动。如果您正在使用某种活性探测机制(例如在 Kubernetes 上),它可能会认为应用程序已关闭并尝试重新启动。为了纠正这个问题,您可以设置spring.kafka.streams.auto-startup并false遵循上述方法。
请记住,在使用 Spring Cloud Stream 绑定器时,您不是直接StreamsBuilderFactoryBean从 Spring for Apache Kafka 进行处理,而是StreamsBuilderFactoryManager因为StreamsBuilderFactoryBean对象是由绑定器内部管理的。
3.22. 有选择地手动启动 Kafka Streams 处理器
虽然上述方法将通过 无条件地将自动启动false应用于应用程序中的所有 Kafka Streams 处理器StreamsBuilderFactoryManager,但通常希望仅单独选择的 Kafka Streams 处理器不自动启动。例如,假设您的应用程序中有三个不同的功能(处理器),并且对于其中一个处理器,您不希望将其作为应用程序启动的一部分启动。下面是这种情况的一个例子。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
在上面的这种情况下,如果您设置spring.kafka.streams.auto-startup为false,则在应用程序启动期间不会自动启动任何处理器。在这种情况下,您必须通过调用start()底层StreamsBuilderFactoryManager. 但是,如果我们有一个用例选择性地仅禁用一个处理器,那么您必须auto-startup为该处理器设置单独的绑定。让我们假设我们不希望我们的process3函数自动启动。这是BiFunction具有两个输入绑定 -process3-in-0和 的process3-in-1。为了避免该处理器自动启动,您可以选择任何这些输入绑定并auto-startup对其进行设置。您选择哪种绑定并不重要;如果您愿意,可以将两者都设置为开启,但其中一个就足够了auto-startup。false因为它们共享相同的工厂 bean,所以您不必在两个绑定上将 autoStartup 设置为 false,但为了清楚起见,这样做可能是有意义的。
这是 Spring Cloud Stream 属性,您可以使用它来禁用该处理器的自动启动。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
或者
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
然后,您可以使用 REST 端点或使用 API 手动启动处理器,BindingsEndpoint如下所示。为此,您需要确保 Spring Boot 执行器对类路径有依赖性。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
或者
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
有关此机制的更多详细信息,请参阅参考文档中的本节。
当按照本节所述通过禁用来控制绑定时auto-startup,请注意,这仅适用于消费者绑定。换句话说,如果您使用生产者绑定,process3-out-0则不会对禁用处理器的自动启动产生任何影响,尽管此生产者绑定使用与StreamsBuilderFactoryBean消费者绑定相同的功能。
|
3.23。使用 Spring Cloud Sleuth 进行跟踪
当 Spring Cloud Sleuth 位于基于 Spring Cloud Stream Kafka Streams 绑定器的应用程序的类路径上时,其使用者和生产者都会自动使用跟踪信息进行检测。然而,为了跟踪任何应用程序特定的操作,这些操作需要由用户代码显式检测。这可以通过KafkaStreamsTracing在应用程序中注入来自 Spring Cloud Sleuth 的 bean 来完成,然后通过此注入的 bean 调用各种 Kafka Streams 操作。以下是一些使用它的示例。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
在上面的示例中,有两个地方添加了显式跟踪检测。首先,我们记录传入的键/值信息KStream。记录此信息时,关联的跨度和跟踪 ID 也会被记录,以便监控系统可以跟踪它们并与相同的跨度 ID 关联。其次,当我们调用一个操作时,我们map不是直接在类上调用它,而是将其包装在一个操作中,然后从. 在这种情况下,记录的消息也将包含跨度 ID 和跟踪 ID。KStreamtransformmapKafkaStreamsTracing
这是另一个示例,我们使用低级转换器 API 来访问各种 Kafka Streams 标头。当 spring-cloud-sleuth 位于类路径上时,所有跟踪标头也可以这样访问。
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}
3.24。配置选项
本节包含 Kafka Streams 绑定器使用的配置选项。
有关绑定器的常见配置选项和属性,请参阅核心文档。
3.24.1. Kafka Streams 绑定器属性
以下属性在绑定器级别可用,并且必须以spring.cloud.stream.kafka.streams.binder.
任何 Kafka 绑定器为前缀,前提是在 Kafka Streams 绑定器中重用的属性必须以 为前缀,spring.cloud.stream.kafka.streams.binder而不是spring.cloud.stream.kafka.binder。此规则的唯一例外是定义 Kafka 引导服务器属性时,在这种情况下任一前缀都有效。
- configuration
-
具有键/值对的映射,其中包含与 Apache Kafka Streams API 相关的属性。该属性必须以 为前缀
spring.cloud.stream.kafka.streams.binder.。以下是使用此属性的一些示例。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
有关可能进入流配置的所有属性的更多信息,请参阅StreamsConfigApache Kafka Streams 文档中的 JavaDocs。您可以设置的所有配置都StreamsConfig可以通过此设置。使用此属性时,它适用于整个应用程序,因为这是绑定器级别属性。如果应用程序中有多个处理器,则所有处理器都将获得这些属性。对于像这样的属性application.id,这将成为问题,因此您必须仔细检查如何StreamsConfig使用此绑定级别属性来映射属性configuration。
- functions.<function-bean-name>.applicationId
-
仅适用于功能型处理器。这可用于设置应用程序中每个功能的应用程序 ID。在有多个功能的情况下,这是设置应用程序ID的便捷方法。
- functions.<function-bean-name>.configuration
-
仅适用于功能型处理器。具有键/值对的映射,其中包含与 Apache Kafka Streams API 相关的属性。这与上面描述的绑定器级别属性类似
configuration,但此级别的configuration属性仅限于指定的函数。当您有多个处理器并且想要根据特定功能限制对配置的访问时,您可能需要使用此功能。所有StreamsConfig属性都可以在这里使用。 - brokers
-
经纪商网址
默认:
localhost - zkNodes
-
动物园管理员网址
默认:
localhost - deserializationExceptionHandler
-
反序列化错误处理程序类型。该处理程序在绑定器级别应用,因此应用于应用程序中的所有输入绑定。有一种方法可以在消费者绑定层面进行更细粒度的控制。可能的值为 -
logAndContinue、logAndFail或skipAndContinuesendToDlq默认:
logAndFail - applicationId
-
在绑定级别全局设置 Kafka Streams 应用程序的 application.id 的便捷方法。如果应用程序包含多个功能,那么应用程序id应该设置不同。请参阅上面详细讨论设置应用程序 ID 的内容。
默认:应用程序将生成静态应用程序 ID。有关更多详细信息,请参阅应用程序 ID 部分。
- stateStoreRetry.maxAttempts
-
尝试连接到状态存储的最大尝试次数。
默认值:1
- stateStoreRetry.backoffPeriod
-
重试时尝试连接到状态存储时的退避期。
默认值:1000 毫秒
- consumerProperties
-
粘合剂级别的任意消费者属性。
- producerProperties
-
粘合剂级别的任意生产者属性。
- includeStoppedProcessorsForHealthCheck
-
当通过执行器停止处理器的绑定时,默认情况下该处理器将不参与健康检查。将此属性设置为
true可为所有处理器启用运行状况检查,包括当前通过绑定执行器端点停止的处理器。默认值:假
3.24.2. Kafka Streams 生产者属性
以下属性仅适用于 Kafka Streams 生产者,并且必须以 为前缀spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
。 为了方便起见,如果有多个输出绑定并且它们都需要一个通用值,则可以使用前缀进行配置spring.cloud.stream.kafka.streams.default.producer.。
- keySerde
-
要使用的密钥 Serde
默认值:参见上面关于消息反/序列化的讨论
- valueSerde
-
使用的值 serde
默认值:参见上面关于消息反/序列化的讨论
- useNativeEncoding
-
启用/禁用本机编码的标志
默认值:
true. - streamPartitionerBeanName
-
要在使用者处使用的自定义出站分区程序 bean 名称。应用程序可以提供自定义
StreamPartitioner的 Spring bean,并且可以将该 bean 的名称提供给生产者以代替默认的名称。默认值:请参阅上面有关出站分区支持的讨论。
- producedAs
-
处理器正在生成的接收器组件的自定义名称。
默认:(
none由 Kafka Streams 生成)
3.24.3。Kafka Streams 消费者属性
以下属性可供 Kafka Streams 消费者使用,并且必须以前缀 为spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
方便起见,如果有多个输入绑定并且它们都需要一个通用值,则可以使用前缀进行配置spring.cloud.stream.kafka.streams.default.consumer.。
- applicationId
-
设置每个输入绑定的 application.id。
默认值:见上文。
- keySerde
-
要使用的密钥 Serde
默认值:参见上面关于消息反/序列化的讨论
- valueSerde
-
使用的值 serde
默认值:参见上面关于消息反/序列化的讨论
- materializedAs
-
使用传入的 KTable 类型时要实现的状态存储
默认值:
none. - useNativeDecoding
-
启用/禁用本机解码的标志
默认值:
true. - dlqName
-
DLQ 主题名称。
默认值:请参阅上面有关错误处理和 DLQ 的讨论。
- startOffset
-
如果没有提交的要消耗的偏移量,则从该偏移量开始。这主要在消费者第一次从某个主题消费时使用。Kafka Streams 使用
earliest作为默认策略,binder 使用相同的默认策略。这可以被覆盖以latest使用此属性。默认值:
earliest.
注意:resetOffsets在消费者上使用不会对 Kafka Streams Binder 产生任何影响。与基于消息通道的绑定器不同,Kafka Streams 绑定器不会按需开始或结束。
- deserializationExceptionHandler
-
反序列化错误处理程序类型。该处理程序针对每个消费者绑定应用,而不是前面描述的绑定器级别属性。可能的值为 -
logAndContinue、logAndFail或skipAndContinuesendToDlq默认:
logAndFail - timestampExtractorBeanName
-
消费者使用的特定时间戳提取器 bean 名称。应用程序可以提供
TimestampExtractorSpring bean,并且可以将该 bean 的名称提供给消费者使用,而不是使用默认名称。默认值:请参阅上面有关时间戳提取器的讨论。
- eventTypes
-
此绑定支持的事件类型的逗号分隔列表。
默认:
none - eventTypeHeaderKey
-
每个传入记录上的事件类型标头键都通过此绑定。
默认:
event_type - consumedAs
-
处理器正在使用的源组件的自定义名称。
默认:(
none由 Kafka Streams 生成)
3.24.4. 关于并发的特别说明
在 Kafka Streams 中,您可以使用该属性控制处理器可以创建的线程数num.stream.threads。您可以使用上面描述的绑定器、函数、生产者或消费者级别下的各种configuration选项来完成此操作。您还可以使用concurrency核心 Spring Cloud Stream 提供的属性来实现此目的。使用这个的时候,需要用在消费者身上。当您有多个输入绑定时,请在第一个输入绑定上设置此项。例如,当设置 时spring.cloud.stream.bindings.process-in-0.consumer.concurrency,它将num.stream.threads被活页夹翻译为。如果您有多个处理器,并且其中一个处理器定义了绑定级别并发性,但其他处理器没有定义,则那些没有绑定级别并发性的处理器将默认返回通过 指定的绑定器范围属性
spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads。如果此绑定器配置不可用,则应用程序将使用 Kafka Streams 设置的默认值。
4. 提示、技巧和食谱
4.1. 使用 Kafka 进行简单的 DLQ
4.1.1. 问题陈述
作为一名开发人员,我想编写一个消费者应用程序来处理来自 Kafka 主题的记录。但是,如果处理过程中发生一些错误,我不希望应用程序完全停止。相反,我想将错误记录发送到 DLT(死信主题),然后继续处理新记录。
4.1.2. 解决方案
这个问题的解决方案是使用Spring Cloud Stream中的DLQ功能。为了讨论的目的,我们假设以下是我们的处理器函数。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
这是一个非常简单的函数,它会为其处理的所有记录引发异常,但您可以使用此函数并将其扩展到任何其他类似的情况。
为了将错误记录发送到 DLT,我们需要提供以下配置。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
为了激活 DLQ,应用程序必须提供组名称。匿名消费者无法使用 DLQ 设施。我们还需要通过enableDLQ将 Kafka 消费者绑定上的属性设置为 来启用 DLQ true。最后,我们可以选择通过提供 Kafka 消费者绑定来提供 DLT 名称dlqName,否则error.input-topic.my-group在本例中默认为该绑定。
请注意,在上面提供的示例消费者中,有效负载的类型是byte[]。默认情况下,Kafka binder 中的 DLQ 生产者期望类型为 的有效负载byte[]。如果情况并非如此,那么我们需要提供正确的序列化器的配置。例如,让我们重写消费者函数如下:
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
现在,我们需要告诉 Spring Cloud Stream,在写入 DLT 时我们希望如何序列化数据。以下是针对此场景的修改后的配置:
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
4.2. 具有高级重试选项的 DLQ
4.2.1. 问题陈述
这与上面的配方类似,但作为开发人员,我想配置重试的处理方式。
4.2.2. 解决方案
如果您遵循上述方法,那么当处理遇到错误时,您将获得 Kafka 绑定器中内置的默认重试选项。
默认情况下,绑定器最多退出 3 次,初始延迟为 1 秒,每次退出的乘数为 2.0,最大延迟为 10 秒。您可以更改所有这些配置,如下所示:
spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
如果需要,您还可以通过提供布尔值映射来提供可重试异常的列表。例如,
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
默认情况下,将重试上图中未列出的任何异常。如果不需要,那么您可以通过提供来禁用它,
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
您还可以提供自己的RetryTemplate并将其标记为@StreamRetryTemplate将由活页夹扫描和使用。当您需要更复杂的重试策略和策略时,这非常有用。
如果您有多个@StreamRetryTemplatebean,那么您可以使用 属性指定您的绑定需要哪一个,
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
4.3. 使用 DLQ 处理反序列化错误
4.3.1. 问题陈述
我有一个处理器在 Kafka 消费者中遇到反序列化异常。我希望 Spring Cloud Stream DLQ 机制能够捕获这种情况,但事实并非如此。我该如何处理这个问题?
4.3.2. 解决方案
当 Kafka 消费者抛出不可恢复的反序列化异常时,Spring Cloud Stream 提供的正常 DLQ 机制将无济于事。这是因为,这个异常甚至在消费者的poll()方法返回之前就发生了。Spring for Apache Kafka 项目提供了一些很好的方法来帮助绑定器解决这种情况。让我们探索一下这些。
假设这是我们的函数:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
这是一个带有String参数的简单函数。
我们希望绕过 Spring Cloud Stream 提供的消息转换器,并希望使用本机反序列化器。对于String类型而言,它没有多大意义,但对于像 AVRO 等更复杂的类型,您必须依赖外部反序列化器,因此希望将转换委托给 Kafka。
现在,当消费者收到数据时,让我们假设有一条错误记录导致反序列化错误,例如可能有人传递了 anInteger而不是 a 。String在这种情况下,如果您不在应用程序中执行某些操作,异常将通过链传播,并且您的应用程序最终将退出。
为了处理这个问题,您可以添加一个ListenerContainerCustomizer @Bean配置DefaultErrorHandler. 这DefaultErrorHandler是配置有一个DeadLetterPublishingRecoverer. 我们还需要ErrorHandlingDeserializer为消费者配置一个。这听起来像是很多复杂的事情,但实际上,在本例中,它可以归结为这 3 个 bean。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
让我们逐一分析一下。第一个是ListenerContainerCustomizer需要DefaultErrorHandler. 现在,容器已使用特定的错误处理程序进行了自定义。您可以在此处了解有关容器定制的更多信息。
第二个 bean 是DefaultErrorHandler配置为发布到DLT. 有关 的更多详细信息,请参见此处DefaultErrorHandler。
第三个 bean 是DeadLetterPublishingRecoverer最终负责发送到 的DLT。默认情况下,DLT主题命名为 ORIGINAL_TOPIC_NAME.DLT。不过你可以改变这一点。请参阅文档了解更多详细信息。
我们还需要通过应用程序配置来配置ErrorHandlingDeserializer。
ErrorHandlingDeserializer实际解串器的代表。如果出现错误,它将记录的键/值设置为空并包含消息的原始字节。然后,它在标头中设置异常并将该记录传递给侦听器,然后侦听器调用注册的错误处理程序。
以下是所需的配置:
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
ErrorHandlingDeserializer我们通过configuration绑定上的属性提供。我们还表明要委托的实际反序列化器是StringDeserializer.
请记住,上面的 dlq 属性均与本节中的讨论无关。它们纯粹是为了解决任何应用程序级别的错误。
4.4. Kafka Binder 中的基本偏移量管理
4.4.1. 问题陈述
我想编写一个 Spring Cloud Stream Kafka 消费者应用程序,但不确定它如何管理 Kafka 消费者偏移量。你可以解释吗?
4.4.2. 解决方案
我们鼓励您阅读有关此内容的文档部分,以全面了解它。
这是它的要点:
Kafka 默认支持两种类型的偏移量 -earliest和latest。它们的语义从名称中就可以不言自明。
假设您是第一次运行消费者。如果您在 Spring Cloud Stream 应用程序中错过了 group.id,那么它就会成为匿名消费者。每当您有匿名消费者时,在这种情况下,Spring Cloud Stream 应用程序默认将从latest主题分区中的可用偏移量开始。另一方面,如果您显式指定 group.id,则默认情况下,Spring Cloud Stream 应用程序将从earliest主题分区中的可用偏移量开始。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset在上述两种情况下(具有显式组和匿名组的使用者),可以通过使用该属性并将其设置为 或 来earliest切换起始偏移量latest。
现在,假设您之前已经运行过消费者,现在再次启动它。在这种情况下,上述情况中的起始偏移量语义不适用,因为消费者找到了消费者组已提交的偏移量(在匿名消费者的情况下,尽管应用程序不提供 group.id,但绑定器将自动为您生成一个)。它只是从最后提交的偏移量开始。即使您提供了值,也是如此startOffset。
但是,您可以使用该resetOffsets属性覆盖消费者从最后提交的偏移量开始的默认行为。为此,请将属性设置spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets为true(false默认情况下)。然后确保您提供该startOffset值( 或earliest)latest。当您执行此操作然后启动使用者应用程序时,每次启动时,它都会像第一次启动一样启动,并忽略该分区的任何已提交偏移量。
4.5. 寻求 Kafka 中的任意偏移量
4.5.1. 问题陈述
使用 Kafka binder,我知道它可以将偏移量设置为 或earliest,latest但我需要寻求中间某个位置的偏移量,即任意偏移量。有没有办法使用 Spring Cloud Stream Kafka 绑定器来实现这一点?
4.5.2. 解决方案
之前我们了解了 Kafka Binder 如何帮助您处理基本的偏移量管理。默认情况下,活页夹不允许您倒带到任意偏移量,至少通过我们在该配方中看到的机制是这样。但是,绑定器提供了一些低级策略来实现此用例。让我们来探索一下它们。
首先,当您想要重置为earliest或 以外的任意偏移量时latest,请确保将resetOffsets配置保留为默认值,即false。然后,您必须提供类型为 的自定义 bean KafkaBindingRebalanceListener,它将被注入到所有消费者绑定中。它是一个带有一些默认方法的接口,但这是我们感兴趣的方法:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
让我们看看细节。
本质上,每次在主题分区的初始分配期间或重新平衡之后都会调用此方法。为了更好地说明,让我们假设我们的主题是foo并且它有 4 个分区。最初,我们只在组中启动一个消费者,该消费者将从所有分区进行消费。当消费者第一次启动时,所有 4 个分区都会被初始分配。但是,我们不想启动分区以默认值进行消费(earliest因为我们定义了一个组),而是对于每个分区,我们希望它们在寻找任意偏移量后进行消费。想象一下,您有一个业务案例可以从某些偏移量中使用,如下所示。
Partition start offset
0 1000
1 2000
2 2000
3 1000
这可以通过如下实施上述方法来实现。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
这只是一个基本的实现。现实世界的用例比这复杂得多,您需要进行相应的调整,但这肯定会给您一个基本的草图。当消费者seek失败时,它可能会抛出一些运行时异常,您需要决定在这些情况下该怎么做。
4.5.3. 如果我们使用相同的组 id 启动第二个消费者怎么办?
当我们添加第二个消费者时,将发生重新平衡,并且一些分区将被移动。假设新消费者获得分区2和3。当这个新的 Spring Cloud Stream 消费者调用此方法时,它将看到这是对该消费者onPartitionsAssigned的分区的初始分配。因此,由于对参数进行条件检查,它将执行查找操作。对于第一个消费者,它现在只有分区,
但是,对于这个消费者来说,这只是一个重新平衡事件,而不被视为初始分配。因此,由于对参数的条件检查,它不会重新寻找给定的偏移量。23initial01initial
4.6. 如何使用 Kafka Binder 手动确认?
4.6.1. 问题陈述
使用 Kafka 绑定器,我想手动确认消费者中的消息。我怎么做?
4.6.2. 解决方案
默认情况下,Kafka Binder 委托给 Apache Kafka 项目的 Spring 中的默认提交设置。ackModeSpring Kafka 中的默认值是batch. 请参阅此处了解更多详细信息。
在某些情况下,您希望禁用此默认提交行为并依赖于手动提交。以下步骤可以让您做到这一点。
将属性设置spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode为MANUAL或MANUAL_IMMEDIATE。当这样设置时,消费者方法收到的消息中将出现一个名为kafka_acknowledgment(from ) 的标头。KafkaHeaders.ACKNOWLEDGMENT
例如,想象这是您的消费者方法。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
然后将该属性设置spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode为MANUAL或MANUAL_IMMEDIATE。
4.7. 如何覆盖 Spring Cloud Stream 中的默认绑定名称?
4.7.1. 问题陈述
Spring Cloud Stream 根据函数定义和签名创建默认绑定,但如何将这些覆盖为更适合域的名称?
4.7.2. 解决方案
假设以下是您的函数签名。
@Bean
public Function<String, String> uppercase(){
...
}
默认情况下,Spring Cloud Stream 将创建如下绑定。
-
0 中的大写字母
-
大写输出-0
您可以使用以下属性将这些绑定覆盖为某些内容。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
此后,必须对新名称my-transformer-in和进行所有绑定属性my-transformer-out。
这是 Kafka Streams 和多个输入的另一个示例。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
默认情况下,Spring Cloud Stream 会为此函数创建三个不同的绑定名称。
-
处理Order-in-0
-
处理订单-1
-
处理订单-out-0
每次要在这些绑定上设置某些配置时,都必须使用这些绑定名称。您不喜欢这样,并且想要使用更多域友好且可读的绑定名称,例如类似的名称。
-
命令
-
账户
-
丰富的订单
只需设置这三个属性即可轻松做到这一点
-
spring.cloud.stream.function.bindings.processOrder-in-0=订单
-
spring.cloud.stream.function.bindings.processOrder-in-1=帐户
-
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
执行此操作后,它将覆盖默认绑定名称,并且您要在其上设置的任何属性都必须位于这些新绑定名称上。
4.8. 如何发送消息密钥作为我的记录的一部分?
4.8.1. 问题陈述
我需要发送密钥以及记录的有效负载,有没有办法在 Spring Cloud Stream 中做到这一点?
4.8.2. 解决方案
您通常需要发送关联数据结构(例如映射)作为带有键和值的记录。Spring Cloud Stream 允许您以简单的方式做到这一点。以下是执行此操作的基本蓝图,但您可能希望对其进行调整以适应您的特定用例。
这是示例生产者方法(又名Supplier)。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
String这是一个简单的函数,它发送带有有效负载和密钥的消息。请注意,我们使用 将密钥设置为消息头KafkaHeaders.MESSAGE_KEY。
如果你想改变默认的键kafka_messageKey,那么在配置中,我们需要指定这个属性:
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
请注意,我们使用绑定名称,supplier-out-0因为这是我们的函数名称,请相应更新。
然后,我们在生成消息时使用这个新密钥。
4.9. 如何使用本机序列化器和反序列化器而不是 Spring Cloud Stream 完成的消息转换?
4.9.1. 问题陈述
我不想使用 Spring Cloud Stream 中的消息转换器,而是想使用 Kafka 中的本机序列化器和反序列化器。默认情况下,Spring Cloud Stream 使用其内部内置消息转换器来处理此转换。我怎样才能绕过这个并将责任委托给卡夫卡?
4.9.2. 解决方案
这真的很容易做到。
您所要做的就是提供以下属性来启用本机序列化。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
然后,您还需要设置序列化器。有几种方法可以做到这一点。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
或使用活页夹配置。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
使用活页夹方式时,它适用于所有绑定,而在绑定处设置它们是针对每个绑定的。
在反序列化方面,您只需提供反序列化器作为配置。
例如,
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
您还可以在活页夹级别设置它们。
您可以设置一个可选属性来强制进行本机解码。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
然而,对于 Kafka 绑定器来说,这是不必要的,因为当它到达绑定器时,Kafka 已经使用配置的反序列化器对它们进行反序列化。
4.10. 解释 Kafka Streams 绑定器中的偏移量重置如何工作
4.10.1. 问题陈述
默认情况下,Kafka Streams 绑定器始终从新消费者的最早偏移量开始。有时,应用程序从最新的偏移量开始是有益的或要求的。Kafka Streams 绑定器可以让您做到这一点。
4.10.2. 解决方案
在研究解决方案之前,让我们先看看以下场景。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
我们有一个BiConsumer需要两个输入绑定的 bean。在本例中,第一个绑定用于 a KStream,第二个绑定用于 a KTable。第一次运行此应用程序时,默认情况下,两个绑定都从earliest偏移量开始。那我latest因为一些要求想从offset开始呢?您可以通过启用以下属性来做到这一点。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
如果您只希望一个绑定从 offset 开始latest,另一个绑定从 default 开始earliest,则将后一个绑定从配置中保留。
请记住,一旦存在已提交的偏移量,这些设置将不会被遵守,并且已提交的偏移量优先。
4.11. 跟踪 Kafka 中记录的成功发送(生成)
4.11.1. 问题陈述
我有一个 Kafka 生产者应用程序,我想跟踪所有成功的发送。
4.11.2. 解决方案
让我们假设我们的应用程序中有以下供应商。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
然后,我们需要定义一个新的MessageChannelbean来捕获所有成功发送的信息。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
接下来,在应用程序配置中定义此属性以提供recordMetadataChannel.
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
此时,发送成功的信息将被发送到fooRecordChannel。
您可以编写IntegrationFlow如下内容来查看信息。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
在该handle方法中,有效负载是发送到 Kafka 的内容,消息头包含一个名为 的特殊键kafka_recordMetadata。它的值是a RecordMetadata,包含有关主题分区、当前偏移量等信息。
4.12. 在 Kafka 中添加自定义标头映射器
4.12.1. 问题陈述
我有一个 Kafka 生产者应用程序,它设置了一些标头,但它们在消费者应用程序中丢失了。这是为什么?
4.12.2. 解决方案
正常情况下,这应该没问题。
想象一下,您有以下制作人。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
在消费者方面,您仍然应该看到标题“foo”,并且以下内容不会给您带来任何问题。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
如果您在应用程序中提供自定义标头映射器,那么这将不起作用。假设您的KafkaHeaderMapper应用程序中有一个空白。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
如果这是您的实现,那么您将错过foo消费者的标头。很可能,这些方法中可能有一些逻辑KafkaHeaderMapper。您需要以下内容来填充foo标题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
这将正确填充foo从生产者到消费者的标头。
4.12.3. id 标头的特别说明
在 Spring Cloud Stream 中,id标头是一个特殊的标头,但某些应用程序可能希望有特殊的自定义 id 标头 - 类似custom-idorID或Id。第一个 ( custom-id) 将在没有任何自定义标头映射器的情况下从生产者传播到消费者。但是,如果您使用框架保留标头的变体进行生成id- 例如ID、Id等iD,那么您将遇到框架内部的问题。请参阅此StackOverflow 线程,了解有关此用例的更多上下文。在这种情况下,您必须使用自定义KafkaHeaderMapper来映射区分大小写的 id 标头。例如,假设您有以下制作人。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
上面的标头Id将从消费端消失,因为它与框架id标头发生冲突。您可以提供自定义KafkaHeaderMapper来解决此问题。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
通过这样做,生产者端和消费者端都可以使用id和标头。Id
4.13. 在交易中生成多个主题
4.13.2. 解决方案
使用 Kafka Binder 中的事务支持进行事务,然后提供AfterRollbackProcessor. 为了生成多个主题,请使用StreamBridgeAPI。
以下是此操作的代码片段:
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
4.13.3. 所需配置
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
为了进行测试,您可以使用以下命令:
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
一些重要的注意事项:
请确保您在应用程序配置上没有任何 DLQ 设置,因为我们手动配置 DLT(默认情况下它将发布到根据input.DLT初始消费者函数命名的主题)。另外,将maxAttempts消费者绑定重置为1以避免绑定器重试。在上面的示例中,总共最多尝试 3 次(初始尝试 + 中的 2 次尝试FixedBackoff)。
有关如何测试此代码的更多详细信息,请参阅StackOverflow 线程。如果您使用 Spring Cloud Stream 通过添加更多消费者函数来测试它,请确保将isolation-level消费者绑定设置为read-committed。
这个StackOverflow 线程也与此讨论相关。
4.14。运行多个可轮询消费者时要避免的陷阱
4.14.1. 问题陈述
如何运行可轮询消费者的多个实例并client.id为每个实例生成唯一的?
4.14.2. 解决方案
假设我有以下定义:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
运行应用程序时,Kafka 消费者会生成一个 client.id (类似于consumer-my-group-1)。对于正在运行的应用程序的每个实例,这client.id都是相同的,从而导致意外问题。
为了解决此问题,您可以在应用程序的每个实例上添加以下属性:
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
有关更多详细信息,请参阅此GitHub 问题。