diff options
Diffstat (limited to 'id/server/idserverlib/src/main/java/at/gv/egovernment/moa/id/process/ProcessEngineImpl.java')
-rw-r--r-- | id/server/idserverlib/src/main/java/at/gv/egovernment/moa/id/process/ProcessEngineImpl.java | 304 |
1 files changed, 304 insertions, 0 deletions
diff --git a/id/server/idserverlib/src/main/java/at/gv/egovernment/moa/id/process/ProcessEngineImpl.java b/id/server/idserverlib/src/main/java/at/gv/egovernment/moa/id/process/ProcessEngineImpl.java new file mode 100644 index 000000000..8f9d73b3d --- /dev/null +++ b/id/server/idserverlib/src/main/java/at/gv/egovernment/moa/id/process/ProcessEngineImpl.java @@ -0,0 +1,304 @@ +package at.gv.egovernment.moa.id.process; + +import java.io.InputStream; +import java.io.Serializable; +import java.util.Date; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.Predicate; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.MDC; + +import com.datentechnik.process_engine.api.ExecutionContext; +import com.datentechnik.process_engine.api.ExpressionEvaluationContext; +import com.datentechnik.process_engine.api.ExpressionEvaluator; +import com.datentechnik.process_engine.api.Task; +import com.datentechnik.process_engine.model.EndEvent; +import com.datentechnik.process_engine.model.ProcessDefinition; +import com.datentechnik.process_engine.model.ProcessNode; +import com.datentechnik.process_engine.model.StartEvent; +import com.datentechnik.process_engine.model.TaskInfo; +import com.datentechnik.process_engine.model.Transition; + +/** + * Process engine implementation allowing starting and continuing processes as well as providing means for cleanup actions. + * @author tknall + * + */ +public class ProcessEngineImpl implements ProcessEngine { + + private Logger log = LoggerFactory.getLogger(getClass()); + + private ProcessDefinitionParser pdp = new ProcessDefinitionParser(); + + private Map<String, ProcessDefinition> processDefinitions = new ConcurrentHashMap<String, ProcessDefinition>(); + private Map<String, ProcessInstance> processInstances = new ConcurrentHashMap<String, ProcessInstance>(); + + private final static String MDC_CTX_PI_NAME = "processInstanceId"; + private final static String MDC_CTX_TASK_NAME = "taskId"; + + private static final long DEFAULT_PROCESS_INSTANCE_MAX_AGE_SECONDS = 3600; + private long processInstanceIdleTimeSeconds = DEFAULT_PROCESS_INSTANCE_MAX_AGE_SECONDS; + private ExpressionEvaluator transitionConditionExpressionEvaluator; + + @Override + public void registerProcessDefinition(ProcessDefinition processDefinition) { + log.info("Registering process definition '{}'.", processDefinition.getId()); + processDefinitions.put(processDefinition.getId(), processDefinition); + } + + @Override + public void registerProcessDefinition(InputStream processDefinitionInputStream) throws ProcessDefinitionParserException{ + registerProcessDefinition(pdp.parse(processDefinitionInputStream)); + } + + /** + * Sets the process definitions. + * + * @param processDefinitions + * The process definitions. + * @throws IllegalArgumentException + * In case the process definitions contain definitions with the same identifier. + */ + public void setProcessDefinitions(Iterable<ProcessDefinition> processDefinitions) { + this.processDefinitions.clear(); + for (ProcessDefinition pd : processDefinitions) { + if (this.processDefinitions.containsKey(pd.getId())) { + throw new IllegalArgumentException("Duplicate process definition identifier '" + pd.getId() + "'."); + } + registerProcessDefinition(pd); + } + } + + /** + * Defines the time frame in seconds an idle process instance will be managed by the process engine. A process + * instance with an idle time larger than the given time will be removed. + * <p/> + * Note that {@link #cleanup()} needs to be called in order to remove expired process instances. + * + * @param processInstanceMaxIdleTimeSeconds + * The maximum idle time in seconds. + */ + public void setProcessInstanceMaxIdleTimeSeconds(long processInstanceMaxIdleTimeSeconds) { + this.processInstanceIdleTimeSeconds = processInstanceMaxIdleTimeSeconds; + } + + /** + * Sets an expression evaluator that should be used to process transition condition expressions. + * @param transitionConditionExpressionEvaluator The expression evaluator. + */ + public void setTransitionConditionExpressionEvaluator( + ExpressionEvaluator transitionConditionExpressionEvaluator) { + this.transitionConditionExpressionEvaluator = transitionConditionExpressionEvaluator; + } + + + @Override + public ProcessInstance createProcessInstance(String processDefinitionId, ExecutionContext executionContext) throws ProcessExecutionException { + // look for respective process definition + ProcessDefinition pd = processDefinitions.get(processDefinitionId); + if (pd == null) { + throw new ProcessExecutionException("Unable to find process definition for process '" + processDefinitionId + "'."); + } + // create and keep process instance + ProcessInstance pi = new ProcessInstance(pd, executionContext); + log.info("Creating process instance from process definition '{}': {}", processDefinitionId, pi.getId()); + processInstances.put(pi.getId(), pi); + return pi; + } + + @Override + public ProcessInstance createProcessInstance(String processDefinitionId) throws ProcessExecutionException { + return createProcessInstance(processDefinitionId, null); + } + + @Override + public void start(ProcessInstance pi) throws ProcessExecutionException { + MDC.put(MDC_CTX_PI_NAME, pi.getId()); + try { + if (!ProcessInstanceState.NOT_STARTED.equals(pi.getState())) { + throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has already been started (current state is " + pi.getState() + ")."); + } + log.info("Starting process instance '{}'.", pi.getId()); + // execute process + pi.setState(ProcessInstanceState.STARTED); + execute(pi); + } finally { + MDC.remove(MDC_CTX_PI_NAME); + } + } + + @Override + public void signal(ProcessInstance pi) throws ProcessExecutionException { + MDC.put(MDC_CTX_PI_NAME, pi.getId()); + try { + if (!ProcessInstanceState.SUSPENDED.equals(pi.getState())) { + throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has not been suspended (current state is " + pi.getState() + ")."); + } + log.info("Waking up process instance '{}'.", pi.getId()); + pi.setState(ProcessInstanceState.STARTED); + execute(pi); + } finally { + MDC.remove(MDC_CTX_PI_NAME); + } + } + + @Override + public synchronized void cleanup() { + log.trace("Cleanup job started."); + Iterator<Entry<String, ProcessInstance>> it = processInstances.entrySet().iterator(); + while (it.hasNext()) { + Entry<String, ProcessInstance> entry = it.next(); + ProcessInstance pi = entry.getValue(); + log.trace("Checking process instance {}.", pi); + long ageMillis = new Date().getTime() - pi.getLru().getTime(); + if (ageMillis > processInstanceIdleTimeSeconds * 1000) { + log.info("Removing process instance '{}'.", pi.getId()); + processInstances.remove(entry.getKey()); + } + } + log.trace("Cleanup job completed."); + } + + /** + * Instantates a task implementation given by a {@link TaskInfo}. + * @param ti The task info. + * @return A Task implementation or {@code null} if the task info does not reference any task implementing classes. + * @throws ProcessExecutionException Thrown in case of error (when the referenced class does not implement {@link Task} for instance). + */ + private Task createTaskInstance(TaskInfo ti) throws ProcessExecutionException { + String clazz = StringUtils.trimToNull(ti.getTaskImplementingClass()); + Task task = null; + + if (clazz != null) { + log.debug("Instantiating task implementing class '{}'.", clazz); + Class<?> instanceClass = null; + try { + instanceClass = Class.forName(clazz, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new ProcessExecutionException("Unable to get class '" + clazz + "' associated with task '" + ti.getId() + "' .", e); + } + if (!Task.class.isAssignableFrom(instanceClass)) { + throw new ProcessExecutionException("Class '" + clazz + "' associated with task '" + ti.getId() + "' is not assignable to " + Task.class.getName() + "."); + } + try { + task = (Task) instanceClass.newInstance(); + } catch (Exception e) { + throw new ProcessExecutionException("Unable to instantiate class '" + clazz + "' associated with task '" + ti.getId() + "' .", e); + } + } + + return task; + } + + /** + * Starts/executes a given process instance. + * @param pi The process instance. + * @throws ProcessExecutionException Thrown in case of error. + */ + private void execute(final ProcessInstance pi) throws ProcessExecutionException { + if (ProcessInstanceState.ENDED.equals(pi.getState())) { + throw new ProcessExecutionException("Process for instance '" + pi.getId() + "' has already been ended."); + } + ProcessDefinition pd = pi.getProcessDefinition(); + ProcessNode processNode = pd.getProcessNode(pi.getNextId()); + log.debug("Processing node '{}'.", processNode.getId()); + + // distinguish process node types StartEvent, TaskInfo and EndEvent + + if (processNode instanceof TaskInfo) { + // TaskInfo types need to be executed + TaskInfo ti = (TaskInfo) processNode; + MDC.put(MDC_CTX_TASK_NAME, ti.getId()); + try { + log.info("Processing task '{}'.", ti.getId()); + Task task = createTaskInstance(ti); + if (task != null) { + try { + log.info("Executing task implementation for task '{}'.", ti.getId()); + log.debug("Execution context before task execution: {}", pi.getExecutionContext().keySet()); + task.execute(pi.getExecutionContext()); + log.info("Returned from execution of task '{}'.", ti.getId()); + log.debug("Execution context after task execution: {}", pi.getExecutionContext().keySet()); + } catch (Throwable t) { + throw new ProcessExecutionException("Error executing task '" + ti.getId() + "'.", t); + } + } else { + log.debug("No task implementing class set."); + } + } finally { + MDC.remove(MDC_CTX_TASK_NAME); + } + + } else if (processNode instanceof EndEvent) { + log.info("Finishing process instance '{}'.", pi.getId()); + processInstances.remove(pi.getId()); + pi.setState(ProcessInstanceState.ENDED); + log.debug("Final process context: {}", pi.getExecutionContext().keySet()); + return; + } + + final ExpressionEvaluationContext expressionContext = new ExpressionEvaluationContextImpl(pi); + + // traverse pointer + Transition t = CollectionUtils.find(processNode.getOutgoingTransitions(), new Predicate<Transition>() { + @Override + public boolean evaluate(Transition transition) { + if (transitionConditionExpressionEvaluator != null && transition.getConditionExpression() != null) { + log.trace("Evaluating transition expression '{}'.", transition.getConditionExpression()); + return transitionConditionExpressionEvaluator.evaluate(expressionContext, transition.getConditionExpression()); + } + return true; + } + }); + if (t == null) { + throw new ProcessExecutionException("No valid transition starting from process node '" + processNode.getId()+ "'."); + } + log.trace("Found suitable transition: {}", t); + // update pointer + log.trace("Shifting process token from '{}' to '{}'.", pi.getNextId(), t.getTo().getId()); + pi.setNextId(t.getTo().getId()); + + // inspect current task + if (t.getTo() instanceof TaskInfo && (((TaskInfo) t.getTo()).isAsync())) { + // immediately return in case of asynchonous task + log.info("Suspending process instance '{}' for asynchronous task '{}'.", pi.getId(), t.getTo().getId()); + pi.setState(ProcessInstanceState.SUSPENDED); + return; + } + + // continue execution in case of StartEvent or Task + if (processNode instanceof StartEvent || processNode instanceof TaskInfo) { + execute(pi); + } + } + + @Override + public ProcessInstance getProcessInstance(String processInstanceId) { + ProcessInstance processInstance = processInstances.get(processInstanceId); + if (processInstance == null) { + throw new IllegalArgumentException("The process instance '" + processInstanceId + "' does not/no longer exist."); + } + return processInstance; + } + + @Override + public ProcessInstance findProcessInstanceWith(String key, Serializable value) { + Iterator<ProcessInstance> it = processInstances.values().iterator(); + while (it.hasNext()) { + ProcessInstance pi = it.next(); + if (Objects.equals(pi.getExecutionContext().get(key), value)) { + return pi; + } + } + return null; + } + +} |