package io.georocket.index;

import com.google.common.collect.ImmutableList;
import io.georocket.constants.AddressConstants;
import io.georocket.constants.ConfigConstants;
import io.georocket.index.elasticsearch.ElasticsearchClient;
import io.georocket.index.elasticsearch.ElasticsearchClientFactory;
import io.georocket.index.generic.DefaultMetaIndexerFactory;
import io.georocket.index.xml.JsonIndexerFactory;
import io.georocket.index.xml.MetaIndexer;
import io.georocket.index.xml.MetaIndexerFactory;
import io.georocket.index.xml.StreamIndexer;
import io.georocket.index.xml.XMLIndexerFactory;
import io.georocket.query.DefaultQueryCompiler;
import io.georocket.storage.ChunkMeta;
import io.georocket.storage.ChunkReadStream;
import io.georocket.storage.GeoJsonChunkMeta;
import io.georocket.storage.IndexMeta;
import io.georocket.storage.JsonChunkMeta;
import io.georocket.storage.RxStore;
import io.georocket.storage.StoreFactory;
import io.georocket.storage.XMLChunkMeta;
import io.georocket.util.FilteredServiceLoader;
import io.georocket.util.JsonParserTransformer;
import io.georocket.util.MapUtils;
import io.georocket.util.MimeTypeUtils;
import io.georocket.util.RxUtils;
import io.georocket.util.StreamEvent;
import io.georocket.util.ThrowableHelper;
import io.georocket.util.XMLParserTransformer;
import io.georocket.util.io.DelegateChunkReadStream;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.NoStackTraceThrowable;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.rx.java.RxHelper;
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.core.eventbus.Message;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.jooq.lambda.Seq;
import org.jooq.lambda.tuple.Tuple;
import org.jooq.lambda.tuple.Tuple2;
import org.jooq.lambda.tuple.Tuple3;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Func1;

/* loaded from: input_file:io/georocket/index/IndexerVerticle.class */
public class IndexerVerticle extends AbstractVerticle {
    /** Logger shared by all instances of this verticle. */
    private static final Logger log = LoggerFactory.getLogger(IndexerVerticle.class);

    /** Maximum time span (in milliseconds) add messages are buffered before they are indexed as one bulk. */
    private static final long BUFFER_TIMESPAN = 5000;

    /** Maximum number of retries; same value that is handed to {@code RxUtils.makeRetry}. */
    private static final int MAX_RETRIES = 5;

    /** Interval handed to {@code RxUtils.makeRetry} (presumably milliseconds - confirm against RxUtils). */
    private static final int RETRY_INTERVAL = 1000;

    /** Name of the Elasticsearch index used by GeoRocket. */
    private static final String INDEX_NAME = "georocket";

    /** Elasticsearch type name used for mappings, bulk inserts, bulk deletes and queries. */
    private static final String TYPE_NAME = "object";

    /** Elasticsearch client; assigned asynchronously during {@code start} and closed in {@code stop}. */
    private ElasticsearchClient client;

    /** Store from which chunks are read when they are not found in the {@code IndexableChunkCache}. */
    private RxStore store;

    /** Compiler that turns search strings into Elasticsearch queries. */
    private DefaultQueryCompiler queryCompiler;

    /** All indexer factories found by the {@code FilteredServiceLoader}. */
    private List<? extends IndexerFactory> indexerFactories;

    /** Subset of {@link #indexerFactories} that are {@link XMLIndexerFactory} instances. */
    private List<XMLIndexerFactory> xmlIndexerFactories;

    /** Subset of {@link #indexerFactories} that are {@link JsonIndexerFactory} instances. */
    private List<JsonIndexerFactory> jsonIndexerFactories;

    /** Subset of {@link #indexerFactories} that are {@link MetaIndexerFactory} instances. */
    private List<MetaIndexerFactory> metaIndexerFactories;

    /** {@code true} if indexing/deleting activities should be published on the event bus. */
    private boolean reportActivities;

    /** Maximum number of add messages collected into one bulk. */
    private int maxBulkSize;

    /** Maximum number of bulks that are processed in parallel. */
    private int maxParallelInserts;

    /** Number of queued add messages above which the importer is asked to pause. */
    private int maxQueuedChunks;

