1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.ws;
17
18 import com.ctc.wstx.exc.WstxEOFException;
19 import org.apache.camel.Exchange;
20 import org.apache.camel.ExchangePattern;
21 import org.apache.camel.Message;
22 import org.apache.camel.RuntimeCamelException;
23 import org.apache.camel.impl.DefaultProducer;
24 import org.apache.camel.util.jsse.SSLContextParameters;
25 import org.apache.cxf.endpoint.ClientImpl;
26 import org.apache.cxf.frontend.ClientProxy;
27 import org.apache.cxf.headers.Header;
28 import org.apache.cxf.jaxws.context.WrappedMessageContext;
29 import org.apache.cxf.ws.addressing.AddressingProperties;
30 import org.apache.cxf.ws.addressing.AttributedURIType;
31 import org.apache.cxf.ws.addressing.EndpointReferenceType;
32 import org.apache.cxf.ws.addressing.JAXWSAConstants;
33 import org.openehealth.ipf.commons.ihe.ws.JaxWsClientFactory;
34 import org.openehealth.ipf.commons.ihe.ws.WsSecurityInformation;
35 import org.openehealth.ipf.commons.ihe.ws.WsTransactionConfiguration;
36 import org.openehealth.ipf.commons.ihe.ws.correlation.AsynchronyCorrelator;
37 import org.openehealth.ipf.commons.ihe.ws.cxf.audit.WsAuditDataset;
38 import org.openehealth.ipf.platform.camel.core.util.Exchanges;
39 import org.openehealth.ipf.platform.camel.ihe.core.AmbiguousBeanException;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import javax.net.ssl.SSLContext;
44 import javax.xml.ws.BindingProvider;
45 import javax.xml.ws.soap.SOAPFaultException;
46 import java.util.Map;
47 import java.util.UUID;
48
49 import static java.util.Objects.requireNonNull;
50 import static org.openehealth.ipf.platform.camel.ihe.ws.HeaderUtils.processIncomingHeaders;
51 import static org.openehealth.ipf.platform.camel.ihe.ws.HeaderUtils.processUserDefinedOutgoingHeaders;
52
53
54
55
56
57
58
59
60
61 public abstract class AbstractWsProducer<
62 AuditDatasetType extends WsAuditDataset,
63 ConfigType extends WsTransactionConfiguration<AuditDatasetType>, InType, OutType> extends DefaultProducer {
64 private static final Logger LOG = LoggerFactory.getLogger(AbstractWsProducer.class);
65
66 private final JaxWsClientFactory<AuditDatasetType> clientFactory;
67 private final Class<InType> requestClass;
68 private final Class<OutType> responseClass;
69
70
71
72
73
74
75
76
77
78 @SuppressWarnings("unchecked")
79 public AbstractWsProducer(
80 AbstractWsEndpoint<AuditDatasetType, ConfigType> endpoint,
81 JaxWsClientFactory<AuditDatasetType> clientFactory,
82 Class<InType> requestClass,
83 Class<OutType> responseClass) {
84 super(endpoint);
85
86 this.clientFactory = requireNonNull(clientFactory, "client factory cannot be null");
87 this.requestClass = requireNonNull(requestClass, "request class cannot be null");
88 this.responseClass = requireNonNull(responseClass, "responseClass class cannot be null");
89 }
90
91
92 @Override
93 public void process(Exchange exchange) throws Exception {
94
95 InType body = exchange.getIn().getMandatoryBody(requestClass);
96 Object client = getClient();
97 configureClient(client);
98 BindingProvider bindingProvider = (BindingProvider) client;
99 WrappedMessageContext requestContext = (WrappedMessageContext) bindingProvider.getRequestContext();
100 cleanRequestContext(requestContext);
101
102 enrichRequestContext(exchange, requestContext);
103 processUserDefinedOutgoingHeaders(requestContext, exchange.getIn(), true);
104
105
106 String requestEncoding = exchange.getProperty(Exchange.CHARSET_NAME, String.class);
107 if (requestEncoding != null) {
108 requestContext.put(org.apache.cxf.message.Message.ENCODING, requestEncoding);
109 }
110
111
112 String replyToHeader = exchange.getIn().getHeader(AbstractWsEndpoint.WSA_REPLYTO_HEADER_NAME, String.class);
113 replyToHeader = replyToHeader != null ? replyToHeader.trim() : null;
114 String replyToUri =
115 getWsTransactionConfiguration().isAllowAsynchrony()
116 ? (replyToHeader == null || replyToHeader.isEmpty() ? null : replyToHeader)
117 : null;
118
119
120 if ((replyToUri != null) ||
121 Boolean.TRUE.equals(requestContext.get(AsynchronyCorrelator.FORCE_CORRELATION))) {
122 String messageId = "urn:uuid:" + UUID.randomUUID().toString();
123 configureWSAHeaders(messageId, replyToUri, requestContext);
124
125 AsynchronyCorrelator<AuditDatasetType> correlator = getEndpoint().getCorrelator();
126 correlator.storeServiceEndpointUri(messageId, getEndpoint().getEndpointUri());
127
128 String correlationKey = exchange.getIn().getHeader(
129 AbstractWsEndpoint.CORRELATION_KEY_HEADER_NAME,
130 String.class);
131 if (correlationKey != null) {
132 correlator.storeCorrelationKey(messageId, correlationKey);
133 }
134
135 String[] alternativeKeys = getAlternativeRequestKeys(exchange);
136 if (alternativeKeys != null) {
137 correlator.storeAlternativeKeys(messageId, alternativeKeys);
138 }
139 }
140
141
142 exchange.setPattern((replyToUri == null) ? ExchangePattern.InOut : ExchangePattern.InOnly);
143 OutType result = null;
144 try {
145
146 result = responseClass.cast(callService(client, body));
147 } catch (SOAPFaultException fault) {
148
149
150 if ((replyToUri == null) ||
151 (fault.getCause() == null) ||
152 !(fault.getCause() instanceof WstxEOFException)) {
153 throw fault;
154 }
155 }
156
157
158
159
160 if (replyToUri == null) {
161 Message responseMessage = Exchanges.resultMessage(exchange);
162 responseMessage.getHeaders().putAll(exchange.getIn().getHeaders());
163 WrappedMessageContext responseContext = (WrappedMessageContext) bindingProvider.getResponseContext();
164 processIncomingHeaders(responseContext, responseMessage);
165 enrichResponseMessage(responseMessage, responseContext);
166
167
168 exchange.setProperty(Exchange.CHARSET_NAME,
169 responseContext.get(org.apache.cxf.message.Message.ENCODING));
170 responseMessage.setBody(result, responseClass);
171 }
172 }
173
174
175
176
177
178 protected abstract OutType callService(Object client, InType body) throws Exception;
179
180
181
182
183
184
185 protected void enrichRequestContext(Exchange exchange, WrappedMessageContext requestContext) {
186
187 }
188
189
190
191
192
193
194
195
196
197
198
199
200
201 protected String[] getAlternativeRequestKeys(Exchange exchange) {
202 return null;
203 }
204
205
206
207
208
209 protected void enrichResponseMessage(Message message, WrappedMessageContext responseContext) {
210
211 }
212
213
214 @Override
215 public AbstractWsEndpoint<AuditDatasetType, ConfigType> getEndpoint() {
216 return (AbstractWsEndpoint<AuditDatasetType, ConfigType>) super.getEndpoint();
217 }
218
219
220
221
222
223 protected void configureClient(Object o) {
224 ClientImpl client = (ClientImpl) ClientProxy.getClient(o);
225 client.setThreadLocalRequestContext(true);
226 client.setSynchronousTimeout(Integer.MAX_VALUE);
227 }
228
229
230
231
232 protected void cleanRequestContext(WrappedMessageContext requestContext) {
233 requestContext.remove(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES);
234 requestContext.remove(org.apache.cxf.message.Message.PROTOCOL_HEADERS);
235 requestContext.remove(Header.HEADER_LIST);
236 }
237
238
239
240
241
242
243 private static void configureWSAHeaders(String messageId, String replyToUri, WrappedMessageContext context) {
244
245 AddressingProperties apropos = (AddressingProperties) context.get(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES);
246 if (apropos == null) {
247 apropos = new AddressingProperties();
248 context.put(JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES, apropos);
249 }
250
251
252 AttributedURIType uri = new AttributedURIType();
253 uri.setValue(messageId);
254 apropos.setMessageID(uri);
255 LOG.debug("Set WS-Addressing message ID: {}", messageId);
256
257
258 if (replyToUri != null) {
259 AttributedURIType uri2 = new AttributedURIType();
260 uri2.setValue(replyToUri);
261 EndpointReferenceType endpointReference = new EndpointReferenceType();
262 endpointReference.setAddress(uri2);
263 apropos.setReplyTo(endpointReference);
264 }
265 }
266
267
268
269
270
271
272
273
274
275 protected Object getClient() {
276 return clientFactory.getClient(() -> {
277
278 SSLContextParameters sslContextParameters = getEndpoint().getSslContextParameters();
279 if (sslContextParameters == null && getEndpoint().isSecure()) {
280 Map<String, SSLContextParameters> sslContextParameterMap = getEndpoint().getCamelContext().getRegistry().findByTypeWithName(SSLContextParameters.class);
281 if (sslContextParameterMap.size() == 1) {
282 Map.Entry<String, SSLContextParameters> entry = sslContextParameterMap.entrySet().iterator().next();
283 sslContextParameters = entry.getValue();
284 } else if (sslContextParameterMap.size() > 1) {
285 throw new AmbiguousBeanException(SSLContextParameters.class);
286 }
287 }
288 try {
289 SSLContext sslContext = sslContextParameters != null ?
290 sslContextParameters.createSSLContext(getEndpoint().getCamelContext()) :
291 null;
292 return new WsSecurityInformation(getEndpoint().isSecure(), sslContext, getEndpoint().getHostnameVerifier(),
293 getEndpoint().getUsername(), getEndpoint().getPassword());
294 } catch (Exception e) {
295 throw new RuntimeCamelException(e);
296 }
297 });
298 }
299
300
301
302
303
304 public WsTransactionConfiguration getWsTransactionConfiguration() {
305 return clientFactory.getWsTransactionConfiguration();
306 }
307
308
309 public Class<InType> getRequestClass() {
310 return requestClass;
311 }
312
313 public Class<OutType> getResponseClass() {
314 return responseClass;
315 }
316
317 }