package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WAL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({MediumTests.class})
@Ignore("Flaky, needs to be rewritten, see HBASE-19125")
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicator.class */
public class TestReplicator extends TestReplicationBase {
    /** Logger used by the test and by the nested replication endpoint helpers. */
    static final Log LOG = LogFactory.getLog(TestReplicator.class);
    /** Number of rows the replication tests in this class write to the source table. */
    static final int NUM_ROWS = 10;

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicator$FailureInjectingReplicationEndpointForTest.class */
    /**
     * Variant of {@link ReplicationEndpointForTest} whose replicators ship edits through a
     * {@link FailureInjectingBlockingInterface}, so that {@code replicateWALEntry} calls can be made
     * to fail with an injected {@link ServiceException}.
     */
    public static class FailureInjectingReplicationEndpointForTest extends ReplicationEndpointForTest {

        /**
         * Admin service stub that forwards every call to a wrapped stub, except that
         * {@link #replicateWALEntry} alternates between forwarding the call and throwing an injected
         * {@link ServiceException}.
         */
        static class FailureInjectingBlockingInterface implements AdminProtos.AdminService.BlockingInterface {
            /** The real stub all calls are forwarded to. */
            private final AdminProtos.AdminService.BlockingInterface delegate;
            /** When true, the next {@code replicateWALEntry} call on this wrapper fails. Starts out false. */
            private volatile boolean failNext;

            /**
             * @param blockingInterface the stub to forward calls to
             */
            public FailureInjectingBlockingInterface(AdminProtos.AdminService.BlockingInterface blockingInterface) {
                this.delegate = blockingInterface;
            }

            @Override
            public AdminProtos.GetRegionInfoResponse getRegionInfo(RpcController controller, AdminProtos.GetRegionInfoRequest request) throws ServiceException {
                return delegate.getRegionInfo(controller, request);
            }

            @Override
            public AdminProtos.GetStoreFileResponse getStoreFile(RpcController controller, AdminProtos.GetStoreFileRequest request) throws ServiceException {
                return delegate.getStoreFile(controller, request);
            }

            @Override
            public AdminProtos.GetOnlineRegionResponse getOnlineRegion(RpcController controller, AdminProtos.GetOnlineRegionRequest request) throws ServiceException {
                return delegate.getOnlineRegion(controller, request);
            }

            @Override
            public AdminProtos.OpenRegionResponse openRegion(RpcController controller, AdminProtos.OpenRegionRequest request) throws ServiceException {
                return delegate.openRegion(controller, request);
            }

            @Override
            public AdminProtos.WarmupRegionResponse warmupRegion(RpcController controller, AdminProtos.WarmupRegionRequest request) throws ServiceException {
                return delegate.warmupRegion(controller, request);
            }

            @Override
            public AdminProtos.CloseRegionResponse closeRegion(RpcController controller, AdminProtos.CloseRegionRequest request) throws ServiceException {
                return delegate.closeRegion(controller, request);
            }

            @Override
            public AdminProtos.FlushRegionResponse flushRegion(RpcController controller, AdminProtos.FlushRegionRequest request) throws ServiceException {
                return delegate.flushRegion(controller, request);
            }

            @Override
            public AdminProtos.CompactRegionResponse compactRegion(RpcController controller, AdminProtos.CompactRegionRequest request) throws ServiceException {
                return delegate.compactRegion(controller, request);
            }

            /**
             * Forwards the call and arms a failure for the following call, or, if a failure is armed,
             * disarms it and throws instead of forwarding.
             *
             * @throws ServiceException with message "Injected failure" on an armed call, or whatever the
             *         delegate throws on a forwarded call
             */
            @Override
            public AdminProtos.ReplicateWALEntryResponse replicateWALEntry(RpcController controller, AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
                if (failNext) {
                    failNext = false;
                    throw new ServiceException("Injected failure");
                }
                // Arm the failure before forwarding, so it stays armed even if the delegate throws.
                failNext = true;
                return delegate.replicateWALEntry(controller, request);
            }

