From 6d30f261c618a3b69a8f1be092056383e6dea424 Mon Sep 17 00:00:00 2001 From: Christof Rabensteiner Date: Thu, 18 Jul 2019 09:23:00 +0200 Subject: Integrate Mzs Service Timeout and Handle Async Responses - Service Timeout : Add service timeout to mzs schema (DeliveryRequest / Config), application.yaml, convert service timeout from spring environment to Config, and merge service timeouts in Configs. - Handling of Asynchronous DeliveryRequestStatus: Move "Applying Response Sinks" from backend into dedicated component "MsgResponseSinkHub" and integrate SinkHub into MzsService (apply sinks to asynchronous responses). - Remove line breaks in SignatureVerifier's log statements. - Revise documentation of parameters in application.yaml. --- .../java/at/gv/egiz/moazs/service/MzsService.java | 54 +++++++++++++--------- 1 file changed, 32 insertions(+), 22 deletions(-) (limited to 'src/main/java/at/gv/egiz/moazs/service/MzsService.java') diff --git a/src/main/java/at/gv/egiz/moazs/service/MzsService.java b/src/main/java/at/gv/egiz/moazs/service/MzsService.java index 8f0ef86..c6871da 100644 --- a/src/main/java/at/gv/egiz/moazs/service/MzsService.java +++ b/src/main/java/at/gv/egiz/moazs/service/MzsService.java @@ -1,5 +1,6 @@ package at.gv.egiz.moazs.service; +import at.gv.egiz.moazs.backend.MsgResponseSinkHub; import at.gv.egiz.moazs.preprocess.DeliveryRequestAugmenter; import at.gv.egiz.moazs.repository.DeliveryRepository; import at.gv.egiz.moazs.scheme.Msg2MzsConverter; @@ -7,7 +8,6 @@ import at.gv.egiz.moazs.scheme.RequestStatusResponse; import at.gv.zustellung.app2mzs.xsd.App2MzsPortType; import at.gv.zustellung.app2mzs.xsd.DeliveryRequestType; import at.gv.zustellung.app2mzs.xsd.DeliveryResponseType; -import at.gv.zustellung.msg.xsd.DeliveryRequestStatusType; import org.apache.cxf.annotations.SchemaValidation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,46 +31,57 @@ public class MzsService implements App2MzsPortType { private static final Logger logger = LoggerFactory.getLogger(MzsService.class); - //TODO move timeout and namespaces to config - private static final int TIMEOUT_FOR_ANWSER = 10; private static final String MZS_SERVICE_ERROR_MSG = "An error occurred while processing DeliveryRequest " + "with AppDeliveryID=%s."; private static final String RESPONSE_MISSING_ERROR_MSG = "Could not get a response for AppDeliveryID=%s."; + private static final String SERVICE_TIME_OUT_REACHED_MSG = "Backend processing for DeliveryRequest with " + + "AppDeliveryID=%s timed out. "; private final DeliveryRepository repository; private final Consumer backend; private final DeliveryRequestAugmenter augmenter; private final Msg2MzsConverter converter; + private final MsgResponseSinkHub hub; @Autowired public MzsService(DeliveryRepository repository, Consumer deliveryRequestBackend, - DeliveryRequestAugmenter augmenter, Msg2MzsConverter converter) { + DeliveryRequestAugmenter augmenter, Msg2MzsConverter converter, + MsgResponseSinkHub hub) { this.repository = repository; this.backend = deliveryRequestBackend; this.augmenter = augmenter; this.converter = converter; + this.hub = hub; } @Override public DeliveryResponseType app2Mzs( - @WebParam(partName = "DeliveryRequest", - name = "DeliveryRequest") - DeliveryRequestType deliveryRequest) { + @WebParam(partName = "DeliveryRequest",name = "DeliveryRequest") DeliveryRequestType deliveryRequest) { var appDeliveryID = deliveryRequest.getMetaData().getAppDeliveryID(); - var responseID = RequestStatusResponse.getResponseID(appDeliveryID); - var future = supplyAsync(() -> augmenter.augment(deliveryRequest)) - .thenApply(this::process) - .thenApply(status -> converter.convert(status, repository.retrieveBinaryResponse(responseID))); + var completeRequest = augmenter.augment(deliveryRequest); + + var requestProcessed = supplyAsync(() -> process(completeRequest)); try { - return future.get(TIMEOUT_FOR_ANWSER, TimeUnit.SECONDS); - } catch (TimeoutException e) { - logger.info("Answer Timed Out", e); + var serviceTimeout = completeRequest.getConfig().getServiceTimeout(); + RequestStatusResponse response; + + if (serviceTimeout == null) { + response = requestProcessed.get(); + } else { + response = requestProcessed.get(serviceTimeout.longValue(), TimeUnit.SECONDS); + } + + var status = response.getResponse(); + var binaryStatus = repository.retrieveBinaryResponse(response.getResponseID()); + return converter.convert(status, binaryStatus); - //TODO: revise how notification should be sent - //future.thenAccept(appClient::sendNotification); + } catch (TimeoutException e) { + logger.info(format(SERVICE_TIME_OUT_REACHED_MSG, appDeliveryID), e); + var sinkParams = completeRequest.getConfig().getMsgResponseSinks(); + requestProcessed.thenAcceptAsync(response -> hub.applySinks(response, sinkParams)); return generatePartialSuccessResponse(appDeliveryID); } catch (Exception e) { @@ -80,19 +91,18 @@ public class MzsService implements App2MzsPortType { } - private DeliveryRequestStatusType process(DeliveryRequestType deliveryRequest) { + private RequestStatusResponse process(DeliveryRequestType deliveryRequest) { var appDeliveryID = deliveryRequest.getMetaData().getAppDeliveryID(); - //TODO: fix too. - logger.info("Receive request with appDeliveryID = {}.", appDeliveryID); repository.store(deliveryRequest); backend.accept(appDeliveryID); - var statusId = RequestStatusResponse.getResponseID(appDeliveryID); - var response = repository.retrieveResponse(statusId) + var responseID = RequestStatusResponse.getResponseID(appDeliveryID); + + return (RequestStatusResponse) repository.retrieveResponse(responseID) .orElseThrow(() -> moaZSException(format(RESPONSE_MISSING_ERROR_MSG, appDeliveryID))); - return (DeliveryRequestStatusType) response.getResponse(); + } private DeliveryResponseType generatePartialSuccessResponse(String appDeliveryId) { -- cgit v1.2.3