View Javadoc
1   /*
2    * Copyright 2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.core.adapter;
17  
18  import groovy.lang.Closure;
19  import groovy.transform.stc.ClosureParams;
20  import groovy.transform.stc.SimpleType;
21  import org.apache.camel.Exchange;
22  import org.apache.camel.Expression;
23  import org.apache.camel.processor.aggregate.AggregationStrategy;
24  import org.openehealth.ipf.commons.core.modules.api.Aggregator;
25  import org.openehealth.ipf.platform.camel.core.closures.DelegatingExpression;
26  
27  import java.util.Arrays;
28  
29  import static org.apache.camel.builder.Builder.body;
30  import static org.openehealth.ipf.platform.camel.core.util.Exchanges.prepareResult;
31  
32  /**
33   * Adapts an {@link Aggregator}. 
34   * 
35   * @author Martin Krasser
36   */
37  public class AggregatorAdapter extends AdapterSupport implements AggregationStrategy {
38  
39      private Expression aggregationInputExpression;
40  
41      private final Aggregator aggregator;
42  
43      /**
44       * Creates a new {@link AggregatorAdapter} and sets the delegate
45       * {@link Aggregator}.
46       * 
47       * @param aggregator
48       *            an aggregator.
49       */
50      public AggregatorAdapter(Aggregator aggregator) {
51          this.aggregator = aggregator;
52          aggregationInputExpression = body();
53      }
54      
55      /**
56       * Sets an {@link Expression} for obtaining data to be obtained from an
57       * additional (new) {@link Exchange}. The default expression obtains the
58       * body from the input message.
59       * 
60       * @param aggregationInputExpression
61       *            expression for obtaining aggregation input data.
62       * @return this object.
63       * 
64       * @see #aggregate(Exchange, Exchange)
65       */
66      public AggregatorAdapter aggregationInput(Expression aggregationInputExpression) {
67          this.aggregationInputExpression = aggregationInputExpression;
68          return this;
69      }
70  
71      /**
72       * Sets an expression {@link Closure} for obtaining data to be obtained from
73       * an additional (new) {@link Exchange}. The default expression obtains the
74       * body from the input message.
75       * 
76       * @param aggregationInputExpressionLogic
77       *            expression for obtaining aggregation input data.
78       * @return this object.
79       * 
80       * @see #aggregate(Exchange, Exchange)
81       */
82      public AggregatorAdapter aggregationInput(@ClosureParams(value = SimpleType.class, options = { "org.apache.camel.Expression"})
83                                                        Closure aggregationInputExpressionLogic) {
84          return aggregationInput(new DelegatingExpression(aggregationInputExpressionLogic));
85      }
86  
87      /**
88       * Applies expressions to <code>oldExchange</code> and
89       * <code>newExchange</code> and delegates further processing to
90       * {@link #doAggregate(Exchange, Object, Object, Object...)}
91       * 
92       * @see #aggregationInput(Expression)
93       * @see #input(Expression)
94       * @see #params(Expression)
95       */
96      @Override
97      public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
98          Object newInput = adaptAggregationInput(newExchange);
99          Object oldInput = adaptInput(oldExchange);
100         Object params = adaptParams(oldExchange);
101         if (params == null) {
102             doAggregate(oldExchange, oldInput, newInput, (Object[])null);
103         } else if (params.getClass().isArray()) {
104             doAggregate(oldExchange, oldInput, newInput, (Object[])params);
105         } else {
106             doAggregate(oldExchange, oldInput, newInput, params);
107         }
108         return oldExchange;
109     }
110 
111     /**
112      * Aggregates <code>oldInputData</code> and <code>newInputData</code>.
113      * The aggregation result is written to body of the message returned by
114      * {@link org.openehealth.ipf.platform.camel.core.util.Exchanges#resultMessage(Exchange)}.
115      * 
116      * @param oldExchange original message exchange to write results to.
117      * @param oldInputData original input data
118      * @param newInputData additional input data
119      * @param inputParams input parameters
120      */
121     protected void doAggregate(Exchange oldExchange, Object oldInputData, 
122             Object newInputData, Object... inputParams) {
123         
124         prepareResult(oldExchange).setBody(
125                 aggregator.zap(Arrays.asList(oldInputData, newInputData), inputParams));
126     }
127     
128     /**
129      * Applies the {@link Expression} set by
130      * {@link #aggregationInput(Expression)} to obtain input data from the
131      * <code>exchange</code>.
132      * 
133      * @param exchange
134      *            message exchange.
135      * @return aggregation input data or <code>null</code> if the expression
136      *         evaluates to <code>null</code> or the expression object is
137      *         <code>null</code>.
138      */
139     private Object adaptAggregationInput(Exchange exchange) {
140         if (aggregationInputExpression == null) {
141             return null;
142         }
143         return aggregationInputExpression.evaluate(exchange, Object.class);
144     }
145 
146 }