package dk.cloudcreate.essentials.reactive;

import dk.cloudcreate.essentials.shared.Exceptions;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.ParallelFlux;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:dk/cloudcreate/essentials/reactive/LocalEventBus.class */
/**
 * In-memory {@link EventBus} that delivers published events to two kinds of subscribers:
 * <ul>
 *   <li><b>Synchronous subscribers</b> are invoked directly on the thread calling
 *       {@link #publish(Object)}. An exception thrown by a synchronous subscriber is logged and
 *       rethrown to the publisher, which also skips the remaining synchronous subscribers and the
 *       asynchronous delivery for that event.</li>
 *   <li><b>Asynchronous subscribers</b> each hold their own subscription to a shared Reactor
 *       multicast sink whose flux is split into parallel rails running on the listener
 *       {@link Scheduler}. An exception thrown by an asynchronous subscriber is passed to the
 *       configured {@link OnErrorHandler}.</li>
 * </ul>
 * Subscriber registries are backed by {@link ConcurrentHashMap}, so subscribers may be added and
 * removed while events are being published.
 */
public class LocalEventBus implements EventBus {
    /** {@code queuedTaskCap} passed to {@link Schedulers#newBoundedElastic} for bus-created schedulers. */
    private static final int DEFAULT_QUEUED_TASK_CAP = 1000;
    /** {@code ttlSeconds} passed to {@link Schedulers#newBoundedElastic} for bus-created schedulers. */
    private static final int DEFAULT_TTL_SECONDS = 60;
    /** How long the publishing thread parks before retrying an emit that lost a concurrent-emit race. */
    private static final long EMIT_RETRY_PARK_NANOS = 100L;

    private final Logger log;
    private final String busName;
    private final Scheduler listenerScheduler;
    private final ParallelFlux<Object> eventFlux;
    private final Sinks.Many<Object> eventSink;
    private final ConcurrentMap<EventHandler, Disposable> asyncSubscribers;
    private final Set<EventHandler> syncSubscribers;
    private final OnErrorHandler onErrorHandler;

    /**
     * Creates a bus with a default scheduler (one thread per available processor) and the given error handler.
     *
     * @param busName        name of the bus; used for the logger name and the scheduler thread names
     * @param onErrorHandler handler invoked when an asynchronous subscriber fails; must not be null
     */
    public LocalEventBus(String busName, OnErrorHandler onErrorHandler) {
        this(busName, newDefaultScheduler(busName, Runtime.getRuntime().availableProcessors()), onErrorHandler);
    }

    /**
     * Creates a bus with a default scheduler (one thread per available processor) and the default
     * error handler, which logs the failure.
     *
     * @param busName name of the bus; used for the logger name and the scheduler thread names
     */
    public LocalEventBus(String busName) {
        this(busName, newDefaultScheduler(busName, Runtime.getRuntime().availableProcessors()), Optional.<OnErrorHandler>empty());
    }

    /**
     * Creates a bus with a default scheduler (one thread per available processor).
     *
     * @param busName        name of the bus; used for the logger name and the scheduler thread names
     * @param onErrorHandler optional error handler; when empty, failures are logged
     */
    public LocalEventBus(String busName, Optional<OnErrorHandler> onErrorHandler) {
        this(busName, newDefaultScheduler(busName, Runtime.getRuntime().availableProcessors()), onErrorHandler);
    }

    /**
     * Creates a bus with a default scheduler capped at {@code parallelThreads} threads.
     *
     * @param busName         name of the bus; used for the logger name and the scheduler thread names
     * @param parallelThreads thread cap of the scheduler created for asynchronous subscribers
     * @param onErrorHandler  optional error handler; when empty, failures are logged
     */
    public LocalEventBus(String busName, int parallelThreads, Optional<OnErrorHandler> onErrorHandler) {
        this(busName, newDefaultScheduler(busName, parallelThreads), onErrorHandler);
    }

