package com.hazelcast.jet.contrib.http;

import com.hazelcast.collection.IQueue;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sink;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.jet.pipeline.StreamSource;
import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.MessageEvent;
import com.launchdarkly.eventsource.ReadyState;
import io.undertow.connector.ByteBufferPool;
import io.undertow.protocols.ssl.UndertowXnioSsl;
import io.undertow.server.DefaultByteBufferPool;
import io.undertow.websockets.client.WebSocketClient;
import io.undertow.websockets.core.AbstractReceiveListener;
import io.undertow.websockets.core.BufferedTextMessage;
import io.undertow.websockets.core.WebSocketChannel;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import javax.net.ssl.SSLContext;
import okhttp3.OkHttpClient;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.xnio.OptionMap;
import org.xnio.Options;
import org.xnio.Xnio;
import org.xnio.XnioWorker;

/* loaded from: input_file:com/hazelcast/jet/contrib/http/HttpListenerSinkTest.class */
public class HttpListenerSinkTest extends HttpTestBase {

    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private XnioWorker worker;
    private ByteBufferPool buffer;
    private Job job;
    private Closeable wsOrSseClient;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hazelcast/jet/contrib/http/HttpListenerSinkTest$QueueSourceContext.class */
    public static class QueueSourceContext {
        static final int DRAIN_LIMIT = 256;
        List<String> tempCollection = new ArrayList();
        IQueue<String> queue;

        QueueSourceContext(Processor.Context context, String str) {
            this.queue = context.jetInstance().getHazelcastInstance().getQueue(str);
        }

        public void fillBuffer(SourceBuilder.SourceBuffer<String> sourceBuffer) {
            this.queue.drainTo(this.tempCollection, DRAIN_LIMIT);
            List<String> list = this.tempCollection;
            sourceBuffer.getClass();
            list.forEach((v1) -> {
                r1.add(v1);
            });
            this.tempCollection.clear();
        }
    }

    @Before
    public void setUp() throws Exception {
        createJetMember();
        this.worker = Xnio.getInstance(HttpListenerSinkTest.class.getClassLoader()).createWorker(OptionMap.builder().set(Options.WORKER_IO_THREADS, 2).getMap());
        this.buffer = new DefaultByteBufferPool(true, 256);
    }

    @After
    public void after() throws IOException {
        this.worker.shutdown();
        this.buffer.close();
        if (this.job != null) {
            this.job.cancel();
            assertJobStatusEventually(this.job, JobStatus.FAILED);
        }
        if (this.wsOrSseClient != null) {
            this.wsOrSseClient.close();
        }
    }

