package io.gravitee.gateway.services.sync.process.distributed.synchronizer;

import io.gravitee.gateway.services.sync.process.common.deployer.Deployer;
import io.gravitee.gateway.services.sync.process.common.model.Deployable;
import io.gravitee.gateway.services.sync.process.common.model.SyncAction;
import io.gravitee.gateway.services.sync.process.distributed.DistributedSynchronizer;
import io.gravitee.gateway.services.sync.process.distributed.fetcher.DistributedEventFetcher;
import io.gravitee.repository.distributedsync.model.DistributedEvent;
import io.gravitee.repository.distributedsync.model.DistributedEventType;
import io.gravitee.repository.distributedsync.model.DistributedSyncAction;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.time.Instant;
import java.util.Locale;
import java.util.Set;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/gateway/services/sync/process/distributed/synchronizer/AbstractDistributedSynchronizer.class */
/**
 * Base class for distributed synchronizers handling one {@link DistributedEventType}.
 *
 * <p>A synchronization fetches the latest distributed events of the type returned by
 * {@link #distributedEventType()}, converts each of them with {@link #mapTo(DistributedEvent)}
 * and then deploys or undeploys the resulting element through the deployer returned by
 * {@link #createDeployer()}.
 *
 * @param <T> type of element produced from a distributed event
 * @param <Y> type of deployer able to deploy and undeploy {@code T}
 */
