View Javadoc
1   /*
2    * Copyright 2009 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.ihe.ws;
17  
18  import com.ctc.wstx.exc.WstxEOFException;
19  import org.apache.camel.Exchange;
20  import org.apache.camel.ExchangePattern;
21  import org.apache.camel.Message;
22  import org.apache.camel.RuntimeCamelException;
23  import org.apache.camel.impl.DefaultProducer;
24  import org.apache.camel.util.jsse.SSLContextParameters;
25  import org.apache.cxf.endpoint.ClientImpl;
26  import org.apache.cxf.frontend.ClientProxy;
27  import org.apache.cxf.headers.Header;
28  import org.apache.cxf.jaxws.context.WrappedMessageContext;
29  import org.apache.cxf.ws.addressing.AddressingProperties;
30  import org.apache.cxf.ws.addressing.AttributedURIType;
31  import org.apache.cxf.ws.addressing.EndpointReferenceType;
32  import org.apache.cxf.ws.addressing.JAXWSAConstants;
33  import org.openehealth.ipf.commons.ihe.ws.JaxWsClientFactory;
34  import org.openehealth.ipf.commons.ihe.ws.WsSecurityInformation;
35  import org.openehealth.ipf.commons.ihe.ws.WsTransactionConfiguration;
36  import org.openehealth.ipf.commons.ihe.ws.correlation.AsynchronyCorrelator;
37  import org.openehealth.ipf.commons.ihe.ws.cxf.audit.WsAuditDataset;
38  import org.openehealth.ipf.platform.camel.core.util.Exchanges;
39  import org.openehealth.ipf.platform.camel.ihe.core.AmbiguousBeanException;
40  import org.slf4j.Logger;
41  import org.slf4j.LoggerFactory;
42  
43  import javax.net.ssl.SSLContext;
44  import javax.xml.ws.BindingProvider;
45  import javax.xml.ws.soap.SOAPFaultException;
46  import java.util.Map;
47  import java.util.UUID;
48  
49  import static java.util.Objects.requireNonNull;
50  import static org.openehealth.ipf.platform.camel.ihe.ws.HeaderUtils.processIncomingHeaders;
51  import static org.openehealth.ipf.platform.camel.ihe.ws.HeaderUtils.processUserDefinedOutgoingHeaders;
52  
53  /**
54   * Camel producer used to make calls to a Web Service.
55   *
56   * @param <InType>  type of input data (i.e. of the data got from the route).
57   * @param <OutType> type of output data (i.e. of the data returned to the route).
58   * @author Jens Riemschneider
59   * @author Dmytro Rud
60   */
61  public abstract class AbstractWsProducer<
62          AuditDatasetType extends WsAuditDataset,
63          ConfigType extends WsTransactionConfiguration<AuditDatasetType>, InType, OutType> extends DefaultProducer {
64      private static final Logger LOG = LoggerFactory.getLogger(AbstractWsProducer.class);
65  
66      private final JaxWsClientFactory<AuditDatasetType> clientFactory;
67      private final Class<InType> requestClass;
68      private final Class<OutType> responseClass;
69  
70  
71      /**
72       * Constructs the producer.
73       *
74       * @param endpoint      the endpoint that creates this producer.
75       * @param clientFactory the factory for clients to produce messages for the service.
76       * @param requestClass  type of request messages.
77       */
78      @SuppressWarnings("unchecked")
79      public AbstractWsProducer(
80              AbstractWsEndpoint<AuditDatasetType, ConfigType> endpoint,
81              JaxWsClientFactory<AuditDatasetType> clientFactory,
82              Class<InType> requestClass,
83              Class<OutType> responseClass) {
84          super(endpoint);
85  
86          this.clientFactory = requireNonNull(clientFactory, "client factory cannot be null");
87          this.requestClass = requireNonNull(requestClass, "request class cannot be null");
88          this.responseClass = requireNonNull(responseClass, "responseClass class cannot be null");
89      }
90  
91  
92      @Override
93      public void process(Exchange exchange) throws Exception {
94          // prepare
95          InType body = exchange.getIn().getMandatoryBody(requestClass);
96          Object client = getClient();
97          configureClient(client);
98          BindingProvider bindingProvider = (BindingProvider) client;
99          WrappedMessageContext requestContext = (WrappedMessageContext) bindingProvider.getRequestContext();
100         cleanRequestContext(requestContext);
101 
102         enrichRequestContext(exchange, requestContext);
103         processUserDefinedOutgoingHeaders(requestContext, exchange.getIn(), true);
104 
105         // set request encoding based on Camel exchange property
106         String requestEncoding = exchange.getProperty(Exchange.CHARSET_NAME, String.class);
107         if (requestEncoding != null) {
108             requestContext.put(org.apache.cxf.message.Message.ENCODING, requestEncoding);
109         }
110 
111         // get and analyse WS-Addressing asynchrony configuration
112         String replyToHeader = exchange.getIn().getHeader(AbstractWsEndpoint.WSA_REPLYTO_HEADER_NAME, String.class);
113         replyToHeader = replyToHeader != null ? replyToHeader.trim() : null;
114         String replyToUri =
115                 getWsTransactionConfiguration().isAllowAsynchrony()
116                         ? (replyToHeader == null || replyToHeader.isEmpty() ? null : replyToHeader)
117                         : null;
118 
119         // for asynchronous interaction: configure WSA headers and store correlation data
120         if ((replyToUri != null) ||
121                 Boolean.TRUE.equals(requestContext.get(AsynchronyCorrelator.FORCE_CORRELATION))) {
122             String messageId = "urn:uuid:" + UUID.randomUUID().toString();
123             configureWSAHeaders(messageId, replyToUri, requestContext);
124 
125             AsynchronyCorrelator<AuditDatasetType> correlator = getEndpoint().getCorrelator();
126             correlator.storeServiceEndpointUri(messageId, getEndpoint().getEndpointUri());
127 
128             String correlationKey = exchange.getIn().getHeader(
129                     AbstractWsEndpoint.CORRELATION_KEY_HEADER_NAME,
130                     String.class);
131             if (correlationKey != null) {
132                 correlator.storeCorrelationKey(messageId, correlationKey);
133             }
134 
135             String[] alternativeKeys = getAlternativeRequestKeys(exchange);
136             if (alternativeKeys != null) {
137                 correlator.storeAlternativeKeys(messageId, alternativeKeys);
138             }
139         }
140 
141         // invoke
142         exchange.setPattern((replyToUri == null) ? ExchangePattern.InOut : ExchangePattern.InOnly);
143         OutType result = null;
144         try {
145             // normalize response type when called via reflection or similar non-type-safe mechanisms
146             result = responseClass.cast(callService(client, body));
147         } catch (SOAPFaultException fault) {
148             // handle http://www.w3.org/TR/2006/NOTE-soap11-ror-httpbinding-20060321/
149             // see also: https://issues.apache.org/jira/browse/CXF-3768
150             if ((replyToUri == null) ||
151                     (fault.getCause() == null) ||
152                     !(fault.getCause() instanceof WstxEOFException)) {
153                 throw fault;
154             }
155         }
156 
157         // for synchronous interaction (replyToUri == null): handle response.
158         // (async responses are handled in the service instance derived from 
159         // org.openehealth.ipf.platform.camel.ihe.ws.AbstractAsyncResponseWebService)
160         if (replyToUri == null) {
161             Message responseMessage = Exchanges.resultMessage(exchange);
162             responseMessage.getHeaders().putAll(exchange.getIn().getHeaders());
163             WrappedMessageContext responseContext = (WrappedMessageContext) bindingProvider.getResponseContext();
164             processIncomingHeaders(responseContext, responseMessage);
165             enrichResponseMessage(responseMessage, responseContext);
166 
167             // set Camel exchange property based on response encoding
168             exchange.setProperty(Exchange.CHARSET_NAME,
169                     responseContext.get(org.apache.cxf.message.Message.ENCODING));
170             responseMessage.setBody(result, responseClass);
171         }
172     }
173 
174 
175     /**
176      * Sends the given request body to a Web Service via the given client proxy.
177      */
178     protected abstract OutType callService(Object client, InType body) throws Exception;
179 
180 
181     /**
182      * Enriches the given Web Service request context
183      * on the basis of the given Camel exchange, and vice versa.
184      */
185     protected void enrichRequestContext(Exchange exchange, WrappedMessageContext requestContext) {
186         // does nothing per default
187     }
188 
189 
190     /**
191      * Determines the set of correlation keys for the request message contained
192      * in the given exchange, which are alternative to the WS-Addressing message ID.
193      * An example of alternative key is the query ID in HL7v3-based transactions.
194      * <p>
195      * Per default, this method returns <code>null</code>.
196      *
197      * @param exchange Camel exchange containing a request message.
198      * @return A non-empty collection of non-<code>null</code> alternative keys,
199      * or <code>null</code>, when no keys could have been extracted.
200      */
201     protected String[] getAlternativeRequestKeys(Exchange exchange) {
202         return null;
203     }
204 
205 
206     /**
207      * Enriches the given response message from the Web Service request context data.
208      */
209     protected void enrichResponseMessage(Message message, WrappedMessageContext responseContext) {
210         // does nothing per default
211     }
212 
213 
214     @Override
215     public AbstractWsEndpoint<AuditDatasetType, ConfigType> getEndpoint() {
216         return (AbstractWsEndpoint<AuditDatasetType, ConfigType>) super.getEndpoint();
217     }
218 
219 
220     /**
221      * Sets thread safety & timeout options of the given CXF client.
222      */
223     protected void configureClient(Object o) {
224         ClientImpl client = (ClientImpl) ClientProxy.getClient(o);
225         client.setThreadLocalRequestContext(true);
226         client.setSynchronousTimeout(Integer.MAX_VALUE);
227     }
228 
229     /**
230      * Request context is shared among subsequent requests, so we have to clean it.
231      */
232     protected void cleanRequestContext(WrappedMessageContext requestContext) {
233         requestContext.remove(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES);
234         requestContext.remove(org.apache.cxf.message.Message.PROTOCOL_HEADERS);
235         requestContext.remove(Header.HEADER_LIST);
236     }
237 
238 
239     /**
240      * Initializes WS-Addressing headers MessageID and ReplyTo,
241      * and stores them into the given message context.
242      */
243     private static void configureWSAHeaders(String messageId, String replyToUri, WrappedMessageContext context) {
244         // obtain headers' container
245         AddressingProperties apropos = (AddressingProperties) context.get(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES);
246         if (apropos == null) {
247             apropos = new AddressingProperties();
248             context.put(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES, apropos);
249         }
250 
251         // MessageID header
252         AttributedURIType uri = new AttributedURIType();
253         uri.setValue(messageId);
254         apropos.setMessageID(uri);
255         LOG.debug("Set WS-Addressing message ID: {}", messageId);
256 
257         // ReplyTo header
258         if (replyToUri != null) {
259             AttributedURIType uri2 = new AttributedURIType();
260             uri2.setValue(replyToUri);
261             EndpointReferenceType endpointReference = new EndpointReferenceType();
262             endpointReference.setAddress(uri2);
263             apropos.setReplyTo(endpointReference);
264         }
265     }
266 
267     /**
268      * Retrieves the client stub for the Web Service.
269      * <p>
270      * This method caches the client stub instance and therefore requires thread
271      * synchronization.
272      *
273      * @return the client stub.
274      */
275     protected Object getClient() {
276         return clientFactory.getClient(() -> {
277             // Only supply SSLContext if client has not been configured yet
278             SSLContextParameters sslContextParameters = getEndpoint().getSslContextParameters();
279             if (sslContextParameters == null && getEndpoint().isSecure()) {
280                 Map<String, SSLContextParameters> sslContextParameterMap = getEndpoint().getCamelContext().getRegistry().findByTypeWithName(SSLContextParameters.class);
281                 if (sslContextParameterMap.size() == 1) {
282                     Map.Entry<String, SSLContextParameters> entry = sslContextParameterMap.entrySet().iterator().next();
283                     sslContextParameters = entry.getValue();
284                 } else if (sslContextParameterMap.size() > 1) {
285                     throw new AmbiguousBeanException(SSLContextParameters.class);
286                 }
287             }
288             try {
289                 SSLContext sslContext = sslContextParameters != null ?
290                         sslContextParameters.createSSLContext(getEndpoint().getCamelContext()) :
291                         null;
292                 return new WsSecurityInformation(getEndpoint().isSecure(), sslContext, getEndpoint().getHostnameVerifier(),
293                         getEndpoint().getUsername(), getEndpoint().getPassword());
294             } catch (Exception e) {
295                 throw new RuntimeCamelException(e);
296             }
297         });
298     }
299 
300 
301     /**
302      * @return the info describing the Web Service.
303      */
304     public WsTransactionConfiguration getWsTransactionConfiguration() {
305         return clientFactory.getWsTransactionConfiguration();
306     }
307 
308 
309     public Class<InType> getRequestClass() {
310         return requestClass;
311     }
312 
313     public Class<OutType> getResponseClass() {
314         return responseClass;
315     }
316 
317 }