package io.camunda.zeebe.topology;

import io.atomix.cluster.ClusterMembershipService;
import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.scheduler.ActorSchedulingService;
import io.camunda.zeebe.scheduler.future.ActorFuture;
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import io.camunda.zeebe.topology.TopologyInitializer;
import io.camunda.zeebe.topology.changes.NoopTopologyMembershipChangeExecutor;
import io.camunda.zeebe.topology.changes.PartitionChangeExecutor;
import io.camunda.zeebe.topology.changes.TopologyChangeAppliersImpl;
import io.camunda.zeebe.topology.changes.TopologyChangeCoordinator;
import io.camunda.zeebe.topology.changes.TopologyChangeCoordinatorImpl;
import io.camunda.zeebe.topology.gossip.ClusterTopologyGossiper;
import io.camunda.zeebe.topology.gossip.ClusterTopologyGossiperConfig;
import io.camunda.zeebe.topology.serializer.ProtoBufSerializer;
import io.camunda.zeebe.topology.state.ClusterTopology;
import io.camunda.zeebe.util.FileUtil;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/* loaded from: input_file:io/camunda/zeebe/topology/ClusterTopologyManagerService.class */
/**
 * Actor that wires together the cluster topology manager and the topology gossiper for the local
 * member and starts them in order.
 *
 * <p>The member whose id equals {@code "0"} is treated as the coordinator: it uses a different
 * {@link TopologyInitializer} chain on start-up and is the only member for which
 * {@link #getTopologyChangeCoordinator()} returns a value.
 */
public final class ClusterTopologyManagerService extends Actor {
    /** Member id that marks the local member as the coordinator. */
    private static final String COORDINATOR_ID = "0";

    /** Name of the topology file, resolved against the directory passed to the constructor. */
    private static final String TOPOLOGY_FILE_NAME = ".topology.meta";

    private final ClusterTopologyManagerImpl clusterTopologyManager;
    private final ClusterTopologyGossiper clusterTopologyGossiper;
    private final boolean isCoordinator;
    private final PersistedClusterTopology persistedClusterTopology;
    private final Path topologyFile;

    /**
     * Creates the service and its collaborators; nothing is started until
     * {@link #start(ActorSchedulingService, StaticConfiguration)} is called.
     *
     * @param path directory that must exist (it is created if necessary) and that holds the
     *     topology file
     * @param clusterCommunicationService communication service handed to the gossiper
     * @param clusterMembershipService membership service; also provides the local member id
     * @param clusterTopologyGossiperConfig configuration handed to the gossiper
     * @throws UncheckedIOException if an {@link IOException} is raised during set-up
     */
    public ClusterTopologyManagerService(Path path, ClusterCommunicationService clusterCommunicationService, ClusterMembershipService clusterMembershipService, ClusterTopologyGossiperConfig clusterTopologyGossiperConfig) {
        try {
            FileUtil.ensureDirectoryExists(path);
            MemberId localMemberId = clusterMembershipService.getLocalMember().id();
            this.topologyFile = path.resolve(TOPOLOGY_FILE_NAME);
            this.persistedClusterTopology = new PersistedClusterTopology(this.topologyFile, new ProtoBufSerializer());
            // NOTE: `this` is handed to the manager and the gossiper before construction has
            // finished; they must not call back into this service until it has been started.
            this.clusterTopologyManager = new ClusterTopologyManagerImpl(this, localMemberId, this.persistedClusterTopology);
            this.clusterTopologyGossiper = new ClusterTopologyGossiper(this, clusterCommunicationService, clusterMembershipService, new ProtoBufSerializer(), clusterTopologyGossiperConfig, this.clusterTopologyManager::onGossipReceived);
            // Manager and gossiper reference each other: gossip received by the gossiper goes to
            // the manager, and the manager hands topology updates to the gossiper.
            this.clusterTopologyManager.setTopologyGossiper(this.clusterTopologyGossiper::updateClusterTopology);
            this.isCoordinator = localMemberId.id().equals(COORDINATOR_ID);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to create data directory", e);
        }
    }

    /**
     * Builds the initializer chain used by members that are not the coordinator: a file
     * initializer on the topology file, chained via {@code orThen} with a gossip initializer.
     */
    private TopologyInitializer getNonCoordinatorInitializer() {
        TopologyInitializer.FileInitializer fromFile = new TopologyInitializer.FileInitializer(this.topologyFile, new ProtoBufSerializer());
        TopologyInitializer.GossipInitializer fromGossip = new TopologyInitializer.GossipInitializer(this.clusterTopologyGossiper, this.persistedClusterTopology, this.clusterTopologyGossiper::updateClusterTopology, this);
        return fromFile.orThen(fromGossip);
    }

