diff options
author | clemenso <clemenso@8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4> | 2008-09-25 16:31:18 +0000 |
---|---|---|
committer | clemenso <clemenso@8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4> | 2008-09-25 16:31:18 +0000 |
commit | ef884e591c38023d980a158f29ec1d71ed256a41 (patch) | |
tree | 99bcb3a1ee104e1663e99b5d42f6b7ef69f11941 /BKUOnline/src | |
parent | cc03466f753afb6a1feb2f203966ba0672ddae95 (diff) | |
download | mocca-ef884e591c38023d980a158f29ec1d71ed256a41.tar.gz mocca-ef884e591c38023d980a158f29ec1d71ed256a41.tar.bz2 mocca-ef884e591c38023d980a158f29ec1d71ed256a41.zip |
consume/produce
git-svn-id: https://joinup.ec.europa.eu/svn/mocca/trunk@72 8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4
Diffstat (limited to 'BKUOnline/src')
-rw-r--r-- | BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java | 160 | ||||
-rw-r--r-- | BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java | 78 |
2 files changed, 172 insertions, 66 deletions
diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java index 7897f984..dc3cc6d3 100644 --- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java +++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java @@ -47,82 +47,79 @@ import org.apache.commons.logging.LogFactory; public class STALRequestBrokerImpl implements STALRequestBroker { private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class); - protected List<STALRequest> requests = null; - protected List<STALResponse> responses = null; - protected List<HashDataInput> currentHashDataInput; -// private boolean isHandlingRequest = false; + private boolean expectingResponse = false; private boolean interrupted = false; + private final RequestsMonitor reqMon = new RequestsMonitor(); + private final ResponsesMonitor respMon = new ResponsesMonitor(); + + private long timeout; + + public STALRequestBrokerImpl(long timeoutMillisec) { + if (timeoutMillisec <= 0) + timeoutMillisec = DEFAULT_TIMEOUT_MS; + this.timeout = timeoutMillisec; + } + /** * Produce requests (and HashDataInputCallback) and wait for responses. - * The next thread may enter once we consumed the responses. + * This method is not thread safe, since every bindingprocessor thread possesses it's own instance. + * It however assures cooperation with STAL webservice threads consuming the requests and producing responses. * * @param requests * @return * - * @pre requests either single SignRequest, QuitRequest or multiple ReadInfoboxRequests + * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests */ @Override - public synchronized List<STALResponse> handleRequest(List<STALRequest> requests) { + public List<STALResponse> handleRequest(List<STALRequest> requests) { if (interrupted) { return null; } try { -// long beforeWait = System.currentTimeMillis(); -// while (isHandlingRequest) { -// log.trace("waiting to produce request"); -// wait(TIMEOUT_MS); -// if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { -// log.warn("timeout while waiting to produce request"); -// return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); -// } -// } + synchronized (reqMon) { log.trace("produce request"); -// isHandlingRequest = true; - this.requests = requests; - currentHashDataInput = null; + reqMon.produce(requests); + reqMon.setHashDataInput(null); for (STALRequest request : requests) { if (request instanceof SignRequest) { log.trace("Received SignRequest, keep HashDataInput."); - currentHashDataInput = ((SignRequest) request).getHashDataInput(); + reqMon.setHashDataInput(((SignRequest) request).getHashDataInput()); break; } else if (request instanceof QuitRequest) { log.trace("Received QuitRequest, do not wait for responses."); log.trace("notifying request consumers"); - notify(); + reqMon.notify(); return new ArrayList<STALResponse>(); } else if (log.isTraceEnabled()) { log.trace("Received STAL request: " + request.getClass().getName()); } } log.trace("notifying request consumers"); - notify(); - + reqMon.notify(); + } + + synchronized (respMon) { long beforeWait = System.currentTimeMillis(); - while (this.responses == null) { + while (respMon.responses == null) { log.trace("waiting to consume response"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { - log.warn("timeout while waiting to consume response"); - this.requests = null; - currentHashDataInput = null; -// isHandlingRequest = false; + respMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { + log.warn("timeout while waiting to consume response, cleanup requests"); + reqMon.consume(); //TODO check deadlock? + reqMon.setHashDataInput(null); return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); } } log.trace("consuming responses"); - List<STALResponse> resps = responses; - responses = null; + List<STALResponse> responses = respMon.consume(); log.trace("notifying response producers"); - notify(); - -// isHandlingRequest = false; -// log.trace("notifying request producers"); -// notify(); + respMon.notify(); - return resps; + return responses; + } } catch (InterruptedException ex) { log.warn("interrupt in handleRequest(): " + ex.getMessage()); interrupted = true; @@ -131,68 +128,75 @@ public class STALRequestBrokerImpl implements STALRequestBroker { } /** + * This method is thread-safe, except for + * an 'initial' call to nextRequest(null) followed by a + * 'zombie' call to nextRequest(notNull). + * This case (per design) leads to a timeout of the original call. + * (synchronizing the entire method does not + * hinder the zombie to interrupt two consecutive nextRequest() calls.) * * @param responses * @return QUIT if expected responses are not provided */ @Override - public synchronized List<STALRequest> nextRequest(List<STALResponse> responses) { + public List<STALRequest> nextRequest(List<STALResponse> responses) { if (interrupted) { return null; } try { + synchronized (respMon) { if (responses != null && responses.size() > 0) { if (!expectingResponse) { - log.warn("Received unexpected response in nextRequest()"); + log.warn("Received unexpected response in nextRequest(), return QUIT"); return Collections.singletonList((STALRequest) new QuitRequest()); } long beforeWait = System.currentTimeMillis(); - while (this.responses != null) { + while (respMon.responses != null) { log.trace("waiting to produce response"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { + respMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to produce response"); return Collections.singletonList((STALRequest) new QuitRequest()); } } log.trace("produce response"); - this.responses = responses; - //reset HashDataInputCallback + respMon.produce(responses); + //reset HashDataInputCallback iff SignResponse if (log.isTraceEnabled()) { for (STALResponse response : responses) { log.trace("Received STAL response: " + response.getClass().getName()); } } log.trace("notifying response consumers"); - notify(); + respMon.notify(); } else { if (expectingResponse) { - log.warn("No expected response received in nextRequest()"); + log.warn("Did not receive expected response(s) in nextRequest(), return QUIT"); return Collections.singletonList((STALRequest) new QuitRequest()); } log.trace("expecting non-null response in next nextRequest(response)"); expectingResponse = true; } + } + + synchronized (reqMon) { long beforeWait = System.currentTimeMillis(); - while (this.requests == null) { + while (reqMon.requests == null) { log.trace("waiting to consume request"); - wait(TIMEOUT_MS); - if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { + reqMon.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to consume request"); return Collections.singletonList((STALRequest) new QuitRequest()); } } log.trace("consume request"); - List<STALRequest> reqs = requests; - requests = null; - if (reqs.size() > 0 && reqs.get(0) instanceof QuitRequest) { -// isHandlingRequest = false; -// log.trace("consumed QUIT, notifying request producers"); -// notify(); + List<STALRequest> requests = reqMon.consume(); + if (requests.size() > 0 && requests.get(0) instanceof QuitRequest) { log.trace("expecting no response in next nextRequest()"); expectingResponse = false; } - return reqs; + return requests; + } } catch (InterruptedException ex) { log.warn("interrupt in nextRequest(): " + ex.getMessage()); interrupted = true; @@ -202,12 +206,48 @@ public class STALRequestBrokerImpl implements STALRequestBroker { @Override public synchronized List<HashDataInput> getHashDataInput() { - log.trace("return " + currentHashDataInput.size() + " current HashDataInput(s) "); - return currentHashDataInput; + log.trace("return " + reqMon.hashDataInput.size() + " current HashDataInput(s) "); + return reqMon.getHashDataInput(); } @Override public void setLocale(Locale locale) { - // TODO Auto-generated method stub + } + + class RequestsMonitor { + List<STALRequest> requests; + List<HashDataInput> hashDataInput; + + void produce(List<STALRequest> req) { + requests = req; + } + + synchronized List<STALRequest> consume() { + List<STALRequest> reqs = requests; + requests = null; + return reqs; + } + + void setHashDataInput(List<HashDataInput> hdi) { + hashDataInput = hdi; + } + + List<HashDataInput> getHashDataInput() { + return hashDataInput; + } + } + + class ResponsesMonitor { + List<STALResponse> responses; + + void produce(List<STALResponse> resp) { + responses = resp; + } + + synchronized List<STALResponse> consume() { + List<STALResponse> resps = responses; + responses = null; + return resps; + } } } diff --git a/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java b/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java index 2bcc96ae..d6ce2720 100644 --- a/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java +++ b/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java @@ -78,8 +78,9 @@ public class STALRequestBrokerTest { BindingProcessorSimulator bp = new BindingProcessorSimulator(); bp.setRequests(Collections.singletonList(requests)); + new Thread(new ServiceSimulator(), "STALService1").start(); new Thread(bp, "BindingProcessor").start(); - new Thread(new ServiceSimulator(), "STALService").start(); + new Thread(new ServiceSimulator(), "STALService2").start(); try { Thread.sleep(1000); @@ -88,7 +89,7 @@ public class STALRequestBrokerTest { } } - @Ignore + @Test public void testSign() { log.debug("**************** test SignRequest"); List<STALRequest> requests = new ArrayList<STALRequest>(); @@ -125,7 +126,8 @@ public class STALRequestBrokerTest { new Thread(bp, "BindingProcessor").start(); // new Thread(bp2, "BindingProcessor2").start(); new Thread(new ServiceSimulator(), "STALService").start(); - + new Thread(new ZombieServiceSimulator(), "STALServiceZombie").start(); + try { Thread.sleep(1000); } catch (InterruptedException ex) { @@ -171,7 +173,7 @@ public class STALRequestBrokerTest { new Thread(new TimeoutServiceSimulator(), "STALService").start(); try { - Thread.sleep(STALRequestBroker.TIMEOUT_MS + 1); + Thread.sleep(STALRequestBroker.DEFAULT_TIMEOUT_MS + 1); } catch (InterruptedException ex) { log.error("interrupted: " + ex.getMessage()); } @@ -186,13 +188,13 @@ public class STALRequestBrokerTest { new Thread(new ServiceSimulator(), "STALService").start(); try { - Thread.sleep(STALRequestBroker.TIMEOUT_MS + 1); + Thread.sleep(STALRequestBroker.DEFAULT_TIMEOUT_MS + 1); } catch (InterruptedException ex) { log.error("interrupted: " + ex.getMessage()); } } - @Test + @Ignore public void testMultipleServices() { log.debug("**************** test multiple SignRequests"); List<STALRequest> requests = new ArrayList<STALRequest>(); @@ -269,6 +271,70 @@ public class STALRequestBrokerTest { } } + class ZombieServiceSimulator implements Runnable { + + @Override + public void run() { + try { + log.debug("calling stal.nextRequest(oldResponse)"); + STALResponse oldResp = new InfoboxReadResponse(); + List<STALRequest> requests = stal.nextRequest(Collections.singletonList(oldResp)); + log.debug("got " + requests.size() + " requests. processing..."); + Thread.sleep(1); + List<STALResponse> responses = new ArrayList<STALResponse>(); + for (STALRequest request : requests) { + if (request instanceof InfoboxReadRequest) { + log.debug("received UNEXPECTED READINFOBOX request"); + + InfoboxReadResponse r = new InfoboxReadResponse(); + r.setInfoboxValue("dummyInfobox".getBytes()); + responses.add(r); + } else if (request instanceof SignRequest) { + + log.debug("received UNEXPECTED SIGN request"); + + log.debug("calling stal.getCurrentHashDataInputCallback"); + List<HashDataInput> hdis = stal.getHashDataInput(); + assertNotNull(hdis); + assertEquals(hdis.size(), 1); + HashDataInput hdi = hdis.get(0);// cb.getHashDataInput("1234"); + InputStream hd = hdi.getHashDataInput(); + byte[] data = new byte[hd.available()]; + hd.read(data); + log.debug("got HashDataInput " + new String(data)); + + + SignResponse r = new SignResponse(); + r.setSignatureValue("dummySignature".getBytes()); + responses.add(r); + } else if (request instanceof QuitRequest) { + log.debug("received EXPECTED QUIT request"); + return; + } + } + +// if (requests.size() > 0) { +// log.debug("calling stal.setResponse with " + requests.size() + " responses"); +// stal.setResponse(responses); +// } + log.debug("calling stal.nextRequest with " + responses.size() + " responses"); + requests = stal.nextRequest(responses); + for (STALRequest request : requests) { + if (request instanceof QuitRequest) { + log.debug("got QUIT request"); + } else { + log.debug("expected QUIT request, got " + request.getClass().getName()); + } + } + } catch (IOException ex) { + log.error(ex.getMessage()); + } catch (InterruptedException ex) { + log.error(ex.getMessage()); + } + } + + } + class ServiceSimulator implements Runnable { @Override |