package io.airlift.log;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultiset;
import com.google.common.collect.Multiset;
import com.google.common.collect.Multisets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;
import io.airlift.concurrent.Threads;
import jakarta.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.IntFunction;
import java.util.logging.ErrorManager;
import java.util.logging.Formatter;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.IteratorAssert;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:io/airlift/log/TestBufferedHandler.class */
public class TestBufferedHandler {

    /**
     * One element of a deserialized log stream: either a regular {@link LogEntry} or the
     * multiset carried by a drop-summary message. Exactly one of the two components is non-null.
     *
     * <p>Declared as a real record; {@code equals}, {@code hashCode}, {@code toString} and the
     * {@code entry()} / {@code dropSummary()} accessors are generated by the compiler.
     * Prefer the {@link #forEntry} and {@link #forDropSummary} factories over the canonical
     * constructor.
     *
     * @param entry the regular log entry, or {@code null} when this element is a drop summary
     * @param dropSummary the drop summary, or {@code null} when this element is a regular entry
     */
    public record EntryOrDropSummary(@Nullable LogEntry entry, @Nullable Multiset<String> dropSummary) {

        /**
         * Validates that exactly one component is present and snapshots the drop summary.
         *
         * @throws IllegalArgumentException if both or neither of the components are {@code null}
         */
        public EntryOrDropSummary {
            Preconditions.checkArgument((entry == null) != (dropSummary == null), "Exactly one of the values must be non-null");
            // Defensive copy so later changes to the caller's multiset cannot leak into this value.
            dropSummary = dropSummary == null ? null : ImmutableMultiset.copyOf(dropSummary);
        }

        /**
         * Wraps a regular log entry.
         *
         * @param logEntry the entry to wrap; must not be {@code null}
         * @return an instance whose {@code dropSummary()} is {@code null}
         */
        public static EntryOrDropSummary forEntry(LogEntry logEntry) {
            return new EntryOrDropSummary(logEntry, null);
        }

        /**
         * Wraps a drop summary.
         *
         * @param multiset the summary to wrap; must not be {@code null}
         * @return an instance whose {@code entry()} is {@code null}
         */
        public static EntryOrDropSummary forDropSummary(Multiset<String> multiset) {
            return new EntryOrDropSummary(null, multiset);
        }
    }

    /**
     * A logger name and message pair with a trivial {@code loggerName:message} text encoding.
     *
     * <p>Declared as a real record; {@code equals}, {@code hashCode}, {@code toString} and the
     * {@code loggerName()} / {@code message()} accessors are generated by the compiler.
     *
     * @param loggerName name of the logger that produced the message; never {@code null}
     * @param message the log message text; never {@code null}
     */
    public record LogEntry(String loggerName, String message) {
        private static final char SEPARATOR = ':';

        /**
         * Rejects {@code null} components.
         *
         * @throws NullPointerException if either component is {@code null}
         */
        public LogEntry {
            Objects.requireNonNull(loggerName, "loggerName is null");
            Objects.requireNonNull(message, "message is null");
        }

        /**
         * Encodes this entry as {@code loggerName:message}.
         *
         * @return the encoded entry
         */
        public String serialize() {
            return loggerName + SEPARATOR + message;
        }

        /**
         * Decodes a string produced by {@link #serialize()}.
         *
         * <p>Only the first {@code ':'} is treated as the separator, so a message that itself
         * contains colons is returned intact instead of being truncated at its first colon.
         * A logger name containing {@code ':'} still cannot be round-tripped.
         *
         * @param str the encoded entry
         * @return the decoded entry
         * @throws IllegalArgumentException if {@code str} contains no {@code ':'}
         */
        public static LogEntry deserialize(String str) {
            int separatorIndex = str.indexOf(SEPARATOR);
            if (separatorIndex < 0) {
                throw new IllegalArgumentException("Missing '" + SEPARATOR + "' separator in serialized log entry: " + str);
            }
            return new LogEntry(str.substring(0, separatorIndex), str.substring(separatorIndex + 1));
        }
    }

