package io.gravitee.gateway.services.sync.process.repository;

import io.gravitee.common.service.AbstractService;
import io.gravitee.common.utils.RxHelper;
import io.gravitee.gateway.services.sync.SyncManager;
import io.gravitee.gateway.services.sync.process.distributed.DistributedSynchronizer;
import io.gravitee.gateway.services.sync.process.distributed.service.DistributedSyncService;
import io.gravitee.gateway.services.sync.process.repository.handler.SyncHandler;
import io.gravitee.node.api.Node;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.disposables.Disposable;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/gateway/services/sync/process/repository/DefaultSyncManager.class */
/**
 * {@link SyncManager} implementation that drives the registered synchronizers.
 *
 * <p>On start it registers a {@link SyncHandler} on {@code GET /sync}, runs a first synchronization
 * (re-subscribed through {@code RxHelper.retryExponentialBackoff} when it fails) and, once that first
 * synchronization has completed, schedules a new synchronization every {@code delay}/{@code unit}.
 *
 * <p>When the distributed sync service is disabled, or when this node is the primary node, the
 * {@link RepositorySynchronizer}s are executed. Otherwise the {@link DistributedSynchronizer}s are
 * executed (if any were provided).
 */
public class DefaultSyncManager extends AbstractService<SyncManager> implements SyncManager {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DefaultSyncManager.class);
    private static final String PATH = "/sync";

    /** Margin (ms) applied on both ends of the time window reported in the debug log. */
    public static final int TIMEFRAME_DELAY = 30000;
    /** Delay (ms) handed to {@code RxHelper.retry} between two attempts of a single synchronizer. */
    public static final int RETRY_DELAY_MS = 3000;
    /** Initial delay (ms) handed to {@code RxHelper.retryExponentialBackoff} for the first synchronization. */
    public static final int INITIAL_RETRY_DELAY_MS = 3000;
    /** Maximum delay (ms) handed to {@code RxHelper.retryExponentialBackoff} for the first synchronization. */
    public static final int MAX_RETRY_DELAY_MS = 10000;

    private final Router router;
    private final Node node;
    private final List<RepositorySynchronizer> synchronizers;
    private final List<DistributedSynchronizer> distributedSynchronizers;
    private final int retryAttempt;
    private final DistributedSyncService distributedSyncService;
    private final int delay;
    private final TimeUnit unit;
    private final AtomicLong syncCounter = new AtomicLong(0);
    private final AtomicBoolean initialSync = new AtomicBoolean(false);
    private final AtomicLong totalSyncOnErrorCounter = new AtomicLong(0);
    private final AtomicBoolean lastSyncOnError = new AtomicBoolean(false);
    private final AtomicReference<String> lastSyncErrorMessage = new AtomicReference<>();
    private final AtomicBoolean isClusterPrimaryNode = new AtomicBoolean(true);
    /** Lower bound of the next synchronization window; {@code -1} until a synchronization has completed. */
    private long nextFromTime = -1;
    /** Subscription of the initial synchronization (including its retries). */
    private Disposable initialSyncDisposable;
    /** Subscription of the periodic synchronization loop. */
    private Disposable refreshDisposable;
    private Route routeHandler;

    /**
     * @param router                   router on which the {@code /sync} handler is registered
     * @param node                     node whose metadata provides the {@code "environments"} entry
     * @param synchronizers            repository synchronizers; the list is sorted in place on start
     * @param distributedSynchronizers distributed synchronizers, may be {@code null}; sorted in place on start
     * @param distributedSyncService   distributed sync service
     * @param delay                    delay between two periodic synchronizations
     * @param unit                     unit of {@code delay}
     * @param retryAttempt             value handed to {@code RxHelper.retry} for each synchronizer
     */
    public DefaultSyncManager(
        Router router,
        Node node,
        List<RepositorySynchronizer> synchronizers,
        List<DistributedSynchronizer> distributedSynchronizers,
        DistributedSyncService distributedSyncService,
        int delay,
        TimeUnit unit,
        int retryAttempt
    ) {
        this.router = router;
        this.node = node;
        this.synchronizers = synchronizers;
        this.distributedSynchronizers = distributedSynchronizers;
        this.distributedSyncService = distributedSyncService;
        this.delay = delay;
        this.unit = unit;
        this.retryAttempt = retryAttempt;
    }

    /**
     * Registers the HTTP handler, orders the synchronizers and launches the initial synchronization.
     * The periodic loop is only scheduled after the initial synchronization has completed.
     */
    protected void doStart() {
        log.info("Starting sync manager");
        log.debug("Associate a new HTTP handler on {}", PATH);
        this.routeHandler = this.router.get(PATH).produces("application/json").handler(new SyncHandler(this));

        this.synchronizers.sort(Comparator.comparingInt(RepositorySynchronizer::order));
        if (this.distributedSyncService.isEnabled()) {
            this.distributedSyncService.validate();
            if (this.distributedSynchronizers != null) {
                this.distributedSynchronizers.sort(Comparator.comparingInt(DistributedSynchronizer::order));
            }
            this.isClusterPrimaryNode.set(this.distributedSyncService.isPrimaryNode());
        }

        // Keep the subscription so that doStop() can cancel an initial synchronization that is still retrying.
        this.initialSyncDisposable =
            synchronize()
                .retryWhen(RxHelper.retryExponentialBackoff(INITIAL_RETRY_DELAY_MS, MAX_RETRY_DELAY_MS, TimeUnit.MILLISECONDS, 1.5d))
                .andThen(Completable.fromRunnable(this::schedulePeriodicSync))
                .subscribe(() -> {}, th -> log.error("Unable to schedule the sync service", th));
    }

    /** Disposes the pending subscriptions, removes the HTTP route and delegates to the parent. */
    protected void doStop() throws Exception {
        if (this.initialSyncDisposable != null) {
            this.initialSyncDisposable.dispose();
        }
        if (this.refreshDisposable != null) {
            this.refreshDisposable.dispose();
        }
        if (this.routeHandler != null) {
            this.routeHandler.remove();
        }
        super.doStop();
    }

    /** Starts the periodic loop: one synchronization at a time, each tick delayed by {@code delay}/{@code unit}. */
    private void schedulePeriodicSync() {
        log.info("Sync service has been scheduled with delay [{}{}]", this.delay, this.unit.name());
        this.refreshDisposable =
            Flowable
                .<Long, Long>generate(
                    () -> 0L,
                    (tick, emitter) -> {
                        emitter.onNext(tick);
                        return tick + 1;
                    }
                )
                .delay(this.delay, this.unit)
                .rebatchRequests(1)
                // Swallow the error of a single run (already logged and recorded by synchronize()) so that
                // one failed synchronization does not terminate the whole periodic loop.
                .concatMapCompletable(tick -> synchronize().onErrorComplete())
                .onErrorComplete()
                .subscribe();
    }

    /**
     * Builds one synchronization run: waits for the distributed sync service to be ready, resolves the
     * upper bound of the window, runs the synchronizers and records the outcome.
     *
     * @return a {@link Completable} that fails when the run fails; the failure is logged, counted and
     *         exposed through {@link #lastSyncOnError()} / {@link #lastSyncErrorMessage()}
     */
    private Completable synchronize() {
        return Completable
            .defer(() -> {
                log.debug("Running synchronization process...");
                return this.distributedSyncService.ready();
            })
            .andThen(Single.defer(this::resolveSyncEndTime))
            .flatMapCompletable(this::synchronizeUntil)
            .doOnError(th -> {
                if (this.lastSyncOnError.get()) {
                    // The stack trace was already logged by the first failure of the series.
                    log.error("Synchronization process is still failing.");
                } else {
                    log.error("Synchronization process has failed", th);
                }
                this.lastSyncOnError.set(true);
                this.lastSyncErrorMessage.set(th.getMessage());
                this.totalSyncOnErrorCounter.incrementAndGet();
            });
    }

    /**
     * Resolves the upper bound of the synchronization window.
     *
     * <p>When distributed sync is enabled, this node is primary, and either no synchronization has
     * completed yet or the node was not flagged as primary so far, the stored distributed state is read:
     * its {@code from} replaces {@code nextFromTime} and its {@code to} is used as upper bound. In every
     * other case, or when no state is stored, the current time is used.
     */
    private Single<Long> resolveSyncEndTime() {
        boolean distributedPrimary = this.distributedSyncService.isEnabled() && this.distributedSyncService.isPrimaryNode();
        // The compareAndSet must stay last: it must only flip the flag once this node is known to be primary.
        if (distributedPrimary && (this.nextFromTime == -1 || this.isClusterPrimaryNode.compareAndSet(false, true))) {
            return this.distributedSyncService
                .state()
                .map(state -> {
                    log.debug("Retrieving distributed sync state");
                    this.nextFromTime = state.getFrom().longValue();
                    return state.getTo();
                })
                .switchIfEmpty(Single.just(System.currentTimeMillis()));
        }
        return Single.just(System.currentTimeMillis());
    }

    /**
     * Runs every applicable synchronizer sequentially for the window {@code [nextFromTime, toTime]}, then
     * stores the distributed state and, on completion, moves {@code nextFromTime} to {@code toTime}.
     *
     * @param toTime upper bound of the window, in epoch milliseconds
     */
    private Completable synchronizeUntil(Long toTime) {
        log.debug("Synchronization #{} started at {}", this.syncCounter.incrementAndGet(), Instant.now());
        log.debug(
            "Events from {} to {} would be synchronized.",
            Instant.ofEpochMilli(this.nextFromTime - TIMEFRAME_DELAY),
            Instant.ofEpochMilli(toTime + TIMEFRAME_DELAY)
        );

        final Completable synchronization;
        if (!this.distributedSyncService.isEnabled() || this.distributedSyncService.isPrimaryNode()) {
            synchronization =
                Flowable
                    .fromIterable(this.synchronizers)
                    .concatMapCompletable(synchronizer ->
                        synchronizer
                            // Raw cast kept as-is: the metadata value type is not visible from this class.
                            .synchronize(this.nextFromTime, toTime, (Set) this.node.metadata().get("environments"))
                            .compose(upstream -> retrySynchronizer(upstream, synchronizer.getClass().getSimpleName()))
                    );
        } else {
            log.debug("Distributed synchronizers will be used as distributed sync is enabled, and current node is secondary.");
            if (this.distributedSynchronizers != null) {
                synchronization =
                    Flowable
                        .fromIterable(this.distributedSynchronizers)
                        .concatMapCompletable(synchronizer ->
                            synchronizer
                                .synchronize(this.nextFromTime, toTime)
                                .compose(upstream -> retrySynchronizer(upstream, synchronizer.getClass().getSimpleName()))
                        );
            } else {
                synchronization = Completable.complete();
            }
        }

        return synchronization
            .andThen(this.distributedSyncService.storeState(this.nextFromTime, toTime))
            .doOnComplete(() -> {
                this.lastSyncOnError.set(false);
                this.lastSyncErrorMessage.set(null);
                if (this.nextFromTime == -1) {
                    // First successful synchronization.
                    this.initialSync.set(true);
                }
                this.nextFromTime = toTime;
                log.debug(
                    "Synchronization #{} ended at {} (took {}ms)",
                    this.syncCounter.get(),
                    Instant.now().toString(),
                    System.currentTimeMillis() - toTime
                );
            });
    }

    /**
     * Decorates a synchronizer execution with logging and {@code RxHelper.retry}.
     *
     * @param completable      the synchronizer execution
     * @param synchronizerName name used in the log messages
     */
    private Completable retrySynchronizer(Completable completable, String synchronizerName) {
        return completable
            .doOnError(th -> log.warn("An error occurs while executing synchronizer {}, retrying...", synchronizerName, th))
            .compose(RxHelper.retry(this.retryAttempt, RETRY_DELAY_MS, TimeUnit.MILLISECONDS))
            .doOnError(th -> log.error("Latest attempt of synchronizer {} has failed", synchronizerName, th));
    }

    /** @return the lower bound of the next synchronization window, or {@code -1} if none has completed yet */
    public long nextSyncTime() {
        return this.nextFromTime;
    }

    /** @return the number of synchronization runs started so far */
    public long syncCounter() {
        return this.syncCounter.longValue();
    }

    /** @return {@code true} once the first synchronization has completed successfully */
    @Override // io.gravitee.gateway.services.sync.SyncManager
    public boolean syncDone() {
        return this.initialSync.get();
    }

    /** @return the number of synchronization runs that ended in error */
    public long totalSyncOnError() {
        return this.totalSyncOnErrorCounter.get();
    }

    /** @return {@code true} if the most recent synchronization run ended in error */
    public boolean lastSyncOnError() {
        return this.lastSyncOnError.get();
    }

    /** @return the message of the most recent synchronization error, or {@code null} if the last run succeeded */
    public String lastSyncErrorMessage() {
        return this.lastSyncErrorMessage.get();
    }
}
