package org.springframework.batch.item.redis.support;

import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;

/* loaded from: input_file:org/springframework/batch/item/redis/support/TransferExecution.class */
public class TransferExecution<I, O> {
    private static final Logger log = LoggerFactory.getLogger(TransferExecution.class);

    /** The transfer (reader, optional processor, writer, options) driven by this execution. */
    private final Transfer<I, O> transfer;

    /** Workers created by {@code start()}; {@code null} until {@code start()} has been called. */
    private Collection<TransferExecution<I, O>.TransferWorker> workers;

    /**
     * Stop request flag. Written by {@code stop()} on the caller's thread and polled by every
     * worker thread, hence {@code volatile} so the request is guaranteed to become visible.
     */
    private volatile boolean stopped;

    /** Listeners notified of progress messages, count updates, errors and completion. */
    private final Collection<TransferExecutionListener> listeners = new ArrayList<>();

    /** Running total of items handed to the writer without error, across all workers. */
    private final AtomicLong count = new AtomicLong();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/batch/item/redis/support/TransferExecution$ProcessingTransferWorker.class */
    /**
     * Worker variant used when the transfer has a processor: every item read is passed through
     * {@code transfer.getProcessor()} and the call is timed under the {@code item.process} timer.
     */
    public class ProcessingTransferWorker extends TransferExecution<I, O>.TransferWorker {
        private static final String PROCESS_TIMER_NAME = "item.process";
        private static final String PROCESS_TIMER_DESCRIPTION = "Item processing duration";

        private ProcessingTransferWorker() {
            super();
        }