public abstract class AbstractDistributedSynchronizer<T extends Deployable, Y extends Deployer<T>> implements DistributedSynchronizer {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractDistributedSynchronizer.class);

    /** Actions requested from the fetcher when the synchronization is an initial one. */
    protected static final Set<DistributedSyncAction> INIT_SYNC_ACTIONS = Set.of(DistributedSyncAction.DEPLOY);

    /** Actions requested from the fetcher when the synchronization is not an initial one. */
    protected static final Set<DistributedSyncAction> INCREMENTAL_SYNC_ACTIONS = Set.of(DistributedSyncAction.DEPLOY, DistributedSyncAction.UNDEPLOY);

    /** SLF4J template: element count, lower-cased event type, elapsed milliseconds. */
    private static final String SYNCHRONIZED_MESSAGE = "{} {}(s) synchronized in {}ms";

    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final DistributedEventFetcher distributedEventFetcher;
    private final ThreadPoolExecutor syncFetcherExecutor;
    private final ThreadPoolExecutor syncDeployerExecutor;

    @Generated
    public AbstractDistributedSynchronizer(DistributedEventFetcher distributedEventFetcher, ThreadPoolExecutor threadPoolExecutor, ThreadPoolExecutor threadPoolExecutor2) {
        this.distributedEventFetcher = distributedEventFetcher;
        this.syncFetcherExecutor = threadPoolExecutor;
        this.syncDeployerExecutor = threadPoolExecutor2;
    }

    /**
     * Fetches the latest distributed events and applies them through the deployer.
     *
     * <p>The fetch is subscribed on a scheduler backed by the fetcher executor; deployments run on
     * a scheduler backed by the deployer executor. Once the stream completes, the number of
     * processed elements and the elapsed time are logged: at INFO level for an initial
     * synchronization, at DEBUG level otherwise.
     *
     * @param l forwarded unchanged to the fetcher; {@code null} or {@code -1} marks the
     *          synchronization as an initial one (presumably the lower bound of the window to
     *          fetch; confirm against {@link DistributedEventFetcher})
     * @param l2 forwarded unchanged to the fetcher (presumably the upper bound of the window to
     *           fetch; confirm against {@link DistributedEventFetcher})
     * @return a {@link Completable} that completes when every fetched element has been processed
     */
    @Override // io.gravitee.gateway.services.sync.process.distributed.DistributedSynchronizer
    public Completable synchronize(Long l, Long l2) {
        final boolean initialSync = l == null || l.longValue() == -1;
        // Holder written on subscription and read on success, possibly from different threads.
        final AtomicLong startNanos = new AtomicLong();
        return this.distributedEventFetcher
            .fetchLatest(l, l2, distributedEventType(), syncActions(initialSync))
            .subscribeOn(Schedulers.from(this.syncFetcherExecutor))
            .flatMapMaybe(this::mapTo)
            .compose(this::deployInParallel)
            .count()
            // Monotonic clock: the elapsed time stays correct even if the wall clock is adjusted.
            .doOnSubscribe(disposable -> startNanos.set(System.nanoTime()))
            .doOnSuccess(count -> logSynchronized(initialSync, count, System.nanoTime() - startNanos.get()))
            .ignoreElement();
    }

    /**
     * Selects the set of actions to request from the fetcher.
     *
     * @param z {@code true} for an initial synchronization
     * @return {@link #INIT_SYNC_ACTIONS} when {@code z} is {@code true},
     *         {@link #INCREMENTAL_SYNC_ACTIONS} otherwise
     */
    protected Set<DistributedSyncAction> syncActions(boolean z) {
        return z ? INIT_SYNC_ACTIONS : INCREMENTAL_SYNC_ACTIONS;
    }

    /** Returns the type of distributed event this synchronizer fetches and reports in its logs. */
    protected abstract DistributedEventType distributedEventType();

    /**
     * Converts a fetched distributed event into an element to process.
     *
     * @param distributedEvent the fetched event
     * @return the element to process; an empty {@link Maybe} contributes nothing to the stream
     */
    protected abstract Maybe<T> mapTo(DistributedEvent distributedEvent);

    /** Creates the deployer; called once each time the synchronization pipeline is assembled. */
    protected abstract Y createDeployer();

    /**
     * Processes the elements on parallel rails, one rail per thread the deployer executor may
     * hold at most, then merges the rails back into a single flow.
     */
    private Flowable<T> deployInParallel(Flowable<T> deployables) {
        final Y deployer = createDeployer();
        return deployables
            .parallel(this.syncDeployerExecutor.getMaximumPoolSize())
            .runOn(Schedulers.from(this.syncDeployerExecutor))
            .flatMap(deployable -> applySyncAction(deployer, deployable))
            .sequential(this.distributedEventFetcher.bulkItems());
    }

    /** Deploys or undeploys the element according to its sync action; any other action is skipped. */
    private Flowable<T> applySyncAction(Y deployer, T deployable) {
        final SyncAction action = deployable.syncAction();
        if (action == SyncAction.DEPLOY) {
            return deploy(deployer, deployable);
        }
        if (action == SyncAction.UNDEPLOY) {
            return undeploy(deployer, deployable);
        }
        return Flowable.empty();
    }

    /** Deploys the element, runs the after-deployment step, then emits the element. */
    private Flowable<T> deploy(Y deployer, T deployable) {
        // Note: doAfterDeployment is invoked here, when the chain is built, not after deploy completes;
        // only the Completable it returns is subscribed after deploy.
        return deployer.deploy(deployable).andThen(deployer.doAfterDeployment(deployable)).andThen(Flowable.just(deployable));
    }

    /** Undeploys the element, runs the after-undeployment step, then emits the element. */
    private Flowable<T> undeploy(Y deployer, T deployable) {
        // Note: doAfterUndeployment is invoked here, when the chain is built; only the Completable
        // it returns is subscribed after undeploy.
        return deployer.undeploy(deployable).andThen(deployer.doAfterUndeployment(deployable)).andThen(Flowable.just(deployable));
    }

    /** Logs the synchronization summary at INFO for an initial synchronization, DEBUG otherwise. */
    private void logSynchronized(boolean initialSync, Long count, long elapsedNanos) {
        // Locale.ROOT keeps the lower-cased enum name stable whatever the JVM default locale is.
        final String eventType = distributedEventType().name().toLowerCase(Locale.ROOT);
        final long elapsedMillis = elapsedNanos / NANOS_PER_MILLI;
        // Parameterized logging: the message is only rendered when the level is enabled.
        if (initialSync) {
            log.info(SYNCHRONIZED_MESSAGE, count, eventType, elapsedMillis);
        } else {
            log.debug(SYNCHRONIZED_MESSAGE, count, eventType, elapsedMillis);
        }
    }
}
