View Javadoc
1   /*
2    * Copyright 2009 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer;
17  
18  import ca.uhn.hl7v2.HL7Exception;
19  import ca.uhn.hl7v2.model.Message;
20  import ca.uhn.hl7v2.parser.Parser;
21  import ca.uhn.hl7v2.util.Terser;
22  import org.apache.camel.Exchange;
23  import org.openehealth.ipf.commons.ihe.hl7v2.storage.UnsolicitedFragmentationStorage;
24  import org.openehealth.ipf.modules.hl7.message.MessageUtils;
25  import org.openehealth.ipf.platform.camel.core.util.Exchanges;
26  import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
27  import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  
31  import static java.util.Objects.requireNonNull;
32  import static org.apache.commons.lang3.StringUtils.isEmpty;
33  import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.keyString;
34  
35  
36  /**
37   * Consumer-side interceptor for receiving unsolicited request fragments
38   * as described in paragraph 2.10.2.2 of the HL7 v.2.5 specification.
39   * 
40   * @author Dmytro Rud
41   */
42  public class ConsumerRequestDefragmenterInterceptor extends InterceptorSupport<MllpTransactionEndpoint<?>> {
43      private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerRequestDefragmenterInterceptor.class);
44      
45      // keys consist of: continuation pointer, MSH-3-1, MSH-3-2, and MSH-3-3  
46      private UnsolicitedFragmentationStorage storage;
47  
48  
49      @Override
50      public void setEndpoint(MllpTransactionEndpoint<?> endpoint) {
51          super.setEndpoint(endpoint);
52          this.storage = getEndpoint().getUnsolicitedFragmentationStorage();
53          requireNonNull(storage);
54      }
55  
56      /**
57       * Accumulates fragments and passes the "big" message to the processing route. 
58       */
59      @Override
60      public void process(Exchange exchange) throws Exception {
61          String requestString = exchange.getIn().getBody(String.class);
62          Parser parser = getEndpoint().getHl7v2TransactionConfiguration().getParser();
63          Message requestMessage = parser.parse(requestString);
64          Terser requestTerser = new Terser(requestMessage);
65          String msh14 = requestTerser.get("MSH-14");
66          String dsc1 = null;
67          try {
68              if (! "I".equals(requestTerser.get("DSC-2"))) {
69                  dsc1 = requestTerser.get("DSC-1");
70              }
71          } catch (HL7Exception e) {
72              // segment DSC does not exist in cancel requests
73          }
74  
75          // pass when the message is not fragmented
76          if (isEmpty(msh14) && isEmpty(dsc1)) {
77              getWrappedProcessor().process(exchange);
78              return;
79          }
80  
81          // get pieces of the accumulator's key
82          String msh31 = requestTerser.get("MSH-3-1");
83          String msh32 = requestTerser.get("MSH-3-2");
84          String msh33 = requestTerser.get("MSH-3-3");
85  
86          // create an accumulator (on the arrival of the first fragment) 
87          // or get an existing one (on the arrival of fragments 2..n)
88          StringBuilder accumulator; 
89          if (isEmpty(msh14)) {
90              accumulator = new StringBuilder();
91          } else {
92              accumulator = storage.getAndRemove(keyString(msh14, msh31, msh32, msh33));
93              if (accumulator == null) {
94                  LOG.warn("Pass unknown fragment with MSH-14=={} to the route", msh14);
95                  getWrappedProcessor().process(exchange);
96                  return;
97              }
98          }
99  
100         // append current fragment to the accumulator
101         int beginIndex = isEmpty(msh14) ? 0 : requestString.indexOf('\r');
102         int endIndex = isEmpty(dsc1) ? requestString.length() : (requestString.indexOf("\rDSC") + 1);
103         accumulator.append(requestString, beginIndex, endIndex);
104         
105         // DSC-1 is empty -- finish accumulation, pass message to the marshaller
106         if (isEmpty(dsc1)) {
107             LOG.debug("Finished fragment chain {}", msh14);
108             exchange.getIn().setBody(accumulator.toString());
109             getWrappedProcessor().process(exchange);
110             return;
111         } 
112 
113         // DSC-1 is not empty -- update accumulators map, request the next fragment
114         LOG.debug("Processed fragment {} requesting {}", msh14, dsc1);
115             
116         storage.put(keyString(dsc1, msh31, msh32, msh33), accumulator);
117         Message ack = MessageUtils.response(
118                 requestMessage, "ACK", 
119                 requestTerser.get("MSH-9-2"));
120         Terser ackTerser = new Terser(ack);
121         ackTerser.set("MSA-1", "CA");
122         ackTerser.set("MSA-2", requestTerser.get("MSH-10"));
123         Exchanges.resultMessage(exchange).setBody(parser.encode(ack));
124     }
125     
126 }