package io.gravitee.gateway.jupiter.reactor.v4.subscription;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.gravitee.common.service.AbstractService;
import io.gravitee.common.utils.RxHelper;
import io.gravitee.gateway.api.service.Subscription;
import io.gravitee.gateway.jupiter.api.ListenerType;
import io.gravitee.gateway.jupiter.core.context.MutableExecutionContext;
import io.gravitee.gateway.jupiter.reactor.ApiReactor;
import io.gravitee.gateway.reactor.handler.Acceptor;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableTransformer;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Predicate;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/gateway/jupiter/reactor/v4/subscription/DefaultSubscriptionDispatcher.class */
public class DefaultSubscriptionDispatcher extends AbstractService<SubscriptionDispatcher> implements SubscriptionDispatcher {
    public static final int ON_SUBSCRIPTION_ERROR_RETRY_COUNT = 5;
    public static final int ON_SUBSCRIPTION_ERROR_RETRY_DELAY_MS = 3000;
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSubscriptionDispatcher.class);
    private static final String SUBSCRIPTION_ENTRYPOINT_FIELD = "entrypointId";
    private final SubscriptionAcceptorResolver subscriptionAcceptorResolver;
    private final SubscriptionExecutionRequestFactory subscriptionExecutionRequestFactory;
    private final Map<String, Subscription> activeSubscriptions = new ConcurrentHashMap();
    private final Map<String, Disposable> activeDisposables = new ConcurrentHashMap();
    private final ObjectMapper mapper = new ObjectMapper();

    public DefaultSubscriptionDispatcher(SubscriptionAcceptorResolver subscriptionAcceptorResolver, SubscriptionExecutionRequestFactory subscriptionExecutionRequestFactory) {
        this.subscriptionAcceptorResolver = subscriptionAcceptorResolver;
        this.subscriptionExecutionRequestFactory = subscriptionExecutionRequestFactory;
    }

    private static boolean statusIsAccepted(Subscription subscription) {
        return "ACCEPTED".equalsIgnoreCase(subscription.getStatus());
    }

    private static boolean consumerStatusIsActive(Subscription subscription) {
        return Subscription.ConsumerStatus.STARTED.equals(subscription.getConsumerStatus());
    }

    private static Predicate<Throwable> manageErrors() {
        return th -> {
            if (!(th instanceof SubscriptionExpiredException)) {
                return false;
            }
            LOGGER.debug(th.getMessage());
            return true;
        };
    }

    @Override // io.gravitee.gateway.jupiter.reactor.v4.subscription.SubscriptionDispatcher
    public void dispatch(Subscription subscription) {
        if (statusIsAccepted(subscription) && consumerStatusIsActive(subscription) && !isExpired(subscription)) {
            this.activeSubscriptions.compute(subscription.getId(), (str, subscription2) -> {
                return subscription2 == null ? activateSubscription(subscription) : (!subscription.equals(subscription2) || subscription.isForceDispatch()) ? updateSubscription(subscription) : subscription2;
            });
        } else {
            disposeSubscription(subscription.getId());
        }
    }

    private Subscription activateSubscription(Subscription subscription) {
        Acceptor<SubscriptionAcceptor> resolve = this.subscriptionAcceptorResolver.resolve(subscription);
        if (resolve == null) {
            return null;
        }
        ApiReactor apiReactor = (ApiReactor) resolve.reactor();
        try {
            String asText = this.mapper.readTree(subscription.getConfiguration()).path(SUBSCRIPTION_ENTRYPOINT_FIELD).asText();
            if (asText == null || asText.trim().isEmpty()) {
                LOGGER.error("Unable to handle subscription without known entrypoint id");
                return null;
            }
            this.activeDisposables.put(subscription.getId(), buildSubscriptionObservable(subscription, apiReactor, asText).doOnComplete(() -> {
                this.activeDisposables.remove(subscription.getId());
            }).subscribe());
            return subscription;
        } catch (Exception e) {
            LOGGER.error("Unable to dispatch subscription id[{}] api[{}]", new Object[]{subscription.getId(), subscription.getApi(), e});
            return null;
        }
    }

    private Completable buildSubscriptionObservable(Subscription subscription, ApiReactor apiReactor, String str) {
        Single fromCallable = Single.fromCallable(() -> {
            MutableExecutionContext create = this.subscriptionExecutionRequestFactory.create(subscription);
            create.setInternalAttribute("subscriptionType", str);
            create.setInternalAttribute("subscription", subscription);
            create.setAttribute("gravitee.attribute.plan", subscription.getPlan());
            create.setAttribute("gravitee.attribute.application", subscription.getApplication());
            create.setAttribute("gravitee.attribute.user-id", subscription.getId());
            create.setInternalAttribute("securityChain.skip", true);
            create.setInternalAttribute("listener.type", ListenerType.SUBSCRIPTION);
            return create;
        });
        Objects.requireNonNull(apiReactor);
        return fromCallable.flatMapCompletable(apiReactor::handle).compose(delayToStartDate(subscription)).compose(timeoutAtEndingDate(subscription)).onErrorComplete(manageErrors()).compose(RxHelper.retry(5, ON_SUBSCRIPTION_ERROR_RETRY_DELAY_MS, TimeUnit.MILLISECONDS)).onErrorResumeNext(th -> {
            return Completable.complete();
        });
    }

    private CompletableTransformer delayToStartDate(Subscription subscription) {
        return completable -> {
            return subscription.getStartingAt() != null ? completable.delaySubscription(getMillisecondsTo(subscription.getStartingAt()), TimeUnit.MILLISECONDS) : completable;
        };
    }

    private CompletableTransformer timeoutAtEndingDate(Subscription subscription) {
        return completable -> {
            return subscription.getEndingAt() != null ? completable.timeout(getMillisecondsTo(subscription.getEndingAt()), TimeUnit.MILLISECONDS, Completable.error(new SubscriptionExpiredException(subscription))) : completable;
        };
    }

    private void disposeAllSubscriptions() {
        Iterator<String> it = this.activeDisposables.keySet().iterator();
        while (it.hasNext()) {
            disposeSubscription(it.next());
        }
    }

    private void disposeSubscription(String str) {
        Disposable remove = this.activeDisposables.remove(str);
        if (remove != null) {
            try {
                remove.dispose();
            } catch (Exception e) {
                LOGGER.warn("Unexpected exception while disposing subscription [{}]: {}", str, e.getMessage());
            }
        }
        this.activeSubscriptions.remove(str);
    }

    private Subscription updateSubscription(Subscription subscription) {
        if (this.activeDisposables.containsKey(subscription.getId())) {
            this.activeDisposables.get(subscription.getId()).dispose();
        }
        return activateSubscription(subscription);
    }

    private long getMillisecondsTo(Date date) {
        long time = date.getTime() - Calendar.getInstance().getTimeInMillis();
        if (time > 0) {
            return time;
        }
        return 0L;
    }

    private boolean isExpired(Subscription subscription) {
        return subscription.getEndingAt() != null && subscription.getEndingAt().before(Calendar.getInstance().getTime());
    }

    protected void doStop() throws Exception {
        disposeAllSubscriptions();
    }

    protected Map<String, Subscription> getActiveSubscriptions() {
        return this.activeSubscriptions;
    }

    protected Map<String, Disposable> getActiveDisposables() {
        return this.activeDisposables;
    }
}
