From ff1e38bfa954dd747a5ff185dfe51c120d5ab5e7 Mon Sep 17 00:00:00 2001 From: Christof Rabensteiner Date: Fri, 6 Dec 2019 13:18:12 +0100 Subject: Change msg service: Acknowledge Response iff Backend Succeeds - Before: Upon receipt of a message via the msg/ endpoint, MOA ZS would immediately acknowledge the receipt without verifying that the message was successfully processed by the backend. - Now: MOA ZS receives a message via the msg/ endpoint, forwards it to the sinks, and acknowledges the receipt if and only if the processing succeeded. --- .../gv/egiz/moazs/backend/MsgResponseBackend.java | 20 +++++---- .../gv/egiz/moazs/backend/MsgResponseSinkHub.java | 25 ++++++++---- .../java/at/gv/egiz/moazs/service/MsgService.java | 47 +++++++++++++++++----- .../java/at/gv/egiz/moazs/service/MzsService.java | 3 +- 4 files changed, 65 insertions(+), 30 deletions(-) diff --git a/src/main/java/at/gv/egiz/moazs/backend/MsgResponseBackend.java b/src/main/java/at/gv/egiz/moazs/backend/MsgResponseBackend.java index 81398da..45a4000 100644 --- a/src/main/java/at/gv/egiz/moazs/backend/MsgResponseBackend.java +++ b/src/main/java/at/gv/egiz/moazs/backend/MsgResponseBackend.java @@ -35,6 +35,7 @@ import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import java.util.function.Supplier; @@ -49,7 +50,7 @@ import static java.util.concurrent.CompletableFuture.supplyAsync; * */ @Component -public class MsgResponseBackend implements Consumer { +public class MsgResponseBackend { private static final Logger log = LoggerFactory.getLogger(MsgService.class); @@ -92,15 +93,10 @@ public class MsgResponseBackend implements Consumer { * * @param responseID refers to MsgResponse Object. */ - @Override - public void accept(String responseID) { + public CompletableFuture accept(String responseID) { log.info("Backend accepts MsgResponse with responseID={}.", responseID); - supplyAsync(() -> verifySignature(responseID), taskExecutor) - .thenAcceptAsync(msgResponse -> applySinks(msgResponse), taskExecutor) - .exceptionally(ex -> { - log.error(ex.getMessage(), ex); - return null; - }); + return supplyAsync(() -> verifySignature(responseID), taskExecutor) + .thenCompose(msgResponse -> applySinks(msgResponse)); } private MsgResponse verifySignature(String responseID) { @@ -117,16 +113,18 @@ public class MsgResponseBackend implements Consumer { signatureVerifier.accept(binaryResponse); return response; + } catch (MoaZSException ex) { log.error(ex.getMessage(), ex); var text = format(MOASP_SIGNATURE_INVALID_ERROR_MSG, response.getAppDeliveryID()); var code = ERROR_MOASP_SIGNATURE_INVALID; return response.generateError(text, code); + } } - private void applySinks(MsgResponse msgResponse) { + private CompletableFuture applySinks(MsgResponse msgResponse) { var appDeliveryID = msgResponse.getAppDeliveryID(); var config = repository @@ -135,7 +133,7 @@ public class MsgResponseBackend implements Consumer { .getConfig(); var sinkParams = config.getMsgResponseSinks(); - hub.applySinks(msgResponse, sinkParams); + return hub.applySinks(msgResponse, sinkParams); } private Supplier supplyRequestWithDefaultConfig(String appDeliveryID) { diff --git a/src/main/java/at/gv/egiz/moazs/backend/MsgResponseSinkHub.java b/src/main/java/at/gv/egiz/moazs/backend/MsgResponseSinkHub.java index 3927055..64b559f 100644 --- a/src/main/java/at/gv/egiz/moazs/backend/MsgResponseSinkHub.java +++ b/src/main/java/at/gv/egiz/moazs/backend/MsgResponseSinkHub.java @@ -22,6 +22,7 @@ package at.gv.egiz.moazs.backend; import at.gv.egiz.moazs.scheme.MsgResponse; +import at.gv.egiz.moazs.scheme.RequestStatusResponse; import at.gv.zustellung.app2mzs.xsd.MsgResponseSinksType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +30,12 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.supplyAsync; /** @@ -43,37 +50,39 @@ public class MsgResponseSinkHub { private final SaveResponseToFileSink saveResponseSink; private final LogResponseSink logResponseSink; private final ForwardResponseToServiceSink forwardResponseSink; - private final TaskExecutor taskExecutor; @Autowired public MsgResponseSinkHub(SaveResponseToFileSink saveResponseSink, LogResponseSink logResponseSink, - ForwardResponseToServiceSink forwardResponseSink, TaskExecutor taskExecutor) { + ForwardResponseToServiceSink forwardResponseSink) { this.saveResponseSink = saveResponseSink; this.logResponseSink = logResponseSink; this.forwardResponseSink = forwardResponseSink; - this.taskExecutor = taskExecutor; } /** * Apply all sinks that are configured in {@code sinkParams} to {@code msgResponse}. */ - public void applySinks(MsgResponse msgResponse, MsgResponseSinksType sinkParams) { + public CompletableFuture applySinks(MsgResponse msgResponse, MsgResponseSinksType sinkParams) { log.info("Apply Sinks to msg:{} with appDeliveryID={}.", msgResponse.getRootElementLocalPart(), msgResponse.getAppDeliveryID()); + List> futures = new ArrayList(); + if (sinkParams.getSaveResponseToFile().isActive()) { - supplyAsync(() -> saveResponseSink.save(msgResponse, sinkParams.getSaveResponseToFile().getPath()), taskExecutor); + futures.add(saveResponseSink.save(msgResponse, sinkParams.getSaveResponseToFile().getPath())); } if (sinkParams.isLogResponse()) { - supplyAsync(() -> logResponseSink.log(msgResponse), taskExecutor); + futures.add(logResponseSink.log(msgResponse)); } if (sinkParams.getForwardResponseToService().isActive()) { - supplyAsync(() -> forwardResponseSink.send( - msgResponse, sinkParams.getForwardResponseToService().getAppClient()), taskExecutor); + futures.add(forwardResponseSink.send( + msgResponse, sinkParams.getForwardResponseToService().getAppClient())); } + return allOf(futures.toArray(new CompletableFuture[futures.size()])); + } } diff --git a/src/main/java/at/gv/egiz/moazs/service/MsgService.java b/src/main/java/at/gv/egiz/moazs/service/MsgService.java index 6b260e7..6fce6ae 100644 --- a/src/main/java/at/gv/egiz/moazs/service/MsgService.java +++ b/src/main/java/at/gv/egiz/moazs/service/MsgService.java @@ -20,6 +20,7 @@ * that you distribute must include a readable copy of the "NOTICE" text file. *******************************************************************************/ package at.gv.egiz.moazs.service; +import at.gv.egiz.moazs.backend.MsgResponseBackend; import at.gv.egiz.moazs.repository.DeliveryRepository; import at.gv.egiz.moazs.scheme.MsgResponse; import at.gv.egiz.moazs.scheme.NameSpace; @@ -27,15 +28,21 @@ import at.gv.egiz.moazs.scheme.NotificationResponse; import at.gv.egiz.moazs.scheme.RequestStatusResponse; import at.gv.zustellung.msg.xsd.*; import org.apache.cxf.annotations.SchemaValidation; +import org.apache.cxf.binding.soap.SoapFault; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.task.TaskExecutor; import org.springframework.stereotype.Service; -import java.util.function.Consumer; +import javax.xml.namespace.QName; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import static at.gv.zustellung.msg.xsd.DeliveryNotificationACKType.deliveryNotificationACKTypeBuilder; import static at.gv.zustellung.msg.xsd.DeliveryRequestStatusACKType.deliveryRequestStatusACKTypeBuilder; import static at.gv.zustellung.msg.xsd.GetVersionResponse.getVersionResponseBuilder; +import static java.lang.String.format; import static java.util.concurrent.CompletableFuture.runAsync; /** @@ -47,12 +54,15 @@ import static java.util.concurrent.CompletableFuture.runAsync; @org.apache.cxf.feature.Features (features = "org.apache.cxf.ext.logging.LoggingFeature") public class MsgService implements Zuse2AppPort { + private static final Logger log = LoggerFactory.getLogger(MsgService.class); + private static final String BACKEND_ERROR_MSG = "Could not process %s."; + private final DeliveryRepository repository; - private final Consumer backend; + private final MsgResponseBackend backend; private final TaskExecutor taskExecutor; @Autowired - public MsgService(DeliveryRepository repository, Consumer msgResponseBackend, TaskExecutor taskExecutor) { + public MsgService(DeliveryRepository repository, MsgResponseBackend msgResponseBackend, TaskExecutor taskExecutor) { this.repository = repository; this.backend = msgResponseBackend; this.taskExecutor = taskExecutor; @@ -68,8 +78,14 @@ public class MsgService implements Zuse2AppPort { @Override public DeliveryRequestStatusACKType status(DeliveryRequestStatusType status) { var response = new RequestStatusResponse(status); - sendToWork(response); - return statusAck(response.getAppDeliveryID(), response.getZSDeliveryID()); + try { + sendToBackend(response).get(); + return statusAck(response.getAppDeliveryID(), response.getZSDeliveryID()); + } catch (InterruptedException | ExecutionException e) { + var message = format(BACKEND_ERROR_MSG, "request status"); + log.error(message, e); + throw toFault(e, message); + } } private DeliveryRequestStatusACKType statusAck(String appDeliveryID, String zsDeliveryID) { @@ -83,13 +99,24 @@ public class MsgService implements Zuse2AppPort { @Override public DeliveryNotificationACKType notification(DeliveryNotificationType notification) { var response = new NotificationResponse(notification); - sendToWork(response); - return notificationAck(response.getAppDeliveryID(), response.getZSDeliveryID()); + try { + sendToBackend(response).get(); + return notificationAck(response.getAppDeliveryID(), response.getZSDeliveryID()); + } catch (InterruptedException | ExecutionException e) { + var message = format(BACKEND_ERROR_MSG, "delivery notification"); + log.error(message, e); + throw toFault(e, message); + } + } + + private RuntimeException toFault(Exception e, String fallbackMessage) { + Throwable cause = (e.getCause() != null) ? e.getCause() : e; + return new SoapFault(fallbackMessage, cause, new QName("faultcode")); } - private void sendToWork(MsgResponse response) { - runAsync(() -> repository.store(response), taskExecutor) - .thenRunAsync(() -> backend.accept(response.getResponseID()), taskExecutor); + private CompletableFuture sendToBackend(MsgResponse response) { + return runAsync(() -> repository.store(response), taskExecutor) + .thenCompose(ignore -> backend.accept(response.getResponseID())); } private DeliveryNotificationACKType notificationAck(String appDeliveryID, String zsDeliveryID) { diff --git a/src/main/java/at/gv/egiz/moazs/service/MzsService.java b/src/main/java/at/gv/egiz/moazs/service/MzsService.java index ee14af8..caaa251 100644 --- a/src/main/java/at/gv/egiz/moazs/service/MzsService.java +++ b/src/main/java/at/gv/egiz/moazs/service/MzsService.java @@ -142,7 +142,8 @@ public class MzsService implements App2MzsPortType { log.info(SERVICE_TIME_OUT_REACHED_MSG, appDeliveryID); var sinkParams = request.getConfig().getMsgResponseSinks(); - requestProcessed.thenAcceptAsync(response -> hub.applySinks(response, sinkParams), taskExecutor); + + requestProcessed.thenCompose(response -> hub.applySinks(response, sinkParams)); return generatePartialSuccessResponse(appDeliveryID); } -- cgit v1.2.3