    /** Number of add messages received but not yet handed over to indexing. */
    private int queuedAddMessages;

    /** {@code true} while the importer has been asked to pause. */
    private boolean pauseImport;

    /**
     * Starts the verticle: reads the configuration, loads all indexer
     * factories, creates the store and the query compiler and finally
     * connects to Elasticsearch. The message consumers are only registered
     * (and the given future only completed) after the index and the mapping
     * have been ensured.
     *
     * @param future completed when startup has finished, failed if any step
     * of the Elasticsearch initialisation fails
     */
    public void start(Future<Void> future) {
        log.info("Launching indexer ...");

        // configuration
        JsonObject configuration = config();
        this.reportActivities = configuration.getBoolean(ConfigConstants.REPORT_ACTIVITIES, false).booleanValue();
        this.maxBulkSize = configuration.getInteger(ConfigConstants.INDEX_MAX_BULK_SIZE,
            Integer.valueOf(ConfigConstants.DEFAULT_INDEX_MAX_BULK_SIZE)).intValue();
        this.maxParallelInserts = configuration.getInteger(ConfigConstants.INDEX_MAX_PARALLEL_INSERTS, 5).intValue();
        this.maxQueuedChunks = configuration.getInteger(ConfigConstants.INDEX_MAX_QUEUED_CHUNKS,
            Integer.valueOf(ConfigConstants.DEFAULT_INDEX_MAX_QUEUED_CHUNKS)).intValue();

        // load all indexer factories and group them by kind
        this.indexerFactories = ImmutableList.copyOf(FilteredServiceLoader.load(IndexerFactory.class));
        this.xmlIndexerFactories = ImmutableList.copyOf(Seq.seq(this.indexerFactories)
            .filter(XMLIndexerFactory.class::isInstance)
            .cast(XMLIndexerFactory.class));
        this.jsonIndexerFactories = ImmutableList.copyOf(Seq.seq(this.indexerFactories)
            .filter(JsonIndexerFactory.class::isInstance)
            .cast(JsonIndexerFactory.class));
        this.metaIndexerFactories = ImmutableList.copyOf(Seq.seq(this.indexerFactories)
            .filter(MetaIndexerFactory.class::isInstance)
            .cast(MetaIndexerFactory.class));

        this.store = new RxStore(StoreFactory.createStore(getVertx()));

        this.queryCompiler = createQueryCompiler();
        this.queryCompiler.setQueryCompilers(this.indexerFactories);

        // connect to Elasticsearch, then ensure index and mapping before
        // accepting any message
        new ElasticsearchClientFactory(this.vertx).createElasticsearchClient(INDEX_NAME)
            .doOnSuccess(createdClient -> this.client = createdClient)
            .flatMapCompletable(createdClient -> this.client.ensureIndex())
            .andThen(Completable.defer(() -> ensureMapping()))
            .subscribe(() -> {
                registerMessageConsumers();
                future.complete();
            }, error -> future.fail(error));
    }

    /**
     * Instantiates the query compiler class configured under
     * {@code ConfigConstants.QUERY_COMPILER_CLASS}. If nothing is configured,
     * {@link DefaultQueryCompiler} itself is used.
     *
     * @return a new query compiler instance
     * @throws RuntimeException if the class cannot be loaded or instantiated
     * @throws ClassCastException if the configured class is not a
     * {@link DefaultQueryCompiler}
     */
    private DefaultQueryCompiler createQueryCompiler() {
        String className = this.vertx.getOrCreateContext().config()
            .getString(ConfigConstants.QUERY_COMPILER_CLASS, DefaultQueryCompiler.class.getName());
        try {
            // Class.newInstance() is deprecated because it lets checked
            // exceptions thrown by the constructor escape undeclared. Going
            // through the Constructor wraps them in an
            // InvocationTargetException, which is handled below.
            return Class.forName(className)
                .asSubclass(DefaultQueryCompiler.class)
                .getDeclaredConstructor()
                .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not create a DefaultQueryCompiler", e);
        }
    }

    /**
     * Stops the verticle and closes the Elasticsearch client.
     */
    public void stop() {
        // The client is assigned asynchronously in start(); it is still null
        // if the connection was never established.
        if (this.client != null) {
            this.client.close();
        }
    }

