View Javadoc
1   /*
2    * Copyright 2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.core.process.splitter;
17  
18  import java.util.Arrays;
19  import java.util.Collections;
20  import java.util.Iterator;
21  
22  import org.apache.camel.Exchange;
23  import org.apache.camel.Expression;
24  import org.apache.camel.Message;
25  import org.apache.camel.Processor;
26  import org.apache.camel.processor.DelegateProcessor;
27  import org.apache.camel.processor.aggregate.AggregationStrategy;
28  import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
29  import org.apache.camel.util.ExchangeHelper;
30  
31  import static org.apache.camel.util.ObjectHelper.notNull;
32  
33  /**
34   * A processor that splits an exchange into multiple exchanges by using a rule.
35   * The rule generates the individual sub exchanges (i.e. the result exchanges
36   * of the split).
37   * An {@link AggregationStrategy} allows aggregation of the sub exchanges.
38   *
39   * @author Jens Riemschneider
40   */
41  public class Splitter extends DelegateProcessor {
42      private final Expression splitRule;
43  
44      private static final UseLatestAggregationStrategy DEFAULT_AGGREGATION_STRATEGY =
45              new UseLatestAggregationStrategy();
46  
47      private AggregationStrategy aggregationStrategy = DEFAULT_AGGREGATION_STRATEGY;
48  
49      /**
50       * Creates a splitter
51       *
52       * @param splitRule expression that performs the splitting of the original exchange
53       * @param processor destination processor for all sub exchanges. Can be {@code null}
54       *                  if the destination is set later via {@link #setProcessor(Processor)},
55       *                  e.g. via an intercept.
56       */
57      public Splitter(Expression splitRule, Processor processor) {
58          super(processor);
59  
60          notNull(splitRule, "splitRule");
61          this.splitRule = splitRule;
62      }
63  
64      /**
65       * Sets the strategy to aggregate data over all sub exchanges created by
66       * the splitter
67       * This method allows for chain configuration
68       *
69       * @param strategy the aggregation strategy
70       * @return the splitter for chaining
71       */
72      public Splitter aggregate(AggregationStrategy strategy) {
73          aggregationStrategy =
74                  strategy != null ? strategy : DEFAULT_AGGREGATION_STRATEGY;
75          return this;
76      }
77  
78      /**
79       * Processes the given exchange
80       * This method is the entry point for splitting the given exchange into
81       * its parts via the split rule. Subclasses can change the created sub
82       * exchanges by overriding {@link #finalizeSubExchange(Exchange, Exchange, SplitIndex)}
83       * and the aggregate result by overriding {@link #finalizeAggregate(Exchange, Exchange)}.
84       *
85       * @param origExchange exchange that should be split by this processor
86       */
87      @Override
88      protected void processNext(Exchange origExchange) throws Exception {
89          notNull(origExchange, "origExchange");
90          Iterable splitResult = evaluateSplitRule(origExchange);
91          Exchange aggregate = processAllResults(origExchange, splitResult);
92          finalizeAggregate(origExchange, aggregate);
93      }
94  
95      /**
96       * Creates the actual aggregation result of the processor
97       * This method is called by {@link #processNext(Exchange)} to calculate the
98       * aggregation result of the splitter. The base implementation in this class
99       * simply copies the results of the aggregate into the original exchange.
100      * Sub classes should call this base method implementation to ensure
101      * compatibility with upcoming version.
102      *
103      * @param origExchange the exchange that was originally passed to {@link #process(Exchange)}.
104      *                     The base implementation changes the content of this exchange to
105      *                     ensure that further processing in the route is performed with
106      *                     the aggregated result.
107      * @param aggregate    the aggregation result. This is the exchange that was determined
108      *                     by passing all processed sub exchanges to the {@link AggregationStrategy}
109      *                     defined by {@link #aggregate(AggregationStrategy)}. This parameter
110      *                     can be {@code null} if the  {@link AggregationStrategy} returned
111      *                     null.
112      */
113     protected void finalizeAggregate(Exchange origExchange, Exchange aggregate) {
114         if (aggregate != null) {
115             ExchangeHelper.copyResults(origExchange, aggregate);
116         }
117     }
118 
119     /**
120      * Updates the contents of sub exchanges
121      * This method is called by {@link #processNext(Exchange)} before sending
122      * a sub exchange to the destination processor. It is the final chance to
123      * change the contents (body, header, etc.) of the sub exchange.
124      * The base implementation of this method currently performs no operations.
125      * However, it is recommended to call it in sub classes to ensure
126      * compatibility with upcoming version of this class.
127      *
128      * @param origExchange original exchange passed to {@link #process(Exchange)}
129      * @param subExchange  sub exchange that was split off. The content of this exchange
130      *                     can be changed by this method.
131      * @param index        index of the sub exchange in the result list of the split
132      */
133     protected void finalizeSubExchange(Exchange origExchange, Exchange subExchange, SplitIndex index) {
134         // Do nothing
135     }
136 
137     private Exchange processAllResults(Exchange origExchange,
138                                        Iterable splitResult) throws Exception {
139 
140         Exchange aggregate = null;
141         Iterator iterator = splitResult.iterator();
142         int counter = 0;
143         while (iterator.hasNext()) {
144             Object splitPart = iterator.next();
145 
146             SplitIndex idx = SplitIndex.valueOf(counter, !iterator.hasNext());
147             Exchange subExchange = processResult(origExchange, idx, splitPart);
148             aggregate = doAggregate(aggregate, subExchange);
149 
150             ++counter;
151         }
152         return aggregate;
153     }
154 
155     private Exchange processResult(final Exchange origExchange,
156                                    final SplitIndex index,
157                                    final Object splitPart) throws Exception {
158 
159         final Exchange subExchange = origExchange.copy();
160 
161         Message message = subExchange.getIn();
162         message.setBody(splitPart);
163         finalizeSubExchange(origExchange, subExchange, index);
164 
165         super.processNext(subExchange);
166         return subExchange;
167     }
168 
169     private Exchange doAggregate(Exchange aggregate, Exchange subExchange) {
170         if (aggregationStrategy != null) {
171             if (aggregate == null) {
172                 aggregate = subExchange;
173             } else {
174                 aggregate = aggregationStrategy.aggregate(aggregate, subExchange);
175             }
176         }
177 
178         return aggregate;
179     }
180 
181     private Iterable evaluateSplitRule(Exchange origExchange) {
182         final Object splitResult = splitRule.evaluate(origExchange, Object.class);
183 
184         if (null == splitResult) {
185             return Collections.emptySet();
186         }
187 
188         if (splitResult instanceof Iterable) {
189             return (Iterable) splitResult;
190         }
191 
192         if (splitResult instanceof Iterator) {
193             return () -> (Iterator) splitResult;
194         }
195 
196         if (splitResult.getClass().isArray()) {
197             return Arrays.asList((Object[]) splitResult);
198         }
199 
200         return Collections.singleton(splitResult);
201     }
202 
203 }