View Javadoc
1   /*
2    * Copyright 2010 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.producer;
17  
18  import ca.uhn.hl7v2.util.Terser;
19  import org.apache.camel.Exchange;
20  import org.openehealth.ipf.modules.hl7.message.MessageUtils;
21  import org.openehealth.ipf.platform.camel.core.util.Exchanges;
22  import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
23  import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  
27  import java.util.List;
28  
29  import static org.apache.commons.lang3.StringUtils.isNotEmpty;
30  import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.*;
31  
32  /**
33   * A producer-side interceptor which implements non-interactive request 
34   * fragmentation as described in paragraph 2.10.2.2 of the HL7 v.2.5 specification.
35   * @author Dmytro Rud
36   */
37  public class ProducerRequestFragmenterInterceptor extends InterceptorSupport<MllpTransactionEndpoint<?>> {
38      private static final transient Logger LOG = LoggerFactory.getLogger(ProducerRequestFragmenterInterceptor.class);
39      
40  
41      @Override
42      public void process(Exchange exchange) throws Exception {
43          int threshold = getEndpoint().getUnsolicitedFragmentationThreshold();
44          if (threshold < 3) {
45              getWrappedProcessor().process(exchange);
46              return;
47          }
48          
49          String request = exchange.getIn().getBody(String.class);
50          char fieldSeparator = request.charAt(3);
51          List<String> segments = splitString(request, '\r');
52  
53          // short message --> send unmodified and return
54          if (segments.size() <= threshold) {
55              getWrappedProcessor().process(exchange);
56              return;
57          }
58  
59          // parse MSH segment
60          List<String> mshFields = splitString(segments.get(0), request.charAt(3));
61          
62          // when MSH-14 is already present -- send the message unmodified and return
63          if ((mshFields.size() >= 14) && isNotEmpty(mshFields.get(13))) {
64              LOG.warn("MSH-14 is not empty, cannot perform automatic message fragmentation");
65              getWrappedProcessor().process(exchange);
66              return;
67          }
68          
69          // when DSC is present and already filled -- send the message unmodified 
70          // and return; otherwise -- delete the DSC segment, if present 
71          if (segments.get(segments.size() - 1).startsWith("DSC")) {
72              List<String> dscFields = splitString(segments.get(segments.size() - 1), request.charAt(3));
73              if ((dscFields.size() >= 2) && isNotEmpty(dscFields.get(1))) {
74                  LOG.warn("DSC-1 is not empty, cannot perform automatic message fragmentation");
75                  getWrappedProcessor().process(exchange);
76                  return;
77              }
78              segments.remove(segments.size() - 1);
79          }
80  
81          while (mshFields.size() < 14) {
82              mshFields.add("");
83          }
84          
85          // main loop
86          int currentSegmentIndex = 1;
87          String continuationPointer = "";
88          while (currentSegmentIndex < segments.size()) {
89              int currentSegmentsCount = 1;
90              StringBuilder sb = new StringBuilder();
91              
92              // add MSH (position 1)
93              appendSplitSegment(sb, mshFields, fieldSeparator);
94  
95              // add data segments (positions 2..MAX-1)
96              do {
97                  sb.append(segments.get(currentSegmentIndex)).append('\r');
98              } while ((++currentSegmentIndex  < segments.size()) 
99                    && (++currentSegmentsCount < threshold - 1));
100 
101             // one position free, one segment left -> bring them together
102             if (currentSegmentIndex == segments.size() - 1) {
103                 sb.append(segments.get(currentSegmentIndex++)).append('\r');
104             }
105             
106             // one or more segments left -> add DSC (position MAX)
107             if(currentSegmentIndex < segments.size()) {
108                 continuationPointer = uniqueId();
109                 sb.append("DSC")
110                   .append(fieldSeparator)
111                   .append(continuationPointer)
112                   .append(fieldSeparator)
113                   .append("F\r");
114 
115                 LOG.debug("Send next fragment, continuation pointer = {}", continuationPointer);
116             }
117             
118             // send the generated fragment to the receiver
119             exchange.getIn().setBody(sb.toString());
120             getWrappedProcessor().process(exchange);
121 
122             // catch and analyse the response, if this was not the last fragment
123             if(currentSegmentIndex < segments.size()) {
124                 String responseString = Exchanges.resultMessage(exchange).getBody(String.class);
125                 Terser responseTerser = new Terser(getEndpoint().getHl7v2TransactionConfiguration().getParser().parse(responseString));
126                 
127                 String messageType = responseTerser.get("MSH-9-1");
128                 String acknowledgementCode = responseTerser.get("MSA-1");
129                 String controlId = mshFields.get(9);
130                 
131                 if (! "ACK".equals(messageType)) {
132                     throw new RuntimeException("Server responded with " + messageType + " instead of ACK to the fragment with control ID " + mshFields.get(9));
133                 }
134                 if ("AA".equals(acknowledgementCode) || "CA".equals(acknowledgementCode)) {
135                     if (! controlId.equals(responseTerser.get("MSA-2"))) {
136                         throw new RuntimeException("Expected " + controlId + " in MSA-2, but got " + responseTerser.get("MSA-2"));
137                     }
138                 } else {
139                     // NAKs will go to the route
140                     LOG.debug("Got NAK response for fragment with control ID {}", controlId); 
141                     break;
142                 }
143     
144                 // update fields for next fragment
145                 mshFields.set(6, MessageUtils.hl7Now());
146                 mshFields.set(9, uniqueId());
147                 mshFields.set(13, continuationPointer);
148             }
149         }
150     }
151 
152 }