package io.reacted.streams;

import io.reacted.core.config.reactors.ReActorConfig;
import io.reacted.core.drivers.system.RemotingDriver;
import io.reacted.core.mailboxes.BackpressuringMbox;
import io.reacted.core.mailboxes.BasicMbox;
import io.reacted.core.messages.SerializationUtils;
import io.reacted.core.messages.reactors.ReActorInit;
import io.reacted.core.messages.reactors.ReActorStop;
import io.reacted.core.reactors.ReActions;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.reactorsystem.ReActorSystem;
import io.reacted.core.typedsubscriptions.TypedSubscription;
import io.reacted.core.utils.ObjectUtils;
import io.reacted.core.utils.ReActedUtils;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.streams.messages.PublisherComplete;
import io.reacted.streams.messages.PublisherInterrupt;
import io.reacted.streams.messages.PublisherShutdown;
import io.reacted.streams.messages.SubscriptionReply;
import io.reacted.streams.messages.SubscriptionRequest;
import io.reacted.streams.messages.UnsubscriptionRequest;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

@NonNullByDefault
/* loaded from: input_file:io/reacted/streams/ReactedSubmissionPublisher.class */
public class ReactedSubmissionPublisher<PayloadT extends Serializable> implements Flow.Publisher<PayloadT>, AutoCloseable, Externalizable {
    public static final Duration RELIABLE_SUBSCRIPTION = BackpressuringMbox.RELIABLE_DELIVERY_TIMEOUT;
    public static final Duration BEST_EFFORT_SUBSCRIPTION = BackpressuringMbox.BEST_EFFORT_TIMEOUT;
    private static final long FEED_GATE_OFFSET = ((Long) SerializationUtils.getFieldOffset(ReactedSubmissionPublisher.class, "feedGate").orElseSneakyThrow()).longValue();
    private static final long LOCAL_REACTOR_SYSTEM = ((Long) SerializationUtils.getFieldOffset(ReactedSubmissionPublisher.class, "localReActorSystem").orElseSneakyThrow()).longValue();
    private final transient Set<ReActorRef> subscribers;
    private final transient ReActorSystem localReActorSystem;
    private final ReActorRef feedGate;

    /* loaded from: input_file:io/reacted/streams/ReactedSubmissionPublisher$ReActedSubscription.class */
    public static final class ReActedSubscription<PayloadT> {

        @Nullable
        public static final ThreadPoolExecutor NO_CUSTOM_SEQUENCER = null;
        private final Flow.Subscriber<? super PayloadT> subscriber;
        private final int bufferSize;
        private final Duration backpressureTimeout;
        private final Executor asyncBackpressurer;
        private final String subscriberName;

        @Nullable
        private final ThreadPoolExecutor sequencer;

        /* loaded from: input_file:io/reacted/streams/ReactedSubmissionPublisher$ReActedSubscription$Builder.class */
        public static final class Builder<PayloadT> {
            private Flow.Subscriber<? super PayloadT> subscriber;
            private int bufferSize;
            private Duration backpressureTimeout;
            private Executor asyncBackpressurer;
            private String subscriberName;

            @Nullable
            private ThreadPoolExecutor sequencer = ReActedSubscription.NO_CUSTOM_SEQUENCER;

            private Builder() {
            }

            public Builder<PayloadT> setSubscriber(Flow.Subscriber<? super PayloadT> subscriber) {
                this.subscriber = subscriber;
                return this;
            }

            public Builder<PayloadT> setBufferSize(int i) {
                this.bufferSize = i;
                return this;
            }

            public Builder<PayloadT> setBackpressureTimeout(Duration duration) {
                this.backpressureTimeout = duration;
                return this;
            }

            public Builder<PayloadT> setAsyncBackpressurer(Executor executor) {
                this.asyncBackpressurer = executor;
                return this;
            }

            public Builder<PayloadT> setSubscriberName(String str) {
                this.subscriberName = str;
                return this;
            }

