Spring Cloud Bus 将分布式系统的节点与轻量级消息代理链接起来。然后,该代理可用于广播状态更改(例如配置更改)或其他管理指令。一个关键思想是,总线就像是横向扩展的 Spring Boot 应用程序的分布式执行器。但是,它也可以用作应用程序之间的通信渠道。该项目为 AMQP 代理或 Kafka 作为传输提供了启动器。

Spring Cloud 在非限制性 Apache 2.0 许可证下发布。如果您想为文档的这一部分做出贡献或者发现错误,请在github项目中查找源代码和问题跟踪器。

1. 快速入门

如果 Spring Cloud Bus 在类路径上检测到自身,则通过添加 Spring Boot 自动配置来工作。要启用总线,请将spring-cloud-starter-bus-amqp或 添加spring-cloud-starter-bus-kafka到您的依赖项管理中。Spring Cloud 负责剩下的事情。确保代理(RabbitMQ 或 Kafka)可用并已配置。在本地主机上运行时,您无需执行任何操作。如果远程运行,请使用 Spring Cloud Connectors 或 Spring Boot 约定来定义代理凭据,如以下 Rabbit 示例所示:

应用程序.yml
spring:
  rabbitmq:
    host: mybroker.com
    port: 5672
    username: user
    password: secret

