View Javadoc
1   /*
2    * Copyright 2010 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer;
17  
18  import ca.uhn.hl7v2.model.Message;
19  import ca.uhn.hl7v2.model.Segment;
20  import ca.uhn.hl7v2.parser.Parser;
21  import ca.uhn.hl7v2.util.Terser;
22  import org.apache.camel.Exchange;
23  import org.openehealth.ipf.commons.ihe.hl7v2.Constants;
24  import org.openehealth.ipf.commons.ihe.hl7v2.Hl7v2TransactionConfiguration;
25  import org.openehealth.ipf.commons.ihe.hl7v2.storage.InteractiveContinuationStorage;
26  import org.openehealth.ipf.modules.hl7.message.MessageUtils;
27  import org.openehealth.ipf.platform.camel.core.util.Exchanges;
28  import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
29  import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
30  import org.slf4j.Logger;
31  import org.slf4j.LoggerFactory;
32  
33  import java.util.ArrayList;
34  import java.util.List;
35  
36  import static java.util.Objects.requireNonNull;
37  import static org.apache.commons.lang3.StringUtils.isEmpty;
38  import static org.apache.commons.lang3.StringUtils.isNotEmpty;
39  import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.*;
40  
41  
42  /**
43   * Consumer-side interceptor for interactive continuation support 
44   * as described in paragraph 5.6.3 of the HL7 v2.5 specification.
45   * @author Dmytro Rud
46   */
47  public class ConsumerInteractiveResponseSenderInterceptor extends InterceptorSupport<MllpTransactionEndpoint<?>> {
48      private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerInteractiveResponseSenderInterceptor.class);
49      private InteractiveContinuationStorage storage;
50  
51  
52      @Override
53      public void setEndpoint(MllpTransactionEndpoint<?> endpoint) {
54          super.setEndpoint(endpoint);
55          this.storage = requireNonNull(getEndpoint().getInteractiveContinuationStorage());
56      }
57  
58      @Override
59      public void process(Exchange exchange) throws Exception {
60          Parser parser = getEndpoint().getHl7v2TransactionConfiguration().getParser();
61          Message requestMessage = exchange.getIn().getHeader(Constants.ORIGINAL_MESSAGE_ADAPTER_HEADER_NAME, Message.class);
62          Terser requestTerser = new Terser(requestMessage);
63          String requestMessageType = requestTerser.get("MSH-9-1");
64  
65          // get pieces of fragments' keys
66          final String msh31 = requestTerser.get("MSH-3-1");
67          final String msh32 = requestTerser.get("MSH-3-2");
68          final String msh33 = requestTerser.get("MSH-3-3");
69  
70          // handle cancel messages; if there is nothing to cancel -- pass to the route
71          if ("QCN".equals(requestMessageType) || "CNQ".equals(requestTerser.get("MSH-9-2"))) {
72              String queryTag = "QCN".equals(requestMessageType) ? 
73                      requestTerser.get("QID-1") :
74                      requestTerser.get("QPD-2");
75              if (storage.delete(keyString(queryTag, msh31, msh32, msh33))) {
76                  LOG.debug("Dropped response chain for query tag {}", queryTag);
77                  Message ack = requestMessage.generateACK();
78  
79                  // Workaround: HAPI misses to populate the message structure for ACKs, but client may want to see it
80                  Terser.set((Segment)ack.get("MSH"), 9, 0, 3, 1, "ACK");
81                  Exchanges.resultMessage(exchange).setBody(parser.encode(ack));
82              } else {
83                  getWrappedProcessor().process(exchange);
84              }
85              return;
86          }
87  
88          // check whether responses to messages of this type can even be splitted
89          if (! getEndpoint().getHl7v2TransactionConfiguration().isContinuable(requestMessageType)) {
90              getWrappedProcessor().process(exchange);
91              return;
92          }
93  
94          // check whether requested unit type is supported
95          String rcp22 = requestTerser.get("RCP-2-2");
96          if (! "RD".equals(rcp22)) {
97              if (rcp22 != null) {
98                  LOG.warn("Unit '{}' in RCP-2-2 is not supported", rcp22);
99              }
100             getWrappedProcessor().process(exchange);
101             return;
102         }
103 
104         // determine the threshold (maximal records count per message)
105         int threshold = -1;
106         try {
107             threshold = Integer.parseInt(requestTerser.get("RCP-2-1"));
108         } catch (NumberFormatException nfe) {
109             LOG.warn("Cannot parse RCP-2-1, try to use default threshold", nfe);
110         }
111         if (threshold < 1) {
112             threshold = getEndpoint().getInteractiveContinuationDefaultThreshold();
113         }
114         if (threshold < 1) {
115             LOG.debug("Cannot perform interactive continuation: invalid or missing threshold");
116             getWrappedProcessor().process(exchange);
117             return;
118         }
119         
120         // check whether the request is acceptable; if not -- pass it to the route, let the user decide 
121         String continuationPointer = requestTerser.get("DSC-1");
122         if (isEmpty(continuationPointer)) {
123             continuationPointer = null;
124         }
125 
126         if ((continuationPointer != null) && ! "I".equals(requestTerser.get("DSC-2"))) {
127             LOG.warn("Cannot perform interactive continuation: DSC-1 is not empty and DSC-2 is not 'I'");
128             getWrappedProcessor().process(exchange);
129             return;
130         }
131         
132         final String queryTag = requestTerser.get("QPD-2");
133         if (isEmpty(queryTag)) {
134             LOG.warn("Cannot perform interactive continuation: empty query tag in QPD-2");
135             getWrappedProcessor().process(exchange);
136             return;
137         }
138 
139         // handle query
140         final String chainId = keyString(queryTag, msh31, msh32, msh33);
141         Message responseMessage = storage.get(continuationPointer, chainId);
142         if (responseMessage != null) {
143             // a prepared response fragment found -- perform some post-processing and send it to the user
144             LOG.debug("Use prepared fragment for {}", continuationPointer);
145             synchronized (responseMessage) {
146                 Terser responseTerser = new Terser(responseMessage);
147                 responseTerser.set("MSH-7", MessageUtils.hl7Now());
148                 responseTerser.set("MSH-10", uniqueId());
149                 responseTerser.set("MSA-2", requestTerser.get("MSH-10"));
150             }
151         } else {
152             // no fragment found --> run the route and create fragments if necessary
153             getWrappedProcessor().process(exchange);
154             Message response = Exchanges.resultMessage(exchange).getBody(Message.class);
155             responseMessage = considerFragmentingResponse(response, threshold, queryTag, chainId);
156         }
157         Exchanges.resultMessage(exchange).setBody(parser.encode(responseMessage));
158     }
159      
160     
161     /**
162      * Checks whether the given response message should and can be fragmented.
163      * <br>
164      * If yes -- stores the fragments into the storage and returns the first fragment,
165      * i.e. the one that must be sent immediately.
166      * <br>
167      * If no -- simply returns the response message back.
168      */
169     private Message considerFragmentingResponse(
170             Message responseMessage,
171             int threshold,
172             String queryTag,
173             String chainId) throws Exception
174     {
175         Terser responseTerser = new Terser(responseMessage);  
176         if (isNotEmpty(responseTerser.get("DSC-1"))) {
177             LOG.warn("Cannot perform interactive continuation: DSC-1 already " +
178             		 "present in the response message returned from the route");
179             return responseMessage;
180         }
181         
182         // determine data record boundaries in the response
183         List<String> segments = splitString(responseMessage.toString(), '\r');
184         List<Integer> recordBoundaries = getRecordBoundaries(segments);
185         if (recordBoundaries.size() - 1 <= threshold) {
186             return responseMessage;
187         }
188         
189         // prepare header and footer segment groups
190         CharSequence headerSegments = joinSegments(segments, 0, recordBoundaries.get(0));
191         CharSequence footerSegments = joinSegments(
192                 segments, recordBoundaries.get(recordBoundaries.size() - 1), segments.size());
193 
194         // determine count of resulting fragments
195         final int fragmentsCount = (recordBoundaries.size() + threshold - 2) / threshold; 
196         
197         // create a new chain of fragments
198         Parser parser = getEndpoint().getHl7v2TransactionConfiguration().getParser();
199         String continuationPointer = null;
200         for (int currentFragmentIndex = 0; currentFragmentIndex < fragmentsCount; ++currentFragmentIndex) {
201             // create the current fragment as String 
202             int startRecordIndex = currentFragmentIndex * threshold;
203             int endRecordIndex = Math.min(startRecordIndex + threshold, recordBoundaries.size() - 1);
204             int startSegmentIndex = recordBoundaries.get(startRecordIndex);
205             int endSegmentIndex = recordBoundaries.get(endRecordIndex);
206 
207             StringBuilder sb = new StringBuilder(headerSegments);
208             appendSegments(sb, segments, startSegmentIndex, endSegmentIndex);
209             sb.append(footerSegments);
210 
211             // parse, post-process and register the current fragment
212             Message fragment = parser.parse(sb.toString());
213             Terser fragmentTerser = new Terser(fragment);
214             String nextContinuationPointer = uniqueId();
215             if (currentFragmentIndex != fragmentsCount - 1) {
216                 fragmentTerser.set("DSC-1", nextContinuationPointer);
217                 fragmentTerser.set("DSC-2", "I");
218             }
219             fragmentTerser.set("QAK-4", Integer.toString(recordBoundaries.size() - 1));
220             fragmentTerser.set("QAK-5", Integer.toString(endRecordIndex - startRecordIndex));
221             fragmentTerser.set("QAK-6", Integer.toString(recordBoundaries.size() - 1 - endRecordIndex));
222 
223             storage.put(continuationPointer, chainId, fragment);
224             continuationPointer = nextContinuationPointer;
225 
226             // remember the first fragment in order to return it
227             if (currentFragmentIndex == 0) {
228                 responseMessage = fragment;
229             }
230         }
231         LOG.debug("Prepared {} interactive fragments for query tag {}", fragmentsCount, queryTag);
232         return responseMessage;
233     }
234 
235     
236     /**
237      * Determines boundaries for data records among the given segments' list.
238      * For N data records there will be N+1 boundaries.
239      */
240     private List<Integer> getRecordBoundaries(List<String> segments) {
241         Hl7v2TransactionConfiguration config = getEndpoint().getHl7v2TransactionConfiguration();
242         List<Integer> recordBoundaries = new ArrayList<>();
243         boolean foundFooter = false;
244         for (int i = 1; i < segments.size(); ++i) {
245             if (config.isDataStartSegment(segments, i)) {
246                 recordBoundaries.add(i);
247             } else if ((recordBoundaries.size() > 0) && config.isFooterStartSegment(segments, i)) {
248                 foundFooter = true;
249                 recordBoundaries.add(i);
250                 break;
251             }
252         }
253         if (! foundFooter) {
254             recordBoundaries.add(segments.size());
255         }
256         return recordBoundaries;
257     }
258     
259 }