package org.apache.omid.tso;

import com.google.common.base.Charsets;
import java.io.IOException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.omid.TestUtils;
import org.apache.omid.tso.LeaseManagement;
import org.apache.omid.tso.TSOStateManager;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/omid/tso/TestLeaseManager.class */
public class TestLeaseManager {
    private static final long DUMMY_EPOCH_1 = 1;
    private static final long DUMMY_EPOCH_2 = 2;
    private static final long DUMMY_EPOCH_3 = 3;
    private static final long DUMMY_LOW_WATERMARK_1 = 1;
    private static final long DUMMY_LOW_WATERMARK_2 = 2;
    private static final long DUMMY_LOW_WATERMARK_3 = 3;
    private static final String LEASE_MGR_ID_1 = "LM1";
    private static final String LEASE_MGR_ID_2 = "LM2";
    private static final String INSTANCE_ID_1 = "LM1#";
    private static final String INSTANCE_ID_2 = "LM2#";
    private static final Logger LOG = LoggerFactory.getLogger(TestLeaseManager.class);
    private static final long TEST_LEASE_PERIOD_IN_MS = 5000;
    private CuratorFramework zkClient;
    private TestingServer zkServer;

    @Mock
    private Panicker panicker;
    private PausableLeaseManager leaseManager1;
    private PausableLeaseManager leaseManager2;

    @BeforeClass
    public void beforeClass() throws Exception {
        LOG.info("Starting ZK Server");
        this.zkServer = TestUtils.provideTestingZKServer();
        LOG.info("ZK Server Started @ {}", this.zkServer.getConnectString());
        this.zkClient = TestUtils.provideConnectedZKClient("localhost:2181");
    }

    @AfterClass
    public void afterClass() throws Exception {
        this.zkClient.close();
        CloseableUtils.closeQuietly(this.zkServer);
        this.zkServer = null;
        LOG.info("ZK Server Stopped");
    }

