package almond.interpreter.messagehandlers;

import almond.channels.Channel;
import almond.channels.Channel$Publish$;
import almond.interpreter.Message;
import almond.logger.Logger;
import almond.logger.LoggerContext;
import almond.protocol.Status;
import almond.protocol.Status$;
import argonaut.DecodeJson;
import argonaut.Json;
import cats.effect.Fiber;
import cats.effect.IO;
import cats.effect.IO$;
import fs2.Stream;
import fs2.Stream$;
import fs2.concurrent.Queue;
import fs2.concurrent.Queue$;
import fs2.internal.FreeC;
import java.io.Serializable;
import scala.$less$colon$less$;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.PartialFunction;
import scala.PartialFunction$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Set;
import scala.concurrent.ExecutionContext;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ModuleSerializationProxy;
import scala.util.Either;
import scala.util.Left;
import scala.util.Right;

/* compiled from: MessageHandler.scala */
/* loaded from: input_file:almond/interpreter/messagehandlers/MessageHandler$.class */
public final class MessageHandler$ implements Serializable {
    public static final MessageHandler$ MODULE$ = new MessageHandler$();

    public MessageHandler empty() {
        return new MessageHandler(PartialFunction$.MODULE$.empty());
    }

    public <T> MessageHandler apply(Channel channel, String str, Function1<Message<T>, FreeC<?, BoxedUnit>> function1, DecodeJson<T> decodeJson) {
        return new MessageHandler(new MessageHandler$$anonfun$apply$1(channel, str, decodeJson, function1));
    }

    public <T> MessageHandler apply(Set<Channel> set, String str, Function2<Channel, Message<T>, FreeC<?, BoxedUnit>> function2, DecodeJson<T> decodeJson) {
        return new MessageHandler(new MessageHandler$$anonfun$apply$2(str, set, decodeJson, function2));
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <T> Either.RightProjection<Exception, Message<T>> almond$interpreter$messagehandlers$MessageHandler$$tryDecode(Message<Json> message, DecodeJson<T> decodeJson) {
        return message.decodeAs($less$colon$less$.MODULE$.refl(), decodeJson).left().map(str -> {
            return new Exception(new StringBuilder(24).append("Error decoding message: ").append(str).toString());
        }).right();
    }

    public <T> MessageHandler blocking(Channel channel, String str, ExecutionContext executionContext, LoggerContext loggerContext, Function2<Message<T>, Queue<IO, Tuple2<Channel, almond.channels.Message>>, IO<BoxedUnit>> function2, DecodeJson<T> decodeJson) {
        return apply(channel, str, message -> {
            return new Stream($anonfun$blocking$1(executionContext, loggerContext, function2, message));
        }, decodeJson);
    }

    private FreeC<?, BoxedUnit> blockingTaskStream(Message<?> message, ExecutionContext executionContext, LoggerContext loggerContext, Function1<Queue<IO, Tuple2<Channel, almond.channels.Message>>, IO<BoxedUnit>> function1) {
        Logger apply = loggerContext.apply(getClass());
        Tuple2 tuple2 = null;
        return Stream$.MODULE$.flatMap$extension(Stream$.MODULE$.eval(((IO) Queue$.MODULE$.bounded(40, IO$.MODULE$.ioConcurrentEffect(IO$.MODULE$.contextShift(executionContext)))).map(queue -> {
            return new Tuple2(queue, (IO) function1.apply(queue));
        }).flatMap(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            Queue queue2 = (Queue) tuple22._1();
            IO io = (IO) tuple22._2();
            return status$1(queue2, Status$.MODULE$.busy(), message).flatMap(boxedUnit -> {
                return io.attempt().map(either -> {
                    $anonfun$blockingTaskStream$4(apply, message, either);
                    return BoxedUnit.UNIT;
                }).flatMap(boxedUnit -> {
                    return status$1(queue2, Status$.MODULE$.idle(), message).flatMap(boxedUnit -> {
                        return ((IO) queue2.enqueue1(tuple2)).map(boxedUnit -> {
                            BoxedUnit.UNIT;
                            return BoxedUnit.UNIT;
                        });
                    });
                });
            }).attempt().flatMap(either -> {
                IO unit;
                if (!(either instanceof Left)) {
                    if (either instanceof Right) {
                        BoxedUnit boxedUnit2 = (BoxedUnit) ((Right) either).value();
                        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
                        if (boxedUnit3 != null ? boxedUnit3.equals(boxedUnit2) : boxedUnit2 == null) {
                            unit = IO$.MODULE$.unit();
                        }
                    }
                    throw new MatchError(either);
                }
                Throwable th = (Throwable) ((Left) either).value();
                if (apply.underlying().errorEnabled()) {
                    apply.underlying().error(new StringBuilder(40).append("Internal error while processing ").append(message.header().msg_type()).append(" message").toString(), th);
                    BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
                } else {
                    BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
                }
                unit = IO$.MODULE$.raiseError(th);
                return unit;
            }).start(IO$.MODULE$.contextShift(executionContext)).map(fiber -> {
                return new Stream($anonfun$blockingTaskStream$10(queue2, tuple2, fiber));
            });
        })), obj -> {
            return new Stream($anonfun$blockingTaskStream$12(((Stream) obj).fs2$Stream$$free()));
        });
    }

