package io.reacted.streams;

import io.reacted.core.mailboxes.BackpressuringMbox;
import io.reacted.core.mailboxes.BoundedBasicMbox;
import io.reacted.core.mailboxes.MailBox;
import io.reacted.core.messages.reactors.ReActorInit;
import io.reacted.core.messages.reactors.ReActorStop;
import io.reacted.core.reactors.ReActions;
import io.reacted.core.reactorsystem.ReActorContext;
import io.reacted.core.reactorsystem.ReActorRef;
import io.reacted.core.utils.ReActedUtils;
import io.reacted.patterns.NonNullByDefault;
import io.reacted.patterns.Try;
import io.reacted.streams.ReactedSubmissionPublisher;
import io.reacted.streams.messages.PublisherComplete;
import io.reacted.streams.messages.PublisherInterrupt;
import io.reacted.streams.messages.SubscriberError;
import io.reacted.streams.messages.SubscriptionReply;
import io.reacted.streams.messages.SubscriptionRequest;
import io.reacted.streams.messages.UnsubscriptionRequest;
import java.io.Serializable;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.function.Function;
import javax.annotation.Nullable;

/**
 * {@link Flow.Subscription} implementation whose logic runs as reactor behavior.
 *
 * <p>On {@link ReActorInit} the manager sends a {@link SubscriptionRequest} to the feed gate;
 * once a {@link SubscriptionReply} arrives the wrapped {@link Flow.Subscriber} receives this
 * object through {@code onSubscribe}. Every message not matched by a dedicated handler is
 * forwarded to {@code subscriber.onNext}. Demand signalled through {@link #request(long)} is
 * passed to the mailbox obtained from the reactor context during initialization.
 *
 * <p>{@link #request(long)}, {@link #cancel()} and {@link #close()} read the reactor context
 * through a volatile field, so they tolerate being invoked before initialization has run.
 *
 * @param <PayloadT> type of the elements delivered to the subscriber
 */
@NonNullByDefault
public class BackpressureManager<PayloadT extends Serializable> implements Flow.Subscription, AutoCloseable {
    private final Flow.Subscriber<? super PayloadT> subscriber;
    private final ReActorRef feedGate;
    private final BackpressuringMbox.Builder bpMailboxBuilder;
    private final CompletionStage<Void> onSubscriptionCompleteTrigger;

    /** Set in {@code onInit}; written before {@link #backpressurerCtx} so the volatile write publishes it. */
    @Nullable
    private MailBox backpressuringMbox;

    /** Set in {@code onInit}; {@code null} until the reactor has been initialized. */
    @Nullable
    private volatile ReActorContext backpressurerCtx;

    /**
     * Creates a manager for the given subscription and prepares (without building) its mailbox.
     *
     * @param reActedSubscription subscription settings: subscriber, buffer size, backpressure
     *                            timeout, async backpressurer and sequencer
     * @param reActorRef          feed gate that receives the (un)subscription requests
     * @param completionStage     completed as soon as a {@link SubscriptionReply} is received
     * @throws NullPointerException if {@code reActorRef} is {@code null}
     */
    public BackpressureManager(ReactedSubmissionPublisher.ReActedSubscription<PayloadT> reActedSubscription, ReActorRef reActorRef, CompletionStage<Void> completionStage) {
        this.onSubscriptionCompleteTrigger = completionStage;
        this.subscriber = reActedSubscription.getSubscriber();
        this.feedGate = Objects.requireNonNull(reActorRef);
        this.bpMailboxBuilder = BackpressuringMbox.newBuilder()
                .setRealMbox(new BoundedBasicMbox(reActedSubscription.getBufferSize()))
                .setBackpressureTimeout(reActedSubscription.getBackpressureTimeout())
                .setBufferSize(reActedSubscription.getBufferSize())
                .setRequestOnStartup(0)
                .setAsyncBackpressurer(reActedSubscription.getAsyncBackpressurer())
                .setNonDelayable(Set.of(ReActorInit.class, ReActorStop.class, SubscriptionRequest.class,
                                        SubscriptionReply.class, UnsubscriptionRequest.class,
                                        SubscriberError.class, PublisherInterrupt.class))
                .setNonBackpressurable(Set.of(PublisherComplete.class))
                .setSequencer(reActedSubscription.getSequencer());
    }

    /**
     * Signals demand for {@code j} more elements.
     *
     * <p>A non-positive {@code j} terminates the subscription and reports an
     * {@link IllegalArgumentException} to the subscriber through {@code onError}. A positive
     * request issued before the reactor has been initialized is ignored.
     *
     * @param j number of additional elements requested
     * @throws NullPointerException if {@code j} is non-positive and the reactor has not been
     *                              initialized yet
     */
    @Override // java.util.concurrent.Flow.Subscription
    public void request(long j) {
        // Single volatile read: it also makes the mailbox written before it in onInit visible.
        ReActorContext ctx = this.backpressurerCtx;
        if (j <= 0) {
            errorTermination(Objects.requireNonNull(ctx),
                             new IllegalArgumentException("non-positive subscription request"),
                             this.subscriber);
            return;
        }
        MailBox mbox = this.backpressuringMbox;
        if (ctx == null || mbox == null) {
            return;
        }
        mbox.request(j);
    }

    /** Cancels the subscription; equivalent to {@link #close()}. */
    @Override // java.util.concurrent.Flow.Subscription
    public void cancel() {
        close();
    }

