1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package org.openehealth.ipf.platform.camel.ihe.hl7v2.intercept.consumer;
17
18 import ca.uhn.hl7v2.AcknowledgmentCode;
19 import ca.uhn.hl7v2.HL7Exception;
20 import ca.uhn.hl7v2.model.Message;
21 import org.apache.camel.Exchange;
22 import org.apache.commons.lang3.ClassUtils;
23 import org.openehealth.ipf.commons.ihe.hl7v2.Constants;
24 import org.openehealth.ipf.platform.camel.core.util.Exchanges;
25 import org.openehealth.ipf.platform.camel.ihe.core.InterceptorSupport;
26 import org.openehealth.ipf.platform.camel.ihe.hl7v2.HL7v2Endpoint;
27 import org.openehealth.ipf.platform.camel.ihe.hl7v2.Hl7v2AdaptingException;
28 import org.openehealth.ipf.platform.camel.ihe.hl7v2.Hl7v2MarshalUtils;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import java.io.IOException;
33
34 import static org.openehealth.ipf.platform.camel.core.util.Exchanges.resultMessage;
35
36
37
38
39
40
41
42
43 public class ConsumerAdaptingInterceptor extends InterceptorSupport<HL7v2Endpoint> {
44 private static final transient Logger LOG = LoggerFactory.getLogger(ConsumerAdaptingInterceptor.class);
45 public static final String ACK_TYPE_CODE_HEADER = "ipf.hl7v2.AckTypeCode";
46
47 private final String charsetName;
48
49
50
51
52
53
54
55
56
57
58 public ConsumerAdaptingInterceptor(String charsetName) {
59 super();
60 this.charsetName = charsetName;
61 }
62
63
64
65
66
67
68
69 public ConsumerAdaptingInterceptor() {
70 this(null);
71 }
72
73
74
75
76
77
78 @Override
79 public void process(Exchange exchange) throws Exception {
80 Message originalMessage = exchange.getIn().getHeader(Constants.ORIGINAL_MESSAGE_ADAPTER_HEADER_NAME, Message.class);
81
82
83 try {
84 getWrappedProcessor().process(exchange);
85 Exception exception = Exchanges.extractException(exchange);
86 if (exception != null) {
87 throw exception;
88 }
89 } catch (Exception e) {
90 LOG.warn("Message processing failed", e);
91 resultMessage(exchange).setBody(getEndpoint().getNakFactory().createNak(originalMessage, e));
92 }
93
94 org.apache.camel.Message m = Exchanges.resultMessage(exchange);
95 Object body = m.getBody();
96
97
98 if (charsetName != null) {
99 exchange.setProperty(Exchange.CHARSET_NAME, charsetName);
100 }
101 Message msg = Hl7v2MarshalUtils.extractHapiMessage(
102 m,
103 characterSet(exchange),
104 getEndpoint().getHl7v2TransactionConfiguration().getParser());
105
106
107 if((msg == null) && (body instanceof Throwable)) {
108 msg = getEndpoint().getNakFactory().createNak(originalMessage, (Throwable) body);
109 }
110
111
112 if(msg == null) {
113 msg = analyseMagicHeader(m, originalMessage);
114 }
115
116
117 if(msg == null) {
118 throw new Hl7v2AdaptingException("Cannot create HL7v2 message from " +
119 ClassUtils.getSimpleName(body, "<null>") +
120 " returned from the route");
121 }
122
123 m.setBody(msg);
124 }
125
126
127
128
129
130
131 @SuppressWarnings("rawtypes")
132 private Message analyseMagicHeader(org.apache.camel.Message m, Message originalMessage) throws HL7Exception, IOException {
133 Object header = m.getHeader(ACK_TYPE_CODE_HEADER);
134 if ((header == null) || ! (header instanceof AcknowledgmentCode)) {
135 return null;
136 }
137
138 Message ack;
139 if ((header == AcknowledgmentCode.AA) || (header == AcknowledgmentCode.CA)) {
140 ack = getEndpoint().getNakFactory().createAck(
141 originalMessage);
142 } else {
143 HL7Exception exception = new HL7Exception(
144 "HL7v2 processing failed",
145 getEndpoint().getHl7v2TransactionConfiguration().getResponseErrorDefaultErrorCode());
146 ack = getEndpoint().getNakFactory().createNak(
147 originalMessage,
148 exception,
149 (AcknowledgmentCode) header);
150 }
151 return ack;
152 }
153
154 }