package io.mantisrx.api.push;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import com.netflix.spectator.api.Counter;
import com.netflix.zuul.netty.SpectatorUtils;
import io.mantisrx.api.Constants;
import io.mantisrx.api.Util;
import io.mantisrx.api.services.JobDiscoveryService;
import io.mantisrx.api.tunnel.MantisCrossRegionalClient;
import io.mantisrx.client.MantisClient;
import io.mantisrx.client.SinkConnectionFunc;
import io.mantisrx.client.SseSinkConnectionFunction;
import io.mantisrx.common.MantisServerSentEvent;
import io.mantisrx.runtime.parameter.SinkParameters;
import io.mantisrx.server.core.JobSchedulingInfo;
import io.mantisrx.server.worker.client.SseWorkerConnectionFunction;
import io.mantisrx.server.worker.client.WorkerConnectionsStatus;
import io.mantisrx.server.worker.client.WorkerMetricsClient;
import io.mantisrx.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import io.vavr.control.Try;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientRequest;
import mantis.io.reactivex.netty.protocol.http.client.HttpClientResponse;
import mantis.io.reactivex.netty.protocol.http.sse.ServerSentEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.functions.Action1;

@Singleton
/* loaded from: input_file:io/mantisrx/api/push/ConnectionBroker.class */
public class ConnectionBroker {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(ConnectionBroker.class);

    /** Client used for local sink connections, job status and scheduling-change streams. */
    private final MantisClient mantisClient;

    /** Supplies the SSE clients used to reach other regions. */
    private final MantisCrossRegionalClient mantisCrossRegionalClient;

    /** Client used to stream worker metrics for a job. */
    private final WorkerMetricsClient workerMetricsClient;

    /** Obtained in the constructor via {@code JobDiscoveryService.getInstance(mantisClient, scheduler)}. */
    private final JobDiscoveryService jobDiscoveryService;

    /** Scheduler every stream built by this class is subscribed on (injected as {@code "io-scheduler"}). */
    private final Scheduler scheduler;

    /** Serializes scheduling information to JSON strings. */
    private final ObjectMapper objectMapper;

    /**
     * Streams keyed by the connection details that produced them.
     *
     * <p>Entries are removed by the purge callbacks attached in {@link #connect(PushConnectionDetails)}.
     * NOTE(review): {@link WeakHashMap} is not synchronized, and those callbacks presumably run on
     * scheduler threads concurrently with {@code connect()} - confirm whether external synchronization
     * is needed.
     */
    private final Map<PushConnectionDetails, Observable<String>> connectionCache = new WeakHashMap<>();

    /**
     * Creates the broker from its injected collaborators.
     *
     * @param mantisClient              client for sink, job-status and scheduling streams; also used to
     *                                  obtain the {@link JobDiscoveryService} instance
     * @param mantisCrossRegionalClient source of SSE clients for remote regions
     * @param workerMetricsClient       client for worker metric streams
     * @param scheduler                 the {@code "io-scheduler"} binding; every stream is subscribed on it
     * @param objectMapper              JSON serializer for scheduling information
     */
    @Inject
    public ConnectionBroker(
            MantisClient mantisClient,
            MantisCrossRegionalClient mantisCrossRegionalClient,
            WorkerMetricsClient workerMetricsClient,
            @Named("io-scheduler") Scheduler scheduler,
            ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        this.scheduler = scheduler;
        this.workerMetricsClient = workerMetricsClient;
        this.mantisCrossRegionalClient = mantisCrossRegionalClient;
        this.mantisClient = mantisClient;
        // Built from the constructor arguments, so it does not depend on the field assignments above.
        this.jobDiscoveryService = JobDiscoveryService.getInstance(mantisClient, scheduler);
    }

