Spring Framework 4.0 M2:WebSocket 消息架构

工程 | Rossen Stoyanchev | 2013 年 7 月 24 日 | ...

正如我之前写过的那样,WebSocket API 仅仅是 WebSocket 样式消息应用程序的起点。许多实际的挑战仍然存在。正如一位 Tomcat 邮件列表用户最近思考的那样

在我看来,WebSocket 似乎还没有真正“准备好投入生产”(我并不是在谈论 Tomcat 的实现本身,而是更普遍地说)……IE 中的原生 WebSocket 功能仅在 IE-10 及更高版本中可用,并且允许此功能在较低 IE 版本中工作的解决方案有点“不确定”(例如,依赖于通过 Adobe 的 FlashPlayer 的转向)。(我们的大多数客户都是大型公司,他们不会更新他们的浏览器,也不会在他们的防火墙中打开特殊端口,仅仅是为了取悦我们)。

Spring Framework 4.0 的第一个里程碑提供了对SockJS 的服务器端支持,SockJS 是最佳且最全面的 WebSocket 浏览器回退选项。您需要在不支持 WebSocket 的浏览器中以及网络代理阻止其使用的情况下使用回退选项。简单来说,SockJS 使您能够构建当今的 WebSocket 应用程序,并在必要时依靠透明的回退选项。

即使有回退选项,更大的挑战仍然存在。套接字是一个非常底层的抽象,而当今绝大多数 Web 应用程序都没有针对套接字进行编程。这就是 WebSocket 协议定义子协议机制的原因,该机制本质上能够并鼓励在 WebSocket 上使用更高级别的协议,就像我们在 TCP 上使用 HTTP 一样。

Spring Framework 4.0 的第二个里程碑支持在 WebSocket 上使用更高级别的消息协议。为了演示这一点,我们构建了一个示例应用程序。

股票投资组合示例 股票投资组合示例应用程序(在 Github 上可用)加载用户的投资组合持仓,允许买卖股票,使用价格报价,并显示持仓更新。这是一个相当简单的应用程序。但它处理了许多在基于浏览器的消息应用程序中可能出现的常见任务。

Snapshot of Stock Portfolio Application

那么我们如何构建这样的应用程序呢?从 HTTP 和 REST,我们习惯于依靠 URL 以及 HTTP 动词来表达需要执行的操作。在这里,我们有一个套接字和大量消息。如何确定消息的目标对象以及消息的含义?

Browser and Server exchange messages but what's in the message?

浏览器和服务器必须在表达此类语义之前达成关于通用消息格式的共识。存在几种可以提供帮助的协议。由于其简单性和广泛的支持,我们选择在本里程碑中使用STOMP

简单/流式文本导向消息协议 (STOMP)

STOMP 是一种旨在简化的消息协议。它基于 HTTP 模型的帧。一个帧由一个命令、可选的头部和可选的主体组成。

例如,股票投资组合应用程序需要接收股票报价,因此客户端发送一个SUBSCRIBE帧,其中destination头部指示客户端想要订阅的内容

SUBSCRIBE  
id:sub-1  
destination:/topic/price.stock.*  

随着股票报价变得可用,服务器会发送一个MESSAGE帧,其中包含匹配的目标和订阅 ID,以及内容类型头部和主体

MESSAGE  
subscription:sub-1  
message-id:wm2si1tj-4  
content-type: application/json  
destination:/topic/stocks.PRICE.STOCK.NASDAQ.EMC  

