1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer;
17
18 import ca.uhn.hl7v2.model.Message;
19 import ca.uhn.hl7v2.model.Segment;
20 import ca.uhn.hl7v2.parser.Parser;
21 import ca.uhn.hl7v2.util.Terser;
22 import org.apache.camel.Exchange;
23 import org.openehealth.ipf.commons.ihe.hl7v2.Constants;
24 import org.openehealth.ipf.commons.ihe.hl7v2.Hl7v2TransactionConfiguration;
25 import org.openehealth.ipf.commons.ihe.hl7v2.storage.InteractiveContinuationStorage;
26 import org.openehealth.ipf.modules.hl7.message.MessageUtils;
27 import org.openehealth.ipf.platform.camel.core.util.Exchanges;
28 import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
29 import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import java.util.ArrayList;
34 import java.util.List;
35
36 import static java.util.Objects.requireNonNull;
37 import static org.apache.commons.lang3.StringUtils.isEmpty;
38 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
39 import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.*;
40
41
42
43
44
45
46
47 public class ConsumerInteractiveResponseSenderInterceptor extends InterceptorSupport<MllpTransactionEndpoint<?>> {
48 private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerInteractiveResponseSenderInterceptor.class);
49 private InteractiveContinuationStorage storage;
50
51
52 @Override
53 public void setEndpoint(MllpTransactionEndpoint<?> endpoint) {
54 super.setEndpoint(endpoint);
55 this.storage = requireNonNull(getEndpoint().getInteractiveContinuationStorage());
56 }
57
58 @Override
59 public void process(Exchange exchange) throws Exception {
60 Parser parser = getEndpoint().getHl7v2TransactionConfiguration().getParser();
61 Message requestMessage = exchange.getIn().getHeader(Constants.ORIGINAL_MESSAGE_ADAPTER_HEADER_NAME, Message.class);
62 Terser requestTerser = new Terser(requestMessage);
63 String requestMessageType = requestTerser.get("MSH-9-1");
64
65
66 final String msh31 = requestTerser.get("MSH-3-1");
67 final String msh32 = requestTerser.get("MSH-3-2");
68 final String msh33 = requestTerser.get("MSH-3-3");
69
70
71 if ("QCN".equals(requestMessageType) || "CNQ".equals(requestTerser.get("MSH-9-2"))) {
72 String queryTag = "QCN".equals(requestMessageType) ?
73 requestTerser.get("QID-1") :
74 requestTerser.get("QPD-2");
75 if (storage.delete(keyString(queryTag, msh31, msh32, msh33))) {
76 LOG.debug("Dropped response chain for query tag {}", queryTag);
77 Message ack = requestMessage.generateACK();
78
79
80 Terser.set((Segment)ack.get("MSH"), 9, 0, 3, 1, "ACK");
81 Exchanges.resultMessage(exchange).setBody(parser.encode(ack));
82 } else {
83 getWrappedProcessor().process(exchange);
84 }
85 return;
86 }
87
88
89 if (! getEndpoint().getHl7v2TransactionConfiguration().isContinuable(requestMessageType)) {
90 getWrappedProcessor().process(exchange);
91 return;
92 }
93
94
95 String rcp22 = requestTerser.get("RCP-2-2");
96 if (! "RD".equals(rcp22)) {
97 if (rcp22 != null) {
98 LOG.warn("Unit '{}' in RCP-2-2 is not supported", rcp22);
99 }
100 getWrappedProcessor().process(exchange);
101 return;
102 }
103
104
105 int threshold = -1;
106 try {
107 threshold = Integer.parseInt(requestTerser.get("RCP-2-1"));
108 } catch (NumberFormatException nfe) {
109 LOG.warn("Cannot parse RCP-2-1, try to use default threshold", nfe);
110 }
111 if (threshold < 1) {
112 threshold = getEndpoint().getInteractiveContinuationDefaultThreshold();
113 }
114 if (threshold < 1) {
115 LOG.debug("Cannot perform interactive continuation: invalid or missing threshold");
116 getWrappedProcessor().process(exchange);
117 return;
118 }
119
120
121 String continuationPointer = requestTerser.get("DSC-1");
122 if (isEmpty(continuationPointer)) {
123 continuationPointer = null;
124 }
125
126 if ((continuationPointer != null) && ! "I".equals(requestTerser.get("DSC-2"))) {
127 LOG.warn("Cannot perform interactive continuation: DSC-1 is not empty and DSC-2 is not 'I'");
128 getWrappedProcessor().process(exchange);
129 return;
130 }
131
132 final String queryTag = requestTerser.get("QPD-2");
133 if (isEmpty(queryTag)) {
134 LOG.warn("Cannot perform interactive continuation: empty query tag in QPD-2");
135 getWrappedProcessor().process(exchange);
136 return;
137 }
138
139
140 final String chainId = keyString(queryTag, msh31, msh32, msh33);
141 Message responseMessage = storage.get(continuationPointer, chainId);
142 if (responseMessage != null) {
143
144 LOG.debug("Use prepared fragment for {}", continuationPointer);
145 synchronized (responseMessage) {
146 Terser responseTerser = new Terser(responseMessage);
147 responseTerser.set("MSH-7", MessageUtils.hl7Now());
148 responseTerser.set("MSH-10", uniqueId());
149 responseTerser.set("MSA-2", requestTerser.get("MSH-10"));
150 }
151 } else {
152
153 getWrappedProcessor().process(exchange);
154 Message response = Exchanges.resultMessage(exchange).getBody(Message.class);
155 responseMessage = considerFragmentingResponse(response, threshold, queryTag, chainId);
156 }
157 Exchanges.resultMessage(exchange).setBody(parser.encode(responseMessage));
158 }
159
160
161
162
163
164
165
166
167
168
169 private Message considerFragmentingResponse(
170 Message responseMessage,
171 int threshold,
172 String queryTag,
173 String chainId) throws Exception
174 {
175 Terser responseTerser = new Terser(responseMessage);
176 if (isNotEmpty(responseTerser.get("DSC-1"))) {
177 LOG.warn("Cannot perform interactive continuation: DSC-1 already " +
178 "present in the response message returned from the route");
179 return responseMessage;
180 }
181
182
183 List<String> segments = splitString(responseMessage.toString(), '\r');
184 List<Integer> recordBoundaries = getRecordBoundaries(segments);
185 if (recordBoundaries.size() - 1 <= threshold) {
186 return responseMessage;
187 }
188
189
190 CharSequence headerSegments = joinSegments(segments, 0, recordBoundaries.get(0));
191 CharSequence footerSegments = joinSegments(
192 segments, recordBoundaries.get(recordBoundaries.size() - 1), segments.size());
193
194
195 final int fragmentsCount = (recordBoundaries.size() + threshold - 2) / threshold;
196
197
198 Parser parser = getEndpoint().getHl7v2TransactionConfiguration().getParser();
199 String continuationPointer = null;
200 for (int currentFragmentIndex = 0; currentFragmentIndex < fragmentsCount; ++currentFragmentIndex) {
201
202 int startRecordIndex = currentFragmentIndex * threshold;
203 int endRecordIndex = Math.min(startRecordIndex + threshold, recordBoundaries.size() - 1);
204 int startSegmentIndex = recordBoundaries.get(startRecordIndex);
205 int endSegmentIndex = recordBoundaries.get(endRecordIndex);
206
207 StringBuilder sb = new StringBuilder(headerSegments);
208 appendSegments(sb, segments, startSegmentIndex, endSegmentIndex);
209 sb.append(footerSegments);
210
211
212 Message fragment = parser.parse(sb.toString());
213 Terser fragmentTerser = new Terser(fragment);
214 String nextContinuationPointer = uniqueId();
215 if (currentFragmentIndex != fragmentsCount - 1) {
216 fragmentTerser.set("DSC-1", nextContinuationPointer);
217 fragmentTerser.set("DSC-2", "I");
218 }
219 fragmentTerser.set("QAK-4", Integer.toString(recordBoundaries.size() - 1));
220 fragmentTerser.set("QAK-5", Integer.toString(endRecordIndex - startRecordIndex));
221 fragmentTerser.set("QAK-6", Integer.toString(recordBoundaries.size() - 1 - endRecordIndex));
222
223 storage.put(continuationPointer, chainId, fragment);
224 continuationPointer = nextContinuationPointer;
225
226
227 if (currentFragmentIndex == 0) {
228 responseMessage = fragment;
229 }
230 }
231 LOG.debug("Prepared {} interactive fragments for query tag {}", fragmentsCount, queryTag);
232 return responseMessage;
233 }
234
235
236
237
238
239
240 private List<Integer> getRecordBoundaries(List<String> segments) {
241 Hl7v2TransactionConfiguration config = getEndpoint().getHl7v2TransactionConfiguration();
242 List<Integer> recordBoundaries = new ArrayList<>();
243 boolean foundFooter = false;
244 for (int i = 1; i < segments.size(); ++i) {
245 if (config.isDataStartSegment(segments, i)) {
246 recordBoundaries.add(i);
247 } else if ((recordBoundaries.size() > 0) && config.isFooterStartSegment(segments, i)) {
248 foundFooter = true;
249 recordBoundaries.add(i);
250 break;
251 }
252 }
253 if (! foundFooter) {
254 recordBoundaries.add(segments.size());
255 }
256 return recordBoundaries;
257 }
258
259 }