View Javadoc
1   /*
2    * Copyright 2009 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.ihe.ws;
17  
18  import org.apache.camel.Exchange;
19  import org.apache.camel.Processor;
20  import org.apache.camel.impl.DefaultConsumer;
21  import org.apache.cxf.endpoint.Server;
22  import org.openehealth.ipf.commons.ihe.core.Constants;
23  import org.openehealth.ipf.commons.ihe.ws.WsTransactionConfiguration;
24  import org.openehealth.ipf.commons.ihe.ws.cxf.audit.WsAuditDataset;
25  
26  import static java.util.Objects.requireNonNull;
27  
28  
29  /**
30   * Camel component used to create process incoming exchanges based on webservice calls.
31   *
32   * @author Jens Riemschneider
33   * @author Dmytro Rud
34   */
35  public class DefaultWsConsumer<
36          AuditDatasetType extends WsAuditDataset,
37          ConfigType extends WsTransactionConfiguration<AuditDatasetType>> extends DefaultConsumer {
38      private final Server server;
39  
40      /**
41       * Constructs the consumer.
42       * @param endpoint
43       *          the endpoint representation in Camel.
44       * @param processor
45       *          the processor to start processing incoming exchanges.
46       * @param service
47       *          the service to consume messages from.
48       * @param server
49       *          the CXF server instance driving the service.
50       */
51      public DefaultWsConsumer(AbstractWsEndpoint<AuditDatasetType, ConfigType> endpoint, Processor processor, AbstractWebService service, Server server) {
52          super(endpoint, processor);
53          requireNonNull(service, "service cannot be null");
54          requireNonNull(server, "server cannot be null");
55          service.setConsumer(this);
56          this.server = server;
57      }
58  
59      @Override
60      public AbstractWsEndpoint<AuditDatasetType, ConfigType> getEndpoint() {
61          return (AbstractWsEndpoint<AuditDatasetType, ConfigType>)super.getEndpoint();
62      }
63  
64      /**
65       * Processes an exchange with the processor configured in the constructor.
66       *
67       * @param exchange
68       *          the exchange to process.
69       */
70      public void process(Exchange exchange) {
71          try {
72              exchange.getIn().setHeader(Constants.INTERACTION_ID_NAME, getEndpoint().getComponent().getInteractionId());
73              getProcessor().process(exchange);
74          } catch (Exception e) {
75              throw new RuntimeException(e);
76          }
77      }
78  
79      @Override
80      protected void doStop() throws Exception {
81          if (server != null) {
82              server.stop();
83          }
84          super.doStop();
85      }
86  
87      @Override
88      protected void doStart() throws Exception {
89          if (server != null) {
90              server.start();
91          }
92          super.doStart();
93      }
94  }