package ai.agnos.sparql.stream.client;

import ai.agnos.sparql.api.ErrorHandlerSupport;
import ai.agnos.sparql.api.MappedQuery;
import ai.agnos.sparql.api.QueryType;
import ai.agnos.sparql.api.ResultSet;
import ai.agnos.sparql.api.SparqlClientRequestFailed;
import ai.agnos.sparql.api.SparqlClientRequestFailedWithError;
import ai.agnos.sparql.api.SparqlQuery;
import ai.agnos.sparql.api.SparqlRequest;
import ai.agnos.sparql.api.SparqlResponse;
import ai.agnos.sparql.api.SparqlResponse$;
import ai.agnos.sparql.api.SparqlResult;
import ai.agnos.sparql.api.SparqlStatement;
import ai.agnos.sparql.api.StreamedQuery;
import ai.agnos.sparql.api.StreamingSparqlResult;
import ai.agnos.sparql.mapper.SparqlClientJsonProtocol$;
import akka.actor.ActorSystem;
import akka.http.scaladsl.model.ContentType;
import akka.http.scaladsl.model.HttpResponse;
import akka.http.scaladsl.model.HttpResponse$;
import akka.http.scaladsl.model.OptHttpResponse$;
import akka.http.scaladsl.model.ResponseEntity;
import akka.http.scaladsl.model.StatusCode;
import akka.http.scaladsl.model.StatusCodes;
import akka.http.scaladsl.model.StatusCodes$;
import akka.stream.ActorMaterializer;
import akka.stream.FlowShape;
import akka.stream.UniformFanInShape;
import akka.stream.UniformFanOutShape;
import akka.stream.scaladsl.Flow;
import akka.stream.scaladsl.Flow$;
import akka.stream.scaladsl.GraphDSL$;
import akka.stream.scaladsl.GraphDSL$Implicits$;
import akka.stream.scaladsl.Merge$;
import akka.stream.scaladsl.Partition$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.util.ByteString;
import akka.util.ByteString$;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.LinearSeqOptimized;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.concurrent.ExecutionContext;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.Try$;
import spray.json.package$;

