package io.zeebe.test.broker.protocol.clientapi;

import io.zeebe.dispatcher.Dispatcher;
import io.zeebe.dispatcher.Dispatchers;
import io.zeebe.protocol.clientapi.ControlMessageType;
import io.zeebe.protocol.clientapi.MessageHeaderDecoder;
import io.zeebe.test.broker.protocol.MsgPackHelper;
import io.zeebe.transport.ClientTransport;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.transport.SocketAddress;
import io.zeebe.transport.Transports;
import io.zeebe.util.actor.ActorScheduler;
import io.zeebe.util.actor.ActorSchedulerBuilder;
import java.util.stream.Stream;
import org.agrona.DirectBuffer;
import org.junit.rules.ExternalResource;

/* loaded from: input_file:io/zeebe/test/broker/protocol/clientapi/ClientApiRule.class */
public class ClientApiRule extends ExternalResource {
    public static final String DEFAULT_TOPIC_NAME = "default-topic";
    public static final int DEFAULT_PARTITION_ID = 0;
    public static final long DEFAULT_LOCK_DURATION = 1000;
    protected ClientTransport transport;
    protected Dispatcher sendBuffer;
    protected final SocketAddress brokerAddress;
    protected RemoteAddress streamAddress;
    protected MsgPackHelper msgPackHelper;
    protected RawMessageCollector incomingMessageCollector;
    private ActorScheduler scheduler;

    public ClientApiRule() {
        this("localhost", 51015);
    }

    public ClientApiRule(String str, int i) {
        this.brokerAddress = new SocketAddress(str, i);
    }

    protected void before() throws Throwable {
        this.scheduler = ActorSchedulerBuilder.createDefaultScheduler("client-rule");
        this.sendBuffer = Dispatchers.create("clientSendBuffer").bufferSize(33554432).subscriptions(new String[]{"sender"}).actorScheduler(this.scheduler).build();
        this.incomingMessageCollector = new RawMessageCollector();
        this.transport = Transports.newClientTransport().inputListener(this.incomingMessageCollector).scheduler(this.scheduler).requestPoolSize(128).sendBuffer(this.sendBuffer).build();
        this.msgPackHelper = new MsgPackHelper();
        this.streamAddress = this.transport.registerRemoteAddress(this.brokerAddress);
    }

    protected void after() {
        if (this.sendBuffer != null) {
            this.sendBuffer.close();
        }
        if (this.transport != null) {
            this.transport.close();
        }
        if (this.scheduler != null) {
            this.scheduler.close();
        }
    }

    public ExecuteCommandRequestBuilder createCmdRequest() {
        return new ExecuteCommandRequestBuilder(this.transport.getOutput(), this.streamAddress, this.msgPackHelper);
    }

    public ControlMessageRequestBuilder createControlMessageRequest() {
        return new ControlMessageRequestBuilder(this.transport.getOutput(), this.streamAddress, this.msgPackHelper);
    }

    public ClientApiRule moveMessageStreamToTail() {
        this.incomingMessageCollector.moveToTail();
        return this;
    }

    public ClientApiRule moveMessageStreamToHead() {
        this.incomingMessageCollector.moveToHead();
        return this;
    }

    public int numSubscribedEventsAvailable() {
        return (int) this.incomingMessageCollector.getNumMessagesFulfilling(this::isSubscribedEvent);
    }

    public TestTopicClient topic() {
        return topic("default-topic", 0);
    }

    public TestTopicClient topic(String str, int i) {
        return new TestTopicClient(this, str, i);
    }

    public ExecuteCommandRequest openTopicSubscription(String str, long j) {
        return openTopicSubscription("default-topic", 0, str, j);
    }

    public ExecuteCommandRequest openTopicSubscription(String str, int i, String str2, long j) {
        return ((ExecuteCommandRequestBuilder) createCmdRequest().topicName(str).partitionId(i).eventTypeSubscriber().command().put("startPosition", Long.valueOf(j)).put("name", str2).put(TestTopicClient.PROP_STATE, "SUBSCRIBE").done()).send();
    }

    public ControlMessageRequest closeTopicSubscription(long j) {
        return ((ControlMessageRequestBuilder) createControlMessageRequest().messageType(ControlMessageType.REMOVE_TOPIC_SUBSCRIPTION).data().put("topicName", "default-topic").put("partitionId", 0).put("subscriberKey", Long.valueOf(j)).done()).send();
    }

    public ControlMessageRequest openTaskSubscription(String str) {
        return openTaskSubscription("default-topic", 0, str, 1000L);
    }

    public ControlMessageRequest openTaskSubscription(String str, int i, String str2, long j) {
        return ((ControlMessageRequestBuilder) createControlMessageRequest().messageType(ControlMessageType.ADD_TASK_SUBSCRIPTION).data().put("topicName", str).put("partitionId", Integer.valueOf(i)).put("taskType", str2).put("lockDuration", Long.valueOf(j)).put("lockOwner", "test").put("credits", 10).done()).send();
    }

    public Stream<RawMessage> incomingMessages() {
        return Stream.generate(this.incomingMessageCollector);
    }

    public Stream<SubscribedEvent> subscribedEvents() {
        return incomingMessages().filter(this::isSubscribedEvent).map(this::asSubscribedEvent);
    }

    public Stream<RawMessage> commandResponses() {
        return incomingMessages().filter(this::isCommandResponse);
    }

    public void interruptAllChannels() {
        this.transport.interruptAllChannels();
    }

    public SocketAddress getBrokerAddress() {
        return this.brokerAddress;
    }

    protected SubscribedEvent asSubscribedEvent(RawMessage rawMessage) {
        SubscribedEvent subscribedEvent = new SubscribedEvent(rawMessage);
        subscribedEvent.wrap(rawMessage.getMessage(), 0, rawMessage.getMessage().capacity());
        return subscribedEvent;
    }

    protected boolean isCommandResponse(RawMessage rawMessage) {
        return rawMessage.isResponse() && isMessageOfType(rawMessage.getMessage(), 21);
    }

    protected boolean isSubscribedEvent(RawMessage rawMessage) {
        return rawMessage.isMessage() && isMessageOfType(rawMessage.getMessage(), 30);
    }

    protected boolean isMessageOfType(DirectBuffer directBuffer, int i) {
        MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder();
        messageHeaderDecoder.wrap(directBuffer, 0);
        return messageHeaderDecoder.templateId() == i;
    }
}