        /**
         * Runs the transfer's processor on one item and records how long it took.
         *
         * @param i the item that was read
         * @return whatever the processor returned for the item
         * @throws Exception any exception raised by the processor, rethrown unchanged
         */
        @Override // org.springframework.batch.item.redis.support.TransferExecution.TransferWorker
        @SuppressWarnings("unchecked")
        protected O process(I i) throws Exception {
            Timer.Sample sample = MetricsUtils.createTimerSample();
            String status = "SUCCESS";
            try {
                return (O) TransferExecution.this.transfer.getProcessor().process(i);
            } catch (Exception e) {
                // A failed call must not be reported as a success; use the same status value
                // the read and write timers use for failures.
                status = "FAILURE";
                throw e;
            } finally {
                // Stop the sample exactly once, whichever way the call ended.
                sample.stop(MetricsUtils.createTimer(PROCESS_TIMER_NAME, PROCESS_TIMER_DESCRIPTION, this.nameTag, statusTag(status)));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/springframework/batch/item/redis/support/TransferExecution$TransferWorker.class */
    public class TransferWorker implements Runnable {
        private static final String READ_TIMER_NAME = "item.read";
        private static final String READ_TIMER_DESCRIPTION = "Item reading duration";
        private static final String WRITE_TIMER_NAME = "chunk.write";
        private static final String WRITE_TIMER_DESCRIPTION = "Chunk writing duration";
        private static final String STATUS_TAG_NAME = "status";
        protected final Tag nameTag;
        private final Object lock;
        private List<O> items;
        private boolean running;

        private TransferWorker() {
            this.nameTag = Tag.of("transfer.name", TransferExecution.this.transfer.getName());
            this.lock = new Object();
            this.items = new ArrayList(TransferExecution.this.transfer.getOptions().getBatch());
        }

        /* JADX WARN: Multi-variable type inference failed */
        @Override // java.lang.Runnable
        public void run() {
            Object read;
            this.running = true;
            do {
                try {
                    read = read();
                } catch (Exception e) {
                    TransferExecution.log.error("{}: could not read next item", TransferExecution.this.transfer.getName(), e);
                }
                if (read == null) {
                    break;
                }
                try {
                    Object process = process(read);
                    synchronized (this.lock) {
                        this.items.add(process);
                    }
                    if (this.items.size() >= TransferExecution.this.transfer.getOptions().getBatch()) {
                        write();
                    }
                } catch (Exception e2) {
                    TransferExecution.log.error("{}: could not process item", TransferExecution.this.transfer.getName(), e2);
                }
            } while (!TransferExecution.this.stopped);
            if (!TransferExecution.this.stopped) {
                write();
            }
            TransferExecution.log.debug("{}: worker completed", TransferExecution.this.transfer.getName());
            this.running = false;
        }

        /* JADX WARN: Multi-variable type inference failed */
        protected O process(I i) throws Exception {
            return i;
        }

        protected Tag statusTag(String str) {
            return Tag.of(STATUS_TAG_NAME, str);
        }

        private I read() throws Exception {
            Timer.Sample createTimerSample = MetricsUtils.createTimerSample();
            String str = "SUCCESS";
            try {
                try {
                    I i = (I) TransferExecution.this.transfer.getReader().read();
                    createTimerSample.stop(MetricsUtils.createTimer(READ_TIMER_NAME, READ_TIMER_DESCRIPTION, this.nameTag, statusTag(str)));
                    return i;
                } catch (Exception e) {
                    str = "FAILURE";
                    throw e;
                }
            } catch (Throwable th) {
                createTimerSample.stop(MetricsUtils.createTimer(READ_TIMER_NAME, READ_TIMER_DESCRIPTION, this.nameTag, statusTag(str)));
                throw th;
            }
        }

        public void write() {
            List<O> list;
            synchronized (this.lock) {
                list = this.items;
                this.items = new ArrayList(TransferExecution.this.transfer.getOptions().getBatch());
            }
            Timer.Sample createTimerSample = MetricsUtils.createTimerSample();
            String str = "SUCCESS";
            try {
                try {
                    TransferExecution.this.transfer.getWriter().write(list);
                    long addAndGet = TransferExecution.this.count.addAndGet(list.size());
                    TransferExecution.this.listeners.forEach(transferExecutionListener -> {
                        transferExecutionListener.onUpdate(addAndGet);
                    });
                    createTimerSample.stop(MetricsUtils.createTimer(WRITE_TIMER_NAME, WRITE_TIMER_DESCRIPTION, this.nameTag, statusTag(str)));
                } catch (Exception e) {
                    str = "FAILURE";
                    TransferExecution.log.error("{}: could not write items", TransferExecution.this.transfer.getName(), e);
                    createTimerSample.stop(MetricsUtils.createTimer(WRITE_TIMER_NAME, WRITE_TIMER_DESCRIPTION, this.nameTag, statusTag(str)));
                }
            } catch (Throwable th) {
                createTimerSample.stop(MetricsUtils.createTimer(WRITE_TIMER_NAME, WRITE_TIMER_DESCRIPTION, this.nameTag, statusTag(str)));
                throw th;
            }
        }

        public boolean isRunning() {
            return this.running;
        }
    }

    /**
     * Registers a listener that will receive this execution's messages, count updates, errors
     * and completion notification.
     *
     * @param transferExecutionListener the listener to add
     */
    public void addListener(TransferExecutionListener transferExecutionListener) {
        listeners.add(transferExecutionListener);
    }

    /**
     * Creates an execution for the given transfer. Nothing runs until {@code start()} is called.
     *
     * @param transfer the transfer whose reader, processor, writer and options are used
     */
    public TransferExecution(Transfer<I, O> transfer) {
        this.transfer = transfer;
    }

    /**
     * Starts the transfer.
     *
     * <p>Creates {@code getOptions().getThreads()} workers (processing workers when the transfer
     * has a processor), opens the reader and writer when they are {@link ItemStream}s, and runs
     * each worker through {@link CompletableFuture#runAsync(Runnable)}. When all workers are
     * done the writer and reader are closed and listeners receive {@code onComplete} exactly
     * once. If a flush interval is configured, {@link #flush()} is invoked periodically on a
     * dedicated scheduler thread that is shut down once the workers are done.
     *
     * @return a future that completes when every worker has finished
     */
    public CompletableFuture<Void> start() {
        this.listeners.forEach(listener -> listener.onMessage("Starting"));
        this.stopped = false;
        int threads = this.transfer.getOptions().getThreads();
        List<TransferWorker> created = new ArrayList<>(threads);
        for (int index = 0; index < threads; index++) {
            created.add(this.transfer.getProcessor() == null ? new TransferWorker() : new ProcessingTransferWorker());
        }
        this.workers = created;

        ExecutionContext executionContext = new ExecutionContext();
        if (this.transfer.getReader() instanceof ItemStream) {
            this.listeners.forEach(listener -> listener.onMessage("Opening reader"));
            log.debug("{}: opening reader", this.transfer.getName());
            ((ItemStream) this.transfer.getReader()).open(executionContext);
        }
        if (this.transfer.getWriter() instanceof ItemStream) {
            this.listeners.forEach(listener -> listener.onMessage("Opening writer"));
            log.debug("{}: opening writer", this.transfer.getName());
            ((ItemStream) this.transfer.getWriter()).open(executionContext);
        }

        List<CompletableFuture<Void>> futures = new ArrayList<>(created.size());
        for (TransferWorker worker : created) {
            futures.add(CompletableFuture.runAsync(worker));
        }
        this.listeners.forEach(listener -> listener.onMessage("Running"));

        CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        allOf.whenComplete((result, error) -> {
            if (error != null) {
                this.listeners.forEach(listener -> listener.onError(error));
            }
            try {
                // Closed independently: a failure closing the writer must not leave the
                // reader open.
                closeIfItemStream(this.transfer.getWriter(), "Closing writer", "{}: closing writer");
                closeIfItemStream(this.transfer.getReader(), "Closing reader", "{}: closing reader");
            } finally {
                // Only here, so listeners see onComplete once even when closing failed.
                this.listeners.forEach(listener -> listener.onComplete());
            }
        });

        if (this.transfer.getOptions().getFlushInterval() != null) {
            ScheduledExecutorService flushScheduler = Executors.newSingleThreadScheduledExecutor();
            flushScheduler.scheduleAtFixedRate(this::flush, 0L, this.transfer.getOptions().getFlushInterval().toMillis(), TimeUnit.MILLISECONDS);
            // Shutting the executor down (rather than only cancelling the task) also ends its
            // thread; shutdownNow() interrupts a flush that is in progress.
            allOf.whenComplete((result, error) -> flushScheduler.shutdownNow());
        }
        // NOTE(review): callers waiting on this future can be released before the close
        // callback above has finished running - confirm whether callers rely on the reader
        // and writer being closed at that point.
        return allOf;
    }

    /**
     * Closes {@code component} if it is an {@link ItemStream}, announcing it to the listeners
     * first. Any exception is reported through {@code onError} instead of being propagated.
     */
    private void closeIfItemStream(Object component, String listenerMessage, String logFormat) {
        if (!(component instanceof ItemStream)) {
            return;
        }
        try {
            this.listeners.forEach(listener -> listener.onMessage(listenerMessage));
            log.debug(logFormat, this.transfer.getName());
            ((ItemStream) component).close();
        } catch (Exception e) {
            this.listeners.forEach(listener -> listener.onError(e));
        }
    }

    /**
     * Flushes pending data: asks the reader to flush when it is a
     * {@code FlushableItemStreamReader}, then makes every worker write whatever it has
     * buffered. Does nothing when the execution has not been started or no worker is running.
     */
    @SuppressWarnings("rawtypes")
    public void flush() {
        // workers is only assigned by start(); guard so an early call is a no-op, not an NPE.
        if (this.workers == null || isTerminated()) {
            return;
        }
        if (this.transfer.getReader() instanceof FlushableItemStreamReader) {
            // Explicit cast so the call does not depend on the declared type of getReader().
            // NOTE(review): raw cast because FlushableItemStreamReader's declaration is not
            // visible here - add type arguments if it is generic.
            ((FlushableItemStreamReader) this.transfer.getReader()).flush();
        }
        this.workers.forEach(worker -> worker.write());
    }

    /**
     * Tells whether no worker is currently running.
     *
     * <p>Also returns {@code true} when the execution has not been started yet, and may do so
     * just after {@code start()} while workers have not yet begun executing, since a worker
     * only reports itself as running once its {@code run()} method has been entered.
     *
     * @return {@code false} if at least one worker reports it is running, {@code true} otherwise
     */
    public boolean isTerminated() {
        Collection<TransferExecution<I, O>.TransferWorker> current = this.workers;
        if (current == null) {
            // start() has not been called: there is nothing running.
            return true;
        }
        for (TransferWorker worker : current) {
            if (worker.isRunning()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Requests the execution to stop by raising the stop flag. Workers check the flag after
     * each item and, once it is set, leave their loop without writing what is still buffered.
     * This method returns immediately; it does not wait for the workers.
     *
     * @throws InterruptedException declared for callers; not thrown by this implementation
     * @throws ExecutionException declared for callers; not thrown by this implementation
     */
    public void stop() throws InterruptedException, ExecutionException {
        stopped = true;
        String transferName = transfer.getName();
        log.debug("{} stopped", transferName);
    }

    /**
     * @return the total number of items written so far by all workers
     */
    public long count() {
        long written = count.get();
        return written;
    }

    /**
     * @return the transfer this execution was created for
     */
    public Transfer<I, O> getTransfer() {
        return transfer;
    }
}
