抢占先机
VMware 提供培训和认证,助您快速提升。
了解更多尊敬的 Spring 社区成员:
我们高兴地宣布 Spring Integration Java DSL 1.1 里程碑 1 现已发布。使用 里程碑仓库 并配合 Maven 或 Gradle 进行早期访问试用。
compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"
老实说,1.1
版本计划的许多功能尚未实现,但感谢我们的朋友 Josh Long 的鼓励以及最近宣布的 Apache Kafka 支持(Spring Integration Kafka 支持 1.1 正式发布,Spring XD 1.1.1 发布),我们发布此里程碑 1 版本主要是为了在 Java 配置 DSL 中展示对 Apache Kafka 的支持。
在本文中,我们将介绍此版本中的该功能及其他功能。
让我们从 Spring Integration Java DSL 的 KafkaTests
类中的一个“简单”示例开始
@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
return new DefaultConnectionFactory(
new ZookeeperConfiguration(zookeeper.connectString()));
}
@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
MetadataStoreOffsetManager offsetManager =
new MetadataStoreOffsetManager(connectionFactory);
// start reading at the end of the
offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
return offsetManager;
}
@Bean
public IntegrationFlow listeningFromKafkaFlow(
ConnectionFactory connectionFactory,
OffsetManager offsetManager) {
return IntegrationFlows
.from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
.autoCommitOffset(false)
.payloadDecoder(String::new)
.keyDecoder(b -> Integer.valueOf(new String(b)))
.configureListenerContainer(c ->
c.offsetManager(offsetManager)
.maxFetch(100)))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults"))
.get();
}
EmbeddedZookeeper
是 Apache Kafka test
artifact(在本例中是 testCompile 'org.apache.kafka:kafka_2.10:0.8.1.1:test'
)的一部分,它与 kafka.utils.TestUtils
等许多其他功能一起,对于单元测试非常有用。ConnectionFactory
和 OffsetManager
的更多信息,请参阅 Spring Integration Kafka 项目。IntegrationFlow
bean 定义。Spring Integration Java DSL 提供了一个命名空间工厂 - Kafka
- 它利用 IntegrationComponentSpec
实现来支持 Spring Integration Kafka 适配器,例如用于 KafkaMessageDrivenChannelAdapter
的 KafkaMessageDrivenChannelAdapterSpec
。method-chain
的选项委托给底层的 KafkaMessageDrivenChannelAdapter
实例。.payloadDecoder(String::new)
这一行。kafka.serializer.Decoder
是一个 Scala trait
,它被编译成一个 Java 接口(不是类!),所以我们可以在这里将它表示为一个 Java 8 lambda 方法。.configureListenerContainer()
是一个 lambda 感知方法,用于分离 KafkaMessageListenerContainer
特定选项的关注点。Kafka
命名空间工厂中的其他自解释工厂方法包括用于 KafkaHighLevelConsumerMessageSource
轮询适配器的 .inboundChannelAdapter(...)
和用于 KafkaProducerMessageHandler
的 .outboundChannelAdapter(...)
。请查阅它们的 JavaDocs 以获取更多信息。
有关更多信息,请查看 Josh Long 关于使用 Apache Kafka 和 Spring 构建集成和数据处理管道的帖子!
社区提供了许多很棒的反馈(网络研讨会回放:Spring Integration Java DSL 介绍),其中很多是关于 bean 方法调用组件(服务、转换器、路由器等)。我们清楚地听到了你们的意见:组件方法选择已得到改进。下面是一个示例,它类似于 XML 配置中的 <int:service-activator input-channel="greetingChannel" ref="greetingService"/>
@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {
@Autowired
private GreetingService greetingService;
@Bean
public IntegrationFlow greetingFlow() {
return IntegrationFlows.from("greetingChannel")
.handle(this.greetingService)
.get();
}
}
@Component
public class GreetingService {
public void greeting(String payload) {
System.out.println("Hello " + payload);
}
}
在这里,greeting
方法将由框架自动选择。另一种方法是使用 methodName
参数,以便在存在歧义的情况下指定方法。许多其他 EIP 实现也引入了类似的 POJO 方法调用 EIP 方法,例如 transform(Object service, String methodName)
、split(Object service)
等。
Spring Integration Java DSL 也遵循 Spring Integration 消息注解,例如 @ServiceActivator
、@Router
、@Filter
等,甚至包括 @Payload
、@Header
。请查阅 IntegrationFlowDefinition
的 JavaDocs 以获取更多信息。
毫不奇怪,由于 IntegrationFlow
是一个接口,我们可以直接提供其实现作为自定义组件,并且它在 Spring Integration Java DSL 环境中可以按原样工作
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
这类似于 @Bean
定义,但这种方法有助于我们的组件保持更松散的耦合。
但是,等等,还有更多!IntegrationFlow
实现(例如 @Bean
定义中的 lambda)仅限于 DirectChannel
输入通道。我们在这里更进一步,引入了 IntegrationFlowAdapter
。这是我最喜欢的一个示例,演示了它的用法
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new tomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this, "messageSource",
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("foo", "FOO"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "B,A,R";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> foo) {
return foo.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String foo) {
return payload + ":" + foo;
}
}
当然,有了 POJO 方法调用支持(参见上文),可以轻松构建流程。
Spring Framework 和 Spring Integration 长期以来一直支持动态语言,并且主要与 XML Spring 配置关联。从 Java 代码处理脚本(如 Groovy、Ruby、JavaScript 等)可能看起来很奇怪,但我们发现它是一个在运行时重新加载功能的有用工具,尤其是在 Java lambda 不够动态的情况下。让我们看看 Spring Integration Java DSL 中的 Scripts
命名空间工厂
@Configuration
@EnableIntegration
public class ScriptsConfiguration {
@Value("com/my/project/integration/scripts/splitterScript.groovy")
private Resource splitterScript;
@Bean
public PollableChannel results() {
return new QueueChannel();
}
@Bean
public IntegrationFlow scriptSplitter() {
return f -> f
.split(Scripts.script(this.splitterScript)
.refreshCheckDelay(10000)
.variable("foo", "bar"))
.channel(results());
}
}
这种脚本支持使我们能够仅处理外部资源,这些资源可以在运行时更改和重新加载。Spring Integration Scripting 模块支持的 inline
脚本没有意义,因为对于这种情况我们有 Java 8 lambda。
Wire Tap EI 模式在 Spring Integration 中作为 ChannelInterceptor
实现,可以像这样注入到任何 MessageChannel
中作为拦截器
@Bean
public MessageChannel myChannel() {
return MessageChannels.direct()
.interceptor(new WireTap(loggerChannel()))
.get();
}
IntegrationFlow
定义允许我们省略 EIP 组件之间的 MessageChannel
声明,因此我们引入了内联的 .wireTap()
EIP 方法,以便为这些匿名通道注入 WireTap
。以下是一些示例
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlows.from("tappedChannel1")
.wireTap("tapChannel",
wt -> wt.selector(m -> m.getPayload().equals("foo")))
.channel("nullChannel")
.get();
}
@Bean
public IntegrationFlow wireTapFlow2() {
return f -> f
.wireTap(sf -> sf
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("wireTapSubflowResult")))
.channel("nullChannel");
}
请参阅 IntegrationFlowDefinition.wireTap()
方法的 JavaDocs 以获取更多信息,并且不要错过我们在 GitHub 项目页面上的测试用例。
1.1 版本还有很多工作要做,例如进一步简化 .aggregate()
等配置,注入外部子流的能力,将 IntegrationComponentSpec
实现配置为单独的 @Bean
以简化目标流定义的能力,更多特定协议的命名空间工厂等等。请随时通过 StackOverflow、JIRA 和 GitHub issues 联系我们,分享您的想法和建议!
项目页面 | JIRA | 问题 | [贡献] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow (spring-integration
标签)