View Javadoc
1   /*
2    * Copyright 2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.core.process;
17  
18  import static org.openehealth.ipf.platform.camel.core.util.Exchanges.copyExchange;
19  
20  import org.apache.camel.Exchange;
21  import org.apache.camel.Processor;
22  import org.apache.camel.Producer;
23  import org.apache.camel.impl.DefaultExchange;
24  import org.apache.camel.processor.DelegateProcessor;
25  
26  /**
27   * Implements a response generation process. For generating a response the
28   * incoming {@link Exchange} is sent to a
29   * <code>responseGeneratorProcessor</code> (set at construction time) that is
30   * generating a response message from the original {@link Exchange}. The
31   * response is then communicated back to the initiator and the original exchange
32   * is forwarded to the next processor as in-only {@link Exchange}.
33   * 
34   * @author Martin Krasser
35   */
36  public class Responder extends DelegateProcessor {
37  
38      private Processor responseGeneratorProcessor;
39      private Producer responseGeneratorProducer;
40      
41      /**
42       * Creates a new {@link Responder}.
43       * 
44       * @param responseGeneratorProcessor
45       *            processor that generates the response message for an exchange.
46       */
47      public Responder(Processor responseGeneratorProcessor) {
48          this.responseGeneratorProcessor = responseGeneratorProcessor;
49      }
50  
51      /**
52       * Creates a new {@link Responder}.
53       * 
54       * @param responseGeneratorProducer
55       *            producer that generates the response message for an exchange.
56       */
57      public Responder(Producer responseGeneratorProducer) {
58          this.responseGeneratorProducer = responseGeneratorProducer;
59      }
60  
61      /**
62       * Sends the incoming {@link Exchange} to an
63       * <code>responseGeneratorProcessor</code> (set at construction time) that
64       * is generating a response message from the original {@link Exchange}. The
65       * response is then communicated back to the initiator and the original
66       * exchange is forwarded to the next processor as in-only {@link Exchange}.
67       * 
68       * @param exchange
69       *            original exchange.
70       */
71      @Override
72      protected void processNext(Exchange exchange) throws Exception {
73          Exchange delegateExchange = createDelegateExchange(exchange);
74          Exchange serviceExchange = exchange.copy();
75          
76          getResponseGenerator().process(serviceExchange);
77          // copy service exchange over to original exchange
78          // (sends response back to exchange initiator)
79          copyExchange(serviceExchange, exchange);
80          
81          if (process(exchange, serviceExchange)) {
82              super.processNext(delegateExchange);
83          }
84          
85      }
86  
87      /**
88       * Processes the <code>original</code> exchange and the
89       * <code>response</code> exchange (returned from a
90       * <code>responseGeneratorProcessor</code>) and returns a decision whether
91       * to continue processing. The default implementation always returns
92       * <code>true</code>. This method is intended to be overwritten by
93       * subclasses.
94       * 
95       * @param original
96       *            original message exchange.
97       * @param response
98       *            response message exchange.
99       * @return <code>true</code> if processing shall continue,
100      *         <code>false</code> otherwise.
101      */
102     protected boolean process(Exchange original, Exchange response) {
103         return true;
104     }
105 
106     /**
107      * Creates the exchange for the next processor returned by
108      * {@link #getProcessor()} from a source exchange.
109      * 
110      * @param source
111      *            a source exchange.
112      * @return exchange for the next processor.
113      */
114     protected Exchange createDelegateExchange(Exchange source) {
115         DefaultExchange result = new DefaultExchange(source.getContext());
116         result.getIn().copyFrom(source.getIn());
117         return result;
118     }
119     
120     @Override
121     protected void doStart() throws Exception {
122         super.doStart();
123         if (responseGeneratorProducer != null) {
124             responseGeneratorProducer.start();
125         }
126     }
127 
128     @Override
129     protected void doStop() throws Exception {
130         if (responseGeneratorProducer != null) {
131             responseGeneratorProducer.stop();
132         }
133         super.doStop();
134     }
135 
136     private Processor getResponseGenerator() {
137         if (responseGeneratorProcessor != null) {
138             return responseGeneratorProcessor;
139         } else {
140             return responseGeneratorProducer;
141         }
142     }
143     
144 }