package me.ahoo.cosky.config.redis;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import me.ahoo.cosky.config.Config;
import me.ahoo.cosky.config.ConfigChangedEvent;
import me.ahoo.cosky.config.ConfigHistory;
import me.ahoo.cosky.config.ConfigKeyGenerator;
import me.ahoo.cosky.config.ConfigService;
import me.ahoo.cosky.config.ConfigVersion;
import me.ahoo.cosky.config.ListenableConfigService;
import me.ahoo.cosky.config.NamespacedConfigId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.ReactiveRedisMessageListenerContainer;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:me/ahoo/cosky/config/redis/ConsistencyRedisConfigService.class */
public class ConsistencyRedisConfigService implements ListenableConfigService {
    private static final Logger log = LoggerFactory.getLogger(ConsistencyRedisConfigService.class);
    private final ConfigService delegate;
    private final ReactiveRedisMessageListenerContainer listenerContainer;
    private final ConcurrentHashMap<NamespacedConfigId, Mono<Config>> configMapCache;

    @VisibleForTesting
    @Nullable
    private final Consumer<ConfigChangedEvent> hookOnResetCache;

    /* loaded from: input_file:me/ahoo/cosky/config/redis/ConsistencyRedisConfigService$ConfigChangedEventSubscriber.class */
    public static class ConfigChangedEventSubscriber extends BaseSubscriber<ConfigChangedEvent> {
        private final ConsistencyRedisConfigService configService;

        public ConfigChangedEventSubscriber(ConsistencyRedisConfigService consistencyRedisConfigService) {
            this.configService = consistencyRedisConfigService;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void hookOnNext(ConfigChangedEvent configChangedEvent) {
            if (ConsistencyRedisConfigService.log.isInfoEnabled()) {
                ConsistencyRedisConfigService.log.info("hookOnNext - NamespacedConfigId:[{}] - Event:[{}].", configChangedEvent.getNamespacedConfigId(), configChangedEvent.getEvent());
            }
            this.configService.onConfigChanged(configChangedEvent);
        }

        protected void hookOnError(Throwable th) {
            if (ConsistencyRedisConfigService.log.isErrorEnabled()) {
                ConsistencyRedisConfigService.log.error("hookOnError - " + th.getMessage(), th);
            }
        }
    }

    public ConsistencyRedisConfigService(ConfigService configService, ReactiveRedisMessageListenerContainer reactiveRedisMessageListenerContainer) {
        this(configService, reactiveRedisMessageListenerContainer, null);
    }

    public ConsistencyRedisConfigService(ConfigService configService, ReactiveRedisMessageListenerContainer reactiveRedisMessageListenerContainer, Consumer<ConfigChangedEvent> consumer) {
        this.listenerContainer = reactiveRedisMessageListenerContainer;
        this.configMapCache = new ConcurrentHashMap<>();
        this.delegate = configService;
        this.hookOnResetCache = consumer;
    }

    public Flux<ConfigChangedEvent> listen(NamespacedConfigId namespacedConfigId) {
        return listen(namespacedConfigId.getNamespace(), namespacedConfigId.getConfigId());
    }

    public Flux<ConfigChangedEvent> listen(String str, String str2) {
        return this.listenerContainer.receive(new ChannelTopic[]{ChannelTopic.of(ConfigKeyGenerator.getConfigKey(str, str2))}).map(message -> {
            return new ConfigChangedEvent(ConfigKeyGenerator.getConfigIdOfKey((String) message.getChannel()), ConfigChangedEvent.Event.of((String) message.getMessage()));
        });
    }

    @Override // me.ahoo.cosky.config.ConfigService
    public Flux<String> getConfigs(String str) {
        return this.delegate.getConfigs(str);
    }

    @Override // me.ahoo.cosky.config.ConfigService
    public Mono<Config> getConfig(String str, String str2) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "namespace can not be empty!");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str2), "configId can not be empty!");
        return this.configMapCache.computeIfAbsent(NamespacedConfigId.of(str, str2), this::listenAndGetCache);
    }

    private Mono<Config> listenAndGetCache(NamespacedConfigId namespacedConfigId) {
        String namespace = namespacedConfigId.getNamespace();
        String configId = namespacedConfigId.getConfigId();
        listen(namespace, configId).subscribe(new ConfigChangedEventSubscriber(this));
        return this.delegate.getConfig(namespace, configId).cache();
    }

    @Override // me.ahoo.cosky.config.ConfigService
    public Mono<Boolean> setConfig(String str, String str2, String str3) {
        return this.delegate.setConfig(str, str2, str3);
    }

    @Override // me.ahoo.cosky.config.ConfigService
    public Mono<Boolean> removeConfig(String str) {
        return this.delegate.removeConfig(str);
    }

    @Override // me.ahoo.cosky.config.ConfigService
    public Mono<Boolean> removeConfig(String str, String str2) {
        return this.delegate.removeConfig(str, str2);
    }

    @Override // me.ahoo.cosky.config.ConfigService
    public Mono<Boolean> containsConfig(String str, String str2) {
        return this.delegate.containsConfig(str, str2);
    }

    @Override // me.ahoo.cosky.config.ConfigRollback
    public Mono<Boolean> rollback(String str, int i) {
        return this.delegate.rollback(str, i);
    }

    @Override // me.ahoo.cosky.config.ConfigRollback
    public Mono<Boolean> rollback(String str, String str2, int i) {
        return this.delegate.rollback(str, str2, i);
    }

    @Override // me.ahoo.cosky.config.ConfigRollback
    public Flux<ConfigVersion> getConfigVersions(String str, String str2) {
        return this.delegate.getConfigVersions(str, str2);
    }

    @Override // me.ahoo.cosky.config.ConfigRollback
    public Mono<ConfigHistory> getConfigHistory(String str, String str2, int i) {
        return this.delegate.getConfigHistory(str, str2, i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onConfigChanged(ConfigChangedEvent configChangedEvent) {
        NamespacedConfigId namespacedConfigId = configChangedEvent.getNamespacedConfigId();
        this.configMapCache.put(namespacedConfigId, this.delegate.getConfig(namespacedConfigId.getNamespace(), namespacedConfigId.getConfigId()).cache());
        if (null != this.hookOnResetCache) {
            this.hookOnResetCache.accept(configChangedEvent);
        }
    }
}
