领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多尊敬的 Spring 社区:
我们很高兴地宣布 Spring Integration Java DSL 1.1 里程碑版本 1 现已发布。使用 Maven 或 Gradle 的 里程碑仓库 来抢先体验。
compile "org.springframework.integration:spring-integration-java-dsl:1.1.0.M1"
说实话,`1.1` 版本中计划的许多功能尚未实现,但感谢我们朋友 Josh Long 的鼓励以及最近关于 Apache Kafka 支持的公告(Spring Integration Kafka 支持 1.1 版本发布,Spring XD 1.1.1 版本发布),我们发布了这个里程碑版本 1,主要是为了展示 Java 配置 DSL 中的 Apache Kafka 支持。
我们将在本文中介绍此版本中的这些以及其他功能。
让我们从 Spring Integration Java DSL 中 `KafkaTests` 类的一些“简单”示例开始。
@Bean
public ConnectionFactory connectionFactory(EmbeddedZookeeper zookeeper) {
return new DefaultConnectionFactory(
new ZookeeperConfiguration(zookeeper.connectString()));
}
@Bean
public OffsetManager offsetManager(ConnectionFactory connectionFactory) {
MetadataStoreOffsetManager offsetManager =
new MetadataStoreOffsetManager(connectionFactory);
// start reading at the end of the
offsetManager.setReferenceTimestamp(OffsetRequest.LatestTime());
return offsetManager;
}
@Bean
public IntegrationFlow listeningFromKafkaFlow(
ConnectionFactory connectionFactory,
OffsetManager offsetManager) {
return IntegrationFlows
.from(Kafka.messageDriverChannelAdapter(connectionFactory, TEST_TOPIC)
.autoCommitOffset(false)
.payloadDecoder(String::new)
.keyDecoder(b -> Integer.valueOf(new String(b)))
.configureListenerContainer(c ->
c.offsetManager(offsetManager)
.maxFetch(100)))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults"))
.get();
}
`Kafka` 命名空间工厂中的其他自解释工厂方法是用于 `KafkaHighLevelConsumerMessageSource` 轮询适配器的 `.inboundChannelAdapter(...)` 和用于 `KafkaProducerMessageHandler` 的 `.outboundChannelAdapter(...)`。请参阅它们的 JavaDocs 以获取更多信息。
有关更多信息,请查看 Josh Long 关于 使用 Apache Kafka 进行 Spring 集成和数据处理管道 的文章!
来自社区的大量宝贵反馈(网络研讨会回放:介绍 Spring 集成的 Java DSL)围绕着 bean 方法调用组件(服务、转换器、路由器等),我们认真倾听了:组件方法选择已得到改进。以下是一个类似于 XML 配置中 `<int:service-activator input-channel="greetingChannel" ref="greetingService"/>` 的示例
@Configuration
@EnableIntegration
@ComponentScan
public class MyConfiguration {
@Autowired
private GreetingService greetingService;
@Bean
public IntegrationFlow greetingFlow() {
return IntegrationFlows.from("greetingChannel")
.handle(this.greetingService)
.get();
}
}
@Component
public class GreetingService {
public void greeting(String payload) {
System.out.println("Hello " + payload);
}
}
在这里,框架将自动选择 `greeting` 方法。还有一个使用 `methodName` 参数的替代方法,以便在存在歧义的情况下指定方法。类似的 POJO 方法调用 EIP 方法已引入许多其他 EIP 实现,例如 `transform(Object service, String methodName)`、`split(Object service)` 等。
Spring Integration Java DSL 还尊重 Spring Integration 消息传递注释,例如 `@ServiceActivator`、`@Router`、`@Filter` 等,甚至 `@Payload`、`@Header`。请参阅 `IntegrationFlowDefinition` JavaDocs 以获取更多信息。
毫不奇怪,由于 `IntegrationFlow` 是一个接口,我们可以将其直接实现作为自定义组件,它可以在 Spring Integration Java DSL 环境中按原样工作。
@Component
public class MyFlow implements IntegrationFlow {
@Override
public void configure(IntegrationFlowDefinition<?> f) {
f.<String, String>transform(String::toUpperCase);
}
}
这类似于 `@Bean` 定义,但是这种方法有助于我们的组件保持更松散的耦合。
但是,等等,还有更多!`IntegrationFlow` 实现(例如 `@Bean` 定义情况下的 lambda)仅限于 `DirectChannel` 输入通道。我们在这里更进一步,引入了 `IntegrationFlowAdapter`。这是一个我最喜欢的示例,用于演示如何使用它。
@Component
public class MyFlowAdapter extends IntegrationFlowAdapter {
private final AtomicBoolean invoked = new tomicBoolean();
public Date nextExecutionTime(TriggerContext triggerContext) {
return this.invoked.getAndSet(true) ? null : new Date();
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(this, "messageSource",
e -> e.poller(p -> p.trigger(this::nextExecutionTime)))
.split(this)
.transform(this)
.aggregate(a -> a.processor(this, null), null)
.enrichHeaders(Collections.singletonMap("foo", "FOO"))
.filter(this)
.handle(this)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
public String messageSource() {
return "B,A,R";
}
@Splitter
public String[] split(String payload) {
return StringUtils.commaDelimitedListToStringArray(payload);
}
@Transformer
public String transform(String payload) {
return payload.toLowerCase();
}
@Aggregator
public String aggregate(List<String> payloads) {
return payloads.stream().collect(Collectors.joining());
}
@Filter
public boolean filter(@Header Optional<String> foo) {
return foo.isPresent();
}
@ServiceActivator
public String handle(String payload, @Header String foo) {
return payload + ":" + foo;
}
}
当然,使用 POJO 方法调用支持(见上文),不可能如此轻松地构建流。
Spring Framework 和 Spring Integration 长期以来一直支持动态语言,并且主要与 XML Spring 配置相关联。从 Java 代码处理脚本(如 Groovy、Ruby、JavaScript 等)可能看起来很奇怪,但我们发现它是在运行时重新加载功能的有用工具,并且在 Java lambda 不够动态时也很有用。让我们看看 Spring Integration Java DSL 中的 `Scripts` 命名空间工厂。
@Configuration
@EnableIntegration
public class ScriptsConfiguration {
@Value("com/my/project/integration/scripts/splitterScript.groovy")
private Resource splitterScript;
@Bean
public PollableChannel results() {
return new QueueChannel();
}
@Bean
public IntegrationFlow scriptSplitter() {
return f -> f
.split(Scripts.script(this.splitterScript)
.refreshCheckDelay(10000)
.variable("foo", "bar"))
.channel(results());
}
}
此脚本支持允许我们仅处理外部资源,这些资源可以在运行时更改和重新加载。Spring Integration Scripting 模块支持的 `inline` 脚本没有意义,因为我们有 Java 8 lambda 来处理这些情况。
Wire Tap EI 模式 在 Spring Integration 中实现为 `ChannelInterceptor`,可以像这样注入到任何 `MessageChannel` 中作为拦截器。
@Bean
public MessageChannel myChannel() {
return MessageChannels.direct()
.interceptor(new WireTap(loggerChannel()))
.get();
}
`IntegrationFlow` 定义允许我们省略 EIP 组件之间的 `MessageChannel` 声明,因此我们引入了一个内联 `.wireTap()` EIP 方法,以便为这些匿名通道允许 `WireTap` 注入。以下是一些示例。
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlows.from("tappedChannel1")
.wireTap("tapChannel",
wt -> wt.selector(m -> m.getPayload().equals("foo")))
.channel("nullChannel")
.get();
}
@Bean
public IntegrationFlow wireTapFlow2() {
return f -> f
.wireTap(sf -> sf
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("wireTapSubflowResult")))
.channel("nullChannel");
}
请参阅 `IntegrationFlowDefinition.wireTap()` 方法的 JavaDocs 以获取更多信息,并且不要错过 GitHub 项目页面上的测试用例。
1.1 版本还有很多工作要做,例如进一步简化 `.aggregate()` 等配置,能够注入外部子流,能够将 `IntegrationComponentSpec` 实现配置为单独的 `@Bean` 以简化目标流定义,更多协议特定的命名空间工厂等等。请随时通过 StackOverflow、JIRA 和 GitHub 问题与我们联系,分享您的想法和建议!
项目页面 | JIRA | 问题 | [贡献] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow(`spring-integration` 标签)