{\"ticker\":\"EMC\",\"price\":24.19}  

为了在浏览器中执行所有这些操作,我们使用stomp.jsSockJS 客户端


var socket = new SockJS('/spring-websocket-portfolio/portfolio');
var client = Stomp.over(socket);

var onConnect = function() {
  client.subscribe("/topic/price.stock.*", function(message) {
      // process quote
  });
};
client.connect('guest', 'guest', onConnect);

这已经是一个巨大的进步!!我们有一个标准的消息格式和客户端支持。

现在我们可以转向服务器端。

消息代理解决方案 服务器端的一个选项是纯消息代理解决方案,其中消息直接发送到传统的 Message Broker(如 RabbitMQ、ActiveMQ 等)。大多数(如果不是全部)代理都支持通过 TCP 的 STOMP,但越来越多地它们也支持通过 WebSocket 的 STOMP,而 RabbitMQ 则更进一步,还支持 SockJS。我们的架构如下所示

Browser sends STOMP messages to broker, application connects to broker via AMQP or JMS

这是一个健壮且可扩展的解决方案,但可以说它并不适合手头的任务。消息代理通常用于企业内部。直接通过 Web 公开它们并不理想。

如果我们从 REST 中学到任何东西,那就是我们不想公开有关系统内部细节的信息,例如数据库或域模型。

此外,作为 Java 开发人员,您希望应用安全、验证和添加应用程序逻辑。在消息代理解决方案中,应用程序服务器位于消息代理之后,这与大多数 Web 应用程序开发人员习惯的方式有很大不同。

这就是为什么像socket.io 这样的库很受欢迎。它很简单,并且针对 Web 应用程序的需求。另一方面,我们不能忽视消息代理处理消息的能力,它们确实擅长此任务,并且消息传递是一个难题。我们需要两者兼得。

应用程序和消息代理解决方案 另一种方法是使应用程序处理传入的消息,并充当 Web 客户端和消息代理之间的中介。来自客户端的消息可以通过应用程序流向代理,反之亦然,来自代理的消息可以通过应用程序流回客户端。这使应用程序有机会检查传入的消息类型 和“目标”头部,并决定是处理消息还是将其传递给代理。

Browser sends messages to application that in turn sends messages to a message broker

这就是我们选择的方法。为了更好地说明这一点,这里有一些场景。

加载投资组合持仓

  • 客户端请求投资组合持仓
  • 应用程序通过加载并返回数据到订阅来处理请求
  • 消息代理不参与此交互

订阅股票报价

  • 客户端发送股票报价订阅请求
  • 应用程序将消息传递给消息代理
  • 消息代理将消息传播到所有已订阅的客户端

接收股票报价

  • QuoteService 将股票报价消息发送到消息代理
  • 消息代理将消息传播到所有已订阅的客户端

执行交易

  • 客户端发送交易请求
  • 应用程序处理它,通过TradeService提交交易以执行
  • 消息代理不参与此交互

接收持仓更新

  • 交易服务将持仓更新消息发送到消息代理上的队列
  • 消息代理将持仓更新发送到客户端
  • 稍后将更详细地介绍向特定用户发送消息

严格来说,使用消息代理是可选的。我们提供了一个开箱即用的“简单”替代方案,用于入门。但是,建议使用消息代理来实现可扩展性和在具有多个应用程序服务器的部署中使用。

代码片段 让我们看一些客户端和服务器端代码的示例。

这是portfolio.js 请求投资组合持仓

stompClient.subscribe("/app/positions", function(message) {
  self.portfolio().loadPositions(JSON.parse(message.body));
});

在服务器端,PortfolioController 检测到请求并返回投资组合持仓,演示了 Web 应用程序中非常常见的请求-回复交互。由于我们使用 Spring Security 来保护 HTTP 请求(包括导致 WebSocket 握手的请求),因此下面的principal方法参数取自 Spring Security 在 HttpServletRequest 上设置的用户主体。

@Controller
public class PortfolioController {

  // ...

  @SubscribeEvent("/app/positions")
  public List<PortfolioPosition> getPortfolios(Principal principal) {
    String user = principal.getName();
    Portfolio portfolio = this.portfolioService.findPortfolio(user);
    return portfolio.getPositions();
  }
}

这是portfolio.js发送交易请求

stompClient.send("/app/trade", {}, JSON.stringify(trade));

在服务器端,PortfolioController发送交易以执行

@Controller
public class PortfolioController {

  // ...

  @MessageMapping(value="/app/trade")
  public void executeTrade(Trade trade, Principal principal) {
    trade.setUsername(principal.getName());
    this.tradeService.executeTrade(trade);
  }
}

PortfolioController还可以通过向用户发送消息来处理意外异常。

@Controller
public class PortfolioController {

  // ...

  @MessageExceptionHandler
  @ReplyToUser(value="/queue/errors")
  public String handleException(Throwable exception) {
    return exception.getMessage();
  }
}

那么从应用程序内部向已订阅的客户端发送消息呢?这是QuoteService发送报价的方式

@Service
public class QuoteService {

  private final MessageSendingOperations<String> messagingTemplate;

  @Scheduled(fixedDelay=1000)
  public void sendQuotes() {
    for (Quote quote : this.quoteGenerator.generateQuotes()) {
      String destination = "/topic/price.stock." + quote.getTicker();
      this.messagingTemplate.convertAndSend(destination, quote);
    }
  }
}

以及这是TradeService在交易执行后发送持仓更新的方式

@Service
public class TradeService {

  // ...

  @Scheduled(fixedDelay=1500)
  public void sendTradeNotifications() {
    for (TradeResult tr : this.tradeResults) {
      String queue = "/queue/position-updates";
      this.messagingTemplate.convertAndSendToUser(tr.user, queue, tr.position);
    }
  }
}

如果你想知道……是的,PortfolioController还可以包含 Spring MVC 方法(例如@RequestMapping),如之前构建在线游戏应用程序的开发人员在此工单中建议的那样

是的,将[消息]映射和 Spring MVC 映射合并起来会很好。它们没有理由不能统一。

就像 QuoteService 和 TradeService 一样,Spring MVC 控制器方法也可以发布消息。

Spring 应用程序的消息支持 长期以来,Spring Integration 一直为众所周知的企业集成模式以及轻量级消息提供了头等舱抽象。在处理此里程碑时,我们意识到后者正是我们需要构建的基础。

因此,我很高兴地宣布,我们已将一些 Spring Integration 类型移动到 Spring Framework 中的一个名为spring-messaging的新模块中。除了核心抽象(如MessageMessageChannelMessageHandler等)之外,新模块还包含支持本文中描述的新功能的所有注释和类。

考虑到这一点,我们现在可以查看股票投资组合应用程序的内部架构图

Diagram of internal architecture with message broker

StompWebSocketHandler将传入的客户端消息放入“调度”消息通道。此通道有 3 个订阅者。第一个委托给带注释的方法,第二个将消息中继到 STOMP 消息代理,而第三个通过将目标转换为客户端订阅的唯一队列名称来处理发送到各个用户的消息(更多详细信息将在后面介绍)。

默认情况下,应用程序使用作为入门选项提供的“简单”消息代理运行。如示例README中所述,您可以通过激活和停用配置文件在“简单”消息代理和功能齐全的消息代理之间切换。

Diagram of internal architecture with simple broker

另一个可能的配置更改是从基于Executor到基于Reactor的消息通道实现进行切换,以进行消息传递。最近发布了第一个里程碑的Reactor项目也用于管理应用程序与消息代理之间的TCP连接。

您可以查看完整的应用程序配置,其中还包括新的Spring Security Java 配置。您可能还会对改进的STS对Java配置的支持感兴趣。

向单个用户发送消息 您可以轻松地看到如何将消息广播到多个已订阅的客户端,只需将消息发布到主题即可。更难看到如何将消息发送到特定用户。例如,您可能会捕获异常并希望发送错误消息。或者您可能已收到交易确认,并希望将其发送给用户。

在传统的邮件应用程序中,通常会创建一个临时队列,并在任何需要回复的消息上设置“reply-to”标头。这可以工作,但在 Web 应用程序中感觉相当麻烦。客户端必须记住在所有适用的消息上设置必要的标头,并且服务器应用程序可能需要跟踪并传递此信息。有时此类信息可能根本无法获得,例如,在处理 HTTP POST 作为传递消息的替代方法时。

为了支持此需求,我们向每个连接的客户端发送唯一的队列后缀。然后可以附加该后缀以创建唯一的队列名称。

client.connect('guest', 'guest', function(frame) {

  var suffix = frame.headers['queue-suffix'];

  client.subscribe("/queue/error" + suffix, function(msg) {
    // handle error
  });

  client.subscribe("/queue/position-updates" + suffix, function(msg) {
    // handle position update
  });

});

然后在服务器端,@MessageExceptionHandler方法(或任何消息处理方法)可以添加@ReplyToUser注释以将返回值作为消息发送。

@MessageExceptionHandler
@ReplyToUser(value="/queue/errors")
public String handleException(Throwable exception) {
  // ...
}

所有其他类,如 TradeService,可以使用消息传递模板来实现相同的功能。

String user = "fabrice";
String queue = "/queue/position-updates";
this.messagingTemplate.convertAndSendToUser(user, queue, position);

在这两种情况下,我们都在内部查找用户队列后缀(通过配置的UserQueueSuffixResolver)以重建正确的队列名称。目前只有一种简单的解析器实现。但是,很容易添加一个Redis实现,无论用户连接到此应用程序服务器还是其他应用程序服务器,它都支持相同的功能。

结论 希望这能对新功能有所帮助。为了避免文章过长,我建议您查看示例,并考虑它对您正在编写或打算编写的应用程序意味着什么。在我们努力在 9 月初发布候选版本之际,这是一个寻求反馈的绝佳时机。

要使用 Spring Framework 4.0.0.M2,请将http://repo.springsource.org/libs-milestonehttp://repo.springsource.org/milestone存储库添加到您的配置中。前者包含瞬态依赖项,如我们的存储库常见问题解答中所述。

SpringOne 2GX 2013 即将到来

尽快在圣克拉拉的 SpringOne预订您的席位。这是了解所有最新动态并提供直接反馈的最佳机会。今年预计会有许多重要的新品发布。查看最近的博文以了解我的意思,还有更多内容即将推出!

获取 Spring 时事通讯

与 Spring 时事通讯保持联系

订阅

领先一步

VMware 提供培训和认证,以加速您的进步。

了解更多

获得支持

Tanzu Spring 在一个简单的订阅中提供 OpenJDK™、Spring 和 Apache Tomcat® 的支持和二进制文件。

了解更多

即将举行的活动

查看 Spring 社区中所有即将举行的活动。

查看全部