    /**
     * In-memory {@link MessageOutput} used by the tests in this class.
     *
     * <p>Written messages are held in a pending queue until {@link #flush()} or {@link #close()}
     * moves them to the flushed queue. Writes and flushes can be made to throw on demand, and
     * latches let a test wait for the first write attempt, the first flush attempt, or the next
     * write attempt. Attempts are signalled whether they succeed or throw.
     */
    private static class TestingMessageOutput implements MessageOutput {
        private final AtomicBoolean closed = new AtomicBoolean();
        // Messages accepted by writeMessage() that no flush or close has moved yet.
        private final Queue<String> writeMessages = new LinkedBlockingQueue<>();
        // Messages moved out of writeMessages by flush() or close().
        private final Queue<String> flushedMessages = new LinkedBlockingQueue<>();
        // Latches handed out by getNextWriteAttemptLatch() that have not been released yet.
        private final BlockingQueue<CountDownLatch> nextWriteAttemptLatches = new LinkedBlockingQueue<>();
        private final CountDownLatch firstWriteAttemptLatch = new CountDownLatch(1);
        private final CountDownLatch firstFlushAttemptLatch = new CountDownLatch(1);
        private final AtomicBoolean throwOnWrite = new AtomicBoolean();
        private final AtomicBoolean throwOnFlush = new AtomicBoolean();

        private TestingMessageOutput() {
        }

        /**
         * Controls whether {@link #writeMessage(byte[])} throws instead of storing the message.
         *
         * @param shouldThrow {@code true} to make subsequent writes throw
         * @return this instance, for chaining
         */
        public TestingMessageOutput setThrowOnWrite(boolean shouldThrow) {
            this.throwOnWrite.set(shouldThrow);
            return this;
        }

        /**
         * Controls whether {@link #flush()} throws instead of flushing.
         *
         * @param shouldThrow {@code true} to make subsequent flushes throw
         * @return this instance, for chaining
         */
        public TestingMessageOutput setThrowOnFlush(boolean shouldThrow) {
            this.throwOnFlush.set(shouldThrow);
            return this;
        }

        /**
         * Registers and returns a latch that is released by the next write attempt.
         *
         * @return a fresh latch with a count of one
         */
        public CountDownLatch getNextWriteAttemptLatch() {
            CountDownLatch latch = new CountDownLatch(1);
            this.nextWriteAttemptLatches.add(latch);
            return latch;
        }

        /**
         * Blocks until {@link #writeMessage(byte[])} has been attempted at least once.
         *
         * @param timeout maximum time to wait
         * @param unit unit of {@code timeout}
         * @throws InterruptedException if interrupted while waiting
         * @throws TimeoutException if no write was attempted within the timeout
         */
        public void awaitFirstWriteAttempt(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            if (!this.firstWriteAttemptLatch.await(timeout, unit)) {
                throw new TimeoutException();
            }
        }

        /**
         * Blocks until {@link #flush()} has been attempted at least once.
         *
         * @param timeout maximum time to wait
         * @param unit unit of {@code timeout}
         * @throws InterruptedException if interrupted while waiting
         * @throws TimeoutException if no flush was attempted within the timeout
         */
        public void awaitFirstFlushAttempt(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            if (!this.firstFlushAttemptLatch.await(timeout, unit)) {
                throw new TimeoutException();
            }
        }

        /**
         * Returns a snapshot of the messages flushed so far.
         *
         * @return an immutable copy of the flushed queue's current contents
         */
        public List<String> getFlushedMessages() {
            return ImmutableList.copyOf(this.flushedMessages);
        }

        /** Removes every latch currently queued and releases each of them. */
        private static void signal(BlockingQueue<CountDownLatch> latches) {
            List<CountDownLatch> drained = new ArrayList<>();
            latches.drainTo(drained);
            drained.forEach(CountDownLatch::countDown);
        }

