Spring Integration 在 dm Server 上

工程 | Iwein Fuld | 2009 年 2 月 27 日 | ...

引言

在这篇博客文章中,我将向您展示如何使用 Spring Integration 和 dm Server 创建松耦合、可扩展的应用程序。使用 OSGi 的额外好处是可以在运行时改变应用程序的行为,当然我们也会借此找点乐子。首先,我将快速强调设计并发应用程序的原因,然后我将描述将 OSGi bundle 与消息传递集成的不同策略。在此过程中,您将能瞥见我们的工具和一些 dm Server 功能。如果您已经下载并安装了最新的 SpringSource Tool Suite 和 dm Server,您应该能够自己完成这些。您不需要示例代码来跟随故事,但如果您感兴趣,可以在 Spring Integration sandbox 中找到它。

同步的弊端

应用程序中的同步调用表现良好,但它们无法扩展。这一点之前已有不少文献记载,因此我将快速为您回顾一下,然后继续。进行同步调用时,调用线程必须阻塞,直到调用完成。如果您在该方法中执行 I/O 操作,阻塞线程就会浪费 CPU 周期。如果您有一个大规模并行 Web 应用程序,这不是什么大问题,因为您可以简单地增加使用的线程池的最大大小。然而,如果您需要从一次调用中充分利用服务器的 CPU 能力,那就没辙了。由于一个线程运行在一个核心上,因此您拥有的核心越多,单线程程序就越低效。

消息传递来解救

亲自深入并发编程乐趣无穷,但这并非易事,而且您今天可能想早点完成工作。这就是 Spring Integration 可以帮助您的地方。它为您提供了所需的所有并发处理控制,而无需您承担并发编程底层细节的责任。

OSGi 如何提供帮助?

没有 OSGi,您也可以完全避免同步的弊端。但它让生活变得如此美好,您不应该错过它。每次进行微小更改时都进行 JVM 的完全重新部署和重启会非常无趣。还有一件事让 OSGi 和消息传递的结合变得非常有趣。如果企业应用程序变得庞大,您迟早需要模块化。在同一个传统服务器中运行多个模块是一场灾难(也称为 jar hell)。因此,在传统架构中,人们在许多情况下过早地进行了横向扩展。这导致了非常糟糕的设置。不同的节点必须通过网络相互通信,并且无法共享 CPU 和内存等资源。这使得架构效率低下,并严重影响性能。您不仅浪费了一台服务器中某些核心的周期,现在还跨多个节点重复同样的低效率。更糟糕的是,网络延迟增加了 I/O 等待时间,而在本地方法调用就足够的情况下。

有了 OSGi,您就不必承担这样的损失,您可以在同一个 JVM 中运行不同的 bundle,因此它们之间的调用就像普通方法调用一样快速,而不会出现因在同一个 classpath 上部署不同团队的 jar 包而导致的可怕的 jar hell 问题。拥有一个可以在运行时改变行为、可以由多个团队协作开发并且可以帮助您将所需硬件减少到一台强大服务器的应用程序。听起来很不错,那么我们来做吧。

准备工作

如果您喜欢使用实际的代码示例来跟随博客,或者自己并行编写示例,您需要准备好适当的工具(和代码)。如果您只是想理解博客中的想法,则无需这样做。我将在此处包含必要的代码示例。本文并非详细的教程,因此如果您想跟随实践,您将需要自己编写代码,或者直接使用示例。为此,请遵循以下步骤。

  1. 下载并安装 SpringSource Tool Suitedm Server(这就像解压文件一样简单)。如果您愿意,也可以使用带有 dm Server 工具和 Spring IDE 的 Eclipse。
  2. 以默认名称配置 dm Server 并启动它
    1. 打开 Server 视图,在其中右键单击并选择新建
    2. 在搜索窗口中输入 Spring 并选择 SpringSource dm Server 1.0
    3. 指向已解压的 dm Server 目录
  3. 更新您的 bundle 仓库并安装 Spring Integration Core 1.0.1 和 Commons Logging 1.1.1
    1. 双击新创建的服务器,服务器管理控制台将打开
    2. 选择 repository 选项卡,然后点击“Update local bundle and library repository index”(更新本地 bundle 和库仓库索引)
    3. 搜索 Spring Integration 并下载核心 bundle
    4. 启动服务器
  4. Spring Integration sandbox 导入项目
    1. 将项目签出到本地工作目录
    2. 从 STS 中选择 File->Import->Existing projects into workspace(文件->导入->将现有项目导入工作区)并指向工作目录

