1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.openehealth.ipf.commons.audit.protocol;
18
19 import io.vertx.core.Vertx;
20 import io.vertx.core.buffer.Buffer;
21 import io.vertx.core.buffer.impl.BufferImpl;
22 import io.vertx.core.net.*;
23 import lombok.Setter;
24 import org.openehealth.ipf.commons.audit.AuditContext;
25 import org.openehealth.ipf.commons.audit.AuditException;
26 import org.openehealth.ipf.commons.audit.utils.AuditUtils;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import java.nio.charset.StandardCharsets;
31 import java.security.KeyStore;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import java.util.concurrent.atomic.AtomicReference;
35 import java.util.stream.Stream;
36
37
38
39
40
41
42
43 public class VertxTLSSyslogSenderImpl extends RFC5424Protocol implements AuditTransmissionProtocol {
44
45 private static final Logger LOG = LoggerFactory.getLogger(VertxTLSSyslogSenderImpl.class);
46
47 private volatile AtomicReference<String> writeHandlerId = new AtomicReference<>();
48 private final Vertx vertx;
49
50 @Setter
51 private boolean trustAll;
52
53
54 public VertxTLSSyslogSenderImpl() {
55 this(Vertx.vertx());
56 }
57
58 public VertxTLSSyslogSenderImpl(Vertx vertx) {
59 super(AuditUtils.getLocalHostName(), AuditUtils.getProcessId());
60 this.vertx = vertx;
61 }
62
63 @Override
64 public void send(AuditContext auditContext, String... auditMessages) {
65 if (auditMessages != null) {
66 for (String auditMessage : auditMessages) {
67
68
69 byte[] msgBytes = getTransportPayload(auditContext.getSendingApplication(), auditMessage);
70 byte[] syslogFrame = String.format("%d ", msgBytes.length).getBytes();
71 LOG.debug("Auditing to {}:{}",
72 auditContext.getAuditRepositoryAddress().getHostAddress(),
73 auditContext.getAuditRepositoryPort());
74 LOG.trace("{}", new String(msgBytes, StandardCharsets.UTF_8));
75 Buffer buffer = new BufferImpl()
76 .appendBytes(syslogFrame)
77 .appendBytes(msgBytes);
78
79
80 vertx.eventBus().send(ensureEstablishedConnection(auditContext), buffer);
81 }
82 }
83 }
84
85 @Override
86 public String getTransportName() {
87 return "NIO-TLS";
88 }
89
90 @Override
91 public void shutdown() {
92 vertx.close();
93 }
94
95 private String ensureEstablishedConnection(AuditContext auditContext) {
96 if (writeHandlerId.get() == null) {
97 CountDownLatch latch = new CountDownLatch(1);
98 NetClientOptions options = new NetClientOptions()
99 .setConnectTimeout(1000)
100 .setReconnectAttempts(5)
101 .setReconnectInterval(1000)
102 .setSsl(true);
103
104 if (trustAll) {
105 options.setTrustAll(true);
106 } else {
107 initializeTLSParameters(options);
108 }
109
110 NetClient client = vertx.createNetClient(options);
111 client.connect(
112 auditContext.getAuditRepositoryPort(),
113 auditContext.getAuditRepositoryAddress().getHostAddress(),
114 event -> {
115 LOG.info("Attempt to connect to {}:{} : {}",
116 auditContext.getAuditRepositoryAddress().getHostAddress(),
117 auditContext.getAuditRepositoryPort(),
118 event.succeeded());
119 if (event.succeeded()) {
120 NetSocket socket = event.result();
121 socket
122 .exceptionHandler(exceptionEvent -> {
123 LOG.info("Audit Connection caught exception", exceptionEvent);
124 writeHandlerId.set(null);
125 client.close();
126 })
127 .closeHandler(closeEvent -> {
128 LOG.info("Audit Connection closed");
129 writeHandlerId.set(null);
130 client.close();
131 });
132 writeHandlerId.compareAndSet(null, socket.writeHandlerID());
133 latch.countDown();
134 }
135 });
136
137
138 try {
139 latch.await(10000, TimeUnit.MILLISECONDS);
140 } catch (InterruptedException e) {
141 throw new AuditException(String.format("Could not establish TLS connection to %s:%d",
142 auditContext.getAuditRepositoryAddress().getHostAddress(),
143 auditContext.getAuditRepositoryPort()));
144 }
145 }
146 return writeHandlerId.get();
147 }
148
149 private void initializeTLSParameters(NetClientOptions options) {
150 String keyStoreType = System.getProperty(JAVAX_NET_SSL_KEYSTORE_TYPE, KeyStore.getDefaultType());
151 if ("JKS".equalsIgnoreCase(keyStoreType)) {
152 options.setKeyStoreOptions(new JksOptions()
153 .setPath(System.getProperty(JAVAX_NET_SSL_KEYSTORE))
154 .setPassword(System.getProperty(JAVAX_NET_SSL_KEYSTORE_PASSWORD)));
155 } else {
156 options.setPfxKeyCertOptions(new PfxOptions()
157 .setPath(System.getProperty(JAVAX_NET_SSL_KEYSTORE))
158 .setPassword(System.getProperty(JAVAX_NET_SSL_KEYSTORE_PASSWORD)));
159 }
160 String trustStoreType = System.getProperty(JAVAX_NET_SSL_TRUSTSTORE_TYPE, KeyStore.getDefaultType());
161 if ("JKS".equalsIgnoreCase(trustStoreType)) {
162 options.setTrustStoreOptions(new JksOptions()
163 .setPath(System.getProperty(JAVAX_NET_SSL_TRUSTSTORE))
164 .setPassword(System.getProperty(JAVAX_NET_SSL_TRUSTSTORE_PASSWORD)));
165 } else {
166 options.setPfxTrustOptions(new PfxOptions()
167 .setPath(System.getProperty(JAVAX_NET_SSL_TRUSTSTORE))
168 .setPassword(System.getProperty(JAVAX_NET_SSL_TRUSTSTORE_PASSWORD)));
169 }
170 String allowedProtocols = System.getProperty(JDK_TLS_CLIENT_PROTOCOLS, "TLSv1.2");
171 Stream.of(allowedProtocols.split("\\s*,\\s*"))
172 .forEach(options::addEnabledSecureTransportProtocol);
173
174 String allowedCiphers = System.getProperty(HTTPS_CIPHERSUITES);
175 if (allowedCiphers != null) {
176 Stream.of(allowedCiphers.split("\\s*,\\s*"))
177 .forEach(options::addEnabledCipherSuite);
178 }
179 }
180
181 }