package io.warp10.plugins.kafka;

import io.warp10.script.WarpScriptLib;
import io.warp10.warp.sdk.AbstractWarp10Plugin;
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.LockSupport;

/* loaded from: input_file:io/warp10/plugins/kafka/KafkaWarp10Plugin.class */
/**
 * Warp 10 plugin which watches a directory for {@code *.mc2} files and keeps one
 * {@link KafkaConsumer} per file found there.
 *
 * <p>A background daemon thread rescans the directory every {@code kafka.period}
 * milliseconds. A consumer is (re)created when a file is new or when its size on disk
 * differs from the length of the script held by the existing consumer, and it is ended
 * when its file is no longer listed.
 */
public class KafkaWarp10Plugin extends AbstractWarp10Plugin implements Runnable {
    private static final String CONF_KAFKA_DIR = "kafka.dir";
    private static final String CONF_KAFKA_PERIOD = "kafka.period";

    /** Default delay between two directory scans, in milliseconds. */
    private static final long DEFAULT_PERIOD = 60000;

    /** Glob used to select the files to load from the watched directory. */
    private static final String SCRIPT_GLOB = "*.mc2";

    private static final long NANOS_PER_MILLI = 1000000L;

    /** Directory scanned for scripts, read from {@code kafka.dir}. */
    private String dir;

    /** Delay between two scans in milliseconds, read from {@code kafka.period}. */
    private long period;

    /**
     * Active consumers keyed by script file name. Concurrent because the shutdown hook
     * iterates over it while the scanner thread may still be adding or removing entries.
     */
    private final Map<String, KafkaConsumer> consumers = new ConcurrentHashMap<>();

    /** Set by the shutdown hook and read by the scanner thread, hence volatile. */
    private volatile boolean done = false;

    /**
     * Scanner loop: rescans the directory, then sleeps for {@code period} milliseconds,
     * until the shutdown flag is raised.
     *
     * <p>A failure during one scan is printed and does not stop the loop, so a transient
     * problem (for instance an unreadable directory) is retried at the next period.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (!this.done) {
            try {
                scan();
            } catch (Throwable t) {
                // Deliberately broad: the scanner thread must survive any single failed scan.
                t.printStackTrace();
            }
            LockSupport.parkNanos(this.period * NANOS_PER_MILLI);
        }
    }

    /**
     * Performs one pass over the watched directory: loads new or modified scripts, then
     * ends the consumers whose file was not listed during this pass.
     *
     * @throws IOException if the directory cannot be opened or closed
     */
    private void scan() throws IOException {
        Set<String> present = new HashSet<>();

        // try-with-resources guarantees the directory handle is released even when
        // listing or loading fails half-way through.
        try (DirectoryStream<Path> scripts = Files.newDirectoryStream(new File(this.dir).toPath(), SCRIPT_GLOB)) {
            for (Path script : scripts) {
                String name = script.getFileName().toString();
                KafkaConsumer current = this.consumers.get(name);

                // Change detection is size based only: the file size is compared with the
                // length of the script currently held by the consumer.
                if (null == current || current.getWarpScript().length() != script.toFile().length()) {
                    load(name);
                }
                present.add(name);
            }
        }

        // Everything still registered but not seen in this pass has been removed from disk.
        Set<String> stale = new HashSet<>(this.consumers.keySet());
        stale.removeAll(present);
        for (String name : stale) {
            try {
                this.consumers.remove(name).end();
            } catch (Exception ignored) {
                // Best effort: a consumer failing to stop must not prevent the others from stopping.
            }
        }
    }

    /**
     * Replaces the consumer registered under {@code str} with a fresh one built from the
     * file of that name in the watched directory.
     *
     * <p>The previous consumer, if any, is unregistered before being ended so that a failed
     * reload does not leave an already-ended consumer in the map; the file is then simply
     * picked up again as new on the next scan.
     *
     * @param str file name of the script, relative to the watched directory
     * @return {@code true} if the new consumer was created and registered, {@code false}
     *         if its creation failed
     */
    private boolean load(String str) {
        KafkaConsumer previous = this.consumers.remove(str);
        if (null != previous) {
            previous.end();
        }
        try {
            this.consumers.put(str, new KafkaConsumer(new File(this.dir, str).toPath()));
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Checks the Warp 10 revision, reads the plugin configuration, registers the
     * {@code KCOMMIT} function and a shutdown hook, then starts the scanner thread.
     *
     * @param properties configuration; {@code kafka.dir} is mandatory, {@code kafka.period}
     *                   (milliseconds) defaults to {@value #DEFAULT_PERIOD}
     * @throws RuntimeException if the revision check fails or cannot be performed, if
     *                          {@code kafka.dir} is missing, or if any other error occurs
     *                          during initialization (the original error is the cause)
     */
    public void init(Properties properties) {
        try {
            // CHECKREV is looked up reflectively; if the class is absent the
            // ClassNotFoundException handler below reports an incompatible version.
            Object rev = Class.forName("io.warp10.script.functions.CHECKREV")
                    .getMethod("checkRev", String.class)
                    .invoke(null, "3.0.0");
            if (((Integer) rev).intValue() < 0) {
                throw new RuntimeException("Kafka Plugin not compatible with your Warp 10 version.");
            }

            this.dir = properties.getProperty(CONF_KAFKA_DIR);
            if (null == this.dir) {
                throw new RuntimeException("Missing 'kafka.dir' configuration.");
            }
            this.period = Long.parseLong(properties.getProperty(CONF_KAFKA_PERIOD, Long.toString(DEFAULT_PERIOD)));

            Runtime.getRuntime().addShutdownHook(new Thread() { // from class: io.warp10.plugins.kafka.KafkaWarp10Plugin.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    KafkaWarp10Plugin.this.done = true;
                    System.out.println("Kafka Plugin shutting down all consumers.");
                    // NOTE(review): this interrupts the hook thread itself, not the scanner
                    // thread. Kept as in the original; confirm whether that is intended.
                    interrupt();
                    for (KafkaConsumer consumer : KafkaWarp10Plugin.this.consumers.values()) {
                        try {
                            consumer.end();
                        } catch (Exception ignored) {
                            // Best effort: keep stopping the remaining consumers.
                        }
                    }
                }
            });

            Thread scanner = new Thread(this);
            scanner.setDaemon(true);
            scanner.setName("[Warp 10 Kafka Plugin " + this.dir + "]");
            WarpScriptLib.addNamedWarpScriptFunction(new KCOMMIT("KCOMMIT"));
            scanner.start();
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Kafka Plugin not compatible with your Warp 10 version.", e);
        } catch (Throwable th) {
            throw new RuntimeException(th);
        }
    }
}
