领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多尊敬的 Spring 社区:
我们很高兴地宣布 Spring Integration Hazelcast 支持项目的里程碑 1。使用 里程碑存储库 与 Maven 或 Gradle 尝试早期访问。
compile "org.springframework.integration:spring-integration-hazelcast:1.0.0.M1"
首先,特别感谢 Eren Avşaroğulları,他发起了这个项目,并且是一位反应迅速且充满活力的贡献者。请不要错过他在今年 SpringOne 上与我们一起进行的 演讲!
Spring Integration 实现众所周知的 企业集成模式,并在基于 Spring 的应用程序中提供轻量级消息传递,并通过声明式适配器支持与外部系统的集成。Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。
另一方面,Hazelcast 是领先的开源内存数据网格,用 Java 编写;它允许跨服务器、集群和地理位置分布数据和计算,并管理非常大的数据集或高数据摄取率。
因此,在我看来,将消息传递到基于 Hazelcast 的应用程序中,以及通过 Hazelcast 轻松分发 Spring Integration 消息流非常酷。
我相信此扩展将受到社区的需求,并根据收到的反馈进行积极的开发和支持。
让我们回顾一下我们在此里程碑中为您提供的建议!
对于那些不想等待项目完成,或者已经在他们的应用程序中使用 Spring Integration 和 Hazelcast 或者想要考虑这样做的人,我很高兴地介绍即使在此里程碑版本之前也能开箱即用的功能。
由于QueueChannel
的实现非常通用,我们已经可以使用 Hazelcast 创建分布式消息通道。
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config());
}
@Bean
public PollableChannel hazelcastQueueChannel() {
return new QueueChannel(hazelcastInstance()
.Message<?>>getQueue("siQueue"));
}
}
将此配置放置在应用程序的 Hazelcast 集群中的多个节点上,我们将拥有一个分布式QueueChannel
,并且只有一个节点能够从此IQueue
轮询单个Message
。
Hazelcast 中的ITopic
抽象具有与 JMS 中的Topic
类似的语义:所有订阅者都会收到已发布的消息。使用一点想象力,我们可以将此机制用作开箱即用的功能。
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public ITopic<Message<?>> siTopic() {
return hazelcastInstance().getTopic("siTopic");
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(
ITopic<Message<?>> siTopic) {
return new FixedSubscriberChannel(siTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}
@PostConstruct
public void init() {
siTopic().addMessageListener(m ->
fromHazelcastTopicChannel().send(m.getMessageObject()));
}
}
FixedSubscriberChannel
是DirectChannel
的优化变体,它在初始化时需要一个MessageHandler
。由于MessageHandler
是一个函数式接口,因此我们可以简单地为handleMessage
方法提供一个 Lambda 表达式。当消息发送到publishToHazelcastTopicChannel
时,它只是发布到 Hazelcast ITopic
。com.hazelcast.core.MessageListener
也是一个函数式接口,因此我们可以为ITopic#addMessageListener
提供一个 Lambda 表达式。因此,将整个Message<?>
发布到ITopic
允许我们在订阅者处按原样接收它并发送到MessageChannel
以在 Hazelcast 集群中的所有已订阅节点上进行处理。
使用 Hazelcast ExecutorService
支持,我们可以配置一个ExecutorChannel
以在整个集群中一次接受和执行一条消息。我称之为集群范围的单例。
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel() {
return new ExecutorChannel(hazelcastInstance()
.getExecutorService("singletonExecutor"));
}
}
现在让我们讨论一下 Spring Integration Hazelcast 扩展的第一个里程碑中已经可用的功能。
使用 Spring Integration Hazelcast 支持,我们为来自 Hazelcast 的入站数据提供了这些组件。
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<int-hazelcast:cq-inbound-channel-adapter
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE" />
<int-hazelcast:ds-inbound-channel-adapter
channel="dsMapChannel"
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
请参阅 Spring Integration Hazelcast 项目页面,以获取有关其用途和配置选项的更多信息。
对于已经熟悉 Spring Integration Gemfire 支持的人来说,这些的使用应该是显而易见的。
目前,我们只提供一个通用的出站通道适配器来将数据放入 Hazelcast。
<int-hazelcast:outbound-channel-adapter channel="listChannel"
cache="distributedList" />
<bean id="distributedList" factory-bean="instance" factory-method="getList">
<constructor-arg value="distributedList"/>
</bean>
在项目主页上查看有关此适配器的更多信息。在RELEASE
之前,我们将使此组件更加灵活,例如利用上面提到的对ITopic
的发布操作,添加运行时distributedObject
解析(例如通过 SpEL),允许接受MapEntry
作为传入消息的payload
等。
这仅仅是通往 RELEASE 的道路的开始。我们与 Eren 一样,心中还有几个功能,例如Hazelcast 分布式执行服务激活器
、Hazelcast 客户端支持
、Hazelcast 支持的可订阅通道
、HazelcastLockRegistry
、注解支持
等,并希望在 9 月份的 SpringOne 大会之前发布1.0.0.RELEASE
。在此期间,请随时通过 StackOverflow、JIRA 和 GitHub 问题与我们联系,分享您的想法和建议!
项目页面 | JIRA | 问题 | [贡献] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflow(spring-integration
标签)