1 /*
2 * Copyright 2008 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package org.openehealth.ipf.platform.camel.core.adapter;
17
18 import groovy.lang.Closure;
19 import groovy.transform.stc.ClosureParams;
20 import groovy.transform.stc.SimpleType;
21 import org.apache.camel.Exchange;
22 import org.apache.camel.Expression;
23 import org.apache.camel.processor.aggregate.AggregationStrategy;
24 import org.openehealth.ipf.commons.core.modules.api.Aggregator;
25 import org.openehealth.ipf.platform.camel.core.closures.DelegatingExpression;
26
27 import java.util.Arrays;
28
29 import static org.apache.camel.builder.Builder.body;
30 import static org.openehealth.ipf.platform.camel.core.util.Exchanges.prepareResult;
31
32 /**
33 * Adapts an {@link Aggregator}.
34 *
35 * @author Martin Krasser
36 */
37 public class AggregatorAdapter extends AdapterSupport implements AggregationStrategy {
38
39 private Expression aggregationInputExpression;
40
41 private final Aggregator aggregator;
42
43 /**
44 * Creates a new {@link AggregatorAdapter} and sets the delegate
45 * {@link Aggregator}.
46 *
47 * @param aggregator
48 * an aggregator.
49 */
50 public AggregatorAdapter(Aggregator aggregator) {
51 this.aggregator = aggregator;
52 aggregationInputExpression = body();
53 }
54
55 /**
56 * Sets an {@link Expression} for obtaining data to be obtained from an
57 * additional (new) {@link Exchange}. The default expression obtains the
58 * body from the input message.
59 *
60 * @param aggregationInputExpression
61 * expression for obtaining aggregation input data.
62 * @return this object.
63 *
64 * @see #aggregate(Exchange, Exchange)
65 */
66 public AggregatorAdapter aggregationInput(Expression aggregationInputExpression) {
67 this.aggregationInputExpression = aggregationInputExpression;
68 return this;
69 }
70
71 /**
72 * Sets an expression {@link Closure} for obtaining data to be obtained from
73 * an additional (new) {@link Exchange}. The default expression obtains the
74 * body from the input message.
75 *
76 * @param aggregationInputExpressionLogic
77 * expression for obtaining aggregation input data.
78 * @return this object.
79 *
80 * @see #aggregate(Exchange, Exchange)
81 */
82 public AggregatorAdapter aggregationInput(@ClosureParams(value = SimpleType.class, options = { "org.apache.camel.Expression"})
83 Closure aggregationInputExpressionLogic) {
84 return aggregationInput(new DelegatingExpression(aggregationInputExpressionLogic));
85 }
86
87 /**
88 * Applies expressions to <code>oldExchange</code> and
89 * <code>newExchange</code> and delegates further processing to
90 * {@link #doAggregate(Exchange, Object, Object, Object...)}
91 *
92 * @see #aggregationInput(Expression)
93 * @see #input(Expression)
94 * @see #params(Expression)
95 */
96 @Override
97 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
98 Object newInput = adaptAggregationInput(newExchange);
99 Object oldInput = adaptInput(oldExchange);
100 Object params = adaptParams(oldExchange);
101 if (params == null) {
102 doAggregate(oldExchange, oldInput, newInput, (Object[])null);
103 } else if (params.getClass().isArray()) {
104 doAggregate(oldExchange, oldInput, newInput, (Object[])params);
105 } else {
106 doAggregate(oldExchange, oldInput, newInput, params);
107 }
108 return oldExchange;
109 }
110
111 /**
112 * Aggregates <code>oldInputData</code> and <code>newInputData</code>.
113 * The aggregation result is written to body of the message returned by
114 * {@link org.openehealth.ipf.platform.camel.core.util.Exchanges#resultMessage(Exchange)}.
115 *
116 * @param oldExchange original message exchange to write results to.
117 * @param oldInputData original input data
118 * @param newInputData additional input data
119 * @param inputParams input parameters
120 */
121 protected void doAggregate(Exchange oldExchange, Object oldInputData,
122 Object newInputData, Object... inputParams) {
123
124 prepareResult(oldExchange).setBody(
125 aggregator.zap(Arrays.asList(oldInputData, newInputData), inputParams));
126 }
127
128 /**
129 * Applies the {@link Expression} set by
130 * {@link #aggregationInput(Expression)} to obtain input data from the
131 * <code>exchange</code>.
132 *
133 * @param exchange
134 * message exchange.
135 * @return aggregation input data or <code>null</code> if the expression
136 * evaluates to <code>null</code> or the expression object is
137 * <code>null</code>.
138 */
139 private Object adaptAggregationInput(Exchange exchange) {
140 if (aggregationInputExpression == null) {
141 return null;
142 }
143 return aggregationInputExpression.evaluate(exchange, Object.class);
144 }
145
146 }