1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.producer;
17
18 import ca.uhn.hl7v2.HL7Exception;
19 import ca.uhn.hl7v2.model.Message;
20 import ca.uhn.hl7v2.model.Segment;
21 import ca.uhn.hl7v2.model.v25.message.QCN_J01;
22 import ca.uhn.hl7v2.parser.Parser;
23 import ca.uhn.hl7v2.util.Terser;
24 import org.apache.camel.Exchange;
25 import org.openehealth.ipf.commons.ihe.hl7v2.Hl7v2TransactionConfiguration;
26 import org.openehealth.ipf.modules.hl7.message.MessageUtils;
27 import org.openehealth.ipf.platform.camel.core.util.Exchanges;
28 import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
29 import org.openehealth.ipf.platform.camel.ihe.hl7v2.intercept.producer.ProducerMarshalInterceptor;
30 import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import java.util.List;
35
36 import static org.apache.commons.lang3.StringUtils.isEmpty;
37 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
38 import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.*;
39
40
41
42
43
44
45
46
47
48 public class ProducerMarshalAndInteractiveResponseReceiverInterceptor extends InterceptorSupport<MllpTransactionEndpoint<?>> {
49 private static final transient Logger LOG = LoggerFactory.getLogger(ProducerMarshalAndInteractiveResponseReceiverInterceptor.class);
50
51 public ProducerMarshalAndInteractiveResponseReceiverInterceptor() {
52 super();
53 setId(ProducerMarshalInterceptor.class.getName());
54 }
55
56
57
58
59
60 @Override
61 public void process(Exchange exchange) throws Exception {
62 Hl7v2TransactionConfiguration config = getEndpoint().getHl7v2TransactionConfiguration();
63 Message request = exchange.getIn().getBody(Message.class);
64
65 Terser requestTerser = null;
66 String responseString = null;
67 StringBuilder fragmentAccumulator = null;
68
69
70
71
72
73
74 boolean supportContinuations = false;
75 if (getEndpoint().isSupportInteractiveContinuation()) {
76 requestTerser = new Terser(request);
77 if (config.isContinuable(requestTerser.get("MSH-9-1")) && isEmpty(requestTerser.get("DSC-1"))) {
78 supportContinuations = true;
79 fragmentAccumulator = new StringBuilder();
80 }
81 }
82
83
84 boolean mustSend = true;
85 int fragmentsCount = 0;
86 int recordsCount = 0;
87 String continuationPointer;
88 while (mustSend) {
89 mustSend = false;
90
91
92 exchange.getIn().setBody(request.toString());
93 getWrappedProcessor().process(exchange);
94 responseString = Exchanges.resultMessage(exchange).getBody(String.class);
95
96
97 if (supportContinuations) {
98 List<String> segments = splitString(responseString, '\r');
99
100
101 boolean positiveResponse = false;
102 for (String segment : segments) {
103 if (segment.startsWith("MSA")) {
104 positiveResponse = (segment.length() >= 7) && (segment.charAt(5) == 'A');
105 break;
106 }
107 }
108 if (! positiveResponse) {
109
110 LOG.debug("Not a positive response, cannot perform continuation");
111 fragmentsCount = 0;
112 recordsCount = 0;
113 break;
114 }
115
116
117 if (segments.get(segments.size() - 1).startsWith("DSC")) {
118 List<String> dscFields = splitString(segments.get(segments.size() - 1), responseString.charAt(3));
119
120 if ((dscFields.size() >= 3)
121 && "I".equals(dscFields.get(2))
122 && isNotEmpty(dscFields.get(1)))
123 {
124 continuationPointer = dscFields.get(1);
125 LOG.debug("Automatically query interactive fragment {}", continuationPointer);
126 requestTerser.set("DSC-1", continuationPointer);
127 requestTerser.set("DSC-2", "I");
128 requestTerser.set("MSH-7", MessageUtils.hl7Now());
129 requestTerser.set("MSH-10", uniqueId());
130 mustSend = true;
131 }
132 }
133
134
135
136
137
138 int startDataSegmentIndex = -1;
139 int startFooterSegmentIndex = segments.size();
140 for (int i = 1; i < segments.size(); ++i) {
141 if(config.isDataStartSegment(segments, i)) {
142 ++recordsCount;
143 if (startDataSegmentIndex == -1) {
144 startDataSegmentIndex = i;
145 }
146 }
147 else if (config.isFooterStartSegment(segments, i)) {
148 startFooterSegmentIndex = i;
149 break;
150 }
151 }
152
153 if (startDataSegmentIndex == -1) {
154
155 startDataSegmentIndex = segments.size();
156 }
157
158 if (++fragmentsCount == 1) {
159 appendSegments(fragmentAccumulator, segments, 0, startDataSegmentIndex);
160 }
161 appendSegments(fragmentAccumulator, segments, startDataSegmentIndex, startFooterSegmentIndex);
162 if (! mustSend) {
163 appendSegments(fragmentAccumulator, segments, startFooterSegmentIndex, segments.size());
164 }
165 }
166 }
167
168
169 if (fragmentsCount > 1) {
170 responseString = fragmentAccumulator.toString();
171
172
173
174 if (getEndpoint().isAutoCancel()) {
175 try {
176 String cancel = createCancelMessage(request, config.getParser());
177 exchange.getIn().setBody(cancel);
178 getWrappedProcessor().process(exchange);
179 } catch (Exception e) {
180 LOG.warn("Error while preparing and sending automatic cancel message", e);
181 }
182 }
183 }
184
185
186 Message rsp = config.getParser().parse(responseString);
187 if (recordsCount != 0) {
188 Terser responseTerser = new Terser(rsp);
189 String recordsCountString = Integer.toString(recordsCount);
190 responseTerser.set("QAK-4", recordsCountString);
191 responseTerser.set("QAK-5", recordsCountString);
192 responseTerser.set("QAK-6", "0");
193 }
194 Exchanges.resultMessage(exchange).setBody(rsp);
195 }
196
197
198
199
200
201
202
203
204
205
206 private static String createCancelMessage(Message request, Parser parser) throws HL7Exception {
207 return (request.getVersion().charAt(2) < '4') ?
208 createCnqMessage(request, parser) :
209 createQcnJ01Message(request, parser);
210 }
211
212
213 private static String createQcnJ01Message(Message request, Parser parser) throws HL7Exception {
214 Message cancel = new QCN_J01();
215
216
217 Segment requestMsh = (Segment) request.get("MSH");
218 Segment cancelMsh = (Segment) cancel.get("MSH");
219
220 Terser.set(cancelMsh, 1, 0, 1, 1, Terser.get(requestMsh, 1, 0, 1, 1));
221 Terser.set(cancelMsh, 2, 0, 1, 1, Terser.get(requestMsh, 2, 0, 1, 1));
222
223
224 for (int field = 3; field <= 6; ++field) {
225 for (int component = 1; component <= 3; ++component) {
226 Terser.set(cancelMsh, field, 0, component, 1,
227 Terser.get(requestMsh, field, 0, component, 1));
228 }
229 }
230 Terser.set(cancelMsh, 7, 0, 1, 1, MessageUtils.hl7Now());
231 Terser.set(cancelMsh, 9, 0, 1, 1, "QCN");
232 Terser.set(cancelMsh, 9, 0, 2, 1, "J01");
233 Terser.set(cancelMsh, 9, 0, 3, 1, "QCN_J01");
234 Terser.set(cancelMsh, 10, 0, 1, 1, uniqueId());
235 Terser.set(cancelMsh, 11, 0, 1, 1, "P");
236
237
238 for (int component = 1; component <= 3; ++component) {
239 Terser.set(cancelMsh, 12, 0, component, 1,
240 Terser.get(requestMsh, 12, 0, component, 1));
241 }
242
243
244 Segment requestQpd = (Segment) request.get("QPD");
245 Segment cancelQid = (Segment) cancel.get("QID");
246
247
248 Terser.set(cancelQid, 1, 0, 1, 1, Terser.get(requestQpd, 2, 0, 1, 1));
249
250
251 for (int component = 1; component <= 6; ++component) {
252 Terser.set(cancelQid, 2, 0, component, 1,
253 Terser.get(requestQpd, 1, 0, component, 1));
254 }
255
256
257 return parser.encode(cancel);
258 }
259
260
261 private static String createCnqMessage(Message request, Parser parser) throws HL7Exception {
262 Message cancel = parser.parse(parser.encode(request));
263 Segment cancelMsh = (Segment) cancel.get("MSH");
264
265 Terser.set(cancelMsh, 7, 0, 1, 1, MessageUtils.hl7Now());
266 Terser.set(cancelMsh, 9, 0, 2, 1, "CNQ");
267 Terser.set(cancelMsh, 9, 0, 3, 1, "");
268 Terser.set(cancelMsh, 10, 0, 1, 1, uniqueId());
269
270 return parser.encode(cancel);
271 }
272 }