summaryrefslogtreecommitdiff
path: root/BKUOnline/src/main
diff options
context:
space:
mode:
authorclemenso <clemenso@8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4>2008-09-25 16:31:18 +0000
committerclemenso <clemenso@8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4>2008-09-25 16:31:18 +0000
commitef884e591c38023d980a158f29ec1d71ed256a41 (patch)
tree99bcb3a1ee104e1663e99b5d42f6b7ef69f11941 /BKUOnline/src/main
parentcc03466f753afb6a1feb2f203966ba0672ddae95 (diff)
downloadmocca-ef884e591c38023d980a158f29ec1d71ed256a41.tar.gz
mocca-ef884e591c38023d980a158f29ec1d71ed256a41.tar.bz2
mocca-ef884e591c38023d980a158f29ec1d71ed256a41.zip
consume/produce
git-svn-id: https://joinup.ec.europa.eu/svn/mocca/trunk@72 8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4
Diffstat (limited to 'BKUOnline/src/main')
-rw-r--r--BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java160
1 files changed, 100 insertions, 60 deletions
diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java
index 7897f984..dc3cc6d3 100644
--- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java
+++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java
@@ -47,82 +47,79 @@ import org.apache.commons.logging.LogFactory;
public class STALRequestBrokerImpl implements STALRequestBroker {
private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class);
- protected List<STALRequest> requests = null;
- protected List<STALResponse> responses = null;
- protected List<HashDataInput> currentHashDataInput;
-// private boolean isHandlingRequest = false;
+
private boolean expectingResponse = false;
private boolean interrupted = false;
+ private final RequestsMonitor reqMon = new RequestsMonitor();
+ private final ResponsesMonitor respMon = new ResponsesMonitor();
+
+ private long timeout;
+
+ public STALRequestBrokerImpl(long timeoutMillisec) {
+ if (timeoutMillisec <= 0)
+ timeoutMillisec = DEFAULT_TIMEOUT_MS;
+ this.timeout = timeoutMillisec;
+ }
+
/**
* Produce requests (and HashDataInputCallback) and wait for responses.
- * The next thread may enter once we consumed the responses.
+ * This method is not thread safe, since every bindingprocessor thread possesses it's own instance.
+ * It however assures cooperation with STAL webservice threads consuming the requests and producing responses.
*
* @param requests
* @return
*
- * @pre requests either single SignRequest, QuitRequest or multiple ReadInfoboxRequests
+ * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests
*/
@Override
- public synchronized List<STALResponse> handleRequest(List<STALRequest> requests) {
+ public List<STALResponse> handleRequest(List<STALRequest> requests) {
if (interrupted) {
return null;
}
try {
-// long beforeWait = System.currentTimeMillis();
-// while (isHandlingRequest) {
-// log.trace("waiting to produce request");
-// wait(TIMEOUT_MS);
-// if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) {
-// log.warn("timeout while waiting to produce request");
-// return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000));
-// }
-// }
+ synchronized (reqMon) {
log.trace("produce request");
-// isHandlingRequest = true;
- this.requests = requests;
- currentHashDataInput = null;
+ reqMon.produce(requests);
+ reqMon.setHashDataInput(null);
for (STALRequest request : requests) {
if (request instanceof SignRequest) {
log.trace("Received SignRequest, keep HashDataInput.");
- currentHashDataInput = ((SignRequest) request).getHashDataInput();
+ reqMon.setHashDataInput(((SignRequest) request).getHashDataInput());
break;
} else if (request instanceof QuitRequest) {
log.trace("Received QuitRequest, do not wait for responses.");
log.trace("notifying request consumers");
- notify();
+ reqMon.notify();
return new ArrayList<STALResponse>();
} else if (log.isTraceEnabled()) {
log.trace("Received STAL request: " + request.getClass().getName());
}
}
log.trace("notifying request consumers");
- notify();
-
+ reqMon.notify();
+ }
+
+ synchronized (respMon) {
long beforeWait = System.currentTimeMillis();
- while (this.responses == null) {
+ while (respMon.responses == null) {
log.trace("waiting to consume response");
- wait(TIMEOUT_MS);
- if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) {
- log.warn("timeout while waiting to consume response");
- this.requests = null;
- currentHashDataInput = null;
-// isHandlingRequest = false;
+ respMon.wait(timeout);
+ if (System.currentTimeMillis() - beforeWait >= timeout) {
+ log.warn("timeout while waiting to consume response, cleanup requests");
+ reqMon.consume(); //TODO check deadlock?
+ reqMon.setHashDataInput(null);
return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000));
}
}
log.trace("consuming responses");
- List<STALResponse> resps = responses;
- responses = null;
+ List<STALResponse> responses = respMon.consume();
log.trace("notifying response producers");
- notify();
-
-// isHandlingRequest = false;
-// log.trace("notifying request producers");
-// notify();
+ respMon.notify();
- return resps;
+ return responses;
+ }
} catch (InterruptedException ex) {
log.warn("interrupt in handleRequest(): " + ex.getMessage());
interrupted = true;
@@ -131,68 +128,75 @@ public class STALRequestBrokerImpl implements STALRequestBroker {
}
/**
+ * This method is thread-safe, except for
+ * an 'initial' call to nextRequest(null) followed by a
+ * 'zombie' call to nextRequest(notNull).
+ * This case (per design) leads to a timeout of the original call.
+ * (synchronizing the entire method does not
+ * hinder the zombie to interrupt two consecutive nextRequest() calls.)
*
* @param responses
* @return QUIT if expected responses are not provided
*/
@Override
- public synchronized List<STALRequest> nextRequest(List<STALResponse> responses) {
+ public List<STALRequest> nextRequest(List<STALResponse> responses) {
if (interrupted) {
return null;
}
try {
+ synchronized (respMon) {
if (responses != null && responses.size() > 0) {
if (!expectingResponse) {
- log.warn("Received unexpected response in nextRequest()");
+ log.warn("Received unexpected response in nextRequest(), return QUIT");
return Collections.singletonList((STALRequest) new QuitRequest());
}
long beforeWait = System.currentTimeMillis();
- while (this.responses != null) {
+ while (respMon.responses != null) {
log.trace("waiting to produce response");
- wait(TIMEOUT_MS);
- if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) {
+ respMon.wait(timeout);
+ if (System.currentTimeMillis() - beforeWait >= timeout) {
log.warn("timeout while waiting to produce response");
return Collections.singletonList((STALRequest) new QuitRequest());
}
}
log.trace("produce response");
- this.responses = responses;
- //reset HashDataInputCallback
+ respMon.produce(responses);
+ //reset HashDataInputCallback iff SignResponse
if (log.isTraceEnabled()) {
for (STALResponse response : responses) {
log.trace("Received STAL response: " + response.getClass().getName());
}
}
log.trace("notifying response consumers");
- notify();
+ respMon.notify();
} else {
if (expectingResponse) {
- log.warn("No expected response received in nextRequest()");
+ log.warn("Did not receive expected response(s) in nextRequest(), return QUIT");
return Collections.singletonList((STALRequest) new QuitRequest());
}
log.trace("expecting non-null response in next nextRequest(response)");
expectingResponse = true;
}
+ }
+
+ synchronized (reqMon) {
long beforeWait = System.currentTimeMillis();
- while (this.requests == null) {
+ while (reqMon.requests == null) {
log.trace("waiting to consume request");
- wait(TIMEOUT_MS);
- if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) {
+ reqMon.wait(timeout);
+ if (System.currentTimeMillis() - beforeWait >= timeout) {
log.warn("timeout while waiting to consume request");
return Collections.singletonList((STALRequest) new QuitRequest());
}
}
log.trace("consume request");
- List<STALRequest> reqs = requests;
- requests = null;
- if (reqs.size() > 0 && reqs.get(0) instanceof QuitRequest) {
-// isHandlingRequest = false;
-// log.trace("consumed QUIT, notifying request producers");
-// notify();
+ List<STALRequest> requests = reqMon.consume();
+ if (requests.size() > 0 && requests.get(0) instanceof QuitRequest) {
log.trace("expecting no response in next nextRequest()");
expectingResponse = false;
}
- return reqs;
+ return requests;
+ }
} catch (InterruptedException ex) {
log.warn("interrupt in nextRequest(): " + ex.getMessage());
interrupted = true;
@@ -202,12 +206,48 @@ public class STALRequestBrokerImpl implements STALRequestBroker {
@Override
public synchronized List<HashDataInput> getHashDataInput() {
- log.trace("return " + currentHashDataInput.size() + " current HashDataInput(s) ");
- return currentHashDataInput;
+ log.trace("return " + reqMon.hashDataInput.size() + " current HashDataInput(s) ");
+ return reqMon.getHashDataInput();
}
@Override
public void setLocale(Locale locale) {
- // TODO Auto-generated method stub
+ }
+
+ class RequestsMonitor {
+ List<STALRequest> requests;
+ List<HashDataInput> hashDataInput;
+
+ void produce(List<STALRequest> req) {
+ requests = req;
+ }
+
+ synchronized List<STALRequest> consume() {
+ List<STALRequest> reqs = requests;
+ requests = null;
+ return reqs;
+ }
+
+ void setHashDataInput(List<HashDataInput> hdi) {
+ hashDataInput = hdi;
+ }
+
+ List<HashDataInput> getHashDataInput() {
+ return hashDataInput;
+ }
+ }
+
+ class ResponsesMonitor {
+ List<STALResponse> responses;
+
+ void produce(List<STALResponse> resp) {
+ responses = resp;
+ }
+
+ synchronized List<STALResponse> consume() {
+ List<STALResponse> resps = responses;
+ responses = null;
+ return resps;
+ }
}
}