package io.georocket;

import io.georocket.constants.AddressConstants;
import io.georocket.constants.ConfigConstants;
import io.georocket.index.xml.XMLCRSIndexer;
import io.georocket.input.Splitter;
import io.georocket.input.geojson.GeoJsonSplitter;
import io.georocket.input.xml.FirstLevelSplitter;
import io.georocket.storage.ChunkMeta;
import io.georocket.storage.IndexMeta;
import io.georocket.storage.RxStore;
import io.georocket.storage.StoreFactory;
import io.georocket.util.JsonParserTransformer;
import io.georocket.util.MimeTypeUtils;
import io.georocket.util.RxUtils;
import io.georocket.util.StringWindow;
import io.georocket.util.UTF8BomFilter;
import io.georocket.util.Window;
import io.georocket.util.XMLParserTransformer;
import io.georocket.util.io.RxGzipReadStream;
import io.vertx.core.file.OpenOptions;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.buffer.Buffer;
import io.vertx.rxjava.core.eventbus.Message;
import io.vertx.rxjava.core.file.AsyncFile;
import io.vertx.rxjava.core.file.FileSystem;
import io.vertx.rxjava.core.streams.ReadStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import rx.Completable;
import rx.Observable;
import rx.Single;

/* loaded from: input_file:io/georocket/ImporterVerticle.class */
public class ImporterVerticle extends AbstractVerticle {
    private static Logger log = LoggerFactory.getLogger(ImporterVerticle.class);
    private static final int MAX_RETRIES = 5;
    private static final int RETRY_INTERVAL = 1000;
    private static final int MAX_PARALLEL_IMPORTS = 1;
    protected RxStore store;
    private String incoming;
    private boolean paused;
    private Set<AsyncFile> filesBeingImported = new HashSet();
    private boolean reportActivities;

    public void start() {
        log.info("Launching importer ...");
        this.reportActivities = config().getBoolean(ConfigConstants.REPORT_ACTIVITIES, false).booleanValue();
        this.store = new RxStore(StoreFactory.createStore(getVertx()));
        this.incoming = config().getString(ConfigConstants.STORAGE_FILE_PATH) + "/incoming";
        this.vertx.eventBus().localConsumer(AddressConstants.IMPORTER_IMPORT).toObservable().onBackpressureBuffer().flatMapCompletable(message -> {
            return onImport(message).onErrorComplete();
        }, false, MAX_PARALLEL_IMPORTS).subscribe(message2 -> {
        }, th -> {
            log.fatal("Could not import file", th);
        });
        this.vertx.eventBus().localConsumer(AddressConstants.IMPORTER_PAUSE, this::onPause);
    }

    protected Completable onImport(Message<JsonObject> message) {
        JsonObject jsonObject = (JsonObject) message.body();
        String string = jsonObject.getString("filename");
        String str = this.incoming + "/" + string;
        String string2 = jsonObject.getString("layer", "/");
        String string3 = jsonObject.getString("contentType");
        String string4 = jsonObject.getString("correlationId");
        String string5 = jsonObject.getString("fallbackCRSString");
        String string6 = jsonObject.getString("contentEncoding");
        JsonArray jsonArray = jsonObject.getJsonArray("tags");
        List list = jsonArray != null ? (List) jsonArray.stream().flatMap(obj -> {
            return obj != null ? Stream.of(obj.toString()) : Stream.of((Object[]) new String[0]);
        }).collect(Collectors.toList()) : null;
        JsonObject jsonObject2 = jsonObject.getJsonObject("properties");
        Map map = jsonObject2 != null ? jsonObject2.getMap() : null;
        long currentTimeMillis = System.currentTimeMillis();
        onImportingStarted(string4, str, string3, string2, jsonArray, currentTimeMillis);
        FileSystem fileSystem = this.vertx.fileSystem();
        return fileSystem.rxOpen(str, new OpenOptions().setCreate(false).setWrite(false)).flatMap(asyncFile -> {
            this.filesBeingImported.add(asyncFile);
            return importFile(string3, asyncFile, string4, string, currentTimeMillis, string2, list, map, string5, string6).doAfterTerminate(() -> {
                log.debug("Deleting " + str + " from incoming folder");
                this.filesBeingImported.remove(asyncFile);
                asyncFile.rxClose().flatMap(r5 -> {
                    return fileSystem.rxDelete(str);
                }).subscribe(r1 -> {
                }, th -> {
                    log.error("Could not delete file from 'incoming' folder", th);
                });
            });
        }).doOnSuccess(num -> {
            onImportingFinished(string4, str, string3, string2, num, System.currentTimeMillis() - currentTimeMillis, null);
        }).doOnError(th -> {
            onImportingFinished(string4, str, string3, string2, null, System.currentTimeMillis() - currentTimeMillis, th);
        }).toCompletable();
    }

    protected void onImportingStarted(String str, String str2, String str3, String str4, JsonArray jsonArray, long j) {
        log.info(String.format("Importing [%s] '%s' to layer '%s' started at '%d'", str, str2, str4, Long.valueOf(j)));
        if (this.reportActivities) {
            this.vertx.eventBus().publish(AddressConstants.ACTIVITIES, new JsonObject().put("activity", "import").put("state", "store").put("owner", deploymentID()).put("action", "enter").put("correlationId", str).put("timestamp", Long.valueOf(j)).put("mimeType", str3).put("tags", jsonArray).put("layer", str4));
        }
    }

