1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.mllp.core;
17
18
19 import org.apache.camel.CamelException;
20 import org.apache.camel.Exchange;
21 import org.apache.camel.component.mina2.Mina2Consumer;
22 import org.apache.mina.core.filterchain.IoFilterAdapter;
23 import org.apache.mina.core.future.WriteFuture;
24 import org.apache.mina.core.session.IoSession;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import javax.net.ssl.SSLException;
29
30 import static java.util.concurrent.TimeUnit.*;
31 import static org.apache.camel.component.mina2.Mina2Constants.MINA_CLOSE_SESSION_WHEN_COMPLETE;
32 import static org.apache.camel.component.mina2.Mina2PayloadHelper.getIn;
33 import static org.apache.camel.component.mina2.Mina2PayloadHelper.getOut;
34
35
36
37
38
39
40
41
42
43
44 public class MllpExceptionIoFilter extends IoFilterAdapter {
45
46 private static final Logger LOG = LoggerFactory.getLogger(MllpExceptionIoFilter.class);
47
48 private final Mina2Consumer mina2Consumer;
49
50 public MllpExceptionIoFilter(Mina2Consumer mina2Consumer) {
51 this.mina2Consumer = mina2Consumer;
52 }
53
54 @Override
55 public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception {
56 if (!session.isClosing()) {
57 if (sendResponse(cause)) {
58 Exception exception = new CamelException(cause.getMessage());
59 Exchange exchange = createExchange(exception);
60 mina2Consumer.getExceptionHandler().handleException("", exchange, exception);
61 sendResponse(session, exchange);
62 } else {
63 session.closeNow();
64 nextFilter.sessionClosed(session);
65 }
66 }
67 }
68
69 private void sendResponse(IoSession session, Exchange exchange) {
70 boolean disconnect = mina2Consumer.getEndpoint().getConfiguration().isDisconnect();
71 Object response = exchange.hasOut()? getOut(mina2Consumer.getEndpoint(), exchange):
72 getIn(mina2Consumer.getEndpoint(), exchange);
73 if (response != null) {
74 WriteFuture future = session.write(response);
75 LOG.trace("Waiting for write to complete for body: {} using session: {}", response, session);
76 if (!future.awaitUninterruptibly(10, SECONDS)) {
77 LOG.warn("Cannot write body: " + response + " using session: " + session);
78 }
79 } else {
80 LOG.debug("Writing no response");
81 disconnect = Boolean.TRUE;
82 }
83
84
85 Boolean close = (Boolean)session.getAttribute(MINA_CLOSE_SESSION_WHEN_COMPLETE);
86 if (close != null) {
87 disconnect = close;
88 }
89 if (disconnect) {
90 LOG.debug("Closing session when complete at address: {}", mina2Consumer.getAcceptor().getLocalAddress());
91 session.closeNow();
92 }
93 }
94
95 private Exchange createExchange(Throwable cause){
96 Exchange exchange = mina2Consumer.getEndpoint().createExchange();
97 exchange.setException(cause);
98 return exchange;
99 }
100
101 private boolean sendResponse(Throwable cause){
102 return !(cause instanceof SSLException);
103 }
104 }