summaryrefslogtreecommitdiff
path: root/BKUOnline/src
diff options
context:
space:
mode:
authorclemenso <clemenso@8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4>2008-09-25 16:31:18 +0000
committerclemenso <clemenso@8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4>2008-09-25 16:31:18 +0000
commitef884e591c38023d980a158f29ec1d71ed256a41 (patch)
tree99bcb3a1ee104e1663e99b5d42f6b7ef69f11941 /BKUOnline/src
parentcc03466f753afb6a1feb2f203966ba0672ddae95 (diff)
downloadmocca-ef884e591c38023d980a158f29ec1d71ed256a41.tar.gz
mocca-ef884e591c38023d980a158f29ec1d71ed256a41.tar.bz2
mocca-ef884e591c38023d980a158f29ec1d71ed256a41.zip
consume/produce
git-svn-id: https://joinup.ec.europa.eu/svn/mocca/trunk@72 8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4
Diffstat (limited to 'BKUOnline/src')
-rw-r--r--BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java160
-rw-r--r--BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java78
2 files changed, 172 insertions, 66 deletions
diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java
index 7897f984..dc3cc6d3 100644
--- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java
+++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java
@@ -47,82 +47,79 @@ import org.apache.commons.logging.LogFactory;
public class STALRequestBrokerImpl implements STALRequestBroker {
private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class);
- protected List<STALRequest> requests = null;
- protected List<STALResponse> responses = null;
- protected List<HashDataInput> currentHashDataInput;
-// private boolean isHandlingRequest = false;
+
private boolean expectingResponse = false;
private boolean interrupted = false;
+ private final RequestsMonitor reqMon = new RequestsMonitor();
+ private final ResponsesMonitor respMon = new ResponsesMonitor();
+
+ private long timeout;
+
+ public STALRequestBrokerImpl(long timeoutMillisec) {
+ if (timeoutMillisec <= 0)
+ timeoutMillisec = DEFAULT_TIMEOUT_MS;
+ this.timeout = timeoutMillisec;
+ }
+
/**
* Produce requests (and HashDataInputCallback) and wait for responses.
- * The next thread may enter once we consumed the responses.
+ * This method is not thread safe, since every bindingprocessor thread possesses it's own instance.
+ * It however assures cooperation with STAL webservice threads consuming the requests and producing responses.
*
* @param requests
* @return
*
- * @pre requests either single SignRequest, QuitRequest or multiple ReadInfoboxRequests
+ * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests
*/
@Override
- public synchronized List<STALResponse> handleRequest(List<STALRequest> requests) {
+ public List<STALResponse> handleRequest(List<STALRequest> requests) {
if (interrupted) {
return null;
}
try {
-// long beforeWait = System.currentTimeMillis();
-// while (isHandlingRequest) {
-// log.trace("waiting to produce request");
-// wait(TIMEOUT_MS);
-// if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) {
-// log.warn("timeout while waiting to produce request");
-// return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000));
-// }
-// }
+ synchronized (reqMon) {
log.trace("produce request");
-// isHandlingRequest = true;
- this.requests = requests;
- currentHashDataInput = null;
+ reqMon.produce(requests);
+ reqMon.setHashDataInput(null);
for (STALRequest request : requests) {
if (request instanceof SignRequest) {
log.trace("Received SignRequest, keep HashDataInput.");
- currentHashDataInput = ((SignRequest) request).getHashDataInput();
+ reqMon.setHashDataInput(((SignRequest) request).getHashDataInput());
break;
} else if (request instanceof QuitRequest) {
log.trace("Received QuitRequest, do not wait for responses.");
log.trace("notifying request consumers");
- notify();
+ reqMon.notify();
return new ArrayList<STALResponse>();
} else if (log.isTraceEnabled()) {
log.trace("Received STAL request: " + request.getClass().getName());
}
}
log.trace("notifying request consumers");
- notify();
-
+ reqMon.notify();
+ }
+
+ synchronized (respMon) {
long beforeWait = System.currentTimeMillis();
- while (this.responses == null) {
+ while (respMon.responses == null) {
log.trace("waiting to consume response");
- wait(TIMEOUT_MS);
- if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) {
- log.warn("timeout while waiting to consume response");
- this.requests = null;
- currentHashDataInput = null;
-// isHandlingRequest = false;
+ respMon.wait(timeout);
+ if (System.currentTimeMillis() - beforeWait >= timeout) {
+ log.warn("timeout while waiting to consume response, cleanup requests");
+ reqMon.consume(); //TODO check deadlock?
+ reqMon.setHashDataInput(null);
return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000));
}
}
log.trace("consuming responses");
- List<STALResponse> resps = responses;
- responses = null;
+ List<STALResponse> responses = respMon.consume();
log.trace("notifying response producers");
- notify();
-
-// isHandlingRequest = false;
-// log.trace("notifying request producers");
-// notify();
+ respMon.notify();
- return resps;
+ return responses;
+ }
} catch (InterruptedException ex) {
log.warn("interrupt in handleRequest(): " + ex.getMessage());
interrupted = true;
@@ -131,68 +128,75 @@ public class STALRequestBrokerImpl implements STALRequestBroker {
}
/**
+ * This method is thread-safe, except for
+ * an 'initial' call to nextRequest(null) followed by a
+ * 'zombie' call to nextRequest(notNull).
+ * This case (per design) leads to a timeout of the original call.
+ * (synchronizing the entire method does not
+ * hinder the zombie to interrupt two consecutive nextRequest() calls.)
*
* @param responses
* @return QUIT if expected responses are not provided
*/
@Override
- public synchronized List<STALRequest> nextRequest(List<STALResponse> responses) {
+ public List<STALRequest> nextRequest(List<STALResponse> responses) {
if (interrupted) {
return null;
}
try {
+ synchronized (respMon) {
if (responses != null && responses.size() > 0) {
if (!expectingResponse) {
- log.warn("Received unexpected response in nextRequest()");
+ log.warn("Received unexpected response in nextRequest(), return QUIT");
return Collections.singletonList((STALRequest) new QuitRequest());
}
long beforeWait = System.currentTimeMillis();
- while (this.responses != null) {
+ while (respMon.responses != null) {
log.trace("waiting to produce response");
- wait(TIMEOUT_MS);
- if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) {
+ respMon.wait(timeout);
+ if (System.currentTimeMillis() - beforeWait >= timeout) {
log.warn("timeout while waiting to produce response");
return Collections.singletonList((STALRequest) new QuitRequest());
}
}
log.trace("produce response");
- this.responses = responses;
- //reset HashDataInputCallback
+ respMon.produce(responses);
+ //reset HashDataInputCallback iff SignResponse
if (log.isTraceEnabled()) {
for (STALResponse response : responses) {
log.trace("Received STAL response: " + response.getClass().getName());
}
}
log.trace("notifying response consumers");
- notify();
+ respMon.notify();
} else {
if (expectingResponse) {
- log.warn("No expected response received in nextRequest()");
+ log.warn("Did not receive expected response(s) in nextRequest(), return QUIT");
return Collections.singletonList((STALRequest) new QuitRequest());
}
log.trace("expecting non-null response in next nextRequest(response)");
expectingResponse = true;
}
+ }
+
+ synchronized (reqMon) {
long beforeWait = System.currentTimeMillis();
- while (this.requests == null) {
+ while (reqMon.requests == null) {
log.trace("waiting to consume request");
- wait(TIMEOUT_MS);
- if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) {
+ reqMon.wait(timeout);
+ if (System.currentTimeMillis() - beforeWait >= timeout) {
log.warn("timeout while waiting to consume request");
return Collections.singletonList((STALRequest) new QuitRequest());
}
}
log.trace("consume request");
- List<STALRequest> reqs = requests;
- requests = null;
- if (reqs.size() > 0 && reqs.get(0) instanceof QuitRequest) {
-// isHandlingRequest = false;
-// log.trace("consumed QUIT, notifying request producers");
-// notify();
+ List<STALRequest> requests = reqMon.consume();
+ if (requests.size() > 0 && requests.get(0) instanceof QuitRequest) {
log.trace("expecting no response in next nextRequest()");
expectingResponse = false;
}
- return reqs;
+ return requests;
+ }
} catch (InterruptedException ex) {
log.warn("interrupt in nextRequest(): " + ex.getMessage());
interrupted = true;
@@ -202,12 +206,48 @@ public class STALRequestBrokerImpl implements STALRequestBroker {
@Override
public synchronized List<HashDataInput> getHashDataInput() {
- log.trace("return " + currentHashDataInput.size() + " current HashDataInput(s) ");
- return currentHashDataInput;
+ log.trace("return " + reqMon.hashDataInput.size() + " current HashDataInput(s) ");
+ return reqMon.getHashDataInput();
}
@Override
public void setLocale(Locale locale) {
- // TODO Auto-generated method stub
+ }
+
+ class RequestsMonitor {
+ List<STALRequest> requests;
+ List<HashDataInput> hashDataInput;
+
+ void produce(List<STALRequest> req) {
+ requests = req;
+ }
+
+ synchronized List<STALRequest> consume() {
+ List<STALRequest> reqs = requests;
+ requests = null;
+ return reqs;
+ }
+
+ void setHashDataInput(List<HashDataInput> hdi) {
+ hashDataInput = hdi;
+ }
+
+ List<HashDataInput> getHashDataInput() {
+ return hashDataInput;
+ }
+ }
+
+ class ResponsesMonitor {
+ List<STALResponse> responses;
+
+ void produce(List<STALResponse> resp) {
+ responses = resp;
+ }
+
+ synchronized List<STALResponse> consume() {
+ List<STALResponse> resps = responses;
+ responses = null;
+ return resps;
+ }
}
}
diff --git a/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java b/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java
index 2bcc96ae..d6ce2720 100644
--- a/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java
+++ b/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java
@@ -78,8 +78,9 @@ public class STALRequestBrokerTest {
BindingProcessorSimulator bp = new BindingProcessorSimulator();
bp.setRequests(Collections.singletonList(requests));
+ new Thread(new ServiceSimulator(), "STALService1").start();
new Thread(bp, "BindingProcessor").start();
- new Thread(new ServiceSimulator(), "STALService").start();
+ new Thread(new ServiceSimulator(), "STALService2").start();
try {
Thread.sleep(1000);
@@ -88,7 +89,7 @@ public class STALRequestBrokerTest {
}
}
- @Ignore
+ @Test
public void testSign() {
log.debug("**************** test SignRequest");
List<STALRequest> requests = new ArrayList<STALRequest>();
@@ -125,7 +126,8 @@ public class STALRequestBrokerTest {
new Thread(bp, "BindingProcessor").start();
// new Thread(bp2, "BindingProcessor2").start();
new Thread(new ServiceSimulator(), "STALService").start();
-
+ new Thread(new ZombieServiceSimulator(), "STALServiceZombie").start();
+
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
@@ -171,7 +173,7 @@ public class STALRequestBrokerTest {
new Thread(new TimeoutServiceSimulator(), "STALService").start();
try {
- Thread.sleep(STALRequestBroker.TIMEOUT_MS + 1);
+ Thread.sleep(STALRequestBroker.DEFAULT_TIMEOUT_MS + 1);
} catch (InterruptedException ex) {
log.error("interrupted: " + ex.getMessage());
}
@@ -186,13 +188,13 @@ public class STALRequestBrokerTest {
new Thread(new ServiceSimulator(), "STALService").start();
try {
- Thread.sleep(STALRequestBroker.TIMEOUT_MS + 1);
+ Thread.sleep(STALRequestBroker.DEFAULT_TIMEOUT_MS + 1);
} catch (InterruptedException ex) {
log.error("interrupted: " + ex.getMessage());
}
}
- @Test
+ @Ignore
public void testMultipleServices() {
log.debug("**************** test multiple SignRequests");
List<STALRequest> requests = new ArrayList<STALRequest>();
@@ -269,6 +271,70 @@ public class STALRequestBrokerTest {
}
}
+ class ZombieServiceSimulator implements Runnable {
+
+ @Override
+ public void run() {
+ try {
+ log.debug("calling stal.nextRequest(oldResponse)");
+ STALResponse oldResp = new InfoboxReadResponse();
+ List<STALRequest> requests = stal.nextRequest(Collections.singletonList(oldResp));
+ log.debug("got " + requests.size() + " requests. processing...");
+ Thread.sleep(1);
+ List<STALResponse> responses = new ArrayList<STALResponse>();
+ for (STALRequest request : requests) {
+ if (request instanceof InfoboxReadRequest) {
+ log.debug("received UNEXPECTED READINFOBOX request");
+
+ InfoboxReadResponse r = new InfoboxReadResponse();
+ r.setInfoboxValue("dummyInfobox".getBytes());
+ responses.add(r);
+ } else if (request instanceof SignRequest) {
+
+ log.debug("received UNEXPECTED SIGN request");
+
+ log.debug("calling stal.getCurrentHashDataInputCallback");
+ List<HashDataInput> hdis = stal.getHashDataInput();
+ assertNotNull(hdis);
+ assertEquals(hdis.size(), 1);
+ HashDataInput hdi = hdis.get(0);// cb.getHashDataInput("1234");
+ InputStream hd = hdi.getHashDataInput();
+ byte[] data = new byte[hd.available()];
+ hd.read(data);
+ log.debug("got HashDataInput " + new String(data));
+
+
+ SignResponse r = new SignResponse();
+ r.setSignatureValue("dummySignature".getBytes());
+ responses.add(r);
+ } else if (request instanceof QuitRequest) {
+ log.debug("received EXPECTED QUIT request");
+ return;
+ }
+ }
+
+// if (requests.size() > 0) {
+// log.debug("calling stal.setResponse with " + requests.size() + " responses");
+// stal.setResponse(responses);
+// }
+ log.debug("calling stal.nextRequest with " + responses.size() + " responses");
+ requests = stal.nextRequest(responses);
+ for (STALRequest request : requests) {
+ if (request instanceof QuitRequest) {
+ log.debug("got QUIT request");
+ } else {
+ log.debug("expected QUIT request, got " + request.getClass().getName());
+ }
+ }
+ } catch (IOException ex) {
+ log.error(ex.getMessage());
+ } catch (InterruptedException ex) {
+ log.error(ex.getMessage());
+ }
+ }
+
+ }
+
class ServiceSimulator implements Runnable {
@Override