    protected void onImportingFinished(String str, String str2, String str3, String str4, Integer num, long j, Throwable th) {
        if (th == null) {
            log.info(String.format("Finished importing [%s] %d chunks '%s' to layer '%s' after %d ms", str, num, str2, str4, Long.valueOf(j)));
        } else {
            log.error(String.format("Failed to import [%s] '%s' to layer '%s' after %d ms", str, str2, str4, Long.valueOf(j)), th);
        }
        if (this.reportActivities) {
            JsonObject put = new JsonObject().put("activity", "import").put("state", "store").put("owner", deploymentID()).put("action", "leave").put("correlationId", str).put("chunkCount", num).put("duration", Long.valueOf(j)).put("mimeType", str3).put("layer", str4);
            if (th != null) {
                put.put("error", th.getMessage());
            }
            this.vertx.eventBus().publish(AddressConstants.ACTIVITIES, put);
        }
    }

    protected Single<Integer> importFile(String str, ReadStream<Buffer> readStream, String str2, String str3, long j, String str4, List<String> list, Map<String, Object> map, String str5, String str6) {
        if ("gzip".equals(str6)) {
            log.debug("Importing file compressed with GZIP");
            readStream = new RxGzipReadStream<>(readStream);
        } else if (str6 != null && !str6.isEmpty()) {
            log.warn("Unknown content encoding: `" + str6 + "'. Trying anyway.");
        }
        return (MimeTypeUtils.belongsTo(str, "application", "xml") || MimeTypeUtils.belongsTo(str, "text", "xml")) ? importXML(readStream, str2, str3, j, str4, list, map, str5) : MimeTypeUtils.belongsTo(str, "application", "json") ? importJSON(readStream, str2, str3, j, str4, list, map) : Single.error(new NoStackTraceThrowable(String.format("Received an unexpected content type '%s' while trying to import file '%s'", str, str3)));
    }

    protected Single<Integer> importXML(ReadStream<Buffer> readStream, String str, String str2, long j, String str3, List<String> list, Map<String, Object> map, String str4) {
        UTF8BomFilter uTF8BomFilter = new UTF8BomFilter();
        Window window = new Window();
        FirstLevelSplitter firstLevelSplitter = new FirstLevelSplitter(window);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        XMLCRSIndexer xMLCRSIndexer = new XMLCRSIndexer();
        Observable map2 = readStream.toObservable().map((v0) -> {
            return v0.getDelegate();
        });
        uTF8BomFilter.getClass();
        Observable map3 = map2.map(uTF8BomFilter::filter);
        window.getClass();
        Observable doOnNext = map3.doOnNext(window::append).compose(new XMLParserTransformer()).doOnNext(xMLStreamEvent -> {
            if (xMLCRSIndexer.getCRS() == null) {
                xMLCRSIndexer.onEvent(xMLStreamEvent);
            }
        });
        firstLevelSplitter.getClass();
        return doOnNext.flatMap((v1) -> {
            return r1.onEventObservable(v1);
        }).flatMapSingle(result -> {
            String str5 = str4;
            if (xMLCRSIndexer.getCRS() != null) {
                str5 = xMLCRSIndexer.getCRS();
            }
            return addToStoreWithPause(result, str3, new IndexMeta(str, str2, j, list, map, str5), readStream, atomicInteger).toSingleDefault(Integer.valueOf(MAX_PARALLEL_IMPORTS));
        }).count().toSingle();
    }

    protected Single<Integer> importJSON(ReadStream<Buffer> readStream, String str, String str2, long j, String str3, List<String> list, Map<String, Object> map) {
        UTF8BomFilter uTF8BomFilter = new UTF8BomFilter();
        StringWindow stringWindow = new StringWindow();
        GeoJsonSplitter geoJsonSplitter = new GeoJsonSplitter(stringWindow);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        Observable map2 = readStream.toObservable().map((v0) -> {
            return v0.getDelegate();
        });
        uTF8BomFilter.getClass();
        Observable map3 = map2.map(uTF8BomFilter::filter);
        stringWindow.getClass();
        Observable compose = map3.doOnNext(stringWindow::append).compose(new JsonParserTransformer());
        geoJsonSplitter.getClass();
        return compose.flatMap((v1) -> {
            return r1.onEventObservable(v1);
        }).flatMapSingle(result -> {
            return addToStoreWithPause(result, str3, new IndexMeta(str, str2, j, list, map, (String) null), readStream, atomicInteger).toSingleDefault(Integer.valueOf(MAX_PARALLEL_IMPORTS));
        }).count().toSingle();
    }

    private void onPause(Message<Boolean> message) {
        Boolean bool = (Boolean) message.body();
        if (bool == null || !bool.booleanValue()) {
            if (this.paused) {
                log.info("Resuming import");
                this.paused = false;
                Iterator<AsyncFile> it = this.filesBeingImported.iterator();
                while (it.hasNext()) {
                    it.next().resume();
                }
                return;
            }
            return;
        }
        if (this.paused) {
            return;
        }
        log.info("Pausing import");
        this.paused = true;
        Iterator<AsyncFile> it2 = this.filesBeingImported.iterator();
        while (it2.hasNext()) {
            it2.next().pause();
        }
    }

    private Completable addToStoreWithPause(Splitter.Result<? extends ChunkMeta> result, String str, IndexMeta indexMeta, ReadStream<Buffer> readStream, AtomicInteger atomicInteger) {
        readStream.pause();
        atomicInteger.incrementAndGet();
        return addToStore(result.getChunk(), result.getMeta(), str, indexMeta).doOnCompleted(() -> {
            if (atomicInteger.decrementAndGet() != 0 || this.paused) {
                return;
            }
            readStream.resume();
        });
    }

    protected Completable addToStore(String str, ChunkMeta chunkMeta, String str2, IndexMeta indexMeta) {
        return Completable.defer(() -> {
            return this.store.rxAdd(str, chunkMeta, str2, indexMeta);
        }).retryWhen(RxUtils.makeRetry(5, RETRY_INTERVAL, log));
    }
}
