diff options
Diffstat (limited to 'BKUOnline/src/main')
| -rw-r--r-- | BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java | 160 | 
1 files changed, 100 insertions, 60 deletions
| diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java index 7897f984..dc3cc6d3 100644 --- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java +++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java @@ -47,82 +47,79 @@ import org.apache.commons.logging.LogFactory;  public class STALRequestBrokerImpl implements STALRequestBroker {      private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class); -    protected List<STALRequest> requests = null; -    protected List<STALResponse> responses = null; -    protected List<HashDataInput> currentHashDataInput; -//    private boolean isHandlingRequest = false; +      private boolean expectingResponse = false;      private boolean interrupted = false; +    private final RequestsMonitor reqMon = new RequestsMonitor(); +    private final ResponsesMonitor respMon = new ResponsesMonitor(); +     +    private long timeout; + +    public STALRequestBrokerImpl(long timeoutMillisec) { +      if (timeoutMillisec <= 0)  +        timeoutMillisec = DEFAULT_TIMEOUT_MS; +      this.timeout = timeoutMillisec; +    } +          /**       * Produce requests (and HashDataInputCallback) and wait for responses. -     * The next thread may enter once we consumed the responses. +     * This method is not thread safe, since every bindingprocessor thread possesses it's own instance. +     * It however assures cooperation with STAL webservice threads consuming the requests and producing responses.       *        * @param requests       * @return       *  -     * @pre requests either single SignRequest, QuitRequest or multiple ReadInfoboxRequests +     * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests       */      @Override -    public synchronized List<STALResponse> handleRequest(List<STALRequest> requests) { +    public List<STALResponse> handleRequest(List<STALRequest> requests) {        if (interrupted) {          return null;        }          try { -//            long beforeWait = System.currentTimeMillis(); -//            while (isHandlingRequest) { -//                log.trace("waiting to produce request"); -//                wait(TIMEOUT_MS); -//                if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { -//                    log.warn("timeout while waiting to produce request"); -//                    return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); -//                } -//            } +          synchronized (reqMon) {              log.trace("produce request"); -//            isHandlingRequest = true; -            this.requests = requests; -            currentHashDataInput = null; +            reqMon.produce(requests); +            reqMon.setHashDataInput(null);              for (STALRequest request : requests) {                  if (request instanceof SignRequest) {                      log.trace("Received SignRequest, keep HashDataInput."); -                    currentHashDataInput = ((SignRequest) request).getHashDataInput(); +                    reqMon.setHashDataInput(((SignRequest) request).getHashDataInput());                      break;                  } else if (request instanceof QuitRequest) {                      log.trace("Received QuitRequest, do not wait for responses.");                      log.trace("notifying request consumers"); -                    notify(); +                    reqMon.notify();                      return new ArrayList<STALResponse>();                  } else if (log.isTraceEnabled()) {                      log.trace("Received STAL request: " + request.getClass().getName());                  }              }              log.trace("notifying request consumers"); -            notify(); - +            reqMon.notify(); +          } +           +          synchronized (respMon) {              long beforeWait = System.currentTimeMillis(); -            while (this.responses == null) { +            while (respMon.responses == null) {                  log.trace("waiting to consume response"); -                wait(TIMEOUT_MS); -                if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { -                    log.warn("timeout while waiting to consume response"); -                    this.requests = null; -                    currentHashDataInput = null; -//                    isHandlingRequest = false; +                respMon.wait(timeout); +                if (System.currentTimeMillis() - beforeWait >= timeout) { +                    log.warn("timeout while waiting to consume response, cleanup requests"); +                    reqMon.consume(); //TODO check deadlock? +                    reqMon.setHashDataInput(null);                      return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000));                  }              }              log.trace("consuming responses"); -            List<STALResponse> resps = responses; -            responses = null; +            List<STALResponse> responses = respMon.consume();              log.trace("notifying response producers"); -            notify(); - -//            isHandlingRequest = false; -//            log.trace("notifying request producers"); -//            notify(); +            respMon.notify(); -            return resps; +            return responses; +          }          } catch (InterruptedException ex) {              log.warn("interrupt in handleRequest(): " + ex.getMessage());              interrupted = true; @@ -131,68 +128,75 @@ public class STALRequestBrokerImpl implements STALRequestBroker {      }      /** +     * This method is thread-safe, except for  +     * an 'initial' call to nextRequest(null) followed by a +     * 'zombie' call to nextRequest(notNull).  +     * This case (per design) leads to a timeout of the original call. +     * (synchronizing the entire method does not  +     * hinder the zombie to interrupt two consecutive nextRequest() calls.)       *        * @param responses       * @return QUIT if expected responses are not provided       */      @Override -    public synchronized List<STALRequest> nextRequest(List<STALResponse> responses) { +    public List<STALRequest> nextRequest(List<STALResponse> responses) {        if (interrupted) {          return null;        }          try { +          synchronized (respMon) {              if (responses != null && responses.size() > 0) {                  if (!expectingResponse) { -                    log.warn("Received unexpected response in nextRequest()"); +                    log.warn("Received unexpected response in nextRequest(), return QUIT");                      return Collections.singletonList((STALRequest) new QuitRequest());                  }                  long beforeWait = System.currentTimeMillis(); -                while (this.responses != null) { +                while (respMon.responses != null) {                      log.trace("waiting to produce response"); -                    wait(TIMEOUT_MS); -                    if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { +                    respMon.wait(timeout); +                    if (System.currentTimeMillis() - beforeWait >= timeout) {                          log.warn("timeout while waiting to produce response");                          return Collections.singletonList((STALRequest) new QuitRequest());                      }                  }                  log.trace("produce response"); -                this.responses = responses; -                //reset HashDataInputCallback +                respMon.produce(responses); +                //reset HashDataInputCallback iff SignResponse                  if (log.isTraceEnabled()) {                      for (STALResponse response : responses) {                          log.trace("Received STAL response: " + response.getClass().getName());                      }                  }                  log.trace("notifying response consumers"); -                notify(); +                respMon.notify();              } else {                  if (expectingResponse) { -                    log.warn("No expected response received in nextRequest()"); +                    log.warn("Did not receive expected response(s) in nextRequest(), return QUIT");                      return Collections.singletonList((STALRequest) new QuitRequest());                  }                  log.trace("expecting non-null response in next nextRequest(response)");                  expectingResponse = true;              } +          } +           +          synchronized (reqMon) {              long beforeWait = System.currentTimeMillis(); -            while (this.requests == null) { +            while (reqMon.requests == null) {                  log.trace("waiting to consume request"); -                wait(TIMEOUT_MS); -                if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { +                reqMon.wait(timeout); +                if (System.currentTimeMillis() - beforeWait >= timeout) {                      log.warn("timeout while waiting to consume request");                      return Collections.singletonList((STALRequest) new QuitRequest());                  }              }              log.trace("consume request"); -            List<STALRequest> reqs = requests; -            requests = null; -            if (reqs.size() > 0 && reqs.get(0) instanceof QuitRequest) { -//                isHandlingRequest = false; -//                log.trace("consumed QUIT, notifying request producers"); -//                notify(); +            List<STALRequest> requests = reqMon.consume(); +            if (requests.size() > 0 && requests.get(0) instanceof QuitRequest) {                  log.trace("expecting no response in next nextRequest()");                  expectingResponse = false;              } -            return reqs; +            return requests; +          }          } catch (InterruptedException ex) {              log.warn("interrupt in nextRequest(): " + ex.getMessage());              interrupted = true; @@ -202,12 +206,48 @@ public class STALRequestBrokerImpl implements STALRequestBroker {      @Override      public synchronized List<HashDataInput> getHashDataInput() { -        log.trace("return " + currentHashDataInput.size() + " current HashDataInput(s) "); -        return currentHashDataInput; +        log.trace("return " + reqMon.hashDataInput.size() + " current HashDataInput(s) "); +        return reqMon.getHashDataInput();      }      @Override      public void setLocale(Locale locale) { -        // TODO Auto-generated method stub +    } +     +    class RequestsMonitor { +      List<STALRequest> requests; +      List<HashDataInput> hashDataInput; +       +      void produce(List<STALRequest> req) { +        requests = req; +      } +       +      synchronized List<STALRequest> consume() { +        List<STALRequest> reqs = requests; +        requests = null; +        return reqs; +      } +       +      void setHashDataInput(List<HashDataInput> hdi) { +        hashDataInput = hdi; +      } +       +      List<HashDataInput> getHashDataInput() { +        return hashDataInput; +      } +    } +     +    class ResponsesMonitor { +      List<STALResponse> responses; +       +      void produce(List<STALResponse> resp) { +        responses = resp; +      } +       +      synchronized List<STALResponse> consume() { +        List<STALResponse> resps = responses; +        responses = null; +        return resps; +      }      }  } | 
