package com.azure.monitor.ingestion;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.http.rest.Response;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.monitor.ingestion.implementation.Batcher;
import com.azure.monitor.ingestion.implementation.IngestionUsingDataCollectionRulesAsyncClient;
import com.azure.monitor.ingestion.implementation.LogsIngestionRequest;
import com.azure.monitor.ingestion.implementation.UploadLogsResponseHolder;
import com.azure.monitor.ingestion.implementation.Utils;
import com.azure.monitor.ingestion.models.LogsUploadError;
import com.azure.monitor.ingestion.models.LogsUploadException;
import com.azure.monitor.ingestion.models.LogsUploadOptions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;

@ServiceClient(isAsync = true, builder = LogsIngestionClientBuilder.class)
/* loaded from: input_file:com/azure/monitor/ingestion/LogsIngestionAsyncClient.class */
public final class LogsIngestionAsyncClient {
    private final IngestionUsingDataCollectionRulesAsyncClient service;

    /* JADX INFO: Access modifiers changed from: package-private */
    public LogsIngestionAsyncClient(IngestionUsingDataCollectionRulesAsyncClient ingestionUsingDataCollectionRulesAsyncClient) {
        this.service = ingestionUsingDataCollectionRulesAsyncClient;
    }

    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> upload(String str, String str2, Iterable<Object> iterable) {
        return upload(str, str2, iterable, new LogsUploadOptions());
    }

    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> upload(String str, String str2, Iterable<Object> iterable, LogsUploadOptions logsUploadOptions) {
        return FluxUtil.withContext(context -> {
            return upload(str, str2, iterable, logsUploadOptions, context);
        });
    }

    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> uploadWithResponse(String str, String str2, BinaryData binaryData, RequestOptions requestOptions) {
        Objects.requireNonNull(str, "'ruleId' cannot be null.");
        Objects.requireNonNull(str2, "'streamName' cannot be null.");
        Objects.requireNonNull(binaryData, "'logs' cannot be null.");
        if (requestOptions == null) {
            requestOptions = new RequestOptions();
        }
        requestOptions.addRequestCallback(httpRequest -> {
            if (httpRequest.getHeaders().get(Utils.CONTENT_ENCODING) == null) {
                httpRequest.setBody(BinaryData.fromBytes(Utils.gzipRequest(binaryData.toBytes())));
                httpRequest.setHeader(Utils.CONTENT_ENCODING, Utils.GZIP);
            }
        });
        return this.service.uploadWithResponse(str, str2, binaryData, requestOptions);
    }

    Mono<Void> upload(String str, String str2, Iterable<Object> iterable, LogsUploadOptions logsUploadOptions, Context context) {
        return Mono.defer(() -> {
            return splitAndUpload(str, str2, iterable, logsUploadOptions, context);
        });
    }

    private Mono<Void> splitAndUpload(String str, String str2, Iterable<Object> iterable, LogsUploadOptions logsUploadOptions, Context context) {
        return new Batcher(logsUploadOptions, iterable).toFlux().flatMapSequential(logsIngestionRequest -> {
            return uploadToService(str, str2, context, logsIngestionRequest);
        }, Utils.getConcurrency(logsUploadOptions)).handle((uploadLogsResponseHolder, synchronousSink) -> {
            processResponse(logsUploadOptions, uploadLogsResponseHolder, synchronousSink);
        }).collectList().handle((list, synchronousSink2) -> {
            processExceptions(list, synchronousSink2);
        });
    }

    private void processExceptions(List<LogsUploadException> list, SynchronousSink<Void> synchronousSink) {
        long j = 0;
        ArrayList arrayList = new ArrayList();
        for (LogsUploadException logsUploadException : list) {
            arrayList.addAll(logsUploadException.getLogsUploadErrors());
            j += logsUploadException.getFailedLogsCount();
        }
        if (arrayList.isEmpty()) {
            synchronousSink.complete();
        } else {
            synchronousSink.error(new LogsUploadException(arrayList, j));
        }
    }

    private void processResponse(LogsUploadOptions logsUploadOptions, UploadLogsResponseHolder uploadLogsResponseHolder, SynchronousSink<LogsUploadException> synchronousSink) {
        if (uploadLogsResponseHolder.getException() != null) {
            Consumer<LogsUploadError> consumer = null;
            if (logsUploadOptions != null) {
                consumer = logsUploadOptions.getLogsUploadErrorConsumer();
            }
            if (consumer != null) {
                consumer.accept(new LogsUploadError(uploadLogsResponseHolder.getException(), uploadLogsResponseHolder.getRequest().getLogs()));
            } else {
                synchronousSink.next(new LogsUploadException(Collections.singletonList(uploadLogsResponseHolder.getException()), uploadLogsResponseHolder.getRequest().getLogs().size()));
            }
        }
    }

    private Mono<UploadLogsResponseHolder> uploadToService(String str, String str2, Context context, LogsIngestionRequest logsIngestionRequest) {
        return this.service.uploadWithResponse(str, str2, BinaryData.fromBytes(logsIngestionRequest.getRequestBody()), new RequestOptions().addHeader(Utils.CONTENT_ENCODING, Utils.GZIP).setContext(context)).map(response -> {
            return new UploadLogsResponseHolder(null, null);
        }).onErrorResume(HttpResponseException.class, httpResponseException -> {
            return Mono.fromSupplier(() -> {
                return new UploadLogsResponseHolder(logsIngestionRequest, httpResponseException);
            });
        });
    }
}
