Spring integration Java DSL 1.1 M1 已发布

发布 | Artem Bilan | 2015年4月15日 | ...

尊敬的 Spring 社区成员:

我们高兴地宣布 Spring Integration Java DSL 1.1 里程碑 1 现已发布。使用 里程碑仓库 并配合 Maven 或 Gradle 进行早期访问试用。

compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"

老实说,1.1 版本计划的许多功能尚未实现,但感谢我们的朋友 Josh Long鼓励以及最近宣布的 Apache Kafka 支持(Spring Integration Kafka 支持 1.1 正式发布Spring XD 1.1.1 发布),我们发布此里程碑 1 版本主要是为了在 Java 配置 DSL 中展示对 Apache Kafka 的支持。

在本文中,我们将介绍此版本中的该功能及其他功能。

Apache Kafka 支持

让我们从 Spring Integration Java DSL 的 KafkaTests 类中的一个“简单”示例开始

@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
        return new DefaultConnectionFactory(
                new ZookeeperConfiguration(zookeeper.connectString()));
}

@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
        MetadataStoreOffsetManager offsetManager =
                           new MetadataStoreOffsetManager(connectionFactory);
        // start reading at the end of the
       offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
       return offsetManager;
}

@Bean
public IntegrationFlow listeningFromKafkaFlow(
                    ConnectionFactory connectionFactory,
                    OffsetManager offsetManager) {
     return IntegrationFlows
         .from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
                  .autoCommitOffset(false)
		  .payloadDecoder(String::new)
		  .keyDecoder(b -> Integer.valueOf(new String(b)))
		  .configureListenerContainer(c ->
				c.offsetManager(offsetManager)
						.maxFetch(100)))
         .<String, String>transform(String::toUpperCase)
         .channel(c -> c.queue("listeningFromKafkaResults"))
         .get();
}
  • EmbeddedZookeeper 是 Apache Kafka test artifact(在本例中是 testCompile 'org.apache.kafka:kafka_2.10:0.8.1.1:test')的一部分,它与 kafka.utils.TestUtils 等许多其他功能一起,对于单元测试非常有用。
  • 有关 ConnectionFactoryOffsetManager 的更多信息,请参阅 Spring Integration Kafka 项目。
  • 上述配置中最重要的部分是 IntegrationFlow bean 定义。Spring Integration Java DSL 提供了一个命名空间工厂 - Kafka - 它利用 IntegrationComponentSpec 实现来支持 Spring Integration Kafka 适配器,例如用于 KafkaMessageDrivenChannelAdapterKafkaMessageDrivenChannelAdapterSpec
  • 这是一个构建器模式的例子,该规范只是将来自 method-chain 的选项委托给底层的 KafkaMessageDrivenChannelAdapter 实例。
  • 对于那些不熟悉 Scala(Apache Kafka 的编写语言)的人,比如我自己,请注意 .payloadDecoder(String::new) 这一行。kafka.serializer.Decoder 是一个 Scala trait,它被编译成一个 Java 接口(不是类!),所以我们可以在这里将它表示为一个 Java 8 lambda 方法。
  • .configureListenerContainer() 是一个 lambda 感知方法,用于分离 KafkaMessageListenerContainer 特定选项的关注点。

Kafka 命名空间工厂中的其他自解释工厂方法包括用于 KafkaHighLevelConsumerMessageSource 轮询适配器的 .inboundChannelAdapter(...) 和用于 KafkaProducerMessageHandler.outboundChannelAdapter(...)。请查阅它们的 JavaDocs 以获取更多信息。

有关更多信息,请查看 Josh Long 关于使用 Apache Kafka 和 Spring 构建集成和数据处理管道的帖子!

POJO 方法调用

社区提供了许多很棒的反馈(网络研讨会回放:Spring Integration Java DSL 介绍),其中很多是关于 bean 方法调用组件(服务、转换器、路由器等)。我们清楚地听到了你们的意见:组件方法选择已得到改进。下面是一个示例,它类似于 XML 配置中的 <int:service-activator input-channel="greetingChannel" ref="greetingService"/>


@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {

	@Autowired
	private GreetingService greetingService;

	@Bean
	public IntegrationFlow greetingFlow() {
		return IntegrationFlows.from("greetingChannel")
				.handle(this.greetingService)
				.get();
	}

}

@Component
public class GreetingService {

   public void greeting(String payload) {
        System.out.println("Hello " + payload);
   }
}

在这里,greeting 方法将由框架自动选择。另一种方法是使用 methodName 参数,以便在存在歧义的情况下指定方法。许多其他 EIP 实现也引入了类似的 POJO 方法调用 EIP 方法,例如 transform(Object service, String methodName)split(Object service) 等。

Spring Integration Java DSL 也遵循 Spring Integration 消息注解,例如 @ServiceActivator@Router@Filter 等,甚至包括 @Payload@Header。请查阅 IntegrationFlowDefinition 的 JavaDocs 以获取更多信息。

IntegrationFlowAdapter

