package io.zeebe.test.broker.protocol.brokerapi;

import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.Dispatchers;
import io.zeebe.protocol.clientapi.ControlMessageType;
import io.zeebe.protocol.clientapi.EventType;
import io.zeebe.protocol.clientapi.SubscriptionType;
import io.zeebe.test.broker.protocol.MsgPackHelper;
import io.zeebe.test.broker.protocol.brokerapi.data.TopicLeader;
import io.zeebe.test.broker.protocol.brokerapi.data.Topology;
import io.zeebe.test.broker.protocol.clientapi.TestTopicClient;
import io.zeebe.test.util.collection.MapFactoryBuilder;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.ServerMessageHandler;
import io.zeebe.transport.ServerTransport;
import io.zeebe.transport.Transports;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.actor.ActorSchedulerBuilder;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.junit.rules.ExternalResource;

/* loaded from: input_file:io/zeebe/test/broker/protocol/brokerapi/StubBrokerRule.class */
/**
 * JUnit rule that starts a stub broker: a server transport bound to a fixed host/port whose
 * responses are produced by a {@link StubResponseChannelHandler}.
 *
 * <p>Tests register canned responses through the {@code on...Request} builder methods and can
 * inspect the requests received so far. {@link #before()} creates the scheduler, send buffer and
 * channel handler, installs a default topology (the test topic plus {@code "internal-system"}),
 * registers a stub answering topology requests and binds the transport. {@link #after()} releases
 * all of these again.
 */
public class StubBrokerRule extends ExternalResource {
    public static final String TEST_TOPIC_NAME = "default-topic";
    public static final int TEST_PARTITION_ID = 0;
    protected final String host;
    protected final int port;
    protected ActorScheduler actorScheduler;
    protected ServerTransport transport;
    protected Dispatcher sendBuffer;
    protected StubResponseChannelHandler channelHandler;
    protected MsgPackHelper msgPackHelper;
    private InetSocketAddress bindAddr;
    protected AtomicReference<Topology> currentTopology;

    /** Creates a rule whose transport binds to {@code 127.0.0.1:51015}. */
    public StubBrokerRule() {
        this("127.0.0.1", 51015);
    }

    /**
     * Creates a rule whose transport binds to the given address.
     *
     * @param host host name or address used for the bind address and the advertised topic leaders
     * @param port port used for the bind address and the advertised topic leaders
     */
    public StubBrokerRule(String host, int port) {
        this.currentTopology = new AtomicReference<>();
        this.host = host;
        this.port = port;
    }

    /**
     * Creates all resources, installs the default topology, registers the topology stub and binds
     * the transport.
     */
    @Override
    protected void before() throws Throwable {
        this.msgPackHelper = new MsgPackHelper();
        this.actorScheduler = ActorSchedulerBuilder.createDefaultScheduler("broker-rule");
        this.sendBuffer = Dispatchers.create("send-buffer")
                .actorScheduler(this.actorScheduler)
                .subscriptions(new String[]{"sender"})
                .bufferSize(1024 * 1024)
                .build();
        this.channelHandler = new StubResponseChannelHandler(this.msgPackHelper);
        this.bindAddr = new InetSocketAddress(this.host, this.port);

        final Topology initialTopology = new Topology()
                .addTopic(new TopicLeader(this.host, this.port, TEST_TOPIC_NAME, TEST_PARTITION_ID))
                .addTopic(new TopicLeader(this.host, this.port, "internal-system", 0));
        this.currentTopology.set(initialTopology);

        stubTopologyRequest();
        bindTransport();
    }

    /**
     * Closes the transport (if open), the send buffer and the actor scheduler.
     *
     * <p>Each resource is closed even if closing an earlier one throws, so a failing transport
     * shutdown no longer leaks the send buffer or the scheduler threads.
     */
    @Override
    protected void after() {
        try {
            if (this.transport != null) {
                closeTransport();
            }
        } finally {
            try {
                if (this.sendBuffer != null) {
                    this.sendBuffer.close();
                }
            } finally {
                if (this.actorScheduler != null) {
                    this.actorScheduler.close();
                }
            }
        }
    }

    /**
     * Interrupts all channels of the server transport.
     *
     * @throws IllegalStateException if the transport is not currently open
     */
    public void interruptAllServerChannels() {
        if (this.transport == null) {
            throw new IllegalStateException("transport not open");
        }
        this.transport.interruptAllChannels();
    }

    /**
     * Closes the server transport so that it can be bound again with {@link #bindTransport()}.
     *
     * @throws IllegalStateException if the transport is not currently open
     */
    public void closeTransport() {
        if (this.transport == null) {
            throw new IllegalStateException("transport not open");
        }
        try {
            this.transport.close();
        } finally {
            // Clear the reference even when close() fails; otherwise a later bindTransport()
            // would be rejected as "already open" although the transport is unusable.
            this.transport = null;
        }
    }

