package cn.wizzer.iot.mqtt.server.broker.service;

import cn.hutool.core.util.HexUtil;
import cn.wizzer.iot.mqtt.server.broker.config.BrokerProperties;
import cn.wizzer.iot.mqtt.server.broker.internal.InternalMessage;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.nutz.aop.interceptor.async.Async;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@IocBean
/* loaded from: input_file:cn/wizzer/iot/mqtt/server/broker/service/KafkaService.class */
public class KafkaService {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaService.class);

    @Inject
    private KafkaProducer kafkaProducer;

    @Inject
    private BrokerProperties brokerProperties;

    @Async
    public void send(InternalMessage internalMessage) {
        try {
            this.kafkaProducer.send(new ProducerRecord(this.brokerProperties.getProducerTopic(), internalMessage.getTopic(), HexUtil.encodeHexStr(internalMessage.getMessageBytes())), new Callback() { // from class: cn.wizzer.iot.mqtt.server.broker.service.KafkaService.1
                public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                    if (exc == null) {
                        KafkaService.LOGGER.info("The offset of the record we just sent is: " + recordMetadata.offset());
                    } else {
                        exc.printStackTrace();
                        KafkaService.LOGGER.error(exc.getMessage(), exc);
                    }
                }
            });
        } catch (Exception e) {
            LOGGER.error("kafka没有连接成功..");
        }
    }
}