    /**
     * Registers the event bus consumers for add, delete and query messages.
     */
    private void registerMessageConsumers() {
        this.registerAdd();
        this.registerDelete();
        this.registerQuery();
    }

    /**
     * Creates the retry function that is passed to {@code retryWhen} when a
     * chunk is opened and converted to a document.
     *
     * @return the function created by {@code RxUtils.makeRetry} for
     * {@link #MAX_RETRIES}, {@link #RETRY_INTERVAL} and this class's logger
     */
    private Func1<Observable<? extends Throwable>, Observable<Long>> makeRetry() {
        Func1<Observable<? extends Throwable>, Observable<Long>> retry =
            RxUtils.makeRetry(MAX_RETRIES, RETRY_INTERVAL, log);
        return retry;
    }

    /**
     * Registers the consumer for {@code AddressConstants.INDEXER_ADD}.
     * Incoming messages are collected into bulks (at most
     * {@link #maxBulkSize} messages or {@link #BUFFER_TIMESPAN} milliseconds
     * per bulk) and at most {@link #maxParallelInserts} bulks are indexed in
     * parallel. If more than {@link #maxQueuedChunks} messages are waiting,
     * the importer is asked to pause; it is resumed as soon as the queue has
     * shrunk to half of that limit.
     */
    private void registerAdd() {
        // The explicit <JsonObject> type witness is required: without it the
        // consumer is typed as Message<Object> and the bulks cannot be
        // passed to onAdd(List<Message<JsonObject>>).
        // NOTE(review): buffer(timespan, unit, count) without an explicit
        // scheduler may emit on an Rx scheduler thread, so the counters below
        // could be touched from two threads - confirm.
        this.vertx.eventBus().<JsonObject>consumer(AddressConstants.INDEXER_ADD)
            .toObservable()
            .doOnNext(message -> {
                this.queuedAddMessages++;
                // pause the import once too many chunks are waiting
                if (this.queuedAddMessages > this.maxQueuedChunks && !this.pauseImport) {
                    this.pauseImport = true;
                    this.vertx.eventBus().send(AddressConstants.IMPORTER_PAUSE, Boolean.valueOf(this.pauseImport));
                }
            })
            .buffer(BUFFER_TIMESPAN, TimeUnit.MILLISECONDS, this.maxBulkSize)
            .onBackpressureBuffer()
            .flatMapCompletable(messages -> {
                this.queuedAddMessages -= messages.size();
                // resume the import once the queue has drained to half the limit
                if (this.pauseImport && this.queuedAddMessages <= this.maxQueuedChunks / 2) {
                    this.pauseImport = false;
                    this.vertx.eventBus().send(AddressConstants.IMPORTER_PAUSE, Boolean.valueOf(this.pauseImport));
                }
                // a failed bulk must not terminate the whole stream: fail all
                // messages of the bulk and carry on
                return onAdd(messages).onErrorComplete(error -> {
                    log.error("Could not index document", error);
                    messages.forEach(message ->
                        message.fail(ThrowableHelper.throwableToCode(error), error.getMessage()));
                    return true;
                });
            }, false, this.maxParallelInserts)
            .toCompletable()
            .subscribe(() -> {
                // nothing to do on completion
            }, error -> log.fatal("Could not index document", error));
    }

    /**
     * Registers the consumer for {@code AddressConstants.INDEXER_DELETE}.
     * Every message is handed to {@code onDelete}; the sender receives an
     * empty reply on success and a failure reply otherwise.
     */
    private void registerDelete() {
        this.vertx.eventBus().<JsonObject>consumer(AddressConstants.INDEXER_DELETE)
            .toObservable()
            .subscribe(message -> {
                JsonObject body = message.body();
                onDelete(body).subscribe(
                    () -> message.reply(null),
                    error -> {
                        log.error("Could not delete document", error);
                        int code = ThrowableHelper.throwableToCode(error);
                        message.fail(code, ThrowableHelper.throwableToMessage(error, ""));
                    });
            });
    }

