diff options
Diffstat (limited to 'eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java')
-rw-r--r-- | eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java | 424 |
1 files changed, 424 insertions, 0 deletions
diff --git a/eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java b/eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java new file mode 100644 index 00000000..b5028542 --- /dev/null +++ b/eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java @@ -0,0 +1,424 @@ +/******************************************************************************* + *******************************************************************************/ +package at.gv.egiz.eaaf.core.impl.idp.process; + +import java.io.InputStream; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.Predicate; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; + +import at.gv.egiz.eaaf.core.api.IRequest; +import at.gv.egiz.eaaf.core.api.data.EAAFConstants; +import at.gv.egiz.eaaf.core.api.idp.process.ExecutionContext; +import at.gv.egiz.eaaf.core.api.idp.process.ExpressionEvaluationContext; +import at.gv.egiz.eaaf.core.api.idp.process.ExpressionEvaluator; +import at.gv.egiz.eaaf.core.api.idp.process.ProcessEngine; +import at.gv.egiz.eaaf.core.api.idp.process.ProcessInstanceStoreDAO; +import at.gv.egiz.eaaf.core.api.idp.process.Task; +import at.gv.egiz.eaaf.core.exceptions.EAAFException; +import at.gv.egiz.eaaf.core.exceptions.ProcessExecutionException; +import at.gv.egiz.eaaf.core.impl.idp.process.dao.ProcessInstanceStore; +import at.gv.egiz.eaaf.core.impl.idp.process.model.EndEvent; +import at.gv.egiz.eaaf.core.impl.idp.process.model.ProcessDefinition; +import at.gv.egiz.eaaf.core.impl.idp.process.model.ProcessNode; +import at.gv.egiz.eaaf.core.impl.idp.process.model.StartEvent; +import at.gv.egiz.eaaf.core.impl.idp.process.model.TaskInfo; +import at.gv.egiz.eaaf.core.impl.idp.process.model.Transition; + +/** + * Process engine implementation allowing starting and continuing processes as well as providing means for cleanup actions. + */ +public class ProcessEngineImpl implements ProcessEngine { + + private Logger log = LoggerFactory.getLogger(getClass()); + + @Autowired ProcessInstanceStoreDAO piStoreDao; + @Autowired ApplicationContext context; + + private ProcessDefinitionParser pdp = new ProcessDefinitionParser(); + + private Map<String, ProcessDefinition> processDefinitions = new ConcurrentHashMap<String, ProcessDefinition>(); + + private final static String MDC_CTX_PI_NAME = "processInstanceId"; + private final static String MDC_CTX_TASK_NAME = "taskId"; + + private ExpressionEvaluator transitionConditionExpressionEvaluator; + + @Override + public void registerProcessDefinition(ProcessDefinition processDefinition) { + log.info("Registering process definition '{}'.", processDefinition.getId()); + processDefinitions.put(processDefinition.getId(), processDefinition); + } + + @Override + public String registerProcessDefinition(InputStream processDefinitionInputStream) throws ProcessDefinitionParserException{ + ProcessDefinition pd = pdp.parse(processDefinitionInputStream); + registerProcessDefinition(pd); + return pd.getId(); + } + + /** + * Sets the process definitions. + * + * @param processDefinitions + * The process definitions. + * @throws IllegalArgumentException + * In case the process definitions contain definitions with the same identifier. + */ + public void setProcessDefinitions(Iterable<ProcessDefinition> processDefinitions) { + this.processDefinitions.clear(); + for (ProcessDefinition pd : processDefinitions) { + if (this.processDefinitions.containsKey(pd.getId())) { + throw new IllegalArgumentException("Duplicate process definition identifier '" + pd.getId() + "'."); + } + registerProcessDefinition(pd); + } + } + + /** + * Sets an expression evaluator that should be used to process transition condition expressions. + * @param transitionConditionExpressionEvaluator The expression evaluator. + */ + public void setTransitionConditionExpressionEvaluator( + ExpressionEvaluator transitionConditionExpressionEvaluator) { + this.transitionConditionExpressionEvaluator = transitionConditionExpressionEvaluator; + } + + + @Override + public String createProcessInstance(String processDefinitionId, ExecutionContext executionContext) throws ProcessExecutionException { + // look for respective process definition + ProcessDefinition pd = processDefinitions.get(processDefinitionId); + if (pd == null) { + throw new ProcessExecutionException("Unable to find process definition for process '" + processDefinitionId + "'."); + } + // create and keep process instance + ProcessInstance pi = new ProcessInstance(pd, executionContext); + log.info("Creating process instance from process definition '{}': {}", processDefinitionId, pi.getId()); + + try { + saveOrUpdateProcessInstance(pi); + + } catch (EAAFException e) { + throw new ProcessExecutionException("Unable to persist process instance.", e); + } + + return pi.getId(); + } + + @Override + public String createProcessInstance(String processDefinitionId) throws ProcessExecutionException { + return createProcessInstance(processDefinitionId, null); + } + + @Override + public void start(IRequest pendingReq) throws ProcessExecutionException { + try { + if (StringUtils.isEmpty(pendingReq.getProcessInstanceId())) { + log.error("Pending-request with id:" + pendingReq.getPendingRequestId() + + " includes NO 'ProcessInstanceId'"); + throw new ProcessExecutionException("Pending-request with id:" + pendingReq.getPendingRequestId() + + " includes NO 'ProcessInstanceId'"); + } + + ProcessInstance pi = loadProcessInstance(pendingReq.getProcessInstanceId()); + + if (pi == null ) { + throw new ProcessExecutionException("Process instance '" + pendingReq.getProcessInstanceId() + "' does not exist."); + + } + + MDC.put(MDC_CTX_PI_NAME, pi.getId()); + + if (!ProcessInstanceState.NOT_STARTED.equals(pi.getState())) { + throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has already been started (current state is " + pi.getState() + ")."); + } + log.info("Starting process instance '{}'.", pi.getId()); + // execute process + pi.setState(ProcessInstanceState.STARTED); + execute(pi, pendingReq); + + //store ProcessInstance if it is not already ended + if (!ProcessInstanceState.ENDED.equals(pi.getState())) + saveOrUpdateProcessInstance(pi); + + } catch (EAAFException e) { + throw new ProcessExecutionException("Unable to load/save process instance.", e); + + } finally { + MDC.remove(MDC_CTX_PI_NAME); + } + } + + @Override + public void signal(IRequest pendingReq) throws ProcessExecutionException { + + try { + if (StringUtils.isEmpty(pendingReq.getProcessInstanceId())) { + log.error("Pending-request with id:" + pendingReq.getPendingRequestId() + + " includes NO 'ProcessInstanceId'"); + throw new ProcessExecutionException("Pending-request with id:" + pendingReq.getPendingRequestId() + + " includes NO 'ProcessInstanceId'"); + } + + ProcessInstance pi = loadProcessInstance(pendingReq.getProcessInstanceId()); + + if (pi == null ) { + throw new ProcessExecutionException("Process instance '" + pendingReq.getProcessInstanceId() + "' does not exist."); + + } + + MDC.put(MDC_CTX_PI_NAME, pi.getId()); + + if (!ProcessInstanceState.SUSPENDED.equals(pi.getState())) { + throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has not been suspended (current state is " + pi.getState() + ")."); + } + + log.info("Waking up process instance '{}'.", pi.getId()); + pi.setState(ProcessInstanceState.STARTED); + + //put pending-request ID on execution-context because it could be changed + pi.getExecutionContext().put(EAAFConstants.PARAM_HTTP_TARGET_PENDINGREQUESTID, pendingReq.getPendingRequestId()); + + execute(pi, pendingReq); + + //store ProcessInstance if it is not already ended + if (!ProcessInstanceState.ENDED.equals(pi.getState())) + saveOrUpdateProcessInstance(pi); + + } catch (EAAFException e) { + throw new ProcessExecutionException("Unable to load/save process instance.", e); + + } finally { + MDC.remove(MDC_CTX_PI_NAME); + } + } + + + /** + * Instantiates a task implementation given by a {@link TaskInfo}. + * @param ti The task info. + * @return A Task implementation or {@code null} if the task info does not reference any task implementing classes. + * @throws ProcessExecutionException Thrown in case of error (when the referenced class does not implement {@link Task} for instance). + */ + private Task createTaskInstance(TaskInfo ti) throws ProcessExecutionException { + String clazz = StringUtils.trimToNull(ti.getTaskImplementingClass()); + Task task = null; + + if (clazz != null) { + log.debug("Instantiating task implementing class '{}'.", clazz); + Object instanceClass = null; + try { + instanceClass = context.getBean(clazz); + + } catch (Exception e) { + throw new ProcessExecutionException("Unable to get class '" + clazz + "' associated with task '" + ti.getId() + "' .", e); + + } + if (instanceClass == null || !(instanceClass instanceof Task)) { + throw new ProcessExecutionException("Class '" + clazz + "' associated with task '" + ti.getId() + "' is not assignable to " + Task.class.getName() + "."); + + } + try { + task = (Task) instanceClass; + + } catch (Exception e) { + throw new ProcessExecutionException("Unable to instantiate class '" + clazz + "' associated with task '" + ti.getId() + "' .", e); + } + } + + return task; + } + + /** + * Starts/executes a given process instance. + * @param pi The process instance. + * @param pendingReq + * @throws ProcessExecutionException Thrown in case of error. + */ + private void execute(final ProcessInstance pi, IRequest pendingReq) throws ProcessExecutionException { + if (ProcessInstanceState.ENDED.equals(pi.getState())) { + throw new ProcessExecutionException("Process for instance '" + pi.getId() + "' has already been ended."); + } + ProcessDefinition pd = pi.getProcessDefinition(); + ProcessNode processNode = pd.getProcessNode(pi.getNextId()); + log.debug("Processing node '{}'.", processNode.getId()); + + // distinguish process node types StartEvent, TaskInfo and EndEvent + + if (processNode instanceof TaskInfo) { + // TaskInfo types need to be executed + TaskInfo ti = (TaskInfo) processNode; + MDC.put(MDC_CTX_TASK_NAME, ti.getId()); + try { + log.info("Processing task '{}'.", ti.getId()); + Task task = createTaskInstance(ti); + if (task != null) { + try { + log.info("Executing task implementation for task '{}'.", ti.getId()); + log.debug("Execution context before task execution: {}", pi.getExecutionContext().keySet()); + pendingReq = task.execute(pendingReq, pi.getExecutionContext()); + log.info("Returned from execution of task '{}'.", ti.getId()); + log.debug("Execution context after task execution: {}", pi.getExecutionContext().keySet()); + } catch (Throwable t) { + throw new ProcessExecutionException("Error executing task '" + ti.getId() + "'.", t); + } + } else { + log.debug("No task implementing class set."); + } + } finally { + MDC.remove(MDC_CTX_TASK_NAME); + } + + } else if (processNode instanceof EndEvent) { + log.info("Finishing process instance '{}'.", pi.getId()); + + try { + piStoreDao.remove(pi.getId()); + + } catch (EAAFException e) { + throw new ProcessExecutionException("Unable to remove process instance.", e); + + } + pi.setState(ProcessInstanceState.ENDED); + log.debug("Final process context: {}", pi.getExecutionContext().keySet()); + return; + } + + final ExpressionEvaluationContext expressionContext = new ExpressionEvaluationContextImpl(pi); + + // traverse pointer + Transition t = CollectionUtils.find(processNode.getOutgoingTransitions(), new Predicate<Transition>() { + @Override + public boolean evaluate(Transition transition) { + if (transitionConditionExpressionEvaluator != null && transition.getConditionExpression() != null) { + log.trace("Evaluating transition expression '{}'.", transition.getConditionExpression()); + return transitionConditionExpressionEvaluator.evaluate(expressionContext, transition.getConditionExpression()); + } + return true; + } + }); + if (t == null) { + throw new ProcessExecutionException("No valid transition starting from process node '" + processNode.getId()+ "'."); + } + log.trace("Found suitable transition: {}", t); + // update pointer + log.trace("Shifting process token from '{}' to '{}'.", pi.getNextId(), t.getTo().getId()); + pi.setNextId(t.getTo().getId()); + + // inspect current task + if (t.getTo() instanceof TaskInfo && (((TaskInfo) t.getTo()).isAsync())) { + // immediately return in case of asynchonous task + log.info("Suspending process instance '{}' for asynchronous task '{}'.", pi.getId(), t.getTo().getId()); + pi.setState(ProcessInstanceState.SUSPENDED); + return; + } + + // continue execution in case of StartEvent or Task + if (processNode instanceof StartEvent || processNode instanceof TaskInfo) { + execute(pi, pendingReq); + } + } + + @Override + public ProcessInstance getProcessInstance(String processInstanceId) { + + ProcessInstance processInstance; + try { + processInstance = loadProcessInstance(processInstanceId); + + } catch (EAAFException e) { + throw new RuntimeException("The process instance '" + processInstanceId + "' could not be retrieved.", e); + } + + if (processInstance == null) { + throw new IllegalArgumentException("The process instance '" + processInstanceId + "' does not/no longer exist."); + } + + return processInstance; + } + + /** + * Persists a {@link ProcessInstance} to the database. + * @param processInstance The object to persist. + * @throws MOADatabaseException Thrown if an error occurs while accessing the database. + */ + private void saveOrUpdateProcessInstance(ProcessInstance processInstance) throws EAAFException { + ProcessInstanceStore store = new ProcessInstanceStore(); + + ExecutionContext ctx = processInstance.getExecutionContext(); + + Map<String, Serializable> ctxData = new HashMap<String, Serializable>(); + for (String key : ctx.keySet()) { + ctxData.put(key, ctx.get(key)); + } + store.setExecutionContextData(ctxData); + + store.setNextTaskId(processInstance.getNextId()); + store.setProcessDefinitionId(processInstance.getProcessDefinition().getId()); + + store.setProcessInstanceId(processInstance.getId()); + store.setProcessState(processInstance.getState()); + + piStoreDao.saveOrUpdate(store); + } + + /** + * Load a {@link ProcessInstance} with a certain id from the database. + * @param processInstanceId The process instance id + * @return The process instance corresponding to the id or {@code null} if no such object is found. + * @throws MOADatabaseException Thrown if an error occurs while accessing the database. + */ + private ProcessInstance loadProcessInstance(String processInstanceId) throws EAAFException { + + ProcessInstanceStore piStore = piStoreDao.load(processInstanceId); + + if (piStore == null) { + return null; + } + + ExecutionContext executionContext = new ExecutionContextImpl(piStore.getProcessInstanceId()); + + Map<String, Serializable> executionContextData = piStore.getExecutionContextData(); + for (String key : executionContextData.keySet()) { + executionContext.put(key, executionContextData.get(key)); + } + + ProcessInstance pi = new ProcessInstance(processDefinitions.get(piStore.getProcessDefinitionId()), executionContext); + pi.setNextId(piStore.getNextTaskId()); + pi.setState(piStore.getProcessState()); + + return pi; + } + + /* (non-Javadoc) + * @see at.gv.egovernment.moa.id.process.ProcessEngine#deleteProcessInstance(java.lang.String) + */ + @Override + public void deleteProcessInstance(String processInstanceId) throws ProcessExecutionException { + if (StringUtils.isEmpty(processInstanceId)) { + throw new ProcessExecutionException("Unable to remove process instance: ProcessInstanceId is empty"); + + } + + try { + piStoreDao.remove(processInstanceId); + + } catch (EAAFException e) { + throw new ProcessExecutionException("Unable to remove process instance.", e); + + } + + } + +} |