summaryrefslogtreecommitdiff
path: root/eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java')
-rw-r--r--eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java424
1 files changed, 424 insertions, 0 deletions
diff --git a/eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java b/eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java
new file mode 100644
index 00000000..b5028542
--- /dev/null
+++ b/eaaf_core/src/main/java/at/gv/egiz/eaaf/core/impl/idp/process/ProcessEngineImpl.java
@@ -0,0 +1,424 @@
+/*******************************************************************************
+ *******************************************************************************/
+package at.gv.egiz.eaaf.core.impl.idp.process;
+
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.collections4.Predicate;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+
+import at.gv.egiz.eaaf.core.api.IRequest;
+import at.gv.egiz.eaaf.core.api.data.EAAFConstants;
+import at.gv.egiz.eaaf.core.api.idp.process.ExecutionContext;
+import at.gv.egiz.eaaf.core.api.idp.process.ExpressionEvaluationContext;
+import at.gv.egiz.eaaf.core.api.idp.process.ExpressionEvaluator;
+import at.gv.egiz.eaaf.core.api.idp.process.ProcessEngine;
+import at.gv.egiz.eaaf.core.api.idp.process.ProcessInstanceStoreDAO;
+import at.gv.egiz.eaaf.core.api.idp.process.Task;
+import at.gv.egiz.eaaf.core.exceptions.EAAFException;
+import at.gv.egiz.eaaf.core.exceptions.ProcessExecutionException;
+import at.gv.egiz.eaaf.core.impl.idp.process.dao.ProcessInstanceStore;
+import at.gv.egiz.eaaf.core.impl.idp.process.model.EndEvent;
+import at.gv.egiz.eaaf.core.impl.idp.process.model.ProcessDefinition;
+import at.gv.egiz.eaaf.core.impl.idp.process.model.ProcessNode;
+import at.gv.egiz.eaaf.core.impl.idp.process.model.StartEvent;
+import at.gv.egiz.eaaf.core.impl.idp.process.model.TaskInfo;
+import at.gv.egiz.eaaf.core.impl.idp.process.model.Transition;
+
+/**
+ * Process engine implementation allowing starting and continuing processes as well as providing means for cleanup actions.
+ */
+public class ProcessEngineImpl implements ProcessEngine {
+
+ private Logger log = LoggerFactory.getLogger(getClass());
+
+ @Autowired ProcessInstanceStoreDAO piStoreDao;
+ @Autowired ApplicationContext context;
+
+ private ProcessDefinitionParser pdp = new ProcessDefinitionParser();
+
+ private Map<String, ProcessDefinition> processDefinitions = new ConcurrentHashMap<String, ProcessDefinition>();
+
+ private final static String MDC_CTX_PI_NAME = "processInstanceId";
+ private final static String MDC_CTX_TASK_NAME = "taskId";
+
+ private ExpressionEvaluator transitionConditionExpressionEvaluator;
+
+ @Override
+ public void registerProcessDefinition(ProcessDefinition processDefinition) {
+ log.info("Registering process definition '{}'.", processDefinition.getId());
+ processDefinitions.put(processDefinition.getId(), processDefinition);
+ }
+
+ @Override
+ public String registerProcessDefinition(InputStream processDefinitionInputStream) throws ProcessDefinitionParserException{
+ ProcessDefinition pd = pdp.parse(processDefinitionInputStream);
+ registerProcessDefinition(pd);
+ return pd.getId();
+ }
+
+ /**
+ * Sets the process definitions.
+ *
+ * @param processDefinitions
+ * The process definitions.
+ * @throws IllegalArgumentException
+ * In case the process definitions contain definitions with the same identifier.
+ */
+ public void setProcessDefinitions(Iterable<ProcessDefinition> processDefinitions) {
+ this.processDefinitions.clear();
+ for (ProcessDefinition pd : processDefinitions) {
+ if (this.processDefinitions.containsKey(pd.getId())) {
+ throw new IllegalArgumentException("Duplicate process definition identifier '" + pd.getId() + "'.");
+ }
+ registerProcessDefinition(pd);
+ }
+ }
+
+ /**
+ * Sets an expression evaluator that should be used to process transition condition expressions.
+ * @param transitionConditionExpressionEvaluator The expression evaluator.
+ */
+ public void setTransitionConditionExpressionEvaluator(
+ ExpressionEvaluator transitionConditionExpressionEvaluator) {
+ this.transitionConditionExpressionEvaluator = transitionConditionExpressionEvaluator;
+ }
+
+
+ @Override
+ public String createProcessInstance(String processDefinitionId, ExecutionContext executionContext) throws ProcessExecutionException {
+ // look for respective process definition
+ ProcessDefinition pd = processDefinitions.get(processDefinitionId);
+ if (pd == null) {
+ throw new ProcessExecutionException("Unable to find process definition for process '" + processDefinitionId + "'.");
+ }
+ // create and keep process instance
+ ProcessInstance pi = new ProcessInstance(pd, executionContext);
+ log.info("Creating process instance from process definition '{}': {}", processDefinitionId, pi.getId());
+
+ try {
+ saveOrUpdateProcessInstance(pi);
+
+ } catch (EAAFException e) {
+ throw new ProcessExecutionException("Unable to persist process instance.", e);
+ }
+
+ return pi.getId();
+ }
+
+ @Override
+ public String createProcessInstance(String processDefinitionId) throws ProcessExecutionException {
+ return createProcessInstance(processDefinitionId, null);
+ }
+
+ @Override
+ public void start(IRequest pendingReq) throws ProcessExecutionException {
+ try {
+ if (StringUtils.isEmpty(pendingReq.getProcessInstanceId())) {
+ log.error("Pending-request with id:" + pendingReq.getPendingRequestId()
+ + " includes NO 'ProcessInstanceId'");
+ throw new ProcessExecutionException("Pending-request with id:" + pendingReq.getPendingRequestId()
+ + " includes NO 'ProcessInstanceId'");
+ }
+
+ ProcessInstance pi = loadProcessInstance(pendingReq.getProcessInstanceId());
+
+ if (pi == null ) {
+ throw new ProcessExecutionException("Process instance '" + pendingReq.getProcessInstanceId() + "' does not exist.");
+
+ }
+
+ MDC.put(MDC_CTX_PI_NAME, pi.getId());
+
+ if (!ProcessInstanceState.NOT_STARTED.equals(pi.getState())) {
+ throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has already been started (current state is " + pi.getState() + ").");
+ }
+ log.info("Starting process instance '{}'.", pi.getId());
+ // execute process
+ pi.setState(ProcessInstanceState.STARTED);
+ execute(pi, pendingReq);
+
+ //store ProcessInstance if it is not already ended
+ if (!ProcessInstanceState.ENDED.equals(pi.getState()))
+ saveOrUpdateProcessInstance(pi);
+
+ } catch (EAAFException e) {
+ throw new ProcessExecutionException("Unable to load/save process instance.", e);
+
+ } finally {
+ MDC.remove(MDC_CTX_PI_NAME);
+ }
+ }
+
+ @Override
+ public void signal(IRequest pendingReq) throws ProcessExecutionException {
+
+ try {
+ if (StringUtils.isEmpty(pendingReq.getProcessInstanceId())) {
+ log.error("Pending-request with id:" + pendingReq.getPendingRequestId()
+ + " includes NO 'ProcessInstanceId'");
+ throw new ProcessExecutionException("Pending-request with id:" + pendingReq.getPendingRequestId()
+ + " includes NO 'ProcessInstanceId'");
+ }
+
+ ProcessInstance pi = loadProcessInstance(pendingReq.getProcessInstanceId());
+
+ if (pi == null ) {
+ throw new ProcessExecutionException("Process instance '" + pendingReq.getProcessInstanceId() + "' does not exist.");
+
+ }
+
+ MDC.put(MDC_CTX_PI_NAME, pi.getId());
+
+ if (!ProcessInstanceState.SUSPENDED.equals(pi.getState())) {
+ throw new ProcessExecutionException("Process instance '" + pi.getId() + "' has not been suspended (current state is " + pi.getState() + ").");
+ }
+
+ log.info("Waking up process instance '{}'.", pi.getId());
+ pi.setState(ProcessInstanceState.STARTED);
+
+ //put pending-request ID on execution-context because it could be changed
+ pi.getExecutionContext().put(EAAFConstants.PARAM_HTTP_TARGET_PENDINGREQUESTID, pendingReq.getPendingRequestId());
+
+ execute(pi, pendingReq);
+
+ //store ProcessInstance if it is not already ended
+ if (!ProcessInstanceState.ENDED.equals(pi.getState()))
+ saveOrUpdateProcessInstance(pi);
+
+ } catch (EAAFException e) {
+ throw new ProcessExecutionException("Unable to load/save process instance.", e);
+
+ } finally {
+ MDC.remove(MDC_CTX_PI_NAME);
+ }
+ }
+
+
+ /**
+ * Instantiates a task implementation given by a {@link TaskInfo}.
+ * @param ti The task info.
+ * @return A Task implementation or {@code null} if the task info does not reference any task implementing classes.
+ * @throws ProcessExecutionException Thrown in case of error (when the referenced class does not implement {@link Task} for instance).
+ */
+ private Task createTaskInstance(TaskInfo ti) throws ProcessExecutionException {
+ String clazz = StringUtils.trimToNull(ti.getTaskImplementingClass());
+ Task task = null;
+
+ if (clazz != null) {
+ log.debug("Instantiating task implementing class '{}'.", clazz);
+ Object instanceClass = null;
+ try {
+ instanceClass = context.getBean(clazz);
+
+ } catch (Exception e) {
+ throw new ProcessExecutionException("Unable to get class '" + clazz + "' associated with task '" + ti.getId() + "' .", e);
+
+ }
+ if (instanceClass == null || !(instanceClass instanceof Task)) {
+ throw new ProcessExecutionException("Class '" + clazz + "' associated with task '" + ti.getId() + "' is not assignable to " + Task.class.getName() + ".");
+
+ }
+ try {
+ task = (Task) instanceClass;
+
+ } catch (Exception e) {
+ throw new ProcessExecutionException("Unable to instantiate class '" + clazz + "' associated with task '" + ti.getId() + "' .", e);
+ }
+ }
+
+ return task;
+ }
+
+ /**
+ * Starts/executes a given process instance.
+ * @param pi The process instance.
+ * @param pendingReq
+ * @throws ProcessExecutionException Thrown in case of error.
+ */
+ private void execute(final ProcessInstance pi, IRequest pendingReq) throws ProcessExecutionException {
+ if (ProcessInstanceState.ENDED.equals(pi.getState())) {
+ throw new ProcessExecutionException("Process for instance '" + pi.getId() + "' has already been ended.");
+ }
+ ProcessDefinition pd = pi.getProcessDefinition();
+ ProcessNode processNode = pd.getProcessNode(pi.getNextId());
+ log.debug("Processing node '{}'.", processNode.getId());
+
+ // distinguish process node types StartEvent, TaskInfo and EndEvent
+
+ if (processNode instanceof TaskInfo) {
+ // TaskInfo types need to be executed
+ TaskInfo ti = (TaskInfo) processNode;
+ MDC.put(MDC_CTX_TASK_NAME, ti.getId());
+ try {
+ log.info("Processing task '{}'.", ti.getId());
+ Task task = createTaskInstance(ti);
+ if (task != null) {
+ try {
+ log.info("Executing task implementation for task '{}'.", ti.getId());
+ log.debug("Execution context before task execution: {}", pi.getExecutionContext().keySet());
+ pendingReq = task.execute(pendingReq, pi.getExecutionContext());
+ log.info("Returned from execution of task '{}'.", ti.getId());
+ log.debug("Execution context after task execution: {}", pi.getExecutionContext().keySet());
+ } catch (Throwable t) {
+ throw new ProcessExecutionException("Error executing task '" + ti.getId() + "'.", t);
+ }
+ } else {
+ log.debug("No task implementing class set.");
+ }
+ } finally {
+ MDC.remove(MDC_CTX_TASK_NAME);
+ }
+
+ } else if (processNode instanceof EndEvent) {
+ log.info("Finishing process instance '{}'.", pi.getId());
+
+ try {
+ piStoreDao.remove(pi.getId());
+
+ } catch (EAAFException e) {
+ throw new ProcessExecutionException("Unable to remove process instance.", e);
+
+ }
+ pi.setState(ProcessInstanceState.ENDED);
+ log.debug("Final process context: {}", pi.getExecutionContext().keySet());
+ return;
+ }
+
+ final ExpressionEvaluationContext expressionContext = new ExpressionEvaluationContextImpl(pi);
+
+ // traverse pointer
+ Transition t = CollectionUtils.find(processNode.getOutgoingTransitions(), new Predicate<Transition>() {
+ @Override
+ public boolean evaluate(Transition transition) {
+ if (transitionConditionExpressionEvaluator != null && transition.getConditionExpression() != null) {
+ log.trace("Evaluating transition expression '{}'.", transition.getConditionExpression());
+ return transitionConditionExpressionEvaluator.evaluate(expressionContext, transition.getConditionExpression());
+ }
+ return true;
+ }
+ });
+ if (t == null) {
+ throw new ProcessExecutionException("No valid transition starting from process node '" + processNode.getId()+ "'.");
+ }
+ log.trace("Found suitable transition: {}", t);
+ // update pointer
+ log.trace("Shifting process token from '{}' to '{}'.", pi.getNextId(), t.getTo().getId());
+ pi.setNextId(t.getTo().getId());
+
+ // inspect current task
+ if (t.getTo() instanceof TaskInfo && (((TaskInfo) t.getTo()).isAsync())) {
+ // immediately return in case of asynchonous task
+ log.info("Suspending process instance '{}' for asynchronous task '{}'.", pi.getId(), t.getTo().getId());
+ pi.setState(ProcessInstanceState.SUSPENDED);
+ return;
+ }
+
+ // continue execution in case of StartEvent or Task
+ if (processNode instanceof StartEvent || processNode instanceof TaskInfo) {
+ execute(pi, pendingReq);
+ }
+ }
+
+ @Override
+ public ProcessInstance getProcessInstance(String processInstanceId) {
+
+ ProcessInstance processInstance;
+ try {
+ processInstance = loadProcessInstance(processInstanceId);
+
+ } catch (EAAFException e) {
+ throw new RuntimeException("The process instance '" + processInstanceId + "' could not be retrieved.", e);
+ }
+
+ if (processInstance == null) {
+ throw new IllegalArgumentException("The process instance '" + processInstanceId + "' does not/no longer exist.");
+ }
+
+ return processInstance;
+ }
+
+ /**
+ * Persists a {@link ProcessInstance} to the database.
+ * @param processInstance The object to persist.
+ * @throws MOADatabaseException Thrown if an error occurs while accessing the database.
+ */
+ private void saveOrUpdateProcessInstance(ProcessInstance processInstance) throws EAAFException {
+ ProcessInstanceStore store = new ProcessInstanceStore();
+
+ ExecutionContext ctx = processInstance.getExecutionContext();
+
+ Map<String, Serializable> ctxData = new HashMap<String, Serializable>();
+ for (String key : ctx.keySet()) {
+ ctxData.put(key, ctx.get(key));
+ }
+ store.setExecutionContextData(ctxData);
+
+ store.setNextTaskId(processInstance.getNextId());
+ store.setProcessDefinitionId(processInstance.getProcessDefinition().getId());
+
+ store.setProcessInstanceId(processInstance.getId());
+ store.setProcessState(processInstance.getState());
+
+ piStoreDao.saveOrUpdate(store);
+ }
+
+ /**
+ * Load a {@link ProcessInstance} with a certain id from the database.
+ * @param processInstanceId The process instance id
+ * @return The process instance corresponding to the id or {@code null} if no such object is found.
+ * @throws MOADatabaseException Thrown if an error occurs while accessing the database.
+ */
+ private ProcessInstance loadProcessInstance(String processInstanceId) throws EAAFException {
+
+ ProcessInstanceStore piStore = piStoreDao.load(processInstanceId);
+
+ if (piStore == null) {
+ return null;
+ }
+
+ ExecutionContext executionContext = new ExecutionContextImpl(piStore.getProcessInstanceId());
+
+ Map<String, Serializable> executionContextData = piStore.getExecutionContextData();
+ for (String key : executionContextData.keySet()) {
+ executionContext.put(key, executionContextData.get(key));
+ }
+
+ ProcessInstance pi = new ProcessInstance(processDefinitions.get(piStore.getProcessDefinitionId()), executionContext);
+ pi.setNextId(piStore.getNextTaskId());
+ pi.setState(piStore.getProcessState());
+
+ return pi;
+ }
+
+ /* (non-Javadoc)
+ * @see at.gv.egovernment.moa.id.process.ProcessEngine#deleteProcessInstance(java.lang.String)
+ */
+ @Override
+ public void deleteProcessInstance(String processInstanceId) throws ProcessExecutionException {
+ if (StringUtils.isEmpty(processInstanceId)) {
+ throw new ProcessExecutionException("Unable to remove process instance: ProcessInstanceId is empty");
+
+ }
+
+ try {
+ piStoreDao.remove(processInstanceId);
+
+ } catch (EAAFException e) {
+ throw new ProcessExecutionException("Unable to remove process instance.", e);
+
+ }
+
+ }
+
+}