        /**
         * Stores the message (decoded as UTF-8) as pending, unless configured to throw.
         *
         * @param message the encoded message bytes
         * @throws IllegalStateException if this output has already been closed
         * @throws RuntimeException if throw-on-write is enabled
         */
        public void writeMessage(byte[] message) {
            try {
                Preconditions.checkState(!this.closed.get(), "Already closed");
                if (this.throwOnWrite.get()) {
                    throw new RuntimeException();
                }
                this.writeMessages.offer(new String(message, StandardCharsets.UTF_8));
            } finally {
                // Signal the attempt even when it failed, so tests can observe failed writes.
                this.firstWriteAttemptLatch.countDown();
                signal(this.nextWriteAttemptLatches);
            }
        }

        /**
         * Moves all pending messages to the flushed queue, unless configured to throw.
         *
         * @throws IllegalStateException if this output has already been closed
         * @throws RuntimeException if throw-on-flush is enabled
         */
        public void flush() {
            try {
                Preconditions.checkState(!this.closed.get(), "Already closed");
                if (this.throwOnFlush.get()) {
                    throw new RuntimeException();
                }
                flushInternal();
            } finally {
                // Signal the attempt even when it failed.
                this.firstFlushAttemptLatch.countDown();
            }
        }

        /**
         * Marks this output closed and flushes whatever is still pending. Only the first call
         * has any effect, and the throw-on-flush setting is not consulted.
         */
        public void close() {
            if (this.closed.compareAndSet(false, true)) {
                flushInternal();
            }
        }

        /** Drains the pending queue into the flushed queue, preserving order. */
        private void flushInternal() {
            String pending;
            while ((pending = this.writeMessages.poll()) != null) {
                this.flushedMessages.add(pending);
            }
        }
    }

    /**
     * Publishes a single record and verifies that it reaches the output through a flush,
     * without the handler being closed first.
     *
     * <p>The handler is closed in a {@code finally} block so that a timeout or a failed
     * assertion does not leave an initialized handler behind.
     */
    @Test
    public void testIdleFlush() throws Exception {
        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(output, testingFormatter(), TestBufferedHandler::serializeMultiset, new ErrorManager(), RateLimiter.create(10.0d), Duration.ofSeconds(5L), 100, 100);
        handler.initialize();
        try {
            LogRecord published = logRecord(Level.INFO, "TestLogger", "Test");
            handler.publish(published);
            // The flush must happen on its own; nothing below forces one before the assertion.
            output.awaitFirstFlushAttempt(5L, TimeUnit.SECONDS);
            Assertions.assertThat(output.getFlushedMessages())
                    .as("Buffer should flush when idle, even if still less than recordFlushCount")
                    .containsExactly(testingFormatter().format(published));
        } finally {
            handler.close();
        }
    }

    /**
     * Publishes 99 records, closes the handler, and verifies that every record was flushed
     * in publication order.
     */
    @Test
    public void testLoggingSequence() {
        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(output, testingFormatter(), TestBufferedHandler::serializeMultiset, new ErrorManager(), RateLimiter.create(10.0d), Duration.ofSeconds(5L), 100, 100);
        handler.initialize();

        // One fewer than the 100 passed to the handler above; presumably this keeps the test
        // below the handler's buffer limit (see the assertion description) - TODO confirm.
        int recordCount = 100 - 1;
        List<LogRecord> published = new ArrayList<>();
        for (int i = 0; i < recordCount; i++) {
            LogRecord next = logRecord(Level.INFO, "TestLogger", String.valueOf(i));
            published.add(next);
            handler.publish(next);
        }
        handler.close();

        Formatter formatter = testingFormatter();
        List<String> expected = published.stream()
                .map(formatter::format)
                .collect(ImmutableList.toImmutableList());
        Assertions.assertThat(output.getFlushedMessages())
                .as("Every record should be present if the buffer size is not exceeded")
                .containsExactlyElementsOf(expected);
    }

