1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.apache.camel.component.hl7;
18
19 import org.apache.mina.core.buffer.IoBuffer;
20 import org.apache.mina.core.session.IoSession;
21 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
22 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import java.nio.charset.CharacterCodingException;
27 import java.nio.charset.CharsetDecoder;
28
29
30
31
32
33
34
35
36
37
38
39 class CustomHL7MLLPDecoder extends CumulativeProtocolDecoder {
40
41 private static final Logger LOG = LoggerFactory.getLogger(CustomHL7MLLPDecoder.class);
42 private static final String DECODER_STATE = CustomHL7MLLPDecoder.class.getName() + ".STATE";
43 private static final String CHARSET_DECODER = CustomHL7MLLPDecoder.class.getName() + ".charsetdecoder";
44
45 private CustomHL7MLLPConfig config;
46
47 CustomHL7MLLPDecoder(CustomHL7MLLPConfig config) {
48 this.config = config;
49 }
50
51
52 @Override
53 protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
54
55
56
57 DecoderState state = decoderState(session);
58 in.position(state.current());
59
60 LOG.debug("Received data, checking from position {} to {}", in.position(), in.limit());
61 boolean messageDecoded = false;
62
63 while (in.hasRemaining()) {
64
65 int previousPosition = in.position();
66 byte current = in.get();
67
68
69 if (current == config.getEndByte2() && state.previous() == config.getEndByte1()) {
70 if (state.isStarted()) {
71
72 int currentPosition = in.position();
73 int currentLimit = in.limit();
74 LOG.debug("Message ends at position {} with length {}", previousPosition, previousPosition - state.start() + 1);
75 in.position(state.start());
76 in.limit(currentPosition);
77 LOG.debug("Set start to position {} and limit to {}", in.position(), in.limit());
78
79
80 try {
81 out.write(config.isProduceString()
82 ? parseMessageToString(in.slice(), charsetDecoder(session))
83 : parseMessageToByteArray(in.slice()));
84 messageDecoded = true;
85 } finally {
86 LOG.debug("Resetting to position {} and limit to {}", currentPosition, currentLimit);
87 in.position(currentPosition);
88 in.limit(currentLimit);
89 state.reset();
90 }
91 } else {
92 LOG.warn("Ignoring message end at position {} until start byte has been seen.", previousPosition);
93 }
94 } else {
95
96 if (current == config.getStartByte()) {
97 state.markStart(previousPosition);
98 } else {
99
100
101 state.markPrevious(current);
102 }
103 messageDecoded = false;
104 }
105 }
106
107 if (!messageDecoded) {
108
109
110
111 LOG.debug("No complete message yet at position {} ", in.position());
112 state.markCurrent(in.position());
113 in.position(0);
114 }
115 return messageDecoded;
116 }
117
118
119
120
121 private Object parseMessageToByteArray(IoBuffer buf) {
122 int len = buf.limit() - 3;
123 LOG.debug("Making byte array of length {}", len);
124 byte[] dst = new byte[len];
125 buf.skip(1);
126 buf.get(dst, 0, len);
127 buf.skip(2);
128
129
130 if (config.isConvertLFtoCR()) {
131 LOG.debug("Replacing LF by CR");
132 for (int i = 0; i < dst.length; i++) {
133 if (dst[i] == (byte) '\n') {
134 dst[i] = (byte) '\r';
135 }
136 }
137 }
138 return dst;
139 }
140
141
142
143
144 private Object parseMessageToString(IoBuffer buf, CharsetDecoder decoder) throws CharacterCodingException {
145 int len = buf.limit() - 3;
146 LOG.debug("Making string of length {} using charset {}", len, decoder.charset());
147 buf.skip(1);
148 String message = buf.getString(len, decoder);
149 buf.skip(2);
150
151
152 if (config.isConvertLFtoCR()) {
153 LOG.debug("Replacing LF by CR");
154 message = message.replace('\n', '\r');
155 }
156 return message;
157 }
158
159 @Override
160 public void dispose(IoSession session) throws Exception {
161 session.removeAttribute(DECODER_STATE);
162 session.removeAttribute(CHARSET_DECODER);
163 }
164
165 private CharsetDecoder charsetDecoder(IoSession session) {
166 synchronized (session) {
167 CharsetDecoder decoder = (CharsetDecoder) session.getAttribute(CHARSET_DECODER);
168 if (decoder == null) {
169 decoder = config.getCharset().newDecoder()
170 .onMalformedInput(config.getMalformedInputErrorAction())
171 .onUnmappableCharacter(config.getUnmappableCharacterErrorAction());
172 session.setAttribute(CHARSET_DECODER, decoder);
173 }
174 return decoder;
175 }
176 }
177
178 private DecoderState decoderState(IoSession session) {
179 synchronized (session) {
180 DecoderState decoderState = (DecoderState) session.getAttribute(DECODER_STATE);
181 if (decoderState == null) {
182 decoderState = new DecoderState();
183 session.setAttribute(DECODER_STATE, decoderState);
184 }
185 return decoderState;
186 }
187 }
188
189
190
191
192 private static class DecoderState {
193 private int startPos = -1;
194 private int currentPos;
195 private byte previousByte;
196
197 void reset() {
198 startPos = -1;
199 currentPos = 0;
200 previousByte = 0;
201 }
202
203 void markStart(int position) {
204 if (isStarted()) {
205 LOG.warn("Ignoring message start at position {} before previous message has ended.", position);
206 } else {
207 startPos = position;
208 LOG.debug("Message starts at position {}", startPos);
209 }
210 }
211
212 void markCurrent(int position) {
213 currentPos = position;
214 }
215
216 void markPrevious(byte previous) {
217 previousByte = previous;
218 }
219
220 public int start() {
221 return startPos;
222 }
223
224 public int current() {
225 return currentPos;
226 }
227
228 public byte previous() {
229 return previousByte;
230 }
231
232 public boolean isStarted() {
233 return startPos >= 0;
234 }
235 }
236 }