package io.georocket.storage.mongodb;

import com.mongodb.async.client.gridfs.AsyncInputStream;
import io.georocket.storage.ChunkReadStream;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import java.nio.ByteBuffer;

/* loaded from: input_file:io/georocket/storage/mongodb/MongoDBChunkReadStream.class */
/**
 * A {@link ChunkReadStream} that pulls data from a MongoDB GridFS
 * {@link AsyncInputStream} and pushes it, one {@link Buffer} at a time, to the
 * registered data handler.
 *
 * <p>Results of the underlying asynchronous reads and of {@code close} are
 * dispatched to the handlers through {@link Context#runOnContext} on the
 * context passed to the constructor.</p>
 */
public class MongoDBChunkReadStream implements ChunkReadStream {
    private static final Logger log = LoggerFactory.getLogger(MongoDBChunkReadStream.class);

    /** The stream to read from. */
    private final AsyncInputStream is;

    /** The size value reported by {@link #getSize()}. */
    private final long size;

    /** Capacity of the buffer used for each read cycle. */
    private final int readBufferSize;

    /** The context on which read and close results are delivered. */
    private final Context context;

    /** {@code true} once {@link #close(Handler)} has been called. */
    private boolean closed;

    /** {@code true} while the stream is paused; no new read cycle is started. */
    private boolean paused;

    /** {@code true} while a read cycle is running; prevents overlapping reads. */
    private boolean readInProgress;

    private Handler<Buffer> handler;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;

    /**
     * Creates a new read stream.
     *
     * @param asyncInputStream the stream to read from
     * @param j the value to report from {@link #getSize()}
     * @param i the capacity of the buffer used for each read cycle
     * @param context the context on which handlers are invoked
     */
    public MongoDBChunkReadStream(AsyncInputStream asyncInputStream, long j, int i, Context context) {
        this.is = asyncInputStream;
        this.size = j;
        this.readBufferSize = i;
        this.context = context;
    }

    /**
     * Ensures the stream has not been closed yet.
     *
     * @throws IllegalStateException if the stream is closed
     */
    private void check() {
        if (this.closed) {
            throw new IllegalStateException("Read stream is closed");
        }
    }

    /**
     * Starts a read cycle unless one is already running. After a non-empty
     * buffer has been delivered, the next cycle is started automatically as
     * long as the stream is not paused and a data handler is set. An empty
     * buffer is treated as end of stream.
     */
    private void doRead() {
        if (this.readInProgress) {
            return;
        }
        this.readInProgress = true;
        doRead(Buffer.buffer(this.readBufferSize), ByteBuffer.allocate(this.readBufferSize), asyncResult -> {
            if (!asyncResult.succeeded()) {
                // readInProgress is left set here, so no further read cycle
                // is started after a failure.
                handleException(asyncResult.cause());
                return;
            }
            this.readInProgress = false;
            Buffer buffer = asyncResult.result();
            if (buffer.length() == 0) {
                // an empty buffer means nothing more could be read
                handleEnd();
                return;
            }
            handleData(buffer);
            if (this.paused || this.handler == null) {
                return;
            }
            doRead();
        });
    }

    /**
     * Reads from the input stream into {@code byteBuffer} until it is full or
     * the stream reports {@code -1}, then copies the bytes read into
     * {@code buffer} and reports the result on the context.
     *
     * @param buffer the Vert.x buffer that receives the bytes read
     * @param byteBuffer the NIO buffer the input stream reads into
     * @param handler called on the context with the filled buffer or the failure
     */
    private void doRead(Buffer buffer, ByteBuffer byteBuffer, Handler<AsyncResult<Buffer>> handler) {
        this.is.read(byteBuffer, (num, th) -> {
            if (th != null) {
                this.context.runOnContext(v -> {
                    handler.handle(Future.failedFuture(th));
                });
            } else if (num.intValue() == -1 || !byteBuffer.hasRemaining()) {
                this.context.runOnContext(v -> {
                    // switch the NIO buffer from writing to reading before copying
                    byteBuffer.flip();
                    buffer.setBytes(0, byteBuffer);
                    handler.handle(Future.succeededFuture(buffer));
                });
            } else {
                // buffer not full yet and stream not exhausted: keep reading
                doRead(buffer, byteBuffer, handler);
            }
        });
    }