异步移交给消息传递 bundle

在本节中,我们将使用一个依赖于 Spring Integration 的单一 bundle,并将我们的工作移交给它。这个 bundle 将隐藏所有消息传递的细节,如果您系统中只有一小部分需要进行密集工作,这可能正是您所需要的。

我将展示的示例应用程序是为一个中世纪小镇开发的。在这些小镇上,过去有一个报信员,他会走街串巷,大声宣布当地新闻和公告。这个小镇(希望保持匿名)被报信员工会迫使其改善报信员的工作条件,并且由于预算限制,他们希望同时减少人数。

他们得出的结论是,实现这些目标的唯一方法是用自动化系统取代报信员的大部分体力劳动。报信员只需在他舒适的办公室里将信息传递给这个系统,然后信息就会分发给镇上的所有公民。市政厅已经记录了所有公民的信息,因此最简单的方法是将这个分发逻辑放在市政厅。

[caption id="attachment_1096" align="alignnone" width="653" caption="示例中各种 bundle 的概览。"]System design of various bundles[/caption]

报信员 bundle 决定不重新培训报信员以适应新系统,因此他仍然会调用镇上的 cry() 方法,只是不再需要四处奔波并多次执行以覆盖镇上的每个地方。公告的分发将由镇上的新系统透明地完成。报信员 bundle 只有一个类 TownCrier.java,它被注入了应该向其喊话的 Town。


@Component
public class TownCrier {
	private static final Log logger = LogFactory.getLog(TownCrier.class);

	@Autowired //Spring will take care of the wiring as usual
	private Town town;

	private ScheduledThreadPoolExecutor schedule;

	@PostConstruct
	public void start() {
		logger.info("Starting feeder");
		schedule = new ScheduledThreadPoolExecutor(1);

		TimerTask task = new TimerTask() {
			public void run() {
				logger.info("Crying the time");
				town.cry("Oyez! Oyez! Oyez! It is now " + new Date());
			}
		};
		schedule.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS);
	}
}

如果您了解 Spring Integration,您可能会对这段代码感到不适,并希望改用 inbound channel adapter 和 poller,但我们在这里只是模拟遗留代码,所以请耐心听我说。

为了帮助 Spring 引用将放在另一个 bundle 中的 town,我们将使用 Spring DM。如果您以前从未接触过 OSGi 和 Spring DM,现在是阅读 入门 的好时机。我们只需在 osgi-context.xml 中添加一个 OSGi reference 元素。


<osgi:reference id="collectorService"
	interface="com.springsource.samples.integration.osgi.town.input.Town" />

最后,在 MANIFEST.mf 中存在一个对导出服务的 town bundle 的依赖。

Import-Package: com.springsource.samples.integration.osgi.town.input,
 org.apache.commons.logging;version="[1.1.1,1.1.1]",
 javax.annotation

同样,完整代码位于 Spring Integration sandbox

Town bundle Town(bundle)已经完全接受了消息传递。他们正在使用 Spring Integration 来处理消息传递和并发。他们还定义了报信员和公民(稍后提及)所需的输入和输出接口。

Town 接口有一个 cry 方法,我们希望将它的实现暴露给报信员。我们可以让 Spring Integration 为我们创建一个实现,该实现接受字符串参数并将其封装在 Message 中放在通道上。这通过 gateway 完成。


<gateway id="inputGateway" default-request-channel="announcements"
	service-interface="com.springsource.samples.integration.osgi.town.input.Town" />

<publish-subscribe-channel id="announcements" />

public interface Town {
	void cry(String string);
}

由于 Town 接口只有一个方法,我们不需要告诉 Spring Integration 调用哪个方法,因此这里的 @Gateway 是可选的。

运行时监听

镇上的公民会在市政厅注册,但他们不知道消息传递。Citizen 可以使用其 onCry(String string) 方法监听喊话,但它不依赖于 Spring Integration。在某些情况下,您可能希望运行一个不依赖 Spring Integration 的 bundle,但在使用 Spring Integration 的事件驱动架构中启用它。

示例在 CityHall 的 Town bundle 中使用了一个 service activator,它负责解包消息并将它们推送给公民


