package cn.ponfee.disjob.registry.etcd;

import cn.ponfee.disjob.common.concurrent.LoopThread;
import cn.ponfee.disjob.common.exception.Throwables;
import cn.ponfee.disjob.core.base.Server;
import cn.ponfee.disjob.registry.ConnectionStateListener;
import cn.ponfee.disjob.registry.RegistryException;
import cn.ponfee.disjob.registry.ServerRegistry;
import cn.ponfee.disjob.registry.ServerRole;
import cn.ponfee.disjob.registry.etcd.configuration.EtcdRegistryProperties;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.support.CloseableClient;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.PreDestroy;
import org.apache.commons.collections4.CollectionUtils;

/* loaded from: input_file:cn/ponfee/disjob/registry/etcd/EtcdServerRegistry.class */
/**
 * Server registry backed by etcd.
 *
 * <p>Every registered server is stored as an ephemeral key bound to a single lease
 * ({@link #leaseId}). The lease is kept alive through {@link #keepAlive}; when the keep-alive
 * stream fails or the client reconnects, a new lease is created and all servers held in
 * {@code registered} are registered again under it.
 *
 * <p>All lease/keep-alive replacement performed after construction happens while holding
 * {@link #keepAliveLock}.
 *
 * @param <R> the type of server this registry registers
 * @param <D> the type of server this registry discovers
 */
public abstract class EtcdServerRegistry<R extends Server, D extends Server> extends ServerRegistry<R, D> {
    /** Value written for every key; only the key name carries information. */
    private static final String PLACEHOLDER_VALUE = "1";

    /** Guards replacement of {@link #leaseId} and {@link #keepAlive}. */
    private final Object keepAliveLock = new Object();

    /** Lease time-to-live: the configured session timeout in milliseconds divided by 2000. */
    private final long ttl;

    private final EtcdClient client;

    /** Periodically re-creates the lease when {@link #keepAlive} is found to be {@code null}. */
    private final LoopThread keepAliveCheckThread;

    private volatile long leaseId;
    private volatile CloseableClient keepAlive;

    /**
     * Connects to etcd, creates the registry root key and the initial lease, starts watching the
     * discovery root path and loads the currently discoverable servers.
     *
     * <p>If any step fails, everything started so far (check thread, keep-alive, client) is
     * released before the exception is rethrown.
     *
     * @param etcdRegistryProperties the etcd registry configuration
     * @throws RegistryException if any initialisation step fails
     */
    public EtcdServerRegistry(EtcdRegistryProperties etcdRegistryProperties) {
        super(etcdRegistryProperties.getNamespace(), '/');
        this.ttl = etcdRegistryProperties.getSessionTimeoutMs() / 2000;

        // Handed to the watcher and released in the finally block below, i.e. only once the
        // initial discovery refresh has finished (or initialisation has failed).
        CountDownLatch initLatch = new CountDownLatch(1);
        EtcdClient etcdClient = null;
        LoopThread checkThread = null;
        try {
            etcdClient = new EtcdClient(etcdRegistryProperties);
            this.client = etcdClient;
            etcdClient.createPersistentKey(this.registryRootPath, PLACEHOLDER_VALUE);
            createLeaseIdAndKeepAlive();
            etcdClient.watchChildChanged(this.discoveryRootPath, initLatch, this::doRefreshDiscoveryServers);

            // Check at a quarter of the TTL, but never more often than once per second.
            long periodMs = Math.max(this.ttl / 4, 1L) * 1000;
            checkThread = LoopThread.createStarted("etcd_keep_alive_check", periodMs, periodMs, this::keepAliveCheck);
            this.keepAliveCheckThread = checkThread;

            etcdClient.addConnectionStateListener(
                ConnectionStateListener.<EtcdClient>builder().onConnected(c -> keepAliveRecover()).build()
            );
            doRefreshDiscoveryServers(etcdClient.getKeyChildren(this.discoveryRootPath));
        } catch (Exception e) {
            releaseQuietly(checkThread, this.keepAlive, etcdClient);
            throw new RegistryException("Etcd registry init error: " + etcdRegistryProperties, e);
        } finally {
            initLatch.countDown();
        }
    }

    /**
     * Returns whether the underlying etcd client reports itself as connected.
     *
     * @return the value of {@code EtcdClient#isConnected()}
     */
    public final boolean isConnected() {
        return this.client.isConnected();
    }

    /**
     * Registers a server as an ephemeral key bound to the current lease and remembers it so it
     * can be registered again after the lease is replaced. Does nothing once the registry has
     * been stopped.
     *
     * @param r the server to register
     * @throws RegistryException if the key cannot be created
     */
    public final void register(R r) {
        if (this.state.isStopped()) {
            return;
        }
        try {
            this.client.createEphemeralKey(buildRegistryServerId(r), PLACEHOLDER_VALUE, this.leaseId);
            this.registered.add(r);
            this.log.info("Etcd server registered: {}, {}", this.registryRole, r);
        } catch (Throwable th) {
            throw new RegistryException("Etcd server register failed: " + r, th);
        }
    }

    /**
     * Forgets a server and deletes its key from etcd. Failures are logged, never thrown.
     *
     * @param r the server to deregister
     */
    public final void deregister(R r) {
        try {
            this.registered.remove(r);
            this.client.deleteKey(buildRegistryServerId(r));
            this.log.info("Etcd server deregister: {}, {}", this.registryRole, r);
        } catch (Throwable th) {
            this.log.error("Etcd server deregister error.", th);
        }
    }

