package io.georocket.storage.hdfs;

import com.google.common.base.Preconditions;
import io.georocket.constants.ConfigConstants;
import io.georocket.storage.ChunkReadStream;
import io.georocket.storage.indexed.IndexedStore;
import io.georocket.util.PathUtils;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:io/georocket/storage/hdfs/HDFSStore.class */
public class HDFSStore extends IndexedStore {
    private final Vertx vertx;
    private final Configuration configuration;
    private final String root;
    private FileSystem fs;

    public HDFSStore(Vertx vertx) {
        super(vertx);
        this.vertx = vertx;
        JsonObject config = vertx.getOrCreateContext().config();
        this.root = config.getString(ConfigConstants.STORAGE_HDFS_PATH);
        Preconditions.checkNotNull(this.root, "Missing configuration item \"georocket.storage.hdfs.path\"");
        String string = config.getString(ConfigConstants.STORAGE_HDFS_DEFAULT_FS);
        Preconditions.checkNotNull(string, "Missing configuration item \"georocket.storage.hdfs.defaultFS\"");
        this.configuration = new Configuration();
        this.configuration.set("fs.defaultFS", string);
    }

    private synchronized FileSystem getFS() throws IOException {
        if (this.fs == null) {
            this.fs = FileSystem.get(this.configuration);
        }
        return this.fs;
    }

    public void getOne(String str, Handler<AsyncResult<ChunkReadStream>> handler) {
        this.vertx.executeBlocking(future -> {
            long len;
            FSDataInputStream open;
            try {
                Path path = new Path(PathUtils.join(new String[]{this.root, str}));
                synchronized (this) {
                    FileSystem fs = getFS();
                    len = fs.getFileStatus(path).getLen();
                    open = fs.open(path);
                }
                future.complete(Pair.of(Long.valueOf(len), open));
            } catch (IOException e) {
                future.fail(e);
            }
        }, asyncResult -> {
            if (asyncResult.failed()) {
                handler.handle(Future.failedFuture(asyncResult.cause()));
            } else {
                Pair pair = (Pair) asyncResult.result();
                handler.handle(Future.succeededFuture(new InputStreamChunkReadStream((InputStream) pair.getValue(), ((Long) pair.getKey()).longValue(), this.vertx)));
            }
        });
    }

    private synchronized FSDataOutputStream createFile(String str) throws IOException {
        return getFS().create(new Path(PathUtils.join(new String[]{this.root, str})), false);
    }

    @Override // io.georocket.storage.indexed.IndexedStore
    protected void doAddChunk(String str, String str2, Handler<AsyncResult<String>> handler) {
        if (str2 == null || str2.isEmpty()) {
            str2 = "/";
        }
        String join = PathUtils.join(new String[]{str2, generateChunkId()});
        this.vertx.executeBlocking(future -> {
            try {
                try {
                    FSDataOutputStream createFile = createFile(join);
                    Throwable th = null;
                    OutputStreamWriter outputStreamWriter = new OutputStreamWriter((OutputStream) createFile, StandardCharsets.UTF_8);
                    Throwable th2 = null;
                    try {
                        outputStreamWriter.write(str);
                        if (outputStreamWriter != null) {
                            if (0 != 0) {
                                try {
                                    outputStreamWriter.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                outputStreamWriter.close();
                            }
                        }
                        if (createFile != null) {
                            if (0 != 0) {
                                try {
                                    createFile.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                createFile.close();
                            }
                        }
                        future.complete(join);
                    } catch (Throwable th5) {
                        if (outputStreamWriter != null) {
                            if (0 != 0) {
                                try {
                                    outputStreamWriter.close();
                                } catch (Throwable th6) {
                                    th2.addSuppressed(th6);
                                }
                            } else {
                                outputStreamWriter.close();
                            }
                        }
                        throw th5;
                    }
                } finally {
                }
            } catch (IOException e) {
                future.fail(e);
            }
        }, handler);
    }

    @Override // io.georocket.storage.indexed.IndexedStore
    protected void doDeleteChunks(Queue<String> queue, Handler<AsyncResult<Void>> handler) {
        if (queue.isEmpty()) {
            handler.handle(Future.succeededFuture());
        } else {
            String join = PathUtils.join(new String[]{this.root, queue.poll()});
            this.vertx.executeBlocking(future -> {
                try {
                    synchronized (this) {
                        FileSystem fs = getFS();
                        Path path = new Path(join);
                        if (fs.exists(path)) {
                            fs.delete(path, false);
                        }
                    }
                    future.complete();
                } catch (IOException e) {
                    future.fail(e);
                }
            }, handler);
        }
    }
}