            @Override
            public AdminProtos.ReplicateWALEntryResponse replay(RpcController controller, AdminProtos.ReplicateWALEntryRequest request) throws ServiceException {
                return delegate.replay(controller, request);
            }

            @Override
            public AdminProtos.RollWALWriterResponse rollWALWriter(RpcController controller, AdminProtos.RollWALWriterRequest request) throws ServiceException {
                return delegate.rollWALWriter(controller, request);
            }

            @Override
            public AdminProtos.GetServerInfoResponse getServerInfo(RpcController controller, AdminProtos.GetServerInfoRequest request) throws ServiceException {
                return delegate.getServerInfo(controller, request);
            }

            @Override
            public AdminProtos.StopServerResponse stopServer(RpcController controller, AdminProtos.StopServerRequest request) throws ServiceException {
                return delegate.stopServer(controller, request);
            }

            @Override
            public AdminProtos.UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller, AdminProtos.UpdateFavoredNodesRequest request) throws ServiceException {
                return delegate.updateFavoredNodes(controller, request);
            }

            @Override
            public AdminProtos.UpdateConfigurationResponse updateConfiguration(RpcController controller, AdminProtos.UpdateConfigurationRequest request) throws ServiceException {
                return delegate.updateConfiguration(controller, request);
            }

            @Override
            public AdminProtos.GetRegionLoadResponse getRegionLoad(RpcController controller, AdminProtos.GetRegionLoadRequest request) throws ServiceException {
                return delegate.getRegionLoad(controller, request);
            }

            @Override
            public AdminProtos.ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller, AdminProtos.ClearCompactionQueuesRequest request) throws ServiceException {
                return delegate.clearCompactionQueues(controller, request);
            }