            public Builder<PayloadT> setSequencer(@Nullable ThreadPoolExecutor threadPoolExecutor) {
                this.sequencer = threadPoolExecutor;
                return this;
            }

            public ReActedSubscription<PayloadT> build() {
                return new ReActedSubscription<>(this);
            }
        }

        private ReActedSubscription(Builder<PayloadT> builder) {
            this.subscriber = (Flow.Subscriber) Objects.requireNonNull(((Builder) builder).subscriber);
            this.bufferSize = ((Integer) ObjectUtils.requiredInRange(Integer.valueOf(((Builder) builder).bufferSize), 1, Integer.MAX_VALUE, IllegalArgumentException::new)).intValue();
            this.backpressureTimeout = (Duration) ObjectUtils.requiredCondition((Duration) Objects.requireNonNull(((Builder) builder).backpressureTimeout), duration -> {
                return duration.compareTo(ReactedSubmissionPublisher.RELIABLE_SUBSCRIPTION) <= 0 && !duration.isNegative();
            }, IllegalArgumentException::new);
            this.asyncBackpressurer = (Executor) Objects.requireNonNull(((Builder) builder).asyncBackpressurer);
            this.subscriberName = (String) Objects.requireNonNull(((Builder) builder).subscriberName);
            this.sequencer = ((Builder) builder).sequencer != null ? (ThreadPoolExecutor) ObjectUtils.requiredCondition(((Builder) builder).sequencer, threadPoolExecutor -> {
                return threadPoolExecutor.getMaximumPoolSize() == 1;
            }, IllegalArgumentException::new) : null;
        }

        public Flow.Subscriber<? super PayloadT> getSubscriber() {
            return this.subscriber;
        }

        public int getBufferSize() {
            return this.bufferSize;
        }

        public Duration getBackpressureTimeout() {
            return this.backpressureTimeout;
        }

        public Executor getAsyncBackpressurer() {
            return this.asyncBackpressurer;
        }

        public String getSubscriberName() {
            return this.subscriberName;
        }

        @Nullable
        public ThreadPoolExecutor getSequencer() {
            return this.sequencer;
        }

        public static <PayloadT> Builder<PayloadT> newBuilder() {
            return new Builder<>();
        }
    }

    public ReactedSubmissionPublisher(ReActorSystem reActorSystem, String str) {
        this.localReActorSystem = (ReActorSystem) Objects.requireNonNull(reActorSystem);
        this.subscribers = ConcurrentHashMap.newKeySet(10);
        this.feedGate = (ReActorRef) reActorSystem.spawn(ReActions.newBuilder().reAct(ReActorInit.class, (v0, v1) -> {
            ReActions.noReAction(v0, v1);
        }).reAct(PublisherShutdown.class, (reActorContext, publisherShutdown) -> {
            reActorContext.stop();
        }).reAct(PublisherInterrupt.class, this::onInterrupt).reAct(ReActorStop.class, this::onStop).reAct(SubscriptionRequest.class, this::onSubscriptionRequest).reAct(UnsubscriptionRequest.class, this::onUnSubscriptionRequest).build(), ((ReActorConfig.Builder) ((ReActorConfig.Builder) ReActorConfig.newBuilder().setReActorName(ReactedSubmissionPublisher.class.getSimpleName() + "-" + ((String) Objects.requireNonNull(str)))).setMailBoxProvider(reActorContext2 -> {
            return new BasicMbox();
        })).build()).orElseThrow(IllegalArgumentException::new);
    }

    public ReactedSubmissionPublisher() {
        this.subscribers = Set.of();
        this.feedGate = ReActorRef.NO_REACTOR_REF;
        this.localReActorSystem = null;
    }

    @Override // java.io.Externalizable
    public void writeExternal(ObjectOutput objectOutput) throws IOException {
        objectOutput.writeObject(this.feedGate);
    }

