From d89f36b67ea1d838a78523538a24e044518f3587 Mon Sep 17 00:00:00 2001 From: mcentner Date: Tue, 26 Jan 2010 16:22:56 +0000 Subject: MOCCA 1.2.11 with SHA-2 enabled. git-svn-id: https://joinup.ec.europa.eu/svn/mocca/branches/mocca-1.2.11-sha2@599 8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4 --- .../stal/service/impl/STALRequestBrokerImpl.java | 324 +++++++++++++++++++++ 1 file changed, 324 insertions(+) create mode 100644 mocca-1.2.11/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java (limited to 'mocca-1.2.11/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java') diff --git a/mocca-1.2.11/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/mocca-1.2.11/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java new file mode 100644 index 00000000..a2447ab7 --- /dev/null +++ b/mocca-1.2.11/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java @@ -0,0 +1,324 @@ +/* + * Copyright 2008 Federal Chancellery Austria and + * Graz University of Technology + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package at.gv.egiz.stal.service.impl; + +import at.gv.egiz.stal.ErrorResponse; +import at.gv.egiz.stal.HashDataInput; +import at.gv.egiz.stal.QuitRequest; +import at.gv.egiz.stal.STALRequest; +import at.gv.egiz.stal.STALResponse; +import at.gv.egiz.stal.SignRequest; +import at.gv.egiz.stal.service.translator.STALTranslator; +import at.gv.egiz.stal.service.translator.TranslationException; +import at.gv.egiz.stal.service.types.ObjectFactory; +import at.gv.egiz.stal.service.types.QuitRequestType; +import at.gv.egiz.stal.service.types.RequestType; +import at.gv.egiz.stal.service.types.ResponseType; +import at.gv.egiz.stalx.service.translator.STALXTranslationHandler; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import javax.xml.bind.JAXBElement; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * An instance of STALRequestBroker is shared between a producer thread (SLCommand) + * and multiple consumer threads (STALService). + * This implementation assures that handleRequest is executed only once the previous invocation returned. + * The BindingProcessor assures that a new SLCommand calls handleRequest() only once + * the bindingProcessor called handleRequest(QUIT) after the previous SLCommand's handleRequest() returned. + * + * Multiple STALService threads might call nextRequest()/getSignedReferences() in any order. + * + * @author clemens + */ +public class STALRequestBrokerImpl implements STALRequestBroker { + + private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class); + + private ObjectFactory of = new ObjectFactory(); + private STALTranslator translator = new STALTranslator(); + + private boolean interrupted = false; + + protected final ArrayList> requests; + protected final ArrayList> responses; + + protected ArrayList hashDataInputs; + + private long timeout; + + public STALRequestBrokerImpl(long timeoutMillisec) { + if (timeoutMillisec <= 0) + timeoutMillisec = DEFAULT_TIMEOUT_MS; + timeout = timeoutMillisec; + requests = new ArrayList>(); + responses = new ArrayList>(); + hashDataInputs = new ArrayList(); + + // register handler for STAL-X + translator.registerTranslationHandler(new STALXTranslationHandler()); + } + + /** + * Produce requests (and HashDataInputCallback) and wait for responses. + * This method is not thread safe, since every bindingprocessor thread possesses it's own instance. + * It however assures cooperation with STAL webservice threads consuming the requests and producing responses. + * + * @param requests + * @return + * + * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests + */ + @Override + public List handleRequest(List stalRequests) { + if (interrupted) { + return null; + } + try { + synchronized (requests) { + log.trace("produce request"); + + requests.clear(); + hashDataInputs.clear(); + + for (STALRequest stalRequest : stalRequests) { + try { + JAXBElement request = translator.translate(stalRequest); + requests.add(request); + if (stalRequest instanceof SignRequest) { + //TODO refactor SignRequestType to keep HDI + // and getHashDataInput() accesses request obj + // (requests are cleared only when we receive the response) + // DataObjectHashDataInput with reference caching enabled DataObject + hashDataInputs.addAll(((SignRequest) stalRequest).getHashDataInput()); + } else if (stalRequest instanceof QuitRequest) { + log.trace("Received QuitRequest, do not wait for responses."); + log.trace("notifying request consumers"); + requests.notify(); + return new ArrayList(); + } + } catch (TranslationException ex) { + log.error(ex.getMessage() + ", send QUIT"); + requests.clear(); + QuitRequestType reqT = of.createQuitRequestType(); + JAXBElement req = of.createGetNextRequestResponseTypeQuitRequest(reqT); + requests.add(req); + log.trace("notifying request consumers"); + requests.notify(); + return new ArrayList(); + } + } + + +// if (stalRequest instanceof SignRequest) { +// log.trace("Received SignRequest, keep HashDataInput."); +// SignRequestType reqT = of.createSignRequestType(); +// reqT.setKeyIdentifier(((SignRequest) stalRequest).getKeyIdentifier()); +// reqT.setSignedInfo(((SignRequest) stalRequest).getSignedInfo()); +// JAXBElement req = of.createGetNextRequestResponseTypeSignRequest(reqT); +// requests.add(req); +// //DataObjectHashDataInput with reference caching enabled DataObject +// hashDataInputs.addAll(((SignRequest) stalRequest).getHashDataInput()); +// break; +// } else if (stalRequest instanceof InfoboxReadRequest) { +// log.trace("Received InfoboxReadRequest"); +// InfoboxReadRequestType reqT = new InfoboxReadRequestType(); +// reqT.setInfoboxIdentifier(((InfoboxReadRequest) stalRequest).getInfoboxIdentifier()); +// reqT.setDomainIdentifier(((InfoboxReadRequest) stalRequest).getDomainIdentifier()); +// JAXBElement req = of.createGetNextRequestResponseTypeInfoboxReadRequest(reqT); +// requests.add(req); +// } else if (stalRequest instanceof QuitRequest) { +// log.trace("Received QuitRequest, do not wait for responses."); +// QuitRequestType reqT = of.createQuitRequestType(); +// JAXBElement req = of.createGetNextRequestResponseTypeQuitRequest(reqT); +// requests.add(req); +// log.trace("notifying request consumers"); +// requests.notify(); +// return new ArrayList(); +// } else { +// log.error("Received unsupported STAL request: " + stalRequest.getClass().getName() + ", send QUIT"); +// requests.clear(); +// QuitRequestType reqT = of.createQuitRequestType(); +// JAXBElement req = of.createGetNextRequestResponseTypeQuitRequest(reqT); +// requests.add(req); +// log.trace("notifying request consumers"); +// requests.notify(); +// return new ArrayList(); +// } +// } + log.trace("notifying request consumers"); + requests.notify(); + } + + synchronized (responses) { + long beforeWait = System.currentTimeMillis(); + while (responses.isEmpty()) { + log.trace("waiting to consume response"); + responses.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { + log.warn("timeout while waiting to consume response, cleanup requests"); + requests.clear(); + hashDataInputs.clear(); + return Collections.singletonList((STALResponse) new ErrorResponse(ERR_4500)); + } + } + log.trace("consuming responses"); + List stalResponses = new ArrayList(); + try { + for (JAXBElement resp : responses) { + STALResponse stalResp = translator.translate(resp); + stalResponses.add(stalResp); + } + } catch (TranslationException ex) { + log.error(ex.getMessage() + ", return ErrorResponse (4000)"); + ErrorResponse stalResp = new ErrorResponse(4000); + stalResp.setErrorMessage(ex.getMessage()); + stalResponses = Collections.singletonList((STALResponse) stalResp); + } + + responses.clear(); + log.trace("notifying response producers"); + responses.notify(); + + return stalResponses; + } + } catch (InterruptedException ex) { + log.warn("interrupt in handleRequest(): " + ex.getMessage()); + interrupted = true; + return null; + } + } + + @Override + public List> connect() { + if (interrupted) { + return null; + } + try { + synchronized (requests) { + long beforeWait = System.currentTimeMillis(); + while (requests.isEmpty()) { + log.trace("waiting to consume request"); + requests.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { + log.warn("timeout while waiting to consume request"); + return createSingleQuitRequest(); + } + } + log.trace("don't consume request now, leave for further connect calls"); + return requests; + } + } catch (InterruptedException ex) { + log.warn("interrupt in nextRequest(): " + ex.getMessage()); + interrupted = true; + return null; + } + } + + /** + * This method is thread-safe, except for + * an 'initial' call to nextRequest(null) followed by a + * 'zombie' call to nextRequest(notNull). + * This case (per design) leads to a timeout of the original call. + * (synchronizing the entire method does not + * hinder the zombie to interrupt two consecutive nextRequest() calls.) + * + * @param responses + * @return QUIT if expected responses are not provided + */ + @Override + public List> nextRequest(List> resps) { + if (interrupted) { + return null; + } + try { + synchronized (requests) { + log.trace("received responses, now consume request"); + if (requests.size() != 0) { + requests.clear(); + } else { + log.warn("requests queue is empty, response might have already been produced previously "); + // return QUIT? + } + } + + synchronized (responses) { + if (resps != null && resps.size() > 0) { + long beforeWait = System.currentTimeMillis(); + while (!responses.isEmpty()) { + log.trace("waiting to produce response"); + responses.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { + log.warn("timeout while waiting to produce response"); + return createSingleQuitRequest(); + } + } + log.trace("produce response"); + responses.addAll(resps); + //reset HashDataInputCallback iff SignResponse + if (log.isTraceEnabled()) { + for (JAXBElement response : resps) { + log.trace("Received STAL response: " + response.getValue().getClass().getName()); + } + } + log.trace("notifying response consumers"); + responses.notify(); + } else { + log.error("Received NextRequest without responses, return QUIT"); + return createSingleQuitRequest(); + } + } + + synchronized (requests) { + long beforeWait = System.currentTimeMillis(); + while (requests.isEmpty()) { + log.trace("waiting to consume request"); + requests.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { + log.warn("timeout while waiting to consume request"); + return createSingleQuitRequest(); + } + } + log.trace("don't consume request now, but on next response delivery"); + return requests; + } + } catch (InterruptedException ex) { + log.warn("interrupt in nextRequest(): " + ex.getMessage()); + interrupted = true; + return null; + } + } + + @Override + public List getHashDataInput() { + synchronized (requests) { + log.trace("return " + hashDataInputs.size() + " current HashDataInput(s) "); + return hashDataInputs; + } + } + + private List> createSingleQuitRequest() { + QuitRequestType quitT = of.createQuitRequestType(); + JAXBElement quit = of.createGetNextRequestResponseTypeQuitRequest(quitT); + ArrayList> l = new ArrayList>(); + l.add(quit); + return l; + } +} -- cgit v1.2.3