1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer;
17
18 import ca.uhn.hl7v2.ErrorCode;
19 import ca.uhn.hl7v2.HL7Exception;
20 import ca.uhn.hl7v2.preparser.PreParser;
21 import org.apache.camel.CamelContext;
22 import org.apache.camel.CamelException;
23 import org.apache.camel.Exchange;
24 import org.apache.camel.Route;
25 import org.apache.camel.RuntimeCamelException;
26 import org.apache.camel.StartupListener;
27 import org.openehealth.ipf.commons.ihe.hl7v2.Hl7v2AcceptanceException;
28 import org.openehealth.ipf.commons.ihe.hl7v2.Hl7v2TransactionConfiguration;
29 import org.openehealth.ipf.platform.camel.ihe.core.Interceptor;
30 import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
31 import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpConsumer;
32 import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpDispatchEndpoint;
33 import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpEndpoint;
34 import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37
38 import java.util.ArrayList;
39 import java.util.Arrays;
40 import java.util.HashMap;
41 import java.util.List;
42 import java.util.Map;
43
44 import static org.openehealth.ipf.platform.camel.core.util.Exchanges.resultMessage;
45
46
47
48
49
50
51
52 public final class ConsumerDispatchingInterceptor extends InterceptorSupport<MllpDispatchEndpoint>
53 implements StartupListener {
54 private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerDispatchingInterceptor.class);
55
56 private final List<String> routeIds = new ArrayList<>();
57 private Map<String, Interceptor<? extends MllpEndpoint>> map = new HashMap<>();
58
59
60
61
62
63
64
65
66 public ConsumerDispatchingInterceptor(CamelContext camelContext, String... routeIds) {
67 this(camelContext);
68 if (routeIds != null && routeIds.length > 0) {
69 addTransactionRoutes(routeIds);
70 }
71 }
72
73
74
75
76
77
78 public ConsumerDispatchingInterceptor(CamelContext camelContext) {
79 super();
80 try {
81 camelContext.addStartupListener(this);
82 } catch (Exception e) {
83 throw new RuntimeCamelException(e);
84 }
85 }
86
87 public boolean addTransactionRoutes(String... routeIds) {
88 return this.routeIds.addAll(Arrays.asList(routeIds));
89 }
90
91 @Override
92 public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception {
93 collectTransactionTargets(camelContext);
94 if (!addTargets(camelContext)) {
95 LOG.info("Mllp Dispatcher endpoint exposed without transaction targets.");
96 }
97 }
98
99
100
101
102
103
104
105 private void collectTransactionTargets(CamelContext camelContext) {
106 camelContext.getRoutes().stream()
107 .filter(route -> route.getEndpoint() instanceof MllpTransactionEndpoint)
108 .forEach(route -> {
109 MllpTransactionEndpoint<?> endpoint = (MllpTransactionEndpoint<?>) route.getEndpoint();
110 if (endpoint.getDispatcher() == this) {
111 addTransactionRoutes(route.getId());
112 }
113 });
114 }
115
116 private boolean addTargets(CamelContext camelContext) throws CamelException {
117 for (String routeId : routeIds) {
118 try {
119 Route route = camelContext.getRoute(routeId);
120 if (route != null) {
121 MllpConsumer consumer = (MllpConsumer) route.getConsumer();
122 Interceptor interceptor = (Interceptor) consumer.getProcessor();
123 while (!(interceptor instanceof ConsumerStringProcessingInterceptor)) {
124 interceptor = (Interceptor) interceptor.getWrappedProcessor();
125 }
126 LOG.debug("Adding MLLP transaction route {} to dispatcher", routeId);
127 map.put(routeId, (Interceptor) interceptor.getWrappedProcessor());
128 } else {
129 throw new CamelException("Route with ID='" + routeId + "' not found or is not an IPF MLLP route");
130 }
131 } catch (ClassCastException e) {
132 throw new CamelException("Route with ID='" + routeId + "' is not an IPF MLLP route", e);
133 }
134 }
135 return !map.isEmpty();
136 }
137
138 @Override
139 public void process(Exchange exchange) throws Exception {
140
141
142 String message = exchange.getIn().getBody(String.class);
143 String[] fields = PreParser.getFields(message, "MSH-9-1", "MSH-9-2", "MSH-9-3", "MSH-12");
144 String messageType = fields[0];
145 String triggerEvent = fields[1];
146 String messageStructure = fields[2];
147 String version = fields[3];
148
149
150 boolean found = false;
151 for (String routeId : routeIds) {
152 Interceptor<? extends MllpEndpoint> interceptor = map.get(routeId);
153 Hl7v2TransactionConfiguration config = interceptor.getEndpoint().getHl7v2TransactionConfiguration();
154 try {
155 config.checkMessageAcceptance(messageType, triggerEvent, messageStructure, version, true);
156
157 LOG.debug("Dispatch message with MSH-9-1='{}', MSH-9-2='{}', MSH-9-3='{}', MSH-12='{}' to route '{}'",
158 messageType, triggerEvent, messageStructure, version, routeId);
159 found = true;
160 interceptor.process(exchange);
161 break;
162 } catch (Hl7v2AcceptanceException e) {
163
164 }
165 }
166
167 if (!found) {
168 LOG.debug("Nobody can process message with MSH-9-1='{}', MSH-9-2='{}', MSH-9-3='{}', MSH-12='{}'",
169 messageType, triggerEvent, messageStructure, version);
170 HL7Exception exception = new HL7Exception(
171 "Unsupported message type and/or version", ErrorCode.APPLICATION_INTERNAL_ERROR);
172 resultMessage(exchange).setBody(getEndpoint().getNakFactory().createDefaultNak(exception).encode());
173 }
174 }
175
176
177 }