package dev.restate.sdk.core;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
import com.google.protobuf.UnsafeByteOperations;
import dev.restate.sdk.core.InvocationFlow;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Flow;

/* loaded from: input_file:dev/restate/sdk/core/MessageDecoder.class */
/**
 * Turns a stream of raw {@link ByteBuffer} chunks into {@link InvocationInput} messages and
 * forwards them to an inner subscriber.
 *
 * <p>Each message on the wire is read as an 8-byte header (decoded through
 * {@code MessageHeader.parse(long)}) followed by {@code header.getLength()} payload bytes, which
 * are parsed with the protobuf parser selected by {@code header.getType()}. Incoming chunks are
 * accumulated in an internal buffer, so a header or payload may be split across several chunks.
 *
 * <p>Decoded messages are queued and handed to the inner subscriber only up to the demand it has
 * requested. The upstream publisher is always asked for unbounded demand.
 *
 * <p>The class performs no synchronization of its own.
 */
class MessageDecoder implements InvocationFlow.InvocationInputSubscriber {
    /** Number of bytes read from the buffer to build the header value. */
    private static final int HEADER_SIZE_BYTES = 8;

    /** Downstream subscriber receiving the decoded messages. */
    private final Flow.Subscriber<InvocationInput> inner;
    /** Upstream subscription; {@code null} before {@link #onSubscribe} and after a parsing failure was signalled. */
    private Flow.Subscription inputSubscription;
    /** Outstanding demand of the inner subscriber, saturating at {@link Long#MAX_VALUE}. */
    private long invocationInputRequests = 0;
    /** Messages already decoded but not yet delivered because of missing downstream demand. */
    private final Queue<InvocationInput> parsedMessages = new ArrayDeque<>();
    /** Bytes received from upstream and not yet consumed by the decoder. */
    private ByteString internalBuffer = ByteString.EMPTY;
    private State state = State.WAITING_HEADER;
    /** Header of the message whose payload is currently awaited; only meaningful in {@link State#WAITING_PAYLOAD}. */
    private MessageHeader lastParsedMessageHeader = null;
    /** Failure recorded when the decoder moved to {@link State#FAILED}. */
    private RuntimeException lastParsingFailure = null;

    /** Decoding state machine. */
    public enum State {
        /** The next bytes to consume are a message header. */
        WAITING_HEADER,
        /** A header was parsed; the next bytes to consume are its payload. */
        WAITING_PAYLOAD,
        /** Parsing failed; no further input is consumed. */
        FAILED
    }

    /**
     * Creates a decoder delivering decoded messages to the given subscriber.
     *
     * @param subscriber the downstream subscriber
     */
    public MessageDecoder(Flow.Subscriber<InvocationInput> subscriber) {
        this.inner = subscriber;
    }

    /**
     * Stores the upstream subscription and subscribes the inner subscriber through a wrapper
     * subscription that tracks downstream demand locally.
     *
     * @param subscription the upstream subscription
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onSubscribe(final Flow.Subscription subscription) {
        this.inputSubscription = subscription;
        this.inner.onSubscribe(new Flow.Subscription() {
            @Override // java.util.concurrent.Flow.Subscription
            public void request(long j) {
                // Upstream is always asked for everything; back-pressure towards the inner
                // subscriber is enforced by buffering decoded messages in this decoder.
                subscription.request(Long.MAX_VALUE);
                MessageDecoder.this.handleSubscriptionRequest(j);
            }

            @Override // java.util.concurrent.Flow.Subscription
            public void cancel() {
                subscription.cancel();
            }
        });
    }

    /**
     * Appends the chunk to the internal buffer, decodes as many messages as possible and delivers
     * them according to the current downstream demand.
     *
     * @param byteBuffer the next chunk of raw bytes
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onNext(ByteBuffer byteBuffer) {
        offer(UnsafeByteOperations.unsafeWrap(byteBuffer));
        tryProgress();
    }

    /**
     * Forwards an upstream error to the inner subscriber, unless there is no active upstream
     * subscription (not yet subscribed, or a parsing failure was already signalled).
     *
     * @param th the upstream error
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onError(Throwable th) {
        if (this.inputSubscription == null) {
            return;
        }
        this.inner.onError(th);
    }

    /**
     * Forwards upstream completion to the inner subscriber, unless there is no active upstream
     * subscription (not yet subscribed, or a parsing failure was already signalled).
     */
    @Override // java.util.concurrent.Flow.Subscriber
    public void onComplete() {
        if (this.inputSubscription == null) {
            return;
        }
        this.inner.onComplete();
    }

