package com.hazelcast.jet.elastic.impl;

import com.hazelcast.jet.core.test.TestSupport;
import com.hazelcast.jet.elastic.impl.Shard;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.List;
import org.apache.http.Header;
import org.assertj.core.api.Assertions;
import org.assertj.core.util.Lists;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.slice.SliceBuilder;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:com/hazelcast/jet/elastic/impl/ElasticSourcePTest.class */
public class ElasticSourcePTest {
    public static final String HIT_SOURCE = "{\"name\": \"Frantisek\"}";
    public static final String HIT_SOURCE2 = "{\"name\": \"Vladimir\"}";
    public static final String SCROLL_ID = "random-scroll-id";
    private static final String KEEP_ALIVE = "42m";
    private ElasticSourceP<String> processor;
    private SerializableRestClient spyClient;
    private SearchResponse response;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/hazelcast/jet/elastic/impl/ElasticSourcePTest$SerializableRestClient.class */
    public static class SerializableRestClient extends RestHighLevelClient implements Serializable {
        static SerializableRestClient instanceHolder;

        SerializableRestClient(RestClientBuilder restClientBuilder) {
            super(restClientBuilder.build());
        }
    }

    @Before
    public void setUp() throws Exception {
        RestClientBuilder restClientBuilder = (RestClientBuilder) Mockito.mock(RestClientBuilder.class);
        Mockito.when(restClientBuilder.build()).thenReturn((RestClient) Mockito.mock(RestClient.class, Mockito.RETURNS_DEEP_STUBS));
        SerializableRestClient serializableRestClient = (SerializableRestClient) Mockito.spy(new SerializableRestClient(restClientBuilder));
        SerializableRestClient.instanceHolder = serializableRestClient;
        this.spyClient = serializableRestClient;
        this.response = (SearchResponse) Mockito.mock(SearchResponse.class);
        Mockito.when(this.response.getScrollId()).thenReturn(SCROLL_ID);
        ((SerializableRestClient) Mockito.doReturn(this.response).when(this.spyClient)).search((SearchRequest) ArgumentMatchers.any(SearchRequest.class), (Header[]) ArgumentMatchers.any());
    }

    private TestSupport runProcessor() throws Exception {
        return runProcessor(Collections.emptyList(), false, false);
    }

    private TestSupport runProcessorWithCoLocation(List<Shard> list) throws Exception {
        return runProcessor(list, false, true);
    }

    private TestSupport runProcessor(List<Shard> list, boolean z, boolean z2) throws Exception {
        SerializableRestClient serializableRestClient = this.spyClient;
        this.processor = new ElasticSourceP<>(new ElasticSourceConfiguration(() -> {
            return serializableRestClient;
        }, () -> {
            return new SearchRequest(new String[]{"*"});
        }, (v0) -> {
            return v0.getSourceAsString();
        }, z, z2, KEEP_ALIVE, 0), list);
        return TestSupport.verifyProcessor(() -> {
            return this.processor;
        }).disableSnapshots();
    }