    /**
     * Registers the consumer for {@code AddressConstants.INDEXER_QUERY}.
     * Every message is handed to {@code onQuery}; the sender receives the
     * query result as reply or a failure reply if the query failed.
     */
    private void registerQuery() {
        this.vertx.eventBus().<JsonObject>consumer(AddressConstants.INDEXER_QUERY)
            .toObservable()
            .subscribe(message -> {
                JsonObject body = message.body();
                onQuery(body).subscribe(
                    result -> message.reply(result),
                    error -> {
                        log.error("Could not perform query", error);
                        int code = ThrowableHelper.throwableToCode(error);
                        message.fail(code, ThrowableHelper.throwableToMessage(error, ""));
                    });
            });
    }

    /**
     * Logs that a bulk of chunks is about to be deleted from the index and,
     * if activity reporting is enabled, publishes an "enter" activity.
     *
     * @param j the time (in milliseconds) at which deleting started
     * @param jsonArray the paths of the chunks to delete
     * @param j2 the total number of chunks of the whole delete operation
     * @param j3 the number of chunks that remain to be deleted
     */
    private void onDeletingStarted(long j, JsonArray jsonArray, long j2, long j3) {
        int chunkCount = jsonArray.size();
        String progress = chunkCount < j3
            ? "Deleting " + chunkCount + "/" + j3 + " chunks from index ..."
            : "Deleting " + chunkCount + " chunks from index ...";
        log.info(progress);

        if (!this.reportActivities) {
            return;
        }

        JsonObject activity = new JsonObject()
            .put("activity", "delete")
            .put("state", "index")
            .put("owner", deploymentID())
            .put("action", "enter")
            .put("chunkCount", Integer.valueOf(jsonArray.size()))
            .put("totalChunkCount", Long.valueOf(j2))
            .put("remainingChunkCount", Long.valueOf(j3))
            .put("paths", jsonArray)
            .put("timestamp", Long.valueOf(j));
        this.vertx.eventBus().publish(AddressConstants.ACTIVITIES, activity);
    }

    /**
     * Logs the outcome of a bulk delete and, if activity reporting is
     * enabled, publishes a "leave" activity.
     *
     * @param j the time (in milliseconds) the delete operation took
     * @param jsonArray the paths of the chunks that were to be deleted
     * @param j2 the total number of chunks of the whole delete operation
     * @param j3 the number of chunks that remained to be deleted
     * @param str an error message, or {@code null} if deleting succeeded
     */
    private void onDeletingFinished(long j, JsonArray jsonArray, long j2, long j3, String str) {
        boolean failed = str != null;
        if (failed) {
            log.error("Deleting chunks failed: " + str);
        } else {
            log.info("Finished deleting " + jsonArray.size() + " chunks from index in " + j + " ms");
        }

        if (!this.reportActivities) {
            return;
        }

        JsonObject activity = new JsonObject()
            .put("activity", "delete")
            .put("state", "index")
            .put("owner", deploymentID())
            .put("action", "leave")
            .put("chunkCount", Integer.valueOf(jsonArray.size()))
            .put("totalChunkCount", Long.valueOf(j2))
            .put("remainingChunkCount", Long.valueOf(j3))
            .put("paths", jsonArray)
            .put("duration", Long.valueOf(j));
        if (failed) {
            activity.put("error", str);
        }
        this.vertx.eventBus().publish(AddressConstants.ACTIVITIES, activity);
    }

    /**
     * Merges the mappings of all indexer factories and puts the result into
     * Elasticsearch for {@link #TYPE_NAME}.
     *
     * @return a Completable that completes when the mapping has been put
     */
    private Completable ensureMapping() {
        Map<String, Object> mappings = new HashMap<>();

        // Two passes keep the merge order of the original implementation:
        // DefaultMetaIndexerFactory instances first, all other factories
        // afterwards.
        for (IndexerFactory factory : this.indexerFactories) {
            if (factory instanceof DefaultMetaIndexerFactory) {
                MapUtils.deepMerge(mappings, factory.getMapping());
            }
        }
        for (IndexerFactory factory : this.indexerFactories) {
            if (!(factory instanceof DefaultMetaIndexerFactory)) {
                MapUtils.deepMerge(mappings, factory.getMapping());
            }
        }

        return this.client.putMapping(TYPE_NAME, new JsonObject(mappings)).toCompletable();
    }