    /**
     * Records additional demand from the inner subscriber and tries to deliver queued messages.
     *
     * @param j number of additional messages requested; {@link Long#MAX_VALUE} means unbounded
     */
    private void handleSubscriptionRequest(long j) {
        if (j == Long.MAX_VALUE) {
            this.invocationInputRequests = j;
        } else {
            this.invocationInputRequests += j;
            // Saturate instead of wrapping around on overflow.
            if (this.invocationInputRequests < 0) {
                this.invocationInputRequests = Long.MAX_VALUE;
            }
        }
        tryProgress();
    }

    /**
     * Signals a recorded parsing failure to the inner subscriber, or otherwise delivers queued
     * messages while there is outstanding demand.
     */
    private void tryProgress() {
        InvocationInput poll;
        if (this.inputSubscription == null) {
            return;
        }
        if (this.state == State.FAILED) {
            this.inner.onError(this.lastParsingFailure);
            this.inputSubscription.cancel();
            this.inputSubscription = null;
            // onError is a terminal signal: nothing may be delivered to the inner subscriber
            // afterwards, so do not fall through to the drain loop below.
            return;
        }
        while (this.invocationInputRequests > 0 && (poll = this.parsedMessages.poll()) != null) {
            this.invocationInputRequests--;
            this.inner.onNext(poll);
        }
    }

    /**
     * Appends bytes to the internal buffer and decodes whatever is complete. Input is ignored
     * once the decoder has failed.
     *
     * @param byteString the bytes to append
     */
    private void offer(ByteString byteString) {
        if (this.state != State.FAILED) {
            this.internalBuffer = this.internalBuffer.concat(byteString);
            tryConsumeInternalBuffer();
        }
    }

    /**
     * Runs the state machine for as long as the internal buffer holds enough bytes for the next
     * step, queueing each decoded message. Any parsing error moves the decoder to
     * {@link State#FAILED} and is stored in {@code lastParsingFailure}.
     */
    private void tryConsumeInternalBuffer() {
        while (this.state != State.FAILED && this.internalBuffer.size() >= wantBytes()) {
            if (this.state == State.WAITING_HEADER) {
                try {
                    this.lastParsedMessageHeader = MessageHeader.parse(readLongAtBeginning());
                    this.state = State.WAITING_PAYLOAD;
                    sliceInternalBuffer(HEADER_SIZE_BYTES);
                } catch (RuntimeException e) {
                    this.lastParsingFailure = e;
                    this.state = State.FAILED;
                }
            } else {
                try {
                    final MessageHeader header = this.lastParsedMessageHeader;
                    final ByteString payload = this.internalBuffer.substring(0, header.getLength());
                    final MessageLite message = (MessageLite) header.getType().messageParser().parseFrom(payload);
                    this.parsedMessages.offer(InvocationInput.of(header, message));
                    this.state = State.WAITING_HEADER;
                    sliceInternalBuffer(header.getLength());
                } catch (InvalidProtocolBufferException e2) {
                    this.lastParsingFailure = new RuntimeException("Cannot parse the protobuf message", e2);
                    this.state = State.FAILED;
                } catch (RuntimeException e3) {
                    this.lastParsingFailure = e3;
                    this.state = State.FAILED;
                }
            }
        }
    }

    /**
     * Returns how many buffered bytes the current state needs before it can make progress.
     *
     * @return the header size while waiting for a header, otherwise the payload length announced
     *     by the last parsed header
     */
    private int wantBytes() {
        if (this.state == State.WAITING_HEADER) {
            return HEADER_SIZE_BYTES;
        }
        return this.lastParsedMessageHeader.getLength();
    }

    /**
     * Drops the first {@code i} bytes from the internal buffer.
     *
     * @param i number of leading bytes to discard
     */
    private void sliceInternalBuffer(int i) {
        if (this.internalBuffer.size() == i) {
            this.internalBuffer = ByteString.EMPTY;
        } else {
            this.internalBuffer = this.internalBuffer.substring(i);
        }
    }

    /**
     * Reads the first 8 bytes of the internal buffer as a big-endian {@code long} (byte 0 is the
     * most significant). The caller must ensure at least 8 bytes are buffered.
     *
     * @return the decoded 64-bit value
     */
    private long readLongAtBeginning() {
        long value = 0L;
        for (int i = 0; i < HEADER_SIZE_BYTES; i++) {
            // Mask with a long literal: masking with an int would make the arithmetic 32-bit, and
            // int shift distances are taken modulo 32, corrupting the upper four bytes.
            value = (value << 8) | (this.internalBuffer.byteAt(i) & 0xFFL);
        }
        return value;
    }
}
