From ef884e591c38023d980a158f29ec1d71ed256a41 Mon Sep 17 00:00:00 2001 From: clemenso Date: Thu, 25 Sep 2008 16:31:18 +0000 Subject: consume/produce git-svn-id: https://joinup.ec.europa.eu/svn/mocca/trunk@72 8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4 --- .../stal/service/impl/STALRequestBrokerImpl.java | 160 +++++++++++++-------- 1 file changed, 100 insertions(+), 60 deletions(-) (limited to 'BKUOnline/src/main/java') diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java index 7897f984..dc3cc6d3 100644 --- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java +++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java @@ -47,82 +47,79 @@ import org.apache.commons.logging.LogFactory; public class STALRequestBrokerImpl implements STALRequestBroker { private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class); - protected List requests = null; - protected List responses = null; - protected List currentHashDataInput; -// private boolean isHandlingRequest = false; + private boolean expectingResponse = false; private boolean interrupted = false; + private final RequestsMonitor reqMon = new RequestsMonitor(); + private final ResponsesMonitor respMon = new ResponsesMonitor(); + + private long timeout; + + public STALRequestBrokerImpl(long timeoutMillisec) { + if (timeoutMillisec <= 0) + timeoutMillisec = DEFAULT_TIMEOUT_MS; + this.timeout = timeoutMillisec; + } + /** * Produce requests (and HashDataInputCallback) and wait for responses. - * The next thread may enter once we consumed the responses. + * This method is not thread safe, since every bindingprocessor thread possesses it's own instance. + * It however assures cooperation with STAL webservice threads consuming the requests and producing responses. * * @param requests * @return * - * @pre requests either single SignRequest, QuitRequest or multiple ReadInfoboxRequests + * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests */ @Override - public synchronized List handleRequest(List requests) { + public List handleRequest(List requests) { if (interrupted) { return null; } try { -// long beforeWait = System.currentTimeMillis(); -// while (isHandlingRequest) { -// log.trace("waiting to produce request"); -// wait(TIMEOUT_MS); -// if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { -// log.warn("timeout while waiting to produce request"); -// return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); -// } -// } + synchronized (reqMon) { log.trace("produce request"); -// isHandlingRequest = true; - this.requests = requests; - currentHashDataInput = null; + reqMon.produce(requests); + reqMon.setHashDataInput(null); for (STALRequest request : requests) { if (request instanceof SignRequest) { log.trace("Received SignRequest, keep HashDataInput."); - currentHashDataInput = ((SignRequest) request).getHashDataInput(); + reqMon.setHashDataInput(((SignRequest) request).getHashDataInput()); break; } else if (request instanceof QuitRequest) { log.trace("Received QuitRequest, do not wait for responses."); log.trace("notifying request consumers"); - notify(); + reqMon.notify(); return new ArrayList(); } else if (log.isTraceEnabled()) { log.trace("Received STAL request: " + request.getClass().getName()); } } log.trace("notifying request consumers"); - notify(); - + reqMon.notify(); + } + + synchronized (respMon) { long beforeWait = System.currentTimeMillis(); - while (this.responses == null) { + while (respMon.responses == null) { log.trace("waiting to consume response"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { - log.warn("timeout while waiting to consume response"); - this.requests = null; - currentHashDataInput = null; -// isHandlingRequest = false; + respMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { + log.warn("timeout while waiting to consume response, cleanup requests"); + reqMon.consume(); //TODO check deadlock? + reqMon.setHashDataInput(null); return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); } } log.trace("consuming responses"); - List resps = responses; - responses = null; + List responses = respMon.consume(); log.trace("notifying response producers"); - notify(); - -// isHandlingRequest = false; -// log.trace("notifying request producers"); -// notify(); + respMon.notify(); - return resps; + return responses; + } } catch (InterruptedException ex) { log.warn("interrupt in handleRequest(): " + ex.getMessage()); interrupted = true; @@ -131,68 +128,75 @@ public class STALRequestBrokerImpl implements STALRequestBroker { } /** + * This method is thread-safe, except for + * an 'initial' call to nextRequest(null) followed by a + * 'zombie' call to nextRequest(notNull). + * This case (per design) leads to a timeout of the original call. + * (synchronizing the entire method does not + * hinder the zombie to interrupt two consecutive nextRequest() calls.) * * @param responses * @return QUIT if expected responses are not provided */ @Override - public synchronized List nextRequest(List responses) { + public List nextRequest(List responses) { if (interrupted) { return null; } try { + synchronized (respMon) { if (responses != null && responses.size() > 0) { if (!expectingResponse) { - log.warn("Received unexpected response in nextRequest()"); + log.warn("Received unexpected response in nextRequest(), return QUIT"); return Collections.singletonList((STALRequest) new QuitRequest()); } long beforeWait = System.currentTimeMillis(); - while (this.responses != null) { + while (respMon.responses != null) { log.trace("waiting to produce response"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { + respMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to produce response"); return Collections.singletonList((STALRequest) new QuitRequest()); } } log.trace("produce response"); - this.responses = responses; - //reset HashDataInputCallback + respMon.produce(responses); + //reset HashDataInputCallback iff SignResponse if (log.isTraceEnabled()) { for (STALResponse response : responses) { log.trace("Received STAL response: " + response.getClass().getName()); } } log.trace("notifying response consumers"); - notify(); + respMon.notify(); } else { if (expectingResponse) { - log.warn("No expected response received in nextRequest()"); + log.warn("Did not receive expected response(s) in nextRequest(), return QUIT"); return Collections.singletonList((STALRequest) new QuitRequest()); } log.trace("expecting non-null response in next nextRequest(response)"); expectingResponse = true; } + } + + synchronized (reqMon) { long beforeWait = System.currentTimeMillis(); - while (this.requests == null) { + while (reqMon.requests == null) { log.trace("waiting to consume request"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { + reqMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to consume request"); return Collections.singletonList((STALRequest) new QuitRequest()); } } log.trace("consume request"); - List reqs = requests; - requests = null; - if (reqs.size() > 0 && reqs.get(0) instanceof QuitRequest) { -// isHandlingRequest = false; -// log.trace("consumed QUIT, notifying request producers"); -// notify(); + List requests = reqMon.consume(); + if (requests.size() > 0 && requests.get(0) instanceof QuitRequest) { log.trace("expecting no response in next nextRequest()"); expectingResponse = false; } - return reqs; + return requests; + } } catch (InterruptedException ex) { log.warn("interrupt in nextRequest(): " + ex.getMessage()); interrupted = true; @@ -202,12 +206,48 @@ public class STALRequestBrokerImpl implements STALRequestBroker { @Override public synchronized List getHashDataInput() { - log.trace("return " + currentHashDataInput.size() + " current HashDataInput(s) "); - return currentHashDataInput; + log.trace("return " + reqMon.hashDataInput.size() + " current HashDataInput(s) "); + return reqMon.getHashDataInput(); } @Override public void setLocale(Locale locale) { - // TODO Auto-generated method stub + } + + class RequestsMonitor { + List requests; + List hashDataInput; + + void produce(List req) { + requests = req; + } + + synchronized List consume() { + List reqs = requests; + requests = null; + return reqs; + } + + void setHashDataInput(List hdi) { + hashDataInput = hdi; + } + + List getHashDataInput() { + return hashDataInput; + } + } + + class ResponsesMonitor { + List responses; + + void produce(List resp) { + responses = resp; + } + + synchronized List consume() { + List resps = responses; + responses = null; + return resps; + } } } -- cgit v1.2.3