    /**
     * Binds a new server transport to the configured address, using the rule's scheduler, send
     * buffer and channel handler.
     *
     * @throws IllegalStateException if a transport is already open
     */
    public void bindTransport() {
        if (this.transport != null) {
            throw new IllegalStateException("transport already open");
        }
        this.transport = Transports.newServerTransport()
                .bindAddress(this.bindAddr)
                .scheduler(this.actorScheduler)
                .sendBuffer(this.sendBuffer)
                .build((ServerMessageHandler) null, this.channelHandler);
    }

    /**
     * Starts a stub for workflow instance commands on the test topic/partition.
     *
     * @param key key set on the response
     * @return builder for the response event, pre-filled with the request's command
     */
    public MapFactoryBuilder<ExecuteCommandRequest, ExecuteCommandResponseBuilder> onWorkflowRequestRespondWith(long key) {
        return onWorkflowRequestRespondWith(TEST_TOPIC_NAME, TEST_PARTITION_ID, key);
    }

    /**
     * Starts a stub for commands whose event type is {@link EventType#WORKFLOW_INSTANCE_EVENT}.
     *
     * @param topicName topic name set on the response
     * @param partitionId partition id set on the response
     * @param key key set on the response
     * @return builder for the response event, pre-filled with the request's command
     */
    public MapFactoryBuilder<ExecuteCommandRequest, ExecuteCommandResponseBuilder> onWorkflowRequestRespondWith(String topicName, int partitionId, long key) {
        return onExecuteCommandRequest(request -> request.eventType() == EventType.WORKFLOW_INSTANCE_EVENT)
                .respondWith()
                .topicName(topicName)
                .partitionId(partitionId)
                .key(key)
                .event()
                .allOf(request -> request.getCommand());
    }

    /**
     * Starts a stub that matches every execute-command request.
     *
     * @return builder to define either a regular or an error response
     */
    public ResponseBuilder<ExecuteCommandResponseBuilder, ErrorResponseBuilder<ExecuteCommandRequest>> onExecuteCommandRequest() {
        return onExecuteCommandRequest(request -> true);
    }

    /**
     * Starts a stub for execute-command requests accepted by the given predicate.
     *
     * @param predicate decides which requests the stub applies to
     * @return builder to define either a regular or an error response
     */
    public ResponseBuilder<ExecuteCommandResponseBuilder, ErrorResponseBuilder<ExecuteCommandRequest>> onExecuteCommandRequest(Predicate<ExecuteCommandRequest> predicate) {
        final ExecuteCommandResponseBuilder responseBuilder =
                new ExecuteCommandResponseBuilder(this.channelHandler::addExecuteCommandRequestStub, this.msgPackHelper, predicate);
        final ErrorResponseBuilder<ExecuteCommandRequest> errorBuilder =
                new ErrorResponseBuilder<>(this.channelHandler::addExecuteCommandRequestStub, this.msgPackHelper, predicate);
        return new ResponseBuilder<>(responseBuilder, errorBuilder);
    }

    /**
     * Starts a stub for execute-command requests with the given event type whose command carries
     * the given value under {@link TestTopicClient#PROP_STATE}.
     *
     * @param eventType required event type of the request
     * @param state required state value of the request's command
     * @return builder to define either a regular or an error response
     */
    public ResponseBuilder<ExecuteCommandResponseBuilder, ErrorResponseBuilder<ExecuteCommandRequest>> onExecuteCommandRequest(EventType eventType, String state) {
        return onExecuteCommandRequest(request ->
                request.eventType() == eventType
                        && state.equals(request.getCommand().get(TestTopicClient.PROP_STATE)));
    }

    /**
     * Starts a stub for execute-command requests that match topic, partition, event type and the
     * command's {@link TestTopicClient#PROP_STATE} value.
     *
     * @param topicName required topic name of the request
     * @param partitionId required partition id of the request
     * @param eventType required event type of the request
     * @param state required state value of the request's command
     * @return builder to define either a regular or an error response
     */
    public ResponseBuilder<ExecuteCommandResponseBuilder, ErrorResponseBuilder<ExecuteCommandRequest>> onExecuteCommandRequest(String topicName, int partitionId, EventType eventType, String state) {
        return onExecuteCommandRequest(request ->
                topicName.equals(request.topicName())
                        && request.partitionId() == partitionId
                        && request.eventType() == eventType
                        && state.equals(request.getCommand().get(TestTopicClient.PROP_STATE)));
    }

    /**
     * Starts a stub that matches every control message request.
     *
     * @return builder to define either a regular or an error response
     */
    public ResponseBuilder<ControlMessageResponseBuilder, ErrorResponseBuilder<ControlMessageRequest>> onControlMessageRequest() {
        return onControlMessageRequest(request -> true);
    }

