Bootiful Spring Boot 3.4:Spring Integration

工程 | Josh Long | 2024 年 11 月 24 日 | ...

Spring Integration 6.4 是你处理所有企业应用集成事务的一站式解决方案。它支持众多消息传递和集成模式,以及更多的技术适配器——SFTP、FTP、Redis、Apache Pulsar、Apache Kafka、JDBC、TCP/IP 等。所以,你可能已经猜到了,根本不可能全部跟上。 发布说明做得相当好,所以我将列出我最喜欢的一些。

  • 远程文件系统入站适配器现在使用 clearFetchedCache() 方法从缓存中删除未处理远程文件的引用。
  • Spring Integration 分布式锁机制有一个方法 - LockRepository#delete - 该方法现在返回移除分布式锁所有权的结果。
  • 同样,基于 Redis 的分布式锁实现 - RedisLockRegistry - 如果锁的所有权已过期,则会抛出 ConcurrentModificationException
  • 现在有一个方便的 Consumer<SshClient>,允许进一步定制内部 SshClient
  • 出站 ZeroMQ 现在可以绑定到 TCP 端口,而不是连接到 URL。
  • 现在可以使用相应的 ClientManager 通过 IntegrationFlowContext 在运行时添加多个 MqttPahoMessageDrivenChannelAdapterMqttv5PahoMessageDrivenChannelAdapter 实例。
  • Python 的脚本支持现在基于 GraalVM Truffle Polyglot 实现。
  • AbstractMailReceiver 公开了一个禁用设置 Flags.Flag.FLAGGED 的选项。
  • MessageBuilder 中提取了一个新的 BaseMessageBuilder,用于简化构建自定义构建器实现,其中大部分逻辑应与 `MessageBuilder` 相同。
  • 新增了一个 ControlBusFactoryBean,可以简化新 ControlBus 的启动。此外,还有一个新的 Control Bus HTTP 控制器。

我认为改进的控制总线支持非常值得一看!控制总线模式背后的理念是,应该能够使用与发送消息到系统相同的带内消息传递基础设施来控制系统。使用 Spring Integration 控制 Spring Integration(以及您的 Spring 应用程序)。Spring 有一个机制,可以自动检测并公开用 @ManagedResource 标记的 Spring bean 作为 JMX bean 进行操作。Spring Integration 中的 ControlBus 机制会获取这些相同的 bean,并将它们导出以便通过进出 Spring Integration 的消息进行操作。

package com.example.bootiful_34.integration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.DirectChannelSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.config.EnableControlBusController;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.MessageChannel;

import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@EnableControlBusController
class ControlBusConfiguration {

	@Bean
	IntegrationFlow controlBusFlow(MessageChannel controlBusFlowMessageChannel) {
		return IntegrationFlow.from(controlBusFlowMessageChannel).controlBusOnRegistry().get();
	}

	@Bean
	DirectChannelSpec controlBusFlowMessageChannel() {
		return MessageChannels.direct();
	}

	@Bean
	MyOperationsManagedResource myOperationsManagedResource() {
		return new MyOperationsManagedResource();
	}

	@ManagedResource
	public static class MyOperationsManagedResource {

		static final AtomicInteger COUNTER = new AtomicInteger(0);

		@ManagedOperation(description = "Update the magic number")
		public void updateMagicNumber(int magicNumber) {
			System.out.println("doSomething with magic number " + magicNumber);
			COUNTER.incrementAndGet();
		}

	}

}

在这个示例中,一个 bean 操作了一些重要的状态——配置一个魔术数字。好吧,好吧,我知道这并不是最诱人的示例。但请耐心点。总之,它提供了一个方法——updateMagicNumber(int)——我们期望可以通过 JMX 和 Spring Integration 的 ControlBus 访问。我正常注册 bean,并确保也用 @ManagedResource 标记类。

我定义了一个类型为 DirectChannel 的 Spring MessageChannel,然后是一个 Spring Integration IntegrationFlow,它通过 DirectChannel 将所有消息路由到 Spring 应用程序中的控制总线。然后,控制总线将适配消息并将其路由到适当的托管资源。让我们看看要调用该托管资源及其需要参数的方法,消息需要是什么样的。

package com.example.bootiful_34.integration;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest
class ControlBusConfigurationTest {

	@Test
	void controlBusConfigurationRunner(@Autowired MessageChannel controlBusFlowMessageChannel) {
		var msg = MessageBuilder.withPayload("myOperationsManagedResource.updateMagicNumber")
			.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of(42))
			.build();
		assertEquals(ControlBusConfiguration.MyOperationsManagedResource.COUNTER.get(), 0,
				"there should be zero invocations of the control bus thus far");
		controlBusFlowMessageChannel.send(msg);
		assertEquals(ControlBusConfiguration.MyOperationsManagedResource.COUNTER.get(), 1,
				"there should be one invocation of the control bus thus far");
	}

}

足够直接,不是吗?在这个测试中,我们注入 DirectChannel 并构建一个新的 Message,其 payload 是托管资源 bean 的名称,以及托管资源上的方法,以及一些要传递到该托管资源 bean 方法的参数的数据。测试的其余部分只是确保消息已成功送达,并且计数器已增加以反映调用。

ControlBus 模式非常强大!记住:Spring MessageChannel 实例是您进入任何其他系统的网关,无论是通过 Spring Cloud Stream、Spring Framework 4 的 WebSocket 支持,还是 Spring Integration 的入站适配器。此方法调用也可以同样快速地来自 FTP 服务器上的文件、Apache Kafka 队列上的新消息,或者特定部分的 TCP payload。我们可以使用 Spring Security 来保护入站消息通道,拒绝无法验证的消息。这比直接使用 JMX 要容易得多。

如果这还不够,新的 @EnableControlBusController 注释会公开 ControlBus 的功能!

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您加速进步。

了解更多

获得支持

Tanzu Spring 提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件,只需一份简单的订阅。

了解更多

即将举行的活动

查看 Spring 社区所有即将举行的活动。

查看所有