View Javadoc
1   /*
2    * Copyright 2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.core.process;
17  
18  import static org.openehealth.ipf.platform.camel.core.util.Exchanges.copyExchange;
19  import static org.openehealth.ipf.platform.camel.core.util.Exchanges.createExchange;
20  import static org.openehealth.ipf.platform.camel.core.util.Exchanges.prepareResult;
21  
22  import org.apache.camel.Exchange;
23  import org.apache.camel.ExchangePattern;
24  import org.apache.camel.Processor;
25  import org.apache.camel.Producer;
26  import org.apache.camel.support.ServiceSupport;
27  import org.apache.camel.processor.aggregate.AggregationStrategy;
28  
29  /**
30   * A content enricher that enriches input data by first obtaining additional
31   * data from a <i>resource</i> represented by an endpoint <code>producer</code>
32   * and second by aggregating input data and additional data. Aggregation of
33   * input data and additional data is delegated to an {@link AggregationStrategy}
34   * object. Aggregation of input data and additional data is delegated to an
35   * {@link AggregationStrategy} object.
36   * 
37   * @author Martin Krasser
38   */
39  public class Enricher extends ServiceSupport implements Processor {
40  
41      private final Producer producer;
42      
43      private AggregationStrategy aggregationStrategy;
44      
45      /**
46       * Creates a new {@link Enricher}. The implicit aggregation strategy is to
47       * copy the additional data obtained from the enricher's resource over the
48       * input data. When using the copy aggregation strategy the enricher
49       * degenerates to a normal transformer.
50       * 
51       * @param producer
52       *            producer to resource endpoint.
53       */
54      public Enricher(Producer producer) {
55          this(defaultAggregationStrategy(), producer);
56      }
57      
58      /**
59       * Creates a new {@link Enricher}.
60       * 
61       * @param aggregationStrategy
62       *            aggregation strategy to aggregate input data and additional
63       *            data.
64       * @param producer
65       *            producer to resource endpoint.
66       */
67      public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
68          this.aggregationStrategy = aggregationStrategy;
69          this.producer = producer;
70      }
71      
72      /**
73       * Sets the aggregation strategy for this enricher.
74       * 
75       * @param aggregationStrategy the aggregationStrategy to set
76       */
77      public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
78          this.aggregationStrategy = aggregationStrategy;
79      }
80      
81      /**
82       * Sets the default aggregation strategy for this enricher.
83       */
84      public void setDefaultAggregationStrategy() {
85          aggregationStrategy = defaultAggregationStrategy();
86      }
87      
88      /**
89       * Enriches the input data (<code>exchange</code>) by first obtaining
90       * additional data from an endpoint represented by an endpoint
91       * <code>producer</code> and second by aggregating input data and additional
92       * data. Aggregation of input data and additional data is delegated to an
93       * {@link AggregationStrategy} object set at construction time. If the
94       * message exchange with the resource endpoint fails then no aggregation
95       * will be done and the failed exchange content is copied over to the
96       * original message exchange.
97       * 
98       * @param exchange
99       *            input data.
100      */
101     @Override
102     public void process(Exchange exchange) throws Exception {
103         Exchange resourceExchange = createExchange(exchange, ExchangePattern.InOut);
104         producer.process(resourceExchange);
105         
106         if (resourceExchange.isFailed()) {
107             // copy resource exchange onto original exchange (preserving pattern)
108             copyExchange(resourceExchange, exchange);
109         } else {
110             prepareResult(exchange);
111             // aggregate original exchange and resource exchange
112             Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
113             // copy aggregation result onto original exchange (preserving pattern)
114             copyExchange(aggregatedExchange, exchange);
115         }
116     }
117 
118     @Override
119     protected void doStart() throws Exception {
120         producer.start();
121     }
122 
123     @Override
124     protected void doStop() throws Exception {
125         producer.stop();
126     }
127 
128     private static AggregationStrategy defaultAggregationStrategy() {
129         return new CopyAggregationStrategy();
130     }
131 
132     private static class CopyAggregationStrategy implements AggregationStrategy {
133 
134         @Override
135         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
136             copyExchange(newExchange, oldExchange);
137             return oldExchange;
138         }
139         
140     }
141     
142 }