package com.ververica.cdc.connectors.db2.table;

import com.ververica.cdc.connectors.db2.Db2TestBase;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/connectors/db2/table/Db2ConnectorITCase.class */
/**
 * Integration tests for the {@code db2-cdc} SQL connector.
 *
 * <p>Each test registers a {@code db2-cdc} source table and a {@code values} sink table, submits an
 * {@code INSERT INTO sink ...} job, applies changes to the DB2 table over JDBC and then compares
 * the rows collected by {@link TestValuesTableFactory} with the expected changelog.
 */
public class Db2ConnectorITCase extends Db2TestBase {
    private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectorITCase.class);

    /**
     * Connector options shared by every {@code db2-cdc} source table in this class. The seven
     * {@code %s} placeholders are filled in by {@link #formatSourceDdl(String, String)}.
     */
    private static final String DB2_CONNECTOR_OPTIONS =
            " 'connector' = 'db2-cdc',"
                    + " 'hostname' = '%s',"
                    + " 'port' = '%s',"
                    + " 'username' = '%s',"
                    + " 'password' = '%s',"
                    + " 'database-name' = '%s',"
                    + " 'schema-name' = '%s',"
                    + " 'table-name' = '%s'";

    @ClassRule
    public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    this.env, EnvironmentSettings.newInstance().inStreamingMode().build());

    /** Clears the rows collected by previous tests and forces a parallelism of 1. */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.env.setParallelism(1);
    }

    /**
     * Aggregates {@code SUM(WEIGHT)} per {@code NAME} over {@code DB2INST1.PRODUCTS} while the
     * table is being updated, and checks the final materialized result in the sink.
     */
    @Test
    public void testConsumingAllEvents()
            throws SQLException, InterruptedException, ExecutionException {
        String sourceDdl =
                formatSourceDdl(
                        "CREATE TABLE debezium_source ("
                                + " ID INT NOT NULL,"
                                + " NAME STRING,"
                                + " DESCRIPTION STRING,"
                                + " WEIGHT DECIMAL(10,3)"
                                + ") WITH ("
                                + DB2_CONNECTOR_OPTIONS
                                + ")",
                        "PRODUCTS");
        String sinkDdl =
                "CREATE TABLE sink ("
                        + " name STRING,"
                        + " weightSum DECIMAL(10,3),"
                        + " PRIMARY KEY (name) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false',"
                        + " 'sink-expected-messages-num' = '20'"
                        + ")";
        this.tEnv.executeSql(sourceDdl);
        this.tEnv.executeSql(sinkDdl);

        // executeSql submits the job asynchronously; the returned handle is used to cancel it.
        TableResult result =
                this.tEnv.executeSql(
                        "INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");

        waitForSnapshotStarted("sink");

        executeStatements(
                "UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;",
                "UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.1' WHERE ID=107;",
                "INSERT INTO DB2INST1.PRODUCTS VALUES (110,'jacket','water resistent white wind breaker',0.2);",
                "INSERT INTO DB2INST1.PRODUCTS VALUES (111,'scooter','Big 2-wheel scooter ',5.18);",
                "UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;",
                "UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111;",
                "DELETE FROM DB2INST1.PRODUCTS WHERE ID=111;");

        waitForSinkSize("sink", 20);

        String[] expected =
                new String[] {
                    "scooter,3.140",
                    "car battery,8.100",
                    "12-pack drill bits,0.800",
                    "hammer,2.625",
                    "rocks,5.100",
                    "jacket,0.600",
                    "spare tire,22.200"
                };
        List<String> actual = TestValuesTableFactory.getResults("sink");
        Assert.assertThat(actual, Matchers.containsInAnyOrder(expected));

        result.getJobClient().get().cancel().get();
    }

    /**
     * Reads {@code DB2INST1.FULL_TYPES} through a table that declares one column per covered data
     * type and checks the raw changelog produced by the snapshot row plus one update.
     */
    @Test
    public void testAllTypes() throws Exception {
        String sourceDdl =
                formatSourceDdl(
                        "CREATE TABLE full_types (\n"
                                + "    ID INTEGER NOT NULL,\n"
                                + "    SMALL_C SMALLINT,\n"
                                + "    INT_C INTEGER,\n"
                                + "    BIG_C BIGINT,\n"
                                + "    REAL_C FLOAT,\n"
                                + "    DOUBLE_C DOUBLE,\n"
                                + "    NUMERIC_C DECIMAL(10, 5),\n"
                                + "    DECIMAL_C DECIMAL(10, 1),\n"
                                + "    VARCHAR_C STRING,\n"
                                + "    CHAR_C STRING,\n"
                                + "    CHARACTER_C STRING,\n"
                                + "    TIMESTAMP_C TIMESTAMP(3),\n"
                                + "    DATE_C DATE,\n"
                                + "    TIME_C TIME(0),\n"
                                + "    DEFAULT_NUMERIC_C DECIMAL,\n"
                                + "    TIMESTAMP_PRECISION_C TIMESTAMP(9)\n"
                                + ") WITH ("
                                + DB2_CONNECTOR_OPTIONS
                                + ")",
                        "FULL_TYPES");
        String sinkDdl =
                "CREATE TABLE sink (\n"
                        + "    id INTEGER NOT NULL,\n"
                        + "    small_c SMALLINT,\n"
                        + "    int_c INTEGER,\n"
                        + "    big_c BIGINT,\n"
                        + "    real_c FLOAT,\n"
                        + "    double_c DOUBLE,\n"
                        + "    numeric_c DECIMAL(10, 5),\n"
                        + "    decimal_c DECIMAL(10, 1),\n"
                        + "    varchar_c STRING,\n"
                        + "    char_c STRING,\n"
                        + "    character_c STRING,\n"
                        + "    timestamp_c TIMESTAMP(3),\n"
                        + "    date_c DATE,\n"
                        + "    time_c TIME(0),\n"
                        + "    default_numeric_c DECIMAL,\n"
                        + "    timestamp_precision_c TIMESTAMP(9)\n"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ")";
        this.tEnv.executeSql(sourceDdl);
        this.tEnv.executeSql(sinkDdl);

        // executeSql submits the job asynchronously; the returned handle is used to cancel it.
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types");

        waitForSnapshotStarted("sink");

        executeStatements("UPDATE DB2INST1.FULL_TYPES SET SMALL_C=0 WHERE ID=1;");

        // One snapshot insert followed by the before/after images of the update.
        waitForSinkSize("sink", 3);

        List<String> expected =
                Arrays.asList(
                        "+I(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)",
                        "-U(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)",
                        "+U(1,0,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)");
        List<String> actual = TestValuesTableFactory.getRawResults("sink");
        Assert.assertEquals(expected, actual);

        result.getJobClient().get().cancel().get();
    }

    /**
     * Starts the source with {@code 'scan.startup.mode' = 'latest-offset'} and checks the sink
     * content after applying changes to {@code DB2INST1.PRODUCTS1} once the job is running.
     */
    @Test
    public void testStartupFromLatestOffset() throws Exception {
        String sourceDdl =
                formatSourceDdl(
                        "CREATE TABLE debezium_source ("
                                + " ID INT NOT NULL,"
                                + " NAME STRING,"
                                + " DESCRIPTION STRING,"
                                + " WEIGHT DECIMAL(10,3)"
                                + ") WITH ("
                                + DB2_CONNECTOR_OPTIONS
                                + " , 'scan.startup.mode' = 'latest-offset'"
                                + ")",
                        "PRODUCTS1");
        String sinkDdl =
                "CREATE TABLE sink "
                        + " WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ") LIKE debezium_source (EXCLUDING OPTIONS)";
        this.tEnv.executeSql(sourceDdl);
        this.tEnv.executeSql(sinkDdl);

        // executeSql submits the job asynchronously; the returned handle is used to poll and
        // cancel it.
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        // Poll every 5 seconds until the job reports RUNNING (always sleeps at least once).
        do {
            Thread.sleep(5000L);
        } while (result.getJobClient().get().getJobStatus().get() != JobStatus.RUNNING);
        // Fixed grace period before changing data; the sink cannot be used as a readiness signal
        // here because nothing has been written to it yet.
        Thread.sleep(30000L);
        LOG.info("Snapshot should end and start to read binlog.");

        executeStatements(
                "INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'jacket','water resistent white wind breaker',0.2)",
                "INSERT INTO DB2INST1.PRODUCTS1 VALUES (default,'scooter','Big 2-wheel scooter ',5.18)",
                "UPDATE DB2INST1.PRODUCTS1 SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110",
                "UPDATE DB2INST1.PRODUCTS1 SET WEIGHT='5.17' WHERE ID=111",
                "DELETE FROM DB2INST1.PRODUCTS1 WHERE ID=111");

        waitForSinkSize("sink", 7);

        String[] expected =
                new String[] {"110,jacket,new water resistent white wind breaker,0.500"};
        List<String> actual = TestValuesTableFactory.getResults("sink");
        Assert.assertThat(actual, Matchers.containsInAnyOrder(expected));

        result.getJobClient().get().cancel().get();
    }

    /**
     * Declares the {@code database_name}, {@code schema_name} and {@code table_name} metadata
     * columns on the source and checks that they are populated on every changelog row read from
     * {@code DB2INST1.PRODUCTS2}.
     */
    @Test
    public void testMetadataColumns() throws Exception {
        String sourceDdl =
                formatSourceDdl(
                        "CREATE TABLE debezium_source ("
                                + " DB_NAME STRING METADATA FROM 'database_name' VIRTUAL,"
                                + " SCHEMA_NAME STRING METADATA FROM 'schema_name' VIRTUAL,"
                                + " TABLE_NAME STRING METADATA  FROM 'table_name' VIRTUAL,"
                                + " ID INT NOT NULL,"
                                + " NAME STRING,"
                                + " DESCRIPTION STRING,"
                                + " WEIGHT DECIMAL(10,3),"
                                + " PRIMARY KEY (ID) NOT ENFORCED"
                                + ") WITH ("
                                + DB2_CONNECTOR_OPTIONS
                                + ")",
                        "PRODUCTS2");
        String sinkDdl =
                "CREATE TABLE sink ("
                        + " database_name STRING,"
                        + " schema_name STRING,"
                        + " table_name STRING,"
                        + " id int,"
                        + " name STRING,"
                        + " description STRING,"
                        + " weight DECIMAL(10,3),"
                        + " PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false',"
                        + " 'sink-expected-messages-num' = '20'"
                        + ")";
        this.tEnv.executeSql(sourceDdl);
        this.tEnv.executeSql(sinkDdl);

        // executeSql submits the job asynchronously; the returned handle is used to cancel it.
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");

        waitForSnapshotStarted("sink");

        executeStatements(
                "UPDATE DB2INST1.PRODUCTS2 SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;",
                "UPDATE DB2INST1.PRODUCTS2 SET WEIGHT='5.1' WHERE ID=107;",
                "INSERT INTO DB2INST1.PRODUCTS2 VALUES (110,'jacket','water resistent white wind breaker',0.2);",
                "INSERT INTO DB2INST1.PRODUCTS2 VALUES (111,'scooter','Big 2-wheel scooter ',5.18);",
                "UPDATE DB2INST1.PRODUCTS2 SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;",
                "UPDATE DB2INST1.PRODUCTS2 SET WEIGHT='5.17' WHERE ID=111;",
                "DELETE FROM DB2INST1.PRODUCTS2 WHERE ID=111;");

        waitForSinkSize("sink", 16);

        List<String> expected =
                Arrays.asList(
                        "+I(testdb,DB2INST1,PRODUCTS2,101,scooter,Small 2-wheel scooter,3.140)",
                        "+I(testdb,DB2INST1,PRODUCTS2,102,car battery,12V car battery,8.100)",
                        "+I(testdb,DB2INST1,PRODUCTS2,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)",
                        "+I(testdb,DB2INST1,PRODUCTS2,104,hammer,12oz carpenter's hammer,0.750)",
                        "+I(testdb,DB2INST1,PRODUCTS2,105,hammer,14oz carpenter's hammer,0.875)",
                        "+I(testdb,DB2INST1,PRODUCTS2,106,hammer,16oz carpenter's hammer,1.000)",
                        "+I(testdb,DB2INST1,PRODUCTS2,107,rocks,box of assorted rocks,5.300)",
                        "+I(testdb,DB2INST1,PRODUCTS2,108,jacket,water resistent black wind breaker,0.100)",
                        "+I(testdb,DB2INST1,PRODUCTS2,109,spare tire,24 inch spare tire,22.200)",
                        "+U(testdb,DB2INST1,PRODUCTS2,106,hammer,18oz carpenter hammer,1.000)",
                        "+U(testdb,DB2INST1,PRODUCTS2,107,rocks,box of assorted rocks,5.100)",
                        "+I(testdb,DB2INST1,PRODUCTS2,110,jacket,water resistent white wind breaker,0.200)",
                        "+I(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.180)",
                        "+U(testdb,DB2INST1,PRODUCTS2,110,jacket,new water resistent white wind breaker,0.500)",
                        "+U(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.170)",
                        "-D(testdb,DB2INST1,PRODUCTS2,111,scooter,Big 2-wheel scooter ,5.170)");
        List<String> actual = TestValuesTableFactory.getRawResults("sink");
        // Both lists are sorted so the comparison does not depend on arrival order.
        Collections.sort(expected);
        Collections.sort(actual);
        Assert.assertEquals(expected, actual);

        result.getJobClient().get().cancel().get();
    }

    /**
     * Fills the seven connector-option placeholders of a source DDL template with the connection
     * settings of the DB2 test container.
     *
     * @param ddlTemplate DDL containing the placeholders of {@link #DB2_CONNECTOR_OPTIONS}
     * @param tableName name of the table in schema {@code DB2INST1} to capture
     * @return the DDL ready to be passed to {@code executeSql}
     */
    private String formatSourceDdl(String ddlTemplate, String tableName) {
        return String.format(
                ddlTemplate,
                DB2_CONTAINER.getHost(),
                DB2_CONTAINER.getMappedPort(50000),
                DB2_CONTAINER.getUsername(),
                DB2_CONTAINER.getPassword(),
                DB2_CONTAINER.getDatabaseName(),
                "DB2INST1",
                tableName);
    }

    /**
     * Executes the given SQL statements in order on a single JDBC connection obtained from
     * {@code getJdbcConnection()}.
     *
     * <p>Both the statement and the connection are closed when this method returns, including when
     * one of the statements fails.
     *
     * @param sqlStatements statements to execute, in execution order
     * @throws SQLException if opening the connection, executing a statement or closing fails
     */
    private void executeStatements(String... sqlStatements) throws SQLException {
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            for (String sql : sqlStatements) {
                statement.execute(sql);
            }
        }
    }

    /**
     * Blocks until the named sink has received at least one row, polling once per second. There is
     * no timeout.
     */
    private static void waitForSnapshotStarted(String str) throws InterruptedException {
        while (sinkSize(str) == 0) {
            Thread.sleep(1000L);
        }
    }

    /**
     * Blocks until the named sink has received at least {@code i} raw rows, polling once per
     * second. There is no timeout.
     */
    private static void waitForSinkSize(String str, int i) throws InterruptedException {
        while (sinkSize(str) < i) {
            Thread.sleep(1000L);
        }
    }

    /**
     * Returns the number of raw rows currently collected for the named sink.
     *
     * <p>An {@link IllegalArgumentException} from the factory is treated as an empty sink and
     * yields 0 (presumably the sink is not registered until the job has started).
     */
    private static int sinkSize(String str) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(str).size();
            } catch (IllegalArgumentException ignored) {
                return 0;
            }
        }
    }
}
