Spring 集成 Java DSL 1.1 M1 已发布

版本发布 | Artem Bilan | 2015年4月15日 | ...

尊敬的 Spring 社区:

我们很高兴地宣布 Spring Integration Java DSL 1.1 里程碑版本 1 现已发布。使用 Maven 或 Gradle 的 里程碑仓库 来抢先体验。

compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"

说实话,`1.1` 版本中计划的许多功能尚未实现,但感谢我们朋友 Josh Long鼓励以及最近关于 Apache Kafka 支持的公告(Spring Integration Kafka 支持 1.1 版本发布Spring XD 1.1.1 版本发布),我们发布了这个里程碑版本 1,主要是为了展示 Java 配置 DSL 中的 Apache Kafka 支持。

我们将在本文中介绍此版本中的这些以及其他功能。

Apache Kafka 支持

让我们从 Spring Integration Java DSL 中 `KafkaTests` 类的一些“简单”示例开始。

@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
        return new DefaultConnectionFactory(
                new ZookeeperConfiguration(zookeeper.connectString()));
}

@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
        MetadataStoreOffsetManager offsetManager =
                           new MetadataStoreOffsetManager(connectionFactory);
        // start reading at the end of the
       offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
       return offsetManager;
}

@Bean
public IntegrationFlow listeningFromKafkaFlow(
                    ConnectionFactory connectionFactory,
                    OffsetManager offsetManager) {
     return IntegrationFlows
         .from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
                  .autoCommitOffset(false)
		  .payloadDecoder(String::new)
		  .keyDecoder(b -> Integer.valueOf(new String(b)))
		  .configureListenerContainer(c ->
				c.offsetManager(offsetManager)
						.maxFetch(100)))
         .<String, String>transform(String::toUpperCase)
         .channel(c -> c.queue("listeningFromKafkaResults"))
         .get();
}
  • `EmbeddedZookeeper` 是 Apache Kafka `test` artifact (`testCompile 'org.apache.kafka:kafka_2.10:0.8.1.1:test'` 在我们的例子中)的一部分,并且与许多其他功能(如 `kafka.utils.TestUtils`)一起,它对单元测试非常有用。
  • 请参考 Spring Integration Kafka 项目以获取有关 `ConnectionFactory` 和 `OffsetManager` 的更多信息。
  • 上面配置中最重要的一部分是 `IntegrationFlow` bean 定义。Spring Integration Java DSL 提供了一个命名空间工厂 - `Kafka` - 它利用 `IntegrationComponentSpec` 实现来实现 Spring Integration Kafka 适配器,例如 `KafkaMessageDrivenChannelAdapterSpec` 用于 `KafkaMessageDrivenChannelAdapter`。
  • 作为 *构建器模式* 的一个示例,该规范只是将选项从 `method-chain` 代理到底层的 `KafkaMessageDrivenChannelAdapter` 实例。
  • 对于那些像我一样不熟悉 Scala(Apache Kafka 使用的语言)的人,请注意 `.payloadDecoder(String::new)` 行。`kafka.serializer.Decoder` 是一个 Scala `trait`,它被编译成一个 Java 接口(不是类!),因此我们可以在这里将其表示为 Java 8 lambda 方法。
  • `.configureListenerContainer()` 是一个 lambda 感知方法,用于分离 `KafkaMessageListenerContainer` 特定选项的关注点。

`Kafka` 命名空间工厂中的其他自解释工厂方法是用于 `KafkaHighLevelConsumerMessageSource` 轮询适配器的 `.inboundChannelAdapter(...)` 和用于 `KafkaProducerMessageHandler` 的 `.outboundChannelAdapter(...)`。请参阅它们的 JavaDocs 以获取更多信息。

有关更多信息,请查看 Josh Long 关于 使用 Apache Kafka 进行 Spring 集成和数据处理管道 的文章!

POJO 方法调用

来自社区的大量宝贵反馈(网络研讨会回放:介绍 Spring 集成的 Java DSL)围绕着 bean 方法调用组件(服务、转换器、路由器等),我们认真倾听了:组件方法选择已得到改进。以下是一个类似于 XML 配置中 `<int:service-activator input-channel="greetingChannel" ref="greetingService"/>` 的示例


@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {

	@Autowired
	private GreetingService greetingService;

	@Bean
	public IntegrationFlow greetingFlow() {
		return IntegrationFlows.from("greetingChannel")
				.handle(this.greetingService)
				.get();
	}

}

@Component
public class GreetingService {

   public void greeting(String payload) {
        System.out.println("Hello " + payload);
   }
}

在这里,框架将自动选择 `greeting` 方法。还有一个使用 `methodName` 参数的替代方法,以便在存在歧义的情况下指定方法。类似的 POJO 方法调用 EIP 方法已引入许多其他 EIP 实现,例如 `transform(Object service, String methodName)`、`split(Object service)` 等。

Spring Integration Java DSL 还尊重 Spring Integration 消息传递注释,例如 `@ServiceActivator`、`@Router`、`@Filter` 等,甚至 `@Payload`、`@Header`。请参阅 `IntegrationFlowDefinition` JavaDocs 以获取更多信息。

IntegrationFlowAdapter

毫不奇怪,由于 `IntegrationFlow` 是一个接口,我们可以将其直接实现作为自定义组件,它可以在 Spring Integration Java DSL 环境中按原样工作。

