diff options
Diffstat (limited to 'BKUOnline')
3 files changed, 179 insertions, 66 deletions
| diff --git a/BKUOnline/pom.xml b/BKUOnline/pom.xml index d6d26a82..e699d16b 100644 --- a/BKUOnline/pom.xml +++ b/BKUOnline/pom.xml @@ -120,6 +120,13 @@  					</execution>
  				</executions>
  			</plugin>
 +            <!--plugin>
 +              <groupId>org.apache.maven.plugins</groupId>
 +              <artifactId>maven-surefire-plugin</artifactId> 
 +              <configuration>
 +                <skip>true</skip>
 +              </configuration> 
 +            </plugin-->
  			<!--plugin> 
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>jaxws-maven-plugin</artifactId> <executions> <execution>
 diff --git a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java index 7897f984..dc3cc6d3 100644 --- a/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java +++ b/BKUOnline/src/main/java/at/gv/egiz/stal/service/impl/STALRequestBrokerImpl.java @@ -47,82 +47,79 @@ import org.apache.commons.logging.LogFactory;  public class STALRequestBrokerImpl implements STALRequestBroker {      private static final Log log = LogFactory.getLog(STALRequestBrokerImpl.class); -    protected List<STALRequest> requests = null; -    protected List<STALResponse> responses = null; -    protected List<HashDataInput> currentHashDataInput; -//    private boolean isHandlingRequest = false; +      private boolean expectingResponse = false;      private boolean interrupted = false; +    private final RequestsMonitor reqMon = new RequestsMonitor(); +    private final ResponsesMonitor respMon = new ResponsesMonitor(); +     +    private long timeout; + +    public STALRequestBrokerImpl(long timeoutMillisec) { +      if (timeoutMillisec <= 0)  +        timeoutMillisec = DEFAULT_TIMEOUT_MS; +      this.timeout = timeoutMillisec; +    } +          /**       * Produce requests (and HashDataInputCallback) and wait for responses. -     * The next thread may enter once we consumed the responses. +     * This method is not thread safe, since every bindingprocessor thread possesses it's own instance. +     * It however assures cooperation with STAL webservice threads consuming the requests and producing responses.       *        * @param requests       * @return       *  -     * @pre requests either single SignRequest, QuitRequest or multiple ReadInfoboxRequests +     * @pre requests: either single SignRequest, QuitRequest or multiple ReadInfoboxRequests       */      @Override -    public synchronized List<STALResponse> handleRequest(List<STALRequest> requests) { +    public List<STALResponse> handleRequest(List<STALRequest> requests) {        if (interrupted) {          return null;        }          try { -//            long beforeWait = System.currentTimeMillis(); -//            while (isHandlingRequest) { -//                log.trace("waiting to produce request"); -//                wait(TIMEOUT_MS); -//                if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { -//                    log.warn("timeout while waiting to produce request"); -//                    return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000)); -//                } -//            } +          synchronized (reqMon) {              log.trace("produce request"); -//            isHandlingRequest = true; -            this.requests = requests; -            currentHashDataInput = null; +            reqMon.produce(requests); +            reqMon.setHashDataInput(null);              for (STALRequest request : requests) {                  if (request instanceof SignRequest) {                      log.trace("Received SignRequest, keep HashDataInput."); -                    currentHashDataInput = ((SignRequest) request).getHashDataInput(); +                    reqMon.setHashDataInput(((SignRequest) request).getHashDataInput());                      break;                  } else if (request instanceof QuitRequest) {                      log.trace("Received QuitRequest, do not wait for responses.");                      log.trace("notifying request consumers"); -                    notify(); +                    reqMon.notify();                      return new ArrayList<STALResponse>();                  } else if (log.isTraceEnabled()) {                      log.trace("Received STAL request: " + request.getClass().getName());                  }              }              log.trace("notifying request consumers"); -            notify(); - +            reqMon.notify(); +          } +           +          synchronized (respMon) {              long beforeWait = System.currentTimeMillis(); -            while (this.responses == null) { +            while (respMon.responses == null) {                  log.trace("waiting to consume response"); -                wait(TIMEOUT_MS); -                if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { -                    log.warn("timeout while waiting to consume response"); -                    this.requests = null; -                    currentHashDataInput = null; -//                    isHandlingRequest = false; +                respMon.wait(timeout); +                if (System.currentTimeMillis() - beforeWait >= timeout) { +                    log.warn("timeout while waiting to consume response, cleanup requests"); +                    reqMon.consume(); //TODO check deadlock? +                    reqMon.setHashDataInput(null);                      return Collections.singletonList((STALResponse) new ErrorResponse(ERR_6000));                  }              }              log.trace("consuming responses"); -            List<STALResponse> resps = responses; -            responses = null; +            List<STALResponse> responses = respMon.consume();              log.trace("notifying response producers"); -            notify(); - -//            isHandlingRequest = false; -//            log.trace("notifying request producers"); -//            notify(); +            respMon.notify(); -            return resps; +            return responses; +          }          } catch (InterruptedException ex) {              log.warn("interrupt in handleRequest(): " + ex.getMessage());              interrupted = true; @@ -131,68 +128,75 @@ public class STALRequestBrokerImpl implements STALRequestBroker {      }      /** +     * This method is thread-safe, except for  +     * an 'initial' call to nextRequest(null) followed by a +     * 'zombie' call to nextRequest(notNull).  +     * This case (per design) leads to a timeout of the original call. +     * (synchronizing the entire method does not  +     * hinder the zombie to interrupt two consecutive nextRequest() calls.)       *        * @param responses       * @return QUIT if expected responses are not provided       */      @Override -    public synchronized List<STALRequest> nextRequest(List<STALResponse> responses) { +    public List<STALRequest> nextRequest(List<STALResponse> responses) {        if (interrupted) {          return null;        }          try { +          synchronized (respMon) {              if (responses != null && responses.size() > 0) {                  if (!expectingResponse) { -                    log.warn("Received unexpected response in nextRequest()"); +                    log.warn("Received unexpected response in nextRequest(), return QUIT");                      return Collections.singletonList((STALRequest) new QuitRequest());                  }                  long beforeWait = System.currentTimeMillis(); -                while (this.responses != null) { +                while (respMon.responses != null) {                      log.trace("waiting to produce response"); -                    wait(TIMEOUT_MS); -                    if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { +                    respMon.wait(timeout); +                    if (System.currentTimeMillis() - beforeWait >= timeout) {                          log.warn("timeout while waiting to produce response");                          return Collections.singletonList((STALRequest) new QuitRequest());                      }                  }                  log.trace("produce response"); -                this.responses = responses; -                //reset HashDataInputCallback +                respMon.produce(responses); +                //reset HashDataInputCallback iff SignResponse                  if (log.isTraceEnabled()) {                      for (STALResponse response : responses) {                          log.trace("Received STAL response: " + response.getClass().getName());                      }                  }                  log.trace("notifying response consumers"); -                notify(); +                respMon.notify();              } else {                  if (expectingResponse) { -                    log.warn("No expected response received in nextRequest()"); +                    log.warn("Did not receive expected response(s) in nextRequest(), return QUIT");                      return Collections.singletonList((STALRequest) new QuitRequest());                  }                  log.trace("expecting non-null response in next nextRequest(response)");                  expectingResponse = true;              } +          } +           +          synchronized (reqMon) {              long beforeWait = System.currentTimeMillis(); -            while (this.requests == null) { +            while (reqMon.requests == null) {                  log.trace("waiting to consume request"); -                wait(TIMEOUT_MS); -                if (System.currentTimeMillis() - beforeWait >= TIMEOUT_MS) { +                reqMon.wait(timeout); +                if (System.currentTimeMillis() - beforeWait >= timeout) {                      log.warn("timeout while waiting to consume request");                      return Collections.singletonList((STALRequest) new QuitRequest());                  }              }              log.trace("consume request"); -            List<STALRequest> reqs = requests; -            requests = null; -            if (reqs.size() > 0 && reqs.get(0) instanceof QuitRequest) { -//                isHandlingRequest = false; -//                log.trace("consumed QUIT, notifying request producers"); -//                notify(); +            List<STALRequest> requests = reqMon.consume(); +            if (requests.size() > 0 && requests.get(0) instanceof QuitRequest) {                  log.trace("expecting no response in next nextRequest()");                  expectingResponse = false;              } -            return reqs; +            return requests; +          }          } catch (InterruptedException ex) {              log.warn("interrupt in nextRequest(): " + ex.getMessage());              interrupted = true; @@ -202,12 +206,48 @@ public class STALRequestBrokerImpl implements STALRequestBroker {      @Override      public synchronized List<HashDataInput> getHashDataInput() { -        log.trace("return " + currentHashDataInput.size() + " current HashDataInput(s) "); -        return currentHashDataInput; +        log.trace("return " + reqMon.hashDataInput.size() + " current HashDataInput(s) "); +        return reqMon.getHashDataInput();      }      @Override      public void setLocale(Locale locale) { -        // TODO Auto-generated method stub +    } +     +    class RequestsMonitor { +      List<STALRequest> requests; +      List<HashDataInput> hashDataInput; +       +      void produce(List<STALRequest> req) { +        requests = req; +      } +       +      synchronized List<STALRequest> consume() { +        List<STALRequest> reqs = requests; +        requests = null; +        return reqs; +      } +       +      void setHashDataInput(List<HashDataInput> hdi) { +        hashDataInput = hdi; +      } +       +      List<HashDataInput> getHashDataInput() { +        return hashDataInput; +      } +    } +     +    class ResponsesMonitor { +      List<STALResponse> responses; +       +      void produce(List<STALResponse> resp) { +        responses = resp; +      } +       +      synchronized List<STALResponse> consume() { +        List<STALResponse> resps = responses; +        responses = null; +        return resps; +      }      }  } diff --git a/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java b/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java index 2bcc96ae..d6ce2720 100644 --- a/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java +++ b/BKUOnline/src/test/java/at/gv/egiz/stal/service/STALRequestBrokerTest.java @@ -78,8 +78,9 @@ public class STALRequestBrokerTest {          BindingProcessorSimulator bp = new BindingProcessorSimulator();          bp.setRequests(Collections.singletonList(requests)); +        new Thread(new ServiceSimulator(), "STALService1").start();          new Thread(bp, "BindingProcessor").start(); -        new Thread(new ServiceSimulator(), "STALService").start(); +        new Thread(new ServiceSimulator(), "STALService2").start();          try {              Thread.sleep(1000); @@ -88,7 +89,7 @@ public class STALRequestBrokerTest {          }      } -    @Ignore +    @Test      public void testSign() {          log.debug("**************** test SignRequest");          List<STALRequest> requests = new ArrayList<STALRequest>(); @@ -125,7 +126,8 @@ public class STALRequestBrokerTest {          new Thread(bp, "BindingProcessor").start();  //        new Thread(bp2, "BindingProcessor2").start();          new Thread(new ServiceSimulator(), "STALService").start(); - +        new Thread(new ZombieServiceSimulator(), "STALServiceZombie").start(); +                  try {              Thread.sleep(1000);          } catch (InterruptedException ex) { @@ -171,7 +173,7 @@ public class STALRequestBrokerTest {          new Thread(new TimeoutServiceSimulator(), "STALService").start();          try { -            Thread.sleep(STALRequestBroker.TIMEOUT_MS + 1); +            Thread.sleep(STALRequestBroker.DEFAULT_TIMEOUT_MS + 1);          } catch (InterruptedException ex) {              log.error("interrupted: " + ex.getMessage());          } @@ -186,13 +188,13 @@ public class STALRequestBrokerTest {          new Thread(new ServiceSimulator(), "STALService").start();          try { -            Thread.sleep(STALRequestBroker.TIMEOUT_MS + 1); +            Thread.sleep(STALRequestBroker.DEFAULT_TIMEOUT_MS + 1);          } catch (InterruptedException ex) {              log.error("interrupted: " + ex.getMessage());          }      } -    @Test +    @Ignore      public void testMultipleServices() {          log.debug("**************** test multiple SignRequests");          List<STALRequest> requests = new ArrayList<STALRequest>(); @@ -269,6 +271,70 @@ public class STALRequestBrokerTest {          }      } +    class ZombieServiceSimulator implements Runnable { +     +        @Override +        public void run() { +            try { +                log.debug("calling stal.nextRequest(oldResponse)"); +                STALResponse oldResp = new InfoboxReadResponse(); +                List<STALRequest> requests = stal.nextRequest(Collections.singletonList(oldResp)); +                log.debug("got " + requests.size() + " requests. processing..."); +                Thread.sleep(1); +                List<STALResponse> responses = new ArrayList<STALResponse>(); +                for (STALRequest request : requests) { +                    if (request instanceof InfoboxReadRequest) { +                      log.debug("received UNEXPECTED READINFOBOX request"); +                       +                        InfoboxReadResponse r = new InfoboxReadResponse(); +                        r.setInfoboxValue("dummyInfobox".getBytes()); +                        responses.add(r); +                    } else if (request instanceof SignRequest) { + +                      log.debug("received UNEXPECTED SIGN request"); +                       +                        log.debug("calling stal.getCurrentHashDataInputCallback"); +                        List<HashDataInput> hdis = stal.getHashDataInput(); +                        assertNotNull(hdis); +                        assertEquals(hdis.size(), 1); +                        HashDataInput hdi = hdis.get(0);// cb.getHashDataInput("1234"); +                        InputStream hd = hdi.getHashDataInput(); +                        byte[] data = new byte[hd.available()]; +                        hd.read(data); +                        log.debug("got HashDataInput " + new String(data)); + + +                        SignResponse r = new SignResponse(); +                        r.setSignatureValue("dummySignature".getBytes()); +                        responses.add(r); +                    } else if (request instanceof QuitRequest) { +                        log.debug("received EXPECTED QUIT request"); +                        return; +                    } +                } + +//                if (requests.size() > 0) { +//                    log.debug("calling stal.setResponse with " + requests.size() + " responses"); +//                    stal.setResponse(responses); +//                } +                log.debug("calling stal.nextRequest with " + responses.size() + " responses"); +                requests = stal.nextRequest(responses); +                for (STALRequest request : requests) { +                    if (request instanceof QuitRequest) { +                        log.debug("got QUIT request"); +                    } else { +                        log.debug("expected QUIT request, got " + request.getClass().getName()); +                    } +                } +            } catch (IOException ex) { +                log.error(ex.getMessage()); +            } catch (InterruptedException ex) { +                log.error(ex.getMessage()); +            } +        } +       +    } +          class ServiceSimulator implements Runnable {          @Override | 