    /**
     * Returns the stream of string payloads for the given connection details.
     *
     * <ul>
     *   <li>If a stream is already cached for {@code pushConnectionDetails}, it is returned as is.</li>
     *   <li>{@code CONNECT_BY_NAME}, {@code CONNECT_BY_ID} and {@code METRICS} build a fresh stream and
     *       return it directly; these are never stored in the cache.</li>
     *   <li>{@code JOB_STATUS}, {@code JOB_SCHEDULING_INFO} and {@code JOB_CLUSTER_DISCOVERY} build a
     *       replaying, auto-connecting stream (buffer of 25, 1 and 1 items respectively), store it in
     *       the cache and return it.</li>
     * </ul>
     *
     * <p>Every stream purges its cache entry when it completes or is unsubscribed.
     *
     * @param pushConnectionDetails describes what to connect to; its {@code type} selects the stream
     * @return the stream for the request, or {@code null} when the type matches none of the cases above
     *         (nothing is cached for it)
     */
    public Observable<String> connect(PushConnectionDetails pushConnectionDetails) {
        // One lookup instead of containsKey() followed by get(): a purge running between the two
        // calls could otherwise make this method return null for a key that was just seen.
        final Observable<String> cached = this.connectionCache.get(pushConnectionDetails);
        if (cached != null) {
            return cached;
        }

        switch (pushConnectionDetails.type) {
            case CONNECT_BY_NAME:
                return getConnectByNameFor(pushConnectionDetails)
                        .subscribeOn(this.scheduler)
                        .doOnUnsubscribe(() -> purgeFromCache(pushConnectionDetails))
                        .doOnCompleted(() -> purgeFromCache(pushConnectionDetails))
                        .share();
            case CONNECT_BY_ID:
                return getConnectByIdFor(pushConnectionDetails)
                        .subscribeOn(this.scheduler)
                        .doOnUnsubscribe(() -> purgeFromCache(pushConnectionDetails))
                        .doOnCompleted(() -> purgeFromCache(pushConnectionDetails))
                        .share();
            case METRICS:
                return getWorkerMetrics(pushConnectionDetails)
                        .subscribeOn(this.scheduler)
                        .doOnUnsubscribe(() -> purgeFromCache(pushConnectionDetails))
                        .doOnCompleted(() -> purgeFromCache(pushConnectionDetails));
            case JOB_STATUS:
                this.connectionCache.put(pushConnectionDetails,
                        this.mantisClient.getJobStatusObservable(pushConnectionDetails.target)
                                .subscribeOn(this.scheduler)
                                .doOnCompleted(() -> purgeFromCache(pushConnectionDetails))
                                .doOnUnsubscribe(() -> purgeFromCache(pushConnectionDetails))
                                .replay(25)
                                .autoConnect());
                break;
            case JOB_SCHEDULING_INFO:
                this.connectionCache.put(pushConnectionDetails,
                        this.mantisClient.getSchedulingChanges(pushConnectionDetails.target)
                                .subscribeOn(this.scheduler)
                                .map(this::schedulingInfoToJson)
                                .doOnCompleted(() -> purgeFromCache(pushConnectionDetails))
                                .doOnUnsubscribe(() -> purgeFromCache(pushConnectionDetails))
                                .replay(1)
                                .autoConnect());
                break;
            case JOB_CLUSTER_DISCOVERY:
                this.connectionCache.put(pushConnectionDetails,
                        this.jobDiscoveryService
                                .jobDiscoveryInfoStream(this.jobDiscoveryService.key(JobDiscoveryService.LookupType.JOB_CLUSTER, pushConnectionDetails.target))
                                .subscribeOn(this.scheduler)
                                .map(this::schedulingInfoToJson)
                                .doOnCompleted(() -> purgeFromCache(pushConnectionDetails))
                                .doOnUnsubscribe(() -> purgeFromCache(pushConnectionDetails))
                                .replay(1)
                                .autoConnect());
                break;
            default:
                // Unrecognized type: nothing is cached, so the lookup below yields null.
                break;
        }

        log.info("Caching connection for: {}", pushConnectionDetails);
        return this.connectionCache.get(pushConnectionDetails);
    }

    /** Logs and removes the cache entry for {@code details}; shared by all completion/unsubscribe hooks. */
    private void purgeFromCache(PushConnectionDetails details) {
        log.info("Purging {} from cache.", details);
        this.connectionCache.remove(details);
    }

    /** Serializes {@code schedulingInfo} to JSON, falling back to the literal {@code "Error"} on failure. */
    private String schedulingInfoToJson(JobSchedulingInfo schedulingInfo) {
        return Try.of(() -> this.objectMapper.writeValueAsString(schedulingInfo)).getOrElse("Error");
    }

    /**
     * Builds the event stream for a connect-by-job-name request.
     *
     * <p>When the request lists regions, the work is delegated to
     * {@code getRemoteDataObservable}; otherwise the local sink results are flattened into a single
     * stream of event strings.
     *
     * @param pushConnectionDetails the request; its {@code target} is looked up as a job name
     * @return stream of event payloads as strings
     */
    private Observable<String> getConnectByNameFor(PushConnectionDetails pushConnectionDetails) {
        if (!pushConnectionDetails.regions.isEmpty()) {
            return getRemoteDataObservable(
                    pushConnectionDetails.getUri(),
                    pushConnectionDetails.target,
                    pushConnectionDetails.getRegions().asJava());
        }
        final Observable<Observable<MantisServerSentEvent>> nestedEvents =
                getResults(false, this.mantisClient, pushConnectionDetails.target, pushConnectionDetails.getSinkparameters());
        return nestedEvents
                .flatMap(events -> events)
                .map(MantisServerSentEvent::getEventAsString);
    }

