Activiti 6.0.0(带Spring引导)持有异步进程,不立即启动,当我发出进程启动命令时:(发出此命令时有打开的JPA事务,实体作为变量传递给进程)
ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(processId, variableMap);
5分钟后,它运行它没有任何错误。
日志:
15:17:01.158 DEBUG [http-nio-8080-exec-5] c.m.i.e.p.input.impl.ImportingPortImpl : Starting workflow process... [workflowName=batch-main]
<5 MINUTE TEA BREAK>
15:22:10.677 DEBUG [SimpleAsyncTaskExecutor-6] c.m.i.e.i.c.analizers.ContentAnalizers : Task really started..
这种情况大约发生在30%的情况下,在其他情况下,它立即开始,所以这是非常奇怪的。
如何解决此问题以在每种情况下立即运行而无需等待5分钟超时?
更多详细日志:
16:53:14.136 DEBUG [http-nio-8080-exec-2] c.m.i.e.r.a.i.IntellivectorImportAdapter : Getting content stream from multipart [multipartIndex=0;fileName=IMPORTED-4e578b1a-e7f6-4b5a-91d0-9e9a40f3328e.tiff;contentType=image/tiff]
16:53:14.137 DEBUG [http-nio-8080-exec-2] o.h.e.t.internal.TransactionImpl : begin
16:53:14.138 INFO [http-nio-8080-exec-2] jdbc.sqlonly : select count(RES.ID_) from ACT_RE_PROCDEF RES WHERE RES.KEY_ = 'batch-main'
16:53:14.142 INFO [http-nio-8080-exec-2] jdbc.sqlonly : select nextval ('iv_batch_id_seq')
16:53:14.144 INFO [http-nio-8080-exec-2] jdbc.sqlonly : select nextval ('iv_document_id_seq')
16:53:14.147 INFO [http-nio-8080-exec-2] jdbc.sqlonly : insert into iv_batch (created_date, id) values ('11/28/2017 16:53:14.142', 1245)
16:53:14.148 INFO [http-nio-8080-exec-2] jdbc.sqlonly : insert into iv_document (batch_id, created_date, id) values (1245, '11/28/2017 16:53:14.144', 1272)
16:53:14.150 INFO [http-nio-8080-exec-2] jdbc.sqlonly : insert into iv_document_contents (mime_type, storage_id, document_id, phase) values ('image/tiff', 'c74d06ef-3c92-4c22-8238-7e45e80d53e3', 1272, 'IMPORTED')
16:53:14.150 INFO [http-nio-8080-exec-2] jdbc.sqlonly : update iv_document_contents set index_=0 where document_id=1272 and phase='IMPORTED'
16:53:14.151 DEBUG [http-nio-8080-exec-2] c.m.i.e.p.input.impl.ImportingPortImpl : Imported a batch, starting workflow... [batchId=1245;documentCount=1;workflowName=batch-main]
16:53:14.151 INFO [http-nio-8080-exec-2] jdbc.sqlonly : select * from ACT_RE_PROCDEF where KEY_ = 'batch-main' and (TENANT_ID_ = '' or TENANT_ID_ is null) and VERSION_ = (select max(VERSION_) from ACT_RE_PROCDEF where KEY_ = 'batch-main' and (TENANT_ID_ = '' or TENANT_ID_ is null))
16:53:14.153 INFO [http-nio-8080-exec-2] jdbc.sqlonly : insert into ACT_HI_VARINST (ID_, PROC_INST_ID_, EXECUTION_ID_, TASK_ID_, NAME_, REV_, VAR_TYPE_, BYTEARRAY_ID_, DOUBLE_, LONG_ , TEXT_, TEXT2_, CREATE_TIME_, LAST_UPDATED_TIME_)
values ( '5048', '5047', '5047', NULL, 'batchId', 0, 'long', NULL, NULL, 1245, '1245', NULL, '11/28/2017 16:53:14.153', '11/28/2017 16:53:14.153' )
16:53:14.154 INFO [http-nio-8080-exec-2] jdbc.sqlonly : insert into ACT_HI_PROCINST ( ID_, PROC_INST_ID_, BUSINESS_KEY_, PROC_DEF_ID_, START_TIME_, END_TIME_, DURATION_, START_USER_ID_, START_ACT_ID_, END_ACT_ID_, SUPER_PROCESS_INSTANCE_ID_, DELETE_REASON_, TENANT_ID_, NAME_ )
values ( '5047', '5047', NULL, 'batch-main:3:5021', '11/28/2017 16:53:14.152', NULL, NULL, NULL, 'theStart', NULL, NULL, NULL, '', NULL )
16:53:14.156 INFO [http-nio-8080-exec-2] jdbc.sqlonly : insert into ACT_RU_EXECUTION (ID_, REV_, PROC_INST_ID_, BUSINESS_KEY_, PROC_DEF_ID_, ACT_ID_, IS_ACTIVE_, IS_CONCURRENT_, IS_SCOPE_,IS_EVENT_SCOPE_, IS_MI_ROOT_, PARENT_ID_, SUPER_EXEC_, ROOT_PROC_INST_ID_, SUSPENSION_STATE_, TENANT_ID_, NAME_, START_TIME_, START_USER_ID_, IS_COUNT_ENABLED_, EVT_SUBSCR_COUNT_, TASK_COUNT_, JOB_COUNT_, TIMER_JOB_COUNT_, SUSP_JOB_COUNT_, DEADLETTER_JOB_COUNT_, VAR_COUNT_, ID_LINK_COUNT_)
values ('5047', 1, '5047', NULL, 'batch-main:3:5021', NULL, 1, 0, 1, 0, 0, NULL, NULL, '5047', 1, '', NULL, '11/28/2017 16:53:14.152', NULL, 0, 0, 0, 0, 0, 0, 0, 0, 0) , ('5049', 1, '5047', NULL, 'batch-main:3:5021', 'theStart', 1, 0, 0, 0, 0, '5047', NULL, '5047', 1, '', NULL, '11/28/2017 16:53:14.153', NULL, 0, 0, 0, 0, 0, 0, 0, 0, 0)
16:53:14.157 INFO [http-nio-8080-exec-2] jdbc.sqlonly : insert into ACT_RU_VARIABLE (ID_, REV_, TYPE_, NAME_, PROC_INST_ID_, EXECUTION_ID_, TASK_ID_, BYTEARRAY_ID_, DOUBLE_, LONG_ , TEXT_, TEXT2_)
values ( '5048', 1, 'long', 'batchId', '5047', '5047', NULL, NULL, NULL, 1245, '1245', NULL )
16:53:14.159 INFO [http-nio-8080-exec-2] jdbc.sqlonly : insert into ACT_RU_JOB ( ID_, REV_, TYPE_, LOCK_OWNER_, LOCK_EXP_TIME_, EXCLUSIVE_, EXECUTION_ID_, PROCESS_INSTANCE_ID_, PROC_DEF_ID_, RETRIES_, EXCEPTION_STACK_ID_, EXCEPTION_MSG_, DUEDATE_, REPEAT_, HANDLER_TYPE_, HANDLER_CFG_, TENANT_ID_)
values ('5050', 1, 'message', 'e53309f8-8780-4849-be82-c9acfbffa338', '11/28/2017 16:58:14.153', 1, '5049', '5047', 'batch-main:3:5021', 3, NULL, NULL, NULL, NULL, 'async-continuation', NULL, '' )
16:53:14.160 DEBUG [http-nio-8080-exec-2] o.h.e.t.internal.TransactionImpl : begin
16:53:14.160 DEBUG [http-nio-8080-exec-2] o.h.e.t.internal.TransactionImpl : committing
16:53:14.160 DEBUG [http-nio-8080-exec-2] o.h.e.t.internal.TransactionImpl : committing
16:53:14.161 DEBUG [SimpleAsyncTaskExecutor-2] o.h.e.t.internal.TransactionImpl : begin
16:53:14.161 INFO [SimpleAsyncTaskExecutor-2] jdbc.sqlonly : select * from ACT_RU_EXECUTION where ID_ = '5049'
16:53:14.161 DEBUG [SimpleAsyncTaskExecutor-2] o.h.e.t.internal.TransactionImpl : committing
16:53:14.162 DEBUG [SimpleAsyncTaskExecutor-2] o.h.e.t.internal.TransactionImpl : begin
16:53:14.162 INFO [SimpleAsyncTaskExecutor-2] jdbc.sqlonly : select * from ACT_RU_JOB where ID_ = '5050'
16:53:14.163 DEBUG [SimpleAsyncTaskExecutor-2] o.h.e.t.internal.TransactionImpl : committing
16:53:14.163 DEBUG [SimpleAsyncTaskExecutor-2] o.h.e.t.internal.TransactionImpl : begin
16:53:14.163 INFO [SimpleAsyncTaskExecutor-2] jdbc.sqlonly : select * from ACT_RU_EXECUTION where ID_ = '5047'
16:53:14.164 DEBUG [SimpleAsyncTaskExecutor-2] o.h.e.t.internal.TransactionImpl : committing
16:53:22.492 DEBUG [activiti-acquire-async-jobs] o.h.e.t.internal.TransactionImpl : begin
16:53:22.493 INFO [activiti-acquire-async-jobs] jdbc.sqlonly : select RES.* from ACT_RU_JOB RES where LOCK_EXP_TIME_ is null LIMIT 1 OFFSET 0
16:53:22.493 DEBUG [activiti-acquire-timer-jobs] o.h.e.t.internal.TransactionImpl : begin
16:53:22.493 DEBUG [activiti-acquire-async-jobs] o.h.e.t.internal.TransactionImpl : committing
16:53:22.493 INFO [activiti-acquire-timer-jobs] jdbc.sqlonly : select RES.* from ACT_RU_TIMER_JOB RES where DUEDATE_ <= '11/28/2017 16:53:22.493' and LOCK_OWNER_ is null LIMIT 1 OFFSET 0
16:53:22.495 DEBUG [activiti-acquire-timer-jobs] o.h.e.t.internal.TransactionImpl : committing
16:53:22.495 DEBUG [activiti-acquire-timer-jobs] o.h.e.t.internal.TransactionImpl : begin
16:53:22.495 DEBUG [activiti-acquire-timer-jobs] o.h.e.t.internal.TransactionImpl : committing
<NOT STARTED.... >
异步作业执行器只在锁过期后执行作业
我已经将范围缩小到这个问题:选中“异步”和未选中“独占”的任务会导致延迟。我创建了一个展示行为的工作流。五个任务每个Hibernate一秒,并设置了“异步”和“独占”标志的各种组合。在下面的日志片段中,请注意任务“scripttask04_async”前的5分钟延迟以及ExecuteAsyncJobCmd记录的错误消息。
engine 17:25:48.337 DEBUG NfExecutionListener execution id: 60065, received event: start, task id: null, parent id: null, variables {admin=null}, local variables {admin=VariableInstanceEntity[id=60066, name=admin, type=null]}
engine 17:25:48.396 DEBUG ExecuteAsyncJobCmd Executing async job 60068
engine 17:25:48.407 DEBUG NfExecutionListener execution id: 60065, received event: start, task id: null, parent id: null, variables {admin=null}, local variables {admin=VariableInstanceEntity[id=60066, name=admin, type=null]}
engine 17:25:48.407 DEBUG ContinueProcessOperation Executing activityBehavior class org.activiti.engine.impl.bpmn.behavior.NoneStartEventActivityBehavior on activity 'debugHelperStart' with execution 60067
engine 17:25:48.411 DEBUG TakeOutgoingSequenceFlowsOperation Leaving flow node class org.activiti.bpmn.model.StartEvent with id 'debugHelperStart' by following it's 1 outgoing sequenceflow
engine 17:25:48.413 DEBUG ContinueProcessOperation Sequence flow 'flow1' encountered. Continuing process by following it using execution 60067
engine 17:25:48.414 DEBUG ContinueProcessOperation Executing activityBehavior class org.activiti.engine.impl.bpmn.behavior.ScriptTaskActivityBehavior on activity 'debugHelper01Task_sync_excl' with execution 60067
========================== === ==== before sleep debugHelper01Task_sync_excl
engine 17:25:49.308 INFO AnnotationConfigApplicationContext Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@40b5162b: startup date [Tue May 01 17:25:49 UTC 2018]; parent: org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@3439f68d
engine 17:25:49.407 INFO AutowiredAnnotationBeanPostProcessor JSR-330 'javax.inject.Inject' annotation found and supported for autowiring
engine 17:25:49.843 INFO ChainedDynamicProperty Flipping property: localhost.ribbon.ActiveConnectionsLimit to use NEXT property: niws.loadbalancer.availabilityFilteringRule.activeConnectionsLimit = 2147483647
========================== === ==== after sleep debugHelper01Task_sync_excl
engine 17:25:49.851 DEBUG TakeOutgoingSequenceFlowsOperation Leaving flow node class org.activiti.bpmn.model.ScriptTask with id 'debugHelper01Task_sync_excl' by following it's 1 outgoing sequenceflow
engine 17:25:49.851 DEBUG ContinueProcessOperation Sequence flow 'flow2' encountered. Continuing process by following it using execution 60067
engine 17:25:49.960 INFO BaseLoadBalancer Client: localhost instantiated a LoadBalancer: DynamicServerListLoadBalancer:{NFLoadBalancer:name=localhost,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:null
engine 17:25:49.974 INFO DynamicServerListLoadBalancer Using serverListUpdater PollingServerListUpdater
engine 17:25:49.986 INFO DynamicServerListLoadBalancer DynamicServerListLoadBalancer for client localhost initialized: DynamicServerListLoadBalancer:{NFLoadBalancer:name=localhost,current list of Servers=[],Load balancer stats=Zone stats: {},Server stats: []}ServerList:com.netflix.loadbalancer.ConfigurationBasedServerList@50af6398
engine 17:25:59.119 DEBUG ExecuteAsyncJobCmd Executing async job 60072
engine 17:25:59.122 DEBUG ContinueProcessOperation Executing activityBehavior class org.activiti.engine.impl.bpmn.behavior.ScriptTaskActivityBehavior on activity 'scripttask02_async_excl' with execution 60067
========================== === ==== before sleep scripttask02_async_excl
========================== === ==== after sleep scripttask02_async_excl
engine 17:26:00.132 DEBUG TakeOutgoingSequenceFlowsOperation Leaving flow node class org.activiti.bpmn.model.ScriptTask with id 'scripttask02_async_excl' by following it's 1 outgoing sequenceflow
engine 17:26:00.132 DEBUG ContinueProcessOperation Sequence flow 'flow3' encountered. Continuing process by following it using execution 60067
engine 17:26:00.132 DEBUG ContinueProcessOperation Executing activityBehavior class org.activiti.engine.impl.bpmn.behavior.ScriptTaskActivityBehavior on activity 'scripttask03_sync' with execution 60067
========================== === ==== before sleep scripttask03_sync
========================== === ==== after sleep scripttask03_sync
engine 17:26:01.136 DEBUG TakeOutgoingSequenceFlowsOperation Leaving flow node class org.activiti.bpmn.model.ScriptTask with id 'scripttask03_sync' by following it's 1 outgoing sequenceflow
engine 17:26:01.136 DEBUG ContinueProcessOperation Sequence flow 'flow4' encountered. Continuing process by following it using execution 60067
engine 17:26:01.148 DEBUG ExecuteAsyncJobCmd Job does not exist anymore and will not be executed. It has most likely been deleted as part of another concurrent part of the process instance.
engine 17:31:39.388 DEBUG ExecuteAsyncJobCmd Executing async job 60076
engine 17:31:39.392 DEBUG ContinueProcessOperation Executing activityBehavior class org.activiti.engine.impl.bpmn.behavior.ScriptTaskActivityBehavior on activity 'scripttask04_async' with execution 60067
========================== === ==== before sleep scripttask04_async
========================== === ==== after sleep scripttask04_async
engine 17:31:40.401 DEBUG TakeOutgoingSequenceFlowsOperation Leaving flow node class org.activiti.bpmn.model.ScriptTask with id 'scripttask04_async' by following it's 1 outgoing sequenceflow
engine 17:31:40.402 DEBUG ContinueProcessOperation Sequence flow 'flow5' encountered. Continuing process by following it using execution 60067
engine 17:31:40.422 DEBUG ExecuteAsyncJobCmd Executing async job 60078
engine 17:31:40.425 DEBUG ContinueProcessOperation Executing activityBehavior class org.activiti.engine.impl.bpmn.behavior.ScriptTaskActivityBehavior on activity 'scripttask05_async_excl' with execution 60067
========================== === ==== before sleep scripttask05_async_excl
========================== === ==== after sleep scripttask05_async_excl
engine 17:31:41.440 DEBUG TakeOutgoingSequenceFlowsOperation Leaving flow node class org.activiti.bpmn.model.ScriptTask with id 'scripttask05_async_excl' by following it's 1 outgoing sequenceflow
engine 17:31:41.441 DEBUG ContinueProcessOperation Sequence flow 'flow6' encountered. Continuing process by following it using execution 60067
engine 17:31:41.441 DEBUG ContinueProcessOperation Executing activityBehavior class org.activiti.engine.impl.bpmn.behavior.NoneEndEventActivityBehavior on activity 'debugHelperEnd' with execution 60067
engine 17:31:41.441 DEBUG TakeOutgoingSequenceFlowsOperation Leaving flow node class org.activiti.bpmn.model.EndEvent with id 'debugHelperEnd' by following it's 0 outgoing sequenceflow
engine 17:31:41.441 DEBUG TakeOutgoingSequenceFlowsOperation No outgoing sequence flow found for flow node 'debugHelperEnd'.
engine 17:31:41.476 DEBUG NfExecutionListener execution id: 60065, received event: end, task id: null, parent id: null, variables {admin=null}, local variables {admin=VariableInstanceEntity[id=60066, name=admin, type=null]}
我怀疑打开的事务与ExecuteAsyncJobCmd
结合是导致此问题的原因。来自命令的代码库:
// We need to refetch the job, as it could have been deleted by another concurrent job
// For exampel: an embedded subprocess with a couple of async tasks and a timer on the boundary of the subprocess
// when the timer fires, all executions and thus also the jobs inside of the embedded subprocess are destroyed.
// However, the async task jobs could already have been fetched and put in the queue.... while in reality they have been deleted.
// A refetch is thus needed here to be sure that it exists for this transaction.
Job job = commandContext.getJobEntityManager().findById(jobId);
if (job == null) {
log.debug("Job does not exist anymore and will not be executed. It has most likely been deleted "
+ "as part of another concurrent part of the process instance.");
return null;
}
这是我们在使用Activiti Spring Boot Starter中提供的默认进程引擎配置测试内存db和异步作业时遇到的确切行为。
我也有同样的问题,我想知道为什么会发生这种情况,所以我深入研究了Activiti 6.0的源代码。
事实证明,这是一个并发bug,它与事务处理有关,特别是在您使用actititi-Spring时。
在这里查看GitHub上的问题:https://github.com/Activiti/Activiti/issues/2666
具体来说,当一个作业导致创建另一个作业时,Activiti的异步执行器有可能在将作业实体插入数据库的事务提交之前开始执行该作业。由于命令的执行方式和事务包装的完成,这种情况特别发生在actititi-Spring中。
我目前没有一个好的解决方法(避免它的一种方法是不使用actititi-Spring,但如果您需要在应用程序中集成Spring,这是不可行的)。