1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.core.process.splitter;
17
18 import java.util.Arrays;
19 import java.util.Collections;
20 import java.util.Iterator;
21
22 import org.apache.camel.Exchange;
23 import org.apache.camel.Expression;
24 import org.apache.camel.Message;
25 import org.apache.camel.Processor;
26 import org.apache.camel.processor.DelegateProcessor;
27 import org.apache.camel.processor.aggregate.AggregationStrategy;
28 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
29 import org.apache.camel.util.ExchangeHelper;
30
31 import static org.apache.camel.util.ObjectHelper.notNull;
32
33
34
35
36
37
38
39
40
41 public class Splitter extends DelegateProcessor {
42 private final Expression splitRule;
43
44 private static final UseLatestAggregationStrategy DEFAULT_AGGREGATION_STRATEGY =
45 new UseLatestAggregationStrategy();
46
47 private AggregationStrategy aggregationStrategy = DEFAULT_AGGREGATION_STRATEGY;
48
49
50
51
52
53
54
55
56
57 public Splitter(Expression splitRule, Processor processor) {
58 super(processor);
59
60 notNull(splitRule, "splitRule");
61 this.splitRule = splitRule;
62 }
63
64
65
66
67
68
69
70
71
72 public Splitter aggregate(AggregationStrategy strategy) {
73 aggregationStrategy =
74 strategy != null ? strategy : DEFAULT_AGGREGATION_STRATEGY;
75 return this;
76 }
77
78
79
80
81
82
83
84
85
86
87 @Override
88 protected void processNext(Exchange origExchange) throws Exception {
89 notNull(origExchange, "origExchange");
90 Iterable splitResult = evaluateSplitRule(origExchange);
91 Exchange aggregate = processAllResults(origExchange, splitResult);
92 finalizeAggregate(origExchange, aggregate);
93 }
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 protected void finalizeAggregate(Exchange origExchange, Exchange aggregate) {
114 if (aggregate != null) {
115 ExchangeHelper.copyResults(origExchange, aggregate);
116 }
117 }
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 protected void finalizeSubExchange(Exchange origExchange, Exchange subExchange, SplitIndex index) {
134
135 }
136
137 private Exchange processAllResults(Exchange origExchange,
138 Iterable splitResult) throws Exception {
139
140 Exchange aggregate = null;
141 Iterator iterator = splitResult.iterator();
142 int counter = 0;
143 while (iterator.hasNext()) {
144 Object splitPart = iterator.next();
145
146 SplitIndex idx = SplitIndex.valueOf(counter, !iterator.hasNext());
147 Exchange subExchange = processResult(origExchange, idx, splitPart);
148 aggregate = doAggregate(aggregate, subExchange);
149
150 ++counter;
151 }
152 return aggregate;
153 }
154
155 private Exchange processResult(final Exchange origExchange,
156 final SplitIndex index,
157 final Object splitPart) throws Exception {
158
159 final Exchange subExchange = origExchange.copy();
160
161 Message message = subExchange.getIn();
162 message.setBody(splitPart);
163 finalizeSubExchange(origExchange, subExchange, index);
164
165 super.processNext(subExchange);
166 return subExchange;
167 }
168
169 private Exchange doAggregate(Exchange aggregate, Exchange subExchange) {
170 if (aggregationStrategy != null) {
171 if (aggregate == null) {
172 aggregate = subExchange;
173 } else {
174 aggregate = aggregationStrategy.aggregate(aggregate, subExchange);
175 }
176 }
177
178 return aggregate;
179 }
180
181 private Iterable evaluateSplitRule(Exchange origExchange) {
182 final Object splitResult = splitRule.evaluate(origExchange, Object.class);
183
184 if (null == splitResult) {
185 return Collections.emptySet();
186 }
187
188 if (splitResult instanceof Iterable) {
189 return (Iterable) splitResult;
190 }
191
192 if (splitResult instanceof Iterator) {
193 return () -> (Iterator) splitResult;
194 }
195
196 if (splitResult.getClass().isArray()) {
197 return Arrays.asList((Object[]) splitResult);
198 }
199
200 return Collections.singleton(splitResult);
201 }
202
203 }