/* compiled from: SparqlQueryFlowBuilder.scala */
@ScalaSignature(bytes = "\u0006\u0001}3q!\u0001\u0002\u0011\u0002\u0007\u0005QB\u0001\fTa\u0006\u0014\u0018\u000f\\)vKJLh\t\\8x\u0005VLG\u000eZ3s\u0015\t\u0019A!\u0001\u0004dY&,g\u000e\u001e\u0006\u0003\u000b\u0019\taa\u001d;sK\u0006l'BA\u0004\t\u0003\u0019\u0019\b/\u0019:rY*\u0011\u0011BC\u0001\u0006C\u001etwn\u001d\u0006\u0002\u0017\u0005\u0011\u0011-[\u0002\u0001'\u0011\u0001a\u0002\u0006\r\u0011\u0005=\u0011R\"\u0001\t\u000b\u0003E\tQa]2bY\u0006L!a\u0005\t\u0003\r\u0005s\u0017PU3g!\t)b#D\u0001\u0003\u0013\t9\"AA\nTa\u0006\u0014\u0018\u000f\\\"mS\u0016tG\u000fS3ma\u0016\u00148\u000f\u0005\u0002\u001a95\t!D\u0003\u0002\u001c\r\u0005\u0019\u0011\r]5\n\u0005uQ\"aE#se>\u0014\b*\u00198eY\u0016\u00148+\u001e9q_J$\b\"B\u0010\u0001\t\u0003\u0001\u0013A\u0002\u0013j]&$H\u0005F\u0001\"!\ty!%\u0003\u0002$!\t!QK\\5u\u0011\u001d)\u0003A1A\u0007\u0004\u0019\naa]=ti\u0016lW#A\u0014\u0011\u0005!jS\"A\u0015\u000b\u0005)Z\u0013!B1di>\u0014(\"\u0001\u0017\u0002\t\u0005\\7.Y\u0005\u0003]%\u00121\"Q2u_J\u001c\u0016p\u001d;f[\"9\u0001\u0007\u0001b\u0001\u000e\u0007\t\u0014\u0001D7bi\u0016\u0014\u0018.\u00197ju\u0016\u0014X#\u0001\u001a\u0011\u0005M*T\"\u0001\u001b\u000b\u0005\u0015Y\u0013B\u0001\u001c5\u0005E\t5\r^8s\u001b\u0006$XM]5bY&TXM\u001d\u0005\bq\u0001\u0011\rQb\u0001:\u0003)!\u0017n\u001d9bi\u000eDWM]\u000b\u0002uA\u00111HP\u0007\u0002y)\u0011Q\bE\u0001\u000bG>t7-\u001e:sK:$\u0018BA =\u0005A)\u00050Z2vi&|gnQ8oi\u0016DH\u000fC\u0003B\u0001\u0011\u0005!)A\bta\u0006\u0014\u0018\u000f\\)vKJLh\t\\8x)\t\u0019%\u000bE\u0003E\u000f&cu*D\u0001F\u0015\t1E'\u0001\u0005tG\u0006d\u0017\rZ:m\u0013\tAUI\u0001\u0003GY><\bCA\rK\u0013\tY%DA\u0007Ta\u0006\u0014\u0018\u000f\u001c*fcV,7\u000f\u001e\t\u000335K!A\u0014\u000e\u0003\u001dM\u0003\u0018M]9m%\u0016\u001c\bo\u001c8tKB\u0011q\u0002U\u0005\u0003#B\u00111!\u00118z\u0011\u0015\u0019\u0006\t1\u0001U\u00031)g\u000e\u001a9pS:$h\t\\8x!\r)R+S\u0005\u0003-\n\u0011\u0001\u0003\u0013;ua\u0016sG\r]8j]R4En\\<\t\u000ba\u0003A\u0011B-\u00021I,7\u000f]8og\u0016,f.\\1sg\"\fG\u000e\\3s\r2|w\u000fF\u0001[!\u0015!u\t\u0014'P\u0011\u0015a\u0006\u0001\"\u0003^\u0003]\u0019\b/\u0019:rYF+XM]=U_N#(/Z1n\r2|w\u000f\u0006\u0002D=\")1k\u0017a\u0001)\u0002")
/* loaded from: input_file:ai/agnos/sparql/stream/client/SparqlQueryFlowBuilder.class */
public interface SparqlQueryFlowBuilder extends SparqlClientHelpers, ErrorHandlerSupport {
    @Override // ai.agnos.sparql.stream.client.SparqlClientHelpers
    ActorSystem system();

    @Override // ai.agnos.sparql.stream.client.SparqlClientHelpers
    ActorMaterializer materializer();

    ExecutionContext dispatcher();

    default Flow<SparqlRequest, SparqlResponse, Object> sparqlQueryFlow(HttpEndpointFlow<SparqlRequest> httpEndpointFlow) {
        return Flow$.MODULE$.fromGraph(GraphDSL$.MODULE$.create(builder -> {
            UniformFanOutShape add = builder.add(Partition$.MODULE$.apply(2, sparqlRequest -> {
                return BoxesRunTime.boxToInteger($anonfun$sparqlQueryFlow$2(sparqlRequest));
            }));
            UniformFanInShape add2 = builder.add(Merge$.MODULE$.apply(2, Merge$.MODULE$.apply$default$2()).named("merge.sparqlResponse"));
            GraphDSL$Implicits$.MODULE$.port2flow(add.out(0), builder).$tilde$greater(this.sparqlQueryToStreamFlow(httpEndpointFlow), builder).$tilde$greater(add2, builder);
            GraphDSL$Implicits$.MODULE$.port2flow(add.out(1), builder).$tilde$greater(this.sparqlQueryToStreamFlow(httpEndpointFlow), builder).$tilde$greater(this.responseUnmarshallerFlow(), builder).$tilde$greater(add2, builder);
            return new FlowShape(add.in(), add2.out());
        }).named("flow.sparqlRequestFlow"));
    }

