1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.producer;
17
18 import ca.uhn.hl7v2.util.Terser;
19 import org.apache.camel.Exchange;
20 import org.openehealth.ipf.modules.hl7.message.MessageUtils;
21 import org.openehealth.ipf.platform.camel.core.util.Exchanges;
22 import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
23 import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import java.util.List;
28
29 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
30 import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.*;
31
32
33
34
35
36
37 public class ProducerRequestFragmenterInterceptor extends InterceptorSupport<MllpTransactionEndpoint<?>> {
38 private static final transient Logger LOG = LoggerFactory.getLogger(ProducerRequestFragmenterInterceptor.class);
39
40
41 @Override
42 public void process(Exchange exchange) throws Exception {
43 int threshold = getEndpoint().getUnsolicitedFragmentationThreshold();
44 if (threshold < 3) {
45 getWrappedProcessor().process(exchange);
46 return;
47 }
48
49 String request = exchange.getIn().getBody(String.class);
50 char fieldSeparator = request.charAt(3);
51 List<String> segments = splitString(request, '\r');
52
53
54 if (segments.size() <= threshold) {
55 getWrappedProcessor().process(exchange);
56 return;
57 }
58
59
60 List<String> mshFields = splitString(segments.get(0), request.charAt(3));
61
62
63 if ((mshFields.size() >= 14) && isNotEmpty(mshFields.get(13))) {
64 LOG.warn("MSH-14 is not empty, cannot perform automatic message fragmentation");
65 getWrappedProcessor().process(exchange);
66 return;
67 }
68
69
70
71 if (segments.get(segments.size() - 1).startsWith("DSC")) {
72 List<String> dscFields = splitString(segments.get(segments.size() - 1), request.charAt(3));
73 if ((dscFields.size() >= 2) && isNotEmpty(dscFields.get(1))) {
74 LOG.warn("DSC-1 is not empty, cannot perform automatic message fragmentation");
75 getWrappedProcessor().process(exchange);
76 return;
77 }
78 segments.remove(segments.size() - 1);
79 }
80
81 while (mshFields.size() < 14) {
82 mshFields.add("");
83 }
84
85
86 int currentSegmentIndex = 1;
87 String continuationPointer = "";
88 while (currentSegmentIndex < segments.size()) {
89 int currentSegmentsCount = 1;
90 StringBuilder sb = new StringBuilder();
91
92
93 appendSplitSegment(sb, mshFields, fieldSeparator);
94
95
96 do {
97 sb.append(segments.get(currentSegmentIndex)).append('\r');
98 } while ((++currentSegmentIndex < segments.size())
99 && (++currentSegmentsCount < threshold - 1));
100
101
102 if (currentSegmentIndex == segments.size() - 1) {
103 sb.append(segments.get(currentSegmentIndex++)).append('\r');
104 }
105
106
107 if(currentSegmentIndex < segments.size()) {
108 continuationPointer = uniqueId();
109 sb.append("DSC")
110 .append(fieldSeparator)
111 .append(continuationPointer)
112 .append(fieldSeparator)
113 .append("F\r");
114
115 LOG.debug("Send next fragment, continuation pointer = {}", continuationPointer);
116 }
117
118
119 exchange.getIn().setBody(sb.toString());
120 getWrappedProcessor().process(exchange);
121
122
123 if(currentSegmentIndex < segments.size()) {
124 String responseString = Exchanges.resultMessage(exchange).getBody(String.class);
125 Terser responseTerser = new Terser(getEndpoint().getHl7v2TransactionConfiguration().getParser().parse(responseString));
126
127 String messageType = responseTerser.get("MSH-9-1");
128 String acknowledgementCode = responseTerser.get("MSA-1");
129 String controlId = mshFields.get(9);
130
131 if (! "ACK".equals(messageType)) {
132 throw new RuntimeException("Server responded with " + messageType + " instead of ACK to the fragment with control ID " + mshFields.get(9));
133 }
134 if ("AA".equals(acknowledgementCode) || "CA".equals(acknowledgementCode)) {
135 if (! controlId.equals(responseTerser.get("MSA-2"))) {
136 throw new RuntimeException("Expected " + controlId + " in MSA-2, but got " + responseTerser.get("MSA-2"));
137 }
138 } else {
139
140 LOG.debug("Got NAK response for fragment with control ID {}", controlId);
141 break;
142 }
143
144
145 mshFields.set(6, MessageUtils.hl7Now());
146 mshFields.set(9, uniqueId());
147 mshFields.set(13, continuationPointer);
148 }
149 }
150 }
151
152 }