package org.apache.omid.transaction;

import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.omid.TestUtils;
import org.apache.omid.transaction.Transaction;
import org.apache.omid.tso.LeaseManagement;
import org.apache.omid.tso.PausableLeaseManager;
import org.apache.omid.tso.TSOServer;
import org.apache.omid.tso.TSOServerConfig;
import org.apache.omid.tso.client.OmidClientConfiguration;
import org.apache.phoenix.thirdparty.com.google.common.base.Charsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"sharedHBase"})
/* loaded from: input_file:org/apache/omid/transaction/TestEndToEndScenariosWithHA.class */
public class TestEndToEndScenariosWithHA extends OmidTestBase {
    // Lease period for the TSO lease modules, in milliseconds (per the name; value matches the period given to the lease modules in setup()).
    private static final int TEST_LEASE_PERIOD_MS = 5000;
    // ZK node (inside NAMESPACE) watched in setup() for the published TSO info; also handed to the lease modules and to the client config.
    private static final String CURRENT_TSO_PATH = "/CURRENT_TSO_PATH";
    // ZK node handed to both TSO lease modules; removed again in cleanup().
    private static final String TSO_LEASE_PATH = "/TSO_LEASE_PATH";
    // ZK namespace used by the Curator client, the lease modules and the Omid client configuration.
    private static final String NAMESPACE = "omid";
    private static final Logger LOG = LoggerFactory.getLogger(TestEndToEndScenariosWithHA.class);
    // Column qualifiers written/read by the scenarios: qualifier1 is used with row1, qualifier2 with row2.
    private static final byte[] qualifier1 = Bytes.toBytes("test-q1");
    private static final byte[] qualifier2 = Bytes.toBytes("test-q2l");
    // Row keys used by the scenarios.
    private static final byte[] row1 = Bytes.toBytes("row1");
    private static final byte[] row2 = Bytes.toBytes("row2");
    // Value committed to both cells by the first transaction of each scenario.
    private static final byte[] initialData = Bytes.toBytes("testWrite-0");
    // Values written by the transaction that is expected to be rolled back in both scenarios.
    private static final byte[] data1_q1 = Bytes.toBytes("testWrite-1-q1");
    private static final byte[] data1_q2 = Bytes.toBytes("testWrite-1-q2");
    // Values written and committed by the last transaction of testScenario2.
    private static final byte[] data2_q1 = Bytes.toBytes("testWrite-2-q1");
    private static final byte[] data2_q2 = Bytes.toBytes("testWrite-2-q2");
    // Ports the two TSO server instances are configured with in setup().
    private static final int TSO1_PORT = 2223;
    private static final int TSO2_PORT = 4321;
    // Counted down in setup() once the content of CURRENT_TSO_PATH ends with "#0".
    private CountDownLatch barrierTillTSOAddressPublication;
    // Curator client created in setup() and closed in cleanup().
    private CuratorFramework zkClient;
    // The two TSO server instances started in setup() and stopped in cleanup().
    private TSOServer tso1;
    private TSOServer tso2;
    // Lease manager obtained from TSO 1's injector; testScenario1 pauses and resumes it.
    private PausableLeaseManager leaseManager1;
    // Transaction manager built in HA mode at the end of setup().
    private TransactionManager tm;