    /**
     * Builds the event stream for a connect-by-job-id request.
     *
     * <p>When the request lists regions, the work is delegated to
     * {@code getRemoteDataObservable}; otherwise the local sink results are flattened into a single
     * stream of event strings.
     *
     * @param pushConnectionDetails the request; its {@code target} is looked up as a job id
     * @return stream of event payloads as strings
     */
    private Observable<String> getConnectByIdFor(PushConnectionDetails pushConnectionDetails) {
        if (!pushConnectionDetails.getRegions().isEmpty()) {
            return getRemoteDataObservable(
                    pushConnectionDetails.getUri(),
                    pushConnectionDetails.target,
                    pushConnectionDetails.getRegions().asJava());
        }
        final Observable<Observable<MantisServerSentEvent>> nestedEvents =
                getResults(true, this.mantisClient, pushConnectionDetails.target, pushConnectionDetails.getSinkparameters());
        return nestedEvents
                .flatMap(events -> events)
                .map(MantisServerSentEvent::getEventAsString);
    }

    /**
     * Creates the SSE sink connection function used for a job.
     *
     * <p>The function is constructed with its first flag set to {@code true} and an error callback
     * that only logs a warning naming the job and the error message.
     *
     * @param str            job identifier (id or name) used in the warning message
     * @param sinkParameters parameters forwarded to the sink connection
     * @return the configured connection function
     */
    private static SinkConnectionFunc<MantisServerSentEvent> getSseConnFunc(String str, SinkParameters sinkParameters) {
        return new SseSinkConnectionFunction(
                true,
                // Parameterized logging: same rendered message, no string building when WARN is disabled.
                error -> log.warn("Reconnecting to sink of job {} after error: {}", str, error.getMessage()),
                sinkParameters);
    }

    /**
     * Obtains the nested sink result streams for a job, looked up either by id or by name.
     *
     * <p>By id, the sink client's results are returned directly. By name, each emitted sink client is
     * switched to: a client reporting an error turns into an error notification and sets a flag that
     * stops the outer stream from emitting further results.
     *
     * @param z              {@code true} to treat {@code str} as a job id, {@code false} as a job name
     * @param mantisClient   client used to obtain the sink client
     * @param str            job id or job name, depending on {@code z}
     * @param sinkParameters parameters forwarded to the sink connection
     * @return stream of per-connection event streams
     */
    private static Observable<Observable<MantisServerSentEvent>> getResults(boolean z, MantisClient mantisClient, String str, SinkParameters sinkParameters) {
        final SinkConnectionFunc<MantisServerSentEvent> connectionFunc = getSseConnFunc(str, sinkParameters);

        // The status observer is passed as a plain null (no raw Observer cast) so the generic
        // signatures survive and the lambdas below are typed against SinkClient<MantisServerSentEvent>.
        if (z) {
            return mantisClient.getSinkClientByJobId(str, connectionFunc, null).getResults();
        }

        final AtomicBoolean sinkFailed = new AtomicBoolean();
        return mantisClient.getSinkClientByJobName(str, connectionFunc, null)
                .switchMap(sinkClient -> {
                    if (sinkClient.hasError()) {
                        sinkFailed.set(true);
                        return Observable.error(new Exception(sinkClient.getError()));
                    }
                    return sinkClient.getResults();
                })
                .takeWhile(ignored -> !sinkFailed.get());
    }