    /**
     * Publishes 1000 records from the test thread into a handler configured with small limits
     * (2 and 2), then checks that some messages were dropped and that the count returned by
     * {@code assertLogStreamContents} for the flushed stream equals the handler's dropped count.
     */
    @Test
    public void testLoggingOverloadSingleThread() {
        final int publishCount = 1000;
        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(output, testingFormatter(), TestBufferedHandler::serializeMultiset, new ErrorManager(), RateLimiter.create(10.0d), Duration.ofSeconds(5L), 2, 2);
        handler.initialize();

        int sequence = 0;
        while (sequence < publishCount) {
            handler.publish(logRecord(Level.INFO, "TestLogger", String.valueOf(sequence)));
            sequence++;
        }
        handler.close();

        Assertions.assertThat(handler.getDroppedMessages())
                .as("Test control check to make sure that it is dropping some messages")
                .isGreaterThan(0L);
        int countedFromStream = assertLogStreamContents(output.getFlushedMessages(), "TestLogger", publishCount, String::valueOf);
        Assertions.assertThat(countedFromStream).isEqualTo(handler.getDroppedMessages());
    }

    /**
     * Same overload scenario as the single-threaded test, but with two concurrent publishers
     * ("A-TestLogger" and "B-TestLogger") each sending 1000 records. The per-logger counts
     * returned by {@code assertLogStreamContents} must add up to the handler's dropped count.
     */
    @Test
    public void testLoggingOverloadMultiThread() {
        final int perLoggerCount = 1000;
        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(output, testingFormatter(), TestBufferedHandler::serializeMultiset, new ErrorManager(), RateLimiter.create(10.0d), Duration.ofSeconds(5L), 2, 2);
        handler.initialize();

        ExecutorService submitters = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("submitter-%s"));
        Runnable publishA = () -> {
            for (int n = 0; n < perLoggerCount; n++) {
                handler.publish(logRecord(Level.INFO, "A-TestLogger", String.valueOf(n)));
            }
        };
        Runnable publishB = () -> {
            for (int n = 0; n < perLoggerCount; n++) {
                handler.publish(logRecord(Level.INFO, "B-TestLogger", String.valueOf(n)));
            }
        };
        submitters.execute(publishA);
        submitters.execute(publishB);

        // Both publishers must finish before the handler is closed.
        boolean terminated = MoreExecutors.shutdownAndAwaitTermination(submitters, 10L, TimeUnit.SECONDS);
        Assertions.assertThat(terminated).isTrue();
        handler.close();

