package fs2.aws.kinesis;

import cats.effect.Blocker;
import cats.effect.Blocker$;
import cats.effect.ConcurrentEffect;
import cats.effect.ConcurrentEffect$;
import cats.effect.ContextShift;
import cats.effect.IO$;
import cats.effect.Sync$;
import cats.effect.Timer;
import cats.implicits$;
import fs2.Chunk;
import fs2.RaiseThrowable;
import fs2.Stream;
import fs2.Stream$;
import fs2.aws.internal.Exceptions;
import fs2.concurrent.Queue;
import fs2.concurrent.Queue$;
import fs2.internal.FreeC;
import java.util.Date;
import java.util.UUID;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.concurrent.ExecutionContext;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.util.Left;
import scala.util.Right;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.retrieval.RetrievalConfig;

/* compiled from: consumer.scala */
/* loaded from: input_file:fs2/aws/kinesis/consumer$.class */
public final class consumer$ {
    /** Singleton instance of this class; the private constructor stores {@code this} here. */
    public static consumer$ MODULE$;

    static {
        // Constructing the object is sufficient: the constructor assigns MODULE$ as a side effect.
        new consumer$();
    }

    /**
     * Builds a {@link KinesisAsyncClient} from the supplied consumer settings.
     *
     * <p>The client is configured with the settings' region, a Netty async HTTP client builder
     * whose max concurrency comes from the settings, and one of two credential providers: an
     * STS assume-role provider (using the role ARN and session name from the settings) when
     * {@code stsAssumeRole()} holds a value, otherwise {@link DefaultCredentialsProvider#create()}.
     *
     * @param kinesisConsumerSettings source of the region, optional STS role and max concurrency
     * @return the newly built client
     */
    public KinesisAsyncClient mkDefaultKinesisClient(KinesisConsumerSettings kinesisConsumerSettings) {
        return (KinesisAsyncClient) KinesisAsyncClient.builder()
                .region(kinesisConsumerSettings.region())
                .credentialsProvider((AwsCredentialsProvider) kinesisConsumerSettings.stsAssumeRole()
                        .map(roleSettings -> (StsAssumeRoleCredentialsProvider) StsAssumeRoleCredentialsProvider.builder()
                                .stsClient((StsClient) StsClient.builder().build())
                                .refreshRequest((AssumeRoleRequest) AssumeRoleRequest.builder()
                                        .roleArn(roleSettings.roleArn())
                                        .roleSessionName(roleSettings.roleSessionName())
                                        .build())
                                .build())
                        .getOrElse(() -> DefaultCredentialsProvider.create()))
                .httpClientBuilder(NettyNioAsyncHttpClient.builder()
                        .maxConcurrency(Predef$.MODULE$.int2Integer(kinesisConsumerSettings.maxConcurrency())))
                .build();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Assembles a {@link Scheduler} for the given record-processor factory.
     *
     * <p>A {@link ConfigsBuilder} is created from the settings' stream name and application name,
     * the supplied Kinesis client, freshly built DynamoDB and CloudWatch async clients for the
     * settings' region, a random UUID string and the processor factory. The retrieval config's
     * initial position is then set from {@code initialPositionInStream()}: a {@code Left} carries
     * an {@link InitialPositionInStream}, a {@code Right} carries a {@link Date} timestamp.
     *
     * @param shardRecordProcessorFactory factory handed to the {@link ConfigsBuilder}
     * @param kinesisConsumerSettings     stream name, app name, region and initial position
     * @param kinesisAsyncClient          Kinesis client handed to the {@link ConfigsBuilder}
     * @return a scheduler built from the builder's configs and the adjusted retrieval config
     * @throws MatchError if the initial position is neither a {@code Left} nor a {@code Right}
     */
    public Scheduler defaultScheduler(ShardRecordProcessorFactory shardRecordProcessorFactory, KinesisConsumerSettings kinesisConsumerSettings, KinesisAsyncClient kinesisAsyncClient) {
        ConfigsBuilder configs = new ConfigsBuilder(
                kinesisConsumerSettings.streamName(),
                kinesisConsumerSettings.appName(),
                kinesisAsyncClient,
                (DynamoDbAsyncClient) DynamoDbAsyncClient.builder().region(kinesisConsumerSettings.region()).build(),
                (CloudWatchAsyncClient) CloudWatchAsyncClient.builder().region(kinesisConsumerSettings.region()).build(),
                UUID.randomUUID().toString(),
                shardRecordProcessorFactory);
        RetrievalConfig retrieval = configs.retrievalConfig();

        Left startingPoint = kinesisConsumerSettings.initialPositionInStream();
        InitialPositionInStreamExtended position;
        if (startingPoint instanceof Left) {
            // Named position taken from the Left side.
            position = InitialPositionInStreamExtended.newInitialPosition((InitialPositionInStream) startingPoint.value());
        } else if (startingPoint instanceof Right) {
            // Timestamp-based position taken from the Right side.
            position = InitialPositionInStreamExtended.newInitialPositionAtTimestamp((Date) ((Right) startingPoint).value());
        } else {
            throw new MatchError(startingPoint);
        }
        retrieval.initialPositionInStreamExtended(position);

        return new Scheduler(
                configs.checkpointConfig(),
                configs.coordinatorConfig(),
                configs.leaseManagementConfig(),
                configs.lifecycleConfig(),
                configs.metricsConfig(),
                configs.processorConfig(),
                retrieval);
    }

    /**
     * Creates consumer settings from two strings and, if that succeeds, delegates to the
     * settings-based overload; otherwise returns a stream that raises the construction error.
     *
     * @param str            passed as the SECOND argument of {@code KinesisConsumerSettings.apply}
     *                       (presumably the application name; confirm against that factory)
     * @param str2           passed as the FIRST argument of {@code KinesisConsumerSettings.apply}
     *                       (presumably the stream name; confirm against that factory)
     * @param raiseThrowable used only to raise the error held by a {@code Left} result
     * @return the record stream, or a failed stream when the settings are rejected
     * @throws MatchError if the factory result is neither a {@code Right} nor a {@code Left}
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(String str, String str2, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, RaiseThrowable<F> raiseThrowable) {
        // All arguments after the first two use the factory's default values.
        Right settingsOrError = KinesisConsumerSettings$.MODULE$.apply(
                str2,
                str,
                KinesisConsumerSettings$.MODULE$.apply$default$3(),
                KinesisConsumerSettings$.MODULE$.apply$default$4(),
                KinesisConsumerSettings$.MODULE$.apply$default$5(),
                KinesisConsumerSettings$.MODULE$.apply$default$6(),
                KinesisConsumerSettings$.MODULE$.apply$default$7(),
                KinesisConsumerSettings$.MODULE$.apply$default$8());
        if (settingsOrError instanceof Right) {
            return readFromKinesisStream((KinesisConsumerSettings) settingsOrError.value(), concurrentEffect, contextShift);
        }
        if (settingsOrError instanceof Left) {
            return Stream$.MODULE$.raiseError((Throwable) ((Left) settingsOrError).value(), raiseThrowable);
        }
        throw new MatchError(settingsOrError);
    }

    /**
     * Reads records using the default scheduler and a default Kinesis client.
     *
     * <p>The client is created inside the scheduler function, i.e. each time that function is
     * applied to a processor factory, not when this method is called.
     *
     * @param kinesisConsumerSettings settings used for the stream, the client and the scheduler
     * @return the stream produced by the scheduler-function overload
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        Function1<ShardRecordProcessorFactory, Scheduler> schedulerFor = processorFactory ->
                MODULE$.defaultScheduler(processorFactory, kinesisConsumerSettings, MODULE$.mkDefaultKinesisClient(kinesisConsumerSettings));
        return readFromKinesisStream(kinesisConsumerSettings, schedulerFor, concurrentEffect, contextShift);
    }

    /**
     * Same as the two-string overload, but reads through a caller-supplied Kinesis client.
     *
     * @param str                passed as the SECOND argument of {@code KinesisConsumerSettings.apply}
     *                           (presumably the application name; confirm against that factory)
     * @param str2               passed as the FIRST argument of {@code KinesisConsumerSettings.apply}
     *                           (presumably the stream name; confirm against that factory)
     * @param kinesisAsyncClient client forwarded to the settings-and-client overload
     * @param raiseThrowable     used only to raise the error held by a {@code Left} result
     * @return the record stream, or a failed stream when the settings are rejected
     * @throws MatchError if the factory result is neither a {@code Right} nor a {@code Left}
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(String str, String str2, KinesisAsyncClient kinesisAsyncClient, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift, RaiseThrowable<F> raiseThrowable) {
        // All arguments after the first two use the factory's default values.
        Right settingsOrError = KinesisConsumerSettings$.MODULE$.apply(
                str2,
                str,
                KinesisConsumerSettings$.MODULE$.apply$default$3(),
                KinesisConsumerSettings$.MODULE$.apply$default$4(),
                KinesisConsumerSettings$.MODULE$.apply$default$5(),
                KinesisConsumerSettings$.MODULE$.apply$default$6(),
                KinesisConsumerSettings$.MODULE$.apply$default$7(),
                KinesisConsumerSettings$.MODULE$.apply$default$8());
        if (settingsOrError instanceof Right) {
            return readFromKinesisStream((KinesisConsumerSettings) settingsOrError.value(), kinesisAsyncClient, concurrentEffect, contextShift);
        }
        if (settingsOrError instanceof Left) {
            return Stream$.MODULE$.raiseError((Throwable) ((Left) settingsOrError).value(), raiseThrowable);
        }
        throw new MatchError(settingsOrError);
    }

    /**
     * Reads records using the default scheduler wired to a caller-supplied Kinesis client.
     *
     * @param kinesisConsumerSettings settings used for the stream and the scheduler
     * @param kinesisAsyncClient      client passed to {@code defaultScheduler}
     * @return the stream produced by the scheduler-function overload
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings, KinesisAsyncClient kinesisAsyncClient, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        Function1<ShardRecordProcessorFactory, Scheduler> schedulerFor = processorFactory ->
                MODULE$.defaultScheduler(processorFactory, kinesisConsumerSettings, kinesisAsyncClient);
        return readFromKinesisStream(kinesisConsumerSettings, schedulerFor, concurrentEffect, contextShift);
    }

    /**
     * Reads records in chunks using the default scheduler and a default Kinesis client.
     *
     * <p>The scheduler function is supplied lazily, and the default client is only created when
     * that function is applied to a processor factory.
     *
     * @param kinesisConsumerSettings settings used for the stream, the client and the scheduler
     * @return the stream produced by {@code readChunksFromKinesisStream}
     */
    public <F> FreeC<F, Chunk<CommittableRecord>, BoxedUnit> readChunkedFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        Function0<Function1<ShardRecordProcessorFactory, Scheduler>> lazySchedulerFor = () -> processorFactory ->
                MODULE$.defaultScheduler(processorFactory, kinesisConsumerSettings, MODULE$.mkDefaultKinesisClient(kinesisConsumerSettings));
        return readChunksFromKinesisStream(kinesisConsumerSettings, lazySchedulerFor, concurrentEffect, contextShift);
    }

