diff options
Diffstat (limited to 'bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java')
-rw-r--r-- | bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java | 414 |
1 files changed, 191 insertions, 223 deletions
diff --git a/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java b/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java index bf9a63e2..eee80b03 100644 --- a/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java +++ b/bkucommon/src/main/java/at/gv/egiz/bku/binding/BindingProcessorManagerImpl.java @@ -16,315 +16,283 @@ */ package at.gv.egiz.bku.binding; -import at.gv.egiz.bku.conf.Configuration; -import java.net.MalformedURLException; -import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.apache.commons.configuration.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import at.gv.egiz.bku.jmx.ComponentMXBean; +import at.gv.egiz.bku.jmx.ComponentState; import at.gv.egiz.bku.slcommands.SLCommandInvoker; import at.gv.egiz.bku.slexceptions.SLRuntimeException; import at.gv.egiz.bku.utils.binding.Protocol; -import at.gv.egiz.stal.STAL; import at.gv.egiz.stal.STALFactory; /** * This class maintains all active BindingProcessor Objects. Currently, only * HTTPBinding is supported. */ -public class BindingProcessorManagerImpl implements BindingProcessorManager { +public class BindingProcessorManagerImpl implements BindingProcessorManager, ComponentMXBean { + + public static long DEFAULT_MAX_ACCEPTED_AGE = 2 * 60 * 1000; + + public static int DEFAULT_CLEAN_UP_INTERVAL = 60; - public final static Protocol[] SUPPORTED_PROTOCOLS = { Protocol.HTTP, - Protocol.HTTPS }; + private final Logger log = LoggerFactory.getLogger(BindingProcessorManagerImpl.class); - private static Log log = LogFactory.getLog(BindingProcessorManagerImpl.class); + private List<BindingProcessorFactory> factories = Collections.emptyList(); - /** spring injected config - * Passed to created bindingprocessors, to replace their configuration */ - protected Configuration config; + private Configuration configuration; - protected STALFactory stalFactory; - protected SLCommandInvoker commandInvokerClass; + private STALFactory stalFactory; + + private SLCommandInvoker commandInvoker; - private RemovalStrategy removalStrategy; - private ExecutorService executorService; - private Map<Id, ProcessingContext> contextMap = Collections.synchronizedMap(new HashMap<Id, ProcessingContext>()); -// private Map<Id, MapEntityWrapper> bindingProcessorMap = Collections -// .synchronizedMap(new HashMap<Id, MapEntityWrapper>()); + private ExecutorService executorService = Executors.newCachedThreadPool(); + private Map<Id, BindingProcessorFuture> submittedFutures = Collections + .synchronizedMap(new HashMap<Id, BindingProcessorFuture>()); + + private int cleanUpInterval = DEFAULT_CLEAN_UP_INTERVAL; + + private long maxAcceptedAge = DEFAULT_MAX_ACCEPTED_AGE; + + private ScheduledExecutorService cleanUpService = Executors + .newSingleThreadScheduledExecutor(); + + public BindingProcessorManagerImpl() { + cleanUpService.scheduleAtFixedRate(new CleanUpTask(), cleanUpInterval, + cleanUpInterval, TimeUnit.SECONDS); + } + /** - * Container to hold a Future and Bindingprocessor object as map value. - * - * @author wbauer - * @see BindingProcessorManagerImpl#bindingProcessorMap + * @return the configuration */ -// static class MapEntityWrapper { -// private Future<?> future; -// private BindingProcessor bindingProcessor; -// -// public MapEntityWrapper(Future<?> future, BindingProcessor bindingProcessor) { -// if ((bindingProcessor == null) || (future == null)) { -// throw new NullPointerException("Argument must not be null"); -// } -// this.bindingProcessor = bindingProcessor; -// this.future = future; -// } -// -// public Future<?> getFuture() { -// return future; -// } -// -// public BindingProcessor getBindingProcessor() { -// return bindingProcessor; -// } -// -// public int hashCode() { -// return bindingProcessor.getId().hashCode(); -// } -// -// public boolean equals(Object other) { -// if (other instanceof MapEntityWrapper) { -// MapEntityWrapper o = (MapEntityWrapper) other; -// return (o.bindingProcessor.getId().equals(bindingProcessor.getId())); -// } else { -// return false; -// } -// } -// } + public Configuration getConfiguration() { + return configuration; + } /** - * - * @param fab - * must not be null - * @param ci - * must not be null (prototype to generate new instances) + * @param configuration the configuration to set */ - public BindingProcessorManagerImpl(STALFactory fab, SLCommandInvoker ci, Configuration conf) { - if (fab == null) { - throw new NullPointerException("STALFactory must not be null"); - } - stalFactory = fab; - if (ci == null) { - throw new NullPointerException("SLCommandInvoker must not be null"); - } - commandInvokerClass = ci; - config = conf; - executorService = Executors.newCachedThreadPool(); + public void setConfiguration(Configuration configuration) { + this.configuration = configuration; } /** - * - * @return the STALFactory currently used. + * @return the factoryMap */ - public STALFactory getStalFactory() { - return stalFactory; + public List<BindingProcessorFactory> getFactories() { + return factories; } /** - * Sets the STALFactory to be used. - * - * @param stalFactory + * @param factoryMap the factoryMap to set */ - public void setStalFactory(STALFactory stalFactory) { - this.stalFactory = stalFactory; + public void setFactories(List<BindingProcessorFactory> factories) { + this.factories = factories; } /** - * Could be used to setup a new executor service during application stratup. + * Sets a SLCommandInvoker prototype used to create a SLCommandInvoker for + * initialization of a BindingProcessor. * - * @param executorService + * @param invoker + */ + public void setSlCommandInvoker(SLCommandInvoker invoker) { + commandInvoker = invoker; + } + + /** + * @return the SLCommandInvoker prototype used to create a SLCommandInvoker + * for initialization of a BindingProcessor. */ - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; + public SLCommandInvoker getCommandInvoker() { + return commandInvoker; } - public void setRemovalStrategy(RemovalStrategy aStrategy) { - removalStrategy = aStrategy; + /** + * @return the STALFactory currently used. + */ + public STALFactory getStalFactory() { + return stalFactory; } - public RemovalStrategy getRemovlaStrategy() { - return removalStrategy; + /** + * Sets the STALFactory used to create a STAL implementation for initialization of + * a BindingProcessor. + * + * @param stalFactory + */ + public void setStalFactory(STALFactory stalFactory) { + this.stalFactory = stalFactory; } + /* (non-Javadoc) + * @see at.gv.egiz.bku.binding.BindingProcessorManager#shutdown() + */ + @Override public void shutdown() { - log.info("Shutting down the BindingProcessorManager"); + log.info("Shutting down the BindingProcessorManager."); executorService.shutdown(); + cleanUpService.shutdown(); } + /* (non-Javadoc) + * @see at.gv.egiz.bku.binding.BindingProcessorManager#shutdownNow() + */ + @Override public void shutdownNow() { log.info("Shutting down the BindingProcessorManager NOW!"); + cleanUpService.shutdownNow(); executorService.shutdownNow(); - log.debug("Number of binding contexts currently managed: " - + contextMap.size()); -// + bindingProcessorMap.size()); + log.debug("Number of binding contexts currently managed: {}.", submittedFutures.size()); if (log.isDebugEnabled()) { - for (ProcessingContext ctx : contextMap.values()) { - Id bpId = ctx.getBindingProcessor().getId(); - Future future = ctx.getFuture(); - log.debug(bpId + " cancelled: " + future.isCancelled()); - log.debug(bpId + " done: " + future.isDone()); + for (BindingProcessorFuture future : submittedFutures.values()) { + if (future.isCancelled()) { + log.debug("BindingProcessor {} is cancelled.", future.getBindingProcessor().getId()); + } else { + log.debug("BindingProcessor {} is done: {}.", future.getBindingProcessor().getId(), future.isDone()); + } } -// for (Iterator<MapEntityWrapper> it = bindingProcessorMap.values() -// .iterator(); it.hasNext();) { -// MapEntityWrapper entry = it.next(); -// log.debug(entry.getBindingProcessor().getId() + ": isDone: " -// + entry.getFuture().isDone()); -// log.debug(entry.getBindingProcessor().getId() + ": isCanceled: " -// + entry.getFuture().isCancelled()); -// } } } - /** - * Uses the default locale + /* (non-Javadoc) + * @see at.gv.egiz.bku.binding.BindingProcessorManager#createBindingProcessor(java.lang.String, java.lang.String) */ - public BindingProcessor createBindingProcessor(String srcUrl, - String aSessionId) throws MalformedURLException { - return createBindingProcessor(srcUrl, aSessionId, null); + @Override + public BindingProcessor createBindingProcessor(String protocol) { + Protocol p = Protocol.fromString(protocol); + for (BindingProcessorFactory factory : factories) { + if (factory.getSupportedProtocols().contains(p)) { + return factory.createBindingProcessor(); + } + } + throw new IllegalArgumentException(); } - /** - * FactoryMethod creating a new BindingProcessor object. - * - * @param protocol - * must not be null - * @throws MalformedURLException + /* (non-Javadoc) + * @see at.gv.egiz.bku.binding.BindingProcessorManager#createBindingProcessor(java.lang.String, java.lang.String, java.util.Locale) */ - public BindingProcessor createBindingProcessor(String srcUrl, - String aSessionId, Locale locale) throws MalformedURLException { - URL url = new URL(srcUrl); - String low = url.getProtocol().toLowerCase(); - Protocol proto = null; - for (int i = 0; i < SUPPORTED_PROTOCOLS.length; i++) { - if (SUPPORTED_PROTOCOLS[i].toString().equals(low)) { - proto = SUPPORTED_PROTOCOLS[i]; - break; - } - } - if (proto == null) { - throw new UnsupportedOperationException(); - } - BindingProcessor bindingProcessor = new HTTPBindingProcessor(aSessionId, - commandInvokerClass.newInstance(), url); - stalFactory.setLocale(locale); - STAL stal = stalFactory.createSTAL(); - bindingProcessor.init(stal, commandInvokerClass.newInstance(), config); - if (locale != null) { - bindingProcessor.setLocale(locale); -// stal.setLocale(locale); - } + @Override + public BindingProcessor createBindingProcessor(String protocol, Locale locale) { + BindingProcessor bindingProcessor = createBindingProcessor(protocol); + bindingProcessor.setLocale(locale); return bindingProcessor; } - /** - * @return the bindingprocessor object for this id or null if no - * bindingprocessor was found. + /* (non-Javadoc) + * @see at.gv.egiz.bku.binding.BindingProcessorManager#process(java.lang.String, at.gv.egiz.bku.binding.BindingProcessor) */ - @Override - public BindingProcessor getBindingProcessor(Id aId) { -// if (bindingProcessorMap.get(aId) != null) { -// return bindingProcessorMap.get(aId).getBindingProcessor(); - ProcessingContext ctx = contextMap.get(aId); - if (ctx != null) { - return ctx.getBindingProcessor(); - } else { - return null; + @Override + public BindingProcessorFuture process(Id id, BindingProcessor bindingProcessor) { + + log.trace("Initialize BindingProcessor for processing."); + bindingProcessor.init(id.toString(), stalFactory.createSTAL(), commandInvoker.newInstance()); + + BindingProcessorFuture future = new BindingProcessorFuture(bindingProcessor); + if (submittedFutures.containsKey(bindingProcessor.getId())) { + log.error("BindingProcessor with with id {} already submitted.", id); + throw new SLRuntimeException("BindingProcessor with with id " + id + + " already submitted."); } + + try { + log.debug("Submitting BindingProcessor {} for processing.", id); + executorService.execute(future); + submittedFutures.put(bindingProcessor.getId(), future); + } catch (RejectedExecutionException e) { + log.error("BindingProcessor {} processing rejected.", id, e); + throw new SLRuntimeException("BindingProcessor {} " + id + " processing rejected.", e); + } + + return future; + } - /** - * + /* (non-Javadoc) + * @see at.gv.egiz.bku.binding.BindingProcessorManager#getBindingProcessor(at.gv.egiz.bku.binding.Id) */ - @Override - public void setSTALFactory(STALFactory aStalFactory) { - if (aStalFactory == null) { - throw new NullPointerException("Cannot set STALFactory to null"); + @Override + public BindingProcessor getBindingProcessor(Id id) { + BindingProcessorFuture future = submittedFutures.get(id); + if (future != null) { + return future.getBindingProcessor(); + } else { + return null; } - stalFactory = aStalFactory; } - /** - * Causes the BindingProcessorManager to manage the provided BindingProcessor - * Creates a processing context, - * schedules the provided binding processor for processing and - * immediately returns the context. - * - * @param aBindingProcessor - * must not be null + /* (non-Javadoc) + * @see at.gv.egiz.bku.binding.BindingProcessorManager#removeBindingProcessor(at.gv.egiz.bku.binding.Id) */ @Override - public ProcessingContext process(BindingProcessor aBindingProcessor) { - if (contextMap.containsKey(aBindingProcessor.getId())) { -// if (bindingProcessorMap.containsKey(aBindingProcessor.getId())) { - log.fatal("Clashing ids, cannot process bindingprocessor with id:" - + aBindingProcessor.getId()); - throw new SLRuntimeException( - "Clashing ids, cannot process bindingprocessor with id:" - + aBindingProcessor.getId()); + public void removeBindingProcessor(Id id) { + BindingProcessorFuture future = submittedFutures.remove(id); + if (future != null) { + if (!future.isDone()) { + log.debug("Interrupting BindingProcessor {}.", id ); + future.cancel(true); + } + if (log.isInfoEnabled()) { + Object[] args = {id, future.getExecutionTime() / 1000.0, future.getAge() / 1000.0}; + log.info("Removing BindingProcessor {} (active:{}s/age:{}s).", args); + } } - log.debug("processing bindingprocessor: " + aBindingProcessor.getId()); - Future<?> f = executorService.submit(aBindingProcessor); - ProcessingContext ctx = new ProcessingContext(aBindingProcessor, f); - contextMap.put(aBindingProcessor.getId(), ctx); -// bindingProcessorMap.put(aBindingProcessor.getId(), new MapEntityWrapper(f, -// aBindingProcessor)); - return ctx; } + /* (non-Javadoc) + * @see at.gv.egiz.bku.binding.BindingProcessorManager#getManagedIds() + */ @Override - public void setSLCommandInvoker(SLCommandInvoker invoker) { - commandInvokerClass = invoker; + public Set<Id> getManagedIds() { + return Collections.unmodifiableSet(new HashSet<Id>(submittedFutures.keySet())); } + /* (non-Javadoc) + * @see at.gv.egiz.bku.jmx.ComponentMXBean#checkComponentState() + */ @Override - public void removeBindingProcessor(Id sessionId) { - log.debug("Removing binding processor: " + sessionId); - ProcessingContext ctx = contextMap.get(sessionId); - if (ctx == null) { - log.warn("no processing context to remove for session " + sessionId); - return; - } - Future f = ctx.getFuture(); - -// MapEntityWrapper wrapper = bindingProcessorMap.get(sessionId); -// if (wrapper == null) { -// return; -// } -// Future<?> f = wrapper.getFuture(); - - if (!f.isDone()) { - log.trace("canceling " + sessionId); - f.cancel(true); - } - contextMap.remove(sessionId); -// bindingProcessorMap.remove(sessionId); + public ComponentState checkComponentState() { + return new ComponentState(true); } - - @Override - public Set<Id> getManagedIds() { - Set<Id> result = new HashSet<Id>(); - synchronized (contextMap) { - for (Id id : contextMap.keySet()) { - result.add(id); + + public class CleanUpTask implements Runnable { + + @Override + public void run() { + Collection<BindingProcessorFuture> futures = submittedFutures.values(); + List<Id> toBeRemoved = new ArrayList<Id>(); + int active = 0; + for(BindingProcessorFuture future : futures) { + BindingProcessor bindingProcessor = future.getBindingProcessor(); + if (!future.isDone()) { + active++; + } + if ((bindingProcessor.getLastAccessTime().getTime() - System + .currentTimeMillis()) > maxAcceptedAge) { + toBeRemoved.add(bindingProcessor.getId()); + } + } + for (Id id : toBeRemoved) { + removeBindingProcessor(id); } } -// synchronized (bindingProcessorMap) { -// for (Iterator<Id> it = bindingProcessorMap.keySet().iterator(); it -// .hasNext();) { -// result.add(it.next()); -// } -// } - return result; + } -}
\ No newline at end of file +} |