package com.hazelcast.jet.kafka.impl;

import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:com/hazelcast/jet/kafka/impl/WriteKafkaP.class */
public final class WriteKafkaP<T, K, V> implements Processor {
    private final KafkaProducer<K, V> producer;
    private final Function<? super T, ? extends ProducerRecord<K, V>> toRecordFn;
    private final AtomicReference<Throwable> lastError = new AtomicReference<>();
    private final Callback callback = (recordMetadata, exc) -> {
        if (exc != null) {
            this.lastError.compareAndSet(null, exc);
        }
    };

    /* loaded from: input_file:com/hazelcast/jet/kafka/impl/WriteKafkaP$Supplier.class */
    public static class Supplier<T, K, V> implements ProcessorSupplier {
        private static final long serialVersionUID = 1;
        private final Properties properties;
        private final Function<? super T, ? extends ProducerRecord<K, V>> toRecordFn;
        private transient KafkaProducer<K, V> producer;

        public Supplier(Properties properties, Function<? super T, ? extends ProducerRecord<K, V>> function) {
            this.properties = properties;
            this.toRecordFn = function;
        }

        public void init(@Nonnull ProcessorSupplier.Context context) {
            this.producer = new KafkaProducer<>(this.properties);
        }

        @Nonnull
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public List<Processor> m1get(int i) {
            return (List) Stream.generate(() -> {
                return new WriteKafkaP(this.producer, this.toRecordFn);
            }).limit(i).collect(Collectors.toList());
        }

        public void close(Throwable th) {
            if (this.producer != null) {
                this.producer.close();
            }
        }
    }

    WriteKafkaP(KafkaProducer<K, V> kafkaProducer, Function<? super T, ? extends ProducerRecord<K, V>> function) {
        this.producer = kafkaProducer;
        this.toRecordFn = function;
    }

    public boolean isCooperative() {
        return false;
    }

    public boolean tryProcess() {
        checkError();
        return true;
    }

    public void process(int i, @Nonnull Inbox inbox) {
        checkError();
        inbox.drain(obj -> {
            this.producer.send(this.toRecordFn.apply(obj), this.callback);
        });
    }

    public boolean complete() {
        ensureAllWritten();
        return true;
    }

    public boolean saveToSnapshot() {
        ensureAllWritten();
        return true;
    }

    private void ensureAllWritten() {
        checkError();
        this.producer.flush();
    }

    private void checkError() {
        Throwable th = this.lastError.get();
        if (th != null) {
            throw ExceptionUtil.sneakyThrow(th);
        }
    }
}