    /**
     * Reads the children of the registry root path from etcd and deserializes them.
     *
     * @return the servers currently stored under the registry root path
     * @throws Exception if the keys cannot be read
     */
    public List<R> getRegisteredServers() throws Exception {
        return deserializeRegistryServers(this.client.getKeyChildren(this.registryRootPath));
    }

    /**
     * Stops the registry: terminates the check thread, deregisters every registered server,
     * closes the keep-alive, revokes the lease, closes the client and finally delegates to the
     * superclass. Only the call that performs the stop transition does any work.
     */
    @PreDestroy
    @Override
    public void close() {
        if (!this.state.stop()) {
            return;
        }
        this.keepAliveCheckThread.terminate();
        // NOTE(review): deregister() removes from `registered` while it is being iterated;
        // this assumes the collection tolerates concurrent modification - confirm in ServerRegistry.
        this.registered.forEach(this::deregister);
        this.registered.clear();

        CloseableClient currentKeepAlive = this.keepAlive;
        if (currentKeepAlive != null) {
            Throwables.ThrowingRunnable.doCaught(currentKeepAlive::close);
        }
        Throwables.ThrowingRunnable.doCaught(() -> this.client.revokeLease(this.leaseId));
        Throwables.ThrowingRunnable.doCaught(this.client::close);
        Throwables.ThrowingRunnable.doCaught(() -> super.close());
    }

    /**
     * Best-effort release of whatever a failed constructor managed to start.
     * Any argument may be {@code null}.
     */
    private static void releaseQuietly(LoopThread checkThread, CloseableClient keepAlive, EtcdClient etcdClient) {
        // doCaught is used as in close(); presumably it catches whatever the action throws,
        // so one failing step does not prevent the remaining ones - TODO confirm.
        if (checkThread != null) {
            Throwables.ThrowingRunnable.doCaught(checkThread::terminate);
        }
        if (keepAlive != null) {
            Throwables.ThrowingRunnable.doCaught(keepAlive::close);
        }
        if (etcdClient != null) {
            Throwables.ThrowingRunnable.doCaught(etcdClient::close);
        }
    }

    /** Builds the etcd key of a server: registry root path + separator + serialized server. */
    private String buildRegistryServerId(R r) {
        return this.registryRootPath + this.separator + r.serialize();
    }

    /**
     * Converts the given child keys into servers and publishes them as the discovered servers.
     * A {@code null} or empty list publishes an empty result; {@code null} entries are skipped.
     */
    private synchronized void doRefreshDiscoveryServers(List<String> list) {
        List<D> servers;
        if (CollectionUtils.isEmpty(list)) {
            this.log.warn("Not discovered available {} from etcd.", this.discoveryRole);
            servers = Collections.emptyList();
        } else {
            ServerRole role = this.discoveryRole;
            // The discovery role is expected to produce servers of type D.
            @SuppressWarnings("unchecked")
            List<D> deserialized = list.stream()
                .filter(Objects::nonNull)
                .map(text -> (D) role.deserialize(text))
                .collect(Collectors.toList());
            servers = deserialized;
        }
        refreshDiscoveredServers(servers);
    }

    /** Periodic task: re-creates the lease and keep-alive if no keep-alive is currently held. */
    private void keepAliveCheck() {
        synchronized (this.keepAliveLock) {
            // register() is a no-op once stopped, so a fresh lease would serve no purpose.
            if (this.state.isStopped() || this.keepAlive != null) {
                return;
            }
            this.log.warn("Keep alive is null, will be create.");
            try {
                createLeaseIdAndKeepAlive();
            } catch (Throwable th) {
                this.log.error("keep alive check occur error.", th);
            }
        }
    }

    /**
     * Discards the current keep-alive and lease (if any) and creates new ones. Never throws:
     * this runs on callback threads, so a failure is logged and left for
     * {@link #keepAliveCheck()} to retry.
     */
    private void keepAliveRecover() {
        synchronized (this.keepAliveLock) {
            if (this.state.isStopped()) {
                return;
            }
            CloseableClient currentKeepAlive = this.keepAlive;
            if (currentKeepAlive != null) {
                try {
                    Throwables.ThrowingRunnable.doCaught(currentKeepAlive::close);
                    this.keepAlive = null;
                    Throwables.ThrowingRunnable.doCaught(() -> this.client.revokeLease(this.leaseId));
                } catch (Throwable th) {
                    this.log.error("Keep alive retry occur error.", th);
                }
            }
            try {
                createLeaseIdAndKeepAlive();
            } catch (Throwable th) {
                this.log.error("Keep alive retry occur error.", th);
            }
        }
    }

    /**
     * Creates a new lease, starts keeping it alive and registers every remembered server again
     * under the new lease.
     *
     * @throws Exception if the lease or the keep-alive cannot be created
     */
    private void createLeaseIdAndKeepAlive() throws Exception {
        this.leaseId = this.client.createLease(this.ttl);
        this.keepAlive = this.client.keepAliveLease(this.leaseId, error -> {
            if (error instanceof EtcdException) {
                ErrorCode errorCode = ((EtcdException) error).getErrorCode();
                this.log.error("Keep alive on error: " + errorCode, error);
                if (errorCode != ErrorCode.NOT_FOUND) {
                    keepAliveRecover();
                }
            } else {
                this.log.error("Keep alive on fail.", error);
            }
        }, () -> {
            this.log.error("Keep alive on completed.");
            keepAliveRecover();
        });
        this.registered.forEach(this::register);
    }
}
