1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package org.openehealth.ipf.commons.audit.queue;
18
19 import org.openehealth.ipf.commons.audit.AuditContext;
20 import org.openehealth.ipf.commons.audit.AuditException;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
23
24 import java.util.concurrent.CompletableFuture;
25 import java.util.concurrent.ExecutorService;
26 import java.util.concurrent.TimeUnit;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41 public class AsynchronousAuditMessageQueue extends AbstractAuditMessageQueue {
42
43 private static final Logger LOG = LoggerFactory.getLogger(AsynchronousAuditMessageQueue.class);
44
45 private ExecutorService executorService;
46 private int shutdownTimeoutSeconds = 30;
47
48
49
50
51
52
53 public void setExecutorService(ExecutorService executorService) {
54 this.executorService = executorService;
55 }
56
57
58
59
60
61
62 public void setShutdownTimeoutSeconds(int shutdownTimeoutSeconds) {
63 this.shutdownTimeoutSeconds = shutdownTimeoutSeconds;
64 }
65
66 @Override
67 protected void handle(AuditContext auditContext, String... auditRecords) {
68 Runnable runnable = runnable(auditContext, auditRecords);
69 if (executorService != null && !executorService.isShutdown()) {
70 CompletableFuture.runAsync(runnable, executorService)
71 .exceptionally(e -> {
72 auditContext.getAuditExceptionHandler().handleException(auditContext, e, auditRecords);
73 return null;
74 });
75 } else {
76 runnable.run();
77 }
78 }
79
80 private Runnable runnable(AuditContext auditContext, String... auditRecords) {
81 return () -> {
82 try {
83 auditContext.getAuditTransmissionProtocol().send(auditContext, auditRecords);
84 } catch (Exception e) {
85 throw new AuditException(e);
86 }
87 };
88 }
89
90 @Override
91 public void shutdown() {
92 if (executorService != null) {
93 executorService.shutdown();
94 try {
95 if (!executorService.awaitTermination(shutdownTimeoutSeconds, TimeUnit.SECONDS)) {
96 LOG.warn("Timeout occurred when flushing Audit events, some events might have been lost");
97 executorService.shutdownNow();
98 }
99 } catch (InterruptedException e) {
100 LOG.warn("Thread interrupt when flushing ATNA events, some events might have been lost", e);
101 }
102 }
103 }
104
105 }