package io.warp10.plugins.kafka;

import com.google.common.base.Charsets;
import io.warp10.continuum.store.DirectoryClient;
import io.warp10.continuum.store.StoreClient;
import io.warp10.script.MemoryWarpScriptStack;
import io.warp10.script.WarpScriptStack;
import io.warp10.warp.sdk.AbstractWarp10Plugin;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/warp10/plugins/kafka/KafkaConsumer.class */
public class KafkaConsumer {
    public static final String ATTR_CONSUMER = "kafka.consumer";
    public static final String ATTR_SEQNO = "kafka.seqno";
    private static final String PARAM_MACRO = "macro";
    private static final String PARAM_TOPICS = "topics";
    private static final String PARAM_PARALLELISM = "parallelism";
    private static final String PARAM_TIMEOUT = "timeout";
    private static final String PARAM_CONFIG = "config";
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
    private final AtomicReference<WarpScriptStack.Macro> macro = new AtomicReference<>(null);
    private AtomicLong timeout = new AtomicLong(Long.MAX_VALUE);
    private final AtomicBoolean done = new AtomicBoolean(false);
    private String warpscript;
    private Thread[] executors;
    private MemoryWarpScriptStack stack;

    public KafkaConsumer(Path path) throws Exception {
        LOG.info("INITIALIZING KafkaConsumer " + path);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        FileInputStream fileInputStream = new FileInputStream(path.toFile());
        byte[] bArr = new byte[8192];
        while (true) {
            try {
                int read = fileInputStream.read(bArr);
                if (read < 0) {
                    break;
                } else {
                    byteArrayOutputStream.write(bArr, 0, read);
                }
            } finally {
                fileInputStream.close();
            }
        }
        this.warpscript = new String(byteArrayOutputStream.toByteArray(), Charsets.UTF_8);
        this.stack = new MemoryWarpScriptStack((StoreClient) null, (DirectoryClient) null, new Properties());
        this.stack.maxLimits();
        try {
            this.stack.execMulti(this.warpscript);
        } catch (Throwable th) {
            LOG.error("Caught exception while loading '" + path.getFileName() + "'.", th);
        }
        Object pop = this.stack.pop();
        if (!(pop instanceof Map)) {
            throw new RuntimeException("Kafka consumer spec must leave a configuration map on top of the stack.");
        }
        Map map = (Map) pop;
        this.macro.set((WarpScriptStack.Macro) map.get(PARAM_MACRO));
        final ArrayList arrayList = new ArrayList();
        Pattern pattern = null;
        Object obj = map.get(PARAM_TOPICS);
        if (obj instanceof List) {
            for (Object obj2 : (List) map.get(PARAM_TOPICS)) {
                if (!(obj2 instanceof String)) {
                    throw new RuntimeException("Invalid Kafka topic, MUST be a STRING.");
                }
                arrayList.add(obj2.toString());
            }
        } else {
            if (!(obj instanceof String)) {
                throw new RuntimeException("Invalid Kafka topic, MUST be a STRING or a List thereof.");
            }
            try {
                pattern = Pattern.compile((String) obj);
            } catch (PatternSyntaxException e) {
                throw new RuntimeException("Invalid Kafka topic regexp: " + e.getMessage());
            }
        }
        Map map2 = (Map) map.get(PARAM_CONFIG);
        final Properties properties = new Properties();
        for (Map.Entry entry : map2.entrySet()) {
            if (!(entry.getKey() instanceof String) || !(entry.getValue() instanceof String)) {
                throw new RuntimeException("Invalid Kafka configuration, key and value MUST be STRINGs.");
            }
            properties.put(entry.getKey().toString(), entry.getValue().toString());
        }
        properties.put("key.deserializer", ByteArrayDeserializer.class.getName());
        properties.put("value.deserializer", ByteArrayDeserializer.class.getName());
        int parseInt = Integer.parseInt(null != map.get(PARAM_PARALLELISM) ? String.valueOf(map.get(PARAM_PARALLELISM)) : "1");
        if (map.containsKey(PARAM_TIMEOUT)) {
            this.timeout.set(Long.parseLong(String.valueOf(map.get(PARAM_TIMEOUT))));
        }
        this.executors = new Thread[parseInt];
        for (int i = 0; i < parseInt; i++) {
            final MemoryWarpScriptStack memoryWarpScriptStack = new MemoryWarpScriptStack(AbstractWarp10Plugin.getExposedStoreClient(), AbstractWarp10Plugin.getExposedDirectoryClient(), new Properties());
            memoryWarpScriptStack.maxLimits();
            memoryWarpScriptStack.setAttribute(ATTR_SEQNO, Integer.valueOf(i));
            final Pattern pattern2 = pattern;
            Thread thread = new Thread() { // from class: io.warp10.plugins.kafka.KafkaConsumer.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer = null;
                    while (true) {
                        try {
                            try {
                                kafkaConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer(properties);
                                if (!arrayList.isEmpty()) {
                                    kafkaConsumer.subscribe(arrayList);
                                } else if (null != pattern2) {
                                    kafkaConsumer.subscribe(pattern2);
                                }
                                memoryWarpScriptStack.setAttribute(KafkaConsumer.ATTR_CONSUMER, kafkaConsumer);
                                while (!KafkaConsumer.this.done.get()) {
                                    long j = 0;
                                    Iterator it = kafkaConsumer.poll(Duration.ofMillis(KafkaConsumer.this.timeout.get())).iterator();
                                    while (it.hasNext()) {
                                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                                        j++;
                                        HashMap hashMap = new HashMap();
                                        hashMap.put("timestamp", Long.valueOf(consumerRecord.timestamp()));
                                        hashMap.put("timestampType", consumerRecord.timestampType().name());
                                        hashMap.put("topic", consumerRecord.topic());
                                        hashMap.put("offset", Long.valueOf(consumerRecord.offset()));
                                        hashMap.put("partition", Long.valueOf(consumerRecord.partition()));
                                        hashMap.put("key", consumerRecord.key());
                                        hashMap.put("value", consumerRecord.value());
                                        HashMap hashMap2 = new HashMap();
                                        for (Header header : consumerRecord.headers()) {
                                            hashMap2.put(header.key(), header.value());
                                        }
                                        hashMap.put("headers", hashMap2);
                                        memoryWarpScriptStack.push(hashMap);
                                        memoryWarpScriptStack.exec((WarpScriptStack.Macro) KafkaConsumer.this.macro.get());
                                    }
                                    if (0 == j) {
                                        memoryWarpScriptStack.push(new HashMap());
                                        memoryWarpScriptStack.exec((WarpScriptStack.Macro) KafkaConsumer.this.macro.get());
                                    }
                                }
                                if (null != kafkaConsumer) {
                                    try {
                                        kafkaConsumer.close();
                                    } catch (Exception e2) {
                                    }
                                }
                            } catch (Exception e3) {
                                e3.printStackTrace();
                                KafkaConsumer.LOG.error("Kafka Consumer caught exception ", e3);
                                if (null != kafkaConsumer) {
                                    try {
                                        kafkaConsumer.close();
                                    } catch (Exception e4) {
                                    }
                                }
                            }
                        } catch (Throwable th2) {
                            if (null != kafkaConsumer) {
                                try {
                                    kafkaConsumer.close();
                                } catch (Exception e5) {
                                }
                            }
                            throw th2;
                        }
                    }
                }
            };
            thread.setContextClassLoader(getClass().getClassLoader());
            thread.setName("Kafka Consumer Thread #" + i + " " + arrayList.toString());
            thread.setDaemon(true);
            this.executors[i] = thread;
            thread.start();
        }
    }

    public void end() {
        this.done.set(true);
        try {
            for (Thread thread : this.executors) {
                thread.interrupt();
            }
        } catch (Exception e) {
        }
    }

    public String getWarpScript() {
        return this.warpscript;
    }
}