    @Override // java.io.Externalizable
    public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
        ReActorRef reActorRef = new ReActorRef();
        reActorRef.readExternal(objectInput);
        setFeedGate(reActorRef).setLocalReActorSystem((ReActorSystem) RemotingDriver.getDriverCtx().map((v0) -> {
            return v0.getLocalReActorSystem();
        }).orElseThrow());
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.feedGate.tell(this.feedGate, new PublisherShutdown());
    }

    public void interrupt() {
        this.feedGate.tell(this.feedGate, new PublisherInterrupt());
    }

    @Override // java.util.concurrent.Flow.Publisher
    public void subscribe(Flow.Subscriber<? super PayloadT> subscriber) {
        subscribe(subscriber, UUID.randomUUID().toString());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, String str) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(Flow.defaultBufferSize()).setBackpressureTimeout(BEST_EFFORT_SUBSCRIPTION).setSubscriberName(str).setAsyncBackpressurer(ForkJoinPool.commonPool()).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, int i) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(i).setBackpressureTimeout(BEST_EFFORT_SUBSCRIPTION).setAsyncBackpressurer(ForkJoinPool.commonPool()).setSubscriberName(UUID.randomUUID().toString()).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, int i, String str) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(i).setBackpressureTimeout(BEST_EFFORT_SUBSCRIPTION).setAsyncBackpressurer(ForkJoinPool.commonPool()).setSubscriberName(str).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, Executor executor, Duration duration) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(Flow.defaultBufferSize()).setBackpressureTimeout((Duration) ObjectUtils.requiredCondition((Duration) Objects.requireNonNull(duration), Predicate.not((v0) -> {
            return v0.isZero();
        }), IllegalArgumentException::new)).setAsyncBackpressurer(executor).setSubscriberName(UUID.randomUUID().toString()).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, Duration duration) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(Flow.defaultBufferSize()).setBackpressureTimeout((Duration) ObjectUtils.requiredCondition((Duration) Objects.requireNonNull(duration), Predicate.not((v0) -> {
            return v0.isZero();
        }), IllegalArgumentException::new)).setAsyncBackpressurer(ForkJoinPool.commonPool()).setSubscriberName(UUID.randomUUID().toString()).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, Duration duration, String str) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(Flow.defaultBufferSize()).setBackpressureTimeout((Duration) ObjectUtils.requiredCondition((Duration) Objects.requireNonNull(duration), Predicate.not((v0) -> {
            return v0.isZero();
        }), IllegalArgumentException::new)).setAsyncBackpressurer(ForkJoinPool.commonPool()).setSubscriberName(str).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, Duration duration, Executor executor, String str) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(Flow.defaultBufferSize()).setBackpressureTimeout((Duration) ObjectUtils.requiredCondition((Duration) Objects.requireNonNull(duration), Predicate.not((v0) -> {
            return v0.isZero();
        }), IllegalArgumentException::new)).setAsyncBackpressurer(executor).setSubscriberName(str).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, int i, Duration duration) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(i).setBackpressureTimeout((Duration) ObjectUtils.requiredCondition((Duration) Objects.requireNonNull(duration), Predicate.not((v0) -> {
            return v0.isZero();
        }), IllegalArgumentException::new)).setAsyncBackpressurer(ForkJoinPool.commonPool()).setSubscriberName(UUID.randomUUID().toString()).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(Flow.Subscriber<? super PayloadT> subscriber, int i, Executor executor, Duration duration) {
        return subscribe(ReActedSubscription.newBuilder().setSubscriber(subscriber).setBufferSize(i).setBackpressureTimeout((Duration) ObjectUtils.requiredCondition((Duration) Objects.requireNonNull(duration), Predicate.not((v0) -> {
            return v0.isZero();
        }), IllegalArgumentException::new)).setAsyncBackpressurer(executor).setSubscriberName(UUID.randomUUID().toString()).setSequencer(ReActedSubscription.NO_CUSTOM_SEQUENCER).build());
    }

    public CompletionStage<Void> subscribe(ReActedSubscription<PayloadT> reActedSubscription) {
        CompletableFuture completableFuture = new CompletableFuture();
        BackpressureManager backpressureManager = new BackpressureManager(reActedSubscription, this.feedGate, completableFuture);
        this.localReActorSystem.spawnChild(backpressureManager.getReActions(), this.localReActorSystem.getUserReActorsRoot(), ((ReActorConfig.Builder) ((ReActorConfig.Builder) ((ReActorConfig.Builder) ((ReActorConfig.Builder) ReActorConfig.newBuilder().setReActorName(this.feedGate.getReActorId().getReActorName() + "_subscriber_" + reActedSubscription.getSubscriberName() + "_" + this.feedGate.getReActorId().getReActorUUID().toString())).setDispatcherName("ReactorSystemDispatcher")).setTypedSubscriptions(TypedSubscription.NO_SUBSCRIPTIONS)).setMailBoxProvider(backpressureManager.getManagerMailbox())).build()).orElseSneakyThrow();
        return completableFuture;
    }

    public CompletionStage<Void> backpressurableSubmit(PayloadT payloadt) {
        return (CompletionStage) ((List) this.subscribers.stream().map(reActorRef -> {
            return reActorRef.atell(reActorRef, payloadt);
        }).collect(Collectors.toUnmodifiableList())).stream().reduce((completionStage, completionStage2) -> {
            return completionStage.thenCompose(r3 -> {
                return completionStage2;
            });
        }).map(completionStage3 -> {
            return completionStage3.thenAccept(r1 -> {
            });
        }).orElse(CompletableFuture.completedFuture(null));
    }

    public void submit(PayloadT payloadt) {
        this.subscribers.forEach(reActorRef -> {
            reActorRef.tell(reActorRef, payloadt);
        });
    }

    private void onInterrupt(ReActorContext reActorContext, PublisherInterrupt publisherInterrupt) {
        this.subscribers.forEach(reActorRef -> {
            reActorRef.tell(reActorContext.getSelf(), publisherInterrupt);
        });
        this.subscribers.clear();
        reActorContext.stop();
    }

    private void onStop(ReActorContext reActorContext, ReActorStop reActorStop) {
        this.subscribers.forEach(reActorRef -> {
            reActorRef.tell(reActorContext.getSelf(), new PublisherComplete());
        });
        this.subscribers.clear();
    }

    private void onSubscriptionRequest(ReActorContext reActorContext, SubscriptionRequest subscriptionRequest) {
        ReActorRef subscriptionBackpressuringManager = subscriptionRequest.getSubscriptionBackpressuringManager();
        ReActedUtils.ifNotDelivered(subscriptionBackpressuringManager.atell(reActorContext.getSelf(), new SubscriptionReply(this.subscribers.add(subscriptionBackpressuringManager))), th -> {
            reActorContext.logError("Unable to deliver subscription confirmation to {}", new Serializable[]{subscriptionRequest.getSubscriptionBackpressuringManager(), th});
        });
    }

    private void onUnSubscriptionRequest(ReActorContext reActorContext, UnsubscriptionRequest unsubscriptionRequest) {
        this.subscribers.remove(unsubscriptionRequest.getSubscriptionBackpressuringManager());
    }

    private ReactedSubmissionPublisher<PayloadT> setFeedGate(ReActorRef reActorRef) {
        return (ReactedSubmissionPublisher) SerializationUtils.setObjectField(this, FEED_GATE_OFFSET, reActorRef);
    }

    private ReactedSubmissionPublisher<PayloadT> setLocalReActorSystem(ReActorSystem reActorSystem) {
        return (ReactedSubmissionPublisher) SerializationUtils.setObjectField(this, LOCAL_REACTOR_SYSTEM, reActorSystem);
    }
}
