1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.mllp.core;
17
18 import lombok.experimental.Delegate;
19 import org.apache.camel.component.mina2.Mina2Consumer;
20 import org.apache.camel.impl.DefaultConsumer;
21 import org.apache.mina.core.future.CloseFuture;
22 import org.apache.mina.core.service.IoAcceptor;
23 import org.apache.mina.core.session.IoSession;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27
28
29
30
31 public class MllpConsumer extends DefaultConsumer {
32
33 private static final Logger LOG = LoggerFactory.getLogger(MllpConsumer.class);
34
35
36
37 private interface DoStop {
38 @SuppressWarnings("unused")
39 void stop() throws Exception;
40 }
41
42 @Delegate(excludes = DoStop.class)
43 private final Mina2Consumer consumer;
44
45 MllpConsumer(Mina2Consumer consumer) {
46
47 super(consumer.getEndpoint(), consumer.getProcessor());
48 this.consumer = consumer;
49 }
50
51 @Override
52 protected void handleException(String message, Throwable t) {
53 super.handleException(message, t);
54 }
55
56 @Override
57 protected void handleException(Throwable t) {
58 super.handleException(t);
59 }
60
61 @Override
62 public void stop() throws Exception {
63 super.stop();
64 }
65
66 @Override
67 protected void doStop() throws Exception {
68 super.doStop();
69 IoAcceptor ioAcceptor = getAcceptor();
70 if (ioAcceptor != null) {
71 for (IoSession ss : ioAcceptor.getManagedSessions().values()) {
72 CloseFuture future = ss.closeNow();
73 if (!future.awaitUninterruptibly(1000)) {
74 LOG.warn("Could not close IoSession, consumer may hang");
75 }
76 }
77 ioAcceptor.unbind();
78 ioAcceptor.dispose(false);
79 }
80 }
81
82 }