    /**
     * Reads records in chunks using the default scheduler wired to a caller-supplied client.
     *
     * @param kinesisConsumerSettings settings used for the stream and the scheduler
     * @param kinesisAsyncClient      client passed to {@code defaultScheduler}
     * @return the stream produced by {@code readChunksFromKinesisStream}
     */
    public <F> FreeC<F, Chunk<CommittableRecord>, BoxedUnit> readChunkedFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings, KinesisAsyncClient kinesisAsyncClient, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        Function0<Function1<ShardRecordProcessorFactory, Scheduler>> lazySchedulerFor = () -> processorFactory ->
                MODULE$.defaultScheduler(processorFactory, kinesisConsumerSettings, kinesisAsyncClient);
        return readChunksFromKinesisStream(kinesisConsumerSettings, lazySchedulerFor, concurrentEffect, contextShift);
    }

    /**
     * Core single-record reader: evaluates the creation of a bounded queue sized by the settings'
     * {@code bufferSize()} and flat-maps it into the stream built by the synthetic helper
     * {@code $anonfun$readFromKinesisStream$6}.
     *
     * @param kinesisConsumerSettings supplies the queue capacity; also forwarded to the helper
     * @param function1               turns a processor factory into the {@link Scheduler} to run
     * @return a stream of committable records
     */
    public <F> FreeC<F, CommittableRecord, BoxedUnit> readFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings, Function1<ShardRecordProcessorFactory, Scheduler> function1, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        return Stream$.MODULE$.flatMap$extension(
                Stream$.MODULE$.eval(Queue$.MODULE$.bounded(kinesisConsumerSettings.bufferSize(), concurrentEffect)),
                buffer -> new Stream($anonfun$readFromKinesisStream$6(concurrentEffect, contextShift, function1, kinesisConsumerSettings, buffer)));
    }

    /**
     * Core chunked reader: evaluates the creation of a bounded queue sized by the settings'
     * {@code bufferSize()} and flat-maps it into the stream built by the synthetic helper
     * {@code $anonfun$readChunksFromKinesisStream$4}.
     *
     * @param kinesisConsumerSettings supplies the queue capacity; also forwarded to the helper
     * @param function0               lazily yields the function that turns a processor factory
     *                                into the {@link Scheduler} to run
     * @return a stream of chunks of committable records
     */
    public <F> FreeC<F, Chunk<CommittableRecord>, BoxedUnit> readChunksFromKinesisStream(KinesisConsumerSettings kinesisConsumerSettings, Function0<Function1<ShardRecordProcessorFactory, Scheduler>> function0, ConcurrentEffect<F> concurrentEffect, ContextShift<F> contextShift) {
        return Stream$.MODULE$.flatMap$extension(
                Stream$.MODULE$.eval(Queue$.MODULE$.bounded(kinesisConsumerSettings.bufferSize(), concurrentEffect)),
                buffer -> new Stream($anonfun$readChunksFromKinesisStream$4(concurrentEffect, contextShift, function0, kinesisConsumerSettings, buffer)));
    }

    /**
     * Returns a pipe that turns a stream of {@link CommittableRecord} into a stream of
     * {@link KinesisClientRecord}; the work is done by {@code $anonfun$checkpointRecords$6}.
     *
     * @param kinesisCheckpointSettings forwarded to the checkpointing helper
     * @param i                         forwarded to the checkpointing helper unchanged
     * @return a function from the input stream to the output stream
     */
    public <F> Function1<Stream<F, CommittableRecord>, Stream<F, KinesisClientRecord>> checkpointRecords(KinesisCheckpointSettings kinesisCheckpointSettings, int i, ConcurrentEffect<F> concurrentEffect, Timer<F> timer) {
        return upstream -> new Stream($anonfun$checkpointRecords$6(concurrentEffect, kinesisCheckpointSettings, i, timer, ((Stream) upstream).fs2$Stream$$free()));
    }

    /**
     * Default value for the first parameter of {@code checkpointRecords}.
     *
     * @return {@code KinesisCheckpointSettings$.MODULE$.defaultInstance()}
     */
    public <F> KinesisCheckpointSettings checkpointRecords$default$1() {
        KinesisCheckpointSettings defaults = KinesisCheckpointSettings$.MODULE$.defaultInstance();
        return defaults;
    }

    /**
     * Default value for the second ({@code int}) parameter of {@code checkpointRecords}.
     *
     * @return the constant 10
     */
    public <F> int checkpointRecords$default$2() {
        final int defaultSecondArgument = 10;
        return defaultSecondArgument;
    }

    /**
     * Returns a pipe that turns a stream of {@link CommittableRecord} into a stream of
     * {@link BoxedUnit}; the work is done by {@code $anonfun$checkpointRecords_$1}.
     *
     * @param kinesisCheckpointSettings forwarded to the helper
     * @return a function from the input stream to the unit-valued output stream
     */
    public <F> Function1<Stream<F, CommittableRecord>, Stream<F, BoxedUnit>> checkpointRecords_(KinesisCheckpointSettings kinesisCheckpointSettings, ConcurrentEffect<F> concurrentEffect, Timer<F> timer) {
        return upstream -> new Stream($anonfun$checkpointRecords_$1(kinesisCheckpointSettings, concurrentEffect, timer, ((Stream) upstream).fs2$Stream$$free()));
    }

    /**
     * Default value for the first parameter of {@code checkpointRecords_}.
     *
     * @return {@code KinesisCheckpointSettings$.MODULE$.defaultInstance()}
     */
    public <F> KinesisCheckpointSettings checkpointRecords_$default$1() {
        KinesisCheckpointSettings defaults = KinesisCheckpointSettings$.MODULE$.defaultInstance();
        return defaults;
    }

    /**
     * Callback body used by the single-record processor: enqueues one record into the queue.
     *
     * <p>The enqueue effect is started with {@code runAsync}; the completion callback ignores
     * its argument and yields {@code IO.unit}, and the resulting value is run synchronously.
     */
    public static final /* synthetic */ void $anonfun$readFromKinesisStream$4(ConcurrentEffect concurrentEffect, Queue queue, CommittableRecord committableRecord) {
        ConcurrentEffect$.MODULE$.apply(concurrentEffect)
                .runAsync(queue.enqueue1(committableRecord), ignoredOutcome -> IO$.MODULE$.unit())
                .unsafeRunSync();
    }

    /**
     * Emits, as a single-element stream, the result of applying {@code function1} to a processor
     * factory. Each call of that factory creates a new {@code SingleRecordProcessor} which
     * enqueues every record it receives and is given the settings' {@code terminateGracePeriod()}.
     */
    private static final FreeC instantiateWorker$1(Queue queue, Function1 function1, ConcurrentEffect concurrentEffect, KinesisConsumerSettings kinesisConsumerSettings) {
        ShardRecordProcessorFactory processorFactory = () -> new SingleRecordProcessor(record -> {
            $anonfun$readFromKinesisStream$4(concurrentEffect, queue, record);
            return BoxedUnit.UNIT;
        }, kinesisConsumerSettings.terminateGracePeriod());
        return Stream$.MODULE$.emit(function1.apply(processorFactory));
    }

    /**
     * Wraps {@code scheduler.run()} in an effect created by {@code Blocker.delay} on the given
     * execution context.
     */
    public static final /* synthetic */ Object $anonfun$readFromKinesisStream$8(Scheduler scheduler, ConcurrentEffect concurrentEffect, ContextShift contextShift, ExecutionContext executionContext) {
        return Blocker$.MODULE$.delay$extension(
                executionContext,
                () -> scheduler.run(),
                concurrentEffect,
                contextShift);
    }

    /**
     * Builds the record stream for one scheduler: dequeues from {@code queue} while, concurrently,
     * the scheduler's {@code run()} is evaluated on a {@link Blocker}'s blocking context, and
     * registers {@code scheduler.shutdown()} as the stream's finalizer.
     */
    public static final /* synthetic */ FreeC $anonfun$readFromKinesisStream$7(Queue queue, ConcurrentEffect concurrentEffect, ContextShift contextShift, Scheduler scheduler) {
        FreeC records = queue.dequeue();
        // Effect that acquires a Blocker and runs the scheduler on its blocking context.
        Object runScheduler = Blocker$.MODULE$.apply(concurrentEffect).use(
                blocker -> $anonfun$readFromKinesisStream$8(scheduler, concurrentEffect, contextShift, ((Blocker) blocker).blockingContext()),
                concurrentEffect);
        FreeC recordsWhileRunning = Stream$.MODULE$.concurrently$extension(records, Stream$.MODULE$.eval(runScheduler), concurrentEffect);
        Object shutDownScheduler = Sync$.MODULE$.apply(concurrentEffect).delay(() -> scheduler.shutdown());
        return Stream$.MODULE$.onFinalize$extension(recordsWhileRunning, shutDownScheduler, concurrentEffect);
    }

    /**
     * Given the record queue, creates the scheduler via {@code instantiateWorker$1} and flat-maps
     * it into the stream built by {@code $anonfun$readFromKinesisStream$7}.
     */
    public static final /* synthetic */ FreeC $anonfun$readFromKinesisStream$6(ConcurrentEffect concurrentEffect, ContextShift contextShift, Function1 function1, KinesisConsumerSettings kinesisConsumerSettings, Queue queue) {
        return Stream$.MODULE$.flatMap$extension(
                instantiateWorker$1(queue, function1, concurrentEffect, kinesisConsumerSettings),
                worker -> new Stream($anonfun$readFromKinesisStream$7(queue, concurrentEffect, contextShift, worker)));
    }

    /**
     * Callback body used by the chunked processor: enqueues one chunk into the queue.
     *
     * <p>The enqueue effect is started with {@code runAsync}; the completion callback ignores
     * its argument and yields {@code IO.unit}, and the resulting value is run synchronously.
     */
    public static final /* synthetic */ void $anonfun$readChunksFromKinesisStream$2(ConcurrentEffect concurrentEffect, Queue queue, Chunk chunk) {
        ConcurrentEffect$.MODULE$.apply(concurrentEffect)
                .runAsync(queue.enqueue1(chunk), ignoredOutcome -> IO$.MODULE$.unit())
                .unsafeRunSync();
    }

    /**
     * Emits, as a single-element stream, the result of applying the function obtained from
     * {@code function0} to a processor factory. Each call of that factory creates a new
     * {@code ChunkedRecordProcessor} which enqueues every chunk it receives and is given the
     * settings' {@code terminateGracePeriod()}.
     */
    private static final FreeC instantiateWorker$2(Queue queue, Function0 function0, ConcurrentEffect concurrentEffect, KinesisConsumerSettings kinesisConsumerSettings) {
        ShardRecordProcessorFactory processorFactory = () -> new ChunkedRecordProcessor(records -> {
            $anonfun$readChunksFromKinesisStream$2(concurrentEffect, queue, records);
            return BoxedUnit.UNIT;
        }, kinesisConsumerSettings.terminateGracePeriod());
        return Stream$.MODULE$.emit(((Function1) function0.apply()).apply(processorFactory));
    }

    /**
     * Wraps {@code scheduler.run()} in an effect created by {@code Blocker.delay} on the given
     * execution context (chunked-reader variant).
     */
    public static final /* synthetic */ Object $anonfun$readChunksFromKinesisStream$6(Scheduler scheduler, ConcurrentEffect concurrentEffect, ContextShift contextShift, ExecutionContext executionContext) {
        return Blocker$.MODULE$.delay$extension(
                executionContext,
                () -> scheduler.run(),
                concurrentEffect,
                contextShift);
    }

    /**
     * Builds the chunk stream for one scheduler: dequeues from {@code queue} while, concurrently,
     * the scheduler's {@code run()} is evaluated on a {@link Blocker}'s blocking context, and
     * registers {@code scheduler.shutdown()} as the stream's finalizer.
     */
    public static final /* synthetic */ FreeC $anonfun$readChunksFromKinesisStream$5(Queue queue, ConcurrentEffect concurrentEffect, ContextShift contextShift, Scheduler scheduler) {
        FreeC chunks = queue.dequeue();
        // Effect that acquires a Blocker and runs the scheduler on its blocking context.
        Object runScheduler = Blocker$.MODULE$.apply(concurrentEffect).use(
                blocker -> $anonfun$readChunksFromKinesisStream$6(scheduler, concurrentEffect, contextShift, ((Blocker) blocker).blockingContext()),
                concurrentEffect);
        FreeC chunksWhileRunning = Stream$.MODULE$.concurrently$extension(chunks, Stream$.MODULE$.eval(runScheduler), concurrentEffect);
        Object shutDownScheduler = Sync$.MODULE$.apply(concurrentEffect).delay(() -> scheduler.shutdown());
        return Stream$.MODULE$.onFinalize$extension(chunksWhileRunning, shutDownScheduler, concurrentEffect);
    }

    /**
     * Given the chunk queue, creates the scheduler via {@code instantiateWorker$2} and flat-maps
     * it into the stream built by {@code $anonfun$readChunksFromKinesisStream$5}.
     */
    public static final /* synthetic */ FreeC $anonfun$readChunksFromKinesisStream$4(ConcurrentEffect concurrentEffect, ContextShift contextShift, Function0 function0, KinesisConsumerSettings kinesisConsumerSettings, Queue queue) {
        return Stream$.MODULE$.flatMap$extension(
                instantiateWorker$2(queue, function0, concurrentEffect, kinesisConsumerSettings),
                worker -> new Stream($anonfun$readChunksFromKinesisStream$5(queue, concurrentEffect, contextShift, worker)));
    }

    /**
     * Checkpoints one record and reports the outcome through the supplied callback.
     *
     * <p>If the record reports that it can checkpoint, it is checkpointed and the callback
     * receives a {@code Right} holding {@code record()}; otherwise the callback receives a
     * {@code Left} holding a {@code KinesisCheckpointException}.
     */
    public static final /* synthetic */ void $anonfun$checkpointRecords$3(CommittableRecord committableRecord, Function1 function1) {
        if (committableRecord.canCheckpoint()) {
            committableRecord.checkpoint();
            function1.apply(package$.MODULE$.Right().apply(committableRecord.record()));
            return;
        }
        function1.apply(package$.MODULE$.Left().apply(new Exceptions.KinesisCheckpointException("Record processor has been shutdown and therefore cannot checkpoint records")));
    }

    /**
     * Wraps the checkpoint of a single record in an async effect and lifts it into a stream
     * with {@code Stream.eval_}.
     */
    public static final /* synthetic */ FreeC $anonfun$checkpointRecords$2(ConcurrentEffect concurrentEffect, CommittableRecord committableRecord) {
        Object checkpointEffect = concurrentEffect.async(callback -> {
            $anonfun$checkpointRecords$3(committableRecord, callback);
            return BoxedUnit.UNIT;
        });
        return Stream$.MODULE$.eval_(checkpointEffect);
    }

    /**
     * Checkpointing pipe body: groups the input with {@code groupWithin} using the settings'
     * {@code maxBatchSize()} and {@code maxBatchWait()}, filters/transforms the groups with a
     * partial function defined in a separate synthetic class (not visible in this file), and
     * checkpoints each resulting record via {@code $anonfun$checkpointRecords$2}.
     */
    public static final /* synthetic */ FreeC $anonfun$checkpointRecords$1(KinesisCheckpointSettings kinesisCheckpointSettings, Timer timer, ConcurrentEffect concurrentEffect, FreeC freeC) {
        FreeC batches = Stream$.MODULE$.groupWithin$extension(
                freeC,
                kinesisCheckpointSettings.maxBatchSize(),
                kinesisCheckpointSettings.maxBatchWait(),
                timer,
                concurrentEffect);
        FreeC toCheckpoint = Stream$.MODULE$.collect$extension(batches, new consumer$$anonfun$$nestedInanonfun$checkpointRecords$1$1());
        return Stream$.MODULE$.flatMap$extension(
                toCheckpoint,
                record -> new Stream($anonfun$checkpointRecords$2(concurrentEffect, record)));
    }

    /** Maps every committable record of the input stream to its {@code record()}. */
    public static final /* synthetic */ FreeC $anonfun$checkpointRecords$4(FreeC freeC) {
        return Stream$.MODULE$.map$extension(freeC, committable -> committable.record());
    }

    /**
     * Returns a pipe that performs no checkpointing: it only unwraps each committable record
     * via {@code $anonfun$checkpointRecords$4}.
     */
    private static final Function1 bypass$1() {
        return upstream -> new Stream($anonfun$checkpointRecords$4(((Stream) upstream).fs2$Stream$$free()));
    }

    /**
     * Handles one group produced by the grouping step of {@code checkpointRecords}.
     *
     * <p>Only the second tuple element (a stream) is used. It is broadcast through two pipes:
     * the checkpointing pipe ({@code $anonfun$checkpointRecords$1}) and {@code bypass$1()}.
     *
     * <p>NOTE(review): the parameter {@code i} is not referenced anywhere in this body.
     *
     * @throws MatchError if {@code tuple2} is {@code null}
     */
    public static final /* synthetic */ FreeC $anonfun$checkpointRecords$9(KinesisCheckpointSettings kinesisCheckpointSettings, int i, ConcurrentEffect concurrentEffect, Timer timer, Tuple2 tuple2) {
        // Residue of a Scala tuple pattern match: a null tuple is a match failure.
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        // Pipe order in the array: checkpointing pipe first, bypass pipe second.
        return Stream$.MODULE$.broadcastThrough$extension0(((Stream) tuple2._2()).fs2$Stream$$free(), Predef$.MODULE$.wrapRefArray(new Function1[]{obj -> {
            return new Stream($anonfun$checkpointRecords$1(kinesisCheckpointSettings, timer, concurrentEffect, ((Stream) obj).fs2$Stream$$free()));
        }, bypass$1()}), concurrentEffect);
    }

    /**
     * Body of the pipe returned by {@code checkpointRecords}.
     *
     * <p>The input is sent through {@code fs2.aws.core.package$.groupBy}, keyed by an effect that
     * reads each record's {@code shardId()}. Every resulting tuple is mapped to a stream by
     * {@code $anonfun$checkpointRecords$9}, and the streams are joined with
     * {@code parJoinUnbounded}.
     *
     * <p>NOTE(review): {@code i} is only forwarded to {@code $anonfun$checkpointRecords$9}; it
     * does not bound the join, which is the unbounded variant.
     */
    public static final /* synthetic */ FreeC $anonfun$checkpointRecords$6(ConcurrentEffect concurrentEffect, KinesisCheckpointSettings kinesisCheckpointSettings, int i, Timer timer, FreeC freeC) {
        return Stream$.MODULE$.parJoinUnbounded$extension(Stream$.MODULE$.map$extension(Stream$.MODULE$.through$extension(freeC, fs2.aws.core.package$.MODULE$.groupBy(committableRecord -> {
            // Grouping key: the record's shard id, read inside a delayed effect.
            return concurrentEffect.delay(() -> {
                return committableRecord.shardId();
            });
        }, concurrentEffect)), tuple2 -> {
            // One inner stream per group tuple.
            return new Stream($anonfun$checkpointRecords$9(kinesisCheckpointSettings, i, concurrentEffect, timer, tuple2));
        }), Predef$.MODULE$.$conforms(), Predef$.MODULE$.$conforms(), concurrentEffect);
    }

    /**
     * Body of the pipe returned by {@code checkpointRecords_}: sends the input through
     * {@code checkpointRecords} (with {@code checkpointRecords$default$2()} as its {@code int}
     * argument) and replaces every output element with unit via the functor {@code void}
     * operation.
     */
    public static final /* synthetic */ FreeC $anonfun$checkpointRecords_$1(KinesisCheckpointSettings kinesisCheckpointSettings, ConcurrentEffect concurrentEffect, Timer timer, FreeC freeC) {
        // The FreeC is boxed into a Stream for the functor syntax, then unboxed again for the return value.
        return ((Stream) implicits$.MODULE$.toFunctorOps(new Stream(Stream$.MODULE$.through$extension(freeC, MODULE$.checkpointRecords(kinesisCheckpointSettings, MODULE$.checkpointRecords$default$2(), concurrentEffect, timer))), Stream$.MODULE$.monadErrorInstance(concurrentEffect)).void()).fs2$Stream$$free();
    }

    /** Private constructor; publishes the new instance as the singleton {@code MODULE$}. */
    private consumer$() {
        MODULE$ = this;
    }
}