毫不奇怪,由于 IntegrationFlow 是一个接口,我们可以直接提供其实现作为自定义组件,并且它在 Spring Integration Java DSL 环境中可以按原样工作

@Component
public class MyFlow implements IntegrationFlow {

	@Override
	public void configure(IntegrationFlowDefinition<?> f) {
		f.<String, String>transform(String::toUpperCase);
	}

}

这类似于 @Bean 定义,但这种方法有助于我们的组件保持更松散的耦合。

但是,等等,还有更多!IntegrationFlow 实现(例如 @Bean 定义中的 lambda)仅限于 DirectChannel 输入通道。我们在这里更进一步,引入了 IntegrationFlowAdapter。这是我最喜欢的一个示例,演示了它的用法

@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {

        private final AtomicBoolean invoked = new tomicBoolean();

        public Date nextExecutionTime(TriggerContext triggerContext) {
              return this.invoked.getAndSet(true) ? null : new Date();
       }

       @Override
       protected IntegrationFlowDefinition<?> buildFlow() {
          return from(this, "messageSource",
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
                   .split(this)
		   .transform(this)
		   .aggregate(a -> a.processor(this, null), null)
		   .enrichHeaders(Collections.singletonMap("foo", "FOO"))
		   .filter(this)
		   .handle(this)
		   .channel(c -> c.queue("myFlowAdapterOutput"));
      }

      public String messageSource() {
	       return "B,A,R";
      }

      @Splitter
      public String[] split(String payload) {
           return StringUtils.commaDelimitedListToStringArray(payload);
      }

      @Transformer
      public String transform(String payload) {
           return payload.toLowerCase();
      }

      @Aggregator
      public String aggregate(List<String> payloads) {
             return payloads.stream().collect(Collectors.joining());
      }

      @Filter
      public boolean filter(@Header Optional<String> foo) {
              return foo.isPresent();
      }

      @ServiceActivator
      public String handle(String payload, @Header String foo) {
             return payload + ":" + foo;
      }

}

当然,有了 POJO 方法调用支持(参见上文),可以轻松构建流程。

动态语言(脚本)支持

Spring FrameworkSpring Integration 长期以来一直支持动态语言,并且主要与 XML Spring 配置关联。从 Java 代码处理脚本(如 Groovy、Ruby、JavaScript 等)可能看起来很奇怪,但我们发现它是一个在运行时重新加载功能的有用工具,尤其是在 Java lambda 不够动态的情况下。让我们看看 Spring Integration Java DSL 中的 Scripts 命名空间工厂

@Configuration
@EnableIntegration
public class ScriptsConfiguration {

	@Value("com/my/project/integration/scripts/splitterScript.groovy")
	private Resource splitterScript;

	@Bean
	public PollableChannel results() {
		return new QueueChannel();
	}

	@Bean
	public IntegrationFlow scriptSplitter() {
		return f -> f
        	             .split(Scripts.script(this.splitterScript)
                                              .refreshCheckDelay(10000)
                                              .variable("foo", "bar"))
                             .channel(results());
	}

}

这种脚本支持使我们能够仅处理外部资源,这些资源可以在运行时更改和重新加载。Spring Integration Scripting 模块支持的 inline 脚本没有意义,因为对于这种情况我们有 Java 8 lambda。

内联 WireTap

Wire Tap EI 模式Spring Integration 中作为 ChannelInterceptor 实现,可以像这样注入到任何 MessageChannel 中作为拦截器

@Bean
public MessageChannel myChannel() {
     return MessageChannels.direct()
                .interceptor(new WireTap(loggerChannel()))
                .get();
}

IntegrationFlow 定义允许我们省略 EIP 组件之间的 MessageChannel 声明,因此我们引入了内联的 .wireTap() EIP 方法,以便为这些匿名通道注入 WireTap。以下是一些示例

@Bean
public IntegrationFlow wireTapFlow1() {
	return IntegrationFlows.from("tappedChannel1")
		.wireTap("tapChannel",
                         wt -> wt.selector(m -> m.getPayload().equals("foo")))
		.channel("nullChannel")
		.get();
}

@Bean
public IntegrationFlow wireTapFlow2() {
	return f -> f
		.wireTap(sf -> sf
			.<String, String>transform(String::toUpperCase)
			.channel(c -> c.queue("wireTapSubflowResult")))
		.channel("nullChannel");
}

请参阅 IntegrationFlowDefinition.wireTap() 方法的 JavaDocs 以获取更多信息,并且不要错过我们在 GitHub 项目页面上的测试用例。

总结

1.1 版本还有很多工作要做,例如进一步简化 .aggregate() 等配置,注入外部子流的能力,将 IntegrationComponentSpec 实现配置为单独的 @Bean 以简化目标流定义的能力,更多特定协议的命名空间工厂等等。请随时通过 StackOverflow、JIRA 和 GitHub issues 联系我们,分享您的想法和建议!

项目页面 | JIRA | 问题 | [贡献] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow (spring-integration 标签)

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

抢占先机

VMware 提供培训和认证,助您快速提升。

了解更多

获取支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一次简单订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看全部