diff options
author | clemenso <clemenso@8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4> | 2008-09-25 16:31:18 +0000 |
---|---|---|
committer | clemenso <clemenso@8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4> | 2008-09-25 16:31:18 +0000 |
commit | ef884e591c38023d980a158f29ec1d71ed256a41 (patch) | |
tree | 99bcb3a1ee104e1663e99b5d42f6b7ef69f11941 /BKUOnline/src/main/java/at/gv | |
parent | cc03466f753afb6a1feb2f203966ba0672ddae95 (diff) | |
download | mocca-ef884e591c38023d980a158f29ec1d71ed256a41.tar.gz mocca-ef884e591c38023d980a158f29ec1d71ed256a41.tar.bz2 mocca-ef884e591c38023d980a158f29ec1d71ed256a41.zip |
consume/produce
git-svn-id: https://joinup.ec.europa.eu/svn/mocca/trunk@72 8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4
Diffstat (limited to 'BKUOnline/src/main/java/at/gv')
-rw-r--r-- | BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java | 160 |
1 files changed, 100 insertions, 60 deletions
diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java index 7897f984..dc3cc6d3 100644 --- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java +++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java @@ -47,82 +47,79 @@ import org.apache.commons.logging.LogFactory; public class STALRequestBrokerImpl implements STALRequestBroker { private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class); - protected List<STALRequest> requests = null; - protected List<STALResponse> responses = null; - protected List<HashDataInput> currentHashDataInput; -// private boolean isHandlingRequest = false; + private boolean expectingResponse = false; private boolean interrupted = false; + private final RequestsMonitor reqMon = new RequestsMonitor(); + private final ResponsesMonitor respMon = new ResponsesMonitor(); + + private long timeout; + + public STALRequestBrokerImpl(long timeoutMillisec) { + if (timeoutMillisec <= 0) + timeoutMillisec = DEFAULT_TIMEOUT_MS; + this.timeout = timeoutMillisec; + } + /** * Produce requests (and HashDataInputCallback) and wait for responses. - * The next thread may enter once we consumed the responses. + * This method is not thread safe, since every bindingprocessor thread possesses it's own instance. + * It however assures cooperation with STAL webservice threads consuming the requests and producing responses. * * @param requests * @return * - * @pre requests either single SignRequest, QuitRequest or multiple ReadInfoboxRequests + * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests */ @Override - public synchronized List<STALResponse> handleRequest(List<STALRequest> requests) { + public List<STALResponse> handleRequest(List<STALRequest> requests) { if (interrupted) { return null; } try { -// long beforeWait = System.currentTimeMillis(); -// while (isHandlingRequest) { -// log.trace("waiting to produce request"); -// wait(TIMEOUT_MS); -// if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { -// log.warn("timeout while waiting to produce request"); -// return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); -// } -// } + synchronized (reqMon) { log.trace("produce request"); -// isHandlingRequest = true; - this.requests = requests; - currentHashDataInput = null; + reqMon.produce(requests); + reqMon.setHashDataInput(null); for (STALRequest request : requests) { if (request instanceof SignRequest) { log.trace("Received SignRequest, keep HashDataInput."); - currentHashDataInput = ((SignRequest) request).getHashDataInput(); + reqMon.setHashDataInput(((SignRequest) request).getHashDataInput()); break; } else if (request instanceof QuitRequest) { log.trace("Received QuitRequest, do not wait for responses."); log.trace("notifying request consumers"); - notify(); + reqMon.notify(); return new ArrayList<STALResponse>(); } else if (log.isTraceEnabled()) { log.trace("Received STAL request: " + request.getClass().getName()); } } log.trace("notifying request consumers"); - notify(); - + reqMon.notify(); + } + + synchronized (respMon) { long beforeWait = System.currentTimeMillis(); - while (this.responses == null) { + while (respMon.responses == null) { log.trace("waiting to consume response"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { - log.warn("timeout while waiting to consume response"); - this.requests = null; - currentHashDataInput = null; -// isHandlingRequest = false; + respMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { + log.warn("timeout while waiting to consume response, cleanup requests"); + reqMon.consume(); //TODO check deadlock? + reqMon.setHashDataInput(null); return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); } } log.trace("consuming responses"); - List<STALResponse> resps = responses; - responses = null; + List<STALResponse> responses = respMon.consume(); log.trace("notifying response producers"); - notify(); - -// isHandlingRequest = false; -// log.trace("notifying request producers"); -// notify(); + respMon.notify(); - return resps; + return responses; + } } catch (InterruptedException ex) { log.warn("interrupt in handleRequest(): " + ex.getMessage()); interrupted = true; @@ -131,68 +128,75 @@ public class STALRequestBrokerImpl implements STALRequestBroker { } /** + * This method is thread-safe, except for + * an 'initial' call to nextRequest(null) followed by a + * 'zombie' call to nextRequest(notNull). + * This case (per design) leads to a timeout of the original call. + * (synchronizing the entire method does not + * hinder the zombie to interrupt two consecutive nextRequest() calls.) * * @param responses * @return QUIT if expected responses are not provided */ @Override - public synchronized List<STALRequest> nextRequest(List<STALResponse> responses) { + public List<STALRequest> nextRequest(List<STALResponse> responses) { if (interrupted) { return null; } try { + synchronized (respMon) { if (responses != null && responses.size() > 0) { if (!expectingResponse) { - log.warn("Received unexpected response in nextRequest()"); + log.warn("Received unexpected response in nextRequest(), return QUIT"); return Collections.singletonList((STALRequest) new QuitRequest()); } long beforeWait = System.currentTimeMillis(); - while (this.responses != null) { + while (respMon.responses != null) { log.trace("waiting to produce response"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { + respMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to produce response"); return Collections.singletonList((STALRequest) new QuitRequest()); } } log.trace("produce response"); - this.responses = responses; - //reset HashDataInputCallback + respMon.produce(responses); + //reset HashDataInputCallback iff SignResponse if (log.isTraceEnabled()) { for (STALResponse response : responses) { log.trace("Received STAL response: " + response.getClass().getName()); } } log.trace("notifying response consumers"); - notify(); + respMon.notify(); } else { if (expectingResponse) { - log.warn("No expected response received in nextRequest()"); + log.warn("Did not receive expected response(s) in nextRequest(), return QUIT"); return Collections.singletonList((STALRequest) new QuitRequest()); } log.trace("expecting non-null response in next nextRequest(response)"); expectingResponse = true; } + } + + synchronized (reqMon) { long beforeWait = System.currentTimeMillis(); - while (this.requests == null) { + while (reqMon.requests == null) { log.trace("waiting to consume request"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { + reqMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to consume request"); return Collections.singletonList((STALRequest) new QuitRequest()); } } log.trace("consume request"); - List<STALRequest> reqs = requests; - requests = null; - if (reqs.size() > 0 && reqs.get(0) instanceof QuitRequest) { -// isHandlingRequest = false; -// log.trace("consumed QUIT, notifying request producers"); -// notify(); + List<STALRequest> requests = reqMon.consume(); + if (requests.size() > 0 && requests.get(0) instanceof QuitRequest) { log.trace("expecting no response in next nextRequest()"); expectingResponse = false; } - return reqs; + return requests; + } } catch (InterruptedException ex) { log.warn("interrupt in nextRequest(): " + ex.getMessage()); interrupted = true; @@ -202,12 +206,48 @@ public class STALRequestBrokerImpl implements STALRequestBroker { @Override public synchronized List<HashDataInput> getHashDataInput() { - log.trace("return " + currentHashDataInput.size() + " current HashDataInput(s) "); - return currentHashDataInput; + log.trace("return " + reqMon.hashDataInput.size() + " current HashDataInput(s) "); + return reqMon.getHashDataInput(); } @Override public void setLocale(Locale locale) { - // TODO Auto-generated method stub + } + + class RequestsMonitor { + List<STALRequest> requests; + List<HashDataInput> hashDataInput; + + void produce(List<STALRequest> req) { + requests = req; + } + + synchronized List<STALRequest> consume() { + List<STALRequest> reqs = requests; + requests = null; + return reqs; + } + + void setHashDataInput(List<HashDataInput> hdi) { + hashDataInput = hdi; + } + + List<HashDataInput> getHashDataInput() { + return hashDataInput; + } + } + + class ResponsesMonitor { + List<STALResponse> responses; + + void produce(List<STALResponse> resp) { + responses = resp; + } + + synchronized List<STALResponse> consume() { + List<STALResponse> resps = responses; + responses = null; + return resps; + } } } |