    /**
     * Merges the event streams of every requested region into one stream.
     *
     * <p>For the local region (compared case-insensitively with {@code Util.getLocalRegion()}) the
     * stream comes from {@link #connect(PushConnectionDetails)}; for any other region an SSE GET is
     * issued through the cross-regional client. In both cases the leading <code>{</code> of each
     * payload is replaced so that the payload starts with a {@code "mantis.meta.origin"} field
     * naming the region.
     *
     * @param str  URI to request; also parsed into the local {@code PushConnectionDetails}
     * @param str2 identifier passed to the remote-response conversion, which uses it as the counter id
     * @param list regions to connect to
     * @return merged stream of tagged payloads from all regions
     */
    private Observable<String> getRemoteDataObservable(String str, String str2, List<String> list) {
        return Observable.from(list)
                .flatMap(region -> {
                    // Replacement text for String.replaceFirst: "\\{" renders a literal '{'.
                    final String originReplacement = "\\{\"mantis.meta.origin\": \"" + region + "\", ";

                    if (region.equalsIgnoreCase(Util.getLocalRegion())) {
                        return connect(PushConnectionDetails.from(str))
                                .map(datum -> datum.replaceFirst("^\\{", originReplacement));
                    }

                    log.info("Connecting to remote region {} at {}.", region, str);
                    return this.mantisCrossRegionalClient.getSecureSseClient(region)
                            .submit(HttpClientRequest.createGet(str))
                            .retryWhen(Util.getRetryFunc(log, str + " in " + region))
                            .doOnError(error -> log.warn(
                                    "Error getting response from remote SSE server for uri {} in region {}: {}",
                                    str, region, error.getMessage(), error))
                            .flatMap(response -> {
                                if (!response.getStatus().reasonPhrase().equals("OK")) {
                                    log.warn("Unexpected response from remote sink for uri {} region {}: {}",
                                            str, region, response.getStatus().reasonPhrase());
                                    // Prefer the error header; fall back to the HTTP reason phrase.
                                    String errorMessage = response.getHeaders().get(Constants.metaErrorMsgHeader);
                                    if (errorMessage == null || errorMessage.isEmpty()) {
                                        errorMessage = response.getStatus().reasonPhrase();
                                    }
                                    return Observable.<String>error(new Exception(errorMessage));
                                }
                                return clientResponseToObservable(response, str2, region, str)
                                        .map(datum -> datum.replaceFirst("^\\{", originReplacement))
                                        .doOnError(error -> log.error(error.getMessage()));
                            })
                            .subscribeOn(this.scheduler)
                            .observeOn(this.scheduler)
                            .doOnError(error -> log.warn(
                                    "Error streaming in remote data ({}). Will retry: {}",
                                    region, error.getMessage(), error))
                            .doOnCompleted(() -> log.info(
                                    String.format("remote sink connection complete for uri %s, region=%s", str, region)));
                })
                .observeOn(this.scheduler)
                .subscribeOn(this.scheduler)
                .doOnError(error -> log.error("Error in flatMapped cross-regional observable for {}", str, error));
    }

    /**
     * Converts a remote SSE response into a stream of payload strings.
     *
     * <p>The content stream fails if no item arrives within 36 seconds. Tunnel ping messages are
     * dropped unless they are error events. An event whose type starts with {@code "error:"} is
     * logged, counted and turned into a {@link RuntimeException}; any other event contributes its
     * content, and non-null content updates the byte and message counters.
     *
     * @param httpClientResponse the remote response whose content is streamed
     * @param str                id passed to the three counters
     * @param str2               region, used as the {@code "region"} counter tag and in log messages
     * @param str3               value named in the timeout log message
     * @return stream of event contents
     */
    private Observable<String> clientResponseToObservable(HttpClientResponse<ServerSentEvent> httpClientResponse, String str, String str2, String str3) {
        final Counter bytesCounter = SpectatorUtils.newCounter(Constants.numRemoteBytesCounterName, str, new String[]{"region", str2});
        final Counter messagesCounter = SpectatorUtils.newCounter(Constants.numRemoteMessagesCounterName, str, new String[]{"region", str2});
        final Counter sseErrorsCounter = SpectatorUtils.newCounter(Constants.numSseErrorsCounterName, str, new String[]{"region", str2});

        final Observable<ServerSentEvent> content = httpClientResponse.getContent()
                .doOnError(contentError -> log.warn(contentError.getMessage()))
                .timeout(36L, TimeUnit.SECONDS)
                .doOnError(timeoutError -> log.warn("Timeout getting data from remote {} connection for {}", str2, str3));

        return content
                .filter(event -> isErrorEvent(event) || !Constants.TunnelPingMessage.equals(event.contentAsString()))
                .map(event -> {
                    if (isErrorEvent(event)) {
                        log.error("SSE has error, type=" + event.getEventTypeAsString() + ", content=" + event.contentAsString());
                        sseErrorsCounter.increment();
                        throw new RuntimeException("Got error SSE event: " + event.contentAsString());
                    }
                    String payload = "";
                    try {
                        payload = event.contentAsString();
                        if (payload != null) {
                            bytesCounter.increment(payload.length());
                            messagesCounter.increment();
                        }
                    } catch (Exception e) {
                        // Best effort: log and emit whatever payload holds at this point.
                        log.error("Could not extract data from SSE " + e.getMessage(), e);
                    }
                    return payload;
                });
    }