    @Test
    public void testWebsocket_when_clientConnectsAfterAccumulation() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().accumulateItems(100).buildWebsocket());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testWebsocket_when_accumulateEnabled() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().accumulateItems(100).buildWebsocket());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i * 2, arrayBlockingQueue);
        });
    }

    @Test
    public void testWebsocket_when_accumulateEnabledWithSmallNumber() {
        int i = 5;
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().accumulateItems(5).buildWebsocket());
        int i2 = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i2 + i, arrayBlockingQueue);
        });
    }

    @Test
    public void testWebsocket_when_accumulateDisabled() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().buildWebsocket());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testWebsocket_when_sslEnabled() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().sslContextFn(sslContextFn()).buildWebsocket());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testWebsocket_when_mutualAuthEnabled() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().sslContextFn(sslContextFn()).enableMutualAuthentication().buildWebsocket());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testWebsocket_when_portConfigured() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().port(8091).buildWebsocket());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        Assert.assertTrue(sinkAddress.endsWith("8091/"));
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testWebsocket_when_pathConfigured() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().path("/user").buildWebsocket());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        Assert.assertTrue(sinkAddress.endsWith("/user"));
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testWebsocket_when_toStringFnConfigured() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().toStringFn(obj -> {
            return obj.toString().toUpperCase();
        }).buildWebsocket());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromWebSocket(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
        Assert.assertEquals(10, arrayBlockingQueue.stream().filter(str -> {
            return str.startsWith("MESSAGE-");
        }).count());
    }

    @Test
    public void testSSE_when_clientConnectsAfterAccumulation() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().accumulateItems(100).buildServerSent());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromSse(sinkAddress, arrayBlockingQueue);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testSSE_when_accumulateEnabled() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().accumulateItems(100).buildServerSent());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromSse(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i * 2, arrayBlockingQueue);
        });
    }

    @Test
    public void testSSE_when_accumulateDisabled() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().buildServerSent());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromSse(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testSSE_when_sslEnabled() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().sslContextFn(sslContextFn()).buildServerSent());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromSse(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testSSE_when_mutualAuthEnabled() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().sslContextFn(sslContextFn()).enableMutualAuthentication().buildServerSent());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromSse(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testSSE_when_portConfigured() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().port(8091).buildServerSent());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        Assert.assertTrue(sinkAddress.endsWith("8091/"));
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromSse(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testSSE_when_pathConfigured() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().path("/user").buildServerSent());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        Assert.assertTrue(sinkAddress.endsWith("/user"));
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromSse(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
    }

    @Test
    public void testSSE_when_toStringFnConfigured() {
        IQueue<String> queue = this.jet.getHazelcastInstance().getQueue(randomName());
        startJob(queue, HttpListenerSinks.builder().toStringFn(obj -> {
            return obj.toString().toUpperCase();
        }).buildServerSent());
        int i = 10;
        postMessages(queue, 10);
        String sinkAddress = HttpListenerSinks.sinkAddress(this.jet, this.job);
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(10 * 2);
        receiveFromSse(sinkAddress, arrayBlockingQueue);
        postMessages(queue, 10);
        assertTrueEventually(() -> {
            assertSizeEventually(i, arrayBlockingQueue);
        });
        Assert.assertEquals(10, arrayBlockingQueue.stream().filter(str -> {
            return str.startsWith("MESSAGE-");
        }).count());
    }

    private void receiveFromSse(String str, final Collection<String> collection) {
        EventHandler eventHandler = new EventHandler() { // from class: com.hazelcast.jet.contrib.http.HttpListenerSinkTest.1
            public void onOpen() {
            }

            public void onClosed() {
            }

            public void onMessage(String str2, MessageEvent messageEvent) {
                collection.add(messageEvent.getData());
            }

            public void onComment(String str2) {
            }

            public void onError(Throwable th) {
            }
        };
        OkHttpClient.Builder builder = new OkHttpClient.Builder();
        if (str.startsWith("https")) {
            builder.sslSocketFactory(((SSLContext) sslContextFn().get()).getSocketFactory(), x509TrustManager());
            builder.hostnameVerifier(new NoopHostnameVerifier());
        }
        EventSource.Builder builder2 = new EventSource.Builder(eventHandler, URI.create(str));
        builder2.client(builder.build());
        EventSource build = builder2.build();
        build.start();
        assertTrueEventually(() -> {
            Assert.assertSame(ReadyState.OPEN, build.getState());
        });
        this.wsOrSseClient = build;
    }

    private void receiveFromWebSocket(String str, final Collection<String> collection) {
        WebSocketChannel connectWithRetry = connectWithRetry(str);
        connectWithRetry.getReceiveSetter().set(new AbstractReceiveListener() { // from class: com.hazelcast.jet.contrib.http.HttpListenerSinkTest.2
            protected void onFullTextMessage(WebSocketChannel webSocketChannel, BufferedTextMessage bufferedTextMessage) {
                collection.add(bufferedTextMessage.getData());
            }
        });
        connectWithRetry.resumeReceives();
        this.wsOrSseClient = connectWithRetry;
    }

    private WebSocketChannel connectWithRetry(String str) {
        Xnio xnio = Xnio.getInstance(HttpListenerSinkTest.class.getClassLoader());
        for (int i = 0; i < 30; i++) {
            try {
                WebSocketClient.ConnectionBuilder connectionBuilder = WebSocketClient.connectionBuilder(this.worker, this.buffer, URI.create(str));
                if (str.startsWith("wss")) {
                    connectionBuilder.setSsl(new UndertowXnioSsl(xnio, OptionMap.EMPTY, (SSLContext) sslContextFn().get()));
                }
                return (WebSocketChannel) connectionBuilder.connect().get();
            } catch (Exception e) {
                this.logger.warning(e.getMessage());
                sleepAtLeastMillis(100L);
            }
        }
        throw new AssertionError("Failed to connect to " + str);
    }

    private void startJob(IQueue<String> iQueue, Sink<Object> sink) {
        Pipeline create = Pipeline.create();
        create.readFrom(queueSource(iQueue)).withoutTimestamps().writeTo(sink);
        this.job = this.jet.newJob(create);
        assertJobStatusEventually(this.job, JobStatus.RUNNING);
    }

    StreamSource<String> queueSource(IQueue<String> iQueue) {
        String name = iQueue.getName();
        return SourceBuilder.stream(name, context -> {
            return new QueueSourceContext(context, name);
        }).fillBufferFn((v0, v1) -> {
            v0.fillBuffer(v1);
        }).build();
    }

    void postMessages(IQueue<String> iQueue, int i) {
        for (int i2 = 0; i2 < i; i2++) {
            iQueue.offer(String.format("message-%d", Integer.valueOf(i2)));
        }
        iQueue.getClass();
        assertEqualsEventually(iQueue::size, 0);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1718816293:
                if (implMethodName.equals("lambda$queueSource$8ac0a96a$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1061877853:
                if (implMethodName.equals("fillBuffer")) {
                    z = true;
                    break;
                }
                break;
            case -687821871:
                if (implMethodName.equals("lambda$testSSE_when_toStringFnConfigured$e215b3f2$1")) {
                    z = false;
                    break;
                }
                break;
            case -89699313:
                if (implMethodName.equals("lambda$testWebsocket_when_toStringFnConfigured$e215b3f2$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/contrib/http/HttpListenerSinkTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;")) {
                    return obj -> {
                        return obj.toString().toUpperCase();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/contrib/http/HttpListenerSinkTest$QueueSourceContext") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/pipeline/SourceBuilder$SourceBuffer;)V")) {
                    return (v0, v1) -> {
                        v0.fillBuffer(v1);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/contrib/http/HttpListenerSinkTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lcom/hazelcast/jet/core/Processor$Context;)Lcom/hazelcast/jet/contrib/http/HttpListenerSinkTest$QueueSourceContext;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return context -> {
                        return new QueueSourceContext(context, str);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/contrib/http/HttpListenerSinkTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;")) {
                    return obj2 -> {
                        return obj2.toString().toUpperCase();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