        Assertions.assertThat(handler.getDroppedMessages())
                .as("Test control check to make sure that it is dropping some messages")
                .isGreaterThan(0L);
        int countedA = assertLogStreamContents(output.getFlushedMessages(), "A-TestLogger", perLoggerCount, String::valueOf);
        int countedB = assertLogStreamContents(output.getFlushedMessages(), "B-TestLogger", perLoggerCount, String::valueOf);
        Assertions.assertThat(countedA + countedB).isEqualTo(handler.getDroppedMessages());
    }

    /**
     * Starts with an output that throws on both write and flush while two threads publish
     * 1000 records each. Write failures are switched off after the first write attempt and
     * flush failures after the first flush attempt. After close, the per-logger counts
     * returned by {@code assertLogStreamContents} must add up to the handler's dropped count.
     */
    @Test
    public void testMultiThreadErrorRetry() throws InterruptedException, TimeoutException {
        final int perLoggerCount = 1000;
        TestingMessageOutput output = new TestingMessageOutput();
        output.setThrowOnWrite(true);
        output.setThrowOnFlush(true);
        BufferedHandler handler = new BufferedHandler(output, testingFormatter(), TestBufferedHandler::serializeMultiset, new ErrorManager(), RateLimiter.create(10.0d), Duration.ofSeconds(5L), 50, 100);
        handler.initialize();

        ExecutorService submitters = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("submitter-%s"));
        Runnable publishA = () -> {
            for (int n = 0; n < perLoggerCount; n++) {
                handler.publish(logRecord(Level.INFO, "A-TestLogger", String.valueOf(n)));
            }
        };
        Runnable publishB = () -> {
            for (int n = 0; n < perLoggerCount; n++) {
                handler.publish(logRecord(Level.INFO, "B-TestLogger", String.valueOf(n)));
            }
        };
        submitters.execute(publishA);
        submitters.execute(publishB);

        // Let one write fail before allowing writes, then one flush fail before allowing flushes.
        output.awaitFirstWriteAttempt(5L, TimeUnit.SECONDS);
        output.setThrowOnWrite(false);
        output.awaitFirstFlushAttempt(5L, TimeUnit.SECONDS);
        output.setThrowOnFlush(false);

        boolean terminated = MoreExecutors.shutdownAndAwaitTermination(submitters, 10L, TimeUnit.SECONDS);
        Assertions.assertThat(terminated).isTrue();
        handler.close();

        int countedA = assertLogStreamContents(output.getFlushedMessages(), "A-TestLogger", perLoggerCount, String::valueOf);
        int countedB = assertLogStreamContents(output.getFlushedMessages(), "B-TestLogger", perLoggerCount, String::valueOf);
        Assertions.assertThat(countedA + countedB).isEqualTo(handler.getDroppedMessages());
    }

    /**
     * Exercises close() while writes are failing and other threads keep publishing.
     *
     * <p>Two records are published before the handler is initialized, then two threads each
     * publish 100 "Spam-TestLogger" records while a third calls close(). Once the handler
     * reports that its terminal message was dequeued, the test waits (up to 5 seconds each,
     * result ignored) for two further write attempts, re-enables writes, and waits for all
     * three tasks. Finally it checks the flushed stream for "A-TestLogger" and "B-TestLogger".
     */
    @Test
    public void testCapacityErrorRetryDuringClose() throws InterruptedException, TimeoutException, ExecutionException {
        final int spamCount = 100;
        TestingMessageOutput output = new TestingMessageOutput();
        output.setThrowOnWrite(true);
        BufferedHandler handler = new BufferedHandler(output, testingFormatter(), TestBufferedHandler::serializeMultiset, new ErrorManager(), RateLimiter.create(10.0d), Duration.ofSeconds(5L), 10, 1);

        // These two are deliberately published before initialize().
        handler.publish(logRecord(Level.INFO, "A-TestLogger", "1"));
        handler.publish(logRecord(Level.INFO, "B-TestLogger", "1"));
        handler.initialize();

        ExecutorService submitters = Executors.newCachedThreadPool(Threads.daemonThreadsNamed("submitter-%s"));
        Runnable spam = () -> {
            for (int n = 0; n < spamCount; n++) {
                handler.publish(logRecord(Level.INFO, "Spam-TestLogger", String.valueOf(n)));
            }
        };
        Future<?> firstSpammer = submitters.submit(spam);
        Future<?> secondSpammer = submitters.submit(spam);
        Future<?> closer = submitters.submit(handler::close);

        // Poll until the handler has picked up its terminal message.
        while (!handler.isTerminalMessageDequeued()) {
            Uninterruptibles.sleepUninterruptibly(20L, TimeUnit.MILLISECONDS);
        }

        // Give the handler two more (failing) write attempts before letting writes succeed.
        output.getNextWriteAttemptLatch().await(5L, TimeUnit.SECONDS);
        output.getNextWriteAttemptLatch().await(5L, TimeUnit.SECONDS);
        output.setThrowOnWrite(false);

        firstSpammer.get(5L, TimeUnit.SECONDS);
        secondSpammer.get(5L, TimeUnit.SECONDS);
        closer.get(5L, TimeUnit.SECONDS);

        assertLogStreamContents(output.getFlushedMessages(), "A-TestLogger", 1, String::valueOf);
        assertLogStreamContents(output.getFlushedMessages(), "B-TestLogger", 1, String::valueOf);
    }

    /**
     * Publishes one record, closes the handler, then publishes another. The late record must
     * neither be counted as dropped nor appear in the flushed output.
     */
    @Test
    public void testIgnoreWriteAfterClose() {
        TestingMessageOutput output = new TestingMessageOutput();
        BufferedHandler handler = new BufferedHandler(output, testingFormatter(), TestBufferedHandler::serializeMultiset, new ErrorManager(), RateLimiter.create(10.0d), Duration.ofSeconds(5L), 2, 2);
        handler.initialize();

        LogRecord beforeClose = logRecord(Level.INFO, "TestLogger", "Test message");
        handler.publish(beforeClose);
        handler.close();

        LogRecord afterClose = logRecord(Level.INFO, "TestLogger", "Test message after close");
        handler.publish(afterClose);

        Assertions.assertThat(handler.getDroppedMessages())
                .as("Messages after close are ignored and not counted as dropped")
                .isZero();
        String expectedOnly = testingFormatter().format(beforeClose);
        Assertions.assertThat(output.getFlushedMessages()).containsExactly(expectedOnly);
    }

    /**
     * Creates a formatter that renders a record as the serialized form of a {@link LogEntry}
     * built from the record's logger name and message.
     *
     * @return a new formatter instance
     */
    private static Formatter testingFormatter() {
        return new Formatter() {
            @Override
            public String format(LogRecord logged) {
                String loggerName = logged.getLoggerName();
                String message = logged.getMessage();
                LogEntry entry = new LogEntry(loggerName, message);
                return entry.serialize();
            }
        };
    }

    /**
     * Builds a {@link LogRecord} with the given level and message and assigns its logger name.
     *
     * @param level the record's level
     * @param str the logger name to set on the record
     * @param str2 the record's message
     * @return the new record
     */
    private static LogRecord logRecord(Level level, String str, String str2) {
        final LogRecord created = new LogRecord(level, str2);
        created.setLoggerName(str);
        return created;
    }

    /**
     * Walks the flushed stream for one logger and checks that each of the {@code expectedCount}
     * submitted messages is accounted for, in order, either as a delivered entry or as one
     * occurrence of the logger name in a drop summary.
     *
     * <p>A delivered entry must carry the message {@code indexToValue.apply(index)} for its
     * position in the submission sequence. A drop summary covers as many consecutive positions
     * as it contains occurrences of {@code loggerName}.
     *
     * @param serializedMessages the flushed messages, as produced by the testing formatter
     * @param loggerName the logger whose sub-stream is verified
     * @param expectedCount number of messages that were submitted for this logger
     * @param indexToValue maps a submission index to the expected message text
     * @return the number of submitted messages that were accounted for by drop summaries
     */
    private static int assertLogStreamContents(List<String> serializedMessages, String loggerName, int expectedCount, IntFunction<String> indexToValue) {
        Objects.requireNonNull(loggerName);
        Iterator<EntryOrDropSummary> remaining = deserializeMessages(serializedMessages, loggerName::equals).iterator();
        // Occurrences from the most recent drop summary that have not been matched to an index yet.
        HashMultiset<String> pendingDrops = HashMultiset.create();
        int dropCount = 0;
        for (int index = 0; index < expectedCount; index++) {
            if (pendingDrops.isEmpty()) {
                Assertions.assertThat(remaining).as("More entries expected in the result").hasNext();
                EntryOrDropSummary next = remaining.next();
                if (next.entry() != null) {
                    Assertions.assertThat(next.entry().message())
                            .as("Verify that the message contents match the value sequence")
                            .isEqualTo(indexToValue.apply(index));
                    // Delivered, not dropped: this index is done and must not consume a drop.
                    continue;
                }
                pendingDrops = HashMultiset.create(next.dropSummary());
            }
            Assertions.assertThat(pendingDrops.remove(loggerName))
                    .as("Expected to contain this logger name next in sequence")
                    .isTrue();
            dropCount++;
        }
        Assertions.assertThat(pendingDrops)
                .as("Should not have more drop summary entries than total submitted")
                .isEmpty();
        return dropCount;
    }

    /**
     * Converts a multiset into an immutable map from each distinct element to its count,
     * in the iteration order of the multiset's entry set.
     *
     * @param multiset the multiset to convert
     * @return an immutable element-to-count map
     */
    private static <T> Map<T, Integer> toMap(Multiset<T> multiset) {
        ImmutableMap.Builder<T, Integer> counts = ImmutableMap.builder();
        for (Multiset.Entry<T> entry : multiset.entrySet()) {
            counts.put(entry.getElement(), entry.getCount());
        }
        return counts.build();
    }

    /**
     * Renders a multiset as newline-separated {@code element=count} pairs.
     *
     * @param multiset the multiset to render
     * @return the text form; one line per distinct element
     */
    private static String serializeMultiset(Multiset<String> multiset) {
        Joiner.MapJoiner pairJoiner = Joiner.on('\n').withKeyValueSeparator('=');
        Map<String, Integer> countsByElement = toMap(multiset);
        return pairJoiner.join(countsByElement);
    }

    /**
     * Parses newline-separated {@code element=count} pairs, the format written by
     * {@code serializeMultiset}, back into a multiset.
     *
     * <p>An empty string, which is what an empty multiset serializes to, yields an empty
     * multiset rather than being rejected by the map splitter.
     *
     * @param serialized the text form to parse
     * @return an immutable multiset holding each element with its parsed count
     * @throws IllegalArgumentException if a line is not a valid {@code element=count} pair
     *         (a non-numeric count surfaces as {@link NumberFormatException})
     */
    private static Multiset<String> deserializeMultiset(String serialized) {
        if (serialized.isEmpty()) {
            return ImmutableMultiset.of();
        }
        Map<String, String> countsByElement = Splitter.on('\n').withKeyValueSeparator('=').split(serialized);
        ImmutableMultiset.Builder<String> builder = ImmutableMultiset.builder();
        for (Map.Entry<String, String> pair : countsByElement.entrySet()) {
            builder.addCopies(pair.getKey(), Integer.parseInt(pair.getValue()));
        }
        return builder.build();
    }

    /**
     * Decodes serialized messages into entries and drop summaries, keeping only what is
     * relevant to the logger names accepted by {@code predicate}.
     *
     * <p>A message whose logger name is {@code BufferedHandler}'s class name is treated as a
     * drop summary: its message is parsed as a multiset and filtered by {@code predicate};
     * summaries that end up empty are discarded. Any other message is kept as a regular entry
     * only if its logger name satisfies {@code predicate}. Input order is preserved.
     *
     * @param list the serialized messages
     * @param predicate selects the logger names of interest
     * @return an immutable list of the retained elements
     */
    private static List<EntryOrDropSummary> deserializeMessages(List<String> list, Predicate<String> predicate) {
        String summaryLoggerName = BufferedHandler.class.getName();
        ImmutableList.Builder<EntryOrDropSummary> retained = ImmutableList.builder();
        for (String serialized : list) {
            LogEntry decoded = LogEntry.deserialize(serialized);
            if (decoded.loggerName().equals(summaryLoggerName)) {
                Multiset<String> relevantDrops = Multisets.filter(deserializeMultiset(decoded.message()), predicate);
                EntryOrDropSummary summary = EntryOrDropSummary.forDropSummary(relevantDrops);
                if (!summary.dropSummary().isEmpty()) {
                    retained.add(summary);
                }
            } else {
                EntryOrDropSummary regular = EntryOrDropSummary.forEntry(decoded);
                if (predicate.apply(regular.entry().loggerName())) {
                    retained.add(regular);
                }
            }
        }
        return retained.build();
    }
}
