package tech.picnic.rx;

import com.google.common.util.concurrent.MoreExecutors;
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.functions.Function;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

@Test(singleThreaded = true)
/* loaded from: input_file:tech/picnic/rx/PicnicRxPluginsTest.class */
public final class PicnicRxPluginsTest {
    private final ConcurrentMap<Context, AtomicInteger> verificationCounters = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:tech/picnic/rx/PicnicRxPluginsTest$Context.class */
    public static final class Context {
        private static final ThreadLocal<String> threadLocalContext = new ThreadLocal<>();
        private final Optional<String> token;

        private Context(Optional<String> optional) {
            this.token = optional;
        }

        static Context createEmpty() {
            return new Context(Optional.empty());
        }

        static Context createRandom() {
            return new Context(Optional.of(UUID.randomUUID().toString()));
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public Context applyToCurrentThread() {
            if (this.token.isPresent()) {
                threadLocalContext.set(this.token.get());
            } else {
                threadLocalContext.remove();
            }
            return this;
        }

        void verifyCurrentThread() {
            Assert.assertEquals(threadLocalContext.get(), this.token.orElse(null));
        }

        static RxThreadLocal<String> createRxThreadLocal() {
            return RxThreadLocal.from(threadLocalContext);
        }
    }

    @BeforeClass
    void init() {
        PicnicRxPlugins.configureContextPropagation(new RxThreadLocal[]{Context.createRxThreadLocal()});
    }

    @AfterClass
    void clean() {
        PicnicRxPlugins.unsetContextPropagation();
    }

    public void testPropagate() {
        Observable just = Observable.just(1, 2, 3);
        Context applyToCurrentThread = Context.createRandom().applyToCurrentThread();
        RxJavaPlugins.setScheduleHandler((Function) null);
        Context createEmpty = Context.createEmpty();
        just.subscribeOn(Schedulers.io()).doOnNext(num -> {
            verifyActive(createEmpty);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(createEmpty, 3);
        init();
        just.subscribeOn(Schedulers.io()).doOnNext(num2 -> {
            verifyActive(applyToCurrentThread);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(applyToCurrentThread, 3);
    }

    public void testObserveOnAnotherScheduler() {
        Observable just = Observable.just(1, 2, 3);
        Context applyToCurrentThread = Context.createRandom().applyToCurrentThread();
        just.subscribeOn(Schedulers.io()).doOnNext(num -> {
            verifyActive(applyToCurrentThread);
        }).observeOn(Schedulers.io()).doOnNext(num2 -> {
            verifyActive(applyToCurrentThread);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(applyToCurrentThread, 6);
    }

    public void testPropagateOnMultipleSchedulers() {
        Observable just = Observable.just(1, 2, 3);
        Context applyToCurrentThread = Context.createRandom().applyToCurrentThread();
        just.flatMap(num -> {
            return Observable.just(num).subscribeOn(Schedulers.io());
        }).doOnNext(num2 -> {
            verifyActive(applyToCurrentThread);
        }).flatMap(num3 -> {
            return Observable.just(num3).subscribeOn(Schedulers.computation());
        }).doOnNext(num4 -> {
            verifyActive(applyToCurrentThread);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(applyToCurrentThread, 6);
    }

    public void testNotLeaking() throws InterruptedException {
        Observable just = Observable.just(1, 2, 3);
        Scheduler single = Schedulers.single();
        Context applyToCurrentThread = Context.createRandom().applyToCurrentThread();
        just.subscribeOn(single).doOnNext(num -> {
            verifyActive(applyToCurrentThread);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(applyToCurrentThread, 3);
        Context applyToCurrentThread2 = Context.createRandom().applyToCurrentThread();
        just.subscribeOn(single).doOnNext(num2 -> {
            verifyActive(applyToCurrentThread2);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(applyToCurrentThread2, 3);
        Context createEmpty = Context.createEmpty();
        Thread thread = new Thread(() -> {
            just.subscribeOn(single).doOnNext(num3 -> {
                verifyActive(createEmpty);
            }).ignoreElements().blockingAwait();
        });
        thread.start();
        thread.join();
        verifyVerificationCounter(createEmpty, 3);
        Context applyToCurrentThread3 = Context.createEmpty().applyToCurrentThread();
        just.subscribeOn(single).doOnNext(num3 -> {
            verifyActive(applyToCurrentThread3);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(applyToCurrentThread3, 3);
    }

    public void testContextSwitchBeforeConsumption() {
        Observable just = Observable.just(1, 2, 3);
        Context applyToCurrentThread = Context.createRandom().applyToCurrentThread();
        Observable subscribeOn = just.subscribeOn(Schedulers.single());
        Context applyToCurrentThread2 = Context.createRandom().applyToCurrentThread();
        subscribeOn.doOnNext(num -> {
            verifyActive(applyToCurrentThread2);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(applyToCurrentThread, 0);
        verifyVerificationCounter(applyToCurrentThread2, 3);
    }

    public void testPropagationOnSameThread() {
        Observable just = Observable.just(1, 2, 3);
        Context applyToCurrentThread = Context.createRandom().applyToCurrentThread();
        just.subscribeOn(Schedulers.from(MoreExecutors.directExecutor())).doOnNext(num -> {
            verifyActive(applyToCurrentThread);
        }).ignoreElements().blockingAwait();
        verifyVerificationCounter(applyToCurrentThread, 3);
        verifyActive(applyToCurrentThread);
    }

    public void testEmptyContextIsPropagated() {
        Observable just = Observable.just(1, 2, 3);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            newSingleThreadExecutor.execute(() -> {
                Context.createRandom().applyToCurrentThread();
            });
            Context applyToCurrentThread = Context.createEmpty().applyToCurrentThread();
            just.subscribeOn(Schedulers.from(newSingleThreadExecutor)).doOnNext(num -> {
                verifyActive(applyToCurrentThread);
            }).ignoreElements().blockingAwait();
            verifyVerificationCounter(applyToCurrentThread, 3);
            verifyActive(applyToCurrentThread);
            newSingleThreadExecutor.shutdownNow();
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdownNow();
            throw th;
        }
    }

    public void testTestSchedulerContextPropagation() throws InterruptedException {
        Context applyToCurrentThread = Context.createRandom().applyToCurrentThread();
        TestScheduler testScheduler = new TestScheduler();
        TestObserver subscribeWith = Observable.just(1, 2, 3).subscribeOn(testScheduler).doOnNext(num -> {
            verifyActive(applyToCurrentThread);
        }).subscribeWith(TestObserver.create());
        testScheduler.advanceTimeBy(2L, TimeUnit.SECONDS);
        subscribeWith.await();
        verifyVerificationCounter(applyToCurrentThread, 3);
    }

    public void testDirectExecutor() {
        Context applyToCurrentThread = Context.createRandom().applyToCurrentThread();
        TestScheduler testScheduler = new TestScheduler();
        testScheduler.createWorker().schedule(() -> {
            verifyActive(applyToCurrentThread);
        });
        testScheduler.triggerActions();
        verifyVerificationCounter(applyToCurrentThread, 1);
    }

    private void verifyActive(Context context) {
        context.verifyCurrentThread();
        this.verificationCounters.computeIfAbsent(context, context2 -> {
            return new AtomicInteger();
        }).incrementAndGet();
    }

    private void verifyVerificationCounter(Context context, int i) {
        Assert.assertEquals(this.verificationCounters.getOrDefault(context, new AtomicInteger()).intValue(), i);
    }
}