    public MessageHandler discard(PartialFunction<Tuple2<Channel, Message<Json>>, BoxedUnit> partialFunction) {
        return new MessageHandler(partialFunction.andThen(boxedUnit -> {
            return package$.MODULE$.Right().apply(new Stream(Stream$.MODULE$.empty()));
        }));
    }

    public MessageHandler apply(PartialFunction<Tuple2<Channel, Message<Json>>, Either<Throwable, FreeC<?, BoxedUnit>>> partialFunction) {
        return new MessageHandler(partialFunction);
    }

    public Option<PartialFunction<Tuple2<Channel, Message<Json>>, Either<Throwable, FreeC<?, BoxedUnit>>>> unapply(MessageHandler messageHandler) {
        return messageHandler == null ? None$.MODULE$ : new Some(messageHandler.handler());
    }

    private Object writeReplace() {
        return new ModuleSerializationProxy(MessageHandler$.class);
    }

    public static final /* synthetic */ FreeC $anonfun$blocking$1(ExecutionContext executionContext, LoggerContext loggerContext, Function2 function2, Message message) {
        return MODULE$.blockingTaskStream(message, executionContext, loggerContext, queue -> {
            return (IO) function2.apply(message, queue);
        });
    }

    private static final IO status$1(Queue queue, String str, Message message) {
        return message.publish(Status$.MODULE$.messageType(), new Status(str), message.publish$default$3(), message.publish$default$4()).enqueueOn(Channel$Publish$.MODULE$, queue, Status$.MODULE$.encoder());
    }

    public static final /* synthetic */ void $anonfun$blockingTaskStream$5(Logger logger, Message message, Throwable th) {
        if (!logger.underlying().errorEnabled()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            logger.underlying().error(new StringBuilder(31).append("Error while processing ").append(message.header().msg_type()).append(" message").toString(), th);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$blockingTaskStream$4(Logger logger, Message message, Either either) {
        either.left().foreach(th -> {
            $anonfun$blockingTaskStream$5(logger, message, th);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ boolean $anonfun$blockingTaskStream$11(Tuple2 tuple2, Tuple2 tuple22) {
        return tuple22 != null ? !tuple22.equals(tuple2) : tuple2 != null;
    }

    public static final /* synthetic */ FreeC $anonfun$blockingTaskStream$10(Queue queue, Tuple2 tuple2, Fiber fiber) {
        FreeC dequeue = queue.dequeue();
        return Stream$.MODULE$.takeWhile$extension(dequeue, tuple22 -> {
            return BoxesRunTime.boxToBoolean($anonfun$blockingTaskStream$11(tuple2, tuple22));
        }, Stream$.MODULE$.takeWhile$default$2$extension(dequeue));
    }

    public static final /* synthetic */ FreeC $anonfun$blockingTaskStream$12(FreeC freeC) {
        return freeC;
    }

    private MessageHandler$() {
    }
}
