领先一步
VMware 提供培训和认证,以加速您的进步。
了解更多在这篇博文中,我将向您展示如何使用 Spring Integration 和 dm Server 创建一个松散耦合且可扩展的应用程序。使用 OSGi 的额外好处将允许我们在运行时更改应用程序的行为,当然我们也会从中获得一些乐趣。首先,我将快速重点介绍为并发使用而设计应用程序的原因,然后我将描述将 OSGi 捆绑包与消息传递集成的不同策略。在此过程中,您将了解我们的工具和一些 dm Server 功能。如果您已下载并安装了最新的 SpringSource Tool Suite 和 dm Server,您应该能够自己完成此操作。您不需要示例代码来理解本文,但如果您感兴趣,它将在 Spring Integration 沙箱中提供。
同步的崩溃
应用程序中的同步调用运行良好,但它们无法扩展。之前已经有很多文档对此进行了说明,因此我将快速回顾一下,然后继续。当您进行同步调用时,调用线程必须阻塞直到调用完成。如果您碰巧在该方法中执行 I/O,则阻塞线程会浪费 CPU 周期。如果您有一个大规模并行的 Web 应用程序,这并不是一个太大的问题,因为您可以简单地增加正在使用的线程池的最大大小。但是,如果您需要从单个调用中充分利用服务器 CPU 能力,那么您就走运了。由于一个线程在一个内核上运行,因此内核越多,单线程程序的效率就越低。
消息传递来救援
自己进行并发编程非常有趣,但它并非易事,您可能希望今天早点下班。这就是 Spring Integration 可以帮助您的地方。它为您提供了所有必要的并发处理控制,而无需您负责并发编程的底层细节。
OSGi 如何提供帮助?
您可以在没有 OSGi 的情况下完全防止同步崩溃。它只是让生活变得更好,以至于您不应该错过它。每次您进行小的更改时都要进行完整的重新部署和 JVM 重启不会很有趣。还有一件事使 OSGi 和消息传递的组合非常有趣。如果企业应用程序变得庞大,您必须在某些时候进行模块化。在同一个传统服务器上运行多个模块是灾难的秘诀(也称为 jar 地狱)。因此,在传统架构中,人们在许多情况下过早地进行了横向扩展。这导致了一个非常糟糕的设置。不同的节点必须通过网络相互通信,并且它们不能共享 CPU 和内存等资源。这使得架构效率低下,严重影响性能。您不仅在一个服务器的某些内核上浪费了周期,而且现在还在多个节点上重复相同的低效性。雪上加霜的是,网络延迟增加了 I/O 等待,而正常的本地方法调用就足够了。
使用 OSGi,您不必承受这种损失,您可以在同一个 JVM 中运行不同的捆绑包,以便它们之间的调用与正常的本地方法调用一样快,而不会出现与在同一个类路径上部署不同团队的 jar 文件相关的可怕的 jar 地狱。拥有一个可以在运行时更改其行为、与多个团队一起工作的应用程序,并帮助您将所需的硬件减少到一台强大的服务器。听起来不错,所以让我们开始吧。
如果您更喜欢使用实际的代码示例来学习这篇博文,或者自己并行编写示例,则需要准备好合适的工具(和代码)。要理解博文的思想,您不需要这样做。我将在此处包含必要的代码示例。本文并非旨在成为详细的入门指南,因此如果您想继续学习,您必须自己进行编码,或者只使用示例。为此,请按照以下步骤操作。
在本节中,我们将使用一个依赖于 Spring Integration 的单个捆绑包,并将我们的工作移交给它。此捆绑包将隐藏所有消息传递细节,如果您只有一小部分系统执行密集型工作,这可能正是您需要的。
我将展示的示例应用程序是为中世纪城镇开发的。在这些城镇中,过去有一个镇上的报话员会在街上大声喊出当地的新闻和公告。相关城镇(希望保持匿名)已被镇上的报话员工会迫使改善其报话员员工的工作条件,并且由于预算限制,他们希望同时减少员工人数。
他们得出的结论是,实现这些目标的唯一方法是用自动化系统取代大部分镇上报话员的人工劳动。镇上的报话员只需从他办公室的舒适环境中将他的信息传递给这个系统,然后该信息将透明地分发给城镇中的所有公民。市政厅已经跟踪所有公民,因此将这种分发逻辑放在城镇市政厅是最简单的。
[caption id="attachment_1096" align="alignnone" width="653" caption="示例中各个捆绑包的概述。"][/caption]
**镇上报话员捆绑包** 据决定,镇上的报话员不应该被重新培训以使用新系统,因此他仍然会调用城镇的 cry() 方法,但他不再需要多次去覆盖城镇的每个部分了。公告的分发将由城镇中的新系统透明地完成。镇上报话员捆绑包只有一个类 TownCrier.java,它被注入到它应该向其喊叫的城镇中。
@Component
public class TownCrier {
private static final Log logger = LogFactory.getLog(TownCrier.class);
@Autowired //Spring will take care of the wiring as usual
private Town town;
private ScheduledThreadPoolExecutor schedule;
@PostConstruct
public void start() {
logger.info("Starting feeder");
schedule = new ScheduledThreadPoolExecutor(1);
TimerTask task = new TimerTask() {
public void run() {
logger.info("Crying the time");
town.cry("Oyez! Oyez! Oyez! It is now " + new Date());
}
};
schedule.scheduleAtFixedRate(task, 1, 1, TimeUnit.SECONDS);
}
}
如果您了解 Spring Integration,您会对这段代码感到畏缩,并希望改用入站通道适配器和轮询器,但我们在这里只是模拟遗留代码,所以请耐心等待。
为了帮助 Spring 引用将放在另一个捆绑包中的城镇,我们将使用 Spring DM。如果您以前从未使用过 OSGi 和 Spring DM,那么现在是阅读 入门指南 的好时机。我们只需在我们的 osgi-context.xml 中添加一个 OSGi 引用元素。
<osgi:reference id="collectorService"
interface="com.springsource.samples.integration.osgi.town.input.Town" />
最后,在 MANIFEST.mf 中有一个对导出服务的城镇捆绑包的依赖关系。
Import-Package: com.springsource.samples.integration.osgi.town.input,
org.apache.commons.logging;version="[1.1.1,1.1.1]",
javax.annotation
同样,完整的代码位于 Spring Integration 沙箱
**城镇捆绑包** 城镇(捆绑包)已完全采用消息传递。他们正在使用 Spring Integration 来为他们处理消息传递和并发。他们还定义了镇上报话员和公民(稍后会提到)所需的输入和输出接口。
Town 接口有一个 cry 方法,我们希望向镇上报话员公开其实现。我们可以让 Spring Integration 为我们创建一个实现,该实现采用字符串参数并将其包装在 Message 中放入通道中。这是使用网关完成的。
<gateway id="inputGateway" default-request-channel="announcements"
service-interface="com.springsource.samples.integration.osgi.town.input.Town" />
<publish-subscribe-channel id="announcements" />
public interface Town {
void cry(String string);
}
由于 Town 接口只有一个方法,因此我们不需要告诉 Spring Integration 调用哪个方法,因此此处的 @Gateway 是可选的。
在运行时监听
城镇中的公民将在市政厅注册,但不知道消息传递。公民可以使用其 onCry(String string) 方法监听哭声,但没有任何对 Spring Integration 的依赖。在某些情况下,您可能希望运行没有任何 Spring Integration 依赖项的捆绑包,但在使用 Spring Integration 的事件驱动架构中启用它。
该示例在 CityHall 中的 Town 捆绑包中使用服务激活器来处理解包消息并将它们推送到公民。
@ServiceActivator
public void onCry(String announcement) {
for (Citizen consumer : citizens) {
consumer.onCry(announcement);
}
}
为了允许公民注册自己,CitizenRegistry 作为 OSGi 服务公开。
<osgi:service id="citizenRegistry" ref="cityHall"
interface="com.springsource.samples.integration.osgi.town.output.CitizenRegistry" />
CitizenRegistry 服务也由 CityHall 实现,以维护向其分发公告的公民集合。
在 scribe 捆绑包中,实现了 Citizen 的示例。它通过 OSGi 引用引用 CitizenRegistry。
<osgi:reference id="citizenRegistry"
interface="com.springsource.samples.integration.osgi.town.output.CitizenRegistry" />
scribe 在初始化期间将自身注册到公民注册表,然后记录它接收到的所有公告。
这样,只有城镇捆绑包的代码需要更改才能开始使用新系统,而公民捆绑包可以保持不变。当然,可能有很多充分的理由拥有多个了解正在进行的消息传递并直接共享通道的捆绑包。在下一节中,我们将研究一下。
假设小镇也与中央政府打交道。政府以政府捆绑包的形式向小镇派遣代表。该代表直接了解公告渠道,因此无需使用网关或服务激活器将其屏蔽。小镇将 announcementsChannel 公开为 OSGi 服务。
<osgi:service id="announcementsChannel" ref="announcements"
interface="org.springframework.integration.channel.SubscribableChannel" />
为了将政府法令发送到小镇的公告渠道,我们在示例中的政府捆绑包中设置了一个网关,但这可以是任何端点。重要的是在政府捆绑包中引用小镇捆绑包中的渠道。
<osgi:reference id="announcementsChannel"
interface="org.springframework.integration.channel.SubscribableChannel" />
现在您可以将其作为其他捆绑包中的普通渠道使用。
<si:service-activator input-channel="announcementsChannel"
ref="nationalArchive" />
<si:gateway
service-interface="com.springsource.samples.integration.osgi.government.GovernmentDecreeGateway"
id="decreeer" default-request-channel="announcementsChannel" />
为简单起见,示例实现了非常简单的本地端点,但是一旦渠道桥接了捆绑包,就没有任何东西可以阻止您使用 Spring Integration 适配器来跳出 JVM。
假设政府捆绑包成为银行的内部贷款/信贷部门(其对其他人的贷款利率可能因美联储利率而异——可以基于不同的架构构建)。我不是金融专家,也不是您所在领域的专家,因此我不会冒昧地告诉您此技术解决方案的确切应用方式。将来,我们将对示例进行重构以使其更贴近实际领域,并将此添加到示例中,以提供一些思路。如果您有特殊要求,请务必在此处发表评论。
为了设计一个可扩展的应用程序,您需要考虑同步移交的问题。消息传递可以解决许多这些问题。对于最适合在单个 JVM 上运行以获得最佳性能的大型应用程序的演进,OSGi 是一个引人注目的替代方案。将 Spring Integration 和 dm Server 结合使用功能强大且简单。在本文中,您已经了解了如何