From d0879e9058943c6afa1912ccbeae936db2811f26 Mon Sep 17 00:00:00 2001 From: clemenso Date: Tue, 30 Sep 2008 13:54:54 +0000 Subject: backport to JAXWS2.0 STALService initial connect() git-svn-id: https://joinup.ec.europa.eu/svn/mocca/trunk@76 8a26b1a7-26f0-462f-b9ef-d0e30c41f5a4 --- .../stal/service/impl/STALRequestBrokerImpl.java | 324 +++++++++++++++------ 1 file changed, 227 insertions(+), 97 deletions(-) (limited to 'BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java') diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java index dc3cc6d3..bfa83dd4 100644 --- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java +++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java @@ -22,10 +22,17 @@ package at.gv.egiz.stal.service.impl; import at.gv.egiz.stal.ErrorResponse; import at.gv.egiz.stal.HashDataInput; +import at.gv.egiz.stal.InfoboxReadRequest; import at.gv.egiz.stal.QuitRequest; import at.gv.egiz.stal.STALRequest; import at.gv.egiz.stal.STALResponse; import at.gv.egiz.stal.SignRequest; +import at.gv.egiz.stal.service.types.InfoboxReadRequestType; +import at.gv.egiz.stal.service.types.QuitRequestType; +import at.gv.egiz.stal.service.types.RequestType; +import at.gv.egiz.stal.service.types.ResponseType; +import at.gv.egiz.stal.service.types.SignRequestType; +import at.gv.egiz.stal.util.STALTranslator; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -48,18 +55,26 @@ public class STALRequestBrokerImpl implements STALRequestBroker { private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class); - private boolean expectingResponse = false; +// private boolean expectingResponse = false; private boolean interrupted = false; - private final RequestsMonitor reqMon = new RequestsMonitor(); - private final ResponsesMonitor respMon = new ResponsesMonitor(); +// private final RequestsMonitor reqMon = new RequestsMonitor(); +// private final ResponsesMonitor respMon = new ResponsesMonitor(); + + protected ArrayList requests; + protected ArrayList responses; + + protected ArrayList hashDataInputs; private long timeout; public STALRequestBrokerImpl(long timeoutMillisec) { if (timeoutMillisec <= 0) timeoutMillisec = DEFAULT_TIMEOUT_MS; - this.timeout = timeoutMillisec; + timeout = timeoutMillisec; + requests = new ArrayList(); + responses = new ArrayList(); + hashDataInputs = new ArrayList(); } /** @@ -73,52 +88,80 @@ public class STALRequestBrokerImpl implements STALRequestBroker { * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests */ @Override - public List handleRequest(List requests) { + public List handleRequest(List stalRequests) { if (interrupted) { return null; } try { - synchronized (reqMon) { + synchronized (requests) { log.trace("produce request"); - reqMon.produce(requests); - reqMon.setHashDataInput(null); - for (STALRequest request : requests) { - if (request instanceof SignRequest) { - log.trace("Received SignRequest, keep HashDataInput."); - reqMon.setHashDataInput(((SignRequest) request).getHashDataInput()); - break; - } else if (request instanceof QuitRequest) { - log.trace("Received QuitRequest, do not wait for responses."); - log.trace("notifying request consumers"); - reqMon.notify(); - return new ArrayList(); - } else if (log.isTraceEnabled()) { - log.trace("Received STAL request: " + request.getClass().getName()); + requests.clear(); + hashDataInputs.clear(); +// reqMon.produce(requests); +// reqMon.setHashDataInput(null); + + for (STALRequest stalRequest : stalRequests) { + if (stalRequest instanceof SignRequest) { + log.trace("Received SignRequest, keep HashDataInput."); + SignRequestType req = new SignRequestType(); + req.setKeyIdentifier(((SignRequest) stalRequest).getKeyIdentifier()); + req.setSignedInfo(((SignRequest) stalRequest).getSignedInfo()); + requests.add(req); + hashDataInputs.addAll(((SignRequest) stalRequest).getHashDataInput()); + break; + } else if (stalRequest instanceof InfoboxReadRequest) { + log.trace("Received InfoboxReadRequest"); + InfoboxReadRequestType req = new InfoboxReadRequestType(); + req.setInfoboxIdentifier(((InfoboxReadRequest) stalRequest).getInfoboxIdentifier()); + req.setDomainIdentifier(((InfoboxReadRequest) stalRequest).getDomainIdentifier()); + requests.add(req); + } else if (stalRequest instanceof QuitRequest) { + log.trace("Received QuitRequest, do not wait for responses."); + requests.add(new QuitRequestType()); + log.trace("notifying request consumers"); + requests.notify(); +// reqMon.notify(); + return new ArrayList(); + } else { + log.error("Received unsupported STAL request: " + stalRequest.getClass().getName() + ", send QUIT"); + requests.clear(); + requests.add(new QuitRequestType()); + log.trace("notifying request consumers"); + requests.notify(); + return new ArrayList(); } } log.trace("notifying request consumers"); - reqMon.notify(); + requests.notify(); +// reqMon.notify(); } - synchronized (respMon) { + synchronized (responses) { //respMon) { long beforeWait = System.currentTimeMillis(); - while (respMon.responses == null) { +// while (respMon.responses == null) { + while (responses.isEmpty()) { log.trace("waiting to consume response"); - respMon.wait(timeout); +// respMon.wait(timeout); + responses.wait(timeout); if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to consume response, cleanup requests"); - reqMon.consume(); //TODO check deadlock? - reqMon.setHashDataInput(null); +// reqMon.consume(); //TODO check deadlock? +// reqMon.setHashDataInput(null); + requests.clear(); //TODO sync on requests? + hashDataInputs.clear(); return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); } } log.trace("consuming responses"); - List responses = respMon.consume(); +// List responses = respMon.consume(); + List resps = STALTranslator.toSTAL(responses); + responses.clear(); log.trace("notifying response producers"); - respMon.notify(); + responses.notify(); +// respMon.notify(); - return responses; + return resps; } } catch (InterruptedException ex) { log.warn("interrupt in handleRequest(): " + ex.getMessage()); @@ -127,6 +170,36 @@ public class STALRequestBrokerImpl implements STALRequestBroker { } } + @Override + public List connect() { + if (interrupted) { + return null; + } + try { + synchronized (requests) { + long beforeWait = System.currentTimeMillis(); + while (requests.isEmpty()) { + log.trace("waiting to consume request"); + requests.wait(timeout); + if (System.currentTimeMillis() - beforeWait >= timeout) { + log.warn("timeout while waiting to consume request"); + return Collections.singletonList((RequestType) new QuitRequestType()); + } + } + log.trace("consume request"); + List reqs = new ArrayList(); + reqs.addAll(requests); + + requests.clear(); + return reqs; + } + } catch (InterruptedException ex) { + log.warn("interrupt in nextRequest(): " + ex.getMessage()); + interrupted = true; + return null; + } + } + /** * This method is thread-safe, except for * an 'initial' call to nextRequest(null) followed by a @@ -139,63 +212,74 @@ public class STALRequestBrokerImpl implements STALRequestBroker { * @return QUIT if expected responses are not provided */ @Override - public List nextRequest(List responses) { + public List nextRequest(List resps) { if (interrupted) { return null; } try { - synchronized (respMon) { - if (responses != null && responses.size() > 0) { - if (!expectingResponse) { - log.warn("Received unexpected response in nextRequest(), return QUIT"); - return Collections.singletonList((STALRequest) new QuitRequest()); - } + synchronized (responses) { //respMon) { + if (resps != null && resps.size() > 0) { +// if (!expectingResponse) { +// log.warn("Received unexpected response in nextRequest(), return QUIT"); +// return Collections.singletonList((RequestType) new QuitRequestType()); +// } long beforeWait = System.currentTimeMillis(); - while (respMon.responses != null) { +// while (respMon.responses != null) { + while (!responses.isEmpty()) { log.trace("waiting to produce response"); - respMon.wait(timeout); +// respMon.wait(timeout); + responses.wait(timeout); if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to produce response"); - return Collections.singletonList((STALRequest) new QuitRequest()); + return Collections.singletonList((RequestType) new QuitRequestType()); } } log.trace("produce response"); - respMon.produce(responses); +// respMon.produce(resps); + responses.addAll(resps); //reset HashDataInputCallback iff SignResponse if (log.isTraceEnabled()) { - for (STALResponse response : responses) { + for (ResponseType response : resps) { log.trace("Received STAL response: " + response.getClass().getName()); } } log.trace("notifying response consumers"); - respMon.notify(); +// respMon.notify(); + responses.notify(); } else { - if (expectingResponse) { - log.warn("Did not receive expected response(s) in nextRequest(), return QUIT"); - return Collections.singletonList((STALRequest) new QuitRequest()); - } - log.trace("expecting non-null response in next nextRequest(response)"); - expectingResponse = true; +// if (expectingResponse) { +// log.warn("Did not receive expected response(s) in nextRequest(), return QUIT"); +// return Collections.singletonList((RequestType) new QuitRequestType()); +// } +// log.trace("expecting non-null response in next nextRequest(response)"); +// expectingResponse = true; + log.error("Received NextRequest without responses, return QUIT"); + return Collections.singletonList((RequestType) new QuitRequestType()); } } - synchronized (reqMon) { + synchronized (requests) { //reqMon) { long beforeWait = System.currentTimeMillis(); - while (reqMon.requests == null) { +// while (reqMon.requests == null) { + while (requests.isEmpty()) { log.trace("waiting to consume request"); - reqMon.wait(timeout); +// reqMon.wait(timeout); + requests.wait(timeout); if (System.currentTimeMillis() - beforeWait >= timeout) { log.warn("timeout while waiting to consume request"); - return Collections.singletonList((STALRequest) new QuitRequest()); + return Collections.singletonList((RequestType) new QuitRequestType()); } } log.trace("consume request"); - List requests = reqMon.consume(); - if (requests.size() > 0 && requests.get(0) instanceof QuitRequest) { - log.trace("expecting no response in next nextRequest()"); - expectingResponse = false; - } - return requests; + List reqs = new ArrayList(); // reqMon.consume(); + reqs.addAll(requests); + +// if (requests.size() > 0 && requests.get(0) instanceof QuitRequestType) { +// log.trace("expecting no response in next nextRequest()"); +// expectingResponse = false; +// } + requests.clear(); + return reqs; } } catch (InterruptedException ex) { log.warn("interrupt in nextRequest(): " + ex.getMessage()); @@ -205,49 +289,95 @@ public class STALRequestBrokerImpl implements STALRequestBroker { } @Override - public synchronized List getHashDataInput() { - log.trace("return " + reqMon.hashDataInput.size() + " current HashDataInput(s) "); - return reqMon.getHashDataInput(); + public List getHashDataInput() { + synchronized (requests) { + log.trace("return " + hashDataInputs.size() + " current HashDataInput(s) "); + return hashDataInputs; //reqMon.getHashDataInput(); + } } @Override public void setLocale(Locale locale) { } - class RequestsMonitor { - List requests; - List hashDataInput; - - void produce(List req) { - requests = req; - } - - synchronized List consume() { - List reqs = requests; - requests = null; - return reqs; - } - - void setHashDataInput(List hdi) { - hashDataInput = hdi; - } - - List getHashDataInput() { - return hashDataInput; - } - } - - class ResponsesMonitor { - List responses; - - void produce(List resp) { - responses = resp; - } - - synchronized List consume() { - List resps = responses; - responses = null; - return resps; - } - } +// class RequestsMonitor { +// List requests; +// List hashDataInput; +// +// void produce(List req) { +// requests = req; +// } +// +// synchronized List consume() { +// List reqs = new ArrayList(); +// for (STALRequest request : requests) { +// if (request instanceof SignRequest) { +// at.gv.egiz.stal.service.types.SignRequest r = new at.gv.egiz.stal.service.types.SignRequest(); +// r.setKeyIdentifier(((SignRequest) request).getKeyIdentifier()); +// r.setSignedInfo(((SignRequest) request).getSignedInfo()); +// reqs.add(r); +// } else if (request instanceof InfoboxReadRequest) { +// at.gv.egiz.stal.service.types.InfoboxReadRequest r = new at.gv.egiz.stal.service.types.InfoboxReadRequest(); +// r.setDomainIdentifier(((InfoboxReadRequest) request).getDomainIdentifier()); +// r.setInfoboxIdentifier(((InfoboxReadRequest) request).getInfoboxIdentifier()); +// reqs.add(r); +// } else if (request instanceof QuitRequest) { +// at.gv.egiz.stal.service.types.QuitRequest r = new at.gv.egiz.stal.service.types.QuitRequest(); +// reqs.add(r); +// } else { +// log.error("unknown STAL request type: " + request.getClass()); +// requests = null; +// return Collections.singletonList((at.gv.egiz.stal.service.types.STALRequest) new at.gv.egiz.stal.service.types.QuitRequest()); +// } +// } +// requests = null; +// return reqs; +// } +// +// void setHashDataInput(List hdi) { +// hashDataInput = hdi; +// } +// +// List getHashDataInput() { +// return hashDataInput; +// } +// } +// +// /** TODO: now, that responses are not nulled, synchronize directly on responses? */ +// class ResponsesMonitor { +// List responses; +// +// void produce(List resp) { +// responses = resp; +// } +// +// synchronized List consume() { +// List resps = new ArrayList(); +// +// for (at.gv.egiz.stal.service.types.STALResponse response : responses) { +// if (response instanceof at.gv.egiz.stal.service.types.InfoboxReadResponse) { +// InfoboxReadResponse r = new InfoboxReadResponse(); +// r.setInfoboxValue(((at.gv.egiz.stal.service.types.InfoboxReadResponse) response).getInfoboxValue()); +// resps.add(r); +// } else if (response instanceof at.gv.egiz.stal.service.types.SignResponse) { +// SignResponse r = new SignResponse(); +// r.setSignatureValue(((at.gv.egiz.stal.service.types.SignResponse) response).getSignatureValue()); +// resps.add(r); +// } else if (response instanceof at.gv.egiz.stal.service.types.ErrorResponse) { +// ErrorResponse r = new ErrorResponse(); +// r.setErrorCode(((at.gv.egiz.stal.service.types.ErrorResponse) response).getErrorCode()); +// r.setErrorMessage(((at.gv.egiz.stal.service.types.ErrorResponse) response).getErrorMessage()); +// resps.add(r); +// } else { +// log.error("unknown STAL response type: " + response.getClass()); +// ErrorResponse r = new ErrorResponse(4000); +// r.setErrorMessage("unknown STAL response type: " + response.getClass()); +// responses = null; +// return Collections.singletonList((STALResponse) r); +// } +// } +// responses = null; +// return resps; +// } +// } } -- cgit v1.2.3