使用 Activiti 和 Spring Integration 协调工作

工程 | Josh Long | 2016年2月7日 | ...

最近我收到一封电子邮件,询问如何在 Activiti(Alfresco 的一个开源 BPMN2 工作流引擎)的等待状态下将控制权转发到 Spring Integration 流程,然后在 Spring Integration 完成某些工作后恢复 Activiti 工作流进程的执行。

要真正理解为何这是个有用的目标,我们需要一些背景知识。

什么是 Activiti

Activiti 是一个 业务流程引擎。它本质上是一个节点(状态)的有向图,用于建模复杂业务流程的状态。它跟踪业务流程中描述的工作进展。它描述了系统中的自动化和人工参与者。它还支持查询业务流程引擎,以询问正在进行的流程实例的问题:有多少个实例,哪些实例卡住了等等。业务流程管理系统(BPMS)提供了许多优势,其中包括:

  • 协作流程,其中人员和服务被用于推动实现更大的业务需求(例如贷款审批、法律合规、文档修订等)
  • 它们支持组织中重要业务流程的审计和记录。这在监管环境中是无价的。
  • BPM 引擎被 设计 用于处理长时间运行的流程状态,这意味着您的领域模型不再需要充斥着诸如 is_enrolledreset_password_date 等特定于流程状态的字段。
  • 易于建模复杂的业务流程

最后一点值得重点关注:像 Activiti 这样优秀的 BPM 引擎支持业务流程的可视化建模。UML 支持使用活动(步骤)和泳道(参与完成这些步骤的代理)可视化地描述流程。当然,UML 只是一个建模工具。它没有运行时语义。业务流程管理的终极目标是拥有一种建模符号,既可以被业务分析师使用, 可以被应用程序开发人员使用。BPMN 2 是我们迄今为止最接近实现该目标的成果。

例如,这里是一个 非常 简单的业务流程的可视化模型。

这里是为支持该模型而创建的标准 XML 标记。这个 XML 具有执行语义,而不仅仅是 仅仅 建模语义。

<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
             xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
             xmlns:activiti="http://activiti.org/bpmn"
             typeLanguage="http://www.w3.org/2001/XMLSchema"
             expressionLanguage="http://www.w3.org/1999/XPath"
             targetNamespace="http://www.activiti.org/bpmn2.0">

    <process id="asyncProcess">

        <startEvent id="start"/>

          <sequenceFlow
            id="flow1"
            sourceRef="start"
            targetRef="sigw"/>

        <serviceTask 
            id="sigw"
           name="Delegate to Spring Integration"
           activiti:delegateExpression="#{gateway}"/>

           <sequenceFlow
            id="scriptFlow"
            sourceRef="sigw"
            targetRef="confirmMovementTask"/>

        <scriptTask 
            id="confirmMovementTask"
            name="Execute script" scriptFormat="groovy">
            <script>
                println 'Moving on..'
            </script>
        </scriptTask>

          <sequenceFlow
            id="flow2"
            sourceRef="confirmMovementTask"
            targetRef="theEnd"/>

        <endEvent id="theEnd"/>

    </process>

</definitions>

大多数分析师不会手动编写这个 XML。相反,他们会使用像 Alfresco 的 Activiti Cloud 这样的建模工具。然而,XML 文档是可往返的:它可以被开发人员标注,然后用建模工具进行修改。

然而,仔细检查后你会发现,大部分内容并不复杂。该流程有四种状态

  • startEvent - 流程开始的地方
  • serviceTask - 调用名为 gateway 的 Spring Bean,以在 Spring Integration 中启动一些处理(稍后详细介绍!)
  • scriptTask - 使用 Groovy 打印一条简单的确认消息
  • endEvent - 完成

顺序由连接节点的各种 sequenceFlow 元素明确规定。

Activiti 是跟踪业务流程状态的好方法,但它不是特别强大的组件模型或集成技术。为此,我们需要一个像 Spring Integration 这样的集成框架。

什么是 Spring Integration

