/* * Copyright 2017 Graz University of Technology EAAF-Core Components has been developed in a * cooperation between EGIZ, A-SIT Plus, A-SIT, and Graz University of Technology. * * Licensed under the EUPL, Version 1.2 or - as soon they will be approved by the European * Commission - subsequent versions of the EUPL (the "Licence"); You may not use this work except in * compliance with the Licence. You may obtain a copy of the Licence at: * https://joinup.ec.europa.eu/news/understanding-eupl-v12 * * Unless required by applicable law or agreed to in writing, software distributed under the Licence * is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express * or implied. See the Licence for the specific language governing permissions and limitations under * the Licence. * * This product combines work with different licenses. See the "NOTICE" text file for details on the * various modules and licenses. The "NOTICE" text file is part of the distribution. Any derivative * works that you distribute must include a readable copy of the "NOTICE" text file. */ package at.gv.egiz.eaaf.core.impl.idp.process; import java.io.InputStream; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.collections4.IterableUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import at.gv.egiz.eaaf.core.api.IRequest; import at.gv.egiz.eaaf.core.api.data.EaafConstants; import at.gv.egiz.eaaf.core.api.idp.process.ExecutionContext; import at.gv.egiz.eaaf.core.api.idp.process.ExpressionEvaluationContext; import at.gv.egiz.eaaf.core.api.idp.process.ExpressionEvaluator; import at.gv.egiz.eaaf.core.api.idp.process.ProcessEngine; import at.gv.egiz.eaaf.core.api.idp.process.ProcessInstanceStoreDao; import at.gv.egiz.eaaf.core.api.idp.process.Task; import at.gv.egiz.eaaf.core.exceptions.EaafException; import at.gv.egiz.eaaf.core.exceptions.ProcessExecutionException; import at.gv.egiz.eaaf.core.impl.idp.process.dao.ProcessInstanceStore; import at.gv.egiz.eaaf.core.impl.idp.process.model.EndEvent; import at.gv.egiz.eaaf.core.impl.idp.process.model.ProcessDefinition; import at.gv.egiz.eaaf.core.impl.idp.process.model.ProcessNode; import at.gv.egiz.eaaf.core.impl.idp.process.model.StartEvent; import at.gv.egiz.eaaf.core.impl.idp.process.model.TaskInfo; import at.gv.egiz.eaaf.core.impl.idp.process.model.Transition; /** * Process engine implementation allowing starting and continuing processes as * well as providing means for cleanup actions. */ public class ProcessEngineImpl implements ProcessEngine { private final Logger log = LoggerFactory.getLogger(getClass()); @Autowired ProcessInstanceStoreDao piStoreDao; @Autowired ApplicationContext context; private final ProcessDefinitionParser pdp = new ProcessDefinitionParser(); private final Map processDefinitions = new ConcurrentHashMap<>(); private static final String MDC_CTX_PI_NAME = "processInstanceId"; private static final String MDC_CTX_TASK_NAME = "taskId"; private ExpressionEvaluator transitionConditionExpressionEvaluator; @Override public void registerProcessDefinition(final ProcessDefinition processDefinition) { log.info("Registering process definition '{}'.", processDefinition.getId()); processDefinitions.put(processDefinition.getId(), processDefinition); } @Override public String registerProcessDefinition(final InputStream processDefinitionInputStream) throws ProcessDefinitionParserException { final ProcessDefinition pd = pdp.parse(processDefinitionInputStream); postValidationOfProcessDefintion(pd); registerProcessDefinition(pd); return pd.getId(); } /** * Sets the process definitions. * * @param processDefinitions The process definitions. * @throws IllegalArgumentException In case the process definitions contain * definitions with the same identifier. */ public void setProcessDefinitions(final Iterable processDefinitions) { this.processDefinitions.clear(); for (final ProcessDefinition pd : processDefinitions) { if (this.processDefinitions.containsKey(pd.getId())) { throw new IllegalArgumentException( "Duplicate process definition identifier '" + pd.getId() + "'."); } registerProcessDefinition(pd); } } /** * Sets an expression evaluator that should be used to process transition * condition expressions. * * @param transitionConditionExpressionEvaluator The expression evaluator. */ public void setTransitionConditionExpressionEvaluator( final ExpressionEvaluator transitionConditionExpressionEvaluator) { this.transitionConditionExpressionEvaluator = transitionConditionExpressionEvaluator; } @Override public String createProcessInstance(final String processDefinitionId, final ExecutionContext executionContext) throws ProcessExecutionException { // look for respective process definition final ProcessDefinition pd = processDefinitions.get(processDefinitionId); if (pd == null) { throw new ProcessExecutionException( "Unable to find process definition for process '" + processDefinitionId + "'."); } // create and keep process instance final ProcessInstance pi = new ProcessInstance(pd, executionContext); log.info("Creating process instance from process definition '{}': {}", processDefinitionId, pi.getId()); try { saveOrUpdateProcessInstance(pi); } catch (final EaafException e) { throw new ProcessExecutionException("Unable to persist process instance.", e); } return pi.getId(); } @Override public String createProcessInstance(final String processDefinitionId) throws ProcessExecutionException { return createProcessInstance(processDefinitionId, null); } @Override public void start(final IRequest pendingReq) throws ProcessExecutionException { try { if (StringUtils.isEmpty(pendingReq.getProcessInstanceId())) { log.error("Pending-request with id:" + pendingReq.getPendingRequestId() + " includes NO 'ProcessInstanceId'"); throw new ProcessExecutionException("Pending-request with id:" + pendingReq.getPendingRequestId() + " includes NO 'ProcessInstanceId'"); } final ProcessInstance pi = loadProcessInstance(pendingReq.getProcessInstanceId()); if (pi == null) { throw new ProcessExecutionException( "Process instance '" + pendingReq.getProcessInstanceId() + "' does not exist."); } MDC.put(MDC_CTX_PI_NAME, pi.getId()); if (!ProcessInstanceState.NOT_STARTED.equals(pi.getState())) { throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has already been started (current state is " + pi.getState() + ")."); } log.info("Starting process instance '{}'.", pi.getId()); // execute process pi.setState(ProcessInstanceState.STARTED); execute(pi, pendingReq); // store ProcessInstance if it is not already ended if (!ProcessInstanceState.ENDED.equals(pi.getState())) { saveOrUpdateProcessInstance(pi); } } catch (final EaafException e) { throw new ProcessExecutionException("Unable to load/save process instance.", e); } finally { MDC.remove(MDC_CTX_PI_NAME); } } @Override public void signal(final IRequest pendingReq) throws ProcessExecutionException { try { if (StringUtils.isEmpty(pendingReq.getProcessInstanceId())) { log.error("Pending-request with id:" + pendingReq.getPendingRequestId() + " includes NO 'ProcessInstanceId'"); throw new ProcessExecutionException("Pending-request with id:" + pendingReq.getPendingRequestId() + " includes NO 'ProcessInstanceId'"); } final ProcessInstance pi = loadProcessInstance(pendingReq.getProcessInstanceId()); if (pi == null) { throw new ProcessExecutionException( "Process instance '" + pendingReq.getProcessInstanceId() + "' does not exist."); } MDC.put(MDC_CTX_PI_NAME, pi.getId()); if (!ProcessInstanceState.SUSPENDED.equals(pi.getState())) { throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has not been suspended (current state is " + pi.getState() + ")."); } log.debug("Waking up process instance '{}'.", pi.getId()); pi.setState(ProcessInstanceState.STARTED); // put pending-request ID on execution-context because it could be changed pi.getExecutionContext().put(EaafConstants.PARAM_HTTP_TARGET_PENDINGREQUESTID, pendingReq.getPendingRequestId()); execute(pi, pendingReq); // store ProcessInstance if it is not already ended if (!ProcessInstanceState.ENDED.equals(pi.getState())) { saveOrUpdateProcessInstance(pi); } } catch (final EaafException e) { throw new ProcessExecutionException("Unable to load/save process instance.", e); } finally { MDC.remove(MDC_CTX_PI_NAME); } } /** * Instantiates a task implementation given by a {@link TaskInfo}. * * @param ti The task info. * @return A Task implementation or {@code null} if the task info does not * reference any task implementing classes. * @throws ProcessExecutionException Thrown in case of error (when the * referenced class does not implement * {@link Task} for instance). */ private Task createTaskInstance(final TaskInfo ti) throws ProcessExecutionException { final String clazz = StringUtils.trimToNull(ti.getTaskImplementingClass()); Task task = null; if (clazz != null) { log.debug("Instantiating task implementing class '{}'.", clazz); Object instanceClass = null; try { instanceClass = context.getBean(clazz); } catch (final Exception e) { throw new ProcessExecutionException( "Unable to get class '" + clazz + "' associated with task '" + ti.getId() + "' .", e); } if (instanceClass == null || !(instanceClass instanceof Task)) { throw new ProcessExecutionException("Class '" + clazz + "' associated with task '" + ti.getId() + "' is not assignable to " + Task.class.getName() + "."); } try { task = (Task) instanceClass; } catch (final Exception e) { throw new ProcessExecutionException("Unable to instantiate class '" + clazz + "' associated with task '" + ti.getId() + "' .", e); } } return task; } /** * Starts/executes a given process instance. * * @param pi The process instance. * @param pendingReq current pending request * @throws ProcessExecutionException Thrown in case of error. */ private void execute(final ProcessInstance pi, IRequest pendingReq) throws ProcessExecutionException { if (ProcessInstanceState.ENDED.equals(pi.getState())) { throw new ProcessExecutionException( "Process for instance '" + pi.getId() + "' has already been ended."); } final ProcessDefinition pd = pi.getProcessDefinition(); final ProcessNode processNode = pd.getProcessNode(pi.getNextId()); log.debug("Processing node '{}'.", processNode.getId()); // distinguish process node types StartEvent, TaskInfo and EndEvent if (processNode instanceof TaskInfo) { // TaskInfo types need to be executed final TaskInfo ti = (TaskInfo) processNode; MDC.put(MDC_CTX_TASK_NAME, ti.getId()); try { log.debug("Processing task '{}'.", ti.getId()); final Task task = createTaskInstance(ti); if (task != null) { try { log.debug("Executing task implementation for task '{}'.", ti.getId()); log.trace("Execution context before task execution: {}", pi.getExecutionContext().keySet()); pendingReq = task.execute(pendingReq, pi.getExecutionContext()); log.debug("Returned from execution of task '{}'.", ti.getId()); log.trace("Execution context after task execution: {}", pi.getExecutionContext().keySet()); } catch (final Throwable t) { throw new ProcessExecutionException("Error executing task '" + ti.getId() + "'.", t); } // check if process was cancelled dynamically by task if (pi.getExecutionContext().isProcessCancelled()) { log.debug("Processing task '{}' was cancelled by Task: '{}'.", pi.getId(), ti.getId()); processFinishEvent(pi); return; } } else { log.debug("No task implementing class set."); } } finally { MDC.remove(MDC_CTX_TASK_NAME); } } else if (processNode instanceof EndEvent) { processFinishEvent(pi); return; } final ExpressionEvaluationContext expressionContext = new ExpressionEvaluationContextImpl(pi); // traverse pointer final Transition t = IterableUtils.find(processNode.getOutgoingTransitions(), transition -> { if (transitionConditionExpressionEvaluator != null && transition.getConditionExpression() != null) { log.trace("Evaluating transition expression '{}'.", transition.getConditionExpression()); return transitionConditionExpressionEvaluator.evaluate(expressionContext, transition.getConditionExpression()); } return true; }); if (t == null) { throw new ProcessExecutionException( "No valid transition starting from process node '" + processNode.getId() + "'."); } log.trace("Found suitable transition: {}", t); // update pointer log.trace("Shifting process token from '{}' to '{}'.", pi.getNextId(), t.getTo().getId()); pi.setNextId(t.getTo().getId()); // inspect current task if (t.getTo() instanceof TaskInfo && ((TaskInfo) t.getTo()).isAsync()) { // immediately return in case of asynchonous task log.debug("Suspending process instance '{}' for asynchronous task '{}'.", pi.getId(), t.getTo().getId()); pi.setState(ProcessInstanceState.SUSPENDED); return; } // continue execution in case of StartEvent or Task if (processNode instanceof StartEvent || processNode instanceof TaskInfo) { execute(pi, pendingReq); } } @Override public ProcessInstance getProcessInstance(final String processInstanceId) { ProcessInstance processInstance; try { processInstance = loadProcessInstance(processInstanceId); } catch (final EaafException e) { throw new RuntimeException( "The process instance '" + processInstanceId + "' could not be retrieved.", e); } if (processInstance == null) { throw new IllegalArgumentException( "The process instance '" + processInstanceId + "' does not/no longer exist."); } return processInstance; } /** * Persists a {@link ProcessInstance} to the database. * * @param processInstance The object to persist. * @throws MOADatabaseException Thrown if an error occurs while accessing the * database. */ private void saveOrUpdateProcessInstance(final ProcessInstance processInstance) throws EaafException { final ProcessInstanceStore store = new ProcessInstanceStore(); final ExecutionContext ctx = processInstance.getExecutionContext(); final Map ctxData = new HashMap<>(); for (final String key : ctx.keySet()) { ctxData.put(key, ctx.get(key)); } store.setExecutionContextData(ctxData); store.setNextTaskId(processInstance.getNextId()); store.setProcessDefinitionId(processInstance.getProcessDefinition().getId()); store.setProcessInstanceId(processInstance.getId()); store.setProcessState(processInstance.getState()); piStoreDao.saveOrUpdate(store); } /** * Load a {@link ProcessInstance} with a certain id from the database. * * @param processInstanceId The process instance id * @return The process instance corresponding to the id or {@code null} if no * such object is found. * @throws MOADatabaseException Thrown if an error occurs while accessing the * database. */ private ProcessInstance loadProcessInstance(final String processInstanceId) throws EaafException { final ProcessInstanceStore piStore = piStoreDao.load(processInstanceId); if (piStore == null) { return null; } final ExecutionContext executionContext = new ExecutionContextImpl(piStore.getProcessInstanceId()); final Map executionContextData = piStore.getExecutionContextData(); for (final Entry el : executionContextData.entrySet()) { executionContext.put(el.getKey(), el.getValue()); } final ProcessInstance pi = new ProcessInstance( processDefinitions.get(piStore.getProcessDefinitionId()), executionContext); pi.setNextId(piStore.getNextTaskId()); pi.setState(piStore.getProcessState()); return pi; } /* * (non-Javadoc) * * @see * at.gv.egovernment.moa.id.process.ProcessEngine#deleteProcessInstance(java. * lang.String) */ @Override public void deleteProcessInstance(final String processInstanceId) throws ProcessExecutionException { if (StringUtils.isEmpty(processInstanceId)) { throw new ProcessExecutionException( "Unable to remove process instance: ProcessInstanceId is empty"); } try { piStoreDao.remove(processInstanceId); } catch (final EaafException e) { throw new ProcessExecutionException("Unable to remove process instance.", e); } } /** * Finish a process-flow and remove any process-flow related information. * * @param pi current process instance * @throws ProcessExecutionException In case of an process error */ private void processFinishEvent(final ProcessInstance pi) throws ProcessExecutionException { log.info("Finishing process instance '{}'.", pi.getId()); try { piStoreDao.remove(pi.getId()); } catch (final EaafException e) { throw new ProcessExecutionException("Unable to remove process instance.", e); } pi.setState(ProcessInstanceState.ENDED); log.debug("Final process context: {}", pi.getExecutionContext().keySet()); } /** * Perform some post-validation operations on process definition. * *

* Like: check if all tasks that are defined are available on context *

* * @param pd current process definition * @throws ProcessDefinitionParserException In case of a parser error */ private void postValidationOfProcessDefintion(final ProcessDefinition pd) throws ProcessDefinitionParserException { try { for (final TaskInfo task : pd.getTaskInfos().values()) { createTaskInstance(task); } } catch (final ProcessExecutionException e) { log.error("Post-validation of process definition: {} find an error: {}", pd.getId(), e.getMessage()); throw new ProcessDefinitionParserException( "Post-validation find an error in process definition:" + pd.getId(), e); } } }