1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.core.process;
17
18 import static org.openehealth.ipf.platform.camel.core.util.Exchanges.copyExchange;
19 import static org.openehealth.ipf.platform.camel.core.util.Exchanges.createExchange;
20 import static org.openehealth.ipf.platform.camel.core.util.Exchanges.prepareResult;
21
22 import org.apache.camel.Exchange;
23 import org.apache.camel.ExchangePattern;
24 import org.apache.camel.Processor;
25 import org.apache.camel.Producer;
26 import org.apache.camel.support.ServiceSupport;
27 import org.apache.camel.processor.aggregate.AggregationStrategy;
28
29
30
31
32
33
34
35
36
37
38
39 public class Enricher extends ServiceSupport implements Processor {
40
41 private final Producer producer;
42
43 private AggregationStrategy aggregationStrategy;
44
45
46
47
48
49
50
51
52
53
54 public Enricher(Producer producer) {
55 this(defaultAggregationStrategy(), producer);
56 }
57
58
59
60
61
62
63
64
65
66
67 public Enricher(AggregationStrategy aggregationStrategy, Producer producer) {
68 this.aggregationStrategy = aggregationStrategy;
69 this.producer = producer;
70 }
71
72
73
74
75
76
77 public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
78 this.aggregationStrategy = aggregationStrategy;
79 }
80
81
82
83
84 public void setDefaultAggregationStrategy() {
85 aggregationStrategy = defaultAggregationStrategy();
86 }
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 @Override
102 public void process(Exchange exchange) throws Exception {
103 Exchange resourceExchange = createExchange(exchange, ExchangePattern.InOut);
104 producer.process(resourceExchange);
105
106 if (resourceExchange.isFailed()) {
107
108 copyExchange(resourceExchange, exchange);
109 } else {
110 prepareResult(exchange);
111
112 Exchange aggregatedExchange = aggregationStrategy.aggregate(exchange, resourceExchange);
113
114 copyExchange(aggregatedExchange, exchange);
115 }
116 }
117
118 @Override
119 protected void doStart() throws Exception {
120 producer.start();
121 }
122
123 @Override
124 protected void doStop() throws Exception {
125 producer.stop();
126 }
127
128 private static AggregationStrategy defaultAggregationStrategy() {
129 return new CopyAggregationStrategy();
130 }
131
132 private static class CopyAggregationStrategy implements AggregationStrategy {
133
134 @Override
135 public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
136 copyExchange(newExchange, oldExchange);
137 return oldExchange;
138 }
139
140 }
141
142 }