该总线当前支持向所有侦听节点或特定服务(由 Eureka 定义)的所有节点发送消息。执行/bus/*器命名空间有一些 HTTP 端点。目前,已实施两项。第一个,/bus/env发送键/值对来更新每个节点的 Spring 环境。第二个,/bus/refresh重新加载每个应用程序的配置,就好像它们都已在其/refresh 端点上 ping 通一样。

Spring Cloud Bus 启动器涵盖了 Rabbit 和 Kafka,因为它们是两种最常见的实现。然而,Spring Cloud Stream 非常灵活,并且 Binder 可以与spring-cloud-bus.

2. 总线端点

Spring Cloud Bus 提供了两个端点,/actuator/busrefresh分别对应/actuator/busenv 于 Spring Cloud Commons 中的各个执行器端点 。/actuator/refresh/actuator/env

2.1. 总线刷新端点

端点/actuator/busrefresh清除RefreshScope缓存并重新绑定 @ConfigurationProperties。有关详细信息,请参阅刷新范围文档。

要公开/actuator/busrefresh端点,您需要将以下配置添加到您的应用程序:

management.endpoints.web.exposure.include=busrefresh

2.2. 总线环境端点

端点/actuator/busenv使用跨多个实例的指定键/值对更新每个实例环境。

要公开/actuator/busenv端点,您需要将以下配置添加到您的应用程序:

management.endpoints.web.exposure.include=busenv

端点/actuator/busenv接受POST具有以下形状的请求:

{
    "name": "key1",
    "value": "value1"
}

3. 寻址实例

应用程序的每个实例都有一个服务 ID,可以设置其值, spring.cloud.bus.id并且其值应为以冒号分隔的标识符列表,按从最不具体到最具体的顺序排列。默认值是根据环境构造为spring.application.nameserver.port(或spring.application.index,如果设置的话)的组合。ID 的默认值以 的形式构造app:index:id,其中:

  • appvcap.application.name,如果存在的话,或者spring.application.name

  • indexvcap.application.instance_index,如果存在,则为 、 spring.application.indexlocal.server.portserver.port0(按该顺序)。

  • idvcap.application.instance_id,如果存在的话,或者是一个随机值。

HTTP 端点接受“目标”路径参数,例如 /busrefresh/customers:9000,其中destination是服务 ID。如果总线上的某个实例拥有该 ID,则它会处理该消息,而所有其他实例会忽略该消息。

4. 寻址服务的所有实例

Spring 中使用“destination”参数PathMatcher(路径分隔符为冒号 —  :)来确定实例是否处理消息。使用前面的示例,/busenv/customers:**定位“客户”服务的所有实例,而不管服务 ID 的其余部分如何。

5.服务ID必须唯一

总线尝试两次消除对事件的处理——一次从原始事件中 ApplicationEvent,一次从队列中。为此,它会根据当前服务 ID 检查发送服务 ID。如果服务的多个实例具有相同的 ID,则不会处理事件。当在本地计算机上运行时,每个服务都位于不同的端口上,并且该端口是 ID 的一部分。Cloud Foundry 提供了一个索引来区分。为了确保 ID 在 Cloud Foundry 外部是唯一的,请spring.application.index为每个服务实例设置唯一的 ID。

6. 自定义消息代理

Spring Cloud Bus 使用Spring Cloud Stream来广播消息。因此,要使消息流动,您只需在类路径中包含您选择的绑定器实现即可。有AMQP(RabbitMQ)和Kafka(spring-cloud-starter-bus-[amqp|kafka])的总线方便的启动器。一般来说,Spring Cloud Stream 依赖 Spring Boot 自动配置约定来配置中间件。例如,可以使用 spring.rabbitmq.*配置属性更改 AMQP 代理地址。Spring Cloud Bus 有一些本机配置属性spring.cloud.bus.*(例如, spring.cloud.bus.destination用作外部中间件的主题的名称)。通常,默认值就足够了。

要了解有关如何自定义消息代理设置的更多信息,请参阅 Spring Cloud Stream 文档。

7. 跟踪总线事件

总线事件(其子类RemoteApplicationEvent)可以通过设置来跟踪 spring.cloud.bus.trace.enabled=true。如果这样做,Spring Boot TraceRepository (如果存在)会显示发送的每个事件以及来自每个服务实例的所有确认。以下示例来自/trace端点:

{
  "timestamp": "2015-11-26T10:24:44.411+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "stores:8081",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.864+0000",
  "info": {
    "signal": "spring.cloud.bus.sent",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
  },
  {
  "timestamp": "2015-11-26T10:24:41.862+0000",
  "info": {
    "signal": "spring.cloud.bus.ack",
    "type": "RefreshRemoteApplicationEvent",
    "id": "c4d374b7-58ea-4928-a312-31984def293b",
    "origin": "customers:9000",
    "destination": "*:**"
  }
}

前面的跟踪显示 aRefreshRemoteApplicationEvent从 发送 customers:9000,广播到所有服务,并由customers:9000和 接收(确认) stores:8081

要自己处理 ack 信号,您可以向应用程序添加 和 类型(并启用跟踪@EventListenerAckRemoteApplicationEventSentApplicationEvent或者,您可以利用TraceRepository并从那里挖掘数据。

任何总线应用程序都可以跟踪 ack。然而,有时,在中央服务中执行此操作很有用,该服务可以对数据进行更复杂的查询或将其转发到专门的跟踪服务。

8. 广播您自己的活动

总线可以承载任何类型的事件RemoteApplicationEvent。默认传输是 JSON,解串器需要提前知道要使用哪些类型。要注册新类型,必须将其放入 org.springframework.cloud.bus.event.

要自定义事件名称,您可以@JsonTypeName在自定义类上使用或依赖默认策略,即使用类的简单名称。

生产者和消费者都需要访问类定义。

8.1. 在自定义包中注册事件

如果您不能或不想使用org.springframework.cloud.bus.event 自定义事件的子包,则必须 RemoteApplicationEvent使用@RemoteApplicationEventScan注释指定要扫描该类型事件的包。使用 include 指定的包@RemoteApplicationEventScan包含子包。

例如,考虑以下自定义事件,称为MyEvent

package com.acme;

public class MyEvent extends RemoteApplicationEvent {
    ...
}

您可以通过以下方式向解串器注册该事件:

package com.acme;

@Configuration
@RemoteApplicationEventScan
public class BusConfiguration {
    ...
}

@RemoteApplicationEventScan 如果不指定值,则注册所使用的类的包。在此示例中,com.acme是使用 的包注册的 BusConfiguration

您还可以使用valuebasePackagesbasePackageClasses上的属性显式指定要扫描的包@RemoteApplicationEventScan,如以下示例所示:

package com.acme;

@Configuration
//@RemoteApplicationEventScan({"com.acme", "foo.bar"})
//@RemoteApplicationEventScan(basePackages = {"com.acme", "foo.bar", "fizz.buzz"})
@RemoteApplicationEventScan(basePackageClasses = BusConfiguration.class)
public class BusConfiguration {
    ...
}

前面的所有示例@RemoteApplicationEventScan都是等效的,因为该 com.acme包是通过显式指定 上的包来注册的 @RemoteApplicationEventScan

您可以指定要扫描的多个基础包。

9. 配置属性

要查看所有总线相关配置属性的列表,请查看附录页面