Spring Integration 支持跨多个原本不兼容的系统集成服务和数据。从概念上讲,构建一个集成流类似于在 UNIX 操作系统上使用 stdinstdout 构建管道-过滤器(pipes-and-filters)流

cat input.txt |  grep ERROR | wc -l > output.txt

在这个例子中,我们从一个源(文件 input.txt)获取数据,将其通过管道传递给 grep 命令来过滤结果,只保留包含标记 ERROR 的行,然后通过管道传递给 wc 工具来计算有多少行。最后,最终的计数被写入输出文件 output.txt。这些组件——catgrepwc——彼此之间一无所知。它们在设计时并未考虑相互协作。相反,它们只知道如何从 stdin 读取和写入 stdout。这种数据标准化使得从简单的原子构建复杂的解决方案变得非常容易。在这个例子中,cat 文件这一操作将数据转换成了任何了解 stdin 的进程都可以读取的数据。它将入站数据 适配 成了标准化的字符串行格式。最后,重定向操作符 (>) 将标准化的数据(字符串行)转换成文件系统上的数据。它对其进行了 适配。管道 (|) 字符用于表示一个组件的输出应该流向另一个组件的输入。

Spring Integration 流的工作方式也类似:数据被标准化为 Message<T> 实例。每个 Message<T> 都有一个 payload 和 headers——载荷的元数据,存储在 Map<K,V> 中——它们是不同消息组件的输入和输出。这些消息组件通常由 Spring Integration 提供,但您也可以轻松编写和使用自己的组件。有各种各样的消息组件支持 企业应用集成模式 中的所有模式(过滤器、路由器、转换器、适配器、网关等)。Spring 框架的 MessageChannel 是一个命名通道,Message<T> 通过它在消息组件之间流动。它们是管道,默认情况下,它们的工作方式有点像 java.util.Queue。数据进,数据出。

Spring Integration 的入站适配器将来自外部系统(消息队列,如 RabbitMQ、Apache Kafka 和 JMS,电子邮件、Twitter、文件系统挂载、物联网设备以及无数其他系统)的数据 适配Message<T>。Spring Integration 的出站适配器则执行相反的操作,接收 Message<T> 并将其写入外部系统(例如,作为一封电子邮件、一条推文或一条 Apache Kafka 消息)。

Spring Integration 支持 事件驱动架构,因为它可以帮助检测并响应外部世界的事件。例如,您可以使用 Spring Integration 每 10 秒轮询一次文件系统,并在出现新文件时发布一个 Message<T>。您可以使用 Spring Integration 作为监听器来接收发送到 Apache Kafka 主题的消息。适配器负责响应外部事件,让您不必过多担心消息的来源,而可以将精力集中在消息到达后的处理上。这就像依赖注入的集成等价物!

依赖注入使组件代码无需担心资源的初始化和获取,从而可以专注于编写依赖这些资源的代码。javax.sql.DataSource 字段从哪里来?谁在乎呢!Spring 会将其注入进来,它可能来自测试中的 Mock 对象,传统应用服务器中的 JNDI,或者配置好的 Spring Boot Bean。组件代码对这些细节一无所知。大约 15 年前,当我们刚开始谈论依赖注入时,我们会提到“好莱坞原则”:“别给我打电话,我会给你打电话!”这在 Spring Integration 中更是如此!

入站网关接收来自外部系统的传入请求,将它们作为 Message<T> 处理,并发送回复。出站网关接收 Message<T>,将它们转发到外部系统,并等待该系统的响应。它们支持请求和回复交互。

Activiti 和 Spring Integration 网关

Activiti 可用于以记录、可审计和可预测的状态来描述复杂的、长时间运行的流程,而 Spring Integration 则可用于,嗯,集成!Spring Integration 是我们存放有趣 Java 代码的地方,而 Activiti 则负责跟踪整体状态。这个技巧在 20 年前有用,在当今庞大的分布式微服务世界中也同样有用,因为单个请求的处理可能跨越多个服务和节点。因此,Spring Boot、Spring Integration 和 Activiti 能够很好地协同工作 就很方便了!

