Spring Integration Hazelcast 支持 1.0 里程碑 1 现已可用

发布 | Artem Bilan | 2015 年 4 月 20 日 | ...

尊敬的 Spring 社区:

我们很高兴地宣布 Spring Integration Hazelcast 支持项目的里程碑 1。使用 里程碑存储库 与 Maven 或 Gradle 尝试早期访问。

compile "org.springframework.integration:spring-integration-hazelcast:1.0.0.M1"

首先,特别感谢 Eren Avşaroğulları,他发起了这个项目,并且是一位反应迅速且充满活力的贡献者。请不要错过他在今年 SpringOne 上与我们一起进行的 演讲

概述

Spring Integration 实现众所周知的 企业集成模式,并在基于 Spring 的应用程序中提供轻量级消息传递,并通过声明式适配器支持与外部系统的集成。Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。

另一方面,Hazelcast 是领先的开源内存数据网格,用 Java 编写;它允许跨服务器、集群和地理位置分布数据和计算,并管理非常大的数据集或高数据摄取率。

因此,在我看来,将消息传递到基于 Hazelcast 的应用程序中,以及通过 Hazelcast 轻松分发 Spring Integration 消息流非常酷。

我相信此扩展将受到社区的需求,并根据收到的反馈进行积极的开发和支持。

让我们回顾一下我们在此里程碑中为您提供的建议!

就地功能

对于那些不想等待项目完成,或者已经在他们的应用程序中使用 Spring Integration 和 Hazelcast 或者想要考虑这样做的人,我很高兴地介绍即使在此里程碑版本之前也能开箱即用的功能。

IQueue

由于QueueChannel 的实现非常通用,我们已经可以使用 Hazelcast 创建分布式消息通道。

@Configuration
@EnableIntegration
public static class ContextConfiguration {

	@Bean
	public HazelcastInstance hazelcastInstance() {
		return Hazelcast.newHazelcastInstance(new Config());
	}

	@Bean
	public PollableChannel hazelcastQueueChannel() {
		return new QueueChannel(hazelcastInstance()
                                          .Message<?>>getQueue("siQueue"));
	}

}

将此配置放置在应用程序的 Hazelcast 集群中的多个节点上,我们将拥有一个分布式QueueChannel,并且只有一个节点能够从此IQueue 轮询单个Message

ITopic

Hazelcast 中的ITopic 抽象具有与 JMS 中的Topic 类似的语义:所有订阅者都会收到已发布的消息。使用一点想象力,我们可以将此机制用作开箱即用的功能。

@Configuration
@EnableIntegration
public static class ContextConfiguration {

	@Bean
    public ITopic<Message<?>> siTopic() {
    	return hazelcastInstance().getTopic("siTopic");
    }
    
    @Bean
    public MessageChannel publishToHazelcastTopicChannel(
                                    ITopic<Message<?>> siTopic) {
    	return new FixedSubscriberChannel(siTopic::publish);
    }

    @Bean
    public MessageChannel fromHazelcastTopicChannel() {
	return new DirectChannel();
    }

    @PostConstruct
    public void init() {
	siTopic().addMessageListener(m -> 
                     fromHazelcastTopicChannel().send(m.getMessageObject()));
    }

}

FixedSubscriberChannelDirectChannel 的优化变体,它在初始化时需要一个MessageHandler。由于MessageHandler 是一个函数式接口,因此我们可以简单地为handleMessage 方法提供一个 Lambda 表达式。当消息发送到publishToHazelcastTopicChannel 时,它只是发布到 Hazelcast ITopiccom.hazelcast.core.MessageListener 也是一个函数式接口,因此我们可以为ITopic#addMessageListener 提供一个 Lambda 表达式。因此,将整个Message<?> 发布到ITopic 允许我们在订阅者处按原样接收它并发送到MessageChannel 以在 Hazelcast 集群中的所有已订阅节点上进行处理。

IExecutorService

使用 Hazelcast ExecutorService 支持,我们可以配置一个ExecutorChannel 以在整个集群中一次接受和执行一条消息。我称之为集群范围的单例

@Configuration
@EnableIntegration
public static class ContextConfiguration {

	@Bean
	public HazelcastInstance hazelcastInstance() {
		return Hazelcast.newHazelcastInstance(new Config()
				.addExecutorConfig(new ExecutorConfig()
						.setName("singletonExecutor")
						.setPoolSize(1)));
	}
	@Bean
	public MessageChannel hazelcastSingletonExecutorChannel() {
		return new ExecutorChannel(hazelcastInstance()
                                       .getExecutorService("singletonExecutor"));
	}
	
}

现在让我们讨论一下 Spring Integration Hazelcast 扩展的第一个里程碑中已经可用的功能。

Hazelcast 入站通道适配器

使用 Spring Integration Hazelcast 支持,我们为来自 Hazelcast 的入站数据提供了这些组件。

<int-hazelcast:inbound-channel-adapter channel="multiMapChannel" 
                cache="multiMap" 
                cache-events="ADDED, REMOVED, CLEAR_ALL" /> 
                              
<int-hazelcast:cq-inbound-channel-adapter 
                channel="cqMapChannel" 
                cache="cqMap" 
                cache-events="UPDATED, REMOVED" 
                predicate="name=TestName AND surname=TestSurname"
                include-value="true"
                cache-listening-policy="SINGLE" /> 
                
<int-hazelcast:ds-inbound-channel-adapter  
                channel="dsMapChannel" 
                cache="dsMap"
                iteration-type="ENTRY" 
                distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
    <int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>

请参阅 Spring Integration Hazelcast 项目页面,以获取有关其用途和配置选项的更多信息。

对于已经熟悉 Spring Integration Gemfire 支持的人来说,这些的使用应该是显而易见的。

Hazelcast 出站通道适配器

目前,我们只提供一个通用的出站通道适配器来将数据放入 Hazelcast。

<int-hazelcast:outbound-channel-adapter channel="listChannel" 
                     cache="distributedList" /> 

<bean id="distributedList" factory-bean="instance" factory-method="getList"> 
    <constructor-arg value="distributedList"/> 
</bean> 

在项目主页上查看有关此适配器的更多信息。在RELEASE 之前,我们将使此组件更加灵活,例如利用上面提到的对ITopic发布操作,添加运行时distributedObject 解析(例如通过 SpEL),允许接受MapEntry 作为传入消息的payload 等。

总结

这仅仅是通往 RELEASE 的道路的开始。我们与 Eren 一样,心中还有几个功能,例如Hazelcast 分布式执行服务激活器Hazelcast 客户端支持Hazelcast 支持的可订阅通道HazelcastLockRegistry注解支持等,并希望在 9 月份的 SpringOne 大会之前发布1.0.0.RELEASE。在此期间,请随时通过 StackOverflow、JIRA 和 GitHub 问题与我们联系,分享您的想法和建议!

项目页面 | JIRA | 问题 | [贡献] (https://github.com/spring-projects/spring-integration/blob/master/CONTRIBUTING.md) | StackOverflowspring-integration 标签)

获取 Spring 新闻通讯

与 Spring 新闻通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获取支持

Tanzu Spring 在一个简单的订阅中提供对 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部