package io.vepo.maestro.framework;

import io.vepo.maestro.framework.annotations.MaestroConsumer;
import io.vepo.maestro.framework.parallel.WorkerThreadFactory;
import io.vepo.maestro.framework.utils.Topics;
import jakarta.enterprise.context.spi.Contextual;
import jakarta.enterprise.inject.se.SeContainer;
import jakarta.enterprise.inject.se.SeContainerInitializer;
import jakarta.enterprise.inject.spi.Bean;
import java.lang.annotation.Annotation;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.eclipse.microprofile.config.ConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/vepo/maestro/framework/MaestroApplication.class */
public class MaestroApplication implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(MaestroApplication.class.getName());
    private SeContainer container;
    private final List<ExecutorService> loadedExecutors = new ArrayList();
    private final AtomicBoolean running = new AtomicBoolean(false);

    public void run() {
        this.container = SeContainerInitializer.newInstance().initialize();
        start(null);
    }

    public void run(Class<?> cls) {
        this.container = SeContainerInitializer.newInstance().initialize();
        start(cls);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.running.set(false);
        this.loadedExecutors.forEach((v0) -> {
            v0.shutdown();
        });
        this.container.close();
    }

    private void start(Class<?> cls) {
        this.running.set(true);
        logger.info("Container initialized. Starting consumers...");
        List list = this.container.getBeanManager().getBeans(Object.class, new Annotation[0]).stream().filter(bean -> {
            return cls == null || bean.getBeanClass().getPackageName().contains(cls.getPackageName());
        }).filter(bean2 -> {
            return bean2.getBeanClass().isAnnotationPresent(MaestroConsumer.class);
        }).toList();
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(list.size(), new WorkerThreadFactory("consumers"));
        this.loadedExecutors.add(newFixedThreadPool);
        list.stream().forEach(bean3 -> {
            newFixedThreadPool.submit(() -> {
                consume(bean3);
            });
        });
    }

    private void consume(Bean<?> bean) {
        try {
            logger.info("Starting consumer: {}", bean);
            System.getProperties().entrySet().forEach(entry -> {
                logger.info("Value {}={}", entry.getKey(), entry.getValue());
            });
            String str = (String) ConfigProvider.getConfig().getOptionalValue(String.format("%s.kafka.bootstrap.servers", bean.getBeanClass().getName()), String.class).or(() -> {
                return ConfigProvider.getConfig().getOptionalValue("kafka.bootstrap.servers", String.class);
            }).orElseThrow(() -> {
                return new IllegalArgumentException("Kafka bootstrap servers not found");
            });
            logger.info("Kafka bootstrap servers found: {}", str);
            Properties properties = new Properties();
            properties.put("bootstrap.servers", str);
            properties.put("group.id", bean.getBeanClass().getName());
            properties.put("key.deserializer", ((MaestroConsumer) bean.getBeanClass().getAnnotation(MaestroConsumer.class)).keyDeserializer().getName());
            properties.put("value.deserializer", ((MaestroConsumer) bean.getBeanClass().getAnnotation(MaestroConsumer.class)).valueDeserializer().getName());
            properties.put("value.deserializer.type", bean.getBeanClass());
            properties.put("auto.offset.reset", "earliest");
            logger.info("Creating consumer: {}", properties);
            Object create = bean.create(this.container.getBeanManager().createCreationalContext((Contextual) null));
            KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
            try {
                List list = Stream.of((Object[]) bean.getBeanClass().getDeclaredMethods()).filter(method -> {
                    return method.getParameterTypes().length == 1;
                }).map(method2 -> {
                    return Topics.toTopicName(method2);
                }).toList();
                Map map = (Map) Stream.of((Object[]) bean.getBeanClass().getDeclaredMethods()).filter(method3 -> {
                    return method3.getParameterTypes().length == 1;
                }).collect(Collectors.toMap(Topics::toTopicName, Function.identity()));
                logger.info("Subscribing to topics: {}", list);
                kafkaConsumer.subscribe(list);
                while (this.running.get()) {
                    kafkaConsumer.poll(Duration.ofSeconds(1L)).forEach(consumerRecord -> {
                        logger.info("Received record: {}", consumerRecord);
                        try {
                            Method method4 = (Method) map.get(consumerRecord.topic());
                            method4.invoke(create, method4.getParameterTypes()[0].cast(consumerRecord.value()));
                        } catch (IllegalAccessException | InvocationTargetException e) {
                            logger.error("Error invoking method", e);
                        }
                    });
                }
                kafkaConsumer.close();
            } finally {
            }
        } catch (Exception e) {
            logger.error("Error consuming messages", e);
        }
    }
}