一个常见的用例是使用 Activiti 启动一个 BPM 流程,然后在进入等待状态时,将该状态的处理委托给 Spring Integration,Spring Integration 当然可以将工作分发给其他系统。这是一个说明流程的简单图示。

BPM 流程状态通常会涉及人工代理。例如,一个工作流引擎可能有一个状态是文档发送给人工进行审批,但审批人正在休假,几周后才能回来。如果为了等待继续处理所需的确认而保持一个线程开放,期望在几毫秒或几秒内返回,那将是浪费的,更不用说危险了。

Activiti 足够智能,可以在等待状态期间暂停执行,将执行状态持久化到数据库,并且仅在流程执行被 发出信号 时才恢复。向流程引擎发出信号会重新激活流程并恢复执行。一个简单的例子可能是新用户注册业务流程,该流程委托给 Spring Integration 发送包含注册确认链接的电子邮件。用户可能需要几天时间才能点击确认链接,但点击后,系统应继续进行注册业务流程。

在这篇博文中,我们将探讨如何启动一个 BPM 流程,该流程进入等待状态,然后委托给 Spring Integration 进行某种自定义处理,并在遥远的将来恢复流程的执行。

我们将设置两个 Spring Integration 流:一个用于处理从 Activiti 发送到 Spring Integration 的请求,另一个用于处理最终的回复并触发流程的恢复。

