package org.noear.socketd.transport.stream.impl;

import org.noear.socketd.transport.core.MessageInternal;
import org.noear.socketd.transport.core.Reply;
import org.noear.socketd.transport.stream.SubscribeStream;
import org.noear.socketd.utils.IoConsumer;

/* loaded from: input_file:org/noear/socketd/transport/stream/impl/SubscribeStreamImpl.class */
/**
 * {@link SubscribeStream} implementation that hands every incoming reply to a
 * caller-supplied consumer and remembers whether the most recent reply was
 * flagged as the end of the stream.
 */
public class SubscribeStreamImpl extends StreamBase<SubscribeStream> implements SubscribeStream {
    /** Set from {@code isEnd()} of the latest reply; {@code false} until a reply arrives. */
    private boolean isDone;

    /** Consumer registered through {@link #thenReply}; {@code null} when none is registered. */
    private IoConsumer<Reply> doOnReply;

    /**
     * Creates the stream by delegating to the {@code StreamBase} constructor
     * with the fixed second argument {@code 2}.
     *
     * @param str forwarded unchanged as the first super-constructor argument
     *            (presumably the stream id - confirm against StreamBase)
     * @param j   forwarded unchanged as the third super-constructor argument
     *            (presumably a timeout - confirm against StreamBase)
     */
    public SubscribeStreamImpl(String str, long j) {
        super(str, 2, j);
    }

    /**
     * Registers the consumer that receives each reply, replacing any
     * previously registered one.
     *
     * @param ioConsumer the reply consumer to store
     * @return this stream, so calls can be chained
     */
    @Override
    public SubscribeStream thenReply(IoConsumer<Reply> ioConsumer) {
        doOnReply = ioConsumer;
        return this;
    }

    /**
     * Processes one incoming reply: records its end flag, then passes it to
     * the registered consumer, if there is one.
     *
     * <p>Anything thrown by the consumer is caught and handed to the inherited
     * {@code onError} instead of propagating to the caller.
     *
     * @param messageInternal the reply that has just arrived
     */
    @Override
    public void onReply(MessageInternal messageInternal) {
        // The flag is updated before the consumer runs, and on every reply.
        isDone = messageInternal.isEnd();

        if (doOnReply == null) {
            // Nobody subscribed; nothing more to do.
            return;
        }

        try {
            doOnReply.accept(messageInternal);
        } catch (Throwable th) {
            onError(th);
        }
    }

    /**
     * Reports the end flag of the most recently received reply.
     *
     * @return {@code true} if the latest reply was marked as the end;
     *         {@code false} otherwise, including before any reply has arrived
     */
    @Override
    public boolean isDone() {
        return isDone;
    }
}