    /**
     * Starts a stub for control message requests accepted by the given predicate.
     *
     * @param predicate decides which requests the stub applies to
     * @return builder to define either a regular or an error response
     */
    public ResponseBuilder<ControlMessageResponseBuilder, ErrorResponseBuilder<ControlMessageRequest>> onControlMessageRequest(Predicate<ControlMessageRequest> predicate) {
        final ControlMessageResponseBuilder responseBuilder =
                new ControlMessageResponseBuilder(this.channelHandler::addControlMessageRequestStub, this.msgPackHelper, predicate);
        final ErrorResponseBuilder<ControlMessageRequest> errorBuilder =
                new ErrorResponseBuilder<>(this.channelHandler::addControlMessageRequestStub, this.msgPackHelper, predicate);
        return new ResponseBuilder<>(responseBuilder, errorBuilder);
    }

    /** @return the control message requests recorded by the channel handler */
    public List<ControlMessageRequest> getReceivedControlMessageRequests() {
        return this.channelHandler.getReceivedControlMessageRequests();
    }

    /** @return the execute-command requests recorded by the channel handler */
    public List<ExecuteCommandRequest> getReceivedCommandRequests() {
        return this.channelHandler.getReceivedCommandRequests();
    }

    /** @return all requests recorded by the channel handler */
    public List<Object> getAllReceivedRequests() {
        return this.channelHandler.getAllReceivedRequests();
    }

    /** @return a new builder for a subscribed event that is sent via the current transport */
    public SubscribedEventBuilder newSubscribedEvent() {
        return new SubscribedEventBuilder(this.msgPackHelper, this.transport);
    }

    /**
     * Registers a stub that answers {@link ControlMessageType#REQUEST_TOPOLOGY} requests with the
     * topic leaders and brokers of the topology that is current at the time of the request.
     */
    protected void stubTopologyRequest() {
        ((ControlMessageResponseBuilder) onControlMessageRequest(
                        request -> request.messageType() == ControlMessageType.REQUEST_TOPOLOGY)
                .respondWith()
                .data()
                .put("topicLeaders", request -> this.currentTopology.get().getTopicLeaders())
                .put("brokers", request -> this.currentTopology.get().getBrokers())
                .done())
                .register();
    }

    /**
     * Adds a topic partition, led by this stub broker, to a copy of the current topology and
     * publishes the copy.
     *
     * <p>The update is applied atomically so that concurrent topology changes are not lost.
     *
     * @param topicName name of the topic
     * @param partitionId id of the partition
     */
    public void addTopic(String topicName, int partitionId) {
        final TopicLeader leader = new TopicLeader(this.host, this.port, topicName, partitionId);
        this.currentTopology.updateAndGet(current -> {
            // Work on a copy: the previously published topology stays untouched, which also keeps
            // this function safe to re-run if the atomic update has to be retried.
            final Topology updated = new Topology(current);
            updated.addTopic(leader);
            return updated;
        });
    }

    /**
     * Replaces the topology returned by the topology stub.
     *
     * @param topology the new topology
     */
    public void setCurrentTopology(Topology topology) {
        this.currentTopology.set(topology);
    }

    /**
     * Registers stubs for the topic subscription API:
     * <ul>
     *   <li>{@code SUBSCRIBE} subscriber commands are answered with state {@code SUBSCRIBED} and
     *       consecutive keys starting at {@code initialSubscriberKey},</li>
     *   <li>{@link ControlMessageType#REMOVE_TOPIC_SUBSCRIPTION} requests are answered with the
     *       request data,</li>
     *   <li>{@code ACKNOWLEDGE} subscription commands are answered with state
     *       {@code ACKNOWLEDGED} and consecutive keys starting at 0.</li>
     * </ul>
     * Command responses echo the request's topic name, partition id and command.
     *
     * @param initialSubscriberKey key used for the first {@code SUBSCRIBE} response
     */
    public void stubTopicSubscriptionApi(long initialSubscriberKey) {
        final AtomicLong subscriberKeys = new AtomicLong(initialSubscriberKey);
        final AtomicLong acknowledgeKeys = new AtomicLong(0L);

        ((ExecuteCommandResponseBuilder) onExecuteCommandRequest(EventType.SUBSCRIBER_EVENT, "SUBSCRIBE")
                .respondWith()
                .key(request -> subscriberKeys.getAndIncrement())
                .topicName(request -> request.topicName())
                .partitionId(request -> request.partitionId())
                .event()
                .allOf(request -> request.getCommand())
                .put(TestTopicClient.PROP_STATE, "SUBSCRIBED")
                .done())
                .register();

        ((ControlMessageResponseBuilder) onControlMessageRequest(
                        request -> request.messageType() == ControlMessageType.REMOVE_TOPIC_SUBSCRIPTION)
                .respondWith()
                .data()
                .allOf(request -> request.getData())
                .done())
                .register();

        ((ExecuteCommandResponseBuilder) onExecuteCommandRequest(EventType.SUBSCRIPTION_EVENT, "ACKNOWLEDGE")
                .respondWith()
                .key(request -> acknowledgeKeys.getAndIncrement())
                .topicName(request -> request.topicName())
                .partitionId(request -> request.partitionId())
                .event()
                .allOf(request -> request.getCommand())
                .put(TestTopicClient.PROP_STATE, "ACKNOWLEDGED")
                .done())
                .register();
    }

