领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我谨代表 Spring Integration 团队,非常高兴地宣布 Spring Integration 的 5.1.0.RELEASE 版本现已发布。
可以从 Maven Central、JCenter 以及我们的 发布仓库 下载。
compile "org.springframework.integration:spring-integration-core:5.1.0.RELEASE"
首先,我要感谢所有社区成员对框架持续的积极贡献!
除了常规的依赖升级、错误修复和内部性能改进之外,此版本还引入了一些值得关注的新功能。
为了严格保证消息发布的顺序,可以使用 BoundRabbitChannelAdvice 作为 MessageHandler 的建议,以便在同一个线程绑定的 Channel 中执行所有下游 AMQP 操作。这通常用于 splitter 或其他会发送多条消息的机制。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlows.from(Gateway.class)
.split(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
如果 SourcePollingChannelAdapter 或 PollingConsumer 配置了 outputChannel 作为 FluxMessageChannel,则轮询任务将不是由调度器执行,而是由 Flux.generate() 执行,并根据 trigger.nextExecutionTime() 进行后续的 Mono.delay()。这样,实际的源轮询将按需执行,以响应 FluxMessageChannel 传播的下游背压。PollerMetadata 的选项保持不变,对最终配置透明。
fluxTransform()IntegrationFlowDefinition 现在提供了新的 fluxTransform() 操作符,该操作符接受一个 Function,用于以响应式方式转换传入消息的 Flux。 fluxTransform() 底层完全基于 Reactor 的 Flux.transform() 操作符,并且如果函数本身不产生一个回复消息,它会将请求头复制到回复消息中。该调用在内部被包装了输入和输出的 FluxMessageChannel 实例。
IntegrationFlow integrationFlow = f -> f
.split()
.<String, String>fluxTransform(flux -> flux
.map(Message::getPayload)
.map(String::toUpperCase))
.aggregate(a -> a
.outputProcessor(group -> group
.getMessages()
.stream()
.map(Message::getPayload)
.map(String.class::cast)
.collect(Collectors.joining(","))))
.channel(resultChannel);
此外,Java DSL 还提供了更多方便的操作符:nullChannel()、convert(Class<?>) 和 logAndReply()。有关更多信息,请参阅它们的 Javadoc。
java.util.function 接口现在是消息端点的第一公民。Function<?, ?> 和 Consumer<?> 可以直接从 @ServiceActivator 或 @Transformer 定义中使用,例如:
@Bean
@Transformer(inputChannel = "functionServiceChannel")
public Function<String, String> functionAsService() {
return String::toUpperCase;
}
Supplier<?> 接口可以简单地与 @InboundChannelAdapter 注解一起使用,或者作为 <int:inbound-channel-adapter> 中的 ref。
@Bean
@InboundChannelAdapter(value = "inputChannel", poller = @Poller(fixedDelay = "1000"))
public Supplier<String> pojoSupplier() {
return () -> "foo";
}
现在也可以直接在消息端点定义中使用 Kotlin lambda。
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = [Poller(fixedRate = "10", maxMessagesPerPoll = "1")])
fun kotlinSupplier(): () -> String {
return { "baz" }
}
随着 Micrometer 升级到 1.1,框架现在会自动从 MeterRegistry 中移除已注册的 meter。这在我们开发动态 IntegrationFlow 并需要在运行时注册和移除它们时非常有用。
现在,在运行时注册的 MBeans(例如通过动态 IntegrationFlow)会在其 bean 在运行时销毁时自动从 JMX 服务器注册中移除。
HTTP (以及 WebFlux) 入站端点,如果在运行时声明(例如通过动态 IntegrationFlow),现在会在 HandlerMapping 中注册其请求映射,并在其 bean 销毁时自动将其移除。
鉴于 Spring Social 项目即将 生命周期结束,我们已将 spring-integration-twitter 模块移至 Spring Integration Extensions umbrella 下的一个独立项目,并发布了 org.springframework.integration:spring-integration-social-twitter:1.0.0.RELEASE,该版本完全基于 Spring Integration 5.1。
有关更多更改,请查阅参考手册的 What’s New 章节。另请参阅 Migration Guide,了解此版本中的破坏性更改以及如何处理它们。
此版本是 Spring Boot 2.1 GA 的基础。
接下来,我们期待将 master 分支切换到 5.2 版本,以开始开发新功能和有价值的改进!
欢迎通过合适的沟通渠道提出任何反馈、功能建议、批评、Bug 报告和疑问!