    /**
     * Inserts multiple documents into the index with one bulk request and
     * answers every originating message individually: a message is replied
     * to if its document was indexed, and failed with code 500 otherwise.
     *
     * @param str the Elasticsearch type of the documents
     * @param list tuples of chunk path, document and the message that
     * requested the chunk to be indexed
     * @return a Completable that completes when the bulk response has been
     * processed
     */
    private Completable insertDocuments(String str, List<Tuple3<String, JsonObject, Message<JsonObject>>> list) {
        long startTimestamp = System.currentTimeMillis();

        List<String> chunkPaths = Seq.seq(list).map(tuple -> tuple.v1()).toList();
        onIndexingStarted(startTimestamp, chunkPaths);

        List<Tuple2<String, JsonObject>> documents = Seq.seq(list).map(tuple -> tuple.limit2()).toList();
        List<Message<JsonObject>> messages = Seq.seq(list).map(tuple -> tuple.v3()).toList();

        return this.client.bulkInsert(str, documents).flatMapCompletable(bulkResponse -> {
            // the i-th response item belongs to the i-th inserted document
            JsonArray items = bulkResponse.getJsonArray("items");
            for (int i = 0; i < items.size(); i++) {
                JsonObject item = items.getJsonObject(i).getJsonObject("index");
                Message<JsonObject> message = messages.get(i);
                if (this.client.bulkResponseItemHasErrors(item)) {
                    message.fail(500, this.client.bulkResponseItemGetErrorMessage(item));
                } else {
                    message.reply(null);
                }
            }

            long duration = System.currentTimeMillis() - startTimestamp;
            List<String> correlationIds = Seq.seq(messages)
                .map(message -> message.body())
                .map(body -> body.getString("correlationId"))
                .toList();
            onIndexingFinished(duration, correlationIds, chunkPaths,
                this.client.bulkResponseGetErrorMessage(bulkResponse));

            return Completable.complete();
        });
    }

    /**
     * Logs that a bulk of chunks is about to be indexed and, if activity
     * reporting is enabled, publishes an "enter" activity.
     *
     * @param j the time (in milliseconds) at which indexing started
     * @param list the IDs (paths) of the chunks to index
     */
    private void onIndexingStarted(long j, List<String> list) {
        String progress;
        if (this.queuedAddMessages > 0) {
            progress = "Indexing " + list.size() + "/" + (list.size() + this.queuedAddMessages) + " chunks";
        } else {
            progress = "Indexing " + list.size() + " chunks";
        }
        log.info(progress);

        if (!this.reportActivities) {
            return;
        }

        JsonObject activity = new JsonObject()
            .put("activity", "import")
            .put("state", "index")
            .put("owner", deploymentID())
            .put("action", "enter")
            .put("chunkIds", new JsonArray(list))
            .put("timestamp", Long.valueOf(j));
        this.vertx.eventBus().publish(AddressConstants.ACTIVITIES, activity);
    }

    /**
     * Logs the outcome of a bulk insert and, if activity reporting is
     * enabled, publishes a "leave" activity.
     *
     * @param j the time (in milliseconds) indexing took
     * @param list the correlation IDs of the indexed chunks
     * @param list2 the IDs (paths) of the indexed chunks
     * @param str an error message, or {@code null} if indexing succeeded
     */
    private void onIndexingFinished(long j, List<String> list, List<String> list2, String str) {
        boolean failed = str != null;
        if (failed) {
            log.error("Indexing failed: " + str);
        } else {
            log.info("Finished indexing " + list.size() + " chunks in " + j + " ms");
        }

        if (!this.reportActivities) {
            return;
        }

        JsonObject activity = new JsonObject()
            .put("activity", "import")
            .put("state", "index")
            .put("owner", deploymentID())
            .put("action", "leave")
            .put("correlationIds", new JsonArray(list))
            .put("chunkIds", new JsonArray(list2))
            .put("duration", Long.valueOf(j));
        if (failed) {
            activity.put("error", str);
        }
        this.vertx.eventBus().publish(AddressConstants.ACTIVITIES, activity);
    }

