领先一步
VMware 提供培训和认证,助您加速进步。
了解更多我最近收到一封电子邮件,询问如何将 Activiti(Alfresco 的开源 BPMN2 工作流引擎)中的等待状态控制转发到 Spring Integration 流,并在 Spring Integration 完成某些工作后恢复 Activiti 工作流进程的执行。
要真正理解这个目标为何有用,我们需要一些背景信息。
Activiti 是一个业务流程引擎。它基本上是一个节点(状态)的有向图,用于模拟复杂业务流程的状态。它跟踪业务流程中描述的工作的进展。它描述了系统中的自动执行者和人工执行者。它还支持对业务流程引擎进行查询,以询问正在进行的流程实例的问题:有多少实例,哪些已停止,等等。业务流程管理系统 (BPMS) 提供了许多优势,其中一些是
is_enrolled 或 reset_password_date。最后一点值得关注:像 Activiti 这样的优秀 BPM 引擎支持可视化建模业务流程。UML 使用活动(步骤)和泳道(参与满足这些步骤的代理)来可视化地描述流程。当然,UML 只是一个建模工具。它没有运行时语义。业务流程管理领域的圣杯一直是拥有一个既能被业务分析师又能被应用程序开发人员使用的建模符号。 BPMN 2 是我们最接近实现这一目标的工具。
例如,这里是一个非常简单的业务流程的可视化模型。
这是为支持该模型而创建的标准 XML 标记。此 XML 具有执行语义,而不仅仅是建模语义。
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn"
typeLanguage="http://www.w3.org/2001/XMLSchema"
expressionLanguage="http://www.w3.org/1999/XPath"
targetNamespace="http://www.activiti.org/bpmn2.0">
<process id="asyncProcess">
<startEvent id="start"/>
<sequenceFlow
id="flow1"
sourceRef="start"
targetRef="sigw"/>
<serviceTask
id="sigw"
name="Delegate to Spring Integration"
activiti:delegateExpression="#{gateway}"/>
<sequenceFlow
id="scriptFlow"
sourceRef="sigw"
targetRef="confirmMovementTask"/>
<scriptTask
id="confirmMovementTask"
name="Execute script" scriptFormat="groovy">
<script>
println 'Moving on..'
</script>
</scriptTask>
<sequenceFlow
id="flow2"
sourceRef="confirmMovementTask"
targetRef="theEnd"/>
<endEvent id="theEnd"/>
</process>
</definitions>
大多数分析师不会手动编写此 XML。相反,他们会使用像 Alfresco 的 Activiti Cloud 这样的建模工具。然而,XML 工件是可往返的:它可以由开发人员注释,然后可以在建模工具中进行修改。
仔细检查,您会发现其中大部分并不复杂。该过程有四个状态
startEvent - 流程的起点serviceTask - 调用名为 gateway 的 Spring bean 来启动 Spring Integration 中的一部分处理(稍后详述!)scriptTask - 使用 Groovy 打印一条简单的确认消息endEvent - 完成排序明确通过连接节点的各种 sequenceFlow 元素来定义。
Activiti 是跟踪业务流程状态的好方法,但它并不是一个特别强大的组件模型或集成技术。为此,我们需要像 Spring Integration 这样的集成框架。
Spring Integration 支持在多个不兼容的系统之间集成服务和数据。概念上,构建集成流类似于在具有 stdin 和 stdout 的 UNIX OS 上构建管道-过滤器流。
cat input.txt | grep ERROR | wc -l > output.txt
在此示例中,我们从一个源(文件 input.txt)获取数据,将其通过 grep 命令进行过滤,只保留包含 ERROR 标记的行,然后将其通过 wc 工具来计算行数。最后,最终计数被写入输出文件 output.txt。这些组件 - cat、grep 和 wc - 彼此一无所知。它们不是为了彼此而设计的。相反,它们只知道如何从 stdin 读取和写入 stdout。这种数据规范化使得从简单的原子组件构建复杂的解决方案非常容易。在这个例子中,cat 一个文件将数据转化为任何知道 stdin 的进程都可以读取的数据。它适配了入站数据到规范化格式,即字符串行。最后,重定向 (>) 操作符将规范化数据(字符串行)转化为文件系统上的数据。它适配了它。管道 (|) 字符用于指示一个组件的输出应流向另一个组件的输入。
Spring Integration 流的工作方式相同:数据被规范化为 Message<T> 实例。每个 Message<T> 都有一个载荷和一个头 - 载荷的元数据以 Map<K,V> 的形式 - 它们是不同消息组件的输入和输出。这些消息组件通常由 Spring Integration 提供,但编写和使用自己的组件也很容易。存在支持所有 企业应用集成模式(过滤器、路由器、转换器、适配器、网关等)的各种消息组件。Spring 框架的 MessageChannel 是一个命名管道,Message<T> 在消息组件之间流动。它们是管道,默认情况下,它们的工作方式类似于 java.util.Queue。数据输入,数据输出。
Spring Integration 入站适配器适配来自外部系统(如 RabbitMQ、Apache Kafka 和 JMS 的消息队列、电子邮件、Twitter、文件系统挂载、物联网设备以及无数其他系统)的数据到 Message<T>。Spring Integration 出站适配器也以相反的方式工作,接收 Message<T> 并将它们写入外部系统(例如,作为电子邮件、Tweet 或 Apache Kafka 消息)。
Spring Integration 支持事件驱动架构,因为它能够检测并响应外部世界的事件。例如,您可以使用 Spring Integration 每 10 秒轮询一个文件系统,并在出现新文件时发布一个 Message<T>。您可以使用 Spring Integration 作为 Apache Kafka 主题上消息的监听器。适配器负责响应外部事件,让您不必过多担心消息的产生,而是可以专注于处理到达的消息。这是与依赖注入相当的集成!
依赖注入使组件代码免于担心资源初始化和获取,使其可以专注于编写具有这些依赖的代码。javax.sql.DataSource 字段来自哪里?谁在乎!Spring 注入了它,它可能从测试中的 Mock、经典应用服务器中的 JNDI 或配置的 Spring Boot bean 中获得。组件代码对这些细节一无所知。大约 15 年前,当我们第一次开始谈论依赖注入时,我们会说“好莱坞原则”:“不要给我打电话,我会给你打电话!”这同样适用于 Spring Integration!
入站网关从外部系统接收传入请求,将它们作为 Message<T> 进行处理,并发送响应。出站网关接收 Message<T>,将其转发到外部系统,并等待该系统的响应。它们支持请求和响应交互。
Activiti 可用于描述复杂、长期运行的流程,将其记录、可审计和可预测的状态,而 Spring Integration 可用于,嗯,集成!Spring Integration 是我们存放有趣的 Java 代码的地方,而 Activiti 负责跟踪整体状态。这个技巧在 20 年前就很有用,在当今这个庞大的分布式微服务世界中仍然很有用,因为单个请求的处理可能跨越多个服务和节点。因此,Spring Boot、Spring Integration 和 Activiti 能够如此紧密地协同工作,这非常方便!
一个常见的用例是使用 Activiti 启动 BPM 流程,然后在进入等待状态时,将该状态的处理委托给 Spring Integration,Spring Integration 当然可以将工作分发到其他系统。下面是一个说明流程的简单图表。
BPM 流程状态通常涉及人工参与者。例如,一个工作流引擎可能有一个状态,其中一个文档被发送给人工进行审批,但审查员正在休假,要几周后才能回来。让一个线程保持开放,期望在几毫秒或几秒内收到继续处理所需的确认,这是浪费,更不用说危险了。
Activiti 足够智能,可以暂停执行,在等待状态期间将执行状态持久化到数据库,并在信号发送给流程执行后才恢复。信号流程引擎会重新加载流程并恢复执行。一个简单的例子可能是用户注册业务流程,该流程委托给 Spring Integration 发送带有注册确认链接的电子邮件。用户可能需要几天时间才能点击确认链接,但在点击后,系统应继续进行注册业务流程。
在本帖中,我们将探讨如何启动一个 BPM 流程,该流程进入等待状态,然后委托给 Spring Integration 执行某种自定义处理,并在遥远的未来恢复流程的执行。
我们将设置两个 Spring Integration 流:一个用于处理从 Activiti 到 Spring Integration 的请求,另一个用于处理最终的回复并触发流程的恢复。
我们需要一些东西来启动我们的流程,所以这里有一个简单的 REST 端点(https://:8080/start),每次调用都会启动一个新流程。
@RestController
class ProcessStartingRestController {
@Autowired
private ProcessEngine processEngine;
@RequestMapping(method = RequestMethod.GET, value = "/start")
Map<String, String> launch() {
ProcessInstance asyncProcess = this.processEngine.getRuntimeService()
.startProcessInstanceByKey("asyncProcess");
return Collections.singletonMap("executionId", asyncProcess.getId());
}
}
消息将通过我们在 @Configuration 类中创建的两个 MesssageChannel 流动:requests 和 replies。
@Configuration
class MessageChannels {
@Bean
DirectChannel requests() {
return new DirectChannel();
}
@Bean
DirectChannel replies() {
return new DirectChannel();
}
}
使用 @Configuration 类的好处是它本身就是一个 Spring 组件,可以在任何地方注入。我们可以通过直接调用 @Bean 提供者方法来解引用通道。另一种方法是每次注入通道引用时都使用 @Qualifier,例如
public static final String REPLIES_CHANNEL_NAME = "replies";
@Autowired
@Qualifier(REPLIES_CHANNEL_NAME)
private MessageChannel replies;
我们的 BPMN 非常简单,但我们将使用一个 Activiti 特定的命名空间属性 activiti:delegateExpression="#{gateway}" 来告诉 Activiti 调用 Spring 中注册的一个名为 gateway 的 bean。Activiti 知道这样做是因为该应用程序使用了 Activiti 的 Spring Boot 自动配置,其中包含许多功能,包括将 Spring 管理的 bean 暴露给 Activiti 表达式语言。gateway 是一个类型为 ReceiveTaskActivityBehavior 的 Activiti bean。Spring Boot 同时为 Spring Integration 和 Activiti 提供了自动配置,因此 90% 的繁琐设置都被消除了。
让我们看看我们简单的 gateway 组件,它是 Activiti 的 ActivityBehavior 接口的实现,它充当回调,我们可以将 Message<T> 发送到 requests 通道并启动我们的 Spring Integration 流。这里重要的是我们捕获了 executionId,我们稍后需要它来恢复或信号流程。
@Bean
ActivityBehavior gateway(MessageChannels channels) {
return new ReceiveTaskActivityBehavior() {
@Override
public void execute(ActivityExecution execution) throws Exception {
Message<?> executionMessage = MessageBuilder
.withPayload(execution)
.setHeader("executionId", execution.getId())
.build();
channels.requests().send(executionMessage);
}
};
}
Message<T> 将从 requests MessageChannel 的另一端弹出,并且需要有东西来处理它。在一个复杂的例子中,将请求转换成有意义的消息,然后,例如,将其转发到其他系统(如电子邮件)将是轻而易举的。在这里,我们只是打印出头部信息,以便我们可以注意到 executionId 并在以后使用它。
@Bean
IntegrationFlow requestsFlow(MessageChannels channels) {
return IntegrationFlows.from(channels.requests())
.handle(msg -> msg.getHeaders().entrySet()
.forEach(e -> log.info(e.getKey() + '=' + e.getValue())))
.get();
}
此时,工作流定义已被持久化,并且没有活动的流程实例。我们需要以某种方式异步地信号它。我们将使用一个 REST 端点 https://:8080/resume/{executionId} 来实现这一点。REST 易于理解,但实际上我们可以使用 Spring Integration 知道的任何外部系统的任何事件来实现此效果。唯一需要确保的是,无论外部事件是什么,我们都能够以某种方式传递 executionId,就像我们在这里通过在 URI 中捕获它一样。
@RestController
class ProcessResumingRestController {
@Autowired
private MessageChannels messageChannels;
@RequestMapping(method = RequestMethod.GET, value = "/resume/{executionId}")
void resume(@PathVariable String executionId) {
Message<String> build = MessageBuilder.withPayload(executionId)
.setHeader("executionId", executionId)
.build();
this.messageChannels.replies().send(build);
}
}
当 Message<T> 流经 replies MessageChannel 时,它将从另一端弹出,同样,需要有东西来处理它。在这里,我们将使用另一个 Spring Integration 流,该流接收传入的 Message<T> 并发出恢复流程的信号。一旦执行了这个流,您将看到流程的下一步,即 scriptTask,被评估,并在控制台中打印出“Moving on!”。
@Bean
IntegrationFlow repliesFlow(MessageChannels channels,
ProcessEngine engine) {
return IntegrationFlows.from(channels.replies())
.handle(msg -> engine.getRuntimeService().signal(
String.class.cast(msg.getHeaders().get("executionId"))))
.get();
}
我们涵盖了很多内容!我们同时涉及了 BPM 引擎和集成框架来处理处理,我理解正确使用两者的交叉点是一个相当小众的话题。本帖的目标是尽可能有效地利用两者。对于简单的集成,BPM 会增加相当大的认知负荷,但当业务流程描述必须对模型和业务分析师友好时,它提供了巨大的价值。一个常见的误解是,一旦引入 BPM,开发人员就必须放弃增强系统的能力;这绝对不是真的!感谢 Spring Boot 和 Activiti 团队的辛勤工作,Activiti 与所有 Spring 组件完美配合。
真正的强大之处在于使用 BPM 来编排复杂的处理逻辑:想象一下命中 BPM 流程中的一个状态,然后调用 Spring Batch 作业,或者使用 Spring Cloud 中的 Ribbon 负载均衡的 RestTemplate 调用 REST 服务,或者将 Message<T> 转发到 Spring Cloud Data Flow 流处理。Spring Cloud Data Flow 是我最喜欢的处理方法之一,因为它构建在 Spring Cloud Stream 之上,而 Spring Cloud Stream 又构建在 Spring Integration 之上:一路都是 MessageChannels!
示例代码 在线提供。