View Javadoc
1   /*
2    * Copyright 2010 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.ihe.hl7v3.iti55;
17  
18  import java.util.concurrent.ExecutorService;
19  
20  import groovy.util.slurpersupport.GPathResult;
21  import org.apache.camel.CamelContext;
22  import org.apache.camel.Exchange;
23  import org.apache.camel.ProducerTemplate;
24  import org.apache.camel.impl.DefaultExchange;
25  import org.apache.camel.spi.ExecutorServiceManager;
26  import org.apache.cxf.jaxws.context.WebServiceContextImpl;
27  import org.apache.cxf.jaxws.context.WrappedMessageContext;
28  import org.apache.cxf.message.Message;
29  import org.apache.cxf.ws.addressing.AddressingProperties;
30  import org.apache.cxf.ws.addressing.AttributedURIType;
31  import org.openehealth.ipf.commons.ihe.hl7v3.Hl7v3Exception;
32  import org.openehealth.ipf.commons.ihe.hl7v3.Hl7v3Utils;
33  import org.openehealth.ipf.commons.ihe.hl7v3.Hl7v3WsTransactionConfiguration;
34  import org.openehealth.ipf.commons.ihe.hl7v3.iti55.Iti55PortType;
35  import org.openehealth.ipf.commons.ihe.hl7v3.iti55.Iti55Utils;
36  import org.openehealth.ipf.commons.ihe.ws.cxf.audit.AbstractAuditInterceptor;
37  import org.openehealth.ipf.commons.ihe.ws.cxf.audit.WsAuditDataset;
38  import org.openehealth.ipf.platform.camel.core.util.Exchanges;
39  import org.openehealth.ipf.platform.camel.ihe.hl7v3.AbstractHl7v3WebService;
40  import org.openehealth.ipf.platform.camel.ihe.hl7v3.Hl7v3Endpoint;
41  import org.openehealth.ipf.platform.camel.ihe.ws.AbstractWsEndpoint;
42  import org.slf4j.Logger;
43  import org.slf4j.LoggerFactory;
44  
45  import static org.apache.cxf.ws.addressing.JAXWSAConstants.ADDRESSING_PROPERTIES_INBOUND;
46  import static org.apache.cxf.ws.addressing.JAXWSAConstants.ADDRESSING_PROPERTIES_OUTBOUND;
47  import static org.openehealth.ipf.commons.ihe.hl7v3.Hl7v3NakFactory.response;
48  import static org.openehealth.ipf.commons.ihe.hl7v3.XCPD.Interactions.ITI_55;
49  import static org.openehealth.ipf.platform.camel.ihe.hl7v3.iti55.deferredresponse.Iti55DeferredResponseComponent.THREAD_POOL_NAME;
50  
51  /**
52   * Service implementation for the Responding Gateway actor
53   * of the IHE ITI-55 transaction (XCPD)
54   * with support for the Deferred Response option.
55   *
56   * @author Dmytro Rud
57   */
58  public class Iti55Service extends AbstractHl7v3WebService implements Iti55PortType {
59      private static final transient Logger LOG = LoggerFactory.getLogger(Iti55Service.class);
60  
61      private final ProducerTemplate producerTemplate;
62      private final ExecutorService executorService;
63      private final Hl7v3Endpoint<Hl7v3WsTransactionConfiguration> endpoint;
64      private final CamelContext camelContext;
65  
66  
67      /**
68       * Constructor.
69       * @param endpoint
70       *      Camel endpoint instance this Web Service corresponds to.
71       */
72      Iti55Service(Hl7v3Endpoint<Hl7v3WsTransactionConfiguration> endpoint) {
73          super(ITI_55);
74          this.endpoint = endpoint;
75          this.camelContext = endpoint.getCamelContext();
76          this.producerTemplate = this.camelContext.createProducerTemplate();
77          this.executorService = getDeferredResponseExecutorService();
78      }
79  
80      private ExecutorService getDeferredResponseExecutorService(){
81          ExecutorServiceManager manager = this.camelContext.getExecutorServiceManager();
82  
83          //Try to get one from the registry
84          ExecutorService result = manager.newThreadPool(this, THREAD_POOL_NAME, THREAD_POOL_NAME);
85          
86          //Create a default one with non-daemon threads
87          if (result == null){
88              result = manager.newDefaultThreadPool(this, THREAD_POOL_NAME);
89          }
90          return result;
91      }
92  
93  
94      @Override
95      public String discoverPatients(String requestString) {
96          return doProcess(requestString);
97      }
98  
99      @Override
100     public String discoverPatientsDeferred(String requestString) {
101         return doProcess(requestString);
102     }
103 
104 
105     @Override
106     protected String doProcess(String request) {
107         final String requestString = request;
108         final GPathResult requestXml = Hl7v3Utils.slurp(requestString);
109         final String processingMode = Iti55Utils.processingMode(requestXml);
110 
111         // process regular requests in a synchronous route
112         if ("I".equals(processingMode)) {
113             String response = doProcess0(requestString, requestXml);
114             configureWsaAction(Iti55PortType.REGULAR_REQUEST_OUTPUT_ACTION);
115             return response;
116         }
117 
118         else if ("D".equals(processingMode)) {
119             // check whether deferred response URI is specified
120             final String deferredResponseUri = Iti55Utils.normalizedDeferredResponseUri(requestXml);
121             if (deferredResponseUri == null) {
122                 Hl7v3Exception hl7v3Exception = new Hl7v3Exception();
123                 hl7v3Exception.setMessage("Deferred response URI is missing or not HTTP(S)");
124                 hl7v3Exception.setTypeCode("AE");
125                 hl7v3Exception.setAcknowledgementDetailCode("SYN105");
126                 hl7v3Exception.setQueryResponseCode("AE");
127                 return createNak(requestXml, hl7v3Exception);
128             }
129 
130             // determine original request message ID
131             final WrappedMessageContext messageContext = (WrappedMessageContext) new WebServiceContextImpl().getMessageContext();
132             AddressingProperties apropos = (AddressingProperties) messageContext.get(ADDRESSING_PROPERTIES_INBOUND);
133             final String requestMessageId = ((apropos != null) && (apropos.getMessageID() != null)) ?
134                     apropos.getMessageID().getValue() : null;
135             if (requestMessageId == null) {
136                 LOG.warn("Cannot determine WS-Addressing ID of the request message");
137             }
138 
139             final WsAuditDataset auditDataset = (WsAuditDataset) messageContext.getWrappedMessage()
140                     .getContextualProperty(AbstractAuditInterceptor.DATASET_CONTEXT_KEY);
141 
142             // in a separate thread: run the route, send its result synchronously
143             // to the deferred response URI, ignore all errors and ACKs
144             Runnable processRouteAndNotifyTask = () -> {
145                 // Message context is a thread local object, so we need to propagate in into
146                 // this new thread.  Note that the producer (see producerTemplate below) will
147                 // get its own message context, precisely spoken a freshly created one.
148                 WebServiceContextImpl.setMessageContext(messageContext);
149 
150                 // run the route
151                 Object result = doProcess0(requestString, requestXml);
152 
153                 // prepare and send deferred response.
154                 // NB: Camel message headers will be used in Iti55DeferredResponseProducer
155                 Exchange exchange = new DefaultExchange(camelContext);
156                 exchange.getIn().setBody(result);
157                 exchange.getIn().setHeader("iti55.deferred.requestMessageId", requestMessageId);
158                 exchange.getIn().setHeader("iti55.deferred.auditDataset", auditDataset);
159 
160                 AbstractWsEndpoint<?, ?> responseEndpoint = (AbstractWsEndpoint<?, ?>) camelContext.getEndpoint(deferredResponseUri);
161                 responseEndpoint.setAudit(endpoint.isAudit());
162 
163                 exchange = producerTemplate.send(responseEndpoint, exchange);
164                 Exception exception = Exchanges.extractException(exchange);
165                 if (exception != null) {
166                     LOG.error("Sending deferred response failed", exception);
167                 }
168            };
169 
170             executorService.submit(processRouteAndNotifyTask);
171 
172             // return an immediate MCCI ACK
173             configureWsaAction(Iti55PortType.DEFERRED_REQUEST_OUTPUT_ACTION);
174             return response(requestXml, null, "MCCI_IN000002UV01", null, false);
175         }
176 
177         else {
178             Hl7v3Exception hl7v3Exception = new Hl7v3Exception();
179             hl7v3Exception.setMessage(String.format("Unsupported processing mode '%s'", processingMode));
180             hl7v3Exception.setTypeCode("AE");
181             hl7v3Exception.setAcknowledgementDetailCode("NS250");
182             hl7v3Exception.setQueryResponseCode("AE");
183             return createNak(requestXml, hl7v3Exception);
184         }
185     }
186 
187 
188     private String doProcess0(String requestString, GPathResult requestXml) {
189         Exchange result = process(requestString);
190         Exception exception = Exchanges.extractException(result);
191         return (exception != null) ?
192             nak(exception, requestXml) :
193             Exchanges.resultMessage(result).getBody(String.class);
194     }
195 
196 
197     /**
198      * Generates an XCPD-specific NAK from the given exception.
199      * @param exception
200      *      occurred exception.
201      * @param requestXml
202      *      original request as GPath object.
203      * @return
204      *      NAK as XML string.
205      */
206     private String nak(Exception exception, GPathResult requestXml) {
207         Hl7v3Exception hl7v3Exception;
208         if (exception instanceof Hl7v3Exception) {
209             hl7v3Exception = (Hl7v3Exception) exception;
210         } else {
211             hl7v3Exception = new Hl7v3Exception();
212             hl7v3Exception.setCause(exception);
213             hl7v3Exception.setMessage(exception.getMessage());
214             hl7v3Exception.setDetectedIssueManagementCode("InternalError");
215             hl7v3Exception.setDetectedIssueManagementCodeSystem("1.3.6.1.4.1.19376.1.2.27.3");
216         }
217         return createNak(requestXml, hl7v3Exception);
218     }
219 
220 
221     /**
222      * Configures outbound WS-Addressing header "Action".
223      * @param action
224      *      WS-Addressing action.
225      */
226     private static void configureWsaAction(String action) {
227         WrappedMessageContext messageContext = (WrappedMessageContext) new WebServiceContextImpl().getMessageContext();
228         Message outMessage = messageContext.getWrappedMessage().getExchange().getOutMessage();
229 
230         // when WS-Addressing headers were missing from the beginning
231         // TODO: is this check still necessary under CXF 2.5?
232         if (outMessage == null) {
233             return;
234         }
235 
236         AddressingProperties apropos = (AddressingProperties) outMessage.get(ADDRESSING_PROPERTIES_OUTBOUND);
237         if (apropos == null) {
238             apropos = new AddressingProperties();
239             outMessage.put(ADDRESSING_PROPERTIES_OUTBOUND, apropos);
240         }
241 
242         AttributedURIType actionHolder = new AttributedURIType();
243         actionHolder.setValue(action);
244         apropos.setAction(actionHolder);
245     }
246 
247 }