    /** Returns whether {@code event} carries an event type starting with {@code "error:"}. */
    private static boolean isErrorEvent(ServerSentEvent event) {
        return event.hasEventType() && event.getEventTypeAsString().startsWith("error:");
    }

    /**
     * Streams worker metrics for the job named by {@code pushConnectionDetails.target}.
     *
     * <p>The connection function's error callback logs the error and then sleeps for 500 ms on the
     * calling thread. Worker connection status notifications are only logged.
     *
     * @param pushConnectionDetails the request; supplies the job id and the sink parameters
     * @return metric events as strings, flattened across all per-connection streams
     */
    private Observable<String> getWorkerMetrics(PushConnectionDetails pushConnectionDetails) {
        final Action1<Throwable> connectionErrorHandler = new Action1<Throwable>() {
            @Override
            public void call(Throwable th) {
                log.error("Metric connection error: {}", th.getMessage());
                try {
                    Thread.sleep(500L);
                } catch (InterruptedException e) {
                    log.error("Interrupted waiting for retrying connection");
                    // Restore the interrupt flag so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                }
            }
        };

        final Observer<WorkerConnectionsStatus> statusObserver = new Observer<WorkerConnectionsStatus>() {
            @Override
            public void onCompleted() {
                log.info("got onCompleted in WorkerConnStatus obs");
            }

            @Override
            public void onError(Throwable th) {
                log.info("got onError in WorkerConnStatus obs");
            }

            @Override
            public void onNext(WorkerConnectionsStatus workerConnectionsStatus) {
                log.info("got WorkerConnStatus {}", workerConnectionsStatus);
            }
        };

        return this.workerMetricsClient
                .getMetricsClientByJobId(
                        pushConnectionDetails.target,
                        new SseWorkerConnectionFunction(true, connectionErrorHandler, pushConnectionDetails.getSinkparameters()),
                        statusObserver)
                .getResults()
                .flatMap(events -> events.map(MantisServerSentEvent::getEventAsString));
    }

    /*
     * Decompiled lambda-deserialization hook (marked synthetic in the class file).
     *
     * Selects one of two lambdas by implementation method name, checks that the serialized
     * metadata (method kind, functional interface io/vavr/CheckedFunction0, method "apply",
     * signatures, implementing class) matches, and rebuilds a lambda that serializes the captured
     * JobSchedulingInfo with objectMapper. Anything else is rejected with IllegalArgumentException.
     *
     * NOTE(review): as emitted by the decompiler this does not compile ("boolean z = -1", a switch
     * over a boolean, "this" inside a static method, a lambda returned as Object). Presumably the
     * compiler generates this method on its own and this copy should not live in source - confirm
     * before removing.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiler artifact: z acts as a lambda index (-1 = no match, false = first, true = second).
        boolean z = -1;
        // First pass: map the implementation method name to an index, hash code first, then equals.
        switch (implMethodName.hashCode()) {
            case -1275488005:
                if (implMethodName.equals("lambda$null$8d130385$1")) {
                    z = true;
                    break;
                }
                break;
            case 563148658:
                if (implMethodName.equals("lambda$null$efc811b1$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second pass: validate the serialized metadata for the selected lambda and rebuild it.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/CheckedFunction0") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("io/mantisrx/api/push/ConnectionBroker") && serializedLambda.getImplMethodSignature().equals("(Lio/mantisrx/server/core/JobSchedulingInfo;)Ljava/lang/String;")) {
                    // Captured arguments: the enclosing ConnectionBroker, then the value to serialize.
                    ConnectionBroker connectionBroker = (ConnectionBroker) serializedLambda.getCapturedArg(0);
                    JobSchedulingInfo jobSchedulingInfo = (JobSchedulingInfo) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return this.objectMapper.writeValueAsString(jobSchedulingInfo);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("io/vavr/CheckedFunction0") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("io/mantisrx/api/push/ConnectionBroker") && serializedLambda.getImplMethodSignature().equals("(Lio/mantisrx/server/core/JobSchedulingInfo;)Ljava/lang/String;")) {
                    // Same reconstruction as the first case, for the second lambda.
                    ConnectionBroker connectionBroker2 = (ConnectionBroker) serializedLambda.getCapturedArg(0);
                    JobSchedulingInfo jobSchedulingInfo2 = (JobSchedulingInfo) serializedLambda.getCapturedArg(1);
                    return () -> {
                        return this.objectMapper.writeValueAsString(jobSchedulingInfo2);
                    };
                }
                break;
        }
        // No known lambda matched the serialized form.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
