1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.openehealth.ipf.platform.camel.ihe.fhir.core;
18
19 import ca.uhn.fhir.context.FhirContext;
20 import ca.uhn.fhir.rest.api.MethodOutcome;
21 import ca.uhn.fhir.rest.api.server.IBundleProvider;
22 import ca.uhn.fhir.rest.server.exceptions.BaseServerResponseException;
23 import ca.uhn.fhir.rest.server.exceptions.InternalErrorException;
24 import ca.uhn.fhir.rest.server.exceptions.ResourceNotFoundException;
25 import org.apache.camel.Exchange;
26 import org.apache.camel.Message;
27 import org.apache.camel.Processor;
28 import org.apache.camel.SuspendableService;
29 import org.apache.camel.impl.DefaultConsumer;
30 import org.hl7.fhir.instance.model.api.IBaseBundle;
31 import org.hl7.fhir.instance.model.api.IBaseResource;
32 import org.hl7.fhir.instance.model.api.IIdType;
33 import org.openehealth.ipf.commons.ihe.fhir.*;
34 import org.openehealth.ipf.commons.ihe.fhir.audit.FhirAuditDataset;
35 import org.openehealth.ipf.platform.camel.core.util.Exchanges;
36
37 import java.util.List;
38 import java.util.Map;
39
40 import static org.openehealth.ipf.commons.ihe.fhir.Constants.FHIR_REQUEST_SIZE_ONLY;
41
42
43
44
45
46
47
48
49
50 public class FhirConsumer<AuditDatasetType extends FhirAuditDataset> extends DefaultConsumer
51 implements SuspendableService, RequestConsumer {
52
53 private FhirContext fhirContext;
54
55 public FhirConsumer(FhirEndpoint<AuditDatasetType, ? extends FhirComponent<AuditDatasetType>> endpoint, Processor processor) {
56 super(endpoint, processor);
57 fhirContext = endpoint.getInterceptableConfiguration().getContext();
58 }
59
60 @Override
61 public String getName() {
62 return getEndpoint().getId();
63 }
64
65 @Override
66 public FhirContext getFhirContext() {
67 return fhirContext;
68 }
69
70 @Override
71 protected void doStart() throws Exception {
72 super.doStart();
73 getEndpoint().connect(this);
74 }
75
76 @Override
77 protected void doStop() throws Exception {
78 getEndpoint().disconnect(this);
79 super.doStop();
80 }
81
82 @Override
83 public FhirEndpoint<AuditDatasetType, FhirComponent<AuditDatasetType>> getEndpoint() {
84 return (FhirEndpoint<AuditDatasetType, FhirComponent<AuditDatasetType>>) super.getEndpoint();
85 }
86
87
88
89
90
91
92
93
94
95
96
97 @Override
98 public final <R extends IBaseResource> R handleResourceRequest(Object payload, Map<String, Object> headers, Class<R> resultClass) {
99 Object result = handleInRoute(payload, headers, resultClass);
100 if (result == null) return null;
101 if (resultClass.isAssignableFrom(result.getClass())) {
102 return resultClass.cast(result);
103 } else {
104 throw new IllegalArgumentException("Expected a resource of type " + resultClass.getName() +
105 " or a list thereof, but was " + result.getClass());
106 }
107 }
108
109
110
111
112
113
114
115 @Override
116 public <R extends IBaseResource> List<R> handleBundleRequest(Object payload, Map<String, Object> headers) {
117 return (List<R>) handleInRoute(payload, headers, List.class);
118 }
119
120 @Override
121 public IBundleProvider handleBundleProviderRequest(Object payload, Map<String, Object> headers) {
122 return getBundleProvider(payload, headers);
123 }
124
125 @Override
126 public <T extends IBaseBundle> T handleTransactionRequest(Object payload, Map<String, Object> headers, Class<T> bundleClass) {
127 return handleInRoute(payload, headers, bundleClass);
128 }
129
130 @Override
131 public MethodOutcome handleAction(Object payload, Map<String, Object> headers) {
132 return handleInRoute(payload, headers, MethodOutcome.class);
133 }
134
135 @Override
136 public int handleSizeRequest(Object payload, Map<String, Object> headers) {
137 Exchange exchange = runRoute(payload, headers);
138 Message resultMessage = Exchanges.resultMessage(exchange);
139 Integer size = resultMessage.getHeader(FHIR_REQUEST_SIZE_ONLY, Integer.class);
140 if (size == null) {
141 throw new InternalErrorException("Server did not obtain result size");
142 }
143 return size;
144 }
145
146 @Override
147 public boolean supportsLazyLoading() {
148 return getEndpoint().getInterceptableConfiguration().isLazyLoadBundles();
149 }
150
151
152
153
154
155
156
157
158
159 protected <T> T handleInRoute(Object payload, Map<String, Object> headers, Class<T> resultClass) {
160 Exchange exchange = runRoute(payload, headers);
161 Message resultMessage = Exchanges.resultMessage(exchange);
162 if (resultMessage.getBody() instanceof List && IBaseResource.class.isAssignableFrom(resultClass)) {
163 List<T> singletonList = (List<T>)resultMessage.getBody();
164 if (singletonList.isEmpty() && payload instanceof IIdType) {
165 throw new ResourceNotFoundException((IIdType)payload);
166 }
167 resultMessage.setBody(singletonList.isEmpty() ? null : singletonList.get(0));
168 }
169 return getEndpoint().getCamelContext().getTypeConverter().convertTo(resultClass, exchange, resultMessage.getBody());
170 }
171
172 protected Exchange runRoute(Object payload, Map<String, Object> headers) {
173 Exchange exchange = getEndpoint().createExchange();
174 exchange.getIn().setBody(payload);
175 if (headers != null) {
176 exchange.getIn().getHeaders().putAll(headers);
177 }
178
179
180 exchange.getIn().setHeader(Constants.FHIR_CONTEXT,
181 getEndpoint().getInterceptableConfiguration().getContext());
182 exchange.getIn().setHeader(org.openehealth.ipf.commons.ihe.core.Constants.INTERACTION_ID_NAME,
183 getEndpoint().getInterceptableComponent().getInteractionId());
184
185 try {
186 createUoW(exchange);
187 getProcessor().process(exchange);
188 } catch (Exception e) {
189 getExceptionHandler().handleException(e);
190 } finally {
191 doneUoW(exchange);
192 }
193
194
195
196 if (exchange.isFailed()) {
197 BaseServerResponseException e = exchange.getException(BaseServerResponseException.class);
198 throw (e != null) ? e : new InternalErrorException("Unexpected server error", exchange.getException());
199 }
200 return exchange;
201 }
202
203
204
205
206
207
208
209
210
211
212
213 protected IBundleProvider getBundleProvider(Object payload, Map<String, Object> headers) {
214 FhirEndpointConfiguration<?> endpointConfiguration = getEndpoint().getInterceptableConfiguration();
215 return supportsLazyLoading() ?
216 new LazyBundleProvider(this, endpointConfiguration.isCacheBundles(), payload, headers) :
217 new EagerBundleProvider(this, payload, headers);
218 }
219 }