@ServiceActivator
public void onCry(String announcement) {
	for (Citizen consumer : citizens) {
		consumer.onCry(announcement);
	}
}

为了允许公民自行注册,CitizenRegistry 被公开为一个 OSGi 服务。


<osgi:service id="citizenRegistry" ref="cityHall"
	interface="com.springsource.samples.integration.osgi.town.output.CitizenRegistry" />

公民注册服务也由 CityHall 实现,用于维护接收公告的公民集合。

在 scribe bundle 中实现了一个 Citizen 的示例。它通过一个 OSGi reference 引用了 CitizenRegistry。


<osgi:reference id="citizenRegistry"
	interface="com.springsource.samples.integration.osgi.town.output.CitizenRegistry" />

Scribe 在初始化期间向公民注册中心注册自己,随后记录所有接收到的公告。

这样,只有 town bundle 的代码需要更改才能开始使用新系统,而 citizen bundle 可以保持不变。当然,可能有充分的理由拥有多个了解消息传递并直接共享通道的 bundle。在下一节中,我们将探讨这一点。

将通道公开为 OSGi 服务

假设这个小镇也与中央政府打交道。政府以 government bundle 的形式派代表来到镇上。这个代表直接了解 announcementsChannel,因此无需使用 gateway 或 service activator 将其屏蔽。小镇将 announcementsChannel 公开为一个 OSGi 服务


<osgi:service id="announcementsChannel" ref="announcements"
	interface="org.springframework.integration.channel.SubscribableChannel" />

为了将政府法令发送到镇上的公告通道,我们在示例中的 government bundle 中有一个 gateway,但这可以是任何端点。重要的是在 government bundle 中引用 town bundle 中的通道


	<osgi:reference id="announcementsChannel"
		interface="org.springframework.integration.channel.SubscribableChannel" />

现在您可以在另一个 bundle 中将其作为普通通道使用


<si:service-activator input-channel="announcementsChannel"
	ref="nationalArchive" />

<si:gateway
	service-interface="com.springsource.samples.integration.osgi.government.GovernmentDecreeGateway"
	id="decreeer" default-request-channel="announcementsChannel" />

为了简单起见,示例实现了非常简单的本地端点,但一旦通道连接了 bundle,您就可以使用 Spring Integration 的适配器来离开 JVM。

我如何在现实世界中应用这一点?

如果消息有效载荷是中央银行或美联储的利率变化,小镇是内部银行应用程序的中心消息系统,公民是各个交易服务(例如:外汇、债券交易、期权和权证等)。每个都需要模块化,因为
  • 它们由不同的团队频繁更改
  • 定期添加更多模块
  • 每个模块需要根据新的利率采取不同的行动

假设 government bundle 成为银行内部的贷款/信贷部门(其自身对其他机构的贷款利率可能因美联储利率而异 - 可能基于不同的架构构建)。我不是金融专家,也不是您所在领域的专家,因此我不会武断地告诉您这个技术解决方案具体如何应用。将来我们将把示例重构到一个更真实的领域,并将其添加到示例中,为您提供一些思路。如果您有特殊请求,请务必在此处评论反馈。

结论

要设计一个可扩展的应用程序,您需要考虑同步移交的问题。消息传递可以解决许多这些问题。为了演进大型应用程序,使其在单个 JVM 上获得最佳性能,OSGi 是一个引人注目的替代方案。将 Spring Integration 和 dm Server 两者结合使用,强大而简单。在本文中,您已经了解了如何

  • 使用 gateway 处理不依赖 Spring Integration 的 bundle 的入站调用
  • 使用 service activator 处理对不依赖 Spring Integration 的 bundle 的出站调用
  • 公开 SubscribableChannel,以便使用 Spring Integration 的 bundle 可以轮询消息
  • 公开 MessageChannel,以便使用 Spring Integration 的 bundle 可以推送消息
同样的设置使用其他消息传递框架和 OSGi 容器也是可能的,但这个示例的简单性树立了一个难以超越的良好榜样。是的,这是一个挑战,来吧,咬钩吧,你知道你想要...

链接

订阅 Spring 通讯

通过 Spring 通讯保持联系

订阅

领先一步

VMware 提供培训和认证,助您飞速发展。

了解更多

获取支持

Tanzu Spring 在一项简单订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部