package io.gravitee.gateway.services.sync.process.kubernetes;

import io.gravitee.common.service.AbstractService;
import io.gravitee.common.utils.RxHelper;
import io.gravitee.gateway.services.sync.SyncManager;
import io.gravitee.gateway.services.sync.process.distributed.service.DistributedSyncService;
import io.gravitee.node.api.Node;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/gravitee/gateway/services/sync/process/kubernetes/KubernetesSyncManager.class */
/**
 * {@link SyncManager} implementation that drives a list of {@link KubernetesSynchronizer}s.
 *
 * <p>On start it first asks every synchronizer to synchronize, flags the manager as synced once
 * that phase completes, and then asks every synchronizer to watch. Errors are logged and the
 * whole pipeline is resubscribed through {@code RxHelper.retryExponentialBackoff}.
 *
 * <p>Nothing is subscribed when distributed sync is enabled and the current node is not the
 * primary node.
 */
public class KubernetesSyncManager extends AbstractService<SyncManager> implements SyncManager {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(KubernetesSyncManager.class);

    /** Retry backoff settings handed to {@code RxHelper.retryExponentialBackoff} in {@link #doStart()}. */
    private static final int EXPONENTIAL_BACKOFF_RETRY_INITIAL_DELAY_MS = 1000;
    private static final int EXPONENTIAL_BACKOFF_RETRY_MAX_DELAY_MS = 10000;
    private static final double EXPONENTIAL_BACKOFF_RETRY_FACTOR = 1.5d;

    /** Key of the node metadata entry passed to the synchronizers. */
    private static final String ENVIRONMENTS_METADATA_KEY = "environments";

    private final List<KubernetesSynchronizer> synchronizers;
    private final DistributedSyncService distributedSyncService;
    private final Node node;

    /** Subscription of the sync-then-watch pipeline; stays {@code null} until {@link #doStart()} subscribes. */
    private Disposable watcherDisposable;

    /** Becomes {@code true} once the synchronization phase has completed; never reset by this class. */
    private final AtomicBoolean synced = new AtomicBoolean();

    /**
     * Creates a manager.
     *
     * @param node node whose metadata provides the {@code "environments"} entry
     * @param list synchronizers to run, in iteration order
     * @param distributedSyncService used to decide whether this node should synchronize
     */
    public KubernetesSyncManager(Node node, List<KubernetesSynchronizer> list, DistributedSyncService distributedSyncService) {
        this.synchronizers = list;
        this.distributedSyncService = distributedSyncService;
        this.node = node;
    }

    /**
     * Subscribes to the sync-then-watch pipeline when distributed sync is disabled or this node
     * is the primary node; otherwise only logs a warning.
     */
    protected void doStart() {
        log.debug("Starting kubernetes synchronization process");

        boolean secondaryNode = this.distributedSyncService.isEnabled() && !this.distributedSyncService.isPrimaryNode();
        if (secondaryNode) {
            log.warn("Kubernetes synchronization is disabled as distributed sync is enabled, and current node is secondary.");
            return;
        }

        this.watcherDisposable = sync()
            .doOnComplete(() -> {
                log.debug("Moving to ready state as all resources have been synchronized");
                this.synced.set(true);
            })
            .andThen(watch())
            .doOnError(th -> log.error("An error occurred during Kubernetes synchronization. Restarting ...", th))
            // Use the declared constants so the documented backoff settings cannot drift from the effective ones.
            .retryWhen(
                RxHelper.retryExponentialBackoff(
                    EXPONENTIAL_BACKOFF_RETRY_INITIAL_DELAY_MS,
                    EXPONENTIAL_BACKOFF_RETRY_MAX_DELAY_MS,
                    TimeUnit.MILLISECONDS,
                    EXPONENTIAL_BACKOFF_RETRY_FACTOR
                )
            )
            .subscribe();
    }

    /** Disposes the pipeline subscription if one exists and is still active. */
    protected void doStop() {
        if (this.watcherDisposable != null && !this.watcherDisposable.isDisposed()) {
            this.watcherDisposable.dispose();
        }
    }

    /**
     * @return {@code true} once the synchronization phase started by {@link #doStart()} has completed
     */
    @Override // io.gravitee.gateway.services.sync.SyncManager
    public boolean syncDone() {
        return this.synced.get();
    }

    /**
     * Builds a {@link Completable} that calls {@code synchronize} on each synchronizer, one after
     * the other in list order.
     */
    private Completable sync() {
        return Flowable
            .fromIterable(this.synchronizers)
            .doOnNext(synchronizer -> log.debug("{} will synchronize all resources ...", synchronizer))
            .concatMapCompletable(synchronizer -> synchronizer.synchronize(environments()));
    }

    /**
     * Builds a {@link Completable} that calls {@code watch} on each synchronizer in list order.
     *
     * <p>NOTE(review): {@code concatMapCompletable} subscribes to the next synchronizer's watch only
     * after the previous one completes - confirm that this is the intended behaviour for watches.
     */
    private Completable watch() {
        return Flowable
            .fromIterable(this.synchronizers)
            .doOnNext(synchronizer -> log.debug("{} will start watching on resources ...", synchronizer))
            .concatMapCompletable(synchronizer -> synchronizer.watch(environments()));
    }

    /**
     * Reads the {@code "environments"} entry from the node metadata. The value is looked up on
     * every call and cast to a raw {@link Set}, exactly as the synchronizers received it before.
     */
    @SuppressWarnings("rawtypes")
    private Set environments() {
        return (Set) this.node.metadata().get(ENVIRONMENTS_METADATA_KEY);
    }

    /**
     * Alternate parameter order of {@link #KubernetesSyncManager(Node, List, DistributedSyncService)};
     * delegates to it.
     */
    @Generated
    public KubernetesSyncManager(List<KubernetesSynchronizer> list, DistributedSyncService distributedSyncService, Node node) {
        this(node, list, distributedSyncService);
    }
}
