package dk.cloudcreate.essentials.reactive;

import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.concurrent.Queues;

/* loaded from: input_file:dk/cloudcreate/essentials/reactive/LocalEventBus.class */
/**
 * In-process event bus that delivers published events to two kinds of subscribers:
 * <ul>
 *   <li><b>sync subscribers</b> are invoked directly on the thread calling {@link #publish(Object)}</li>
 *   <li><b>async subscribers</b> receive events through a Reactor multicast sink whose
 *       flux is moved onto the listener {@link Scheduler} via {@code publishOn}</li>
 * </ul>
 * An exception thrown by a subscriber is routed to the configured {@link OnErrorHandler};
 * an exception thrown by the error handler itself is only logged.
 *
 * @param <EVENT_TYPE> the type of event published on this bus
 */
public class LocalEventBus<EVENT_TYPE> {
    /** {@code queuedTaskCap} passed to {@link Schedulers#newBoundedElastic(int, int, String, int, boolean)}. */
    private static final int LISTENER_QUEUED_TASK_CAP = 1000;
    /** {@code ttlSeconds} passed to {@link Schedulers#newBoundedElastic(int, int, String, int, boolean)}. */
    private static final int LISTENER_THREAD_TTL_SECONDS = 60;

    private final Logger log;
    private final String busName;
    private final Scheduler listenerScheduler;
    private final Flux<EVENT_TYPE> eventFlux;
    private final Sinks.Many<EVENT_TYPE> eventSink;
    /** Async subscribers mapped to the subscription handle needed to unsubscribe them again. */
    private final ConcurrentMap<Consumer<EVENT_TYPE>, Disposable> asyncSubscribers;
    private final Set<Consumer<EVENT_TYPE>> syncSubscribers;
    private final OnErrorHandler<EVENT_TYPE> onErrorHandler;

    /**
     * Creates a bus whose async subscribers run on a new daemon bounded-elastic scheduler
     * named after the bus.
     *
     * @param str            the name of the bus (used for the logger, the scheduler and {@link #toString()}); must not be null
     * @param i              the thread cap of the bounded-elastic scheduler
     * @param onErrorHandler handler invoked when a subscriber throws; must not be null
     */
    public LocalEventBus(String str, int i, OnErrorHandler<EVENT_TYPE> onErrorHandler) {
        this(str, newListenerScheduler(str, i), onErrorHandler);
    }

    /**
     * Creates a bus whose async subscribers run on the supplied scheduler.
     *
     * @param str            the name of the bus (used for the logger and {@link #toString()}); must not be null
     * @param scheduler      the scheduler async subscribers are notified on; must not be null
     * @param onErrorHandler handler invoked when a subscriber throws; must not be null
     */
    public LocalEventBus(String str, Scheduler scheduler, OnErrorHandler<EVENT_TYPE> onErrorHandler) {
        FailFast.requireNonNull(str, "busName was null");
        this.busName = str;
        FailFast.requireNonNull(scheduler, "asyncSubscribersScheduler is null");
        this.listenerScheduler = scheduler;
        this.log = LoggerFactory.getLogger("LocalEventBus - " + str);
        FailFast.requireNonNull(onErrorHandler, "onErrorHandler is null");
        this.onErrorHandler = onErrorHandler;
        // autoCancel MUST be false: with the default (true) the sink is cancelled permanently as soon as
        // the last async subscriber is disposed, after which newly added subscribers complete immediately
        // and every publish fails with FAIL_CANCELLED. The buffer size is the same default that the
        // no-arg onBackpressureBuffer() uses.
        this.eventSink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
        this.eventFlux = this.eventSink.asFlux().publishOn(this.listenerScheduler);
        this.asyncSubscribers = new ConcurrentHashMap<>();
        this.syncSubscribers = ConcurrentHashMap.newKeySet();
    }

    /**
     * Validates the bus name and creates the default listener scheduler for it.
     * The name is validated here because this runs before the delegated constructor body.
     */
    private static Scheduler newListenerScheduler(String busName, int threadCap) {
        FailFast.requireNonNull(busName, "busName was null");
        return Schedulers.newBoundedElastic(threadCap, LISTENER_QUEUED_TASK_CAP, busName, LISTENER_THREAD_TTL_SECONDS, true);
    }

