/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import com.codahale.metrics.Counter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ClientScanner;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(value={LargeTests.class, ClientTests.class})
public class TestReplicasClient {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestReplicasClient.class);
    private static final Logger LOG = LoggerFactory.getLogger(TestReplicasClient.class);
    private static TableName TABLE_NAME;
    private Table table = null;
    private static final byte[] row;
    private static RegionInfo hriPrimary;
    private static RegionInfo hriSecondary;
    private static final HBaseTestingUtility HTU;
    private static final byte[] f;
    private static final int REFRESH_PERIOD = 1000;

    @BeforeClass
    public static void beforeClass() throws Exception {
        HTU.getConfiguration().setInt("hbase.regionserver.storefile.refresh.period", 1000);
        HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
        HTU.getConfiguration().setBoolean("hbase.client.metrics.enable", true);
        ConnectionUtils.setupMasterlessConnection((Configuration)HTU.getConfiguration());
        StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(1).numAlwaysStandByMasters(1).numMasters(1).build();
        HTU.startMiniCluster(option);
        TABLE_NAME = TableName.valueOf((String)TestReplicasClient.class.getSimpleName());
        HTableDescriptor hdt = HTU.createTableDescriptor(TABLE_NAME);
        hdt.addCoprocessor(SlowMeCopro.class.getName());
        HTU.createTable((TableDescriptor)hdt, (byte[][])new byte[][]{f}, null);
        try (RegionLocator locator = HTU.getConnection().getRegionLocator(TABLE_NAME);){
            hriPrimary = locator.getRegionLocation(row, false).getRegion();
        }
        hriSecondary = RegionReplicaUtil.getRegionInfoForReplica((RegionInfo)hriPrimary, (int)1);
        LOG.info("Master is going to be stopped");
        TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
        Configuration c = new Configuration(HTU.getConfiguration());
        c.setInt("hbase.client.retries.number", 1);
        LOG.info("Master has stopped");
    }

    @AfterClass
    public static void afterClass() throws Exception {
        HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
        HTU.shutdownMiniCluster();
    }

    @Before
    public void before() throws IOException {
        try {
            this.openRegion(hriPrimary);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            this.openRegion(hriSecondary);
        }
        catch (Exception exception) {
            // empty catch block
        }
        SlowMeCopro.slowDownNext.set(false);
        SlowMeCopro.sleepTime.set(0L);
        SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
        SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(0));
        this.table = HTU.getConnection().getTable(TABLE_NAME);
        try (ResultScanner scanner = this.table.getScanner(new Scan());){
            Result result;
            while ((result = scanner.next()) != null) {
                this.table.delete(new Delete(result.getRow()));
            }
        }
        this.flushRegion(hriPrimary);
        HTU.getConnection().clearRegionLocationCache();
        SlowMeCopro.primaryCountOfScan.set(0);
        SlowMeCopro.secondaryCountOfScan.set(0);
        SlowMeCopro.countOfNext.set(0);
    }

    @After
    public void after() throws IOException, KeeperException {
        SlowMeCopro.getPrimaryCdl().get().countDown();
        SlowMeCopro.getSecondaryCdl().get().countDown();
        try {
            this.closeRegion(hriSecondary);
        }
        catch (Exception exception) {
            // empty catch block
        }
        try {
            this.closeRegion(hriPrimary);
        }
        catch (Exception exception) {
            // empty catch block
        }
        if (this.table != null) {
            this.table.close();
        }
        HTU.getConnection().clearRegionLocationCache();
    }

    private HRegionServer getRS() {
        return HTU.getMiniHBaseCluster().getRegionServer(0);
    }

    private void openRegion(RegionInfo hri) throws Exception {
        try {
            if (this.isRegionOpened(hri)) {
                return;
            }
        }
        catch (Exception exception) {
            // empty catch block
        }
        AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest((ServerName)this.getRS().getServerName(), (RegionInfo)hri, null);
        AdminProtos.OpenRegionResponse responseOpen = this.getRS().getRSRpcServices().openRegion(null, orr);
        Assert.assertEquals((long)1L, (long)responseOpen.getOpeningStateCount());
        Assert.assertEquals((Object)AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED, (Object)responseOpen.getOpeningState(0));
        this.checkRegionIsOpened(hri);
    }

    private void closeRegion(RegionInfo hri) throws Exception {
        AdminProtos.CloseRegionRequest crr = ProtobufUtil.buildCloseRegionRequest((ServerName)this.getRS().getServerName(), (byte[])hri.getRegionName());
        AdminProtos.CloseRegionResponse responseClose = this.getRS().getRSRpcServices().closeRegion(null, crr);
        Assert.assertTrue((boolean)responseClose.getClosed());
        this.checkRegionIsClosed(hri.getEncodedName());
    }

    private void checkRegionIsOpened(RegionInfo hri) throws Exception {
        while (!this.getRS().getRegionsInTransitionInRS().isEmpty()) {
            Thread.sleep(1L);
        }
    }

    private boolean isRegionOpened(RegionInfo hri) throws Exception {
        return this.getRS().getRegionByEncodedName(hri.getEncodedName()).isAvailable();
    }

    private void checkRegionIsClosed(String encodedRegionName) throws Exception {
        while (!this.getRS().getRegionsInTransitionInRS().isEmpty()) {
            Thread.sleep(1L);
        }
        try {
            Assert.assertFalse((boolean)this.getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
        }
        catch (NotServingRegionException notServingRegionException) {
            // empty catch block
        }
    }

    private void flushRegion(RegionInfo regionInfo) throws IOException {
        TestRegionServerNoMaster.flushRegion(HTU, regionInfo);
    }

    @Test
    public void testUseRegionWithoutReplica() throws Exception {
        byte[] b1 = "testUseRegionWithoutReplica".getBytes();
        Get g = new Get(b1);
        Result r = this.table.get(g);
        Assert.assertFalse((boolean)r.isStale());
    }

    @Test
    public void testLocations() throws Exception {
        byte[] b1 = "testLocations".getBytes();
        ClusterConnection hc = (ClusterConnection)HTU.getAdmin().getConnection();
        hc.clearRegionLocationCache();
        RegionLocations rl = hc.locateRegion(this.table.getName(), b1, false, false);
        Assert.assertEquals((long)2L, (long)rl.size());
        rl = hc.locateRegion(this.table.getName(), b1, true, false);
        Assert.assertEquals((long)2L, (long)rl.size());
        hc.clearRegionLocationCache();
        rl = hc.locateRegion(this.table.getName(), b1, true, false);
        Assert.assertEquals((long)2L, (long)rl.size());
        rl = hc.locateRegion(this.table.getName(), b1, false, false);
        Assert.assertEquals((long)2L, (long)rl.size());
    }

    @Test
    public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
        byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
        Get g = new Get(b1);
        Result r = this.table.get(g);
        Assert.assertFalse((boolean)r.isStale());
    }

    @Test
    public void testGetNoResultStaleRegionWithReplica() throws Exception {
        byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
        this.openRegion(hriSecondary);
        SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
        Get g = new Get(b1);
        g.setConsistency(Consistency.TIMELINE);
        Result r = this.table.get(g);
        Assert.assertTrue((boolean)r.isStale());
    }

    @Test
    public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
        byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
        SlowMeCopro.sleepTime.set(2000L);
        Get g = new Get(b1);
        Result r = this.table.get(g);
        Assert.assertFalse((boolean)r.isStale());
    }

    @Test
    public void testFlushTable() throws Exception {
        this.flushRegion(hriPrimary);
        this.flushRegion(hriSecondary);
        Put p = new Put(row);
        p.addColumn(f, row, row);
        this.table.put(p);
        this.flushRegion(hriPrimary);
        this.flushRegion(hriSecondary);
    }

    @Test
    public void testFlushPrimary() throws Exception {
        this.flushRegion(hriPrimary);
        Put p = new Put(row);
        p.addColumn(f, row, row);
        this.table.put(p);
        this.flushRegion(hriPrimary);
    }

    @Test
    public void testFlushSecondary() throws Exception {
        this.flushRegion(hriSecondary);
        Put p = new Put(row);
        p.addColumn(f, row, row);
        this.table.put(p);
        this.flushRegion(hriSecondary);
    }

    @Test
    public void testUseRegionWithReplica() throws Exception {
        byte[] b1 = "testUseRegionWithReplica".getBytes();
        Put p = new Put(b1);
        p.addColumn(f, b1, b1);
        this.table.put(p);
        LOG.info("Put done");
        Get g = new Get(b1);
        Result r = this.table.get(g);
        Assert.assertFalse((boolean)r.isStale());
        Assert.assertFalse((boolean)r.getColumnCells(f, b1).isEmpty());
        LOG.info("get works and is not stale done");
        SlowMeCopro.sleepTime.set(2000L);
        g = new Get(b1);
        r = this.table.get(g);
        Assert.assertFalse((boolean)r.isStale());
        Assert.assertFalse((boolean)r.getColumnCells(f, b1).isEmpty());
        SlowMeCopro.sleepTime.set(0L);
        LOG.info("sleep and is not stale done");
        SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
        g = new Get(b1);
        g.setConsistency(Consistency.TIMELINE);
        r = this.table.get(g);
        Assert.assertTrue((boolean)r.isStale());
        Assert.assertTrue((boolean)r.getColumnCells(f, b1).isEmpty());
        SlowMeCopro.getPrimaryCdl().get().countDown();
        LOG.info("stale done");
        g = new Get(b1);
        g.setCheckExistenceOnly(true);
        r = this.table.get(g);
        Assert.assertFalse((boolean)r.isStale());
        Assert.assertTrue((boolean)r.getExists());
        LOG.info("exists not stale done");
        SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
        g = new Get(b1);
        g.setCheckExistenceOnly(true);
        g.setConsistency(Consistency.TIMELINE);
        r = this.table.get(g);
        Assert.assertTrue((boolean)r.isStale());
        Assert.assertFalse((String)"The secondary has stale data", (boolean)r.getExists());
        SlowMeCopro.getPrimaryCdl().get().countDown();
        LOG.info("exists stale before flush done");
        this.flushRegion(hriPrimary);
        this.flushRegion(hriSecondary);
        LOG.info("flush done");
        Thread.sleep(3000L);
        SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
        g = new Get(b1);
        g.setConsistency(Consistency.TIMELINE);
        r = this.table.get(g);
        Assert.assertTrue((boolean)r.isStale());
        Assert.assertFalse((boolean)r.isEmpty());
        SlowMeCopro.getPrimaryCdl().get().countDown();
        LOG.info("stale done");
        SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
        g = new Get(b1);
        g.setCheckExistenceOnly(true);
        g.setConsistency(Consistency.TIMELINE);
        r = this.table.get(g);
        Assert.assertTrue((boolean)r.isStale());
        Assert.assertTrue((boolean)r.getExists());
        SlowMeCopro.getPrimaryCdl().get().countDown();
        LOG.info("exists stale after flush done");
    }

    @Test
    public void testHedgedRead() throws Exception {
        byte[] b1 = "testHedgedRead".getBytes();
        Put p = new Put(b1);
        p.addColumn(f, b1, b1);
        this.table.put(p);
        LOG.info("Put done");
        Get g = new Get(b1);
        Result r = this.table.get(g);
        Assert.assertFalse((boolean)r.isStale());
        Assert.assertFalse((boolean)r.getColumnCells(f, b1).isEmpty());
        LOG.info("get works and is not stale done");
        ClusterConnection connection = (ClusterConnection)HTU.getConnection();
        Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
        Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
        hedgedReadOps.dec(hedgedReadOps.getCount());
        hedgedReadWin.dec(hedgedReadWin.getCount());
        int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
        SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
        SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
        g = new Get(b1);
        g.setConsistency(Consistency.TIMELINE);
        r = this.table.get(g);
        Assert.assertFalse((boolean)r.isStale());
        Assert.assertFalse((boolean)r.getColumnCells(f, b1).isEmpty());
        Assert.assertEquals((long)1L, (long)hedgedReadOps.getCount());
        Assert.assertEquals((long)0L, (long)hedgedReadWin.getCount());
        SlowMeCopro.sleepTime.set(0L);
        SlowMeCopro.getSecondaryCdl().get().countDown();
        LOG.info("hedged read occurred but not faster");
        SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
        g = new Get(b1);
        g.setConsistency(Consistency.TIMELINE);
        r = this.table.get(g);
        Assert.assertTrue((boolean)r.isStale());
        Assert.assertTrue((boolean)r.getColumnCells(f, b1).isEmpty());
        Assert.assertEquals((long)2L, (long)hedgedReadOps.getCount());
        Assert.assertEquals((long)1L, (long)hedgedReadWin.getCount());
        SlowMeCopro.getPrimaryCdl().get().countDown();
        LOG.info("hedged read occurred and faster");
    }

    @Test
    public void testScanWithReplicas() throws Exception {
        this.runMultipleScansOfOneType(false, false);
    }

    @Test
    public void testSmallScanWithReplicas() throws Exception {
        this.runMultipleScansOfOneType(false, true);
    }

    @Test
    public void testReverseScanWithReplicas() throws Exception {
        this.runMultipleScansOfOneType(true, false);
    }

    @Test
    public void testCancelOfScan() throws Exception {
        int numRows = 100;
        for (int i = 0; i < numRows; ++i) {
            byte[] b1 = Bytes.toBytes((String)("testUseRegionWithReplica" + i));
            Put p = new Put(b1);
            p.addColumn(f, b1, b1);
            this.table.put(p);
        }
        LOG.debug("PUT done");
        int caching = 20;
        byte[] start = Bytes.toBytes((String)"testUseRegionWithReplica0");
        this.flushRegion(hriPrimary);
        LOG.info("flush done");
        Thread.sleep(3000L);
        SlowMeCopro.slowDownNext.set(true);
        SlowMeCopro.countOfNext.set(0);
        SlowMeCopro.sleepTime.set(5000L);
        Scan scan = new Scan().withStartRow(start);
        scan.setCaching(caching);
        scan.setConsistency(Consistency.TIMELINE);
        ResultScanner scanner = this.table.getScanner(scan);
        Iterator iter = scanner.iterator();
        iter.next();
        Assert.assertTrue((boolean)((ClientScanner)scanner).isAnyRPCcancelled());
        SlowMeCopro.slowDownNext.set(false);
        SlowMeCopro.countOfNext.set(0);
    }

    @Test
    public void testScanOnSpecificReplica() throws Exception {
        Scan scan = new Scan().setReplicaId(1).setConsistency(Consistency.TIMELINE);
        try (ResultScanner scanner = this.table.getScanner(scan);){
            scanner.next();
        }
        Assert.assertTrue((SlowMeCopro.secondaryCountOfScan.get() > 0 ? 1 : 0) != 0);
        Assert.assertEquals((long)0L, (long)SlowMeCopro.primaryCountOfScan.get());
    }

    @Test
    public void testReverseScanOnSpecificReplica() throws Exception {
        Scan scan = new Scan().setReversed(true).setReplicaId(1).setConsistency(Consistency.TIMELINE);
        try (ResultScanner scanner = this.table.getScanner(scan);){
            scanner.next();
        }
        Assert.assertTrue((SlowMeCopro.secondaryCountOfScan.get() > 0 ? 1 : 0) != 0);
        Assert.assertEquals((long)0L, (long)SlowMeCopro.primaryCountOfScan.get());
    }

    private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
        int numRows = 100;
        int numCols = 10;
        for (int i = 0; i < numRows; ++i) {
            byte[] b1 = Bytes.toBytes((String)("testUseRegionWithReplica" + i));
            for (int col = 0; col < numCols; ++col) {
                Put p = new Put(b1);
                String qualifier = "qualifer" + col;
                KeyValue kv = new KeyValue(b1, f, qualifier.getBytes());
                p.add((Cell)kv);
                this.table.put(p);
            }
        }
        LOG.debug("PUT done");
        int caching = 20;
        long maxResultSize = Long.MAX_VALUE;
        byte[] start = reversed ? Bytes.toBytes((String)("testUseRegionWithReplica" + (numRows - 1))) : Bytes.toBytes((String)"testUseRegionWithReplica0");
        this.scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, numCols, false, false);
        SlowMeCopro.sleepTime.set(5000L);
        this.scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows, numCols, false, false);
        SlowMeCopro.sleepTime.set(0L);
        this.flushRegion(hriPrimary);
        LOG.info("flush done");
        Thread.sleep(3000L);
        SlowMeCopro.sleepTime.set(5000L);
        this.scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, numCols, true, false);
        SlowMeCopro.sleepTime.set(0L);
        SlowMeCopro.slowDownNext.set(true);
        SlowMeCopro.countOfNext.set(0);
        this.scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, numCols, true, true);
        SlowMeCopro.slowDownNext.set(false);
        SlowMeCopro.countOfNext.set(0);
        SlowMeCopro.sleepTime.set(5000L);
        this.scanWithReplicas(reversed, small, Consistency.STRONG, caching, maxResultSize, start, numRows, numCols, false, false);
        SlowMeCopro.sleepTime.set(0L);
        maxResultSize = 1L;
        SlowMeCopro.slowDownNext.set(true);
        SlowMeCopro.countOfNext.set(0);
        this.scanWithReplicas(reversed, small, Consistency.TIMELINE, caching, maxResultSize, start, numRows, numCols, true, true);
        maxResultSize = Long.MAX_VALUE;
        SlowMeCopro.slowDownNext.set(false);
        SlowMeCopro.countOfNext.set(0);
    }

    private void scanWithReplicas(boolean reversed, boolean small, Consistency consistency, int caching, long maxResultSize, byte[] startRow, int numRows, int numCols, boolean staleExpected, boolean slowNext) throws Exception {
        Scan scan = new Scan().withStartRow(startRow);
        scan.setCaching(caching);
        scan.setMaxResultSize(maxResultSize);
        scan.setReversed(reversed);
        scan.setSmall(small);
        scan.setConsistency(consistency);
        ResultScanner scanner = this.table.getScanner(scan);
        Iterator iter = scanner.iterator();
        HashMap<String, Boolean> map = new HashMap<String, Boolean>();
        int rowCount = 0;
        int cellCount = 0;
        int countOfStale = 0;
        while (iter.hasNext()) {
            ++rowCount;
            Result r = (Result)iter.next();
            String row = new String(r.getRow());
            if (map.containsKey(row)) {
                throw new Exception("Unexpected scan result. Repeated row " + Bytes.toString((byte[])r.getRow()));
            }
            map.put(row, true);
            cellCount += r.rawCells().length;
            if (!slowNext) {
                Assert.assertTrue((r.isStale() == staleExpected ? 1 : 0) != 0);
            }
            if (!r.isStale()) continue;
            ++countOfStale;
        }
        Assert.assertTrue((String)("Count of rows " + rowCount + " num rows expected " + numRows), (rowCount == numRows ? 1 : 0) != 0);
        Assert.assertTrue((String)("Count of cells: " + cellCount + " cells expected: " + numRows * numCols), (cellCount == numRows * numCols ? 1 : 0) != 0);
        if (slowNext) {
            LOG.debug("Count of Stale " + countOfStale);
            Assert.assertTrue((countOfStale > 1 ? 1 : 0) != 0);
            if (maxResultSize != Long.MAX_VALUE) {
                Assert.assertTrue((countOfStale <= numRows ? 1 : 0) != 0);
            } else {
                Assert.assertTrue((countOfStale < numRows ? 1 : 0) != 0);
            }
        }
    }

    static {
        row = TestReplicasClient.class.getName().getBytes();
        HTU = new HBaseTestingUtility();
        f = HConstants.CATALOG_FAMILY;
    }

    public static class SlowMeCopro
    implements RegionCoprocessor,
    RegionObserver {
        static final AtomicInteger primaryCountOfScan = new AtomicInteger(0);
        static final AtomicInteger secondaryCountOfScan = new AtomicInteger(0);
        static final AtomicLong sleepTime = new AtomicLong(0L);
        static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
        static final AtomicInteger countOfNext = new AtomicInteger(0);
        private static final AtomicReference<CountDownLatch> primaryCdl = new AtomicReference<CountDownLatch>(new CountDownLatch(0));
        private static final AtomicReference<CountDownLatch> secondaryCdl = new AtomicReference<CountDownLatch>(new CountDownLatch(0));

        public Optional<RegionObserver> getRegionObserver() {
            return Optional.of(this);
        }

        public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException {
            this.slowdownCode(e);
        }

        private void incrementScanCount(ObserverContext<RegionCoprocessorEnvironment> e) {
            LOG.info("==========scan {} ", (Object)((RegionCoprocessorEnvironment)e.getEnvironment()).getRegion().getRegionInfo().getReplicaId(), (Object)new Exception());
            if (((RegionCoprocessorEnvironment)e.getEnvironment()).getRegion().getRegionInfo().getReplicaId() == 0) {
                primaryCountOfScan.incrementAndGet();
            } else {
                secondaryCountOfScan.incrementAndGet();
            }
        }

        public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan) throws IOException {
            this.incrementScanCount(e);
            this.slowdownCode(e);
        }

        public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s, List<Result> results, int limit, boolean hasMore) throws IOException {
            this.incrementScanCount(e);
            if (slowDownNext.get() && countOfNext.incrementAndGet() == 2) {
                sleepTime.set(2000L);
                this.slowdownCode(e);
            }
            return true;
        }

        private void slowdownCode(ObserverContext<RegionCoprocessorEnvironment> e) {
            if (((RegionCoprocessorEnvironment)e.getEnvironment()).getRegion().getRegionInfo().getReplicaId() == 0) {
                LOG.info("We're the primary replicas.");
                CountDownLatch latch = SlowMeCopro.getPrimaryCdl().get();
                try {
                    if (sleepTime.get() > 0L) {
                        LOG.info("Sleeping for " + sleepTime.get() + " ms");
                        Thread.sleep(sleepTime.get());
                    } else if (latch.getCount() > 0L) {
                        LOG.info("Waiting for the counterCountDownLatch");
                        latch.await(2L, TimeUnit.MINUTES);
                        if (latch.getCount() > 0L) {
                            throw new RuntimeException("Can't wait more");
                        }
                    }
                }
                catch (InterruptedException e1) {
                    LOG.error(e1.toString(), (Throwable)e1);
                }
            } else {
                LOG.info("We're not the primary replicas.");
                CountDownLatch latch = SlowMeCopro.getSecondaryCdl().get();
                try {
                    if (latch.getCount() > 0L) {
                        LOG.info("Waiting for the secondary counterCountDownLatch");
                        latch.await(2L, TimeUnit.MINUTES);
                        if (latch.getCount() > 0L) {
                            throw new RuntimeException("Can't wait more");
                        }
                    }
                }
                catch (InterruptedException e1) {
                    LOG.error(e1.toString(), (Throwable)e1);
                }
            }
        }

        public static AtomicReference<CountDownLatch> getPrimaryCdl() {
            return primaryCdl;
        }

        public static AtomicReference<CountDownLatch> getSecondaryCdl() {
            return secondaryCdl;
        }
    }
}