    private default Flow<SparqlResponse, SparqlResponse, Object> responseUnmarshallerFlow() {
        return Flow$.MODULE$.apply().flatMapConcat(sparqlResponse -> {
            Source single;
            if (sparqlResponse != null) {
                SparqlRequest request = sparqlResponse.request();
                boolean success = sparqlResponse.success();
                List<SparqlResult> result = sparqlResponse.result();
                if (request != null) {
                    SparqlStatement statement = request.statement();
                    if (statement instanceof SparqlQuery) {
                        QueryType queryType = ((SparqlQuery) statement).queryType();
                        if (queryType instanceof MappedQuery) {
                            MappedQuery mappedQuery = (MappedQuery) queryType;
                            if (true == success) {
                                Some unapplySeq = List$.MODULE$.unapplySeq(result);
                                if (!unapplySeq.isEmpty() && unapplySeq.get() != null && ((LinearSeqOptimized) unapplySeq.get()).lengthCompare(1) == 0) {
                                    SparqlResult sparqlResult = (SparqlResult) ((LinearSeqOptimized) unapplySeq.get()).apply(0);
                                    if (sparqlResult instanceof StreamingSparqlResult) {
                                        StreamingSparqlResult streamingSparqlResult = (StreamingSparqlResult) sparqlResult;
                                        Source<ByteString, Object> dataStream = streamingSparqlResult.dataStream();
                                        Some contentType = streamingSparqlResult.contentType();
                                        if ((contentType instanceof Some) && this.isSparqlResultsJson((ContentType) contentType.value())) {
                                            single = Source$.MODULE$.fromFuture(dataStream.runFold(ByteString$.MODULE$.empty(), (byteString, byteString2) -> {
                                                return byteString.$plus$plus(byteString2);
                                            }, this.materializer()).map(byteString3 -> {
                                                SparqlResponse copy;
                                                ResultSet resultSet;
                                                Success apply = Try$.MODULE$.apply(() -> {
                                                    return (ResultSet) SparqlClientJsonProtocol$.MODULE$.format3().read(package$.MODULE$.enrichString(byteString3.utf8String()).parseJson());
                                                });
                                                if ((apply instanceof Success) && (resultSet = (ResultSet) apply.value()) != null) {
                                                    copy = sparqlResponse.copy(sparqlResponse.copy$default$1(), sparqlResponse.copy$default$2(), sparqlResponse.copy$default$3(), mappedQuery.mapper().map(resultSet), sparqlResponse.copy$default$5());
                                                } else {
                                                    if (!(apply instanceof Failure)) {
                                                        throw new MatchError(apply);
                                                    }
                                                    Throwable exception = ((Failure) apply).exception();
                                                    this.errorHandler().handleError(exception, this.system());
                                                    copy = sparqlResponse.copy(sparqlResponse.copy$default$1(), false, sparqlResponse.copy$default$3(), Nil$.MODULE$, new Some(new SparqlClientRequestFailedWithError("failed to un-marshall result", exception)));
                                                }
                                                return copy;
                                            }, this.dispatcher()));
                                            return single;
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }
            if (sparqlResponse != null) {
                Some unapplySeq2 = List$.MODULE$.unapplySeq(sparqlResponse.result());
                if (!unapplySeq2.isEmpty() && unapplySeq2.get() != null && ((LinearSeqOptimized) unapplySeq2.get()).lengthCompare(1) == 0) {
                    SparqlResult sparqlResult2 = (SparqlResult) ((LinearSeqOptimized) unapplySeq2.get()).apply(0);
                    if (sparqlResult2 instanceof StreamingSparqlResult) {
                        StreamingSparqlResult streamingSparqlResult2 = (StreamingSparqlResult) sparqlResult2;
                        Source<ByteString, Object> dataStream2 = streamingSparqlResult2.dataStream();
                        Option<ContentType> contentType2 = streamingSparqlResult2.contentType();
                        single = (Source) dataStream2.map(byteString4 -> {
                            return sparqlResponse.copy(sparqlResponse.copy$default$1(), false, sparqlResponse.copy$default$3(), Nil$.MODULE$, new Some(new SparqlClientRequestFailed(new StringBuilder(30).append("unsupported result type [").append(contentType2.getOrElse(() -> {
                                return "Unknown";
                            })).append("] ").append(byteString4.take(1024).utf8String()).append("...").toString())));
                        });
                        return single;
                    }
                }
            }
            if (sparqlResponse == null) {
                throw new MatchError(sparqlResponse);
            }
            single = Source$.MODULE$.single(sparqlResponse.copy(sparqlResponse.copy$default$1(), false, sparqlResponse.copy$default$3(), Nil$.MODULE$, new Some(new SparqlClientRequestFailed("invalid request"))));
            return single;
        });
    }

    private default Flow<SparqlRequest, SparqlResponse, Object> sparqlQueryToStreamFlow(HttpEndpointFlow<SparqlRequest> httpEndpointFlow) {
        Flow map = Flow$.MODULE$.apply().map(sparqlRequest -> {
            if (sparqlRequest != null) {
                SparqlStatement statement = sparqlRequest.statement();
                if (statement instanceof SparqlQuery) {
                    return new Tuple2(this.makeHttpRequest(httpEndpointFlow.endpoint(), (SparqlQuery) statement), sparqlRequest);
                }
            }
            throw new MatchError(sparqlRequest);
        });
        Function1 log$default$2 = map.log$default$2();
        Flow via = map.log("SPARQL endpoint request", log$default$2, map.log$default$3("SPARQL endpoint request", log$default$2)).via(httpEndpointFlow.flow());
        Function1 log$default$22 = via.log$default$2();
        return via.log("SPARQL endpoint response", log$default$22, via.log$default$3("SPARQL endpoint response", log$default$22)).map(tuple2 -> {
            SparqlResponse sparqlResponse;
            if (tuple2 != null) {
                Success success = (Try) tuple2._1();
                SparqlRequest sparqlRequest2 = (SparqlRequest) tuple2._2();
                if (success instanceof Success) {
                    HttpResponse unapply = HttpResponse$.MODULE$.unapply((HttpResponse) success.value());
                    if (!OptHttpResponse$.MODULE$.isEmpty$extension(unapply)) {
                        StatusCode _1 = unapply._1();
                        ResponseEntity _3 = unapply._3();
                        StatusCodes.Success OK = StatusCodes$.MODULE$.OK();
                        sparqlResponse = new SparqlResponse(sparqlRequest2, _1 != null ? _1.equals(OK) : OK == null, _1, Nil$.MODULE$.$colon$colon(new StreamingSparqlResult(_3.dataBytes(), new Some(_3.contentType()))), SparqlResponse$.MODULE$.apply$default$5());
                        return sparqlResponse;
                    }
                }
            }
            if (tuple2 != null) {
                Failure failure = (Try) tuple2._1();
                SparqlRequest sparqlRequest3 = (SparqlRequest) tuple2._2();
                if (failure instanceof Failure) {
                    sparqlResponse = new SparqlResponse(sparqlRequest3, false, SparqlResponse$.MODULE$.apply$default$3(), SparqlResponse$.MODULE$.apply$default$4(), new Some(new SparqlClientRequestFailedWithError("failed to execute sparql query", failure.exception())));
                    return sparqlResponse;
                }
            }
            throw new MatchError(tuple2);
        });
    }

    static /* synthetic */ int $anonfun$sparqlQueryFlow$2(SparqlRequest sparqlRequest) {
        int i;
        if (sparqlRequest != null) {
            SparqlStatement statement = sparqlRequest.statement();
            if ((statement instanceof SparqlQuery) && (((SparqlQuery) statement).queryType() instanceof StreamedQuery)) {
                i = 0;
                return i;
            }
        }
        if (sparqlRequest != null) {
            SparqlStatement statement2 = sparqlRequest.statement();
            if ((statement2 instanceof SparqlQuery) && (((SparqlQuery) statement2).queryType() instanceof MappedQuery)) {
                i = 1;
                return i;
            }
        }
        throw new MatchError(sparqlRequest);
    }

    static void $init$(SparqlQueryFlowBuilder sparqlQueryFlowBuilder) {
    }
}