    @Test
    public void when_runProcessor_then_executeSearchRequestWithScroll() throws Exception {
        Mockito.when(this.response.getHits()).thenReturn(new SearchHits(new SearchHit[0], 0L, Float.NaN));
        runProcessor().expectOutput(Collections.emptyList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(SearchRequest.class);
        ((SerializableRestClient) Mockito.verify(this.spyClient)).search((SearchRequest) forClass.capture(), new Header[0]);
        Assertions.assertThat(((SearchRequest) forClass.getValue()).scroll().keepAlive().getStringRep()).isEqualTo(KEEP_ALIVE);
    }

    @Test
    public void given_singleHit_when_runProcessor_then_produceSingleHit() throws Exception {
        SearchHit searchHit = new SearchHit(0, "id-0", new Text("ignored"), Collections.emptyMap());
        searchHit.sourceRef(new BytesArray(HIT_SOURCE));
        Mockito.when(this.response.getHits()).thenReturn(new SearchHits(new SearchHit[]{searchHit}, 1L, Float.NaN));
        SearchResponse searchResponse = (SearchResponse) Mockito.mock(SearchResponse.class);
        Mockito.when(searchResponse.getHits()).thenReturn(new SearchHits(new SearchHit[0], 1L, Float.NaN));
        ((SerializableRestClient) Mockito.doReturn(searchResponse).when(this.spyClient)).searchScroll((SearchScrollRequest) ArgumentMatchers.any(), new Header[0]);
        runProcessor().expectOutput(Lists.newArrayList(new String[]{HIT_SOURCE}));
    }

    @Test
    public void givenMultipleResults_when_runProcessor_then_useScrollIdInFollowupScrollRequest() throws Exception {
        SearchHit searchHit = new SearchHit(0, "id-0", new Text("ignored"), Collections.emptyMap());
        searchHit.sourceRef(new BytesArray(HIT_SOURCE));
        Mockito.when(this.response.getHits()).thenReturn(new SearchHits(new SearchHit[]{searchHit}, 3L, Float.NaN));
        SearchResponse searchResponse = (SearchResponse) Mockito.mock(SearchResponse.class);
        SearchHit searchHit2 = new SearchHit(1, "id-1", new Text("ignored"), Collections.emptyMap());
        searchHit2.sourceRef(new BytesArray(HIT_SOURCE2));
        Mockito.when(searchResponse.getHits()).thenReturn(new SearchHits(new SearchHit[]{searchHit2}, 3L, Float.NaN));
        SearchResponse searchResponse2 = (SearchResponse) Mockito.mock(SearchResponse.class);
        Mockito.when(searchResponse2.getHits()).thenReturn(new SearchHits(new SearchHit[0], 3L, Float.NaN));
        ((SerializableRestClient) Mockito.doReturn(searchResponse, new Object[]{searchResponse2}).when(this.spyClient)).searchScroll((SearchScrollRequest) ArgumentMatchers.any(), new Header[0]);
        runProcessor().expectOutput(Lists.newArrayList(new String[]{HIT_SOURCE, HIT_SOURCE2}));
        ArgumentCaptor forClass = ArgumentCaptor.forClass(SearchScrollRequest.class);
        ((SerializableRestClient) Mockito.verify(this.spyClient, Mockito.times(2))).searchScroll((SearchScrollRequest) forClass.capture(), new Header[0]);
        SearchScrollRequest searchScrollRequest = (SearchScrollRequest) forClass.getValue();
        Assertions.assertThat(searchScrollRequest.scrollId()).isEqualTo(SCROLL_ID);
        Assertions.assertThat(searchScrollRequest.scroll().keepAlive().getStringRep()).isEqualTo(KEEP_ALIVE);
    }

    @Test
    public void when_runProcessorWithCoLocation_thenSearchShardsWithPreference() throws Exception {
        Mockito.when(this.response.getHits()).thenReturn(new SearchHits(new SearchHit[0], 0L, Float.NaN));
        runProcessorWithCoLocation(Lists.newArrayList(new Shard[]{new Shard("my-index", 0, Shard.Prirep.p, 42, "STARTED", "10.0.0.1", "10.0.0.1:9200", "es1"), new Shard("my-index", 1, Shard.Prirep.p, 42, "STARTED", "10.0.0.1", "10.0.0.1:9200", "es1"), new Shard("my-index", 2, Shard.Prirep.p, 42, "STARTED", "10.0.0.1", "10.0.0.1:9200", "es1")})).expectOutput(Collections.emptyList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(SearchRequest.class);
        ((SerializableRestClient) Mockito.verify(this.spyClient)).search((SearchRequest) forClass.capture(), new Header[0]);
        Assertions.assertThat(((SearchRequest) forClass.getValue()).preference()).isEqualTo("_shards:0,1,2|_only_local");
    }

    @Test
    public void when_runProcessorWithParallelism_thenUseSlicingBasedOnGlobalValues() throws Exception {
        Mockito.when(this.response.getHits()).thenReturn(new SearchHits(new SearchHit[0], 0L, Float.NaN));
        TestSupport runProcessor = runProcessor(Collections.emptyList(), true, false);
        runProcessor.localProcessorIndex(1);
        runProcessor.localParallelism(2);
        runProcessor.globalProcessorIndex(4);
        runProcessor.totalParallelism(6);
        runProcessor.expectOutput(Collections.emptyList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(SearchRequest.class);
        ((SerializableRestClient) Mockito.verify(this.spyClient)).search((SearchRequest) forClass.capture(), new Header[0]);
        SliceBuilder slice = ((SearchRequest) forClass.getValue()).source().slice();
        Assertions.assertThat(slice.getId()).isEqualTo(4);
        Assertions.assertThat(slice.getMax()).isEqualTo(6);
    }

    @Test
    public void when_runProcessorWithCoLocationAndSlicing_thenUseSlicingBasedOnLocalValues() throws Exception {
        Mockito.when(this.response.getHits()).thenReturn(new SearchHits(new SearchHit[0], 0L, Float.NaN));
        TestSupport runProcessor = runProcessor(Lists.newArrayList(new Shard[]{new Shard("my-index", 0, Shard.Prirep.p, 42, "STARTED", "10.0.0.1", "10.0.0.1:9200", "es1"), new Shard("my-index", 1, Shard.Prirep.p, 42, "STARTED", "10.0.0.1", "10.0.0.1:9200", "es1"), new Shard("my-index", 2, Shard.Prirep.p, 42, "STARTED", "10.0.0.1", "10.0.0.1:9200", "es1")}), true, true);
        runProcessor.localProcessorIndex(1);
        runProcessor.localParallelism(2);
        runProcessor.globalProcessorIndex(4);
        runProcessor.totalParallelism(6);
        runProcessor.expectOutput(Collections.emptyList());
        ArgumentCaptor forClass = ArgumentCaptor.forClass(SearchRequest.class);
        ((SerializableRestClient) Mockito.verify(this.spyClient)).search((SearchRequest) forClass.capture(), new Header[0]);
        SliceBuilder slice = ((SearchRequest) forClass.getValue()).source().slice();
        Assertions.assertThat(slice.getId()).isEqualTo(1);
        Assertions.assertThat(slice.getMax()).isEqualTo(2);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 2584512:
                if (implMethodName.equals("lambda$runProcessor$2fc9b8b5$1")) {
                    z = false;
                    break;
                }
                break;
            case 379889471:
                if (implMethodName.equals("lambda$runProcessor$e0edb72e$1")) {
                    z = true;
                    break;
                }
                break;
            case 904423046:
                if (implMethodName.equals("lambda$runProcessor$a3a95f53$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1363884308:
                if (implMethodName.equals("getSourceAsString")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/elastic/impl/ElasticSourcePTest") && serializedLambda.getImplMethodSignature().equals("()Lorg/elasticsearch/action/search/SearchRequest;")) {
                    return () -> {
                        return new SearchRequest(new String[]{"*"});
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/elastic/impl/ElasticSourcePTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    ElasticSourcePTest elasticSourcePTest = (ElasticSourcePTest) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return this.processor;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/elastic/impl/ElasticSourcePTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/elasticsearch/client/RestHighLevelClient;)Lorg/elasticsearch/client/RestHighLevelClient;")) {
                    RestHighLevelClient restHighLevelClient = (RestHighLevelClient) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return restHighLevelClient;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/elasticsearch/search/SearchHit") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getSourceAsString();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
