Spring集成交互
Spring 集成框架扩展了 Spring 编程模型以支持众所周知的企业集成模式。它支持基于 Spring 的应用程序内的轻量级消息传递,并支持通过声明性适配器与外部系统集成。它还提供了一个高级 DSL,将各种操作(端点)组合成一个逻辑集成流。通过这种 DSL 配置的 lambda 风格,Spring Integration 已经具有良好的接口采用水平java.util.function。代理@MessagingGateway接口也可以是Function或Consumer,根据 Spring Cloud Function 环境可以注册到函数目录中。有关其对函数的支持的更多信息,请参阅 Spring Integration ReferenceManual 。
另一方面,从 version 开始4.0.3,Spring Cloud Function 引入了一个spring-cloud-function-integration模块,该模块提供了更深入、更特定于云且基于自动配置的 API,用于FunctionCatalog从 Spring Integration DSL 角度进行交互。是FunctionFlowBuilder自动配置和自动连接的FunctionCatalog,表示目标实例的功能特定 DSL 的入口点IntegrationFlow。除了标准IntegrationFlow.from()工厂(为了方便)之外,还FunctionFlowBuilder公开了一个工厂来查找提供的fromSupplier(String supplierDefinition)目标。那么这就导致了. 这是 和 的实现,分别公开和运算符来查找或。请参阅他们的 Javadocs 以获取更多信息。SupplierFunctionCatalogFunctionFlowBuilderFunctionFlowDefinitionFunctionFlowDefinitionIntegrationFlowExtensionapply(String functionDefinition)accept(String consumerDefinition)FunctionConsumerFunctionCatalog
以下示例演示了FunctionFlowBuilder实际操作以及 API 其余部分的功能IntegrationFlow:
@Configuration
public class IntegrationConfiguration {
@Bean
Supplier<byte[]> simpleByteArraySupplier() {
return "simple test data"::getBytes;
}
@Bean
Function<String, String> upperCaseFunction() {
return String::toUpperCase;
}
@Bean
BlockingQueue<String> results() {
return new LinkedBlockingQueue<>();
}
@Bean
Consumer<String> simpleStringConsumer(BlockingQueue<String> results) {
return results::add;
}
@Bean
QueueChannel wireTapChannel() {
return new QueueChannel();
}
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("simpleByteArraySupplier")
.wireTap("wireTapChannel")
.apply("upperCaseFunction")
.log(LoggingHandler.Level.WARN)
.accept("simpleStringConsumer");
}
}
由于FunctionCatalog.lookup()功能不仅限于简单的函数名称,因此还可以在提到的apply()和accept()运算符中使用函数组合功能:
@Bean
IntegrationFlow functionCompositionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.from("functionCompositionInput")
.accept("upperCaseFunction|simpleStringConsumer");
}
当我们将预定义函数的自动配置依赖项添加到 Spring Cloud 应用程序中时,该 API 变得更加相关。例如,流应用程序项目除了应用程序映像之外,还提供具有用于各种集成用例的功能的工件,例如debezium-supplier,elasticsearch-consumer等等aggregator-function。
以下配置分别基于http-supplier、spel-function和file-consumer:
@Bean
IntegrationFlow someFunctionFlow(FunctionFlowBuilder functionFlowBuilder) {
return functionFlowBuilder
.fromSupplier("httpSupplier", e -> e.poller(Pollers.trigger(new OnlyOnceTrigger())))
.<Flux<?>>handle((fluxPayload, headers) -> fluxPayload, e -> e.async(true))
.channel(c -> c.flux())
.apply("spelFunction")
.<String, String>transform(String::toUpperCase)
.accept("fileConsumer");
}
我们需要的只是将它们的配置添加到application.properties(如果需要):
http.path-pattern=/testPath
spel.function.expression=new String(payload)
file.consumer.name=test-data.txt