    @Test(timeOut = 80000)
    public void testErrorInitializingTSOStateExitsTheTSO() throws Exception {
        Panicker panicker = (Panicker) Mockito.spy(new MockPanicker());
        TSOChannelHandler tSOChannelHandler = (TSOChannelHandler) Mockito.mock(TSOChannelHandler.class);
        TSOStateManager tSOStateManager = (TSOStateManager) Mockito.mock(TSOStateManager.class);
        Mockito.when(tSOStateManager.initialize()).thenThrow(new Throwable[]{new IOException()});
        this.leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1, tSOChannelHandler, tSOStateManager, TEST_LEASE_PERIOD_IN_MS, "/test0_tsolease", "/test0_currenttso", this.zkClient, panicker);
        this.leaseManager1.startService();
        Thread.sleep(10000L);
        ((Panicker) Mockito.verify(panicker, Mockito.timeout(2000).atLeastOnce())).panic(Matchers.anyString(), (Throwable) Matchers.any(IOException.class));
        this.leaseManager1.stopService();
    }

    @Test(timeOut = 80000)
    public void testLeaseHolderDoesNotChangeWhenPausedForALongTimeAndTheresNoOtherInstance() throws Exception {
        TSOChannelHandler tSOChannelHandler = (TSOChannelHandler) Mockito.mock(TSOChannelHandler.class);
        TSOStateManager tSOStateManager = (TSOStateManager) Mockito.mock(TSOStateManager.class);
        Mockito.when(tSOStateManager.initialize()).thenReturn(new TSOStateManager.TSOState(1L, 1L));
        this.leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1, tSOChannelHandler, tSOStateManager, TEST_LEASE_PERIOD_IN_MS, "/test1_tsolease", "/test1_currenttso", this.zkClient, this.panicker);
        this.leaseManager1.startService();
        Thread.sleep(10000L);
        checkLeaseHolder("/test1_tsolease", LEASE_MGR_ID_1);
        checkInstanceId("/test1_currenttso", "LM1#1");
        Assert.assertTrue(this.leaseManager1.stillInLeasePeriod());
        this.leaseManager1.pausedInTryToRenewLeasePeriod();
        Thread.sleep(10000L);
        checkLeaseHolder("/test1_tsolease", LEASE_MGR_ID_1);
        checkInstanceId("/test1_currenttso", "LM1#1");
        this.leaseManager1.resume();
        Thread.sleep(10000L);
        checkLeaseHolder("/test1_tsolease", LEASE_MGR_ID_1);
        checkInstanceId("/test1_currenttso", "LM1#1");
        Assert.assertFalse(this.leaseManager1.stillInLeasePeriod());
    }

    @Test(timeOut = 80000)
    public void testLeaseHolderDoesNotChangeWhenANewLeaseManagerIsUp() throws Exception {
        TSOChannelHandler tSOChannelHandler = (TSOChannelHandler) Mockito.mock(TSOChannelHandler.class);
        TSOStateManager tSOStateManager = (TSOStateManager) Mockito.mock(TSOStateManager.class);
        Mockito.when(tSOStateManager.initialize()).thenReturn(new TSOStateManager.TSOState(1L, 1L));
        this.leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1, tSOChannelHandler, tSOStateManager, TEST_LEASE_PERIOD_IN_MS, "/test2_tsolease", "/test2_currenttso", this.zkClient, this.panicker);
        this.leaseManager1.startService();
        Thread.sleep(10000L);
        checkLeaseHolder("/test2_tsolease", LEASE_MGR_ID_1);
        checkInstanceId("/test2_currenttso", "LM1#1");
        Assert.assertTrue(this.leaseManager1.stillInLeasePeriod());
        TSOChannelHandler tSOChannelHandler2 = (TSOChannelHandler) Mockito.mock(TSOChannelHandler.class);
        TSOStateManager tSOStateManager2 = (TSOStateManager) Mockito.mock(TSOStateManager.class);
        Mockito.when(tSOStateManager2.initialize()).thenReturn(new TSOStateManager.TSOState(2L, 2L));
        this.leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2, tSOChannelHandler2, tSOStateManager2, TEST_LEASE_PERIOD_IN_MS, "/test2_tsolease", "/test2_currenttso", this.zkClient, this.panicker);
        this.leaseManager2.startService();
        Thread.sleep(10000L);
        checkLeaseHolder("/test2_tsolease", LEASE_MGR_ID_1);
        checkInstanceId("/test2_currenttso", "LM1#1");
        Assert.assertTrue(this.leaseManager1.stillInLeasePeriod());
        Assert.assertFalse(this.leaseManager2.stillInLeasePeriod());
    }

    @Test(timeOut = 80000)
    public void testLeaseHolderChangesWhenActiveLeaseManagerIsPaused() throws Exception {
        TSOChannelHandler tSOChannelHandler = (TSOChannelHandler) Mockito.mock(TSOChannelHandler.class);
        TSOStateManager tSOStateManager = (TSOStateManager) Mockito.mock(TSOStateManager.class);
        Mockito.when(tSOStateManager.initialize()).thenReturn(new TSOStateManager.TSOState(1L, 1L));
        this.leaseManager1 = new PausableLeaseManager(LEASE_MGR_ID_1, tSOChannelHandler, tSOStateManager, TEST_LEASE_PERIOD_IN_MS, "/test3_tsolease", "/test3_currenttso", this.zkClient, this.panicker);
        this.leaseManager1.startService();
        Thread.sleep(10000L);
        checkLeaseHolder("/test3_tsolease", LEASE_MGR_ID_1);
        checkInstanceId("/test3_currenttso", "LM1#1");
        Assert.assertTrue(this.leaseManager1.stillInLeasePeriod());
        TSOChannelHandler tSOChannelHandler2 = (TSOChannelHandler) Mockito.mock(TSOChannelHandler.class);
        TSOStateManager tSOStateManager2 = (TSOStateManager) Mockito.mock(TSOStateManager.class);
        Mockito.when(tSOStateManager2.initialize()).thenReturn(new TSOStateManager.TSOState(2L, 2L));
        this.leaseManager2 = new PausableLeaseManager(LEASE_MGR_ID_2, tSOChannelHandler2, tSOStateManager2, TEST_LEASE_PERIOD_IN_MS, "/test3_tsolease", "/test3_currenttso", this.zkClient, this.panicker);
        this.leaseManager2.startService();
        this.leaseManager1.pausedInStillInLeasePeriod();
        Thread.sleep(10000L);
        checkLeaseHolder("/test3_tsolease", LEASE_MGR_ID_2);
        checkInstanceId("/test3_currenttso", "LM2#2");
        Assert.assertTrue(this.leaseManager2.stillInLeasePeriod());
        Mockito.when(tSOStateManager.initialize()).thenReturn(new TSOStateManager.TSOState(3L, 3L));
        this.leaseManager1.resume();
        Thread.sleep(10000L);
        checkLeaseHolder("/test3_tsolease", LEASE_MGR_ID_2);
        checkInstanceId("/test3_currenttso", "LM2#2");
        Assert.assertFalse(this.leaseManager1.stillInLeasePeriod());
        Assert.assertTrue(this.leaseManager2.stillInLeasePeriod());
        this.leaseManager2.pausedInTryToRenewLeasePeriod();
        Thread.sleep(10000L);
        checkLeaseHolder("/test3_tsolease", LEASE_MGR_ID_1);
        checkInstanceId("/test3_currenttso", "LM1#3");
        Assert.assertFalse(this.leaseManager2.stillInLeasePeriod());
        Assert.assertTrue(this.leaseManager1.stillInLeasePeriod());
        this.leaseManager2.resume();
        Thread.sleep(10000L);
        checkLeaseHolder("/test3_tsolease", LEASE_MGR_ID_1);
        checkInstanceId("/test3_currenttso", "LM1#3");
        Assert.assertFalse(this.leaseManager2.stillInLeasePeriod());
        Assert.assertTrue(this.leaseManager1.stillInLeasePeriod());
    }

    @Test(timeOut = 80000)
    public void testLeaseManagerPanicsWhenUnexpectedInfoIsFoundInCurrentTSOZnode() throws Exception {
        Panicker panicker = (Panicker) Mockito.spy(new MockPanicker());
        TSOStateManager tSOStateManager = (TSOStateManager) Mockito.mock(TSOStateManager.class);
        Mockito.when(tSOStateManager.initialize()).thenReturn(new TSOStateManager.TSOState(1L, 1L));
        PausableLeaseManager pausableLeaseManager = new PausableLeaseManager(LEASE_MGR_ID_1, (TSOChannelHandler) Mockito.mock(TSOChannelHandler.class), tSOStateManager, TEST_LEASE_PERIOD_IN_MS, "/test_wronginfo_tsolease", "/test_wronginfo_currenttso", this.zkClient, panicker);
        pausableLeaseManager.startService();
        Thread.sleep(10000L);
        pausableLeaseManager.pausedInTryToRenewLeasePeriod();
        this.zkClient.setData().forPath("/test_wronginfo_currenttso", "CorruptedData!!!".getBytes());
        Thread.sleep(10000L);
        pausableLeaseManager.resume();
        Thread.sleep(10000L);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(IllegalArgumentException.class);
        ((Panicker) Mockito.verify(panicker, Mockito.times(2))).panic(Matchers.anyString(), (Throwable) forClass.capture());
        Assert.assertTrue(forClass.getValue() != null);
        Assert.assertTrue(((IllegalArgumentException) forClass.getValue()).getMessage().contains("Incorrect TSO Info found"));
        Mockito.reset(new Panicker[]{panicker});
        this.zkClient.setData().forPath("/test_wronginfo_currenttso", "newTSO:12345#10000".getBytes());
        pausableLeaseManager.pausedInTryToRenewLeasePeriod();
        Thread.sleep(10000L);
        pausableLeaseManager.resume();
        Thread.sleep(10000L);
        ArgumentCaptor forClass2 = ArgumentCaptor.forClass(LeaseManagement.LeaseManagementException.class);
        ((Panicker) Mockito.verify(panicker, Mockito.times(2))).panic(Matchers.anyString(), (Throwable) forClass2.capture());
        Assert.assertTrue(forClass2.getValue() != null);
        Assert.assertTrue(((LeaseManagement.LeaseManagementException) forClass2.getValue()).getMessage().contains("Another TSO replica was found"));
    }

    @Test(timeOut = 1000)
    public void testNonHALeaseManager() throws Exception {
        VoidLeaseManager voidLeaseManager = new VoidLeaseManager((TSOChannelHandler) Mockito.mock(TSOChannelHandler.class), (TSOStateManager) Mockito.mock(TSOStateManager.class));
        voidLeaseManager.startService();
        Assert.assertTrue(voidLeaseManager.stillInLeasePeriod());
        voidLeaseManager.stopService();
    }

    private void checkLeaseHolder(String str, String str2) throws Exception {
        Assert.assertEquals(new String((byte[]) this.zkClient.getData().forPath(str), Charsets.UTF_8), str2);
    }

    private void checkInstanceId(String str, String str2) throws Exception {
        Assert.assertEquals(new String((byte[]) this.zkClient.getData().forPath(str), Charsets.UTF_8), str2);
    }
}
