1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer;
17
18 import ca.uhn.hl7v2.HL7Exception;
19 import ca.uhn.hl7v2.model.Message;
20 import ca.uhn.hl7v2.parser.Parser;
21 import ca.uhn.hl7v2.util.Terser;
22 import org.apache.camel.Exchange;
23 import org.openehealth.ipf.commons.ihe.hl7v2.storage.UnsolicitedFragmentationStorage;
24 import org.openehealth.ipf.modules.hl7.message.MessageUtils;
25 import org.openehealth.ipf.platform.camel.core.util.Exchanges;
26 import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
27 import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import static java.util.Objects.requireNonNull;
32 import static org.apache.commons.lang3.StringUtils.isEmpty;
33 import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.keyString;
34
35
36
37
38
39
40
41
42 public class ConsumerRequestDefragmenterInterceptor extends InterceptorSupport<MllpTransactionEndpoint<?>> {
43 private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerRequestDefragmenterInterceptor.class);
44
45
46 private UnsolicitedFragmentationStorage storage;
47
48
49 @Override
50 public void setEndpoint(MllpTransactionEndpoint<?> endpoint) {
51 super.setEndpoint(endpoint);
52 this.storage = getEndpoint().getUnsolicitedFragmentationStorage();
53 requireNonNull(storage);
54 }
55
56
57
58
59 @Override
60 public void process(Exchange exchange) throws Exception {
61 String requestString = exchange.getIn().getBody(String.class);
62 Parser parser = getEndpoint().getHl7v2TransactionConfiguration().getParser();
63 Message requestMessage = parser.parse(requestString);
64 Terser requestTerser = new Terser(requestMessage);
65 String msh14 = requestTerser.get("MSH-14");
66 String dsc1 = null;
67 try {
68 if (! "I".equals(requestTerser.get("DSC-2"))) {
69 dsc1 = requestTerser.get("DSC-1");
70 }
71 } catch (HL7Exception e) {
72
73 }
74
75
76 if (isEmpty(msh14) && isEmpty(dsc1)) {
77 getWrappedProcessor().process(exchange);
78 return;
79 }
80
81
82 String msh31 = requestTerser.get("MSH-3-1");
83 String msh32 = requestTerser.get("MSH-3-2");
84 String msh33 = requestTerser.get("MSH-3-3");
85
86
87
88 StringBuilder accumulator;
89 if (isEmpty(msh14)) {
90 accumulator = new StringBuilder();
91 } else {
92 accumulator = storage.getAndRemove(keyString(msh14, msh31, msh32, msh33));
93 if (accumulator == null) {
94 LOG.warn("Pass unknown fragment with MSH-14=={} to the route", msh14);
95 getWrappedProcessor().process(exchange);
96 return;
97 }
98 }
99
100
101 int beginIndex = isEmpty(msh14) ? 0 : requestString.indexOf('\r');
102 int endIndex = isEmpty(dsc1) ? requestString.length() : (requestString.indexOf("\rDSC") + 1);
103 accumulator.append(requestString, beginIndex, endIndex);
104
105
106 if (isEmpty(dsc1)) {
107 LOG.debug("Finished fragment chain {}", msh14);
108 exchange.getIn().setBody(accumulator.toString());
109 getWrappedProcessor().process(exchange);
110 return;
111 }
112
113
114 LOG.debug("Processed fragment {} requesting {}", msh14, dsc1);
115
116 storage.put(keyString(dsc1, msh31, msh32, msh33), accumulator);
117 Message ack = MessageUtils.response(
118 requestMessage, "ACK",
119 requestTerser.get("MSH-9-2"));
120 Terser ackTerser = new Terser(ack);
121 ackTerser.set("MSA-1", "CA");
122 ackTerser.set("MSA-2", requestTerser.get("MSH-10"));
123 Exchanges.resultMessage(exchange).setBody(parser.encode(ack));
124 }
125
126 }