    /**
     * Creates a bus with a default scheduler capped at {@code parallelThreads} threads.
     *
     * @param busName         name of the bus; used for the logger name and the scheduler thread names
     * @param parallelThreads thread cap of the scheduler created for asynchronous subscribers
     * @param onErrorHandler  handler invoked when an asynchronous subscriber fails; must not be null
     */
    public LocalEventBus(String busName, int parallelThreads, OnErrorHandler onErrorHandler) {
        this(busName, newDefaultScheduler(busName, parallelThreads), onErrorHandler);
    }

    /**
     * Creates a bus with a default scheduler capped at {@code parallelThreads} threads and the
     * default error handler, which logs the failure.
     *
     * @param busName         name of the bus; used for the logger name and the scheduler thread names
     * @param parallelThreads thread cap of the scheduler created for asynchronous subscribers
     */
    public LocalEventBus(String busName, int parallelThreads) {
        this(busName, newDefaultScheduler(busName, parallelThreads), Optional.<OnErrorHandler>empty());
    }

    /**
     * Primary constructor; every other constructor delegates here.
     *
     * @param busName           name of the bus; used for the logger name
     * @param listenerScheduler scheduler the asynchronous subscribers are run on
     * @param onErrorHandler    optional error handler; when empty, a handler that logs the failing
     *                          subscriber, the event and the exception is used
     */
    public LocalEventBus(String busName, Scheduler listenerScheduler, Optional<OnErrorHandler> onErrorHandler) {
        FailFast.requireNonNull(busName, "busName was null");
        FailFast.requireNonNull(listenerScheduler, "asyncSubscribersScheduler is null");
        this.busName = busName;
        this.listenerScheduler = listenerScheduler;
        this.log = LoggerFactory.getLogger("LocalEventBus - " + busName);
        FailFast.requireNonNull(onErrorHandler, "onErrorHandler is null");
        this.onErrorHandler = onErrorHandler.orElse((failingSubscriber, event, error) ->
                this.log.error(MessageFormatter.msg("Error for '{}' handling {}", failingSubscriber, event), error));
        // autoCancel MUST be false: with the no-arg onBackpressureBuffer() (autoCancel = true) the sink
        // shuts down for good once its last subscriber cancels, so removing the last async subscriber
        // would leave the bus unable to deliver to async subscribers added later.
        // The buffer size is the same one the no-arg variant uses.
        this.eventSink = Sinks.many().multicast().onBackpressureBuffer(reactor.util.concurrent.Queues.SMALL_BUFFER_SIZE, false);
        this.eventFlux = this.eventSink.asFlux().parallel().runOn(this.listenerScheduler);
        this.asyncSubscribers = new ConcurrentHashMap<>();
        this.syncSubscribers = ConcurrentHashMap.newKeySet();
    }

    /**
     * Creates a bus that runs asynchronous subscribers on the supplied scheduler.
     *
     * @param busName           name of the bus; used for the logger name
     * @param listenerScheduler scheduler the asynchronous subscribers are run on
     * @param onErrorHandler    handler invoked when an asynchronous subscriber fails
     * @throws NullPointerException if {@code onErrorHandler} is null (raised by {@link Optional#of})
     */
    public LocalEventBus(String busName, Scheduler listenerScheduler, OnErrorHandler onErrorHandler) {
        this(busName, listenerScheduler, Optional.of(onErrorHandler));
    }

    /**
     * Builds the scheduler used when the caller does not supply one: a bounded-elastic scheduler
     * named after the bus, using daemon threads.
     */
    private static Scheduler newDefaultScheduler(String busName, int threadCap) {
        FailFast.requireNonNull(busName, "busName was null");
        return Schedulers.newBoundedElastic(threadCap, DEFAULT_QUEUED_TASK_CAP, busName, DEFAULT_TTL_SECONDS, true);
    }

