package io.trino.plugin.opa;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.plugin.blackhole.BlackHolePlugin;
import io.trino.plugin.opa.FunctionalHelpers;
import io.trino.spi.security.Identity;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.TestingSession;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Named;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:io/trino/plugin/opa/TestOpaAccessControlSystem.class */
public class TestOpaAccessControlSystem {
    private URI opaServerUri;
    private DistributedQueryRunner runner;
    private static final int OPA_PORT = 8181;

    @Container
    private static final GenericContainer<?> OPA_CONTAINER = new GenericContainer(DockerImageName.parse("openpolicyagent/opa:latest-rootless")).withCommand(new String[]{"run", "--server", "--addr", ":%d".formatted(Integer.valueOf(OPA_PORT))}).withExposedPorts(new Integer[]{Integer.valueOf(OPA_PORT)});

    @DisplayName("Batched Authorizer Tests")
    @TestInstance(TestInstance.Lifecycle.PER_CLASS)
    @Nested
    /* loaded from: input_file:io/trino/plugin/opa/TestOpaAccessControlSystem$BatchedAuthorizerTests.class */
    class BatchedAuthorizerTests {
        BatchedAuthorizerTests() {
        }

        @BeforeAll
        public void setupTrino() throws Exception {
            TestOpaAccessControlSystem.this.setupTrinoWithOpa("v1/data/trino/allow", Optional.of("v1/data/trino/batchAllow"));
        }

        @AfterAll
        public void teardown() {
            if (TestOpaAccessControlSystem.this.runner != null) {
                TestOpaAccessControlSystem.this.runner.close();
            }
        }

        @MethodSource({"io.trino.plugin.opa.TestOpaAccessControlSystem#filterSchemaTests"})
        @ParameterizedTest(name = "{index}: {0}")
        public void testFilterOutItemsBatch(String str, Set<String> set) throws IOException, InterruptedException {
            TestOpaAccessControlSystem.this.submitPolicy("package trino\nimport future.keywords.in\nimport future.keywords.if\ndefault allow = false\n\nallow if is_admin\n\nallow {\n    is_bob\n    input.action.operation in [\"AccessCatalog\", \"ExecuteQuery\", \"ImpersonateUser\", \"ShowSchemas\", \"SelectFromColumns\"]\n}\n\nis_bob {\n    input.context.identity.user == \"bob\"\n}\n\nis_admin {\n    input.context.identity.user == \"admin\"\n}\n\nbatchAllow[i] {\n    some i\n    is_bob\n    input.action.operation == \"FilterCatalogs\"\n    input.action.filterResources[i].catalog.name == \"catalog_one\"\n}\n\nbatchAllow[i] {\n    some i\n    input.action.filterResources[i]\n    is_admin\n}\n");
            Assertions.assertThat(TestOpaAccessControlSystem.this.querySetOfStrings(TestOpaAccessControlSystem.this.user(str), "SHOW CATALOGS")).containsExactlyInAnyOrderElementsOf(set);
        }

        @Test
        public void testDenyUnbatchedQuery() throws IOException, InterruptedException {
            TestOpaAccessControlSystem.this.submitPolicy("package trino\nimport future.keywords.in\ndefault allow = false\n");
            Assertions.assertThatThrownBy(() -> {
                TestOpaAccessControlSystem.this.runner.execute(TestOpaAccessControlSystem.this.user("bob"), "SELECT version()");
            }).isInstanceOf(RuntimeException.class).hasMessageContaining("Access Denied");
        }

        @Test
        public void testAllowUnbatchedQuery() throws IOException, InterruptedException {
            TestOpaAccessControlSystem.this.submitPolicy("package trino\nimport future.keywords.in\ndefault allow = false\nallow {\n    input.context.identity.user == \"bob\"\n    input.action.operation in [\"ImpersonateUser\", \"ExecuteFunction\", \"AccessCatalog\", \"ExecuteQuery\"]\n}\n");
            Assertions.assertThat(TestOpaAccessControlSystem.this.querySetOfStrings(TestOpaAccessControlSystem.this.user("bob"), "SELECT version()")).isNotEmpty();
        }
    }

    @DisplayName("Unbatched Authorizer Tests")
    @TestInstance(TestInstance.Lifecycle.PER_CLASS)
    @Nested
    /* loaded from: input_file:io/trino/plugin/opa/TestOpaAccessControlSystem$UnbatchedAuthorizerTests.class */
    class UnbatchedAuthorizerTests {
        UnbatchedAuthorizerTests() {
        }

        @BeforeAll
        public void setupTrino() throws Exception {
            TestOpaAccessControlSystem.this.setupTrinoWithOpa("v1/data/trino/allow", Optional.empty());
        }

        @AfterAll
        public void teardown() {
            if (TestOpaAccessControlSystem.this.runner != null) {
                TestOpaAccessControlSystem.this.runner.close();
            }
        }

