领先一步
VMware 提供培训和认证,助您快速进步。
了解更多亲爱的 Spring 社区:
我们很高兴宣布 Spring Integration Hazelcast Support 项目的 Milestone 1 版本现已可用。请使用 Milestone 仓库(通过 Maven 或 Gradle)进行早期试用。
compile "org.springframework.integration:spring-integration-hazelcast:1.0.0.M1"
首先,特别感谢 Eren Avşaroğulları,他发起了这个项目,并且是一位积极响应、充满活力的贡献者。请不要错过他今年在 SpringOne 大会上的 演讲!
Spring Integration 实现了知名的企业集成模式(Enterprise Integration Patterns),在基于 Spring 的应用中提供轻量级消息传递,并通过声明式适配器支持与外部系统集成。Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点分离,这对于编写可维护、可测试的代码至关重要。
另一方面,Hazelcast 是领先的开源内存数据网格,用 Java 编写;它允许在服务器、集群和地域之间分发数据和计算,并能管理非常大的数据集或高数据摄取率。
因此,从我的角度来看,将消息传递引入基于 Hazelcast 的应用,以及提供一种简单的方式通过 Hazelcast 分布式 Spring Integration 消息流,这看起来非常棒。
我相信这个扩展会受到社区的欢迎,并将根据收到的反馈进行积极开发和支持。
让我们回顾一下这个 Milestone 版本为您带来了哪些功能!
对于那些不想等待项目完成,或者已经在应用中使用了 Spring Integration 和 Hazelcast,或者正在考虑这样做的开发者,我很高兴展示一些在这个里程碑发布之前就已经开箱即用的功能。
得益于 QueueChannel
的通用实现,我们已经可以通过 Hazelcast 拥有一个分布式消息通道。
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config());
}
@Bean
public PollableChannel hazelcastQueueChannel() {
return new QueueChannel(hazelcastInstance()
.Message<?>>getQueue("siQueue"));
}
}
在应用中将此配置部署在 Hazelcast 集群的多个节点上,我们将拥有一个分布式 QueueChannel
,并且只有一个节点能够从这个 IQueue
中轮询(poll)单个 Message
。
Hazelcast 中的 ITopic
抽象与 JMS 中的 Topic
有相似的语义:所有订阅者都会收到发布的消息。稍微发挥一点想象力,我们可以将这种机制作为开箱即用的功能来利用。
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public ITopic<Message<?>> siTopic() {
return hazelcastInstance().getTopic("siTopic");
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(
ITopic<Message<?>> siTopic) {
return new FixedSubscriberChannel(siTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}
@PostConstruct
public void init() {
siTopic().addMessageListener(m ->
fromHazelcastTopicChannel().send(m.getMessageObject()));
}
}
FixedSubscriberChannel
是 DirectChannel
的一个优化变体,它在初始化时需要一个 MessageHandler
。由于 MessageHandler
是一个函数式接口,我们可以简单地为 handleMessage
方法提供一个 Lambda 表达式。当一条消息发送到 publishToHazelcastTopicChannel
时,它只是被发布到 Hazelcast 的 ITopic
。com.hazelcast.core.MessageListener
也是一个函数式接口,因此我们可以为 ITopic#addMessageListener
提供一个 Lambda 表达式。所以,将整个 Message>
发布到 ITopic
允许我们在订阅者端原样接收它,并将其发送到 MessageChannel
,以便在 Hazelcast 集群中所有订阅的节点上进行处理。
利用 Hazelcast 的 ExecutorService
支持,我们可以配置一个 ExecutorChannel
,使其在整个集群中一次只接受和执行一条消息。我称之为集群范围的单例(cluster-wide singleton)。
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel() {
return new ExecutorChannel(hazelcastInstance()
.getExecutorService("singletonExecutor"));
}
}
现在,让我们谈谈在 Spring Integration Hazelcast 扩展的第一个里程碑版本中已经可用的功能。
通过 Spring Integration Hazelcast Support,我们提供了这些组件用于从 Hazelcast 获取入站数据。
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<int-hazelcast:cq-inbound-channel-adapter
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE" />
<int-hazelcast:ds-inbound-channel-adapter
channel="dsMapChannel"
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
请参阅 Spring Integration Hazelcast 项目页面,了解有关其用途和配置选项的更多信息。
对于那些已经熟悉 Spring Integration Gemfire 支持的人来说,这些组件的使用应该很明显。
目前,我们只提供一个通用的出站通道适配器,用于将数据放入 Hazelcast。
<int-hazelcast:outbound-channel-adapter channel="listChannel"
cache="distributedList" />
<bean id="distributedList" factory-bean="instance" factory-method="getList">
<constructor-arg value="distributedList"/>
</bean>
请参阅项目主页上关于此适配器的更多信息。我们将在 RELEASE
版本之前让这个组件更加灵活,例如利用上面提到的对 ITopic
的发布操作,添加运行时 distributedObject
解析(例如通过 SpEL),允许将 MapEntry
作为入站消息的 payload
等。
这只是通往 RELEASE
版本道路的开端。我们和 Eren 正在考虑一些功能,例如 Hazelcast Distributed Execution Service Activator
、Hazelcast Client Support
、Hazelcast backed Subscribable Channel
、HazelcastLockRegistry
、Annotation Support
等,并希望在今年九月的 SpringOne 大会之前发布 1.0.0.RELEASE
版本。在此期间,请随时通过 StackOverflow、JIRA 和 GitHub Issues 与我们联系,分享您的想法和建议!
项目页面 | JIRA | 问题 | [贡献] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow (spring-integration
标签)