View Javadoc
1   /*
2    * Copyright 2008 the original author or authors.
3    * 
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    *
8    *     http://www.apache.org/licenses/LICENSE-2.0
9    *     
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  package org.openehealth.ipf.platform.camel.core.process.splitter;
17  
18  import static org.junit.Assert.assertEquals;
19  import static org.junit.Assert.fail;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Arrays;
24  import java.util.Collections;
25  import java.util.Iterator;
26  import java.util.List;
27  
28  import org.apache.camel.CamelContext;
29  import org.apache.camel.Exchange;
30  import org.apache.camel.ExchangePattern;
31  import org.apache.camel.Expression;
32  import org.apache.camel.Message;
33  import org.apache.camel.Processor;
34  import org.apache.camel.impl.DefaultCamelContext;
35  import org.apache.camel.impl.DefaultExchange;
36  import org.apache.camel.processor.aggregate.AggregationStrategy;
37  import org.apache.camel.util.ObjectHelper;
38  import org.junit.*;
39  import org.openehealth.ipf.platform.camel.core.process.splitter.Splitter;
40  import org.openehealth.ipf.platform.camel.core.process.splitter.support.TextFileIterator;
41  
42  
43  /**
44   * @author Jens Riemschneider
45   */
46  public class SplitterTest {
47      private static CamelContext camelContext;
48  
49      private Splitter splitter;
50      private TestProcessor dest;
51  
52      @BeforeClass
53      public static void setUpClass() {
54          camelContext = new DefaultCamelContext();
55      }
56  
57      @Before
58      public void setUp() {
59          TestSplitRule splitRule = new TestSplitRule();
60          TestAggregationStrategy aggregationStrat = new TestAggregationStrategy();
61          dest = new TestProcessor();
62          
63          splitter = new Splitter(splitRule, dest);
64          splitter.aggregate(aggregationStrat);
65      }
66      
67      @Test
68      public void testProcess() throws Exception {
69          Exchange origExchange = createTestExchange();
70          origExchange.getIn().setBody("bla,blu");
71          splitter.process(origExchange);
72          
73          List<Exchange> received = dest.getReceived();
74          assertEquals(2, received.size());
75          
76          assertEquals("bla", getContent(received.get(0)));
77          assertEquals("blu", getContent(received.get(1)));
78          
79          assertEquals("bla:blu", origExchange.getOut().getBody());
80      }
81      
82      @Test(expected=IllegalArgumentException.class)
83      public void testForNullSafeConstructor() {
84          splitter = new Splitter(null, dest);
85      }
86      
87      @Test
88      public void testResetToDefaults() throws Exception {
89          splitter.aggregate(null);
90          
91          Exchange origExchange = createTestExchange();
92          origExchange.getIn().setBody("bla,blu");
93          splitter.process(origExchange);
94          
95          List<Exchange> received = dest.getReceived();
96          assertEquals(2, received.size());
97          
98          assertEquals("bla", getContent(received.get(0)));
99          assertEquals("blu", getContent(received.get(1)));
100         
101         assertEquals("blu", origExchange.getOut().getBody());
102     }
103     
104     @Test
105     public void testSplitRuleWithArrayResult() throws Exception {
106         Exchange origExchange = createTestExchange();
107         origExchange.getIn().setBody("bla,blu");
108         Splitter splitterWithArrayResult = new Splitter(new Expression() {
109             @Override
110             public <T> T evaluate(Exchange exchange, Class<T> type) {
111                 return type.cast(getContent(exchange).split(","));            
112             }}, dest);
113         splitterWithArrayResult.aggregate(new TestAggregationStrategy());
114 
115         splitterWithArrayResult.process(origExchange);
116         
117         List<Exchange> received = dest.getReceived();
118         assertEquals(2, received.size());
119         
120         assertEquals("bla", getContent(received.get(0)));
121         assertEquals("blu", getContent(received.get(1)));
122         
123         assertEquals("bla:blu", origExchange.getOut().getBody());
124     }
125     
126     @Test
127     public void testSplitRuleWithNonIterableResult() throws Exception {
128         Splitter splitterSimpleRule = new Splitter(new Expression() {
129             @Override
130             public <T> T evaluate(Exchange exchange, Class<T> type) {
131                 return type.cast("smurf:" + exchange.getIn().getBody());
132             }}, dest);
133 
134         Exchange origExchange = createTestExchange();
135         origExchange.getIn().setBody("bla,blu");
136         splitterSimpleRule.process(origExchange);
137 
138         List<Exchange> received = dest.getReceived();
139         assertEquals(1, received.size());        
140         assertEquals("smurf:bla,blu", getContent(received.get(0)));
141     }
142     
143     @Test
144     public void testSplitRuleResultsInNothing() throws Exception {
145         Splitter splitterEmptyRule = new Splitter(new Expression() {
146             @Override
147             public <T> T evaluate(Exchange exchange, Class<T> type) {
148                 return null;
149             }}, dest);
150 
151         Exchange origExchange = createTestExchange();
152         origExchange.getIn().setBody("bla,blu");
153         splitterEmptyRule.process(origExchange);
154 
155         List<Exchange> received = dest.getReceived();
156         assertEquals(0, received.size());        
157     }
158     
159     @Test
160     public void testSplitRuleResultsInIterator() throws Exception {
161         final List<String> results = Arrays.asList("bla", "blu"); 
162         
163         Splitter splitterIteratorRule = new Splitter(new Expression() {
164             @Override
165             public <T> T evaluate(Exchange exchange, Class<T> type) {
166                 return type.cast(results.iterator());
167             }}, dest);
168 
169         Exchange origExchange = createTestExchange();
170         origExchange.getIn().setBody("bla,blu");
171         splitterIteratorRule.process(origExchange);
172 
173         List<Exchange> received = dest.getReceived();
174         assertEquals(2, received.size());
175         
176         assertEquals("bla", getContent(received.get(0)));
177         assertEquals("blu", getContent(received.get(1)));        
178         assertEquals("blu", origExchange.getOut().getBody());
179     }
180 
181     private Exchange createTestExchange() {
182         return new DefaultExchange(camelContext, ExchangePattern.InOut);
183     }
184     
185     private static String getContent(Exchange exchange) {
186         Message message = exchange.getIn();
187         return (String)message.getBody();
188     }
189     
190     /**
191      * Split rule that splits a comma separated message body into its parts
192      * E.g.: "blue,smurf" creates two messages "blue" and "smurf"
193      */
194     public static class TestSplitRule implements Expression {
195         @Override
196         public <T> T evaluate(Exchange exchange, Class<T> type) {
197             String[] parts = getContent(exchange).split(",");            
198             return type.cast(Arrays.asList(parts));
199         }
200     }
201     
202     /**
203      * An {@link Iterable} that only allows to call {@link #iterator()} once
204      */
205     private static class OneTimeUsageIterable<T> implements Iterable<T> {
206         public OneTimeUsageIterable(Iterable<T> baseIterable) {
207             ObjectHelper.notNull(baseIterable, "baseIterable");
208             this.baseIterable = baseIterable;
209         }
210 
211         /**
212          * Delegates to the base iterable
213          */
214         @Override
215         public Iterator<T> iterator() {
216             if (iteratorCalled) {
217                 throw new IllegalStateException("iterator() can only be called once");
218             }
219             
220             iteratorCalled = true;
221             return baseIterable.iterator();
222         }        
223 
224         private Iterable<T> baseIterable;
225         private boolean iteratorCalled;
226     }
227     
228     public static class TestSplitRuleSingleUse implements Expression {
229         @Override
230         public <T> T evaluate(Exchange exchange, Class<T> type) {
231             if (evaluateCalled) {
232                 throw new IllegalStateException("evaluate() can only be called once");
233             }
234             evaluateCalled = true;
235             
236             String[] parts = getContent(exchange).split(",");
237             return type.cast(new OneTimeUsageIterable<>(Arrays.asList(parts)));
238         }
239 
240         private boolean evaluateCalled;
241     }
242     
243     public static class TestSplitFileRule implements Expression {
244         @Override
245         public <T> T evaluate(Exchange exchange, Class<T> type) {
246             String filename = (String)exchange.getIn().getBody();
247             try {
248                 return type.cast(new TextFileIterator(filename));
249             } catch (IOException e) {
250                 fail("Caught exception: " + e);
251             }
252             return null;
253         }
254     }
255     
256     public static class TestAggregationStrategy implements AggregationStrategy {
257         @Override
258         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
259             String oldContent = getContent(oldExchange);
260             String newContent = getContent(newExchange);
261             String aggregateContent = oldContent + ":" + newContent;
262             Exchange aggregate = oldExchange.copy();
263             aggregate.getIn().setBody(aggregateContent);
264             return aggregate;
265         }
266     }
267     
268     private static class TestProcessor implements Processor {
269         @Override
270         public void process(Exchange exchange) throws Exception {
271             received.add(exchange);
272         }
273         
274         public List<Exchange> getReceived() {
275             return Collections.unmodifiableList(received);
276         }
277         
278         private List<Exchange> received = new ArrayList<>();
279     }
280 }