    /**
     * Publishes the event to all subscribers. The event is first emitted to the async sink
     * (only if at least one async subscriber is registered) and afterwards handed to every
     * sync subscriber on the calling thread.
     *
     * @param event_type the event to publish; must not be null
     * @return this bus instance
     * @throws PublishException if the event could not be emitted to the async sink; in that case
     *                          the sync subscribers are NOT notified
     */
    public LocalEventBus<EVENT_TYPE> publish(EVENT_TYPE event_type) {
        FailFast.requireNonNull(event_type, "No event was supplied");
        this.log.trace("Publishing {} to {} async-subscribers", event_type.getClass().getName(), this.asyncSubscribers.size());
        if (!this.asyncSubscribers.isEmpty()) {
            Sinks.EmitResult emitResult = this.eventSink.tryEmitNext(event_type);
            if (emitResult.isFailure()) {
                throw new PublishException(MessageFormatter.msg("Failed to publish event {} to async-subscribers: {}", new Object[]{event_type, emitResult}), event_type);
            }
        }
        this.log.trace("Publishing {} to {} sync-subscribers", event_type.getClass().getName(), this.syncSubscribers.size());
        this.syncSubscribers.forEach(subscriber -> deliver(subscriber, event_type));
        return this;
    }

    /**
     * Registers an asynchronous subscriber. Adding the same subscriber instance more than once
     * has no effect - it is only subscribed to the event flux the first time.
     *
     * @param consumer the subscriber; must not be null
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> addAsyncSubscriber(Consumer<EVENT_TYPE> consumer) {
        FailFast.requireNonNull(consumer, "You must supply a subscriber instance");
        this.asyncSubscribers.computeIfAbsent(consumer,
                                              subscriber -> this.eventFlux.subscribe(event -> deliver(subscriber, event)));
        return this;
    }

    /**
     * Removes an asynchronous subscriber and disposes its subscription.
     * Removing a subscriber that isn't registered is a no-op.
     *
     * @param consumer the subscriber; must not be null
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> removeAsyncSubscriber(Consumer<EVENT_TYPE> consumer) {
        FailFast.requireNonNull(consumer, "You must supply a subscriber instance");
        Disposable subscription = this.asyncSubscribers.remove(consumer);
        if (subscription != null) {
            subscription.dispose();
        }
        return this;
    }

    /**
     * Registers a synchronous subscriber, which is invoked on the thread calling {@link #publish(Object)}.
     *
     * @param consumer the subscriber; must not be null
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> addSyncSubscriber(Consumer<EVENT_TYPE> consumer) {
        FailFast.requireNonNull(consumer, "You must supply a subscriber instance");
        this.syncSubscribers.add(consumer);
        return this;
    }

    /**
     * Removes a synchronous subscriber. Removing a subscriber that isn't registered is a no-op.
     *
     * @param consumer the subscriber; must not be null
     * @return this bus instance
     */
    public LocalEventBus<EVENT_TYPE> removeSyncSubscriber(Consumer<EVENT_TYPE> consumer) {
        FailFast.requireNonNull(consumer, "You must supply a subscriber instance");
        this.syncSubscribers.remove(consumer);
        return this;
    }

    @Override
    public String toString() {
        return "LocalEventBus - " + this.busName;
    }

    /**
     * Hands the event to a single subscriber. An exception thrown by the subscriber is passed to
     * the {@link OnErrorHandler}; if the handler itself throws, the failure is logged and swallowed
     * so that one failing subscriber never prevents delivery to the remaining subscribers.
     */
    private void deliver(Consumer<EVENT_TYPE> subscriber, EVENT_TYPE event) {
        try {
            subscriber.accept(event);
        } catch (Exception subscriberFailure) {
            try {
                this.onErrorHandler.handle(subscriber, event, subscriberFailure);
            } catch (Exception handlerFailure) {
                this.log.error(MessageFormatter.msg("onErrorHandler failed to handle subscriber {} failing to handle exception {}", new Object[]{subscriber, Exceptions.getStackTrace(subscriberFailure)}), handlerFailure);
            }
        }
    }
}
