summaryrefslogtreecommitdiff
path: root/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java
diff options
context:
space:
mode:
Diffstat (limited to 'bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java')
-rw-r--r--bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java414
1 files changed, 191 insertions, 223 deletions
diff --git a/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java b/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java
index bf9a63e2..eee80b03 100644
--- a/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java
+++ b/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java
@@ -16,315 +16,283 @@
*/
package at.gv.egiz.bku.binding;
-import at.gv.egiz.bku.conf.Configuration;
-import java.net.MalformedURLException;
-import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
+import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import at.gv.egiz.bku.jmx.ComponentMXBean;
+import at.gv.egiz.bku.jmx.ComponentState;
import at.gv.egiz.bku.slcommands.SLCommandInvoker;
import at.gv.egiz.bku.slexceptions.SLRuntimeException;
import at.gv.egiz.bku.utils.binding.Protocol;
-import at.gv.egiz.stal.STAL;
import at.gv.egiz.stal.STALFactory;
/**
* This class maintains all active BindingProcessor Objects. Currently, only
* HTTPBinding is supported.
*/
-public class BindingProcessorManagerImpl implements BindingProcessorManager {
+public class BindingProcessorManagerImpl implements BindingProcessorManager, ComponentMXBean {
+
+ public static long DEFAULT_MAX_ACCEPTED_AGE = 2 * 60 * 1000;
+
+ public static int DEFAULT_CLEAN_UP_INTERVAL = 60;
- public final static Protocol[] SUPPORTED_PROTOCOLS = { Protocol.HTTP,
- Protocol.HTTPS };
+ private final Logger log = LoggerFactory.getLogger(BindingProcessorManagerImpl.class);
- private static Log log = LogFactory.getLog(BindingProcessorManagerImpl.class);
+ private List<BindingProcessorFactory> factories = Collections.emptyList();
- /** spring injected config
- * Passed to created bindingprocessors, to replace their configuration */
- protected Configuration config;
+ private Configuration configuration;
- protected STALFactory stalFactory;
- protected SLCommandInvoker commandInvokerClass;
+ private STALFactory stalFactory;
+
+ private SLCommandInvoker commandInvoker;
- private RemovalStrategy removalStrategy;
- private ExecutorService executorService;
- private Map<Id, ProcessingContext> contextMap = Collections.synchronizedMap(new HashMap<Id, ProcessingContext>());
-// private Map<Id, MapEntityWrapper> bindingProcessorMap = Collections
-// .synchronizedMap(new HashMap<Id, MapEntityWrapper>());
+ private ExecutorService executorService = Executors.newCachedThreadPool();
+ private Map<Id, BindingProcessorFuture> submittedFutures = Collections
+ .synchronizedMap(new HashMap<Id, BindingProcessorFuture>());
+
+ private int cleanUpInterval = DEFAULT_CLEAN_UP_INTERVAL;
+
+ private long maxAcceptedAge = DEFAULT_MAX_ACCEPTED_AGE;
+
+ private ScheduledExecutorService cleanUpService = Executors
+ .newSingleThreadScheduledExecutor();
+
+ public BindingProcessorManagerImpl() {
+ cleanUpService.scheduleAtFixedRate(new CleanUpTask(), cleanUpInterval,
+ cleanUpInterval, TimeUnit.SECONDS);
+ }
+
/**
- * Container to hold a Future and Bindingprocessor object as map value.
- *
- * @author wbauer
- * @see BindingProcessorManagerImpl#bindingProcessorMap
+ * @return the configuration
*/
-// static class MapEntityWrapper {
-// private Future<?> future;
-// private BindingProcessor bindingProcessor;
-//
-// public MapEntityWrapper(Future<?> future, BindingProcessor bindingProcessor) {
-// if ((bindingProcessor == null) || (future == null)) {
-// throw new NullPointerException("Argument must not be null");
-// }
-// this.bindingProcessor = bindingProcessor;
-// this.future = future;
-// }
-//
-// public Future<?> getFuture() {
-// return future;
-// }
-//
-// public BindingProcessor getBindingProcessor() {
-// return bindingProcessor;
-// }
-//
-// public int hashCode() {
-// return bindingProcessor.getId().hashCode();
-// }
-//
-// public boolean equals(Object other) {
-// if (other instanceof MapEntityWrapper) {
-// MapEntityWrapper o = (MapEntityWrapper) other;
-// return (o.bindingProcessor.getId().equals(bindingProcessor.getId()));
-// } else {
-// return false;
-// }
-// }
-// }
+ public Configuration getConfiguration() {
+ return configuration;
+ }
/**
- *
- * @param fab
- * must not be null
- * @param ci
- * must not be null (prototype to generate new instances)
+ * @param configuration the configuration to set
*/
- public BindingProcessorManagerImpl(STALFactory fab, SLCommandInvoker ci, Configuration conf) {
- if (fab == null) {
- throw new NullPointerException("STALFactory must not be null");
- }
- stalFactory = fab;
- if (ci == null) {
- throw new NullPointerException("SLCommandInvoker must not be null");
- }
- commandInvokerClass = ci;
- config = conf;
- executorService = Executors.newCachedThreadPool();
+ public void setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
}
/**
- *
- * @return the STALFactory currently used.
+ * @return the factoryMap
*/
- public STALFactory getStalFactory() {
- return stalFactory;
+ public List<BindingProcessorFactory> getFactories() {
+ return factories;
}
/**
- * Sets the STALFactory to be used.
- *
- * @param stalFactory
+ * @param factoryMap the factoryMap to set
*/
- public void setStalFactory(STALFactory stalFactory) {
- this.stalFactory = stalFactory;
+ public void setFactories(List<BindingProcessorFactory> factories) {
+ this.factories = factories;
}
/**
- * Could be used to setup a new executor service during application stratup.
+ * Sets a SLCommandInvoker prototype used to create a SLCommandInvoker for
+ * initialization of a BindingProcessor.
*
- * @param executorService
+ * @param invoker
+ */
+ public void setSlCommandInvoker(SLCommandInvoker invoker) {
+ commandInvoker = invoker;
+ }
+
+ /**
+ * @return the SLCommandInvoker prototype used to create a SLCommandInvoker
+ * for initialization of a BindingProcessor.
*/
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
+ public SLCommandInvoker getCommandInvoker() {
+ return commandInvoker;
}
- public void setRemovalStrategy(RemovalStrategy aStrategy) {
- removalStrategy = aStrategy;
+ /**
+ * @return the STALFactory currently used.
+ */
+ public STALFactory getStalFactory() {
+ return stalFactory;
}
- public RemovalStrategy getRemovlaStrategy() {
- return removalStrategy;
+ /**
+ * Sets the STALFactory used to create a STAL implementation for initialization of
+ * a BindingProcessor.
+ *
+ * @param stalFactory
+ */
+ public void setStalFactory(STALFactory stalFactory) {
+ this.stalFactory = stalFactory;
}
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.binding.BindingProcessorManager#shutdown()
+ */
+ @Override
public void shutdown() {
- log.info("Shutting down the BindingProcessorManager");
+ log.info("Shutting down the BindingProcessorManager.");
executorService.shutdown();
+ cleanUpService.shutdown();
}
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.binding.BindingProcessorManager#shutdownNow()
+ */
+ @Override
public void shutdownNow() {
log.info("Shutting down the BindingProcessorManager NOW!");
+ cleanUpService.shutdownNow();
executorService.shutdownNow();
- log.debug("Number of binding contexts currently managed: "
- + contextMap.size());
-// + bindingProcessorMap.size());
+ log.debug("Number of binding contexts currently managed: {}.", submittedFutures.size());
if (log.isDebugEnabled()) {
- for (ProcessingContext ctx : contextMap.values()) {
- Id bpId = ctx.getBindingProcessor().getId();
- Future future = ctx.getFuture();
- log.debug(bpId + " cancelled: " + future.isCancelled());
- log.debug(bpId + " done: " + future.isDone());
+ for (BindingProcessorFuture future : submittedFutures.values()) {
+ if (future.isCancelled()) {
+ log.debug("BindingProcessor {} is cancelled.", future.getBindingProcessor().getId());
+ } else {
+ log.debug("BindingProcessor {} is done: {}.", future.getBindingProcessor().getId(), future.isDone());
+ }
}
-// for (Iterator<MapEntityWrapper> it = bindingProcessorMap.values()
-// .iterator(); it.hasNext();) {
-// MapEntityWrapper entry = it.next();
-// log.debug(entry.getBindingProcessor().getId() + ": isDone: "
-// + entry.getFuture().isDone());
-// log.debug(entry.getBindingProcessor().getId() + ": isCanceled: "
-// + entry.getFuture().isCancelled());
-// }
}
}
- /**
- * Uses the default locale
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.binding.BindingProcessorManager#createBindingProcessor(java.lang.String, java.lang.String)
*/
- public BindingProcessor createBindingProcessor(String srcUrl,
- String aSessionId) throws MalformedURLException {
- return createBindingProcessor(srcUrl, aSessionId, null);
+ @Override
+ public BindingProcessor createBindingProcessor(String protocol) {
+ Protocol p = Protocol.fromString(protocol);
+ for (BindingProcessorFactory factory : factories) {
+ if (factory.getSupportedProtocols().contains(p)) {
+ return factory.createBindingProcessor();
+ }
+ }
+ throw new IllegalArgumentException();
}
- /**
- * FactoryMethod creating a new BindingProcessor object.
- *
- * @param protocol
- * must not be null
- * @throws MalformedURLException
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.binding.BindingProcessorManager#createBindingProcessor(java.lang.String, java.lang.String, java.util.Locale)
*/
- public BindingProcessor createBindingProcessor(String srcUrl,
- String aSessionId, Locale locale) throws MalformedURLException {
- URL url = new URL(srcUrl);
- String low = url.getProtocol().toLowerCase();
- Protocol proto = null;
- for (int i = 0; i < SUPPORTED_PROTOCOLS.length; i++) {
- if (SUPPORTED_PROTOCOLS[i].toString().equals(low)) {
- proto = SUPPORTED_PROTOCOLS[i];
- break;
- }
- }
- if (proto == null) {
- throw new UnsupportedOperationException();
- }
- BindingProcessor bindingProcessor = new HTTPBindingProcessor(aSessionId,
- commandInvokerClass.newInstance(), url);
- stalFactory.setLocale(locale);
- STAL stal = stalFactory.createSTAL();
- bindingProcessor.init(stal, commandInvokerClass.newInstance(), config);
- if (locale != null) {
- bindingProcessor.setLocale(locale);
-// stal.setLocale(locale);
- }
+ @Override
+ public BindingProcessor createBindingProcessor(String protocol, Locale locale) {
+ BindingProcessor bindingProcessor = createBindingProcessor(protocol);
+ bindingProcessor.setLocale(locale);
return bindingProcessor;
}
- /**
- * @return the bindingprocessor object for this id or null if no
- * bindingprocessor was found.
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.binding.BindingProcessorManager#process(java.lang.String, at.gv.egiz.bku.binding.BindingProcessor)
*/
- @Override
- public BindingProcessor getBindingProcessor(Id aId) {
-// if (bindingProcessorMap.get(aId) != null) {
-// return bindingProcessorMap.get(aId).getBindingProcessor();
- ProcessingContext ctx = contextMap.get(aId);
- if (ctx != null) {
- return ctx.getBindingProcessor();
- } else {
- return null;
+ @Override
+ public BindingProcessorFuture process(Id id, BindingProcessor bindingProcessor) {
+
+ log.trace("Initialize BindingProcessor for processing.");
+ bindingProcessor.init(id.toString(), stalFactory.createSTAL(), commandInvoker.newInstance());
+
+ BindingProcessorFuture future = new BindingProcessorFuture(bindingProcessor);
+ if (submittedFutures.containsKey(bindingProcessor.getId())) {
+ log.error("BindingProcessor with with id {} already submitted.", id);
+ throw new SLRuntimeException("BindingProcessor with with id " + id
+ + " already submitted.");
}
+
+ try {
+ log.debug("Submitting BindingProcessor {} for processing.", id);
+ executorService.execute(future);
+ submittedFutures.put(bindingProcessor.getId(), future);
+ } catch (RejectedExecutionException e) {
+ log.error("BindingProcessor {} processing rejected.", id, e);
+ throw new SLRuntimeException("BindingProcessor {} " + id + " processing rejected.", e);
+ }
+
+ return future;
+
}
- /**
- *
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.binding.BindingProcessorManager#getBindingProcessor(at.gv.egiz.bku.binding.Id)
*/
- @Override
- public void setSTALFactory(STALFactory aStalFactory) {
- if (aStalFactory == null) {
- throw new NullPointerException("Cannot set STALFactory to null");
+ @Override
+ public BindingProcessor getBindingProcessor(Id id) {
+ BindingProcessorFuture future = submittedFutures.get(id);
+ if (future != null) {
+ return future.getBindingProcessor();
+ } else {
+ return null;
}
- stalFactory = aStalFactory;
}
- /**
- * Causes the BindingProcessorManager to manage the provided BindingProcessor
- * Creates a processing context,
- * schedules the provided binding processor for processing and
- * immediately returns the context.
- *
- * @param aBindingProcessor
- * must not be null
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.binding.BindingProcessorManager#removeBindingProcessor(at.gv.egiz.bku.binding.Id)
*/
@Override
- public ProcessingContext process(BindingProcessor aBindingProcessor) {
- if (contextMap.containsKey(aBindingProcessor.getId())) {
-// if (bindingProcessorMap.containsKey(aBindingProcessor.getId())) {
- log.fatal("Clashing ids, cannot process bindingprocessor with id:"
- + aBindingProcessor.getId());
- throw new SLRuntimeException(
- "Clashing ids, cannot process bindingprocessor with id:"
- + aBindingProcessor.getId());
+ public void removeBindingProcessor(Id id) {
+ BindingProcessorFuture future = submittedFutures.remove(id);
+ if (future != null) {
+ if (!future.isDone()) {
+ log.debug("Interrupting BindingProcessor {}.", id );
+ future.cancel(true);
+ }
+ if (log.isInfoEnabled()) {
+ Object[] args = {id, future.getExecutionTime() / 1000.0, future.getAge() / 1000.0};
+ log.info("Removing BindingProcessor {} (active:{}s/age:{}s).", args);
+ }
}
- log.debug("processing bindingprocessor: " + aBindingProcessor.getId());
- Future<?> f = executorService.submit(aBindingProcessor);
- ProcessingContext ctx = new ProcessingContext(aBindingProcessor, f);
- contextMap.put(aBindingProcessor.getId(), ctx);
-// bindingProcessorMap.put(aBindingProcessor.getId(), new MapEntityWrapper(f,
-// aBindingProcessor));
- return ctx;
}
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.binding.BindingProcessorManager#getManagedIds()
+ */
@Override
- public void setSLCommandInvoker(SLCommandInvoker invoker) {
- commandInvokerClass = invoker;
+ public Set<Id> getManagedIds() {
+ return Collections.unmodifiableSet(new HashSet<Id>(submittedFutures.keySet()));
}
+ /* (non-Javadoc)
+ * @see at.gv.egiz.bku.jmx.ComponentMXBean#checkComponentState()
+ */
@Override
- public void removeBindingProcessor(Id sessionId) {
- log.debug("Removing binding processor: " + sessionId);
- ProcessingContext ctx = contextMap.get(sessionId);
- if (ctx == null) {
- log.warn("no processing context to remove for session " + sessionId);
- return;
- }
- Future f = ctx.getFuture();
-
-// MapEntityWrapper wrapper = bindingProcessorMap.get(sessionId);
-// if (wrapper == null) {
-// return;
-// }
-// Future<?> f = wrapper.getFuture();
-
- if (!f.isDone()) {
- log.trace("canceling " + sessionId);
- f.cancel(true);
- }
- contextMap.remove(sessionId);
-// bindingProcessorMap.remove(sessionId);
+ public ComponentState checkComponentState() {
+ return new ComponentState(true);
}
-
- @Override
- public Set<Id> getManagedIds() {
- Set<Id> result = new HashSet<Id>();
- synchronized (contextMap) {
- for (Id id : contextMap.keySet()) {
- result.add(id);
+
+ public class CleanUpTask implements Runnable {
+
+ @Override
+ public void run() {
+ Collection<BindingProcessorFuture> futures = submittedFutures.values();
+ List<Id> toBeRemoved = new ArrayList<Id>();
+ int active = 0;
+ for(BindingProcessorFuture future : futures) {
+ BindingProcessor bindingProcessor = future.getBindingProcessor();
+ if (!future.isDone()) {
+ active++;
+ }
+ if ((bindingProcessor.getLastAccessTime().getTime() - System
+ .currentTimeMillis()) > maxAcceptedAge) {
+ toBeRemoved.add(bindingProcessor.getId());
+ }
+ }
+ for (Id id : toBeRemoved) {
+ removeBindingProcessor(id);
}
}
-// synchronized (bindingProcessorMap) {
-// for (Iterator<Id> it = bindingProcessorMap.keySet().iterator(); it
-// .hasNext();) {
-// result.add(it.next());
-// }
-// }
- return result;
+
}
-} \ No newline at end of file
+}