我们需要一些东西来启动流程,所以这里有一个简单的 REST 端点 (http://localhost:8080/start),每次调用它都会启动一个新的流程

@RestController
class ProcessStartingRestController {

 @Autowired
 private ProcessEngine processEngine;

 @RequestMapping(method = RequestMethod.GET, value = "/start")
 Map<String, String> launch() {
  ProcessInstance asyncProcess = this.processEngine.getRuntimeService()
    .startProcessInstanceByKey("asyncProcess");
  return Collections.singletonMap("executionId", asyncProcess.getId());
 }
}

消息将流经我们在一个 @Configuration 类中创建的两个 MessageChannelrequestsreplies

@Configuration
class MessageChannels {

 @Bean
 DirectChannel requests() {
  return new DirectChannel();
 }

 @Bean
 DirectChannel replies() {
  return new DirectChannel();
 }
}

使用 @Configuration 类的好处在于它本身是一个 Spring 组件,可以在任何地方被注入。我们可以通过直接调用 @Bean 提供者方法来解引用这些通道。另一种方法是每次注入通道引用时都使用 @Qualifier,例如:


public static final String REPLIES_CHANNEL_NAME = "replies";

@Autowired
@Qualifier(REPLIES_CHANNEL_NAME)
private MessageChannel replies;

我们的 BPMN 相当直观,但我们将使用一个 Activiti 特有的命名空间属性 activiti:delegateExpression="#{gateway}",来告诉 Activiti 调用一个在 Spring 中注册的名为 gateway 的 Bean。Activiti 知道这样做是因为这个应用程序使用了 Activiti 对 Spring Boot 的自动配置,这个自动配置(以及其他许多功能)将 Spring 管理的 Bean 暴露给了 Activiti 的表达式语言。gateway 是一个基于 Activiti 的 Bean,类型为 ReceiveTaskActivityBehavior。Spring Boot 对 Spring Integration 和 Activiti 都有自动配置,所以可以省去 90% 繁琐的设置工作。

让我们看看我们简单的 gateway 组件,它是 Activiti 的 ActivityBehavior 接口的一个实现,充当回调,我们可以将一个 Message<T> 发送到 requests 通道并启动我们的 Spring Integration 流。这里的重点是我们已经捕获了 executionId,稍后我们将需要它来 恢复发出信号 给流程。

@Bean
ActivityBehavior gateway(MessageChannels channels) {
  return new ReceiveTaskActivityBehavior() {

    @Override
    public void execute(ActivityExecution execution) throws Exception {

      Message<?> executionMessage = MessageBuilder
          .withPayload(execution)
          .setHeader("executionId", execution.getId())
          .build();

      channels.requests().send(executionMessage);
    }
  };
}

Message<T> 将从 requests MessageChannel 的另一端弹出,需要有东西来处理它。在一个复杂的例子中,将请求转换为有意义的消息,然后例如转发到其他系统(如电子邮件)将是微不足道的。在这里,我们只是打印出 headers,以便我们可以记录 executionId 并在以后使用。

@Bean
IntegrationFlow requestsFlow(MessageChannels channels) {
 return IntegrationFlows.from(channels.requests())
   .handle(msg -> msg.getHeaders().entrySet()
     .forEach(e -> log.info(e.getKey() + '=' + e.getValue())))
   .get();
}

此时,工作流定义已被持久化,并且没有活动的流程实例。我们需要以某种方式异步地发出信号。我们将使用一个 REST 端点来实现,例如 http://localhost:8080/resume/{executionId}。REST 很容易理解,但实际上我们可以使用 Spring Integration 知道的任何外部系统的任何事件来实现这一效果。唯一需要确保的是,无论外部事件是什么,我们都能够以某种方式将 executionId 传递过去,就像我们在这里通过在 URI 中捕获它一样。


@RestController
class ProcessResumingRestController {

 @Autowired
 private MessageChannels messageChannels;

 @RequestMapping(method = RequestMethod.GET, value = "/resume/{executionId}")
 void resume(@PathVariable String executionId) {

  Message<String> build = MessageBuilder.withPayload(executionId)
    .setHeader("executionId", executionId)
    .build();

  this.messageChannels.replies().send(build);
 }
}

Message<T> 流经 replies MessageChannel 时,它将从另一端弹出,同样需要有东西来处理它。在这里,我们将使用另一个 Spring Integration 流,它接收传入的 Message<T> 并发出流程恢复的信号。一旦这个流执行完毕,您将在控制台上看到流程的下一步(即 scriptTask)被评估,并打印出“Moving on!”字样。

@Bean
IntegrationFlow repliesFlow(MessageChannels channels,
       ProcessEngine engine) {
 return IntegrationFlows.from(channels.replies())
   .handle(msg -> engine.getRuntimeService().signal(
     String.class.cast(msg.getHeaders().get("executionId"))))
   .get();
}

下一步

我们讲了很多!我们同时使用 BPM 引擎 集成框架来处理业务,我理解正确地结合使用这两者可能是一个小众话题。这篇博文的目标是在适用时充分有效地使用两者。BPM 在简单的集成中会增加相当多的 认知负荷,但在业务流程描述必须对模型和业务分析师友好时,它能提供 巨大 的价值。一个常见的误解是,一旦涉及 BPM,开发人员就必须放弃增强系统的能力;事实并非如此!感谢 Spring Boot 和 Activiti 团队的辛勤工作,Activiti 与所有 Spring 组件都能完美配合。

真正的强大之处在于使用 BPM 来协调复杂的处理逻辑:想象一下在一个 BPM 流程中达到某个状态,然后调用一个 Spring Batch 作业,或者在 Spring Cloud 中使用 Ribbon 负载均衡的 RestTemplate 调用一个 REST 服务,或者将 Message<T> 转发到一个 Spring Cloud Data Flow 流处理中。Spring Cloud Data Flow 是我最喜欢的数据处理方法之一,因为它构建在 Spring Cloud Stream 之上,而 Spring Cloud Stream 又构建在 Spring Integration 之上:自下而上全是 MessageChannel

示例代码可以在线获取

获取 Spring 新闻通讯

订阅 Spring 新闻通讯,保持联系

订阅

先行一步

VMware 提供培训和认证,助您加速前进。

了解更多

获取支持

Tanzu Spring 通过一项简单的订阅,为 OpenJDK™、Spring 和 Apache Tomcat® 提供支持和二进制文件。

了解更多

近期活动

查看 Spring 社区的所有近期活动。

查看全部