        @MethodSource({"io.trino.plugin.opa.TestOpaAccessControlSystem#filterSchemaTests"})
        @ParameterizedTest(name = "{index}: {0}")
        public void testAllowsQueryAndFilters(String str, Set<String> set) throws IOException, InterruptedException {
            TestOpaAccessControlSystem.this.submitPolicy("package trino\nimport future.keywords.in\nimport future.keywords.if\n\ndefault allow = false\nallow {\n  is_bob\n  can_be_accessed_by_bob\n}\nallow if is_admin\n\nis_admin {\n  input.context.identity.user == \"admin\"\n}\nis_bob {\n  input.context.identity.user == \"bob\"\n}\ncan_be_accessed_by_bob {\n  input.action.operation in [\"ImpersonateUser\", \"ExecuteQuery\"]\n}\ncan_be_accessed_by_bob {\n  input.action.operation in [\"FilterCatalogs\", \"AccessCatalog\"]\n  input.action.resource.catalog.name == \"catalog_one\"\n}\n");
            Assertions.assertThat(TestOpaAccessControlSystem.this.querySetOfStrings(TestOpaAccessControlSystem.this.user(str), "SHOW CATALOGS")).containsExactlyInAnyOrderElementsOf(set);
        }

        @Test
        public void testShouldDenyQueryIfDirected() throws IOException, InterruptedException {
            TestOpaAccessControlSystem.this.submitPolicy("package trino\nimport future.keywords.in\ndefault allow = false\n\nallow {\n    input.context.identity.user in [\"someone\", \"admin\"]\n}\n");
            Assertions.assertThatThrownBy(() -> {
                TestOpaAccessControlSystem.this.runner.execute(TestOpaAccessControlSystem.this.user("bob"), "SHOW CATALOGS");
            }).isInstanceOf(RuntimeException.class).hasMessageContaining("Access Denied");
            TestOpaAccessControlSystem.this.runner.execute(TestOpaAccessControlSystem.this.user("admin"), "SHOW CATALOGS");
        }
    }

    private void ensureOpaUp() throws IOException, InterruptedException {
        Assertions.assertThat(OPA_CONTAINER.isRunning()).isTrue();
        InetSocketAddress inetSocketAddress = new InetSocketAddress(OPA_CONTAINER.getHost(), OPA_CONTAINER.getMappedPort(OPA_PORT).intValue());
        String format = String.format("%s:%d", inetSocketAddress.getHostString(), Integer.valueOf(inetSocketAddress.getPort()));
        awaitSocketOpen(inetSocketAddress, 100, 200);
        this.opaServerUri = URI.create(String.format("http://%s/", format));
    }

    private void setupTrinoWithOpa(String str, Optional<String> optional) throws Exception {
        ensureOpaUp();
        ImmutableMap.Builder builder = ImmutableMap.builder();
        builder.put("opa.policy.uri", this.opaServerUri.resolve(str).toString());
        optional.ifPresent(str2 -> {
            builder.put("opa.policy.batched-uri", this.opaServerUri.resolve(str2).toString());
        });
        this.runner = DistributedQueryRunner.builder(TestingSession.testSessionBuilder().build()).setSystemAccessControl(new OpaAccessControlFactory().create(builder.buildOrThrow())).setNodeCount(1).build();
        this.runner.installPlugin(new BlackHolePlugin());
        this.runner.createCatalog("catalog_one", "blackhole");
        this.runner.createCatalog("catalog_two", "blackhole");
    }

    private static void awaitSocketOpen(InetSocketAddress inetSocketAddress, int i, int i2) throws IOException, InterruptedException {
        for (int i3 = 0; i3 < i; i3++) {
            try {
                Socket socket = new Socket();
                try {
                    socket.connect(inetSocketAddress, i2);
                    socket.close();
                    return;
                } catch (Throwable th) {
                    try {
                        socket.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } catch (SocketTimeoutException e) {
            } catch (IOException e2) {
                Thread.sleep(i2);
            }
        }
        throw new SocketTimeoutException("Timed out waiting for addr %s to be available (%d attempts made with a %d ms wait)".formatted(inetSocketAddress, Integer.valueOf(i), Integer.valueOf(i2)));
    }

    private static String stringOfLines(String... strArr) {
        StringBuilder sb = new StringBuilder();
        for (String str : strArr) {
            sb.append(str);
            sb.append("\r\n");
        }
        return sb.toString();
    }

    private void submitPolicy(String... strArr) throws IOException, InterruptedException {
        HttpResponse send = HttpClient.newHttpClient().send(HttpRequest.newBuilder(this.opaServerUri.resolve("v1/policies/trino")).PUT(HttpRequest.BodyPublishers.ofString(stringOfLines(strArr))).header("Content-Type", "text/plain").build(), HttpResponse.BodyHandlers.ofString());
        Assertions.assertThat(send.statusCode()).withFailMessage("Failed to submit policy: %s", new Object[]{send.body()}).isEqualTo(200);
    }

    private Session user(String str) {
        return TestingSession.testSessionBuilder().setIdentity(Identity.ofUser(str)).build();
    }

    private Set<String> querySetOfStrings(Session session, String str) {
        return (Set) this.runner.execute(session, str).getMaterializedRows().stream().map(materializedRow -> {
            return materializedRow.getField(0).toString();
        }).collect(ImmutableSet.toImmutableSet());
    }

    private static Stream<Arguments> filterSchemaTests() {
        return Stream.of((Object[]) new FunctionalHelpers.Pair[]{FunctionalHelpers.Pair.of("bob", ImmutableSet.of("catalog_one")), FunctionalHelpers.Pair.of("admin", ImmutableSet.of("catalog_one", "catalog_two", "system"))}).map(pair -> {
            return Arguments.of(new Object[]{Named.of((String) pair.first(), (String) pair.first()), pair.second()});
        });
    }
}