    /**
     * Passes a buffer to the data handler, if one is set.
     *
     * @param buffer the data that has been read
     */
    private void handleData(Buffer buffer) {
        if (this.handler != null) {
            this.handler.handle(buffer);
        }
    }

    /**
     * Notifies the end handler, if one is set.
     */
    private void handleEnd() {
        if (this.endHandler != null) {
            this.endHandler.handle(null);
        }
    }

    /**
     * Passes a failure to the exception handler. If no exception handler is
     * set, or the throwable is not an {@link Exception}, it is logged instead.
     *
     * @param th the failure
     */
    private void handleException(Throwable th) {
        if (this.exceptionHandler == null || !(th instanceof Exception)) {
            log.error("Unhandled exception", th);
        } else {
            this.exceptionHandler.handle(th);
        }
    }

    /**
     * @return the size value passed to the constructor
     */
    public long getSize() {
        return this.size;
    }

    /**
     * Sets the handler that receives read failures.
     *
     * @param handler the exception handler
     * @return this stream
     * @throws IllegalStateException if the stream is closed
     */
    public ReadStream<Buffer> exceptionHandler(Handler<Throwable> handler) {
        check();
        this.exceptionHandler = handler;
        return this;
    }

    /**
     * Sets the data handler. If the handler is not {@code null} and the stream
     * is not paused, reading starts immediately.
     *
     * @param handler the data handler
     * @return this stream
     * @throws IllegalStateException if the stream is closed
     */
    public ReadStream<Buffer> handler(Handler<Buffer> handler) {
        check();
        this.handler = handler;
        if (handler != null && !this.paused && !this.closed) {
            doRead();
        }
        return this;
    }

    /**
     * Pauses the stream. A read cycle that is already running still completes
     * and delivers its result, but no new cycle is started afterwards.
     *
     * @return this stream
     * @throws IllegalStateException if the stream is closed
     */
    public ReadStream<Buffer> pause() {
        check();
        this.paused = true;
        return this;
    }

    /**
     * Resumes a paused stream and continues reading if a data handler is set.
     * Does nothing if the stream is not paused.
     *
     * @return this stream
     * @throws IllegalStateException if the stream is closed
     */
    public ReadStream<Buffer> resume() {
        check();
        if (this.paused && !this.closed) {
            this.paused = false;
            if (this.handler != null) {
                doRead();
            }
        }
        return this;
    }

    /**
     * Sets the handler that is notified when the end of the stream is reached.
     *
     * @param handler the end handler
     * @return this stream
     * @throws IllegalStateException if the stream is closed
     */
    public ReadStream<Buffer> endHandler(Handler<Void> handler) {
        check();
        this.endHandler = handler;
        return this;
    }

    /**
     * Marks this stream as closed and closes the underlying input stream.
     *
     * @param handler called on the context with the outcome of the close
     *        operation; may be {@code null}
     * @throws IllegalStateException if the stream has already been closed
     */
    public void close(Handler<AsyncResult<Void>> handler) {
        check();
        this.closed = true;
        this.is.close((result, th) -> {
            Future<Void> future = Future.future();
            if (th != null) {
                future.fail(th);
            } else {
                future.complete(result);
            }
            if (handler != null) {
                this.context.runOnContext(v -> {
                    handler.handle(future);
                });
            }
        });
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    // Delegates to exceptionHandler(Handler<Throwable>); kept so the public
    // surface of this class stays unchanged.
    public /* bridge */ /* synthetic */ StreamBase m26exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