    /**
     * Opens a chunk for reading. The {@code IndexableChunkCache} is consulted
     * first; the store is only accessed if the cache has no entry for the
     * given path.
     *
     * @param str the path of the chunk
     * @return a Single emitting a read stream for the chunk
     */
    private Single<ChunkReadStream> getChunkFromStore(String str) {
        Buffer cached = IndexableChunkCache.getInstance().get(str);
        if (cached == null) {
            return this.store.rxGetOne(str);
        }
        return Single.just(new DelegateChunkReadStream(cached));
    }

    /**
     * Opens a chunk and converts it to an Elasticsearch document. The parser
     * and the stream indexers are selected by the chunk's MIME type (XML or
     * JSON); the results of all meta indexers are added to the document.
     * The whole operation is retried according to {@code makeRetry()}.
     *
     * @param str the path of the chunk to open
     * @param chunkMeta the metadata of the chunk
     * @param indexMeta the metadata used to index the chunk
     * @return an Observable emitting the document; it fails if the chunk has
     * an unsupported MIME type
     */
    private Observable<Map<String, Object>> openChunkToDocument(String str, ChunkMeta chunkMeta, IndexMeta indexMeta) {
        return Observable.defer(() -> getChunkFromStore(str).flatMapObservable(chunk -> {
            List<? extends IndexerFactory> factories;
            Observable.Transformer<Buffer, ? extends StreamEvent> parserTransformer;

            // select indexers and parser depending on the MIME type
            String mimeType = chunkMeta.getMimeType();
            if (MimeTypeUtils.belongsTo(mimeType, "application", "xml")
                    || MimeTypeUtils.belongsTo(mimeType, "text", "xml")) {
                factories = this.xmlIndexerFactories;
                parserTransformer = new XMLParserTransformer();
            } else if (MimeTypeUtils.belongsTo(mimeType, "application", "json")) {
                factories = this.jsonIndexerFactories;
                parserTransformer = new JsonParserTransformer();
            } else {
                // the chunk has already been opened - do not leak it
                chunk.close();
                return Observable.<Map<String, Object>>error(new NoStackTraceThrowable(String.format(
                    "Unexpected mime type '%s' while trying to index chunk '%s'", mimeType, str)));
            }

            // call the meta indexers
            Map<String, Object> metaResults = new HashMap<>();
            for (MetaIndexerFactory metaIndexerFactory : this.metaIndexerFactories) {
                MetaIndexer metaIndexer = metaIndexerFactory.createIndexer();
                metaIndexer.onIndexChunk(str, chunkMeta, indexMeta);
                metaResults.putAll(metaIndexer.getResult());
            }

            // convert the chunk to a document, close the chunk afterwards and
            // add the results of the meta indexers to the document
            return chunkToDocument(chunk, indexMeta.getFallbackCRSString(), parserTransformer, factories)
                .doAfterTerminate(chunk::close)
                .doOnNext(document -> document.putAll(metaResults));
        })).retryWhen(makeRetry());
    }

    /**
     * Converts a chunk to an Elasticsearch document. One stream indexer is
     * created per given factory; every event produced by the parser is passed
     * to all indexers, and once the chunk has been consumed completely their
     * results are merged into one document.
     *
     * @param chunkReadStream the chunk to convert
     * @param str a fallback CRS string that is set on all indexers that are
     * {@link CRSAware} (may be {@code null})
     * @param transformer the transformer that parses the chunk into stream
     * events
     * @param list the factories creating the stream indexers
     * @param <T> the type of the stream events
     * @return an Observable emitting the document
     */
    private <T extends StreamEvent> Observable<Map<String, Object>> chunkToDocument(ChunkReadStream chunkReadStream, String str, Observable.Transformer<Buffer, T> transformer, List<? extends IndexerFactory> list) {
        List<StreamIndexer<T>> indexers = new ArrayList<>();
        for (IndexerFactory factory : list) {
            // The caller is responsible for passing factories whose indexers
            // match the event type produced by the transformer.
            @SuppressWarnings("unchecked")
            StreamIndexer<T> indexer = (StreamIndexer<T>) factory.createIndexer();
            if (str != null && indexer instanceof CRSAware) {
                ((CRSAware) indexer).setFallbackCRSString(str);
            }
            indexers.add(indexer);
        }

        return RxHelper.toObservable(chunkReadStream)
            .compose(transformer)
            .doOnNext(event -> indexers.forEach(streamIndexer -> streamIndexer.onEvent(event)))
            // "wait" until the whole chunk has been consumed
            .last()
            .map(lastEvent -> {
                Map<String, Object> document = new HashMap<>();
                indexers.forEach(streamIndexer -> document.putAll(streamIndexer.getResult()));
                return document;
            });
    }

