领先一步
VMware提供培训和认证,以加速您的进步。
了解更多我最近收到一封电子邮件,询问如何将控制权从Activiti(Alfresco的一个开源BPMN2工作流引擎)的等待状态转发到Spring Integration流程,然后在Spring Integration中完成一些工作后恢复Activiti工作流进程的执行。
为了真正理解为什么这是一个有用的目标,我们需要一些背景知识。
Activiti是一个业务流程引擎。它基本上是一个节点(状态)的有向图,用于模拟复杂业务流程的状态。它跟踪业务流程中描述的工作进度。它描述了系统中基于自动和人工的参与者。它还支持查询业务流程引擎以询问正在进行的流程实例相关的问题:有多少流程实例,哪些流程实例已停滞等。业务流程管理系统(BPMS)具有许多优势,其中一些是
is_enrolled
或reset_password_date
之类的特定于流程状态的字段。最后一点值得关注:像Activiti这样的好的BPM引擎支持可视化建模业务流程。UML支持使用活动(步骤)和泳道(参与满足这些步骤的代理)来可视化地描述流程。当然,UML只是一个建模工具。它没有运行时语义。业务流程管理的圣杯一直是拥有一个建模符号,该符号可以被业务分析师和应用程序开发人员使用。BPMN 2是我们朝着实现这一目标迈出的最接近的一步。
例如,这是一个非常简单的业务流程的可视化模型。
这是创建的用于支持该模型的标准XML标记。此XML具有执行语义,而不仅仅是建模语义。
<?xml version="1.0" encoding="UTF-8"?>
<definitions id="definitions"
xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:activiti="http://activiti.org/bpmn"
typeLanguage="http://www.w3.org/2001/XMLSchema"
expressionLanguage="http://www.w3.org/1999/XPath"
targetNamespace="http://www.activiti.org/bpmn2.0">
<process id="asyncProcess">
<startEvent id="start"/>
<sequenceFlow
id="flow1"
sourceRef="start"
targetRef="sigw"/>
<serviceTask
id="sigw"
name="Delegate to Spring Integration"
activiti:delegateExpression="#{gateway}"/>
<sequenceFlow
id="scriptFlow"
sourceRef="sigw"
targetRef="confirmMovementTask"/>
<scriptTask
id="confirmMovementTask"
name="Execute script" scriptFormat="groovy">
<script>
println 'Moving on..'
</script>
</scriptTask>
<sequenceFlow
id="flow2"
sourceRef="confirmMovementTask"
targetRef="theEnd"/>
<endEvent id="theEnd"/>
</process>
</definitions>
大多数分析师不会手动编写此XML。相反,他们会使用像Alfresco的Activiti Cloud这样的建模工具。但是,XML工件是可双向转换的:开发人员可以对其进行注释,然后在建模工具中进行修改。
但是,经过检查,你会发现大部分内容并不复杂。该流程有四个状态
startEvent
- 流程从这里开始serviceTask
- 调用名为gateway
的Spring bean以在Spring Integration中启动一些处理(稍后将详细介绍!)scriptTask
- 使用Groovy打印简单的确认消息endEvent
- 完成排序由连接节点的各种sequenceFlow
元素明确指定。
Activiti是跟踪业务流程状态的好方法,但它不是一个特别强大的组件模型或集成技术。为此,我们需要一个像Spring Integration这样的集成框架。
Spring Integration支持跨多个原本不兼容的系统集成服务和数据。从概念上讲,组合集成流程类似于在UNIX操作系统上使用stdin
和stdout
组合管道和过滤器流程。
cat input.txt | grep ERROR | wc -l > output.txt
在这个例子中,我们从一个源(文件input.txt
)获取数据,将其传递给grep
命令以过滤结果并仅保留包含标记ERROR
的行,然后将其传递给wc
实用程序,我们使用它来计算有多少行。最后,最终计数被写入输出文件output.txt
。这些组件 - cat
、grep
和wc
- 彼此一无所知。它们的设计并非彼此考虑。相反,它们只知道如何从stdin
读取和写入stdout
。这种数据的标准化使得很容易将复杂的解决方案从简单的原子组合起来。在这个例子中,cat
文件的行为将数据转换为任何了解stdin
的进程都可以读取的数据。它将入站数据适配为标准化格式,即字符串行。最后,重定向(>
)运算符将标准化数据(字符串行)转换为文件系统上的数据。它适配了它。管道(|
)字符用于表示一个组件的输出应该流向另一个组件的输入。
Spring Integration流程的工作方式相同:数据被标准化为Message<T>
实例。每个Message<T>
都有一个有效负载和标头 - Map<K,V>
中有效负载的元数据 - 它们是不同消息组件的输入和输出。这些消息组件通常由Spring Integration提供,但也很容易编写和使用您自己的组件。有各种各样的消息组件支持所有企业应用程序集成模式(过滤器、路由器、转换器、适配器、网关等)。Spring框架MessageChannel
是一个命名管道,Message<T>
通过它在消息组件之间流动。它们是管道,默认情况下,它们的工作方式有点像java.util.Queue
。数据输入,数据输出。
Spring Integration入站适配器将来自外部系统(RabbitMQ、Apache Kafka和JMS等消息队列、电子邮件、Twitter、文件系统挂载、物联网设备以及无数其他系统)的数据适配为Message<T>
。Spring Integration出站适配器反向执行相同的操作,获取Message<T>
并将它们写入外部系统(例如,作为电子邮件、推文或Apache Kafka消息)。
Spring Integration支持事件驱动架构,因为它可以帮助检测并响应外部世界的事件。例如,您可以使用Spring Integration每10秒轮询一次文件系统,并在出现新文件时发布一个Message<T>
。您可以使用Spring Integration充当传递到Apache Kafka主题的消息的侦听器。适配器负责响应外部事件,让您不必过多担心消息的来源,并让您专注于消息到达后如何处理它。它相当于依赖注入的集成!
依赖注入使组件代码无需担心资源初始化和获取,并使其能够专注于使用这些依赖项编写代码。javax.sql.DataSource
字段从哪里来的?谁在乎!Spring将其连接起来,它可能在测试中从Mock获得它,在经典应用程序服务器中从JNDI获得它,或者从配置的Spring Boot bean获得它。组件代码仍然不知道这些细节。大约15年前,当我们第一次开始讨论依赖注入时,我们会谈论“好莱坞原则”:“别打电话给我,我会打电话给你!”这更适用于Spring Integration!
入站网关接收来自外部系统的传入请求,将它们作为Message<T>
处理,并发送回复。出站网关获取Message<T>
,将它们转发到外部系统,并等待该系统的响应。它们支持请求和回复交互。
Activiti可用于根据已记录的、可审计的和可预测的状态来描述复杂的长期运行流程,而Spring Integration可用于,嗯,集成!Spring Integration是保存有趣Java代码的地方,Activiti跟踪总体状态。这个技巧在20年前很有用,在当今微服务的广泛分布式世界中也很有用,在微服务的广泛分布式世界中,单个请求的处理可能跨越多个服务和节点。因此,Spring Boot、Spring Integration和Activiti如此出色地协同工作很方便!
一个常见的用例是使用Activiti启动BPM流程,然后在进入等待状态时,将该状态的处理委托给Spring Integration,当然,Spring Integration可以将工作分发到其他系统。下面是一个简单的流程图。
BPM流程状态通常涉及人工操作。例如,工作流引擎可能有一个状态,其中文档被发送给人工审批,但审核人员正在休假,几周后才能回来。在这种情况下,一直保持线程打开,期待几毫秒或几秒钟内返回所需的确认信息来继续处理,是浪费资源的,更不用说危险了。
Activiti足够智能,可以在等待状态期间暂停执行,将执行状态持久化到数据库,并且只有在流程执行被_发出信号_后才恢复。向流程引擎发出信号会重新加载流程并恢复执行。一个简单的例子可能是一个新的用户注册业务流程,它委托给Spring Integration发送包含注册确认链接的电子邮件。用户可能需要几天时间才能点击确认链接,但点击后,系统应继续进行注册业务流程。
在这篇文章中,我们将介绍如何启动一个进入等待状态的BPM流程,然后委托给Spring Integration进行某种自定义处理,然后在遥远的将来恢复流程的执行。
我们将设置两个Spring Integration流程:一个处理来自Activiti到Spring Integration的请求,另一个处理最终的回复并触发流程的恢复。
我们需要一些东西来启动我们的流程,所以这里有一个简单的REST端点(https://127.0.0.1:8080/start
),每次启动一个新流程。
@RestController
class ProcessStartingRestController {
@Autowired
private ProcessEngine processEngine;
@RequestMapping(method = RequestMethod.GET, value = "/start")
Map<String, String> launch() {
ProcessInstance asyncProcess = this.processEngine.getRuntimeService()
.startProcessInstanceByKey("asyncProcess");
return Collections.singletonMap("executionId", asyncProcess.getId());
}
}
消息将在我们在@Configuration
类中创建的两个MessageChannel
之间流动:requests
和replies
。
@Configuration
class MessageChannels {
@Bean
DirectChannel requests() {
return new DirectChannel();
}
@Bean
DirectChannel replies() {
return new DirectChannel();
}
}
使用@Configuration
类的好处是它本身就是一个Spring组件,可以在任何地方注入。我们可以通过直接调用@Bean
提供程序方法来取消引用通道。另一种方法是在每次注入对其中一个通道的引用时使用@Qualifier
,例如:
public static final String REPLIES_CHANNEL_NAME = "replies";
@Autowired
@Qualifier(REPLIES_CHANNEL_NAME)
private MessageChannel replies;
我们的BPMN非常简单,但我们将使用Activiti特定的命名空间属性activiti:delegateExpression="#{gateway}"
来告诉Activiti调用一个名为gateway
的bean,该bean在Spring中注册。Activiti知道这样做是因为此应用程序使用Spring Boot的Activiti自动配置,该配置除了许多其他功能外,还将Spring管理的bean公开给Activiti表达式语言。gateway
是一个基于Activiti的ReceiveTaskActivityBehavior
类型的bean。Spring Boot同时具有Spring Integration和Activiti的自动配置,因此90%的繁琐设置都消失了。
让我们看看我们简单的gateway
组件,它是Activiti的ActivityBehavior
接口的实现,充当回调函数,我们可以在其中将Message<T>
发送到requests
通道并启动我们的Spring Integration流程。这里重要的是,我们捕获了executionId
,稍后我们将需要它来_恢复_或_发出信号_流程。
@Bean
ActivityBehavior gateway(MessageChannels channels) {
return new ReceiveTaskActivityBehavior() {
@Override
public void execute(ActivityExecution execution) throws Exception {
Message<?> executionMessage = MessageBuilder
.withPayload(execution)
.setHeader("executionId", execution.getId())
.build();
channels.requests().send(executionMessage);
}
};
}
Message<T>
将从requests
MessageChannel
的另一端弹出,需要一些东西来处理它。在一个复杂的例子中,将请求转换为有意义的消息,然后例如将其转发到其他系统(如电子邮件)是很简单的。在这里,我们只是打印出报头,以便我们可以记下executionId
并在以后使用它。
@Bean
IntegrationFlow requestsFlow(MessageChannels channels) {
return IntegrationFlows.from(channels.requests())
.handle(msg -> msg.getHeaders().entrySet()
.forEach(e -> log.info(e.getKey() + '=' + e.getValue())))
.get();
}
此时,工作流定义已被持久化,并且没有活动的流程实例。我们需要以某种方式异步地向它发出信号。我们将使用REST端点https://127.0.0.1:8080/resume/{executionId}
这样做。REST很容易理解,但实际上我们可以使用Spring Integration知道的任何外部系统的任何事件来实现此效果。唯一需要确保的是,无论外部事件是什么,我们都能以某种方式发送executionId
,就像我们在这里通过在URI中捕获它一样。
@RestController
class ProcessResumingRestController {
@Autowired
private MessageChannels messageChannels;
@RequestMapping(method = RequestMethod.GET, value = "/resume/{executionId}")
void resume(@PathVariable String executionId) {
Message<String> build = MessageBuilder.withPayload(executionId)
.setHeader("executionId", executionId)
.build();
this.messageChannels.replies().send(build);
}
}
当Message<T>
流经replies
MessageChannel
时,它将从另一端弹出,同样,需要一些东西来处理它。在这里,我们将使用另一个Spring Integration流程,该流程接收传入的Message<T>
并发出流程恢复的信号。执行此流程后,您将看到流程的下一步scriptTask
被评估,以及打印到控制台的“Moving on!”字样。
@Bean
IntegrationFlow repliesFlow(MessageChannels channels,
ProcessEngine engine) {
return IntegrationFlows.from(channels.replies())
.handle(msg -> engine.getRuntimeService().signal(
String.class.cast(msg.getHeaders().get("executionId"))))
.get();
}
我们已经涵盖了很多内容!我们同时使用BPM引擎和集成框架来处理流程,并且我知道这两种技术的正确交集使用是一个利基主题。这篇文章的目标是有效地将两者结合使用,在适用时充分发挥其作用。对于简单的集成,BPM会增加相当多的认知负担,但在业务流程描述必须对模型和业务分析师友好时,它提供了很大的价值。一个常见的误解是,一旦涉及BPM,开发人员就必须放弃增强系统的能力;并非如此!由于Spring Boot和Activiti团队的努力工作,Activiti与所有Spring完美配合。
真正的强大之处在于使用BPM来编排复杂的处理逻辑:想象一下,在BPM流程中遇到一个状态,然后调用Spring Batch作业,或者使用Spring Cloud中具有Ribbon负载均衡的RestTemplate
调用REST服务,或者将Message<T>
转发到Spring Cloud Data Flow流处理中。Spring Cloud Data Flow是我最喜欢的处理数据的方法之一,因为它构建在Spring Cloud Stream之上,而Spring Cloud Stream又构建在Spring Integration之上:一直都是MessageChannel
!
该示例的代码在线提供。