View Javadoc
1   /*
2    * Copyright 2009 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.producer;
17  
18  import ca.uhn.hl7v2.HL7Exception;
19  import ca.uhn.hl7v2.model.Message;
20  import ca.uhn.hl7v2.model.Segment;
21  import ca.uhn.hl7v2.model.v25.message.QCN_J01;
22  import ca.uhn.hl7v2.parser.Parser;
23  import ca.uhn.hl7v2.util.Terser;
24  import org.apache.camel.Exchange;
25  import org.openehealth.ipf.commons.ihe.hl7v2.Hl7v2TransactionConfiguration;
26  import org.openehealth.ipf.modules.hl7.message.MessageUtils;
27  import org.openehealth.ipf.platform.camel.core.util.Exchanges;
28  import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
29  import org.openehealth.ipf.platform.camel.ihe.hl7v2.intercept.producer.ProducerMarshalInterceptor;
30  import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import java.util.List;
35  
36  import static org.apache.commons.lang3.StringUtils.isEmpty;
37  import static org.apache.commons.lang3.StringUtils.isNotEmpty;
38  import static org.openehealth.ipf.platform.camel.ihe.mllp.core.FragmentationUtils.*;
39  
40  /**
41   * Producer-side Hl7 marshalling/unmarshalling interceptor 
42   * with support for interactive continuation.
43   * <p>
44   * Note that this interceptor has the same ID as {@link ProducerMarshalInterceptor}.
45   *
46   * @author Dmytro Rud
47   */
48  public class ProducerMarshalAndInteractiveResponseReceiverInterceptor extends InterceptorSupport<MllpTransactionEndpoint<?>> {
49      private static final transient Logger LOG = LoggerFactory.getLogger(ProducerMarshalAndInteractiveResponseReceiverInterceptor.class);
50  
51      public ProducerMarshalAndInteractiveResponseReceiverInterceptor() {
52          super();
53          setId(ProducerMarshalInterceptor.class.getName());
54      }
55  
56  
57      /**
58       * Marshals the request, sends it to the route, and unmarshals the response.
59       */
60      @Override
61      public void process(Exchange exchange) throws Exception {
62          Hl7v2TransactionConfiguration config = getEndpoint().getHl7v2TransactionConfiguration();
63          Message request = exchange.getIn().getBody(Message.class);
64          
65          Terser requestTerser = null;
66          String responseString = null;
67          StringBuilder fragmentAccumulator = null;
68          
69          // Determine whether we should automatically handle message continuations.
70          // Conditions:
71          //     1. It must be allowed for the endpoint.
72          //     2. It must be allowed for the given request message type.
73          //     3. The user must not have already filled the DSC segment.
74          boolean supportContinuations = false;
75          if (getEndpoint().isSupportInteractiveContinuation()) {
76              requestTerser = new Terser(request);
77              if (config.isContinuable(requestTerser.get("MSH-9-1")) && isEmpty(requestTerser.get("DSC-1"))) {
78                  supportContinuations = true;
79                  fragmentAccumulator = new StringBuilder();
80              }
81          }
82          
83          // communication with optional continuation handling
84          boolean mustSend = true;
85          int fragmentsCount = 0;
86          int recordsCount = 0;
87          String continuationPointer; 
88          while (mustSend) {
89              mustSend = false;
90      
91              // marshal, send and wait for response
92              exchange.getIn().setBody(request.toString());
93              getWrappedProcessor().process(exchange);
94              responseString = Exchanges.resultMessage(exchange).getBody(String.class);
95  
96              // continuations handling 
97              if (supportContinuations) {
98                  List<String> segments = splitString(responseString, '\r');
99  
100                 // analyse whether this fragment is a positive response
101                 boolean positiveResponse = false;
102                 for (String segment : segments) {
103                     if (segment.startsWith("MSA")) {
104                         positiveResponse = (segment.length() >= 7) && (segment.charAt(5) == 'A');
105                         break;
106                     }
107                 }
108                 if (! positiveResponse) {
109                     // ignore all collected fragments, pass this response as is to the route
110                     LOG.debug("Not a positive response, cannot perform continuation");
111                     fragmentsCount = 0;
112                     recordsCount = 0;
113                     break;
114                 }
115 
116                 // analyse whether we should request the next fragment   
117                 if (segments.get(segments.size() - 1).startsWith("DSC")) {
118                     List<String> dscFields = splitString(segments.get(segments.size() - 1), responseString.charAt(3));
119                     
120                     if ((dscFields.size() >= 3)
121                             && "I".equals(dscFields.get(2))
122                             && isNotEmpty(dscFields.get(1)))
123                     {
124                         continuationPointer = dscFields.get(1);
125                         LOG.debug("Automatically query interactive fragment {}", continuationPointer);
126                         requestTerser.set("DSC-1", continuationPointer);
127                         requestTerser.set("DSC-2", "I");
128                         requestTerser.set("MSH-7", MessageUtils.hl7Now());
129                         requestTerser.set("MSH-10", uniqueId());
130                         mustSend = true;
131                     }
132                 }
133                 
134                 // accumulate response fragments:
135                 //      - header segments from the first one,
136                 //      - data records from all
137                 //      - footer segments from the last one
138                 int startDataSegmentIndex = -1;
139                 int startFooterSegmentIndex = segments.size();
140                 for (int i = 1; i < segments.size(); ++i) {
141                     if(config.isDataStartSegment(segments, i)) {
142                         ++recordsCount;
143                         if (startDataSegmentIndex == -1) {
144                             startDataSegmentIndex = i;
145                         }
146                     } 
147                     else if (config.isFooterStartSegment(segments, i)) {
148                         startFooterSegmentIndex = i;
149                         break;
150                     }
151                 }
152                 
153                 if (startDataSegmentIndex == -1) {
154                     // no data records in this message
155                     startDataSegmentIndex = segments.size();
156                 }
157                 
158                 if (++fragmentsCount == 1) {
159                     appendSegments(fragmentAccumulator, segments, 0, startDataSegmentIndex);
160                 }
161                 appendSegments(fragmentAccumulator, segments, startDataSegmentIndex, startFooterSegmentIndex);
162                 if (! mustSend) {
163                     appendSegments(fragmentAccumulator, segments, startFooterSegmentIndex, segments.size());
164                 }
165             }
166         }
167 
168         // get accumulated response
169         if (fragmentsCount > 1) {
170             responseString = fragmentAccumulator.toString();
171 
172             // prepare and send automatic cancel request, if necessary.
173             // All errors will be ignored
174             if (getEndpoint().isAutoCancel()) {
175                 try {
176                     String cancel = createCancelMessage(request, config.getParser());
177                     exchange.getIn().setBody(cancel);
178                     getWrappedProcessor().process(exchange);
179                 } catch (Exception e) {
180                     LOG.warn("Error while preparing and sending automatic cancel message", e);
181                 }
182             }
183         }
184 
185         // unmarshal and return
186         Message rsp = config.getParser().parse(responseString);
187         if (recordsCount != 0) {
188             Terser responseTerser = new Terser(rsp);
189             String recordsCountString = Integer.toString(recordsCount);
190             responseTerser.set("QAK-4", recordsCountString);
191             responseTerser.set("QAK-5", recordsCountString);
192             responseTerser.set("QAK-6", "0");
193         }
194         Exchanges.resultMessage(exchange).setBody(rsp);
195     }
196 
197 
198     /**
199      * Creates a continuation cancel message on the basis of the given request.
200      * <p>
201      * For requests with HL7 version (MSH-12) prior to 2.4, a <tt>CNQ</tt>
202      * message will be created.  For requests with version 2.4 and above,
203      * a <tt>QCN^J01</tt> message will be created.
204      * See paragraph 5.6.3 in HL7 v2.5 specification.
205      */
206     private static String createCancelMessage(Message request, Parser parser) throws HL7Exception {
207         return (request.getVersion().charAt(2) < '4') ?
208             createCnqMessage(request, parser) :
209             createQcnJ01Message(request, parser);
210     }
211 
212 
213     private static String createQcnJ01Message(Message request, Parser parser) throws HL7Exception {
214         Message cancel = new QCN_J01();
215 
216         // ===== Segment MSH =====
217         Segment requestMsh = (Segment) request.get("MSH");
218         Segment cancelMsh = (Segment) cancel.get("MSH");
219 
220         Terser.set(cancelMsh, 1, 0, 1, 1, Terser.get(requestMsh, 1, 0, 1, 1));
221         Terser.set(cancelMsh, 2, 0, 1, 1, Terser.get(requestMsh, 2, 0, 1, 1));
222 
223         // sender & receiver
224         for (int field = 3; field <= 6; ++field) {
225             for (int component = 1; component <= 3; ++component) {
226                Terser.set(cancelMsh,  field, 0, component, 1,
227                Terser.get(requestMsh, field, 0, component, 1));
228             }
229         }
230         Terser.set(cancelMsh,  7, 0, 1, 1, MessageUtils.hl7Now());
231         Terser.set(cancelMsh,  9, 0, 1, 1, "QCN");
232         Terser.set(cancelMsh,  9, 0, 2, 1, "J01");
233         Terser.set(cancelMsh,  9, 0, 3, 1, "QCN_J01");
234         Terser.set(cancelMsh, 10, 0, 1, 1, uniqueId());
235         Terser.set(cancelMsh, 11, 0, 1, 1, "P");
236 
237         // version
238         for (int component = 1; component <= 3; ++component) {
239            Terser.set(cancelMsh,  12, 0, component, 1,
240            Terser.get(requestMsh, 12, 0, component, 1));
241         }
242 
243         // ===== Segment QID =====
244         Segment requestQpd = (Segment) request.get("QPD");
245         Segment cancelQid = (Segment) cancel.get("QID");
246 
247         // query tag: QPD-2 --> QID-1
248         Terser.set(cancelQid, 1, 0, 1, 1, Terser.get(requestQpd, 2, 0, 1, 1));
249 
250         // message query name: QPD-1 --> QID-2, 6 components
251         for (int component = 1; component <= 6; ++component) {
252             Terser.set(cancelQid,  2, 0, component, 1,
253             Terser.get(requestQpd, 1, 0, component, 1));
254         }
255 
256         // return
257         return parser.encode(cancel);
258     }
259 
260 
261     private static String createCnqMessage(Message request, Parser parser) throws HL7Exception {
262         Message cancel = parser.parse(parser.encode(request));
263         Segment cancelMsh = (Segment) cancel.get("MSH");
264 
265         Terser.set(cancelMsh,  7, 0, 1, 1, MessageUtils.hl7Now());
266         Terser.set(cancelMsh,  9, 0, 2, 1, "CNQ");
267         Terser.set(cancelMsh,  9, 0, 3, 1, "");
268         Terser.set(cancelMsh, 10, 0, 1, 1, uniqueId());
269 
270         return parser.encode(cancel);
271     }
272 }