    /**
     * Creates the chunk metadata object matching the "mimeType" attribute of
     * the given JSON object. {@code MimeTypeUtils.XML} is assumed if the
     * attribute is missing.
     *
     * @param jsonObject the JSON representation of the chunk metadata
     * @return XML, GeoJSON or JSON chunk metadata depending on the MIME type,
     * or generic chunk metadata for any other MIME type
     */
    private ChunkMeta getMeta(JsonObject jsonObject) {
        String mimeType = jsonObject.getString("mimeType", MimeTypeUtils.XML);
        if (MimeTypeUtils.belongsTo(mimeType, "application", "xml")
                || MimeTypeUtils.belongsTo(mimeType, "text", "xml")) {
            return new XMLChunkMeta(jsonObject);
        }
        // GeoJSON has to be checked before the generic JSON type
        if (MimeTypeUtils.belongsTo(mimeType, "application", "geo+json")) {
            return new GeoJsonChunkMeta(jsonObject);
        }
        if (MimeTypeUtils.belongsTo(mimeType, "application", "json")) {
            return new JsonChunkMeta(jsonObject);
        }
        return new ChunkMeta(jsonObject);
    }

    /**
     * Indexes the chunks referenced by the given messages. Every chunk is
     * converted to a document; all documents that could be created are then
     * inserted with one bulk request. Messages whose chunk could not be
     * converted are failed individually and do not abort the bulk.
     *
     * @param list the messages requesting chunks to be indexed
     * @return a Completable that completes when all messages have been
     * processed
     */
    private Completable onAdd(List<Message<JsonObject>> list) {
        return Observable.from(list)
            .flatMap(message -> createDocumentForMessage(message))
            .toList()
            .flatMapCompletable(documents ->
                documents.isEmpty() ? Completable.complete() : insertDocuments(TYPE_NAME, documents))
            .toCompletable();
    }

    /**
     * Converts the chunk referenced by a single add message to a document.
     *
     * @param message the message requesting a chunk to be indexed
     * @return an Observable emitting a tuple of chunk path, document and the
     * message itself, or an empty Observable if the message was invalid or
     * the chunk could not be converted (the message has been failed then)
     */
    private Observable<Tuple3<String, JsonObject, Message<JsonObject>>> createDocumentForMessage(Message<JsonObject> message) {
        JsonObject body = message.body();

        String path = body.getString("path");
        if (path == null) {
            message.fail(400, "Missing path to the chunk to index");
            return Observable.empty();
        }

        JsonObject metaJson = body.getJsonObject("meta");
        if (metaJson == null) {
            message.fail(400, "Missing metadata for chunk " + path);
            return Observable.empty();
        }

        // tags are optional; null entries are skipped
        JsonArray tagsArray = body.getJsonArray("tags");
        List<String> tags = null;
        if (tagsArray != null) {
            tags = tagsArray.stream()
                .filter(tag -> tag != null)
                .map(Object::toString)
                .collect(Collectors.toList());
        }

        JsonObject propertiesJson = body.getJsonObject("properties");
        Map<String, Object> properties = propertiesJson != null ? propertiesJson.getMap() : null;

        String fallbackCRSString = body.getString("fallbackCRSString");

        log.trace("Indexing " + path);

        String correlationId = body.getString("correlationId");
        String filename = body.getString("filename");
        long timestamp = body.getLong("timestamp", Long.valueOf(System.currentTimeMillis())).longValue();
        IndexMeta indexMeta = new IndexMeta(correlationId, filename, timestamp, tags, properties, fallbackCRSString);

        return openChunkToDocument(path, getMeta(metaJson), indexMeta)
            .map(document -> Tuple.tuple(path, new JsonObject(document), message))
            .onErrorResumeNext(error -> {
                // fail only this message; the other chunks of the bulk are
                // still indexed
                message.fail(ThrowableHelper.throwableToCode(error), ThrowableHelper.throwableToMessage(error, ""));
                return Observable.empty();
            });
    }

