View Javadoc
1   /*
2    * Copyright 2014 the original author or authors.
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.ihe.mllp.core.intercept.consumer;
17  
18  import ca.uhn.hl7v2.ErrorCode;
19  import ca.uhn.hl7v2.HL7Exception;
20  import ca.uhn.hl7v2.preparser.PreParser;
21  import org.apache.camel.CamelContext;
22  import org.apache.camel.CamelException;
23  import org.apache.camel.Exchange;
24  import org.apache.camel.Route;
25  import org.apache.camel.RuntimeCamelException;
26  import org.apache.camel.StartupListener;
27  import org.openehealth.ipf.commons.ihe.hl7v2.Hl7v2AcceptanceException;
28  import org.openehealth.ipf.commons.ihe.hl7v2.Hl7v2TransactionConfiguration;
29  import org.openehealth.ipf.platform.camel.ihe.core.Interceptor;
30  import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
31  import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpConsumer;
32  import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpDispatchEndpoint;
33  import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpEndpoint;
34  import org.openehealth.ipf.platform.camel.ihe.mllp.core.MllpTransactionEndpoint;
35  import org.slf4j.Logger;
36  import org.slf4j.LoggerFactory;
37  
38  import java.util.ArrayList;
39  import java.util.Arrays;
40  import java.util.HashMap;
41  import java.util.List;
42  import java.util.Map;
43  
44  import static org.openehealth.ipf.platform.camel.core.util.Exchanges.resultMessage;
45  
46  /**
47   * Interceptor which dispatches an incoming request message to another MLLP route.
48   * <p>
49   *
50   * @author Dmytro Rud
51   */
52  public final class ConsumerDispatchingInterceptor extends InterceptorSupport<MllpDispatchEndpoint>
53          implements StartupListener {
54      private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerDispatchingInterceptor.class);
55  
56      private final List<String> routeIds = new ArrayList<>();
57      private Map<String, Interceptor<? extends MllpEndpoint>> map = new HashMap<>();
58  
59  
60      /**
61       * Constructs a dispatching interceptor.
62       *
63       * @param camelContext Camel context.
64       * @param routeIds     IDs of routes containing target IPF MLLP endpoints.
65       */
66      public ConsumerDispatchingInterceptor(CamelContext camelContext, String... routeIds) {
67          this(camelContext);
68          if (routeIds != null && routeIds.length > 0) {
69              addTransactionRoutes(routeIds);
70          }
71      }
72  
73      /**
74       * Constructs a dispatching interceptor
75       *
76       * @param camelContext Camel context
77       */
78      public ConsumerDispatchingInterceptor(CamelContext camelContext) {
79          super();
80          try {
81              camelContext.addStartupListener(this);
82          } catch (Exception e) {
83              throw new RuntimeCamelException(e);
84          }
85      }
86  
87      public boolean addTransactionRoutes(String... routeIds) {
88          return this.routeIds.addAll(Arrays.asList(routeIds));
89      }
90  
91      @Override
92      public void onCamelContextStarted(CamelContext camelContext, boolean alreadyStarted) throws Exception {
93          collectTransactionTargets(camelContext);
94          if (!addTargets(camelContext)) {
95              LOG.info("Mllp Dispatcher endpoint exposed without transaction targets.");
96          }
97      }
98  
99      /**
100      * Dynamically find all {@link MllpTransactionEndpoint} containing a reference
101      * to this instance and append it to the routeId list
102      *
103      * @param camelContext camel context
104      */
105     private void collectTransactionTargets(CamelContext camelContext) {
106         camelContext.getRoutes().stream()
107                 .filter(route -> route.getEndpoint() instanceof MllpTransactionEndpoint)
108                 .forEach(route -> {
109                     MllpTransactionEndpoint<?> endpoint = (MllpTransactionEndpoint<?>) route.getEndpoint();
110                     if (endpoint.getDispatcher() == this) {
111                         addTransactionRoutes(route.getId());
112                     }
113                 });
114     }
115 
116     private boolean addTargets(CamelContext camelContext) throws CamelException {
117         for (String routeId : routeIds) {
118             try {
119                 Route route = camelContext.getRoute(routeId);
120                 if (route != null) {
121                     MllpConsumer consumer = (MllpConsumer) route.getConsumer();
122                     Interceptor interceptor = (Interceptor) consumer.getProcessor();
123                     while (!(interceptor instanceof ConsumerStringProcessingInterceptor)) {
124                         interceptor = (Interceptor) interceptor.getWrappedProcessor();
125                     }
126                     LOG.debug("Adding MLLP transaction route {} to dispatcher", routeId);
127                     map.put(routeId, (Interceptor) interceptor.getWrappedProcessor());
128                 } else {
129                     throw new CamelException("Route with ID='" + routeId + "' not found or is not an IPF MLLP route");
130                 }
131             } catch (ClassCastException e) {
132                 throw new CamelException("Route with ID='" + routeId + "' is not an IPF MLLP route", e);
133             }
134         }
135         return !map.isEmpty();
136     }
137 
138     @Override
139     public void process(Exchange exchange) throws Exception {
140 
141         // determine attributes of the message
142         String message = exchange.getIn().getBody(String.class);
143         String[] fields = PreParser.getFields(message, "MSH-9-1", "MSH-9-2", "MSH-9-3", "MSH-12");
144         String messageType = fields[0];
145         String triggerEvent = fields[1];
146         String messageStructure = fields[2];
147         String version = fields[3];
148 
149         // check who can accept the message
150         boolean found = false;
151         for (String routeId : routeIds) {
152             Interceptor<? extends MllpEndpoint> interceptor = map.get(routeId);
153             Hl7v2TransactionConfiguration config = interceptor.getEndpoint().getHl7v2TransactionConfiguration();
154             try {
155                 config.checkMessageAcceptance(messageType, triggerEvent, messageStructure, version, true);
156 
157                 LOG.debug("Dispatch message with MSH-9-1='{}', MSH-9-2='{}', MSH-9-3='{}', MSH-12='{}' to route '{}'",
158                         messageType, triggerEvent, messageStructure, version, routeId);
159                 found = true;
160                 interceptor.process(exchange);
161                 break;
162             } catch (Hl7v2AcceptanceException e) {
163                 // no problem
164             }
165         }
166 
167         if (!found) {
168             LOG.debug("Nobody can process message with MSH-9-1='{}', MSH-9-2='{}', MSH-9-3='{}', MSH-12='{}'",
169                     messageType, triggerEvent, messageStructure, version);
170             HL7Exception exception = new HL7Exception(
171                     "Unsupported message type and/or version", ErrorCode.APPLICATION_INTERNAL_ERROR);
172             resultMessage(exchange).setBody(getEndpoint().getNakFactory().createDefaultNak(exception).encode());
173         }
174     }
175 
176 
177 }