Bootiful Spring Boot 3.4:Spring Integration

工程 | Josh Long | 2024 年 11 月 24 日 | ...

Spring Integration 6.4 是您处理所有企业应用程序集成问题的一站式商店。 它支持众多的消息传递和集成模式,甚至更多用于各种技术的适配器——SFTP、FTP、Redis、Apache Pulsar、Apache Kafka、JDBC、TCP/IP 等。 因此,正如您可能已经猜到的那样,根本无法跟上它们。发行说明做得非常好,因此我将列出我的一些最爱。

  • 远程文件系统入站适配器现在使用 clearFetchedCache() 方法从缓存中删除未处理的远程文件的引用。
  • Spring Integration 分布式锁机制有一个方法 - LockRepository#delete - 现在返回删除分布式锁所有权的结果。
  • 类似地,分布式锁的 Redis 支持实现 - RedisLockRegistry - 如果锁的所有权已过期,则抛出 ConcurrentModificationException
  • 现在有一个方便的 Consumer<SshClient> 允许进一步自定义内部 SshClient
  • 出站 ZeroMQ 现在可以绑定到 TCP 端口,而不是连接到 URL。
  • 现在可以使用相应的 ClientManager 通过 IntegrationFlowContext 在运行时添加 MqttPahoMessageDrivenChannelAdapterMqttv5PahoMessageDrivenChannelAdapter 的多个实例。
  • 对 Python 的脚本支持现在建立在 GraalVM Truffle Polyglot 实现之上
  • AbstractMailReceiver 公开了一个选项来禁用设置 Flags.Flag.FLAGGED
  • MessageBuilder 中提取了一个新的 BaseMessageBuilder,以简化构建自定义构建器实现,其中大多数逻辑应与 `MessageBuilder 的逻辑相同。
  • 有一个新的 ControlBusFactoryBean 可以简化新建 ControlBus 的过程。 此外,还有一个新的 Control Bus HTTP 控制器。

我认为改进的控制总线支持非常值得关注! 控制总线模式背后的想法是,应该可以使用与用于向系统发送消息相同的带内消息传递基础设施来控制系统。 使用 Spring Integration 控制 Spring Integration(和您的 Spring 应用程序)。 Spring 有一种机制可以自动检测并公开使用 @ManagedResource 注释的 Spring bean 作为 JMX bean 进行操作。 Spring Integration 中的 ControlBus 机制采用相同的 bean 并导出它们,以便通过传入和传出 Spring Integration 的消息进行操作。

package com.example.bootiful_34.integration;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.DirectChannelSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.http.config.EnableControlBusController;
import org.springframework.jmx.export.annotation.ManagedOperation;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.messaging.MessageChannel;

import java.util.concurrent.atomic.AtomicInteger;

@Configuration
@EnableControlBusController
class ControlBusConfiguration {

	@Bean
	IntegrationFlow controlBusFlow(MessageChannel controlBusFlowMessageChannel) {
		return IntegrationFlow.from(controlBusFlowMessageChannel).controlBusOnRegistry().get();
	}

	@Bean
	DirectChannelSpec controlBusFlowMessageChannel() {
		return MessageChannels.direct();
	}

	@Bean
	MyOperationsManagedResource myOperationsManagedResource() {
		return new MyOperationsManagedResource();
	}

	@ManagedResource
	public static class MyOperationsManagedResource {

		static final AtomicInteger COUNTER = new AtomicInteger(0);

		@ManagedOperation(description = "Update the magic number")
		public void updateMagicNumber(int magicNumber) {
			System.out.println("doSomething with magic number " + magicNumber);
			COUNTER.incrementAndGet();
		}

	}

}

在此示例中,一个 bean 操作一些重要的状态——配置一些神奇的数字。 好的,好的,我明白为什么这不是最诱人的示例。 但请耐心等待。 无论如何,它提供了一个方法——updateMagicNumber(int)——我们希望可以通过 JMX 和 Spring Integration 的 ControlBus 访问它。 我正常注册 bean,确保也使用 @ManagedResource 注释该类。

我定义了一个类型为 DirectChannel 的 Spring MessageChannel,然后定义了一个 Spring Integration IntegrationFlow,它通过 DirectChannel 将所有消息路由到 Spring 应用程序中的控制总线。 然后,控制总线将调整消息并将其路由到适当的托管资源。 让我们看看消息需要是什么样子才能调用该托管资源及其期望参数的方法。

package com.example.bootiful_34.integration;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.integration.IntegrationMessageHeaderAccessor;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest
class ControlBusConfigurationTest {

	@Test
	void controlBusConfigurationRunner(@Autowired MessageChannel controlBusFlowMessageChannel) {
		var msg = MessageBuilder.withPayload("myOperationsManagedResource.updateMagicNumber")
			.setHeaderIfAbsent(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of(42))
			.build();
		assertEquals(ControlBusConfiguration.MyOperationsManagedResource.COUNTER.get(), 0,
				"there should be zero invocations of the control bus thus far");
		controlBusFlowMessageChannel.send(msg);
		assertEquals(ControlBusConfiguration.MyOperationsManagedResource.COUNTER.get(), 1,
				"there should be one invocation of the control bus thus far");
	}

}

确实足够简单了? 在此测试中,我们注入 DirectChannel 并创建一个新的 Message,其有效负载是托管资源 bean 的名称以及该托管资源上的方法,以及一些数据传递到该托管资源 bean 的方法的参数中。 测试的其余部分只是为了确保消息已成功传递,并且计数器已递增以反映调用。

ControlBus 模式非常强大! 请记住:Spring MessageChannel 实例是您连接到任何其他系统的网关,无论是通过 Spring Cloud Stream、Spring Framework 4 的 WebSocket 支持还是 Spring Integration 的入站适配器。 此方法调用可能同样快速地由文件存放在 FTP 服务器上、Apache Kafka 队列上到达的新消息或特定部分到达的 TCP 有效负载导致。 我们可以使用 Spring Security 来保护入站消息通道,拒绝无法进行身份验证的消息。 这比直接使用 JMX 容易得多。

如果这还不够,新的 @EnableControlBusController 注释会公开导出 ControlBus 功能!

获取 Spring 新闻通讯

通过 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证来加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部