/******************************************************************************* *******************************************************************************/ package at.gv.egiz.eaaf.core.impl.idp.process; import java.io.InputStream; import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.Predicate; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import at.gv.egiz.eaaf.core.api.IRequest; import at.gv.egiz.eaaf.core.api.data.EAAFConstants; import at.gv.egiz.eaaf.core.api.idp.process.ExecutionContext; import at.gv.egiz.eaaf.core.api.idp.process.ExpressionEvaluationContext; import at.gv.egiz.eaaf.core.api.idp.process.ExpressionEvaluator; import at.gv.egiz.eaaf.core.api.idp.process.ProcessEngine; import at.gv.egiz.eaaf.core.api.idp.process.ProcessInstanceStoreDAO; import at.gv.egiz.eaaf.core.api.idp.process.Task; import at.gv.egiz.eaaf.core.exceptions.EAAFException; import at.gv.egiz.eaaf.core.exceptions.ProcessExecutionException; import at.gv.egiz.eaaf.core.impl.idp.process.dao.ProcessInstanceStore; import at.gv.egiz.eaaf.core.impl.idp.process.model.EndEvent; import at.gv.egiz.eaaf.core.impl.idp.process.model.ProcessDefinition; import at.gv.egiz.eaaf.core.impl.idp.process.model.ProcessNode; import at.gv.egiz.eaaf.core.impl.idp.process.model.StartEvent; import at.gv.egiz.eaaf.core.impl.idp.process.model.TaskInfo; import at.gv.egiz.eaaf.core.impl.idp.process.model.Transition; /** * Process engine implementation allowing starting and continuing processes as well as providing means for cleanup actions. */ public class ProcessEngineImpl implements ProcessEngine { private Logger log = LoggerFactory.getLogger(getClass()); @Autowired ProcessInstanceStoreDAO piStoreDao; @Autowired ApplicationContext context; private ProcessDefinitionParser pdp = new ProcessDefinitionParser(); private Map processDefinitions = new ConcurrentHashMap(); private final static String MDC_CTX_PI_NAME = "processInstanceId"; private final static String MDC_CTX_TASK_NAME = "taskId"; private ExpressionEvaluator transitionConditionExpressionEvaluator; @Override public void registerProcessDefinition(ProcessDefinition processDefinition) { log.info("Registering process definition '{}'.", processDefinition.getId()); processDefinitions.put(processDefinition.getId(), processDefinition); } @Override public String registerProcessDefinition(InputStream processDefinitionInputStream) throws ProcessDefinitionParserException{ ProcessDefinition pd = pdp.parse(processDefinitionInputStream); registerProcessDefinition(pd); return pd.getId(); } /** * Sets the process definitions. * * @param processDefinitions * The process definitions. * @throws IllegalArgumentException * In case the process definitions contain definitions with the same identifier. */ public void setProcessDefinitions(Iterable processDefinitions) { this.processDefinitions.clear(); for (ProcessDefinition pd : processDefinitions) { if (this.processDefinitions.containsKey(pd.getId())) { throw new IllegalArgumentException("Duplicate process definition identifier '" + pd.getId() + "'."); } registerProcessDefinition(pd); } } /** * Sets an expression evaluator that should be used to process transition condition expressions. * @param transitionConditionExpressionEvaluator The expression evaluator. */ public void setTransitionConditionExpressionEvaluator( ExpressionEvaluator transitionConditionExpressionEvaluator) { this.transitionConditionExpressionEvaluator = transitionConditionExpressionEvaluator; } @Override public String createProcessInstance(String processDefinitionId, ExecutionContext executionContext) throws ProcessExecutionException { // look for respective process definition ProcessDefinition pd = processDefinitions.get(processDefinitionId); if (pd == null) { throw new ProcessExecutionException("Unable to find process definition for process '" + processDefinitionId + "'."); } // create and keep process instance ProcessInstance pi = new ProcessInstance(pd, executionContext); log.info("Creating process instance from process definition '{}': {}", processDefinitionId, pi.getId()); try { saveOrUpdateProcessInstance(pi); } catch (EAAFException e) { throw new ProcessExecutionException("Unable to persist process instance.", e); } return pi.getId(); } @Override public String createProcessInstance(String processDefinitionId) throws ProcessExecutionException { return createProcessInstance(processDefinitionId, null); } @Override public void start(IRequest pendingReq) throws ProcessExecutionException { try { if (StringUtils.isEmpty(pendingReq.getProcessInstanceId())) { log.error("Pending-request with id:" + pendingReq.getPendingRequestId() + " includes NO 'ProcessInstanceId'"); throw new ProcessExecutionException("Pending-request with id:" + pendingReq.getPendingRequestId() + " includes NO 'ProcessInstanceId'"); } ProcessInstance pi = loadProcessInstance(pendingReq.getProcessInstanceId()); if (pi == null ) { throw new ProcessExecutionException("Process instance '" + pendingReq.getProcessInstanceId() + "' does not exist."); } MDC.put(MDC_CTX_PI_NAME, pi.getId()); if (!ProcessInstanceState.NOT_STARTED.equals(pi.getState())) { throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has already been started (current state is " + pi.getState() + ")."); } log.info("Starting process instance '{}'.", pi.getId()); // execute process pi.setState(ProcessInstanceState.STARTED); execute(pi, pendingReq); //store ProcessInstance if it is not already ended if (!ProcessInstanceState.ENDED.equals(pi.getState())) saveOrUpdateProcessInstance(pi); } catch (EAAFException e) { throw new ProcessExecutionException("Unable to load/save process instance.", e); } finally { MDC.remove(MDC_CTX_PI_NAME); } } @Override public void signal(IRequest pendingReq) throws ProcessExecutionException { try { if (StringUtils.isEmpty(pendingReq.getProcessInstanceId())) { log.error("Pending-request with id:" + pendingReq.getPendingRequestId() + " includes NO 'ProcessInstanceId'"); throw new ProcessExecutionException("Pending-request with id:" + pendingReq.getPendingRequestId() + " includes NO 'ProcessInstanceId'"); } ProcessInstance pi = loadProcessInstance(pendingReq.getProcessInstanceId()); if (pi == null ) { throw new ProcessExecutionException("Process instance '" + pendingReq.getProcessInstanceId() + "' does not exist."); } MDC.put(MDC_CTX_PI_NAME, pi.getId()); if (!ProcessInstanceState.SUSPENDED.equals(pi.getState())) { throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has not been suspended (current state is " + pi.getState() + ")."); } log.info("Waking up process instance '{}'.", pi.getId()); pi.setState(ProcessInstanceState.STARTED); //put pending-request ID on execution-context because it could be changed pi.getExecutionContext().put(EAAFConstants.PARAM_HTTP_TARGET_PENDINGREQUESTID, pendingReq.getPendingRequestId()); execute(pi, pendingReq); //store ProcessInstance if it is not already ended if (!ProcessInstanceState.ENDED.equals(pi.getState())) saveOrUpdateProcessInstance(pi); } catch (EAAFException e) { throw new ProcessExecutionException("Unable to load/save process instance.", e); } finally { MDC.remove(MDC_CTX_PI_NAME); } } /** * Instantiates a task implementation given by a {@link TaskInfo}. * @param ti The task info. * @return A Task implementation or {@code null} if the task info does not reference any task implementing classes. * @throws ProcessExecutionException Thrown in case of error (when the referenced class does not implement {@link Task} for instance). */ private Task createTaskInstance(TaskInfo ti) throws ProcessExecutionException { String clazz = StringUtils.trimToNull(ti.getTaskImplementingClass()); Task task = null; if (clazz != null) { log.debug("Instantiating task implementing class '{}'.", clazz); Object instanceClass = null; try { instanceClass = context.getBean(clazz); } catch (Exception e) { throw new ProcessExecutionException("Unable to get class '" + clazz + "' associated with task '" + ti.getId() + "' .", e); } if (instanceClass == null || !(instanceClass instanceof Task)) { throw new ProcessExecutionException("Class '" + clazz + "' associated with task '" + ti.getId() + "' is not assignable to " + Task.class.getName() + "."); } try { task = (Task) instanceClass; } catch (Exception e) { throw new ProcessExecutionException("Unable to instantiate class '" + clazz + "' associated with task '" + ti.getId() + "' .", e); } } return task; } /** * Starts/executes a given process instance. * @param pi The process instance. * @param pendingReq * @throws ProcessExecutionException Thrown in case of error. */ private void execute(final ProcessInstance pi, IRequest pendingReq) throws ProcessExecutionException { if (ProcessInstanceState.ENDED.equals(pi.getState())) { throw new ProcessExecutionException("Process for instance '" + pi.getId() + "' has already been ended."); } ProcessDefinition pd = pi.getProcessDefinition(); ProcessNode processNode = pd.getProcessNode(pi.getNextId()); log.debug("Processing node '{}'.", processNode.getId()); // distinguish process node types StartEvent, TaskInfo and EndEvent if (processNode instanceof TaskInfo) { // TaskInfo types need to be executed TaskInfo ti = (TaskInfo) processNode; MDC.put(MDC_CTX_TASK_NAME, ti.getId()); try { log.info("Processing task '{}'.", ti.getId()); Task task = createTaskInstance(ti); if (task != null) { try { log.info("Executing task implementation for task '{}'.", ti.getId()); log.debug("Execution context before task execution: {}", pi.getExecutionContext().keySet()); pendingReq = task.execute(pendingReq, pi.getExecutionContext()); log.info("Returned from execution of task '{}'.", ti.getId()); log.debug("Execution context after task execution: {}", pi.getExecutionContext().keySet()); } catch (Throwable t) { throw new ProcessExecutionException("Error executing task '" + ti.getId() + "'.", t); } } else { log.debug("No task implementing class set."); } } finally { MDC.remove(MDC_CTX_TASK_NAME); } } else if (processNode instanceof EndEvent) { log.info("Finishing process instance '{}'.", pi.getId()); try { piStoreDao.remove(pi.getId()); } catch (EAAFException e) { throw new ProcessExecutionException("Unable to remove process instance.", e); } pi.setState(ProcessInstanceState.ENDED); log.debug("Final process context: {}", pi.getExecutionContext().keySet()); return; } final ExpressionEvaluationContext expressionContext = new ExpressionEvaluationContextImpl(pi); // traverse pointer Transition t = CollectionUtils.find(processNode.getOutgoingTransitions(), new Predicate() { @Override public boolean evaluate(Transition transition) { if (transitionConditionExpressionEvaluator != null && transition.getConditionExpression() != null) { log.trace("Evaluating transition expression '{}'.", transition.getConditionExpression()); return transitionConditionExpressionEvaluator.evaluate(expressionContext, transition.getConditionExpression()); } return true; } }); if (t == null) { throw new ProcessExecutionException("No valid transition starting from process node '" + processNode.getId()+ "'."); } log.trace("Found suitable transition: {}", t); // update pointer log.trace("Shifting process token from '{}' to '{}'.", pi.getNextId(), t.getTo().getId()); pi.setNextId(t.getTo().getId()); // inspect current task if (t.getTo() instanceof TaskInfo && (((TaskInfo) t.getTo()).isAsync())) { // immediately return in case of asynchonous task log.info("Suspending process instance '{}' for asynchronous task '{}'.", pi.getId(), t.getTo().getId()); pi.setState(ProcessInstanceState.SUSPENDED); return; } // continue execution in case of StartEvent or Task if (processNode instanceof StartEvent || processNode instanceof TaskInfo) { execute(pi, pendingReq); } } @Override public ProcessInstance getProcessInstance(String processInstanceId) { ProcessInstance processInstance; try { processInstance = loadProcessInstance(processInstanceId); } catch (EAAFException e) { throw new RuntimeException("The process instance '" + processInstanceId + "' could not be retrieved.", e); } if (processInstance == null) { throw new IllegalArgumentException("The process instance '" + processInstanceId + "' does not/no longer exist."); } return processInstance; } /** * Persists a {@link ProcessInstance} to the database. * @param processInstance The object to persist. * @throws MOADatabaseException Thrown if an error occurs while accessing the database. */ private void saveOrUpdateProcessInstance(ProcessInstance processInstance) throws EAAFException { ProcessInstanceStore store = new ProcessInstanceStore(); ExecutionContext ctx = processInstance.getExecutionContext(); Map ctxData = new HashMap(); for (String key : ctx.keySet()) { ctxData.put(key, ctx.get(key)); } store.setExecutionContextData(ctxData); store.setNextTaskId(processInstance.getNextId()); store.setProcessDefinitionId(processInstance.getProcessDefinition().getId()); store.setProcessInstanceId(processInstance.getId()); store.setProcessState(processInstance.getState()); piStoreDao.saveOrUpdate(store); } /** * Load a {@link ProcessInstance} with a certain id from the database. * @param processInstanceId The process instance id * @return The process instance corresponding to the id or {@code null} if no such object is found. * @throws MOADatabaseException Thrown if an error occurs while accessing the database. */ private ProcessInstance loadProcessInstance(String processInstanceId) throws EAAFException { ProcessInstanceStore piStore = piStoreDao.load(processInstanceId); if (piStore == null) { return null; } ExecutionContext executionContext = new ExecutionContextImpl(piStore.getProcessInstanceId()); Map executionContextData = piStore.getExecutionContextData(); for (String key : executionContextData.keySet()) { executionContext.put(key, executionContextData.get(key)); } ProcessInstance pi = new ProcessInstance(processDefinitions.get(piStore.getProcessDefinitionId()), executionContext); pi.setNextId(piStore.getNextTaskId()); pi.setState(piStore.getProcessState()); return pi; } /* (non-Javadoc) * @see at.gv.egovernment.moa.id.process.ProcessEngine#deleteProcessInstance(java.lang.String) */ @Override public void deleteProcessInstance(String processInstanceId) throws ProcessExecutionException { if (StringUtils.isEmpty(processInstanceId)) { throw new ProcessExecutionException("Unable to remove process instance: ProcessInstanceId is empty"); } try { piStoreDao.remove(processInstanceId); } catch (EAAFException e) { throw new ProcessExecutionException("Unable to remove process instance.", e); } } }