package com.yahoo.vespa.http.client.core.communication;

import com.google.common.annotations.Beta;
import com.yahoo.vespa.http.client.Result;
import com.yahoo.vespa.http.client.config.Endpoint;
import com.yahoo.vespa.http.client.core.Document;
import com.yahoo.vespa.http.client.core.EndpointResult;
import com.yahoo.vespa.http.client.core.ServerResponseException;
import com.yahoo.vespa.http.client.core.operationProcessor.EndPointResultFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Feeds documents to a single gateway endpoint on a dedicated daemon thread.
 *
 * <p>The constructor creates and starts a thread executing {@link #run()}. That loop steps a small
 * state machine ({@link ThreadState}): connect, handshake, and then repeatedly take documents from
 * the {@link DocumentQueue}, send them through the {@link GatewayConnection} and hand the parsed
 * results to the {@link EndpointResultQueue}. {@link #close()} sets the stop signal, makes one last
 * attempt to fetch outstanding results, closes the connection and fails everything still pending.
 *
 * <p>Counters describing the connection are exposed as an immutable snapshot through
 * {@link #getConnectionStats()}.
 */
@Beta
// Decompiler notes kept from the original listing: access was widened from package-private;
// loaded from com/yahoo/vespa/http/client/core/communication/IOThread.class
public class IOThread implements Runnable, AutoCloseable {
    private static final Logger log = Logger.getLogger(IOThread.class.getName());

    /** How long (ms) one feed cycle waits for the first document of a chunk. */
    private static final int DOCUMENT_POLL_TIMEOUT_MILLIS = 100;
    /** Pause (ms) when too many operations are in flight and the gateway returned no results. */
    private static final long NO_PROGRESS_SLEEP_MILLIS = 300L;
    /** Base value handed to {@code GatewayThrottler.distribute} before retrying connect/handshake. */
    private static final int RETRY_SLEEP_BASE_MILLIS = 3000;

    private final Endpoint endpoint;
    private final GatewayConnection client;
    private final DocumentQueue documentQueue;
    private final EndpointResultQueue resultQueue;
    private final Thread thread;
    private final int clusterId;
    private final int maxChunkSizeBytes;
    private final int maxInFlightRequests;
    private final long localQueueTimeOut;
    private final GatewayThrottler gatewayThrottler;
    /** Counted down by {@link #run()} just before the worker loop returns. */
    private final CountDownLatch running = new CountDownLatch(1);
    /** Counted down by {@link #close()}; a count of zero tells the worker loop to finish. */
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    // NOTE(review): the next three counters are only read (never incremented) in this class.
    private final AtomicInteger wrongSessionDetectedCounter = new AtomicInteger(0);
    private final AtomicInteger wrongVersionDetectedCounter = new AtomicInteger(0);
    private final AtomicInteger problemStatusCodeFromServerCounter = new AtomicInteger(0);
    private final AtomicInteger executeProblemsCounter = new AtomicInteger(0);
    private final AtomicInteger docsReceivedCounter = new AtomicInteger(0);
    private final AtomicInteger statusReceivedCounter = new AtomicInteger(0);
    private final AtomicInteger pendingDocumentStatusCount = new AtomicInteger(0);
    private final AtomicInteger successfullHandshakes = new AtomicInteger(0);
    private final AtomicInteger lastGatewayProcessTimeMillis = new AtomicInteger(0);

    /**
     * Immutable snapshot of the counters kept by an {@link IOThread}, as produced by
     * {@link IOThread#getConnectionStats()}.
     */
    public static class ConnectionStats {
        public final int wrongSessionDetectedCounter;
        public final int wrongVersionDetectedCounter;
        public final int problemStatusCodeFromServerCounter;
        /** Number of failed connect and handshake attempts. */
        public final int executeProblemsCounter;
        /** Number of documents taken from the document queue for feeding. */
        public final int docsReceivedCounter;
        /** Number of operation results parsed from gateway responses. */
        public final int statusReceivedCounter;
        /** Pending size of the result queue at the start of the most recent feed cycle. */
        public final int pendingDocumentStatusCount;
        public final int successfullHandshakes;
        /** Wall-clock time (ms) of the most recent send-and-process round trip. */
        public final int lastGatewayProcessTimeMillis;

        /** Stores each argument in the field of the same name. */
        protected ConnectionStats(int wrongSessionDetectedCounter,
                                  int wrongVersionDetectedCounter,
                                  int problemStatusCodeFromServerCounter,
                                  int executeProblemsCounter,
                                  int docsReceivedCounter,
                                  int statusReceivedCounter,
                                  int pendingDocumentStatusCount,
                                  int successfullHandshakes,
                                  int lastGatewayProcessTimeMillis) {
            this.wrongSessionDetectedCounter = wrongSessionDetectedCounter;
            this.wrongVersionDetectedCounter = wrongVersionDetectedCounter;
            this.problemStatusCodeFromServerCounter = problemStatusCodeFromServerCounter;
            this.executeProblemsCounter = executeProblemsCounter;
            this.docsReceivedCounter = docsReceivedCounter;
            this.statusReceivedCounter = statusReceivedCounter;
            this.pendingDocumentStatusCount = pendingDocumentStatusCount;
            this.successfullHandshakes = successfullHandshakes;
            this.lastGatewayProcessTimeMillis = lastGatewayProcessTimeMillis;
        }
    }

    /**
     * Summary of one processed gateway response.
     */
    // Decompiler note: access was widened from private.
    public static class ProcessResponse {
        /** Number of results whose type was {@code Result.ResultType.TRANSITIVE_ERROR}. */
        private final int transitiveErrorCount;
        /** Total number of results contained in the response. */
        private final int processResultsCount;

        ProcessResponse(int transitiveErrorCount, int processResultsCount) {
            this.transitiveErrorCount = transitiveErrorCount;
            this.processResultsCount = processResultsCount;
        }
    }

    /**
     * Connection states stepped through by {@link IOThread#run()}.
     */
    // Decompiler note: access was widened from private.
    public enum ThreadState {
        /** No connection; the next cycle calls {@code connect()}. */
        DISCONNECTED,
        /** Connected but not yet handshaken; the next cycle calls {@code handshake()}. */
        CONNECTED,
        /** Handshake succeeded; the next cycle feeds documents. */
        SESSION_SYNCED
    }

    /**
     * Creates the worker and immediately starts its daemon thread.
     *
     * @param endpointResultQueue queue that receives sent/received/failed notifications
     * @param gatewayConnection   connection used for all traffic; also supplies the endpoint
     * @param clusterId           id passed along with every result and failure
     * @param maxChunkSizeBytes   a chunk stops growing once its accumulated size reaches this value
     * @param maxInFlightRequests bound on pending operations used when assembling chunks
     * @param localQueueTimeOut   value passed to {@code DocumentQueue.pollDocumentIfTimedoutInQueue}
     * @param documentQueue       source of the documents to feed
     * @param throttlerSetting    value passed to the {@code GatewayThrottler} constructor
     *                            (NOTE(review): presumably a maximum sleep time — confirm)
     */
    // Decompiler note: access was widened from package-private.
    public IOThread(EndpointResultQueue endpointResultQueue,
                    GatewayConnection gatewayConnection,
                    int clusterId,
                    int maxChunkSizeBytes,
                    int maxInFlightRequests,
                    long localQueueTimeOut,
                    DocumentQueue documentQueue,
                    long throttlerSetting) {
        this.documentQueue = documentQueue;
        this.endpoint = gatewayConnection.getEndpoint();
        this.client = gatewayConnection;
        this.resultQueue = endpointResultQueue;
        this.clusterId = clusterId;
        this.maxChunkSizeBytes = maxChunkSizeBytes;
        this.maxInFlightRequests = maxInFlightRequests;
        this.gatewayThrottler = new GatewayThrottler(throttlerSetting);
        this.thread = new Thread(this, "IOThread " + this.endpoint);
        this.thread.setDaemon(true);
        this.localQueueTimeOut = localQueueTimeOut;
        // Start last so that every field is assigned before run() can observe this object.
        this.thread.start();
    }

    /** Returns the endpoint obtained from the gateway connection at construction time. */
    public Endpoint getEndpoint() {
        return endpoint;
    }

    /** Returns a snapshot of the current counter values. */
    public ConnectionStats getConnectionStats() {
        return new ConnectionStats(
                wrongSessionDetectedCounter.get(),
                wrongVersionDetectedCounter.get(),
                problemStatusCodeFromServerCounter.get(),
                executeProblemsCounter.get(),
                docsReceivedCounter.get(),
                statusReceivedCounter.get(),
                pendingDocumentStatusCount.get(),
                successfullHandshakes.get(),
                lastGatewayProcessTimeMillis.get());
    }

    /**
     * Stops feeding and releases the connection.
     *
     * <p>The document queue is closed on every call. If the stop signal was already set the method
     * then returns. Otherwise it sets the signal, tries once to drain and process outstanding
     * results when the result queue reports pending operations, closes the connection, and finally
     * fails all pending results and all documents still queued.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        documentQueue.close();
        if (stopSignal.getCount() == 0) {
            return;
        }
        stopSignal.countDown();
        log.finer("Closed called.");

        // Last attempt to collect results for operations that were already sent.
        int outstanding = resultQueue.getPendingSize();
        if (outstanding > 0) {
            log.info("We have outstanding operations (" + outstanding + ") , trying to fetch responses.");
            try {
                processResponse(client.drain());
            } catch (Throwable t) {
                log.log(Level.SEVERE, "Some failures while trying to get latest responses from vespa.", t);
            }
        }

        try {
            client.close();
            log.fine("Session to " + endpoint + " closed.");
        } finally {
            // Whatever is still pending or queued cannot be delivered any more.
            drainDocumentQueueWhenFailingPermanently(
                    new Exception("Closed call, did not manage to process everything so failing this document."));
        }
    }

    /**
     * Queues a document for feeding.
     *
     * @param document the document to put on the document queue
     * @throws InterruptedException propagated from {@code DocumentQueue.put}
     */
    public void post(Document document) throws InterruptedException {
        documentQueue.put(document);
    }

    @Override
    public String toString() {
        return "I/O thread (for " + endpoint + ")";
    }

    /**
     * Assembles the next chunk of documents to send.
     *
     * <p>Waits up to the given time for a first document, then keeps adding documents that are
     * immediately available while the accumulated size is below {@code maxChunkSizeBytes} and the
     * in-flight estimate (pending results + documents in this chunk) is below
     * {@code maxInFlightRequests}. Documents that timed out in the queue are failed before each poll.
     *
     * @param maxWait  maximum time to wait for the first document
     * @param timeUnit unit of {@code maxWait}
     * @return the documents collected, possibly empty; never {@code null}
     */
    List<Document> getNextDocsForFeeding(int maxWait, TimeUnit timeUnit) {
        final List<Document> chunk = new ArrayList<>();
        int chunkSizeBytes = 0;
        try {
            drainFirstDocumentsInQueueIfOld();
            Document first = documentQueue.poll(maxWait, timeUnit);
            if (first != null) {
                chunk.add(first);
                chunkSizeBytes = first.size();
            }

            // Top up without blocking.
            int inFlight = 1 + resultQueue.getPendingSize();
            while (chunkSizeBytes < maxChunkSizeBytes && inFlight < maxInFlightRequests) {
                drainFirstDocumentsInQueueIfOld();
                Document next = documentQueue.poll();
                if (next == null) {
                    break;
                }
                chunk.add(next);
                chunkSizeBytes += next.size();
                inFlight++;
            }

            log.finest("Chunk has " + chunk.size() + " docs with a size " + chunkSizeBytes + " bytes.");
            docsReceivedCounter.addAndGet(chunk.size());
            return chunk;
        } catch (InterruptedException e) {
            // Interruption only cuts the wait short; whatever was collected so far is still returned.
            log.fine("Got break signal while waiting for new documents to feed.");
            return chunk;
        }
    }

    /** Registers every document's operation id as sent on the result queue. */
    private void addDocumentsToResultQueue(List<Document> documents) {
        for (Document document : documents) {
            resultQueue.operationSent(document.getOperationId());
        }
    }

    /** Fails every document in the list with a transient error built from the given exception. */
    private void markDocumentAsFailed(List<Document> documents, ServerResponseException cause) {
        for (Document document : documents) {
            resultQueue.failOperation(
                    EndPointResultFactory.createTransientError(endpoint, document.getOperationId(), cause),
                    clusterId);
        }
    }

    /**
     * Writes the documents to the gateway and returns the response stream.
     *
     * <p>On any exception all documents in the list are marked as transiently failed before the
     * exception is rethrown. Exceptions other than {@link ServerResponseException} are reported to
     * the result queue as a {@code ServerResponseException} carrying only their message.
     */
    private InputStream sendAndReceive(List<Document> documents) throws IOException, ServerResponseException {
        try {
            return client.writeOperations(documents);
        } catch (ServerResponseException e) {
            markDocumentAsFailed(documents, e);
            throw e;
        } catch (Exception e) {
            markDocumentAsFailed(documents, new ServerResponseException(e.getMessage()));
            throw e;
        }
    }

    /**
     * Parses a gateway response and forwards each result to the result queue.
     *
     * @return the number of transitive errors and the total number of results in the response
     */
    // NOTE(review): the stream is not closed here; confirm whether createResult or the
    // connection owns it.
    private ProcessResponse processResponse(InputStream serverResponse) throws IOException {
        Collection<EndpointResult> results = EndPointResultFactory.createResult(endpoint, serverResponse);
        statusReceivedCounter.addAndGet(results.size());
        int transitiveErrors = 0;
        for (EndpointResult result : results) {
            if (result.getDetail().getResultType() == Result.ResultType.TRANSITIVE_ERROR) {
                transitiveErrors++;
            }
            resultQueue.resultReceived(result, clusterId);
        }
        return new ProcessResponse(transitiveErrors, results.size());
    }

    /**
     * Marks the documents as sent, sends them, processes the response and records how long the
     * round trip took in {@code lastGatewayProcessTimeMillis}.
     */
    private ProcessResponse feedDocumentAndProcessResults(List<Document> documents)
            throws ServerResponseException, IOException {
        addDocumentsToResultQueue(documents);
        long startMillis = System.currentTimeMillis();
        InputStream serverResponse = sendAndReceive(documents);
        ProcessResponse response = processResponse(serverResponse);
        lastGatewayProcessTimeMillis.set((int) (System.currentTimeMillis() - startMillis));
        return response;
    }

    /**
     * Runs one feed cycle.
     *
     * <p>When more operations are pending than {@code maxInFlightRequests}, no new documents are
     * taken and an empty chunk is sent so that results can still be collected; if that yields no
     * results the thread pauses briefly. When there is nothing to send and nothing pending, the
     * gateway is not contacted at all.
     *
     * @param maxWaitMillis how long to wait for the first document, in milliseconds
     */
    private ProcessResponse pullAndProcessData(int maxWaitMillis) throws ServerResponseException, IOException {
        final int pending = resultQueue.getPendingSize();
        pendingDocumentStatusCount.set(pending);

        final boolean tooManyInFlight = pending > maxInFlightRequests;
        List<Document> chunk = tooManyInFlight
                ? new ArrayList<>()
                : getNextDocsForFeeding(maxWaitMillis, TimeUnit.MILLISECONDS);

        if (chunk.isEmpty() && pending == 0) {
            log.finest("No document awaiting feeding, not waiting for results.");
            return new ProcessResponse(0, 0);
        }

        log.finest("Awaiting " + pending + " results.");
        ProcessResponse response = feedDocumentAndProcessResults(chunk);
        if (tooManyInFlight && response.processResultsCount == 0) {
            try {
                Thread.sleep(NO_PROGRESS_SLEEP_MILLIS);
            } catch (InterruptedException ignored) {
                // Deliberately ignored: an interrupt merely shortens this back-off pause. The flag is
                // not restored because that would make every later timed poll throw immediately.
            }
        }
        return response;
    }

    /**
     * Performs the action belonging to the given state and returns the state to continue from.
     */
    private ThreadState cycle(ThreadState threadState) {
        switch (threadState) {
            case DISCONNECTED:
                return connectToGateway();
            case CONNECTED:
                return performHandshake();
            case SESSION_SYNCED:
                return feedAndThrottle();
            default:
                log.severe("Should never get here.");
                client.close();
                return ThreadState.DISCONNECTED;
        }
    }

    /** DISCONNECTED step: tries to connect; stays DISCONNECTED on failure. */
    private ThreadState connectToGateway() {
        try {
            if (client.connect()) {
                return ThreadState.CONNECTED;
            }
            // connect() reported failure (returned false); the log wording is kept as-is.
            log.log(Level.WARNING, "Connect returned null " + endpoint);
            drainFirstDocumentsInQueueIfOld();
            return ThreadState.DISCONNECTED;
        } catch (Throwable t) {
            drainFirstDocumentsInQueueIfOld();
            log.log(Level.INFO, "Connect did not work out " + endpoint, t);
            executeProblemsCounter.incrementAndGet();
            return ThreadState.DISCONNECTED;
        }
    }

    /**
     * CONNECTED step: tries the handshake. A {@link ServerResponseException} keeps the connection
     * and retries the handshake; any other failure closes the connection.
     */
    private ThreadState performHandshake() {
        try {
            client.handshake();
            successfullHandshakes.getAndIncrement();
            return ThreadState.SESSION_SYNCED;
        } catch (ServerResponseException e) {
            executeProblemsCounter.incrementAndGet();
            // The exception text is concatenated: passing it as a log parameter would drop it,
            // since the message contains no {0} placeholder.
            log.log(Level.INFO, "Handshake did not work out " + endpoint + ": " + e.getMessage());
            drainFirstDocumentsInQueueIfOld();
            return ThreadState.CONNECTED;
        } catch (Throwable t) {
            executeProblemsCounter.incrementAndGet();
            log.log(Level.INFO, "Problem with Handshake " + endpoint + ": " + t.getMessage());
            drainFirstDocumentsInQueueIfOld();
            client.close();
            return ThreadState.DISCONNECTED;
        }
    }

    /**
     * SESSION_SYNCED step: runs one feed cycle and reports its transitive error count to the
     * throttler. A {@link ServerResponseException} falls back to CONNECTED; any other failure
     * closes the connection.
     */
    private ThreadState feedAndThrottle() {
        try {
            ProcessResponse response = pullAndProcessData(DOCUMENT_POLL_TIMEOUT_MILLIS);
            gatewayThrottler.handleCall(response.transitiveErrorCount);
            return ThreadState.SESSION_SYNCED;
        } catch (ServerResponseException e) {
            log.info("Problems while handing data over to gateway " + endpoint + " " + e.getMessage());
            return ThreadState.CONNECTED;
        } catch (Throwable t) {
            log.info("Problems while handing data over to gateway  " + endpoint + " " + t.getMessage());
            client.close();
            return ThreadState.DISCONNECTED;
        }
    }

    /**
     * Backs off after a cycle that did not make progress towards a synced session.
     *
     * <p>No sleep happens when the new state is SESSION_SYNCED, or when the state just advanced from
     * DISCONNECTED to CONNECTED. Otherwise the thread sleeps for a throttler-chosen duration, unless
     * close has been requested and the document queue is already empty.
     *
     * @param threadState  the state after the cycle
     * @param threadState2 the state before the cycle
     */
    private void sleepIfProblemsGettingSyncedConnection(ThreadState threadState, ThreadState threadState2) {
        if (threadState == ThreadState.SESSION_SYNCED) {
            return;
        }
        boolean justConnected = threadState == ThreadState.CONNECTED && threadState2 == ThreadState.DISCONNECTED;
        if (justConnected) {
            return;
        }
        try {
            if (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) {
                Thread.sleep(gatewayThrottler.distribute(RETRY_SLEEP_BASE_MILLIS));
            }
        } catch (InterruptedException ignored) {
            // Deliberately ignored: an interrupt merely shortens the retry pause.
        }
    }

    /**
     * Worker loop: keeps cycling the state machine until close has been requested and the document
     * queue is empty, then counts down the {@code running} latch.
     */
    @Override // java.lang.Runnable
    public void run() {
        ThreadState state = ThreadState.DISCONNECTED;
        while (stopSignal.getCount() > 0 || !documentQueue.isEmpty()) {
            ThreadState previous = state;
            state = cycle(state);
            sleepIfProblemsGettingSyncedConnection(state, previous);
        }
        log.finer(toString() + " exiting, documentQueue.size()=" + documentQueue.size());
        running.countDown();
    }

    /**
     * Removes documents that the queue reports as timed out and fails each of them with a transient
     * error stating how long it sat in the queue.
     */
    private void drainFirstDocumentsInQueueIfOld() {
        Optional<Document> timedOut = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
        while (timedOut.isPresent()) {
            Document document = timedOut.get();
            EndpointResult failure = EndPointResultFactory.createTransientError(
                    endpoint,
                    document.getOperationId(),
                    new Exception("Not sending document operation, timed out in queue after "
                            + document.timeInQueueMillis() + " ms."));
            resultQueue.failOperation(failure, clusterId);
            timedOut = documentQueue.pollDocumentIfTimedoutInQueue(localQueueTimeOut);
        }
    }

    /**
     * Fails all pending results and then every document still in the document queue with a
     * (non-transient) error built from the given exception.
     */
    private void drainDocumentQueueWhenFailingPermanently(Exception exc) {
        resultQueue.failPending(exc);
        Iterator<Document> remaining = documentQueue.removeAllDocuments().iterator();
        while (remaining.hasNext()) {
            Document document = remaining.next();
            resultQueue.failOperation(
                    EndPointResultFactory.createError(endpoint, document.getOperationId(), exc),
                    clusterId);
        }
    }
}
