package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.core.utils.SystemUtils;
import com.alibaba.nacos.naming.cluster.ServerListManager;
import com.alibaba.nacos.naming.cluster.ServerStatus;
import com.alibaba.nacos.naming.cluster.servers.Server;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.ApplyAction;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.consistency.RecordListener;
import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.core.Instances;
import com.alibaba.nacos.naming.misc.GlobalConfig;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.NamingProxy;
import com.alibaba.nacos.naming.misc.NetUtils;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.pojo.Record;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
import org.javatuples.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service("distroConsistencyService")
/* loaded from: input_file:com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.class */
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService {

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private DataStore dataStore;

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Autowired
    private Serializer serializer;

    @Autowired
    private ServerListManager serverListManager;

    @Autowired
    private SwitchDomain switchDomain;

    @Autowired
    private GlobalConfig globalConfig;
    private boolean initialized = false;
    private volatile Notifier notifier = new Notifier();
    private LoadDataTask loadDataTask = new LoadDataTask();
    private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap();
    private Map<String, String> syncChecksumTasks = new ConcurrentHashMap(16);

    /* loaded from: input_file:com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl$LoadDataTask.class */
    private class LoadDataTask implements Runnable {
        private LoadDataTask() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                DistroConsistencyServiceImpl.this.load();
                if (!DistroConsistencyServiceImpl.this.initialized) {
                    GlobalExecutor.submit(this, DistroConsistencyServiceImpl.this.globalConfig.getLoadDataRetryDelayMillis());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("load data failed.", e);
            }
        }
    }

    /* loaded from: input_file:com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl$Notifier.class */
    public class Notifier implements Runnable {
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10240);
        private BlockingQueue<Pair> tasks = new LinkedBlockingQueue(1048576);

        public Notifier() {
        }

        public void addTask(String str, ApplyAction applyAction) {
            if (this.services.containsKey(str) && applyAction == ApplyAction.CHANGE) {
                return;
            }
            if (applyAction == ApplyAction.CHANGE) {
                this.services.put(str, "");
            }
            this.tasks.add(Pair.with(str, applyAction));
        }

        public int getTaskSize() {
            return this.tasks.size();
        }

        @Override // java.lang.Runnable
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            while (true) {
                try {
                    Pair take = this.tasks.take();
                    if (take != null) {
                        String str = (String) take.getValue0();
                        ApplyAction applyAction = (ApplyAction) take.getValue1();
                        this.services.remove(str);
                        int i = 0;
                        if (DistroConsistencyServiceImpl.this.listeners.containsKey(str)) {
                            Iterator<RecordListener> it = DistroConsistencyServiceImpl.this.listeners.get(str).iterator();
                            while (it.hasNext()) {
                                RecordListener next = it.next();
                                i++;
                                try {
                                } catch (Throwable th) {
                                    Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", str, th);
                                }
                                if (applyAction == ApplyAction.CHANGE) {
                                    next.onChange(str, DistroConsistencyServiceImpl.this.dataStore.get(str).value);
                                } else if (applyAction == ApplyAction.DELETE) {
                                    next.onDelete(str);
                                }
                            }
                            if (Loggers.DISTRO.isDebugEnabled()) {
                                Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", new Object[]{str, Integer.valueOf(i), applyAction.name()});
                            }
                        }
                    }
                } catch (Throwable th2) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", th2);
                }
            }
        }
    }

    @PostConstruct
    public void init() {
        GlobalExecutor.submit(this.loadDataTask);
        GlobalExecutor.submitDistroNotifyTask(this.notifier);
    }

    public void load() throws Exception {
        if (SystemUtils.STANDALONE_MODE) {
            this.initialized = true;
            return;
        }
        while (this.serverListManager.getHealthyServers().size() <= 1) {
            Thread.sleep(1000L);
            Loggers.DISTRO.info("waiting server list init...");
        }
        for (Server server : this.serverListManager.getHealthyServers()) {
            if (!NetUtils.localServer().equals(server.getKey())) {
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync from " + server);
                }
                if (syncAllDataFromRemote(server)) {
                    this.initialized = true;
                    return;
                }
            }
        }
    }

    @Override // com.alibaba.nacos.naming.consistency.ConsistencyService
    public void put(String str, Record record) throws NacosException {
        onPut(str, record);
        this.taskDispatcher.addTask(str);
    }

    @Override // com.alibaba.nacos.naming.consistency.ConsistencyService
    public void remove(String str) throws NacosException {
        onRemove(str);
        this.listeners.remove(str);
    }

    @Override // com.alibaba.nacos.naming.consistency.ConsistencyService
    public Datum get(String str) throws NacosException {
        return this.dataStore.get(str);
    }

    public void onPut(String str, Record record) {
        if (KeyBuilder.matchEphemeralInstanceListKey(str)) {
            Datum datum = new Datum();
            datum.value = (Instances) record;
            datum.key = str;
            datum.timestamp.incrementAndGet();
            this.dataStore.put(str, datum);
        }
        if (this.listeners.containsKey(str)) {
            this.notifier.addTask(str, ApplyAction.CHANGE);
        }
    }

    public void onRemove(String str) {
        this.dataStore.remove(str);
        if (this.listeners.containsKey(str)) {
            this.notifier.addTask(str, ApplyAction.DELETE);
        }
    }

    public void onReceiveChecksums(Map<String, String> map, String str) {
        if (this.syncChecksumTasks.containsKey(str)) {
            Loggers.DISTRO.warn("sync checksum task already in process with {}", str);
            return;
        }
        this.syncChecksumTasks.put(str, "1");
        try {
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            for (Map.Entry<String, String> entry : map.entrySet()) {
                if (this.distroMapper.responsible(KeyBuilder.getServiceName(entry.getKey()))) {
                    Loggers.DISTRO.error("receive responsible key timestamp of " + entry.getKey() + " from " + str);
                    this.syncChecksumTasks.remove(str);
                    return;
                } else if (!this.dataStore.contains(entry.getKey()) || this.dataStore.get(entry.getKey()).value == 0 || !this.dataStore.get(entry.getKey()).value.getChecksum().equals(entry.getValue())) {
                    arrayList.add(entry.getKey());
                }
            }
            for (String str2 : this.dataStore.keys()) {
                if (str.equals(this.distroMapper.mapSrv(KeyBuilder.getServiceName(str2)))) {
                    if (!map.containsKey(str2)) {
                        arrayList2.add(str2);
                    }
                }
            }
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.info("to remove keys: {}, to update keys: {}, source: {}", new Object[]{arrayList2, arrayList, str});
            }
            Iterator it = arrayList2.iterator();
            while (it.hasNext()) {
                onRemove((String) it.next());
            }
            if (arrayList.isEmpty()) {
                return;
            }
            try {
                processData(NamingProxy.getData(arrayList, str));
            } catch (Exception e) {
                Loggers.DISTRO.error("get data from " + str + " failed!", e);
            }
            this.syncChecksumTasks.remove(str);
        } finally {
            this.syncChecksumTasks.remove(str);
        }
    }

    public boolean syncAllDataFromRemote(Server server) {
        try {
            processData(NamingProxy.getAllData(server.getKey()));
            return true;
        } catch (Exception e) {
            Loggers.DISTRO.error("sync full data from " + server + " failed!", e);
            return false;
        }
    }

    public void processData(byte[] bArr) throws Exception {
        if (bArr.length > 0) {
            Map deserializeMap = this.serializer.deserializeMap(bArr, Instances.class);
            for (Map.Entry entry : deserializeMap.entrySet()) {
                this.dataStore.put((String) entry.getKey(), (Datum) entry.getValue());
                if (!this.listeners.containsKey(entry.getKey()) && this.switchDomain.isDefaultInstanceEphemeral()) {
                    Loggers.DISTRO.info("creating service {}", entry.getKey());
                    com.alibaba.nacos.naming.core.Service service = new com.alibaba.nacos.naming.core.Service();
                    String serviceName = KeyBuilder.getServiceName((String) entry.getKey());
                    String namespace = KeyBuilder.getNamespace((String) entry.getKey());
                    service.setName(serviceName);
                    service.setNamespaceId(namespace);
                    service.setGroupName("DEFAULT_GROUP");
                    service.setLastModifiedMillis(System.currentTimeMillis());
                    service.recalculateChecksum();
                    this.listeners.get("com.alibaba.nacos.naming.domains.meta.").get(0).onChange(KeyBuilder.buildServiceMetaKey(namespace, serviceName), service);
                }
            }
            for (Map.Entry entry2 : deserializeMap.entrySet()) {
                if (this.listeners.containsKey(entry2.getKey())) {
                    try {
                        Iterator<RecordListener> it = this.listeners.get(entry2.getKey()).iterator();
                        while (it.hasNext()) {
                            it.next().onChange((String) entry2.getKey(), ((Datum) entry2.getValue()).value);
                        }
                        this.dataStore.put((String) entry2.getKey(), (Datum) entry2.getValue());
                    } catch (Exception e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry2.getKey(), e);
                    }
                } else {
                    Loggers.DISTRO.warn("listener of {} not found.", entry2.getKey());
                }
            }
        }
    }

    @Override // com.alibaba.nacos.naming.consistency.ConsistencyService
    public void listen(String str, RecordListener recordListener) throws NacosException {
        if (!this.listeners.containsKey(str)) {
            this.listeners.put(str, new CopyOnWriteArrayList<>());
        }
        if (this.listeners.get(str).contains(recordListener)) {
            return;
        }
        this.listeners.get(str).add(recordListener);
    }

    @Override // com.alibaba.nacos.naming.consistency.ConsistencyService
    public void unlisten(String str, RecordListener recordListener) throws NacosException {
        if (this.listeners.containsKey(str)) {
            Iterator<RecordListener> it = this.listeners.get(str).iterator();
            while (it.hasNext()) {
                if (it.next().equals(recordListener)) {
                    this.listeners.get(str).remove(recordListener);
                    return;
                }
            }
        }
    }

    @Override // com.alibaba.nacos.naming.consistency.ConsistencyService
    public boolean isAvailable() {
        return isInitialized() || ServerStatus.UP.name().equals(this.switchDomain.getOverriddenServerStatus());
    }

    public boolean isInitialized() {
        return this.initialized || !this.globalConfig.isDataWarmup();
    }
}