@Component
public class MyFlow implements IntegrationFlow {

	@Override
	public void configure(IntegrationFlowDefinition<?> f) {
		f.<String, String>transform(String::toUpperCase);
	}

}

这类似于 `@Bean` 定义,但是这种方法有助于我们的组件保持更松散的耦合。

但是,等等,还有更多!`IntegrationFlow` 实现(例如 `@Bean` 定义情况下的 lambda)仅限于 `DirectChannel` 输入通道。我们在这里更进一步,引入了 `IntegrationFlowAdapter`。这是一个我最喜欢的示例,用于演示如何使用它。

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

        private final AtomicBoolean invoked = new tomicBoolean();

        public Date nextExecutionTime(TriggerContext triggerContext) {
              return this.invoked.getAndSet(true) ? null : new Date();
       }

       @Override
       protected IntegrationFlowDefinition<?> buildFlow() {
          return from(this, "messageSource",
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                   .split(this)
		   .transform(this)
		   .aggregate(a -> a.processor(this, null), null)
		   .enrichHeaders(Collections.singletonMap("foo", "FOO"))
		   .filter(this)
		   .handle(this)
		   .channel(c -> c.queue("myFlowAdapterOutput"));
      }

      public String messageSource() {
	       return "B,A,R";
      }

      @Splitter
      public String[] split(String payload) {
           return StringUtils.commaDelimitedListToStringArray(payload);
      }

      @Transformer
      public String transform(String payload) {
           return payload.toLowerCase();
      }

      @Aggregator
      public String aggregate(List<String> payloads) {
             return payloads.stream().collect(Collectors.joining());
      }

      @Filter
      public boolean filter(@Header Optional<String> foo) {
              return foo.isPresent();
      }

      @ServiceActivator
      public String handle(String payload, @Header String foo) {
             return payload + ":" + foo;
      }

}

当然,使用 POJO 方法调用支持(见上文),不可能如此轻松地构建流。

动态语言(脚本)支持

Spring FrameworkSpring Integration 长期以来一直支持动态语言,并且主要与 XML Spring 配置相关联。从 Java 代码处理脚本(如 Groovy、Ruby、JavaScript 等)可能看起来很奇怪,但我们发现它是在运行时重新加载功能的有用工具,并且在 Java lambda 不够动态时也很有用。让我们看看 Spring Integration Java DSL 中的 `Scripts` 命名空间工厂。

@Configuration
@EnableIntegration
public class ScriptsConfiguration {

	@Value("com/my/project/integration/scripts/splitterScript.groovy")
	private Resource splitterScript;

	@Bean
	public PollableChannel results() {
		return new QueueChannel();
	}

	@Bean
	public IntegrationFlow scriptSplitter() {
		return f -> f
        	             .split(Scripts.script(this.splitterScript)
                                              .refreshCheckDelay(10000)
                                              .variable("foo", "bar"))
                             .channel(results());
	}

}

此脚本支持允许我们仅处理外部资源,这些资源可以在运行时更改和重新加载。Spring Integration Scripting 模块支持的 `inline` 脚本没有意义,因为我们有 Java 8 lambda 来处理这些情况。

内联 WireTap

Wire Tap EI 模式Spring Integration 中实现为 `ChannelInterceptor`,可以像这样注入到任何 `MessageChannel` 中作为拦截器。

@Bean
public MessageChannel myChannel() {
     return MessageChannels.direct()
                .interceptor(new WireTap(loggerChannel()))
                .get();
}

`IntegrationFlow` 定义允许我们省略 EIP 组件之间的 `MessageChannel` 声明,因此我们引入了一个内联 `.wireTap()` EIP 方法,以便为这些匿名通道允许 `WireTap` 注入。以下是一些示例。

@Bean
public IntegrationFlow wireTapFlow1() {
	return IntegrationFlows.from("tappedChannel1")
		.wireTap("tapChannel",
                         wt -> wt.selector(m -> m.getPayload().equals("foo")))
		.channel("nullChannel")
		.get();
}

@Bean
public IntegrationFlow wireTapFlow2() {
	return f -> f
		.wireTap(sf -> sf
			.<String, String>transform(String::toUpperCase)
			.channel(c -> c.queue("wireTapSubflowResult")))
		.channel("nullChannel");
}

请参阅 `IntegrationFlowDefinition.wireTap()` 方法的 JavaDocs 以获取更多信息,并且不要错过 GitHub 项目页面上的测试用例。

总结

1.1 版本还有很多工作要做,例如进一步简化 `.aggregate()` 等配置,能够注入外部子流,能够将 `IntegrationComponentSpec` 实现配置为单独的 `@Bean` 以简化目标流定义,更多协议特定的命名空间工厂等等。请随时通过 StackOverflow、JIRA 和 GitHub 问题与我们联系,分享您的想法和建议!

项目页面 | JIRA | 问题 | [贡献] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow(`spring-integration` 标签)

获取 Spring 新闻通讯

保持与 Spring 新闻通讯的联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring在一个简单的订阅中提供对OpenJDK™、Spring和Apache Tomcat®的支持和二进制文件。

了解更多

即将举行的活动

查看Spring社区中所有即将举行的活动。

查看全部