    /**
     * Registers stubs for the task subscription control messages. Add, remove and
     * increase-credits requests are answered with the request data; the add response additionally
     * carries a {@code subscriberKey}, counted up from {@code initialSubscriberKey}.
     *
     * @param initialSubscriberKey subscriber key used for the first add-subscription response
     */
    public void stubTaskSubscriptionApi(long initialSubscriberKey) {
        final AtomicLong subscriberKeys = new AtomicLong(initialSubscriberKey);

        ((ControlMessageResponseBuilder) onControlMessageRequest(
                        request -> request.messageType() == ControlMessageType.ADD_TASK_SUBSCRIPTION)
                .respondWith()
                .data()
                .allOf(request -> request.getData())
                .put("subscriberKey", request -> subscriberKeys.getAndIncrement())
                .done())
                .register();

        ((ControlMessageResponseBuilder) onControlMessageRequest(
                        request -> request.messageType() == ControlMessageType.REMOVE_TASK_SUBSCRIPTION)
                .respondWith()
                .data()
                .allOf(request -> request.getData())
                .done())
                .register();

        ((ControlMessageResponseBuilder) onControlMessageRequest(
                        request -> request.messageType() == ControlMessageType.INCREASE_TASK_SUBSCRIPTION_CREDITS)
                .respondWith()
                .data()
                .allOf(request -> request.getData())
                .done())
                .register();
    }

    /**
     * Pushes a topic subscription event of type {@link EventType#RAFT_EVENT} on the test
     * topic/partition.
     *
     * @param remoteAddress receiver of the event
     * @param subscriberKey subscriber key of the event
     * @param key key of the event
     * @param position position of the event
     */
    public void pushTopicEvent(RemoteAddress remoteAddress, long subscriberKey, long key, long position) {
        pushTopicEvent(remoteAddress, subscriberKey, key, position, EventType.RAFT_EVENT);
    }

    /**
     * Pushes a topic subscription event with an empty payload on the test topic/partition.
     *
     * @param remoteAddress receiver of the event
     * @param subscriberKey subscriber key of the event
     * @param key key of the event
     * @param position position of the event
     * @param eventType type of the event
     */
    public void pushTopicEvent(RemoteAddress remoteAddress, long subscriberKey, long key, long position, EventType eventType) {
        ((SubscribedEventBuilder) newSubscribedEvent()
                .topicName(TEST_TOPIC_NAME)
                .partitionId(TEST_PARTITION_ID)
                .key(key)
                .position(position)
                .eventType(eventType)
                .subscriberKey(subscriberKey)
                .subscriptionType(SubscriptionType.TOPIC_SUBSCRIPTION)
                .event()
                .done())
                .push(remoteAddress);
    }

    /**
     * Pushes a task subscription event in state {@code LOCKED} on the test topic/partition. The
     * task has lock time 1000, 3 retries and an empty map encoded as MsgPack for the payload.
     *
     * @param remoteAddress receiver of the event
     * @param subscriberKey subscriber key of the event
     * @param key key of the event
     * @param position position of the event
     * @param lockOwner value of the task's {@code lockOwner} property
     * @param taskType value of the task's {@code type} property
     */
    public void pushLockedTask(RemoteAddress remoteAddress, long subscriberKey, long key, long position, String lockOwner, String taskType) {
        ((SubscribedEventBuilder) newSubscribedEvent()
                .topicName(TEST_TOPIC_NAME)
                .partitionId(TEST_PARTITION_ID)
                .key(key)
                .position(position)
                .eventType(EventType.TASK_EVENT)
                .subscriberKey(subscriberKey)
                .subscriptionType(SubscriptionType.TASK_SUBSCRIPTION)
                .event()
                .put("type", taskType)
                .put("lockTime", 1000L)
                .put("lockOwner", lockOwner)
                .put("retries", 3)
                .put(TestTopicClient.PROP_WORKFLOW_PAYLOAD, this.msgPackHelper.encodeAsMsgPack(new HashMap<>()))
                .put(TestTopicClient.PROP_STATE, "LOCKED")
                .done())
                .push(remoteAddress);
    }
}