            @Override
            public QuotaProtos.GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller, QuotaProtos.GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
                return delegate.getSpaceQuotaSnapshots(controller, request);
            }

            @Override
            public AdminProtos.ExecuteProceduresResponse executeProcedures(RpcController controller, AdminProtos.ExecuteProceduresRequest request) throws ServiceException {
                // Forwarded like every other pass-through call instead of unconditionally returning null.
                return delegate.executeProcedures(controller, request);
            }
        }

        /**
         * Replicator that hands the superclass a {@link FailureInjectingBlockingInterface} wrapped
         * around the real sink stub.
         */
        public class FailureInjectingReplicatorForTest extends ReplicationEndpointForTest.ReplicatorForTest {
            public FailureInjectingReplicatorForTest(List<WAL.Entry> list, int i) {
                super(list, i);
            }

            // NOTE(review): a fresh wrapper (whose failNext starts out false) is created on every call,
            // so a failure is only injected if the superclass issues more than one replicateWALEntry
            // call through the same wrapper - confirm that this is the intended behaviour.
            @Override // org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest.ReplicatorForTest
            protected void replicateEntries(AdminProtos.AdminService.BlockingInterface blockingInterface, List<WAL.Entry> list, String str, Path path, Path path2) throws IOException {
                super.replicateEntries(new FailureInjectingBlockingInterface(blockingInterface), list, str, path, path2);
            }
        }

        /**
         * Creates a {@link FailureInjectingReplicatorForTest} for the given entries.
         */
        @Override // org.apache.hadoop.hbase.replication.regionserver.TestReplicator.ReplicationEndpointForTest
        protected HBaseInterClusterReplicationEndpoint.Replicator createReplicator(List<WAL.Entry> list, int i) {
            return new FailureInjectingReplicatorForTest(list, i);
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicator$ReplicationEndpointForTest.class */
    public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
        private static int batchCount;
        private static int entriesCount;
        private static final Object latch = new Object();
        private static AtomicBoolean useLatch = new AtomicBoolean(false);

        /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/TestReplicator$ReplicationEndpointForTest$ReplicatorForTest.class */
        public class ReplicatorForTest extends HBaseInterClusterReplicationEndpoint.Replicator {
            public ReplicatorForTest(List<WAL.Entry> list, int i) {
                super(ReplicationEndpointForTest.this, list, i);
            }

            protected void replicateEntries(AdminProtos.AdminService.BlockingInterface blockingInterface, List<WAL.Entry> list, String str, Path path, Path path2) throws IOException {
                try {
                    long j = 0;
                    for (WAL.Entry entry : list) {
                        j = j + entry.getKey().estimatedSerializedSizeOf() + entry.getEdit().estimatedSerializedSizeOf();
                    }
                    TestReplicator.LOG.info("Replicating batch " + System.identityHashCode(list) + " of " + list.size() + " entries with total size " + j + " bytes to " + str);
                    super.replicateEntries(blockingInterface, list, str, path, path2);
                    ReplicationEndpointForTest.entriesCount += list.size();
                    ReplicationEndpointForTest.access$108();
                    TestReplicator.LOG.info("Completed replicating batch " + System.identityHashCode(list));
                } catch (IOException e) {
                    TestReplicator.LOG.info("Failed to replicate batch " + System.identityHashCode(list), e);
                    throw e;
                }
            }

            public /* bridge */ /* synthetic */ Integer call() throws IOException {
                return super.call();
            }
        }

        public static void resume() {
            useLatch.set(false);
            synchronized (latch) {
                latch.notifyAll();
            }
        }

        public static void pause() {
            useLatch.set(true);
        }

        public static void await() throws InterruptedException {
            if (useLatch.get()) {
                TestReplicator.LOG.info("Waiting on latch");
                latch.wait();
                TestReplicator.LOG.info("Waited on latch, now proceeding");
            }
        }

        public static int getBatchCount() {
            return batchCount;
        }

        public static void setBatchCount(int i) {
            batchCount = i;
        }

        public static int getEntriesCount() {
            return entriesCount;
        }

        public static void setEntriesCount(int i) {
            entriesCount = i;
        }

        public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
            try {
                await();
            } catch (InterruptedException e) {
                TestReplicator.LOG.warn("Interrupted waiting for latch", e);
            }
            return super.replicate(replicateContext);
        }

        protected HBaseInterClusterReplicationEndpoint.Replicator createReplicator(List<WAL.Entry> list, int i) {
            return new ReplicatorForTest(list, i);
        }

        static /* synthetic */ int access$108() {
            int i = batchCount;
            batchCount = i + 1;
            return i;
        }
    }

    /**
     * Limits {@code hbase.ipc.max.request.size} to 10 KiB on {@code conf1}, runs the base class
     * setup and then removes peer "2" (presumably the peer registered by the base class - the tests
     * below add their own peers).
     */
    @BeforeClass
    public static void setUpBeforeClass() throws Exception {
        final int maxRequestSizeBytes = 10 * 1024;
        final String peerToDrop = "2";

        // The size limit has to be in the configuration before the base class setup runs.
        conf1.setInt("hbase.ipc.max.request.size", maxRequestSizeBytes);
        TestReplicationBase.setUpBeforeClass();
        admin.removePeer(peerToDrop);
    }

    /**
     * Writes {@link #NUM_ROWS} rows with 8 KiB values while replication is paused, resumes it and
     * expects every row to be shipped in its own batch (setUpBeforeClass caps
     * {@code hbase.ipc.max.request.size} at 10 KiB) and to show up in the peer table.
     */
    @Test
    public void testReplicatorBatching() throws Exception {
        final String peerId = "testReplicatorBatching";

        // Start from empty tables on both clusters.
        truncateTable(utility1, tableName);
        truncateTable(utility2, tableName);

        // Register a peer that replicates through the instrumented endpoint.
        admin.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()).setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), (Map) null);
        ReplicationEndpointForTest.setBatchCount(0);
        ReplicationEndpointForTest.setEntriesCount(0);
        try {
            ReplicationEndpointForTest.pause();
            try {
                // Queue all rows while the endpoint is held back.
                final byte[] value = new byte[8 * 1024];
                for (int i = 0; i < NUM_ROWS; i++) {
                    htable1.put(new Put(("row" + Integer.toString(i)).getBytes()).addColumn(famName, (byte[]) null, value));
                }
            } finally {
                // Always release the endpoint, even if a put failed.
                ReplicationEndpointForTest.resume();
            }

            // Wait for replication to complete.
            Waiter.waitFor(conf1, 60000L, new Waiter.ExplainingPredicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
                }

                @Override
                public String explainFailure() throws Exception {
                    return "We waited too long for expected replication of 10 entries";
                }
            });

            Assert.assertEquals("We sent an incorrect number of batches", NUM_ROWS, ReplicationEndpointForTest.getBatchCount());
            Assert.assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
        } finally {
            // Remove the peer exactly once, whether the test passed or failed.
            admin.removePeer(peerId);
        }
    }

    /**
     * Writes {@link #NUM_ROWS} rows with 8 KiB values while replication is paused, resumes it and
     * expects all rows to reach the peer table when replicating through
     * {@link FailureInjectingReplicationEndpointForTest}.
     */
    @Test
    public void testReplicatorWithErrors() throws Exception {
        final String peerId = "testReplicatorWithErrors";

        // Start from empty tables on both clusters.
        truncateTable(utility1, tableName);
        truncateTable(utility2, tableName);

        // Register a peer that replicates through the failure-injecting endpoint.
        admin.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey()).setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()), (Map) null);
        FailureInjectingReplicationEndpointForTest.setBatchCount(0);
        FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
        try {
            FailureInjectingReplicationEndpointForTest.pause();
            try {
                // Queue all rows while the endpoint is held back.
                final byte[] value = new byte[8 * 1024];
                for (int i = 0; i < NUM_ROWS; i++) {
                    htable1.put(new Put(("row" + Integer.toString(i)).getBytes()).addColumn(famName, (byte[]) null, value));
                }
            } finally {
                // Always release the endpoint, even if a put failed.
                FailureInjectingReplicationEndpointForTest.resume();
            }

            // Batches may be retried, so wait on the number of entries rather than batches.
            Waiter.waitFor(conf1, 60000L, new Waiter.ExplainingPredicate<Exception>() {
                @Override
                public boolean evaluate() throws Exception {
                    return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
                }

                @Override
                public String explainFailure() throws Exception {
                    return "We waited too long for expected replication of 10 entries";
                }
            });

            Assert.assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
        } finally {
            // Remove the peer exactly once, whether the test passed or failed.
            admin.removePeer(peerId);
        }
    }

    /**
     * Delegates class-level teardown to {@link TestReplicationBase#tearDownAfterClass()}; this class
     * adds no cleanup of its own.
     */
    @AfterClass
    public static void tearDownAfterClass() throws Exception {
        TestReplicationBase.tearDownAfterClass();
    }

    /**
     * Disables the given table on the cluster behind {@code hBaseTestingUtility} and then truncates
     * it, passing {@code false} as the truncate flag.
     *
     * @param hBaseTestingUtility utility whose admin is used for both operations
     * @param tableName table to disable and truncate
     * @throws IOException if either admin operation fails
     */
    private void truncateTable(HBaseTestingUtility hBaseTestingUtility, TableName tableName) throws IOException {
        final boolean preserveSplits = false;
        HBaseAdmin clusterAdmin = hBaseTestingUtility.getHBaseAdmin();
        // The table is disabled first, then truncated.
        clusterAdmin.disableTable(tableName);
        clusterAdmin.truncateTable(tableName, preserveSplits);
    }
}
