package com.qiniu.pandora.pipeline.sender;

import com.qiniu.pandora.common.PandoraClient;
import com.qiniu.pandora.common.QiniuException;
import com.qiniu.pandora.common.QiniuRuntimeException;
import com.qiniu.pandora.pipeline.error.SendPointError;
import com.qiniu.pandora.pipeline.points.Batch;
import com.qiniu.pandora.pipeline.points.Point;
import com.qiniu.pandora.util.Auth;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/qiniu/pandora/pipeline/sender/ParallelDataSender.class */
/**
 * A {@link DataSender} that groups points into {@link Batch}es on one pool task and
 * sends those batches from several worker tasks running on a fixed-size thread pool.
 *
 * <p>The pool is created in the constructor and shut down by {@link #close()}.
 */
public class ParallelDataSender extends DataSender {
    /** Fixed-size pool that runs both the batching task and the sending workers. */
    final ExecutorService service;
    /** Size of the pool and number of sending workers started by each send call. */
    final int parrallel;

    /**
     * Creates a sender backed by a single-thread pool.
     *
     * @param str  forwarded unchanged to the {@link DataSender} constructor
     * @param auth forwarded unchanged to the {@link DataSender} constructor
     */
    public ParallelDataSender(String str, Auth auth) {
        this(str, auth, 1);
    }

    /**
     * Creates a sender backed by a pool of {@code i} threads.
     *
     * @param str  forwarded unchanged to the {@link DataSender} constructor
     * @param auth forwarded unchanged to the {@link DataSender} constructor
     * @param i    pool size and worker count; {@link Executors#newFixedThreadPool(int)}
     *             rejects values below 1 with {@link IllegalArgumentException}
     */
    public ParallelDataSender(String str, Auth auth, int i) {
        super(str, auth);
        this.parrallel = i;
        this.service = Executors.newFixedThreadPool(i);
    }

    /**
     * Creates a sender backed by a pool of {@code i} threads.
     *
     * @param str           forwarded unchanged to the {@link DataSender} constructor
     * @param pandoraClient forwarded unchanged to the {@link DataSender} constructor
     * @param i             pool size and worker count; {@link Executors#newFixedThreadPool(int)}
     *                      rejects values below 1 with {@link IllegalArgumentException}
     */
    public ParallelDataSender(String str, PandoraClient pandoraClient, int i) {
        super(str, pandoraClient);
        this.parrallel = i;
        this.service = Executors.newFixedThreadPool(i);
    }

    /**
     * Splits {@code iterable} into batches and sends them in parallel, blocking until every
     * worker has finished.
     *
     * <p>Points for which {@code isTooLarge()} is true are recorded as failures and never sent.
     * When sending a batch throws, all points of that batch are recorded as failures. The last
     * batch is always queued, even if no point was added to it.
     *
     * <p>If the calling thread is interrupted while waiting, its interrupt status is restored
     * and the errors collected so far are returned; workers may still be running at that point.
     *
     * @param iterable the points to send
     * @return the failures collected while batching and sending
     * @throws RuntimeException if iterating or batching the points fails; batches queued before
     *                          the failure are still sent first
     */
    @Override
    public SendPointError send(final Iterable<Point> iterable) {
        final SendPointError sendPointError = new SendPointError();
        final ConcurrentLinkedQueue<Batch> pending = new ConcurrentLinkedQueue<Batch>();
        final AtomicBoolean batchingDone = new AtomicBoolean(false);

        // The batching task is submitted before the workers so the pool's FIFO queue starts it
        // first; otherwise the workers could occupy every thread while waiting for it.
        final Future<?> batching = this.service.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    Batch batch = new Batch();
                    for (Point point : iterable) {
                        if (point.isTooLarge()) {
                            // SendPointError's thread-safety is not visible from this file, and
                            // workers record failures concurrently, so writes are serialized.
                            synchronized (sendPointError) {
                                sendPointError.addFail(point, new QiniuRuntimeException("Point too large"));
                            }
                        } else {
                            if (!batch.canAdd(point) && batch.getSize() > 0) {
                                pending.add(batch);
                                batch = new Batch();
                            }
                            batch.add(point);
                        }
                    }
                    pending.add(batch);
                } finally {
                    // Must run even when iteration throws: the workers only stop once this
                    // flag is set, so skipping it would block send() forever.
                    batchingDone.set(true);
                }
            }
        });

        final CountDownLatch workersDone = new CountDownLatch(this.parrallel);
        for (int i = 0; i < this.parrallel; i++) {
            this.service.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            // Read the flag BEFORE polling: if batching had already finished and
                            // the queue is empty afterwards, nothing can be added any more.
                            boolean noMoreBatches = batchingDone.get();
                            Batch batch = pending.poll();
                            if (batch != null) {
                                try {
                                    ParallelDataSender.this.send(batch);
                                } catch (QiniuException e) {
                                    synchronized (sendPointError) {
                                        sendPointError.addFails(batch.getPoints(), e);
                                    }
                                } catch (Exception e2) {
                                    synchronized (sendPointError) {
                                        sendPointError.addFails(batch.getPoints(), new QiniuRuntimeException(e2));
                                    }
                                }
                            } else if (noMoreBatches) {
                                return;
                            } else {
                                // Nothing queued yet; give up the CPU instead of spinning hard.
                                Thread.yield();
                            }
                        }
                    } finally {
                        // Count down on every exit path so the caller cannot wait forever.
                        workersDone.countDown();
                    }
                }
            });
        }

        try {
            workersDone.await();
            // Surfaces a failure of the batching task, which the pool would otherwise swallow.
            batching.get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of discarding it.
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new IllegalStateException(cause);
        }
        return sendPointError;
    }

    /**
     * Closes the underlying sender and then shuts the thread pool down. The pool is shut down
     * even when {@code super.close()} throws; tasks already submitted are allowed to finish.
     */
    @Override
    public void close() {
        try {
            super.close();
        } finally {
            this.service.shutdown();
        }
    }
}