    /**
     * Prepares the HA environment for each test method.
     *
     * <p>Steps, in order: connect a Curator client to the test ZK cluster, watch
     * {@code CURRENT_TSO_PATH}, start two TSO servers that share the same lease and
     * current-TSO ZK paths, block until the watched node content ends with {@code "#0"},
     * and finally build a transaction manager configured with the HA connection type.
     *
     * @throws Exception if any of the servers, the ZK client or the transaction manager fails to start
     */
    @BeforeMethod(alwaysRun = true, timeOut = 30000)
    public void setup() throws Exception {
        final String zkConnection = "localhost:" + hBaseUtils.getZkCluster().getClientPort();
        zkClient = provideInitializedZookeeperClient(zkConnection);

        // Watch the current-TSO node; the latch is released once its content ends with "#0".
        barrierTillTSOAddressPublication = new CountDownLatch(1);
        final NodeCache currentTSOZNode = new NodeCache(zkClient, CURRENT_TSO_PATH);
        currentTSOZNode.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                byte[] publishedBytes = currentTSOZNode.getCurrentData().getData();
                String published = new String(publishedBytes, Charsets.UTF_8);
                if (published.endsWith("#0")) {
                    barrierTillTSOAddressPublication.countDown();
                }
            }
        });
        currentTSOZNode.start(true);

        // ---- First TSO instance -------------------------------------------------------------
        TSOServerConfig tso1Config = new TSOServerConfig();
        tso1Config.setPort(TSO1_PORT);
        tso1Config.setConflictMapSize(1000);
        tso1Config.setLeaseModule(new TestHALeaseManagementModule(
                TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
        Injector tso1Injector = Guice.createInjector(new TestTSOModule(hbaseConf, tso1Config));
        LOG.info("===================== Starting TSO 1 =====================");
        tso1 = tso1Injector.getInstance(TSOServer.class);
        leaseManager1 = (PausableLeaseManager) tso1Injector.getInstance(LeaseManagement.class);
        tso1.startAsync();
        tso1.awaitRunning();
        TestUtils.waitForSocketListening("localhost", TSO1_PORT, 100);
        LOG.info("================ Finished loading TSO 1 ==================");

        // ---- Second TSO instance (same lease/current-TSO paths, different port) ---------------
        TSOServerConfig tso2Config = new TSOServerConfig();
        tso2Config.setPort(TSO2_PORT);
        tso2Config.setConflictMapSize(1000);
        tso2Config.setLeaseModule(new TestHALeaseManagementModule(
                TEST_LEASE_PERIOD_MS, TSO_LEASE_PATH, CURRENT_TSO_PATH, zkConnection, NAMESPACE));
        Injector tso2Injector = Guice.createInjector(new TestTSOModule(hbaseConf, tso2Config));
        LOG.info("===================== Starting TSO 2 =====================");
        tso2 = tso2Injector.getInstance(TSOServer.class);
        // The returned lease manager is not kept; only the lookup itself is performed.
        tso2Injector.getInstance(LeaseManagement.class);
        tso2.startAsync();
        tso2.awaitRunning();
        LOG.info("================ Finished loading TSO 2 ==================");

        // Do not go on until the watched node reports content ending in "#0".
        barrierTillTSOAddressPublication.await();
        currentTSOZNode.close();

        // ---- Transaction manager in HA mode ---------------------------------------------------
        LOG.info("===================== Starting TM =====================");
        HBaseOmidClientConfiguration clientConf = new HBaseOmidClientConfiguration();
        clientConf.setConnectionType(OmidClientConfiguration.ConnType.HA);
        clientConf.setConnectionString(zkConnection);
        clientConf.getOmidClientConfiguration().setZkCurrentTsoPath(CURRENT_TSO_PATH);
        clientConf.getOmidClientConfiguration().setZkNamespace(NAMESPACE);
        clientConf.setHBaseConfiguration(hbaseConf);
        hbaseConf.setInt("hbase.client.retries.number", 3);
        tm = HBaseTransactionManager.builder(clientConf).build();
        LOG.info("===================== TM Started =========================");
    }

    /**
     * Tears down the HA environment after each test method.
     *
     * <p>Recreates the {@code OMID_TIMESTAMP_TABLE} table, stops both TSO servers and waits
     * for their ports to be released, then removes the lease and current-TSO ZK nodes.
     * The ZK client is closed in all cases, even when an earlier step fails.
     *
     * @throws Exception if the table cannot be recreated, a TSO does not terminate, or a ZK node cannot be deleted
     */
    @AfterMethod(alwaysRun = true, timeOut = 60000)
    public void cleanup() throws Exception {
        LOG.info("Cleanup");
        try {
            // Drop and recreate the timestamp table with its single column family.
            deleteTable(hBaseUtils.getHBaseAdmin(), TableName.valueOf("OMID_TIMESTAMP_TABLE"));
            hBaseUtils.createTable(TableName.valueOf("OMID_TIMESTAMP_TABLE"),
                                   new byte[][]{Bytes.toBytes("MAX_TIMESTAMP_CF")},
                                   Integer.MAX_VALUE);

            tso1.stopAsync();
            tso1.awaitTerminated();
            TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);

            tso2.stopAsync();
            tso2.awaitTerminated();
            TestUtils.waitForSocketNotListening("localhost", TSO2_PORT, 100);

            zkClient.delete().forPath(TSO_LEASE_PATH);
            LOG.info("ZKPath {} deleted", TSO_LEASE_PATH);
            zkClient.delete().forPath(CURRENT_TSO_PATH);
            LOG.info("ZKPaths {} deleted", CURRENT_TSO_PATH);
        } finally {
            // Release the ZK connection even if one of the steps above threw.
            zkClient.close();
        }
    }

    /**
     * Scenario 1: TSO 1 is paused while a write transaction and an interleaved read
     * transaction are in flight.
     *
     * <p>Flow as exercised by the code below:
     * <ol>
     *   <li>Tx0 writes the initial value to both cells and commits.</li>
     *   <li>Tx1 writes new values to both cells but does not commit yet.</li>
     *   <li>An interleaved read transaction starts, then lease manager 1 is paused.</li>
     *   <li>The read transaction still sees the initial value of R1Q1.</li>
     *   <li>Committing Tx1 must fail with a {@link RollbackException}, leaving it ROLLEDBACK
     *       in the initial epoch.</li>
     *   <li>The read transaction still sees the initial value of R2Q2 and commits as COMMITTED_RO
     *       in the initial epoch.</li>
     *   <li>After the TSO 2 port is listening, both cells still hold the initial value.</li>
     * </ol>
     *
     * @throws Exception on any unexpected failure of the transactional operations
     */
    @Test(timeOut = 60000)
    public void testScenario1() throws Exception {
        final byte[] family = Bytes.toBytes("data");

        try (TTable txTable = new TTable(connection, "test")) {
            // Tx0: write and commit the initial values.
            // NOTE(review): the cast assumes tm.begin() hands back HBaseTransaction instances — confirm.
            HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
            long initialEpoch = tx0.getEpoch();
            LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
            Put putInitialRow1 = new Put(row1);
            putInitialRow1.addColumn(family, qualifier1, initialData);
            txTable.put(tx0, putInitialRow1);
            Put putInitialRow2 = new Put(row2);
            putInitialRow2.addColumn(family, qualifier2, initialData);
            txTable.put(tx0, putInitialRow2);
            tm.commit(tx0);

            checkRowValues(txTable, initialData, initialData);

            // Tx1: write new values, commit is attempted only after TSO 1 has been paused.
            HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
            LOG.info("Starting Tx {} writing values for cells ({}, {}) ",
                     tx1, Bytes.toString(data1_q1), Bytes.toString(data1_q2));
            Put putData1Row1 = new Put(row1);
            putData1Row1.addColumn(family, qualifier1, data1_q1);
            txTable.put(tx1, putData1Row1);
            Put putData1Row2 = new Put(row2);
            putData1Row2.addColumn(family, qualifier2, data1_q2);
            txTable.put(tx1, putData1Row2);

            Transaction interleavedReadTx = tm.begin();
            LOG.info("Starting Interleaving Read Tx {} for checking cell values",
                     interleavedReadTx.getTransactionId());

            LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            LOG.info("++++++++++++++++++++ PAUSING TSO 1 +++++++++++++++++++");
            LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            leaseManager1.pausedInStillInLeasePeriod();

            // Read R1Q1 inside the interleaved transaction: Tx1's write must not be visible.
            Get getRow1 = new Get(row1).setMaxVersions(1);
            getRow1.addColumn(family, qualifier1);
            Result row1Result = txTable.get(interleavedReadTx, getRow1);
            Assert.assertEquals(row1Result.getValue(family, qualifier1), initialData,
                                "Unexpected value for SI read R1Q1" + interleavedReadTx + ": "
                                        + Bytes.toString(row1Result.getValue(family, qualifier1)));

            // Tx1 must not be able to commit.
            try {
                tm.commit(tx1);
                Assert.fail();
            } catch (RollbackException e) {
                LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
                Assert.assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
                Assert.assertEquals(tx1.getEpoch(), initialEpoch);
            }

            // Read R2Q2 inside the interleaved transaction: still the initial value.
            Result row2Result = txTable.get(interleavedReadTx, new Get(row2).setMaxVersions(1));
            Assert.assertEquals(row2Result.getValue(family, qualifier2), initialData,
                                "Unexpected value for SI read R2Q2" + interleavedReadTx + ": "
                                        + Bytes.toString(row2Result.getValue(family, qualifier2)));

            tm.commit(interleavedReadTx);
            Assert.assertEquals(interleavedReadTx.getEpoch(), initialEpoch);
            Assert.assertEquals(interleavedReadTx.getStatus(), Transaction.Status.COMMITTED_RO);

            LOG.info("Wait till the client is informed about the connection parameters of the new TSO");
            TestUtils.waitForSocketListening("localhost", TSO2_PORT, 100);

            checkRowValues(txTable, initialData, initialData);

            // Resume the paused lease manager before the test method ends.
            // NOTE(review): this is skipped when an assertion above fails — consider whether it belongs in a finally.
            leaseManager1.resume();
        }
    }

    /**
     * Scenario 2: TSO 1 is stopped while a write transaction is in flight.
     *
     * <p>Flow as exercised by the code below:
     * <ol>
     *   <li>Tx0 writes the initial value to both cells and commits.</li>
     *   <li>Tx1 writes new values to both cells but does not commit yet.</li>
     *   <li>TSO 1 is stopped and its port is awaited to be released.</li>
     *   <li>Committing Tx1 must fail with a {@link RollbackException}, leaving it ROLLEDBACK
     *       in the initial epoch.</li>
     *   <li>After a fixed 12 second sleep, Tx2 reads the initial values, overwrites both cells
     *       and commits; its epoch must be greater than Tx0's commit timestamp.</li>
     *   <li>Both cells hold the values written by Tx2.</li>
     * </ol>
     *
     * @throws Exception on any unexpected failure of the transactional operations
     */
    @Test(timeOut = 60000)
    public void testScenario2() throws Exception {
        final byte[] family = Bytes.toBytes("data");

        try (TTable txTable = new TTable(connection, "test")) {
            // Tx0: write and commit the initial values.
            // NOTE(review): the cast assumes tm.begin() hands back HBaseTransaction instances — confirm.
            HBaseTransaction tx0 = (HBaseTransaction) tm.begin();
            long initialEpoch = tx0.getEpoch();
            LOG.info("Starting Tx {} writing initial values for cells ({}) ", tx0, Bytes.toString(initialData));
            Put putInitialRow1 = new Put(row1);
            putInitialRow1.addColumn(family, qualifier1, initialData);
            txTable.put(tx0, putInitialRow1);
            Put putInitialRow2 = new Put(row2);
            putInitialRow2.addColumn(family, qualifier2, initialData);
            txTable.put(tx0, putInitialRow2);
            tm.commit(tx0);

            // Tx1: write new values, commit is attempted only after TSO 1 has been stopped.
            HBaseTransaction tx1 = (HBaseTransaction) tm.begin();
            LOG.info("Starting Tx {} writing values for cells ({}, {}) ",
                     tx1, Bytes.toString(data1_q1), Bytes.toString(data1_q2));
            Put putData1Row1 = new Put(row1);
            putData1Row1.addColumn(family, qualifier1, data1_q1);
            txTable.put(tx1, putData1Row1);
            Put putData1Row2 = new Put(row2);
            putData1Row2.addColumn(family, qualifier2, data1_q2);
            txTable.put(tx1, putData1Row2);

            LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            LOG.info("++++++++++++++++++++ KILLING TSO 1 +++++++++++++++++++");
            LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            LOG.info("++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            tso1.stopAsync();
            tso1.awaitTerminated();
            TestUtils.waitForSocketNotListening("localhost", TSO1_PORT, 100);

            // Tx1 must not be able to commit.
            try {
                tm.commit(tx1);
                Assert.fail(String.format("%s should not commit. Initial epoch was: %d", tx1, initialEpoch));
            } catch (RollbackException e) {
                LOG.info("Rollback cause for Tx {}: ", tx1, e.getCause());
                Assert.assertEquals(tx1.getStatus(), Transaction.Status.ROLLEDBACK);
                Assert.assertEquals(tx1.getEpoch(), initialEpoch);
            }

            LOG.info("Sleep some time till the client is informed aboutthe new TSO connection parameters and how can connect");
            TimeUnit.SECONDS.sleep(12L);

            // Tx2: must see the initial values, then overwrite them and commit.
            HBaseTransaction tx2 = (HBaseTransaction) tm.begin();
            LOG.info("Starting Tx {} writing values for cells ({}, {}) ",
                     tx2, Bytes.toString(data1_q1), Bytes.toString(data1_q2));
            Result row1Result = txTable.get(tx2, new Get(row1).setMaxVersions(1));
            Assert.assertEquals(row1Result.getValue(family, qualifier1), initialData,
                                "Unexpected value for SI read R1Q1" + tx2 + ": "
                                        + Bytes.toString(row1Result.getValue(family, qualifier1)));
            Result row2Result = txTable.get(tx2, new Get(row2).setMaxVersions(1));
            // This assertion checks row2/qualifier2, so the failure message names R2Q2.
            Assert.assertEquals(row2Result.getValue(family, qualifier2), initialData,
                                "Unexpected value for SI read R2Q2" + tx2 + ": "
                                        + Bytes.toString(row2Result.getValue(family, qualifier2)));

            Put putData2Row1 = new Put(row1);
            putData2Row1.addColumn(family, qualifier1, data2_q1);
            txTable.put(tx2, putData2Row1);
            Put putData2Row2 = new Put(row2);
            putData2Row2.addColumn(family, qualifier2, data2_q2);
            txTable.put(tx2, putData2Row2);
            tm.commit(tx2);
            Assert.assertEquals(tx2.getStatus(), Transaction.Status.COMMITTED);
            Assert.assertTrue(tx2.getEpoch() > tx0.getCommitTimestamp());

            checkRowValues(txTable, data2_q1, data2_q2);
        }
    }

    /**
     * Opens a fresh transaction, asserts the current values of the two test cells and
     * commits the transaction.
     *
     * @param tTable transactional table to read from
     * @param bArr   value expected in row1 / qualifier1
     * @param bArr2  value expected in row2 / qualifier2
     * @throws IOException       if a read on the table fails
     * @throws RollbackException if the checking transaction cannot be committed
     */
    private void checkRowValues(TTable tTable, byte[] bArr, byte[] bArr2) throws IOException, RollbackException {
        final byte[] family = "data".getBytes();

        Transaction readTx = this.tm.begin();
        LOG.info("Starting Read Tx {} for checking cell values", readTx.getTransactionId());

        // Row 1: the Get is restricted to the single column under test.
        Get getRow1 = new Get(row1).setMaxVersions(1);
        getRow1.addColumn(family, qualifier1);
        Result row1Result = tTable.get(readTx, getRow1);
        byte[] actualR1Q1 = row1Result.getValue(family, qualifier1);
        Assert.assertEquals(actualR1Q1, bArr,
                            "Unexpected value for SI read R1Q1" + readTx + ": " + Bytes.toString(actualR1Q1));

        // Row 2: the whole row is fetched and the column is picked from the result.
        Result row2Result = tTable.get(readTx, new Get(row2).setMaxVersions(1));
        byte[] actualR2Q2 = row2Result.getValue(family, qualifier2);
        Assert.assertEquals(actualR2Q2, bArr2,
                            "Unexpected value for SI read R2Q2" + readTx + ": " + Bytes.toString(actualR2Q2));

        this.tm.commit(readTx);
    }

    /**
     * Builds a Curator client scoped to {@code NAMESPACE}, starts it and blocks until it
     * is connected to the given ZooKeeper ensemble.
     *
     * @param str ZooKeeper connection string
     * @return a started and connected Curator client
     * @throws Exception if waiting for the connection fails
     */
    private static CuratorFramework provideInitializedZookeeperClient(String str) throws Exception {
        LOG.info("Creating Zookeeper Client connecting to {}", str);

        // Exponential back-off: 1000 ms base sleep, at most 3 retries.
        ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .namespace(NAMESPACE)
                .connectString(str)
                .retryPolicy(retryPolicy)
                .build();

        LOG.info("Connecting to ZK cluster {}", client.getState());
        client.start();
        client.blockUntilConnected();
        LOG.info("Connection to ZK cluster {}", client.getState());

        return client;
    }
}
