View Javadoc
1   /*
2    * Copyright 2018 the original author or authors.
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *         http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package org.openehealth.ipf.commons.audit.protocol;
18  
19  import io.vertx.core.Vertx;
20  import io.vertx.core.buffer.Buffer;
21  import io.vertx.core.buffer.impl.BufferImpl;
22  import io.vertx.core.net.*;
23  import lombok.Setter;
24  import org.openehealth.ipf.commons.audit.AuditContext;
25  import org.openehealth.ipf.commons.audit.AuditException;
26  import org.openehealth.ipf.commons.audit.utils.AuditUtils;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  
30  import java.nio.charset.StandardCharsets;
31  import java.security.KeyStore;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.TimeUnit;
34  import java.util.concurrent.atomic.AtomicReference;
35  import java.util.stream.Stream;
36  
37  /**
38   * NIO implemention of a TLS Syslog sender by using an embedded Vert.x instance.
39   *
40   * @author Christian Ohr
41   * @since 3.5
42   */
43  public class VertxTLSSyslogSenderImpl extends RFC5424Protocol implements AuditTransmissionProtocol {
44  
45      private static final Logger LOG = LoggerFactory.getLogger(VertxTLSSyslogSenderImpl.class);
46  
47      private volatile AtomicReference<String> writeHandlerId = new AtomicReference<>();
48      private final Vertx vertx;
49  
50      @Setter
51      private boolean trustAll;
52  
53  
54      public VertxTLSSyslogSenderImpl() {
55          this(Vertx.vertx());
56      }
57  
58      public VertxTLSSyslogSenderImpl(Vertx vertx) {
59          super(AuditUtils.getLocalHostName(), AuditUtils.getProcessId());
60          this.vertx = vertx;
61      }
62  
63      @Override
64      public void send(AuditContext auditContext, String... auditMessages) {
65          if (auditMessages != null) {
66              for (String auditMessage : auditMessages) {
67  
68                  // Could use a Vertx codec for this
69                  byte[] msgBytes = getTransportPayload(auditContext.getSendingApplication(), auditMessage);
70                  byte[] syslogFrame = String.format("%d ", msgBytes.length).getBytes();
71                  LOG.debug("Auditing to {}:{}",
72                          auditContext.getAuditRepositoryAddress().getHostAddress(),
73                          auditContext.getAuditRepositoryPort());
74                  LOG.trace("{}", new String(msgBytes, StandardCharsets.UTF_8));
75                  Buffer buffer = new BufferImpl()
76                          .appendBytes(syslogFrame)
77                          .appendBytes(msgBytes);
78  
79                  // The net socket has registered itself on the Vertx EventBus
80                  vertx.eventBus().send(ensureEstablishedConnection(auditContext), buffer);
81              }
82          }
83      }
84  
85      @Override
86      public String getTransportName() {
87          return "NIO-TLS";
88      }
89  
90      @Override
91      public void shutdown() {
92          vertx.close();
93      }
94  
95      private String ensureEstablishedConnection(AuditContext auditContext) {
96          if (writeHandlerId.get() == null) {
97              CountDownLatch latch = new CountDownLatch(1);
98              NetClientOptions options = new NetClientOptions()
99                      .setConnectTimeout(1000)
100                     .setReconnectAttempts(5)
101                     .setReconnectInterval(1000)
102                     .setSsl(true);
103 
104             if (trustAll) {
105                 options.setTrustAll(true);
106             } else {
107                 initializeTLSParameters(options);
108             }
109 
110             NetClient client = vertx.createNetClient(options);
111             client.connect(
112                     auditContext.getAuditRepositoryPort(),
113                     auditContext.getAuditRepositoryAddress().getHostAddress(),
114                     event -> {
115                         LOG.info("Attempt to connect to {}:{} : {}",
116                                 auditContext.getAuditRepositoryAddress().getHostAddress(),
117                                 auditContext.getAuditRepositoryPort(),
118                                 event.succeeded());
119                         if (event.succeeded()) {
120                             NetSocket socket = event.result();
121                             socket
122                                     .exceptionHandler(exceptionEvent -> {
123                                         LOG.info("Audit Connection caught exception", exceptionEvent);
124                                         writeHandlerId.set(null);
125                                         client.close();
126                                     })
127                                     .closeHandler(closeEvent -> {
128                                         LOG.info("Audit Connection closed");
129                                         writeHandlerId.set(null);
130                                         client.close();
131                                     });
132                             writeHandlerId.compareAndSet(null, socket.writeHandlerID());
133                             latch.countDown();
134                         }
135                     });
136 
137             // Ensure that connection is established before returning
138             try {
139                 latch.await(10000, TimeUnit.MILLISECONDS);
140             } catch (InterruptedException e) {
141                 throw new AuditException(String.format("Could not establish TLS connection to %s:%d",
142                         auditContext.getAuditRepositoryAddress().getHostAddress(),
143                         auditContext.getAuditRepositoryPort()));
144             }
145         }
146         return writeHandlerId.get();
147     }
148 
149     private void initializeTLSParameters(NetClientOptions options) {
150         String keyStoreType = System.getProperty(JAVAX_NET_SSL_KEYSTORE_TYPE, KeyStore.getDefaultType());
151         if ("JKS".equalsIgnoreCase(keyStoreType)) {
152             options.setKeyStoreOptions(new JksOptions()
153                     .setPath(System.getProperty(JAVAX_NET_SSL_KEYSTORE))
154                     .setPassword(System.getProperty(JAVAX_NET_SSL_KEYSTORE_PASSWORD)));
155         } else {
156             options.setPfxKeyCertOptions(new PfxOptions()
157                     .setPath(System.getProperty(JAVAX_NET_SSL_KEYSTORE))
158                     .setPassword(System.getProperty(JAVAX_NET_SSL_KEYSTORE_PASSWORD)));
159         }
160         String trustStoreType = System.getProperty(JAVAX_NET_SSL_TRUSTSTORE_TYPE, KeyStore.getDefaultType());
161         if ("JKS".equalsIgnoreCase(trustStoreType)) {
162             options.setTrustStoreOptions(new JksOptions()
163                     .setPath(System.getProperty(JAVAX_NET_SSL_TRUSTSTORE))
164                     .setPassword(System.getProperty(JAVAX_NET_SSL_TRUSTSTORE_PASSWORD)));
165         } else {
166             options.setPfxTrustOptions(new PfxOptions()
167                     .setPath(System.getProperty(JAVAX_NET_SSL_TRUSTSTORE))
168                     .setPassword(System.getProperty(JAVAX_NET_SSL_TRUSTSTORE_PASSWORD)));
169         }
170         String allowedProtocols = System.getProperty(JDK_TLS_CLIENT_PROTOCOLS, "TLSv1.2");
171         Stream.of(allowedProtocols.split("\\s*,\\s*"))
172                 .forEach(options::addEnabledSecureTransportProtocol);
173 
174         String allowedCiphers = System.getProperty(HTTPS_CIPHERSUITES);
175         if (allowedCiphers != null) {
176             Stream.of(allowedCiphers.split("\\s*,\\s*"))
177                     .forEach(options::addEnabledCipherSuite);
178         }
179     }
180 
181 }