    /** Stops the backing reactor if it has been initialized; otherwise does nothing. */
    @Override // java.lang.AutoCloseable
    public void close() {
        ReActorContext ctx = this.backpressurerCtx;
        if (ctx != null) {
            ctx.stop();
        }
    }

    /**
     * Returns the mailbox factory for the reactor running this manager.
     *
     * @return function that binds the prepared mailbox builder to the given reactor context
     *         and builds the mailbox
     */
    public Function<ReActorContext, MailBox> getManagerMailbox() {
        return reActorContext -> this.bpMailboxBuilder.setRealMailboxOwner(reActorContext).build();
    }

    /**
     * Returns the reactor behavior: dedicated handlers for lifecycle and stream-control
     * messages, and {@code forwarder} as the handler for every other message.
     *
     * @return the reactions to install on the backing reactor
     */
    public ReActions getReActions() {
        return ReActions.newBuilder()
                .reAct(ReActorInit.class, this::onInit)
                .reAct(ReActorStop.class, this::onStop)
                .reAct(SubscriptionReply.class, this::onSubscriptionReply)
                .reAct(SubscriberError.class, this::onSubscriberError)
                .reAct(PublisherComplete.class, this::onPublisherComplete)
                .reAct(PublisherInterrupt.class, this::onPublisherInterrupt)
                .reAct(this::forwarder)
                .build();
    }

    /** On stop, asks the feed gate to unsubscribe this reactor. */
    private void onStop(ReActorContext ctx, ReActorStop stop) {
        this.feedGate.tell(ReActorRef.NO_REACTOR_REF, new UnsubscriptionRequest(ctx.getSelf()));
    }

    /**
     * Delivers a payload to the subscriber; an exception thrown by {@code onNext} terminates
     * the subscription with that exception.
     */
    @SuppressWarnings("unchecked") // Payload type is erased at runtime; the cast cannot be checked here.
    private void forwarder(ReActorContext ctx, Object payload) {
        try {
            this.subscriber.onNext((PayloadT) payload);
        } catch (Exception onNextError) {
            errorTermination(ctx, onNextError, this.subscriber);
        }
    }

    /**
     * Completes the subscription trigger, then either hands this subscription to the
     * subscriber (successful reply) or terminates with an error (failed reply or failing
     * {@code onSubscribe}).
     */
    private void onSubscriptionReply(ReActorContext ctx, SubscriptionReply reply) {
        this.onSubscriptionCompleteTrigger.toCompletableFuture().complete(null);
        if (!reply.isSuccess()) {
            errorTermination(ctx, new RuntimeException("RemoteRegistrationException"), this.subscriber);
            return;
        }
        Try.ofRunnable(() -> this.subscriber.onSubscribe(this))
           .ifError(onSubscribeError -> errorTermination(ctx, onSubscribeError, this.subscriber));
    }

    /** Terminates the subscription with the error carried by the message. */
    private void onSubscriberError(ReActorContext ctx, SubscriberError subscriberError) {
        errorTermination(ctx, subscriberError.getError(), this.subscriber);
    }

    /** Terminates the subscription normally when the publisher completes. */
    private void onPublisherComplete(ReActorContext ctx, PublisherComplete publisherComplete) {
        completeTermination(ctx, this.subscriber);
    }

    /** Terminates the subscription normally when the publisher is interrupted. */
    private void onPublisherInterrupt(ReActorContext ctx, PublisherInterrupt publisherInterrupt) {
        completeTermination(ctx, this.subscriber);
    }

    /**
     * Records the reactor context and mailbox, then sends the subscription request to the
     * feed gate. If the request cannot be delivered the subscription is failed.
     */
    private void onInit(ReActorContext ctx, ReActorInit init) {
        // The plain mailbox write must precede the volatile context write: a thread that then
        // observes a non-null context in request() is guaranteed to observe the mailbox too.
        this.backpressuringMbox = ctx.getMbox();
        this.backpressurerCtx = ctx;
        ReActedUtils.ifNotDelivered(this.feedGate.tell(ctx.getSelf(), new SubscriptionRequest(ctx.getSelf())),
                                    deliveryError -> failSubscription(ctx, deliveryError));
    }

    /**
     * Hands the subscription to the subscriber and immediately terminates it with
     * {@code deliveryError}. Termination runs even when {@code onSubscribe} throws, so the
     * reactor is always stopped; the exception from {@code onSubscribe} still propagates.
     */
    private void failSubscription(ReActorContext ctx, Throwable deliveryError) {
        try {
            this.subscriber.onSubscribe(this);
        } finally {
            errorTermination(ctx, deliveryError, this.subscriber);
        }
    }

    /** Stops the reactor and calls {@code onComplete}; a failure in {@code onComplete} is logged. */
    private void completeTermination(ReActorContext ctx, Flow.Subscriber<? super PayloadT> target) {
        close();
        Try.ofRunnable(target::onComplete)
           .ifError(onCompleteError -> ctx.logError("Error in {} onComplete: ",
                                                    new Serializable[]{target.getClass().getSimpleName(), onCompleteError}));
    }

    /** Stops the reactor and calls {@code onError(cause)}; a failure in {@code onError} is logged. */
    private void errorTermination(ReActorContext ctx, Throwable cause, Flow.Subscriber<? super PayloadT> target) {
        close();
        Try.ofRunnable(() -> target.onError(cause))
           .ifError(onErrorError -> ctx.logError("Error in {} onError: ",
                                                 new Serializable[]{target.getClass().getSimpleName(), onErrorError}));
    }
}
