View Javadoc
1   /*
2    * Copyright 2017 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *       http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package org.apache.camel.component.hl7;
18  
19  import org.apache.mina.core.buffer.IoBuffer;
20  import org.apache.mina.core.session.IoSession;
21  import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
22  import org.apache.mina.filter.codec.ProtocolDecoderOutput;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.nio.charset.CharacterCodingException;
27  import java.nio.charset.CharsetDecoder;
28  
29  /**
30   * MLLPDecoder that is aware that a HL7 message can span several buffers.
31   * In addition, it avoids rescanning TCP frames by keeping state in the IOSession.
32   *
33   * This decoder addresses a decoding error that might occur when start and/or end bytes
34   * are transferred in a different frame. It also covers the case that requests are coming
35   * in faster than they can be processed.
36   *
37   * FIXME check if this decoder is part of Camel 2.20
38   */
39  class CustomHL7MLLPDecoder extends CumulativeProtocolDecoder {
40  
41      private static final Logger LOG = LoggerFactory.getLogger(CustomHL7MLLPDecoder.class);
42      private static final String DECODER_STATE = CustomHL7MLLPDecoder.class.getName() + ".STATE";
43      private static final String CHARSET_DECODER = CustomHL7MLLPDecoder.class.getName() + ".charsetdecoder";
44  
45      private CustomHL7MLLPConfig config;
46  
47      CustomHL7MLLPDecoder(CustomHL7MLLPConfig config) {
48          this.config = config;
49      }
50  
51  
52      @Override
53      protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
54  
55          // Get the state of the current message and
56          // Skip what we have already scanned before
57          DecoderState state = decoderState(session);
58          in.position(state.current());
59  
60          LOG.debug("Received data, checking from position {} to {}", in.position(), in.limit());
61          boolean messageDecoded = false;
62  
63          while (in.hasRemaining()) {
64  
65              int previousPosition = in.position();
66              byte current = in.get();
67  
68              // Check if we are at the end of an HL7 message
69              if (current == config.getEndByte2() && state.previous() == config.getEndByte1()) {
70                  if (state.isStarted()) {
71                      // Save the current buffer pointers and reset them to surround the identifier message
72                      int currentPosition = in.position();
73                      int currentLimit = in.limit();
74                      LOG.debug("Message ends at position {} with length {}", previousPosition, previousPosition - state.start() + 1);
75                      in.position(state.start());
76                      in.limit(currentPosition);
77                      LOG.debug("Set start to position {} and limit to {}", in.position(), in.limit());
78  
79                      // Now create string or byte[] from this part of the buffer and restore the buffer pointers
80                      try {
81                          out.write(config.isProduceString()
82                                  ? parseMessageToString(in.slice(), charsetDecoder(session))
83                                  : parseMessageToByteArray(in.slice()));
84                          messageDecoded = true;
85                      } finally {
86                          LOG.debug("Resetting to position {} and limit to {}", currentPosition, currentLimit);
87                          in.position(currentPosition);
88                          in.limit(currentLimit);
89                          state.reset();
90                      }
91                  } else {
92                      LOG.warn("Ignoring message end at position {} until start byte has been seen.", previousPosition);
93                  }
94              } else {
95                  // Check if we are at the start of an HL7 message
96                  if (current == config.getStartByte()) {
97                      state.markStart(previousPosition);
98                  } else {
99                      // Remember previous byte in state object because the buffer could
100                     // be theoretically exhausted right between the two end bytes
101                     state.markPrevious(current);
102                 }
103                 messageDecoded = false;
104             }
105         }
106 
107         if (!messageDecoded) {
108             // Could not find a complete message in the buffer.
109             // Reset to the initial position (just as nothing had been read yet)
110             // and return false so that this method is called again with more data.
111             LOG.debug("No complete message yet at position {} ", in.position());
112             state.markCurrent(in.position());
113             in.position(0);
114         }
115         return messageDecoded;
116     }
117 
118     // Make a defensive byte copy (the buffer will be reused)
119     // and omit the start and the two end bytes of the MLLP message
120     // returning a byte array
121     private Object parseMessageToByteArray(IoBuffer buf) {
122         int len = buf.limit() - 3;
123         LOG.debug("Making byte array of length {}", len);
124         byte[] dst = new byte[len];
125         buf.skip(1); // skip start byte
126         buf.get(dst, 0, len);
127         buf.skip(2); // skip end bytes
128 
129         // Only do this if conversion is enabled
130         if (config.isConvertLFtoCR()) {
131             LOG.debug("Replacing LF by CR");
132             for (int i = 0; i < dst.length; i++) {
133                 if (dst[i] == (byte) '\n') {
134                     dst[i] = (byte) '\r';
135                 }
136             }
137         }
138         return dst;
139     }
140 
141     // Make a defensive byte copy (the buffer will be reused)
142     // and omit the start and the two end bytes of the MLLP message
143     // returning a String
144     private Object parseMessageToString(IoBuffer buf, CharsetDecoder decoder) throws CharacterCodingException {
145         int len = buf.limit() - 3;
146         LOG.debug("Making string of length {} using charset {}", len, decoder.charset());
147         buf.skip(1); // skip start byte
148         String message = buf.getString(len, decoder);
149         buf.skip(2); // skip end bytes
150 
151         // Only do this if conversion is enabled
152         if (config.isConvertLFtoCR()) {
153             LOG.debug("Replacing LF by CR");
154             message = message.replace('\n', '\r');
155         }
156         return message;
157     }
158 
159     @Override
160     public void dispose(IoSession session) throws Exception {
161         session.removeAttribute(DECODER_STATE);
162         session.removeAttribute(CHARSET_DECODER);
163     }
164 
165     private CharsetDecoder charsetDecoder(IoSession session) {
166         synchronized (session) {
167             CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(CHARSET_DECODER);
168             if (decoder == null) {
169                 decoder = config.getCharset().newDecoder()
170                     .onMalformedInput(config.getMalformedInputErrorAction())
171                     .onUnmappableCharacter(config.getUnmappableCharacterErrorAction());
172                 session.setAttribute(CHARSET_DECODER, decoder);
173             }
174             return decoder;
175         }
176     }
177 
178     private DecoderState decoderState(IoSession session) {
179         synchronized (session) {
180             DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE);
181             if (decoderState == null) {
182                 decoderState = new DecoderState();
183                 session.setAttribute(DECODER_STATE, decoderState);
184             }
185             return decoderState;
186         }
187     }
188 
189     /**
190      * Holds the state of the decoding process
191      */
192     private static class DecoderState {
193         private int startPos = -1;
194         private int currentPos;
195         private byte previousByte;
196 
197         void reset() {
198             startPos = -1;
199             currentPos = 0;
200             previousByte = 0;
201         }
202 
203         void markStart(int position) {
204             if (isStarted()) {
205                 LOG.warn("Ignoring message start at position {} before previous message has ended.", position);
206             } else {
207                 startPos = position;
208                 LOG.debug("Message starts at position {}", startPos);
209             }
210         }
211 
212         void markCurrent(int position) {
213             currentPos = position;
214         }
215 
216         void markPrevious(byte previous) {
217             previousByte = previous;
218         }
219 
220         public int start() {
221             return startPos;
222         }
223 
224         public int current() {
225             return currentPos;
226         }
227 
228         public byte previous() {
229             return previousByte;
230         }
231 
232         public boolean isStarted() {
233             return startPos >= 0;
234         }
235     }
236 }