    /**
     * Publishes the event to all synchronous subscribers on the calling thread and then, if any
     * asynchronous subscribers are registered, emits it to the shared sink.
     *
     * @param event the event to publish; must not be null
     * @return this bus
     */
    @Override
    public EventBus publish(Object event) {
        FailFast.requireNonNull(event, "No event was supplied");
        String eventType = event.getClass().getName();

        log.debug("Publishing event of type '{}' to {} sync-subscriber(s)", eventType, syncSubscribers.size());
        for (EventHandler subscriber : syncSubscribers) {
            try {
                subscriber.handle(event);
            } catch (Exception e) {
                log.error(MessageFormatter.msg("Subscriber '{}' failed with exception {}", subscriber, Exceptions.getStackTrace(e)), e);
                // Sync subscribers run as part of the publisher's call, so the failure is propagated to it
                throw e;
            }
        }

        log.debug("Publishing event of type '{}' to {} async-subscriber(s)", eventType, asyncSubscribers.size());
        if (!asyncSubscribers.isEmpty()) {
            eventSink.emitNext(event, (signalType, emitResult) -> {
                if (emitResult == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
                    // Another thread is emitting concurrently: back off briefly and ask the sink to retry
                    LockSupport.parkNanos(EMIT_RETRY_PARK_NANOS);
                    return true;
                }
                if (emitResult.isFailure()) {
                    log.error("Failed to publish event of type '{}' to {} async-subscriber(s): {}", eventType, asyncSubscribers.size(), emitResult);
                    // No particular subscriber or exception is associated with an emit failure, hence the nulls
                    onErrorHandler.handle(null, event, null);
                }
                return false;
            });
        }
        return this;
    }

    /**
     * Registers an asynchronous subscriber. Registering a subscriber that is already present has no
     * effect.
     *
     * @param subscriber the subscriber to add; must not be null
     * @return this bus
     */
    @Override
    public EventBus addAsyncSubscriber(EventHandler subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        log.info("[{}] Adding asynchronous subscriber {}", busName, subscriber);
        asyncSubscribers.computeIfAbsent(subscriber,
                                         key -> eventFlux.subscribe(event -> deliverAsync(subscriber, event)));
        return this;
    }

    /**
     * Hands one event to an asynchronous subscriber. A failure in the subscriber is routed to the
     * error handler; a failure in the error handler itself is only logged.
     */
    private void deliverAsync(EventHandler subscriber, Object event) {
        try {
            subscriber.handle(event);
        } catch (Exception e) {
            try {
                onErrorHandler.handle(subscriber, event, e);
            } catch (Exception handlerFailure) {
                log.error(MessageFormatter.msg("onErrorHandler failed to handle subscriber {} failing to handle exception {}", subscriber, Exceptions.getStackTrace(e)), handlerFailure);
            }
        }
    }

    /**
     * Unregisters an asynchronous subscriber and disposes its subscription. Does nothing if the
     * subscriber is not registered.
     *
     * @param subscriber the subscriber to remove; must not be null
     * @return this bus
     */
    @Override
    public EventBus removeAsyncSubscriber(EventHandler subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        log.info("[{}] Removing asynchronous subscriber {}", busName, subscriber);
        Disposable subscription = asyncSubscribers.remove(subscriber);
        if (subscription != null) {
            subscription.dispose();
        }
        return this;
    }

    /**
     * Registers a synchronous subscriber.
     *
     * @param subscriber the subscriber to add; must not be null
     * @return this bus
     */
    @Override
    public EventBus addSyncSubscriber(EventHandler subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        log.info("[{}] Adding synchronous subscriber {}", busName, subscriber);
        syncSubscribers.add(subscriber);
        return this;
    }

    /**
     * Unregisters a synchronous subscriber. Does nothing if the subscriber is not registered.
     *
     * @param subscriber the subscriber to remove; must not be null
     * @return this bus
     */
    @Override
    public EventBus removeSyncSubscriber(EventHandler subscriber) {
        FailFast.requireNonNull(subscriber, "You must supply a subscriber instance");
        log.info("[{}] Removing synchronous subscriber {}", busName, subscriber);
        syncSubscribers.remove(subscriber);
        return this;
    }

    /** @return true if the given handler is currently registered as a synchronous subscriber */
    @Override
    public boolean hasSyncSubscriber(EventHandler subscriber) {
        return syncSubscribers.contains(subscriber);
    }

    /** @return true if the given handler is currently registered as an asynchronous subscriber */
    @Override
    public boolean hasAsyncSubscriber(EventHandler subscriber) {
        return asyncSubscribers.containsKey(subscriber);
    }

    @Override
    public String toString() {
        return "LocalEventBus - " + busName;
    }
}