    /**
     * Builds the initializer chain used by the coordinator: a file initializer on the topology
     * file, chained via {@code orThen} with a sync initializer that is given all configured
     * members except the local one, and finally with a static initializer based on the given
     * configuration.
     *
     * @param staticConfiguration supplies the configured cluster members and the local member id
     */
    private TopologyInitializer getCoordinatorInitializer(StaticConfiguration staticConfiguration) {
        List<MemberId> otherMembers = staticConfiguration.clusterMembers().stream()
                .filter(memberId -> !memberId.equals(staticConfiguration.localMemberId()))
                .toList();
        TopologyInitializer.FileInitializer fromFile = new TopologyInitializer.FileInitializer(this.topologyFile, new ProtoBufSerializer());
        TopologyInitializer.SyncInitializer fromSync = new TopologyInitializer.SyncInitializer(this.clusterTopologyGossiper, otherMembers, this, this.clusterTopologyGossiper::queryClusterTopology);
        return fromFile.orThen(fromSync).orThen(new TopologyInitializer.StaticInitializer(staticConfiguration));
    }

    /**
     * Submits this actor to the given scheduling service and, once that has succeeded, starts the
     * gossiper and the topology manager from within the actor.
     *
     * @param actorSchedulingService scheduler this actor is submitted to
     * @param staticConfiguration configuration used to build the coordinator's initializer
     * @return a future that completes with the outcome of the topology manager start, or
     *     exceptionally if submitting the actor or starting the gossiper fails
     */
    public ActorFuture<Void> start(ActorSchedulingService actorSchedulingService, StaticConfiguration staticConfiguration) {
        CompletableActorFuture<Void> started = new CompletableActorFuture<>();
        actorSchedulingService.submitActor(this).onComplete((ignored, error) -> {
            if (error != null) {
                started.completeExceptionally(error);
                return;
            }
            this.actor.run(() -> startClusterTopologyServices(staticConfiguration, started));
        });
        return started;
    }

    /**
     * Starts the gossiper first and, only if that succeeds, the topology manager with the
     * initializer chain matching this member's role.
     *
     * @param staticConfiguration configuration used when this member is the coordinator
     * @param completableActorFuture future completed with the result of the start sequence
     */
    private void startClusterTopologyServices(StaticConfiguration staticConfiguration, CompletableActorFuture<Void> completableActorFuture) {
        TopologyInitializer initializer;
        if (this.isCoordinator) {
            initializer = getCoordinatorInitializer(staticConfiguration);
        } else {
            initializer = getNonCoordinatorInitializer();
        }
        this.clusterTopologyGossiper.start().onComplete((ignored, error) -> {
            if (error != null) {
                completableActorFuture.completeExceptionally(error);
                return;
            }
            // The future itself is registered as the completion callback, so it mirrors the
            // outcome of the manager start.
            this.clusterTopologyManager.start(initializer).onComplete(completableActorFuture);
        });
    }

    /**
     * Returns the cluster topology as provided by the topology manager.
     *
     * @return the future returned by the topology manager
     */
    public ActorFuture<ClusterTopology> getClusterTopology() {
        return this.clusterTopologyManager.getClusterTopology();
    }

    /**
     * Returns a coordinator for topology changes if the local member is the coordinator.
     *
     * @return a newly created {@link TopologyChangeCoordinatorImpl} on every call when this
     *     member is the coordinator, otherwise an empty {@link Optional}
     */
    public Optional<TopologyChangeCoordinator> getTopologyChangeCoordinator() {
        if (!this.isCoordinator) {
            return Optional.empty();
        }
        return Optional.of(new TopologyChangeCoordinatorImpl(this.clusterTopologyManager, this));
    }

    /**
     * Closes the gossiper asynchronously; the result of the close is not awaited.
     *
     * <p>NOTE(review): presumably an {@code Actor} lifecycle hook - confirm against the base class
     * and add {@code @Override} if so.
     */
    protected void onActorClosing() {
        this.clusterTopologyGossiper.closeAsync();
    }

    /**
     * Registers topology change appliers built from the given partition change executor and a
     * no-op membership change executor.
     *
     * @param partitionChangeExecutor executor used for partition changes
     */
    public void registerPartitionChangeExecutor(PartitionChangeExecutor partitionChangeExecutor) {
        TopologyChangeAppliersImpl appliers = new TopologyChangeAppliersImpl(partitionChangeExecutor, new NoopTopologyMembershipChangeExecutor());
        this.clusterTopologyManager.registerTopologyChangeAppliers(appliers);
    }

    /** Removes the topology change appliers from the topology manager. */
    public void removePartitionChangeExecutor() {
        this.clusterTopologyManager.removeTopologyChangeAppliers();
    }
}
