Spring Integration Java DSL:逐行教程

工程 | Artem Bilan | 2014 年 11 月 25 日 | ...

亲爱的 Spring 社区!

Spring Integration Java DSL 1.0 GA 发布公告之后,我想向您介绍 Spring Integration Java DSL,这是一个基于经典Cafe Demo 集成示例的逐行教程。我们在此描述了Spring Boot 支持、Spring Framework Java 和注解 配置、IntegrationFlow 功能,并向 Java 8 Lambda 支持致敬,后者是 DSL 样式的灵感来源。当然,这一切都由Spring Integration Core 项目提供支持。

对于那些尚未对 Java 8 感兴趣的人,我们提供了类似的无 Lambda 教程:Spring Integration Java DSL(Java 8 之前):逐行教程

但是,在我们开始描述 Cafe 演示应用程序之前,这里有一个更简短的示例,以便开始…

@Configuration
@EnableAutoConfiguration
@IntegrationComponentScan
public class Start {

	public static void main(String[] args) throws InterruptedException {
		ConfigurableApplicationContext ctx = 
                                 SpringApplication.run(Start.class, args);

		List<String> strings = Arrays.asList("foo", "bar");
		System.out.println(ctx.getBean(Upcase.class).upcase(strings));

		ctx.close();
	}

	@MessagingGateway
	public interface Upcase {

		@Gateway(requestChannel = "upcase.input")
		Collection<String> upcase(Collection<String> strings);

	}

	@Bean
	public IntegrationFlow upcase() {
	     return f -> f
		 	.split()                                         // 1
			.<String, String>transform(String::toUpperCase)  // 2
			.aggregate();                                    // 3
	}

}

我们将把基础设施(注解等)的描述留给主要的咖啡流程描述。在这里,我们希望您专注于最后一个 @Bean,即 IntegrationFlow 以及将消息发送到该流程的网关方法。

main 方法中,我们将字符串集合发送到网关,并将结果打印到 STDOUT。该流程首先将集合拆分为单个 String(1);然后将每个字符串转换为大写(2),最后我们将它们重新聚合回集合(3)。由于这是流程的末尾,框架将聚合的结果返回到网关,并且新的有效负载成为网关方法的返回值。

等效的 XML 配置可能是…

<int:gateway service interface="foo.Upcase" 
                 default-request-channel="upcase.input">

<int:splitter input-channel="upcase.input" output-channel="transform"/>

<int:transformer expression="payload.toUpperCase()"
    input-channel="transform"
    output-channel="aggregate" />

<int:aggregator input-channle="aggregate" />

或者…

<int:gateway service interface="foo.Upcase" 
                default-request-channel="upcase.input">

<int:chain input-channel="upcase.input">
    <int:splitter />
    <int:transformer expression="payload.toUpperCase()" />
    <int:aggregator />
</int:chain>

## Cafe Demo

Cafe Demo 应用程序的目的是演示如何使用企业集成模式 (EIP) 来反映现实生活中咖啡馆的 订单-配送 场景。通过此应用程序,我们处理多个饮料订单 - 热饮和冷饮。运行应用程序后,我们可以在标准输出 (System.out.println) 中看到冷饮比热饮准备得更快。但是整个订单的配送将推迟到热饮准备好为止。

为了反映领域模型,我们有几个类:OrderOrderItemDrinkDelivery。它们都提到了集成场景中,但我们不会在这里分析它们,因为它们足够简单。

我们应用程序的源代码仅放置在一个类中;重要的行用一个数字进行注释,该数字对应于后面的注释。

@SpringBootApplication               // 1
@IntegrationComponentScan            // 2
public class Application {

