diff options
Diffstat (limited to 'BKUOnline/src/main/java/at')
-rw-r--r-- | BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java | 378 |
1 files changed, 9 insertions, 369 deletions
diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java index 727e8cf4..6160c71e 100644 --- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java +++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java @@ -47,24 +47,13 @@ import org.apache.commons.logging.LogFactory; public class STALRequestBrokerImpl implements STALRequestBroker { private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class); -// protected RequestResponseBroker broker; protected List<STALRequest> requests = null; protected List<STALResponse> responses = null; protected List<HashDataInput> currentHashDataInput; private boolean isHandlingRequest = false; private boolean expectingResponse = false; private boolean interrupted = false; -// private Object handleRequestCondition = new Object(); -// private Object gotResponsesCondition = new Object(); -// public STALRequestBrokerImpl() { -// broker = new RequestResponseBroker(); -// new Thread(handler).start(); -// } -// @Override -// public HashDataInputCallback getCurrentHashDataInputCallback() { -// return broker.getCurrentHashDataInputCallback(); -// } /** * Produce requests (and HashDataInputCallback) and wait for responses. * The next thread may enter once we consumed the responses. @@ -83,15 +72,11 @@ public class STALRequestBrokerImpl implements STALRequestBroker { long beforeWait = System.currentTimeMillis(); while (isHandlingRequest) { log.trace("waiting to produce request"); -// try { wait(TIMEOUT_MS); if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { log.warn("timeout while waiting to produce request"); return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); } -// } catch (InterruptedException ex) { -// log.warn("interrupt while waiting to produce request: " + ex.getMessage()); -// } } log.trace("produce request"); isHandlingRequest = true; @@ -104,30 +89,9 @@ public class STALRequestBrokerImpl implements STALRequestBroker { currentHashDataInput = ((SignRequest) request).getHashDataInput(); break; } else if (request instanceof QuitRequest) { - //alternative1: - //for QUIT requests, do not wait for responses, but for request consumation - // (i.e. set isHandlingReq to false once QUIT is consumed) log.trace("Received QuitRequest, do not wait for responses."); log.trace("notifying request consumers"); notify(); - //alternative2: - //wait for QUIT to be consumed - // (i.e. notify me once QUIT is consumed) -// while (this.requests != null) { -// try { -// long beforeWait = System.currentTimeMillis(); -// wait(TIMEOUT_MS); -// if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { -// log.warn("timeout while waiting for QUIT to be consumed"); -// this.requests = null; -// isHandlingRequest = false; -// return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); -// } -// } catch (InterruptedException ex) { -// log.warn("interrupt while waiting for QUIT to be consumed: " + ex.getMessage()); -// } -// } -// isHandlingRequest = false; return new ArrayList<STALResponse>(); } else if (log.isTraceEnabled()) { log.trace("Received STAL request: " + request.getClass().getName()); @@ -139,7 +103,6 @@ public class STALRequestBrokerImpl implements STALRequestBroker { beforeWait = System.currentTimeMillis(); while (this.responses == null) { log.trace("waiting to consume response"); -// try { wait(TIMEOUT_MS); if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { log.warn("timeout while waiting to consume response"); @@ -148,21 +111,18 @@ public class STALRequestBrokerImpl implements STALRequestBroker { isHandlingRequest = false; return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); } -// } catch (InterruptedException ex) { -// log.warn("interrupt while waiting to consume response: " + ex.getMessage()); -// } } log.trace("consuming responses"); - List<STALResponse> resps = responses; - responses = null; - log.trace("notifying response producers"); - notify(); + List<STALResponse> resps = responses; + responses = null; + log.trace("notifying response producers"); + notify(); - isHandlingRequest = false; - log.trace("notifying request producers"); - notify(); + isHandlingRequest = false; + log.trace("notifying request producers"); + notify(); - return resps; + return resps; } catch (InterruptedException ex) { log.warn("interrupt in handleRequest(): " + ex.getMessage()); interrupted = true; @@ -189,15 +149,11 @@ public class STALRequestBrokerImpl implements STALRequestBroker { long beforeWait = System.currentTimeMillis(); while (this.responses != null) { log.trace("waiting to produce response"); -// try { wait(TIMEOUT_MS); if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { log.warn("timeout while waiting to produce response"); return Collections.singletonList((STALRequest) new QuitRequest()); } -// } catch (InterruptedException ex) { -// log.warn("interrupt while waiting to produce response: " + ex.getMessage()); -// } } log.trace("produce response"); this.responses = responses; @@ -211,7 +167,6 @@ public class STALRequestBrokerImpl implements STALRequestBroker { notify(); } else { if (expectingResponse) { - // while (expectingResponse) wait(); log.warn("No expected response received in nextRequest()"); return Collections.singletonList((STALRequest) new QuitRequest()); } @@ -221,33 +176,21 @@ public class STALRequestBrokerImpl implements STALRequestBroker { long beforeWait = System.currentTimeMillis(); while (this.requests == null) { log.trace("waiting to consume request"); -// try { wait(TIMEOUT_MS); if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { log.warn("timeout while waiting to consume request"); return Collections.singletonList((STALRequest) new QuitRequest()); } -// } catch (InterruptedException ex) { -// log.warn("interrupt while waiting to consume request: " + ex.getMessage()); -// } } log.trace("consume request"); List<STALRequest> reqs = requests; - //TODO check if QUIT and set isHandlingReq to false here? - // (rename isHandlingReq -> produce) - // handleReq(QUIT) doesn't wait() and returns immediately - // cf. handleReq(QUIT) requests = null; - //no need to notify; request producer is waiting for isHandlingRequest - //(alt2: the QUIT producer returned immediately and didn't notify) - //(alt1: the QUIT producer is waiting for notification on QUIT consumption) if (reqs.size() > 0 && reqs.get(0) instanceof QuitRequest) { isHandlingRequest = false; log.trace("consumed QUIT, notifying request producers"); notify(); log.trace("expecting no response in next nextRequest()"); expectingResponse = false; - //notify no-response request consumers } return reqs; } catch (InterruptedException ex) { @@ -262,310 +205,7 @@ public class STALRequestBrokerImpl implements STALRequestBroker { log.trace("return " + currentHashDataInput.size() + " current HashDataInput(s) "); return currentHashDataInput; } -// /** -// * Causes the calling thread to sleep until response is passed via nextRequest() -// * (except for QUIT request, which returns immediately). -// * The requestList may contain at most one signRequest. -// * The signRequest's signedRefCallback is stored until a response to the signRequest is provided (2nd nextRequest() call), -// * i.e. until handleRequest() returns. -// * -// * @param aRequestList -// * @return -// * @pre requestList contains at most one signRequest -// */ -// @Override -// public List<STALResponse> handleRequest(List<STALRequest> requestList) { -// try { -// if (log.isTraceEnabled()) { -// log.trace("HandleRequest (" + requestList.size() + " requests)"); -// } -// -// broker.produceRequests(requestList); -// -// // QUIT returns immediately -// if (requestList.size() == 1 && requestList.get(0) instanceof QuitRequest) { -// log.trace("Received QUIT request, do not wait for responses."); -// return new ArrayList<STALResponse>(); -// } -// return broker.consumeResponses(); -// } catch (InterruptedException ex) { -// log.error("Interrupted while handling STAL request list: " + ex.getMessage()); -// return Collections.singletonList((STALResponse) new ErrorResponse()); -// } catch (TimeoutException ex) { -// log.error("Timeout during handle request: " + ex.getMessage()); -// ErrorResponse err = new ErrorResponse(); -// err.setErrorCode(ERR_6000); -// return Collections.singletonList((STALResponse) err); -// } -// } -// -// @Override -// public void setResponse(List<STALResponse> responses) { -// try { -//// if (responses != null && responses.size() > 0) { -//// List<STALResponse> stalResponses = translateResponses(responses); -// broker.produceResponses(responses); -//// } else { -//// log.trace("Received emtpy responses list, do not add."); -//// } -// } catch (InterruptedException ex) { -// log.error("Interrupted while setting STAL response: " + ex.getMessage()); -//// broker.interrupt(new ErrorResponse()); -// } catch (TimeoutException ex) { -// log.error("Timeout during setResponse: " + ex.getMessage()); -// } -// } -// -// /** -// * TODO split in nextRequest(void) and setResponses(responses) -// * <br/> -// * Translate (possibly empty) STAL-WS response list to STAL responses and -// * wait until request(s) are available and translate to STAL-WS requests. -// * @param prevResponse if null or zero-length, they are not passed to the handler -// * @return -// */ -// @Override -// public List<STALRequest> nextRequest() { //List<ResponseType> responses) { -// try { -//// if (responses != null && responses.size() > 0) { -//// List<STALResponse> stalResponses = translateResponses(responses); -//// broker.produceResponses(stalResponses); -//// } else { -//// log.trace("Received emtpy responses list, do not add."); -//// } -// -//// List<? extends STALRequest> stalRequests = broker.consumeRequests(); -//// List<RequestType> requests = translateRequests(stalRequests); -// return broker.consumeRequests(); -//// } catch (InterruptedException ex) { -//// log.error("Interrupted while requesting next STAL request: " + ex.getMessage()); -//// return Collections.singletonList((STALResponse) new ErrorResponse()); -// } catch (InterruptedException ex) { -// log.error("Interrupted while requesting next STAL request: " + ex.getMessage()); -//// broker.interrupt(new ErrorResponse()); -// return new ArrayList<STALRequest>(); -// } catch (TimeoutException ex) { -// log.error("Timeout during nextRequest: " + ex.getMessage()); -// return new ArrayList<STALRequest>(); -// } -// } -// -//// @Override -//// public void interruptRequestHandling(ErrorResponseType error) { -//// if (log.isTraceEnabled()) { -//// log.trace("Received Error: " + error.getErrorMessage()); -//// } -//// broker.interrupt(new ErrorResponse(error.getErrorCode())); -//// } -// -// //TODO -//// private List<RequestType> translateRequests(List<? extends STALRequest> stalRequests) { -//// List<RequestType> requests = new ArrayList<RequestType>(stalRequests.size()); -//// for (STALRequest stalRequest : stalRequests) { -//// if (stalRequest instanceof InfoboxReadRequest) { -//// InfoboxReadRequestType req = new InfoboxReadRequestType(); -//// req.setInfoboxIdentifier(((InfoboxReadRequest) stalRequest).getInfoboxIdentifier()); -//// log.warn("TODO consider domain identifier for infobox " + req.getInfoboxIdentifier()); -//// req.setDomainIdentifier("TODO"); -//// requests.add(req); -//// } else if (stalRequest instanceof SignRequest) { -//// //TODO -//// //remember current sign request for getSignedReferences() -//// throw new UnsupportedOperationException("SignRequest unsupported"); -//// } else if (stalRequest instanceof QuitRequest) { -//// requests.add(new QuitRequestType()); -//// } else { -//// log.error("Unknown STAL request: " + stalRequest.getClass().getName()); -//// } -//// } -//// return requests; -//// } -// -//// private List<STALResponse> translateResponses(List<ResponseType> responses) { -//// List<STALResponse> stalResponses = new ArrayList<STALResponse>(responses.size()); -//// for (ResponseType response : responses) { -//// if (response instanceof InfoboxReadResponseType) { -//// byte[] infoboxValue = ((InfoboxReadResponseType) response).getInfoboxValue(); -//// stalResponses.add(new InfoboxReadResponse(infoboxValue)); -//// } else if (response instanceof SignResponseType) { -//// byte[] signatureValue = ((SignResponseType) response).getSignatureValue(); -//// stalResponses.add(new SignResponse(signatureValue)); -//// } else if (response instanceof ErrorResponseType) { -//// int errorCode = ((ErrorResponseType) response).getErrorCode(); -//// log.warn("TODO consider error msg: " + ((ErrorResponseType) response).getErrorMessage()); -//// stalResponses.add(new ErrorResponse(errorCode)); -//// } else { -//// log.error("Unknown STAL service response " + response.getId() + ": " + response.getClass().getName()); -//// } -//// } -//// return stalResponses; -//// } -// /** -// * synchronize on this, not on request/response lists since they are nulled -// */ -// // protected since outer handler field is protected -// protected class RequestResponseBroker { //implements Runnable { -// -// protected List<STALRequest> requests = null; -// protected List<STALResponse> responses = null; -// protected HashDataInputCallback currentHashDataInputCallback; -// -//// @Override -//// public void run() { -//// while (true) { -//// ; -//// } -//// //TODO handler lifecycle in run()? -//// } -// /** -// * wait until requests are consumed, -// * produce requests, remember sigRefCallback and notify consumer -// * (no need for synchronized?) -// * @param requests -// */ -// public synchronized void produceRequests(List<STALRequest> requests) throws InterruptedException, TimeoutException { -//// synchronized (requests) { -// -// // requests is null, since there's only one producer thread calling handleRequests() -// // and handleRequest() returns only if nextRequest() was called -// while (this.requests != null) { -//// requests.wait(); -// long before = System.currentTimeMillis(); -// log.trace("waiting to produce requests ..."); -// wait(); //TIMEOUT_MS); -// if (System.currentTimeMillis() - before >= TIMEOUT_MS) { -// log.error("Timeout while waiting to produce requests."); -// throw new TimeoutException(); -// } -// } -// log.trace("producing requests"); -// this.requests = requests; -// // getSignedReferences does not produce responses, -// // so the command thread will not continue (and no further signRequest can possibly be produced) -// // once the ws-client sends nextRequest with responses to the signRequest, the callback is invalidated -// -// // reset callback if for some reason produceResponse() wasn't called -// currentHashDataInputCallback = null; -// for (STALRequest request : requests) { -// if (request instanceof SignRequest) { -// log.trace("keep hashdatainput callback"); -// currentHashDataInputCallback = ((SignRequest) request).getHashDataInput(); -// break; -// } -// } -// -//// requests.notify(); -// log.trace("notifying request consumers (TODO not only consumers)"); -// notify(); -//// } -// } -// -// /** -// * wait until requests are produced and consume them -// * @return -// */ -// public synchronized List<STALRequest> consumeRequests() throws InterruptedException, TimeoutException { -// List<STALRequest> retVal = null; -//// synchronized (requests) { -// while (requests == null) { -//// requests.wait(); -// long before = System.currentTimeMillis(); -// log.trace("waiting to consumer requests ..."); -// wait(); //TIMEOUT_MS); -// if (System.currentTimeMillis() - before >= TIMEOUT_MS) { -// log.error("Timeout while waiting to consume requests."); -// throw new TimeoutException(); -// } -// } -// log.trace("consuming requests"); -// retVal = requests; -// requests = null; -//// } -// log.trace("???notify request producers???"); -// return retVal; -// } -// -// /** -// * wait until previous responses are consumed, -// * produce responses and notify consumer -// * @param responses -// */ -// public synchronized void produceResponses(List<STALResponse> responses) throws InterruptedException, TimeoutException { -//// synchronized (responses) { -// while (this.responses != null) { -//// responses.wait(); -// long before = System.currentTimeMillis(); -// log.trace("waiting to produce responses ..."); -// wait(); //TIMEOUT_MS); -// if (System.currentTimeMillis() - before >= TIMEOUT_MS) { -// log.error("Timeout while waiting to produce responses."); -// throw new TimeoutException(); -// } -// } -// log.trace("producing responses"); -// this.responses = responses; -// //invalidate sigrefcallback (from now on handleRequest() may be called, producing new requests) -// //make sure the provided responses are for the corresponding signrequest -// if (this.requests == null) {//requests already consumed=>responses correspond to these -// log.trace("resetting current hashdatainput"); -// currentHashDataInputCallback = null; -// } -//// responses.notify(); -// log.trace("notify response consumers (TODO only consumers?)"); -// notify(); -//// } -// } -// -// /** -// * wait until responses are available, consume them -// * @return -// * @throws java.lang.Exception -// */ -// public synchronized List<STALResponse> consumeResponses() throws InterruptedException, TimeoutException { -// List<STALResponse> retVal = null; -//// synchronized (responses) { -// while (responses == null) { -//// responses.wait(); -// long before = System.currentTimeMillis(); -// log.trace("waiting to consume responses ..."); -// wait(); //TIMEOUT_MS); -// if (System.currentTimeMillis() - before >= TIMEOUT_MS) { -// log.error("Timeout while waiting to consume responses."); -// throw new TimeoutException(); -// } -// } -// log.trace("consuming responses"); -// retVal = responses; -// responses = null; -//// } -// log.trace("???notify response producers???"); -// return retVal; -// } -// -// /** -// * get the signrefcallback until handleRequest() is called the next time. -// * @return null if last request was not a signRequest -// */ -// public synchronized HashDataInputCallback getCurrentHashDataInputCallback() { -// log.trace("obtain current hashdatainput"); -// return currentHashDataInputCallback; -// } -// /** -// * add the error to responses and notify (response-) consumers -// * @param error -// */ -//// public synchronized void interrupt(ErrorResponse error) { -////// synchronized (responses) { -//// if (responses == null) { -//// responses = Collections.singletonList((STALResponse) error); -//// } else { -//// responses.add(error); -//// } -////// responses.notify(); -//// notify(); -////// } -//// } -// } + @Override public void setLocale(Locale locale) { // TODO Auto-generated method stub |