领先一步
VMware 提供培训和认证,助您加速进步。
了解更多在这篇博文中,我将向您展示如何使用 Spring Integration 和 dm Server 创建一个松耦合且可扩展的应用程序。使用 OSGi 的额外好处是,我们可以在运行时更改应用程序的行为,当然我们也会从中获得一些乐趣。首先,我将快速强调设计应用程序以供并发使用的原因,然后我将描述将 OSGi 捆绑包与消息传递集成的不同策略。在此过程中,您将一瞥我们的工具和 dm Server 的一些功能。如果您已下载并安装最新的 SpringSource Tool Suite 和 dm Server,您应该能够自己完成此操作。您不需要示例代码来跟随故事,但如果您有兴趣,它将在 Spring Integration 沙盒中提供。
同步崩溃
应用程序中的同步调用执行良好,但它们无法扩展。之前已经有很多文档记载,所以我将快速回顾一下然后继续。当您进行同步调用时,调用线程必须阻塞直到调用完成。如果您在该方法中进行 I/O,阻塞线程会浪费 CPU 周期。如果您有一个大规模并行 Web 应用程序,这不是一个大问题,因为您只需增加您正在使用的线程池的最大大小。但是,如果您需要从单个调用中充分利用服务器的 CPU 算力,那您就不走运了。由于一个线程运行在一个核心上,您拥有的核心越多,单线程程序的效率就越低。
消息传递救援
自己进行并发编程很有趣,但它不是最容易做的事情,您可能想今天早点下班。这就是 Spring Integration 可以帮助您的地方。它为您提供了您所需的所有并发处理控制,而无需您承担并发编程的低级细节。
OSGi 如何提供帮助?
您可以完全防止没有 OSGi 的同步崩溃。它只是让生活变得更好,您不应该错过它。每次进行一点更改时都进行完全重新部署和重新启动 JVM 将不会很有趣。还有一件事使 OSGi 和消息传递的组合非常有趣。如果企业应用程序变得庞大,您迟早需要进行模块化。在同一个传统服务器中运行多个模块是灾难的根源(也称为 jar 地狱)。因此,在许多情况下,在传统架构中,人们会过早地进行扩展。这会造成非常糟糕的设置。不同的节点必须通过网络相互通信,并且它们不能共享 CPU 和内存等资源。这使得架构效率低下并严重影响性能。您不仅在一个服务器的某些核心上浪费周期,现在您还在多个节点上重复同样的效率低下。更糟糕的是,网络延迟增加了 I/O 等待,而正常的本地方法调用就足够了。
使用 OSGi,您不必承担这种损失,您可以在同一个 JVM 中运行不同的捆绑包,这样它们之间的调用就像正常的方***一样快,而没有与在同一个类路径上部署不同团队的 jar 相关联的可怕的 jar 地狱。拥有一个可以在运行时更改其行为、由多个团队协作开发并帮助您将所需硬件减少到单个强大服务器的应用程序。听起来很棒,那么我们开始吧。
如果您喜欢使用实际代码示例或自己并行编写示例来跟随博客,则需要准备好适当的工具(和代码)。要跟随博客的思路,您不需要这样做。我将在此处包含必要的代码示例。本文无意成为详细的操作指南,因此如果您想跟随,您将不得不自己编写代码,或者只是使用示例。为此,请按照以下步骤操作。
在本节中,我们将使用一个依赖于 Spring Integration 的单个捆绑包,并将我们的工作移交给它。这个捆绑包将隐藏所有消息传递细节,如果您系统只有一小部分需要做密集工作,这可能正是您所需要的。
我将展示的示例应用程序是为一座中世纪城镇开发的。在这些城镇中,过去有一位城市传令官会在街上高声宣告当地新闻和公告。这个城镇(不愿透露姓名)已被城市传令官工会强迫改善其城市传令官工作人员的工作条件,并且由于预算限制,他们希望同时减少人员。
他们得出结论,实现这些目标的唯一方法是用自动化系统取代城市传令官的大部分体力劳动。城市传令官只需在舒适的办公室里将他的消息传递给这个系统,然后消息就会分发给镇上的所有公民。市政厅已经跟踪所有公民,所以将这个分发逻辑放在城镇的市政厅最容易。
[caption id="attachment_1096" align="alignnone" width="653" caption="示例中各种捆绑包的概述。"]
[/caption]
城市传令官捆绑包 决定城市传令官不应重新接受培训以使用新系统,因此他仍将调用城镇的 cry() 方法,他只是不再需要多次进行才能覆盖城镇的每个部分。公告的分发将由城镇中的新系统透明地完成。towncrier 捆绑包只有一个 TownCrier.java 类,它被注入了它应该向其哭喊的 Town。
@Component
public class TownCrier {
private static final Log logger = LogFactory.getLog(TownCrier.class);
@Autowired //Spring will take care of the wiring as usual
private Town town;
private ScheduledThreadPoolExecutor schedule;
@PostConstruct
public void start() {
logger.info("Starting feeder");
schedule = new ScheduledThreadPoolExecutor(1);
TimerTask task = new TimerTask() {
public void run() {
logger.info("Crying the time");
town.cry("Oyez! Oyez! Oyez! It is now " + new Date());
}
};
schedule.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS);
}
}
如果您了解 Spring Integration,您会对这段代码感到畏缩,并希望使用入站通道适配器和轮询器,但我们只是在这里模拟遗留代码,所以请耐心等待。
为了帮助 Spring 引用将放在另一个捆绑包中的 Town,我们将使用 Spring DM。如果您以前从未接触过 OSGi 和 Spring DM,现在是阅读 入门指南 的好时机。我们只需在 osgi-context.xml 中添加一个 OSGi 引用元素。
<osgi:reference id="collectorService"
interface="com.springsource.samples.integration.osgi.town.input.Town" />
最后,MANIFEST.mf 中有一个对导出服务的 town 捆绑包的依赖项。
Import-Package: com.springsource.samples.integration.osgi.town.input,
org.apache.commons.logging;version="[1.1.1,1.1.1]",
javax.annotation
同样,完整代码在 Spring Integration 沙盒中
城镇捆绑包 城镇(捆绑包)已完全采用消息传递。他们正在使用 Spring Integration 来为他们处理消息传递和并发。他们还定义了城市传令官和公民(稍后提及)所需的输入和输出接口。
Town 接口有一个 cry 方法,我们希望向城市传令官公开它的一个实现。我们可以让 Spring Integration 为我们创建一个实现,该实现接收字符串参数并将其作为消息包装在通道上。这是通过网关完成的。
<gateway id="inputGateway" default-request-channel="announcements"
service-interface="com.springsource.samples.integration.osgi.town.input.Town" />
<publish-subscribe-channel id="announcements" />
public interface Town {
void cry(String string);
}
由于 Town 接口只有一个方法,我们不需要告诉 Spring Integration 调用哪个方法,所以这里的 @Gateway 是可选的。
运行时监听
镇上的居民将在市政厅登记,但不知道消息传递。公民可以通过其 onCry(String string) 方法监听哭声,但对 Spring Integration 没有依赖。在某些情况下,您可能希望运行一个不依赖 Spring Integration 的捆绑包,但在事件驱动架构中使用 Spring Integration 来启用它。
该示例在 CityHall 的 Town 捆绑包中使用了一个服务激活器,负责解包消息并将它们推送到公民。
@ServiceActivator
public void onCry(String announcement) {
for (Citizen consumer : citizens) {
consumer.onCry(announcement);
}
}
为了允许公民自行注册,CitizenRegistry 作为 OSGi 服务公开。
<osgi:service id="citizenRegistry" ref="cityHall"
interface="com.springsource.samples.integration.osgi.town.output.CitizenRegistry" />
公民注册服务也由市政厅实现,以维护公告分发到的公民集合。
在 scribe 捆绑包中实现了一个 Citizen 示例。它通过 OSGi 引用引用 CitizenRegistry。
<osgi:reference id="citizenRegistry"
interface="com.springsource.samples.integration.osgi.town.output.CitizenRegistry" />
抄写员在初始化期间向公民注册表注册自己,然后记录它收到的所有公告。
这样,只有城镇捆绑包的代码需要更改才能开始使用新系统,而公民捆绑包可以保持不变。当然,可能有充分的理由让多个捆绑包了解正在进行的消息传递并直接共享通道。在下一节中,我们将深入探讨这一点。
假设城镇也与中央政府打交道。政府以政府捆绑包的形式派代表到城镇。该代表直接了解公告通道,因此无需使用网关或服务激活器来屏蔽它。城镇将 announcementsChannel 公开为 OSGi 服务。
<osgi:service id="announcementsChannel" ref="announcements"
interface="org.springframework.integration.channel.SubscribableChannel" />
为了将政府法令发送到城镇的公告通道,我们在示例中的政府捆绑包中有一个网关,但这可以是任何端点。重要的是要在政府捆绑包中引用城镇捆绑包中的通道。
<osgi:reference id="announcementsChannel"
interface="org.springframework.integration.channel.SubscribableChannel" />
现在您可以在另一个捆绑包中将其用作普通通道
<si:service-activator input-channel="announcementsChannel"
ref="nationalArchive" />
<si:gateway
service-interface="com.springsource.samples.integration.osgi.government.GovernmentDecreeGateway"
id="decreeer" default-request-channel="announcementsChannel" />
为了简单起见,该示例实现了非常简单的本地端点,但是一旦通道桥接了捆绑包,就没有任何东西可以阻止您使用 Spring Integration 的适配器离开 JVM。
假设政府捆绑包成为银行的内部贷款/信贷部门(其对其他方的贷款利率可能会根据联邦利率而变化——可以建立在不同的架构上)。我不是金融专家,也不是您领域的专家,所以我不会妄加揣测这个技术解决方案究竟如何应用。未来,我们将把示例重构为一个更真实的领域,并将其添加到示例中,为您提供一些想法。如果您有特殊要求,请务必在此处评论。
要设计一个可扩展的应用程序,您需要考虑同步移交的问题。消息传递可以解决许多这些问题。对于为了最佳性能最好在单个 JVM 上运行的大型应用程序的发展,OSGi 是一个引人注目的替代方案。将 Spring Integration 和 dm Server 混合使用是强大而简单的。在本文中,您已经了解了如何