  public static void main(String[] args) throws Exception {
  	ConfigurableApplicationContext ctx =
  	              SpringApplication.run(Application.class, args);// 3

  	Cafe cafe = ctx.getBean(Cafe.class);                         // 4
  	for (int i = 1; i <= 100; i++) {                             // 5
       Order order = new Order(i);
       order.addItem(DrinkType.LATTE, 2, false); //hot
       order.addItem(DrinkType.MOCHA, 3, true);  //iced
       cafe.placeOrder(order);
  	}

  	System.out.println("Hit 'Enter' to terminate");              // 6
  	System.in.read();
  	ctx.close();
  }

  @MessagingGateway                                              // 7
  public interface Cafe {

  	@Gateway(requestChannel = "orders.input")                    // 8
  	void placeOrder(Order order);                                // 9

  }

  private AtomicInteger hotDrinkCounter = new AtomicInteger();

  private AtomicInteger coldDrinkCounter = new AtomicInteger();  // 10

  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata poller() {                               // 11
  	return Pollers.fixedDelay(1000).get();
  }

  @Bean
  public IntegrationFlow orders() {                             // 12
  	return f -> f                                               // 13
  	  .split(Order.class, Order::getItems)                      // 14
  	  .channel(c -> c.executor(Executors.newCachedThreadPool()))// 15
  	  .<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping // 16
  	    .subFlowMapping("true", sf -> sf                        // 17
  	      .channel(c -> c.queue(10))                            // 18
  	      .publishSubscribeChannel(c -> c                       // 19
  	        .subscribe(s ->                                     // 20
  	          s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))// 21
  	        .subscribe(sub -> sub                               // 22
  	          .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared cold drink #"
  	              + this.coldDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)                                 // 23
  	          .handle(m -> System.out.println(m.getPayload())))))// 24
  	    .subFlowMapping("false", sf -> sf                        // 25
  	      .channel(c -> c.queue(10))
  	      .publishSubscribeChannel(c -> c
  	        .subscribe(s ->
  	          s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))// 26
  	        .subscribe(sub -> sub
  	          .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared hot drink #"
  	              + this.hotDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)
  	          .handle(m -> System.out.println(m.getPayload()))))))
  	  .<OrderItem, Drink>transform(orderItem ->
  	    new Drink(orderItem.getOrderNumber(),
  	      orderItem.getDrinkType(),
  	      orderItem.isIced(),
  	      orderItem.getShots()))                                // 27
  	  .aggregate(aggregator -> aggregator                       // 28
  	    .outputProcessor(group ->                               // 29
  	      new Delivery(group.getMessages()
  	        .stream()
  	        .map(message -> (Drink) message.getPayload())
  	        .collect(Collectors.toList())))                     // 30
  	    .correlationStrategy(m ->
  	      ((Drink) m.getPayload()).getOrderNumber()), null)     // 31
  	  .handle(CharacterStreamWritingMessageHandler.stdout());   // 32
  }

}

逐行检查代码…

1

@SpringBootApplication

来自 Spring Boot 1.2 的新的元注解。包括 @Configuration@EnableAutoConfiguration。由于我们处于 Spring Integration 应用程序中,并且 Spring Boot 为其提供了自动配置,因此会自动应用 @EnableIntegration,以初始化 Spring Integration 基础设施,包括 Java DSL 的环境 - DslIntegrationConfigurationInitializer,它由 IntegrationConfigurationBeanFactoryPostProcessor/META-INF/spring.factories 中获取。

2

@IntegrationComponentScan

