package com.yahoo.messagebus;

import com.yahoo.messagebus.MessageBus;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.messagebus.routing.RoutingTable;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/yahoo/messagebus/SourceSession.class */
public final class SourceSession implements ReplyHandler, MessageBus.SendBlockedMessages {
    private final MessageBus mbus;
    private final Sequencer sequencer;
    private final ReplyHandler replyHandler;
    private final ThrottlePolicy throttlePolicy;
    private volatile double timeout;
    private final AtomicBoolean destroyed = new AtomicBoolean(false);
    private final CountDownLatch done = new CountDownLatch(1);
    private final Object lock = new Object();
    private volatile int pendingCount = 0;
    private volatile boolean closed = false;
    private final Queue<BlockedMessage> blockedQ = new LinkedList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/yahoo/messagebus/SourceSession$BlockedMessage.class */
    public class BlockedMessage {
        private final Message msg;
        private Result result = null;

        BlockedMessage(Message message) {
            this.msg = message;
        }

        private void notifyComplete(Result result) {
            synchronized (this) {
                this.result = result;
                notify();
            }
        }

        Message getMessage() {
            return this.msg;
        }

        boolean notifyIfExpired() {
            if (!this.msg.isExpired()) {
                return false;
            }
            Error error = new Error(ErrorCode.TIMEOUT, "Timed out in sendQ");
            notifyComplete(new Result(error));
            SourceSession.this.replyHandler.handleReply(SourceSession.this.createSendTimedoutReply(this.msg, error));
            return true;
        }

        boolean sendOrExpire() {
            if (notifyIfExpired()) {
                return true;
            }
            Result sendInternal = SourceSession.this.sendInternal(this.msg);
            if (SourceSession.isSendQFull(sendInternal)) {
                return false;
            }
            notifyComplete(sendInternal);
            return true;
        }

        Result waitComplete() throws InterruptedException {
            synchronized (this) {
                while (this.result == null) {
                    wait();
                }
            }
            return this.result;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SourceSession(MessageBus messageBus, SourceSessionParams sourceSessionParams) {
        this.mbus = messageBus;
        this.sequencer = new Sequencer(messageBus);
        if (!sourceSessionParams.hasReplyHandler()) {
            throw new NullPointerException("Reply handler is null.");
        }
        this.replyHandler = sourceSessionParams.getReplyHandler();
        this.throttlePolicy = sourceSessionParams.getThrottlePolicy();
        this.timeout = sourceSessionParams.getTimeout();
        messageBus.register(this);
    }

    public boolean destroy() {
        if (this.destroyed.getAndSet(true)) {
            return false;
        }
        synchronized (this.lock) {
            this.closed = true;
        }
        this.sequencer.destroy();
        this.mbus.sync();
        return true;
    }

    public void close() {
        synchronized (this.lock) {
            this.closed = true;
        }
        if (this.pendingCount == 0) {
            this.done.countDown();
        }
        try {
            this.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        destroy();
    }

    public Result send(Message message) {
        return sendInternal(updateTiming(message));
    }

    private Message updateTiming(Message message) {
        message.setTimeReceivedNow();
        if (message.getTimeRemaining() <= 0) {
            message.setTimeRemaining(((long) this.timeout) * 1000);
        }
        return message;
    }

    private Result sendInternal(Message message) {
        synchronized (this.lock) {
            if (this.closed) {
                return new Result(ErrorCode.SEND_QUEUE_CLOSED, "Source session is closed.");
            }
            if (this.throttlePolicy != null && !this.throttlePolicy.canSend(message, this.pendingCount)) {
                return new Result(ErrorCode.SEND_QUEUE_FULL, "Too much pending data (" + this.pendingCount + " messages).");
            }
            message.pushHandler(this.replyHandler);
            if (this.throttlePolicy != null) {
                this.throttlePolicy.processMessage(message);
            }
            this.pendingCount++;
            if (message.getTrace().shouldTrace(6)) {
                message.getTrace().trace(6, "Source session accepted a " + message.getApproxSize() + " byte message. " + this.pendingCount + " message(s) now pending.");
            }
            message.pushHandler(this);
            this.sequencer.handleMessage(message);
            return Result.ACCEPTED;
        }
    }

    @Override // com.yahoo.messagebus.MessageBus.SendBlockedMessages
    public boolean trySend() {
        if (this.destroyed.get()) {
            return false;
        }
        sendBlockedMessages();
        expireStalledBlockedMessages();
        return true;
    }

    private Reply createSendTimedoutReply(Message message, Error error) {
        EmptyReply emptyReply = new EmptyReply();
        emptyReply.setMessage(message);
        emptyReply.addError(error);
        message.swapState(emptyReply);
        return emptyReply;
    }

    private static boolean isSendQFull(Result result) {
        return !result.isAccepted() && result.getError().getCode() == 100001;
    }

    public Result sendBlocking(Message message) throws InterruptedException {
        Result send = send(message);
        if (isSendQFull(send)) {
            BlockedMessage blockedMessage = new BlockedMessage(message);
            synchronized (this.lock) {
                this.blockedQ.add(blockedMessage);
            }
            send = blockedMessage.waitComplete();
        }
        return send;
    }

    private void expireStalledBlockedMessages() {
        synchronized (this.lock) {
            this.blockedQ.removeIf((v0) -> {
                return v0.notifyIfExpired();
            });
        }
    }

    private void sendBlockedMessages() {
        synchronized (this.lock) {
            boolean z = true;
            while (z) {
                if (this.blockedQ.isEmpty()) {
                    break;
                }
                z = this.blockedQ.element().sendOrExpire();
                if (z) {
                    this.blockedQ.remove();
                }
            }
        }
    }

    @Override // com.yahoo.messagebus.ReplyHandler
    public void handleReply(Reply reply) {
        boolean z;
        if (this.destroyed.get()) {
            reply.discard();
            return;
        }
        synchronized (this.lock) {
            this.pendingCount--;
            if (this.throttlePolicy != null) {
                this.throttlePolicy.processReply(reply);
            }
            z = this.closed && this.pendingCount == 0;
            sendBlockedMessages();
        }
        if (reply.getTrace().shouldTrace(6)) {
            reply.getTrace().trace(6, "Source session received reply. " + this.pendingCount + " message(s) now pending.");
        }
        reply.popHandler().handleReply(reply);
        if (z) {
            this.done.countDown();
        }
    }

    public Result send(Message message, Route route) {
        return send(message.setRoute(route));
    }

    public Result send(Message message, String str) {
        return send(message, str, false);
    }

    public Result send(Message message, String str, boolean z) {
        boolean z2 = false;
        RoutingTable routingTable = this.mbus.getRoutingTable(message.getProtocol().toString());
        if (routingTable != null) {
            Route route = routingTable.getRoute(str);
            if (route != null) {
                message.setRoute(new Route(route));
                z2 = true;
            } else if (!z) {
                return new Result(ErrorCode.ILLEGAL_ROUTE, "Route '" + str + "' not found for protocol '" + message.getProtocol() + "'.");
            }
        } else if (!z) {
            return new Result(ErrorCode.ILLEGAL_ROUTE, "Protocol '" + message.getProtocol() + "' has no routing table.");
        }
        if (!z2) {
            message.setRoute(Route.parse(str));
        }
        return send(message);
    }

    public ReplyHandler getReplyHandler() {
        return this.replyHandler;
    }

    public int getPendingCount() {
        return this.pendingCount;
    }

    public SourceSession setTimeout(double d) {
        this.timeout = d;
        return this;
    }
}