    /**
     * Performs a query. A new scroll is started if the request contains no
     * "scrollId"; otherwise the existing scroll is continued.
     *
     * @param jsonObject the request; reads the attributes "search", "path",
     * "scrollId" and "size" (default 100)
     * @return a Single emitting a JSON object with the attributes
     * "totalHits", "hits" (the metadata of the found chunks including their
     * "id") and "scrollId"; it fails if the query could not be compiled or
     * executed
     */
    private Single<JsonObject> onQuery(JsonObject jsonObject) {
        String search = jsonObject.getString("search");
        String path = jsonObject.getString("path");
        String scrollId = jsonObject.getString("scrollId");
        int pageSize = jsonObject.getInteger("size", 100).intValue();

        JsonObject parameters = new JsonObject().put("size", Integer.valueOf(pageSize));
        // only the chunk metadata is needed to build the result
        parameters.put("_source", "chunkMeta");

        Single<JsonObject> scroll;
        if (scrollId == null) {
            try {
                JsonObject query = this.queryCompiler.compileQuery(search, path);
                scroll = this.client.beginScroll(TYPE_NAME, null, query, parameters, "1m");
            } catch (Throwable t) {
                // report compile errors through the Single instead of
                // throwing them at the caller
                return Single.error(t);
            }
        } else {
            scroll = this.client.continueScroll(scrollId, "1m");
        }

        return scroll.map(response -> {
            // iterate through all hits and convert them to chunk metadata
            JsonObject hits = response.getJsonObject("hits");
            long totalHits = hits.getLong("total").longValue();
            JsonArray resultHits = new JsonArray();
            for (Object o : hits.getJsonArray("hits")) {
                JsonObject hit = (JsonObject) o;
                JsonObject chunkMetaJson = hit.getJsonObject("_source").getJsonObject("chunkMeta");
                resultHits.add(getMeta(chunkMetaJson).toJsonObject().put("id", hit.getString("_id")));
            }

            // the scroll ID is an attribute of the response itself, not of
            // its "hits" object
            return new JsonObject()
                .put("totalHits", Long.valueOf(totalHits))
                .put("hits", resultHits)
                .put("scrollId", response.getString("_scroll_id"));
        });
    }

    /**
     * Deletes chunks from the index.
     *
     * @param jsonObject the request; reads the attribute "paths" (the chunks
     * to delete) and the optional attributes "totalChunks" and
     * "remainingChunks", which both default to the number of paths
     * @return a Completable that completes when the chunks have been
     * deleted; it fails if "paths" is missing or if one or more chunks could
     * not be deleted
     */
    private Completable onDelete(JsonObject jsonObject) {
        JsonArray paths = jsonObject.getJsonArray("paths");
        if (paths == null) {
            // Fail through the Completable so that the sender gets a failure
            // reply. A NullPointerException thrown from here would escape
            // from the consumer's onNext handler instead.
            return Completable.error(new NoStackTraceThrowable("Missing paths to the chunks to delete"));
        }

        long totalChunks = jsonObject.getLong("totalChunks", Long.valueOf(paths.size())).longValue();
        long remainingChunks = jsonObject.getLong("remainingChunks", Long.valueOf(paths.size())).longValue();

        long startTimestamp = System.currentTimeMillis();
        onDeletingStarted(startTimestamp, paths, totalChunks, remainingChunks);

        return this.client.bulkDelete(TYPE_NAME, paths).flatMapCompletable(bulkResponse -> {
            long duration = System.currentTimeMillis() - startTimestamp;

            if (!this.client.bulkResponseHasErrors(bulkResponse)) {
                onDeletingFinished(duration, paths, totalChunks, remainingChunks, null);
                return Completable.complete();
            }

            String errorMessage = this.client.bulkResponseGetErrorMessage(bulkResponse);
            log.error("One or more chunks could not be deleted");
            log.error(errorMessage);
            onDeletingFinished(duration, paths, totalChunks, remainingChunks, errorMessage);
            return Completable.error(new NoStackTraceThrowable("One or more chunks could not be deleted"));
        });
    }
}