Spring Integration 的 @ComponentScan 类似物,用于基于接口扫描组件(Spring Framework 的 @ComponentScan 仅查看类)。Spring Integration 支持发现用 @MessagingGateway 注解的接口(见下面的 #7)。

3

ConfigurableApplicationContext ctx = SpringApplication.run(Application.class, args);

我们类的 main 方法旨在使用此类中的配置启动 Spring Boot 应用程序,并通过 Spring Boot 启动 ApplicationContext。此外,它将命令行参数委托给 Spring Boot。例如,您可以指定 --debug 以查看引导自动配置报告的日志。

4

Cafe cafe = ctx.getBean(Cafe.class);

由于我们已经有了一个 ApplicationContext,因此我们可以开始与应用程序交互。而 Cafe 就是那个入口点 - 用 EIP 术语来说,就是 网关。网关只是接口,应用程序不与 Messaging API 交互;它只是处理领域(见下面的 #7)。

5

for (int i = 1; i <= 100; i++) {

为了演示咖啡馆的“工作”,我们启动了 100 个订单,每个订单包含两杯饮料 - 一杯热饮和一杯冷饮。并将 Order 发送到 Cafe 网关。

6

System.out.println("Hit 'Enter' to terminate");

通常,Spring Integration 应用程序是异步的,因此为了避免 main 线程过早退出,我们阻止 main 方法,直到通过命令行进行一些最终用户交互。非守护线程将使应用程序保持打开状态,但 System.read() 为我们提供了一种机制来干净地关闭应用程序。

7

@MessagingGateway

用于标记业务接口以指示它是最终应用程序和集成层之间 网关 的注解。它是 Spring Integration XML 配置中 <gateway /> 组件的类似物。Spring Integration 为此接口创建了一个 Proxy,并将其作为 bean 填充到应用程序上下文中。此 Proxy 的目的是将参数包装在 Message<?> 对象中,并根据提供的选项将其发送到 MessageChannel

8

@Gateway(requestChannel = "orders.input")

方法级注解,用于根据方法以及目标集成流区分业务逻辑。在此示例中,我们使用 orders.inputrequestChannel 引用,它是我们 IntegrationFlow 输入通道的 MessageChannel bean 名称(见下面的 #13)。

9

void placeOrder(Order order);

接口方法是最终应用程序与集成层交互的中心点。此方法具有 void 返回类型。这意味着我们的集成流是 单向 的,我们只是将消息发送到集成流,但不会等待回复。

10

private AtomicInteger hotDrinkCounter = new AtomicInteger();
private AtomicInteger coldDrinkCounter = new AtomicInteger();

两个计数器用于收集我们的咖啡馆如何处理饮料的信息。

11

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {

默认 轮询器 bean。它是 Spring Integration XML 配置中 <poller default="true"> 组件的类似物。对于 inputChannelPollableChannel 的端点是必需的。在这种情况下,对于两个 Cafe 队列 - 热饮和冷饮是必要的(见下面的 #18)。在这里,我们使用 DSL 项目中的 Pollers 工厂,并使用其方法链流畅 API 来构建轮询器元数据。请注意,如果端点需要特定 轮询器(而不是默认轮询器),则可以直接从 IntegrationFlow 定义中使用 Pollers

12

@Bean
public IntegrationFlow orders() {

IntegrationFlow bean 定义。它是 Spring Integration Java DSL 的核心组件,尽管它在运行时没有发挥任何作用,而是在 bean 注册阶段发挥作用。下面的所有其他代码都在 IntegrationFlow 对象中注册 Spring Integration 组件(MessageChannelMessageHandlerEventDrivenConsumerMessageProducerMessageSource 等),这些组件由 IntegrationFlowBeanPostProcessor 解析以处理这些组件并根据需要将它们注册为应用程序上下文中的 bean(某些元素,例如通道可能已经存在)。

13

return f -> f

IntegrationFlow 是一个 Consumer 函数式接口,因此我们可以最大程度地减少代码并仅专注于集成场景需求。它的 Lambda 接受 IntegrationFlowDefinition 作为参数。此类提供了一套全面的方法,可以将其组合到 中。我们称这些为 EIP 方法,因为它们为 EI 模式提供了实现并填充了 Spring Integration Core 中的组件。在 bean 注册阶段,IntegrationFlowBeanPostProcessor 将此内联 (Lambda) IntegrationFlow 转换为 StandardIntegrationFlow 并处理其组件。我们可以使用 IntegrationFlows 工厂(例如 IntegrationFlow.from("channelX"). ... .get())实现相同的功能,但我们发现 Lambda 定义更优雅。使用 Lambda 的 IntegrationFlow 定义将 DirectChannel 填充为流的 inputChannel,并在应用程序上下文中注册为一个 bean,在本示例中其名称为 orders.inputflow bean 名称 + ".input")。这就是我们在 Cafe 网关中使用该名称的原因。

14

.split(Order.class, Order::getItems)

由于我们的集成流通过 orders.input 通道接受消息,因此我们已准备好使用和处理它们。我们场景中的第一个 EIP 方法是 .split()。我们知道来自 orders.input 通道的消息 有效负载 是一个 Order 领域对象,因此我们可以在此处简单地使用其类型并使用 Java 8 方法引用 功能。第一个参数是我们期望的消息 有效负载 的类型,第二个参数是 getItems() 方法的引用,该方法返回 Collection<OrderItem>。因此,这执行了 拆分 EI 模式,当我们将每个集合条目作为单独的消息发送到下一个通道时。在后台,.split() 方法注册一个 MethodInvokingSplitter MessageHandler 实现以及该 MessageHandlerEventDrivenConsumer,并将 orders.input 通道连接为 inputChannel

15

.channel(c -> c.executor(Executors.newCachedThreadPool()))

.channel() EIP 方法允许在端点之间指定具体的 MessageChannel,就像通过 Spring Integration XML 配置中的 output-channel/input-channel 属性对一样。默认情况下,DSL 集成流定义中的端点使用 DirectChannel 连接,这些通道根据 IntegrationFlow bean 名称和流链中的 索引 获取 bean 名称。在这种情况下,我们使用另一个 Lambda 表达式,它从其 Channels 工厂中选择特定的 MessageChannel 实现并使用流畅 API 对其进行配置。此处的当前通道是 ExecutorChannel,以允许将消息从 拆分器 分发到单独的 线程,以便在后续流中并行处理它们。

16

.<OrderItem, Boolean>route(OrderItem::isIced, mapping -> mapping

我们场景中的下一个 EIP 方法是 .route(),用于将 热饮/冷饮 订单项发送到不同的咖啡馆厨房。我们再次在这里使用方法引用 (isIced()) 从传入的消息中获取 路由键。第二个 Lambda 参数表示 路由器映射 - 与 Spring Integration XML 配置中 <router> 组件的 <mapping> 子元素类似。但是,由于我们使用的是 Java,因此我们可以使用其 Lambda 支持更进一步!Spring Integration Java DSL 引入了 子流 定义用于 路由器,除了传统的 通道映射 之外。每个子流根据路由执行,如果子流产生结果,则将其传递到路由器后流定义中的下一个元素。

17

.subFlowMapping("true", sf -> sf 

指定当前路由器mappingKey的集成流程。在本示例中,我们有两个子流程 - hoticed。子流程与IntegrationFlow功能接口相同,因此我们可以像在顶级IntegrationFlow定义中一样使用它的Lambda。子流程与其父流程没有任何运行时依赖关系,这仅仅是一种逻辑关系。

18

.channel(c -> c.queue(10))

我们已经知道,IntegrationFlow的Lambda定义从[FLOW_BEAN_NAME].input DirectChannel开始,所以可能会有一个问题“如果我们再次指定.channel(),它在这里是如何工作的?”。DSL 会处理这种情况,并将这两个通道与一个BridgeHandler和端点连接起来。在我们的示例中,我们在这里使用了一个受限的QueueChannel来反映现实生活中咖啡馆厨房的繁忙状态。这里也是我们需要那个全局轮询器的地方,用于监听此通道的下一个端点。

19

.publishSubscribeChannel(c -> c

.publishSubscribeChannel() EIP 方法是.channel() 的变体,用于MessageChannels.publishSubscribe(),但带有.subscribe() 选项,当我们可以指定子流程作为通道的订阅者时。没错,子流程再次出现!因此,可以将子流程指定到任意深度。无论.subscribe() 子流程是否存在,父流程中的下一个端点也是此.publishSubscribeChannel() 的订阅者。由于我们已经在.route() 子流程中,因此最后一个订阅者是一个隐式的BridgeHandler,它只是将消息弹出到顶级 - 到一个类似的隐式BridgeHandler,以将消息弹出到主流程中的下一个.transform() 端点。关于我们流程的当前位置,还有一点需要注意:前面的EIP方法是.channel(c -> c.queue(10)),而这个是用于MessageChannel 的。因此,它们再次与一个隐式的BridgeHandler 绑定。在实际应用中,我们可以避免使用这个.publishSubscribeChannel(),只需使用咖啡馆厨房的单个.handle() 即可,但我们的目标是尽可能多地涵盖DSL功能。这就是为什么我们将厨房工作分配给同一个PublishSubscribeChannel 的多个子流程的原因。

20

.subscribe(s ->

.subscribe() 方法接受一个IntegrationFlow 作为参数,可以将其指定为Lambda以将订阅者配置为子流程。我们在这里使用多个子流程订阅者来避免多行Lambda,并涵盖一些DSL以及Spring Integration的功能。

21

s.handle(m -> sleepUninterruptibly(1, TimeUnit.SECONDS)))

在这里,我们使用一个简单的.handle() EIP 方法来阻塞当前线程一段时间,以演示咖啡馆厨房准备饮品的速度。我们在这里使用Google Guava Uninterruptibles.sleepUninterruptibly,以避免在Lambda表达式中使用try...catch 块,尽管你可以这样做,你的Lambda将是多行的。或者你可以将该代码移动到一个单独的方法中,并在这里将其用作方法引用

由于我们在.publishSubscribeChannel() 上没有使用任何Executor,因此所有订阅者都将在同一个线程上顺序执行;在我们的例子中,它是来自前面QueueChannel轮询器TaskScheduler 线程之一。这就是为什么这个sleep 会阻塞所有下游进程,并允许演示那个限制为10 的QueueChannel繁忙状态

22

.subscribe(sub -> sub

下一个子流程订阅者,它将在iced 饮品1秒钟的sleep 之后执行。我们在这里使用另一个子流程,因为前一个的.handle()单向的,其本质是MessageHandler 的Lambda。因此,为了继续我们整个流程的处理,我们有几个订阅者:一些子流程在完成工作后不会向父流程返回任何内容。

23

 .<OrderItem, String>transform(item ->
  	            Thread.currentThread().getName()
  	              + " prepared cold drink #"
  	              + this.coldDrinkCounter.incrementAndGet()
  	              + " for order #" + item.getOrderNumber()
  	              + ": " + item)         

当前订阅者子流程中的转换器 用于将OrderItem 转换为友好的STDOUT消息,用于下一个.handle。在这里,我们看到了泛型与Lambda表达式的使用。这是使用GenericTransformer 功能接口实现的。

24

.handle(m -> System.out.println(m.getPayload())))))

这里的.handle() 只是为了演示如何使用Lambda表达式将有效负载 打印到STDOUT。它表示我们的饮品已准备好。之后,PublishSubscribeChannel 的最终(隐式)订阅者只需将带有OrderItem 的消息发送到主流程中的.transform()

25

.subFlowMapping("false", sf -> sf

用于hot 饮品的.subFlowMapping()。实际上,它类似于前面的iced 饮品子流程,但具有特定的hot 业务逻辑。

26

s.handle(m -> sleepUninterruptibly(5, TimeUnit.SECONDS)))

hot 饮品的sleepUninterruptibly。没错,我们需要更多时间来烧水!

27

 .<OrderItem, Drink>transform(orderItem ->
  	    new Drink(orderItem.getOrderNumber(),
  	      orderItem.getDrinkType(),
  	      orderItem.isIced(),
  	      orderItem.getShots()))      

主要的OrderItemDrink 转换器,当.route() 子流程在咖啡馆厨房订阅者完成饮料准备后返回其结果时执行。

28

.aggregate(aggregator -> aggregator

.aggregate() EIP 方法提供了类似于配置AggregatingMessageHandler 及其端点的选项,就像我们在使用Spring Integration XML配置时使用<aggregator> 组件一样。当然,使用Java DSL,我们可以在原处配置聚合器,而无需任何其他额外的bean。Lambda再次派上用场!从咖啡馆业务逻辑的角度来看,我们为初始Order 组成Delivery,因为我们在流程开始附近将原始订单.split()OrderItem

29

.outputProcessor(group -> 

AggregatorSpec.outputProcessor() 允许我们在聚合器完成组后发出自定义结果。它是<aggregator> 组件中的ref/method 或POJO方法上的@Aggregator 注解的类似物。我们的目标是为所有Drink 组成一个Delivery

30

new Delivery(group.getMessages()
  	        .stream()
  	        .map(message -> (Drink) message.getPayload())
  	        .collect(Collectors.toList())))    

正如你所看到的,我们在这里使用Java 8的Stream 特性来处理Collection。我们遍历从释放的MessageGroup 中获取的消息,并将(map)每个消息转换为其Drink 有效负载Stream 的结果(.collect())(一个Drink 列表)传递给Delivery 构造函数。带有这个新的Delivery 有效负载的Message 被发送到我们咖啡馆场景中的下一个端点。

31

.correlationStrategy(m ->
  	      ((Drink) m.getPayload()).getOrderNumber()), null)

.correlationStrategy() Lambda 演示了我们如何自定义聚合器的行为。当然,我们可以在这里仅依赖Spring Integration 中内置的SequenceDetails,它在流程开始时由.split() 默认填充到每个分割的消息中,但是包含了CorrelationStrategy 的Lambda示例以供说明。(使用XML,我们可以使用correlation-expression 或自定义CorrelationStrategy)。此行中.aggregate() EIP 方法的第二个参数用于endpointConfigurer 以自定义选项,如autoStartuprequiresReplyadviceChain 等。我们在这里使用null 来表明我们依赖于端点的默认选项。许多EIP方法都提供了带或不带endpointConfigurer 的重载版本,但.aggregate() 需要一个端点参数,以避免对AggregatorSpec Lambda 参数进行显式转换。

32

.handle(CharacterStreamWritingMessageHandler.stdout());

这是我们流程的结束 - Delivery 已交付给客户!我们只是在这里使用Spring Integration Core 中现成的CharacterStreamWritingMessageHandler 将消息有效负载 打印到STDOUT。这是一种展示如何从Java DSL 中使用Spring Integration Core(及其模块)中现有的组件。

好了,我们已经完成了基于Spring Integration Java DSL 的Cafe Demo 示例的描述。将其与XML 示例 进行比较,以获取有关Spring Integration 的更多信息。

这不是DSL内容的全面教程。我们在这里不回顾endpointConfigurer 选项、Transformers 工厂、IntegrationComponentSpec 层次结构、NamespaceFactories,我们如何指定多个IntegrationFlow bean 并将其连接到单个应用程序等,请参阅参考手册 以获取更多信息。

至少这个逐行教程应该向你展示Spring Integration Java DSL 的基础知识,以及它在Spring Framework Java 和注释配置、Spring Integration 基础和Java 8 Lambda 支持之间的无缝融合!

还可以查看si4demo,了解包括Java DSL 在内的Spring Integration 的发展,如2014 年SpringOne/2GX 大会上所示。(视频很快就会发布)。

与往常一样,我们期待你的意见和反馈(StackOverflowspring-integration 标签)、Spring JIRAGitHub),我们非常欢迎贡献

附注:即使本教程完全基于Java 8 Lambda 支持,我们也不想错过Java 8 之前的用户,我们将提供类似的非Lambda 博文。敬请期待!

获取Spring 新闻通讯

与Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看Spring 社区中所有即将举行的活动。

查看全部