View Javadoc
1   /*
2    * Copyright 2016 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *       http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.openehealth.ipf.platform.camel.ihe.fhir.core;
18  
19  import ca.uhn.fhir.context.FhirContext;
20  import ca.uhn.fhir.rest.api.MethodOutcome;
21  import ca.uhn.fhir.rest.api.server.IBundleProvider;
22  import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
23  import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
24  import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
25  import org.apache.camel.Exchange;
26  import org.apache.camel.Message;
27  import org.apache.camel.Processor;
28  import org.apache.camel.SuspendableService;
29  import org.apache.camel.impl.DefaultConsumer;
30  import org.hl7.fhir.instance.model.api.IBaseBundle;
31  import org.hl7.fhir.instance.model.api.IBaseResource;
32  import org.hl7.fhir.instance.model.api.IIdType;
33  import org.openehealth.ipf.commons.ihe.fhir.*;
34  import org.openehealth.ipf.commons.ihe.fhir.audit.FhirAuditDataset;
35  import org.openehealth.ipf.platform.camel.core.util.Exchanges;
36  
37  import java.util.List;
38  import java.util.Map;
39  
40  import static org.openehealth.ipf.commons.ihe.fhir.Constants.FHIR_REQUEST_SIZE_ONLY;
41  
42  /**
43   * FHIR consumer, which is an implementation of a {@link RequestConsumer} that handles requests
44   * by sending the request data and parameters into a Camel route and returning the result of
45   * the route processing.
46   *
47   * @author Christian Ohr
48   * @since 3.1
49   */
50  public class FhirConsumer<AuditDatasetType extends FhirAuditDataset> extends DefaultConsumer
51          implements SuspendableService, RequestConsumer {
52  
53      private FhirContext fhirContext;
54  
55      public FhirConsumer(FhirEndpoint<AuditDatasetType, ? extends FhirComponent<AuditDatasetType>> endpoint, Processor processor) {
56          super(endpoint, processor);
57          fhirContext = endpoint.getInterceptableConfiguration().getContext();
58      }
59  
60      @Override
61      public String getName() {
62          return getEndpoint().getId();
63      }
64  
65      @Override
66      public FhirContext getFhirContext() {
67          return fhirContext;
68      }
69  
70      @Override
71      protected void doStart() throws Exception {
72          super.doStart();
73          getEndpoint().connect(this);
74      }
75  
76      @Override
77      protected void doStop() throws Exception {
78          getEndpoint().disconnect(this);
79          super.doStop();
80      }
81  
82      @Override
83      public FhirEndpoint<AuditDatasetType, FhirComponent<AuditDatasetType>> getEndpoint() {
84          return (FhirEndpoint<AuditDatasetType, FhirComponent<AuditDatasetType>>) super.getEndpoint();
85      }
86  
87      /**
88       * This method can be called by {@link ca.uhn.fhir.rest.server.IResourceProvider} objects to send the received
89       * (and potentially handled) request further down a Camel route.
90       *
91       * @param payload     FHIR request content
92       * @param headers     headers
93       * @param resultClass class of the result resource
94       * @param <R>         Resource type being returned
95       * @return result of processing the FHIR request in Camel
96       */
97      @Override
98      public final <R extends IBaseResource> R handleResourceRequest(Object payload, Map<String, Object> headers, Class<R> resultClass) {
99          Object result = handleInRoute(payload, headers, resultClass);
100         if (result == null) return null;
101         if (resultClass.isAssignableFrom(result.getClass())) {
102             return resultClass.cast(result);
103         } else {
104             throw new IllegalArgumentException("Expected a resource of type " + resultClass.getName() +
105                     " or a list thereof, but was " + result.getClass());
106         }
107     }
108 
109     /**
110      * @param payload request payload
111      * @param headers request parameters, e.g. search parameters
112      * @param <R> resource type
113      * @return list of resources to be packaged into a bundle
114      */
115     @Override
116     public <R extends IBaseResource> List<R> handleBundleRequest(Object payload, Map<String, Object> headers) {
117         return (List<R>) handleInRoute(payload, headers, List.class);
118     }
119 
120     @Override
121     public IBundleProvider handleBundleProviderRequest(Object payload, Map<String, Object> headers) {
122         return getBundleProvider(payload, headers);
123     }
124 
125     @Override
126     public <T extends IBaseBundle> T handleTransactionRequest(Object payload, Map<String, Object> headers, Class<T> bundleClass) {
127         return handleInRoute(payload, headers, bundleClass);
128     }
129 
130     @Override
131     public MethodOutcome handleAction(Object payload, Map<String, Object> headers) {
132         return handleInRoute(payload, headers, MethodOutcome.class);
133     }
134 
135     @Override
136     public int handleSizeRequest(Object payload, Map<String, Object> headers) {
137         Exchange exchange = runRoute(payload, headers);
138         Message resultMessage = Exchanges.resultMessage(exchange);
139         Integer size = resultMessage.getHeader(FHIR_REQUEST_SIZE_ONLY, Integer.class);
140         if (size == null) {
141             throw new InternalErrorException("Server did not obtain result size");
142         }
143         return size;
144     }
145 
146     @Override
147     public boolean supportsLazyLoading() {
148         return getEndpoint().getInterceptableConfiguration().isLazyLoadBundles();
149     }
150 
151     /**
152      * Forwards the request to be handled into a Camel route
153      *
154      * @param payload     request payload, will become the Camel message body
155      * @param headers     request parameters, will be added to the Camel headers
156      * @param resultClass expected body type to be returned
157      * @return request result, type-converted into the required result class
158      */
159     protected <T> T handleInRoute(Object payload, Map<String, Object> headers, Class<T> resultClass) {
160         Exchange exchange = runRoute(payload, headers);
161         Message resultMessage = Exchanges.resultMessage(exchange);
162         if (resultMessage.getBody() instanceof List && IBaseResource.class.isAssignableFrom(resultClass)) {
163             List<T> singletonList = (List<T>)resultMessage.getBody();
164             if (singletonList.isEmpty() && payload instanceof IIdType) {
165                 throw new ResourceNotFoundException((IIdType)payload);
166             }
167             resultMessage.setBody(singletonList.isEmpty() ? null : singletonList.get(0));
168         }
169         return getEndpoint().getCamelContext().getTypeConverter().convertTo(resultClass, exchange, resultMessage.getBody());
170     }
171 
172     protected Exchange runRoute(Object payload, Map<String, Object> headers) {
173         Exchange exchange = getEndpoint().createExchange();
174         exchange.getIn().setBody(payload);
175         if (headers != null) {
176             exchange.getIn().getHeaders().putAll(headers);
177         }
178 
179         // Add the FHIR context and InteractionID as header
180         exchange.getIn().setHeader(Constants.FHIR_CONTEXT,
181                 getEndpoint().getInterceptableConfiguration().getContext());
182         exchange.getIn().setHeader(org.openehealth.ipf.commons.ihe.core.Constants.INTERACTION_ID_NAME,
183                 getEndpoint().getInterceptableComponent().getInteractionId());
184 
185         try {
186             createUoW(exchange);
187             getProcessor().process(exchange);
188         } catch (Exception e) {
189             getExceptionHandler().handleException(e);
190         } finally {
191             doneUoW(exchange);
192         }
193 
194         // If the exchange has failed, throw the exception back into the servlet
195         // TODO care about auditing?
196         if (exchange.isFailed()) {
197             BaseServerResponseException e = exchange.getException(BaseServerResponseException.class);
198             throw (e != null) ? e : new InternalErrorException("Unexpected server error", exchange.getException());
199         }
200         return exchange;
201     }
202 
203 
204     /**
205      * Returns the {@link IBundleProvider}, providing the matching bundles.
206      * Depending on {@link FhirEndpointConfiguration#isLazyLoadBundles()}, the bundle provider either eagerly fetches all
207      * matching bundles or fetches the requested subset on request.
208      *
209      * @param payload request payload
210      * @param headers request headers
211      * @return resulting bundle provider
212      */
213     protected IBundleProvider getBundleProvider(Object payload, Map<String, Object> headers) {
214         FhirEndpointConfiguration<?> endpointConfiguration = getEndpoint().getInterceptableConfiguration();
215         return supportsLazyLoading() ?
216                 new LazyBundleProvider(this, endpointConfiguration.isCacheBundles(), payload, headers) :
217                 new EagerBundleProvider(this, payload, headers);
218     }
219 }