package org.apache.druid.sql.calcite.schema;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.BrokerInternalQueryConfig;
import org.apache.druid.client.BrokerSegmentWatcherConfig;
import org.apache.druid.client.BrokerServerView;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.FilteredServerInventoryView;
import org.apache.druid.client.FilteringSegmentCallback;
import org.apache.druid.client.ServerView;
import org.apache.druid.client.TimelineServerView;
import org.apache.druid.client.selector.HighestPriorityTierSelectorStrategy;
import org.apache.druid.client.selector.RandomServerSelectorStrategy;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.NonnullPair;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.TableDataSource;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import org.apache.druid.segment.IndexBuilder;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.join.MapJoinableFactory;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.coordination.ChangeRequestHttpSyncer;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.druid.server.security.NoopEscalator;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.SpecificSegmentsQuerySegmentWalker;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest.class */
public class SegmentDataCacheConcurrencyTest extends SegmentMetadataCacheCommon {
    private static final String DATASOURCE = "datasource";
    private File tmpDir;
    private SpecificSegmentsQuerySegmentWalker walker;
    private TestServerInventoryView inventoryView;
    private BrokerServerView serverView;
    private SegmentMetadataCache schema;
    private ExecutorService exec;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/sql/calcite/schema/SegmentDataCacheConcurrencyTest$TestServerInventoryView.class */
    public static class TestServerInventoryView implements FilteredServerInventoryView {
        private final Map<String, DruidServer> serverMap;
        private final Map<String, Set<DataSegment>> segmentsMap;
        private final List<NonnullPair<ServerView.SegmentCallback, Executor>> segmentCallbacks;
        private final List<NonnullPair<ServerView.ServerRemovedCallback, Executor>> serverRemovedCallbacks;

        private TestServerInventoryView() {
            this.serverMap = new HashMap();
            this.segmentsMap = new HashMap();
            this.segmentCallbacks = new ArrayList();
            this.serverRemovedCallbacks = new ArrayList();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void init() {
            this.segmentCallbacks.forEach(nonnullPair -> {
                Executor executor = (Executor) nonnullPair.rhs;
                ServerView.SegmentCallback segmentCallback = (ServerView.SegmentCallback) nonnullPair.lhs;
                Objects.requireNonNull(segmentCallback);
                executor.execute(segmentCallback::segmentViewInitialized);
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addServerSegment(DruidServer druidServer, DataSegment dataSegment) {
            this.serverMap.put(druidServer.getName(), druidServer);
            this.segmentsMap.computeIfAbsent(druidServer.getName(), str -> {
                return new HashSet();
            }).add(dataSegment);
            this.segmentCallbacks.forEach(nonnullPair -> {
                ((Executor) nonnullPair.rhs).execute(() -> {
                    ((ServerView.SegmentCallback) nonnullPair.lhs).segmentAdded(druidServer.getMetadata(), dataSegment);
                });
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void removeServerSegment(DruidServer druidServer, DataSegment dataSegment) {
            this.segmentsMap.computeIfAbsent(druidServer.getName(), str -> {
                return new HashSet();
            }).remove(dataSegment);
            this.segmentCallbacks.forEach(nonnullPair -> {
                ((Executor) nonnullPair.rhs).execute(() -> {
                    ((ServerView.SegmentCallback) nonnullPair.lhs).segmentRemoved(druidServer.getMetadata(), dataSegment);
                });
            });
        }

        private void removeServer(DruidServer druidServer) {
            this.serverMap.remove(druidServer.getName());
            this.segmentsMap.remove(druidServer.getName());
            this.serverRemovedCallbacks.forEach(nonnullPair -> {
                ((Executor) nonnullPair.rhs).execute(() -> {
                    ((ServerView.ServerRemovedCallback) nonnullPair.lhs).serverRemoved(druidServer);
                });
            });
        }

        @Override // org.apache.druid.client.FilteredServerInventoryView
        public void registerSegmentCallback(Executor executor, ServerView.SegmentCallback segmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>> predicate) {
            this.segmentCallbacks.add(new NonnullPair<>(new FilteringSegmentCallback(segmentCallback, predicate), executor));
        }

        @Override // org.apache.druid.client.FilteredServerInventoryView
        public void registerServerRemovedCallback(Executor executor, ServerView.ServerRemovedCallback serverRemovedCallback) {
            this.serverRemovedCallbacks.add(new NonnullPair<>(serverRemovedCallback, executor));
        }

        @Override // org.apache.druid.client.InventoryView
        @Nullable
        public DruidServer getInventoryValue(String str) {
            return this.serverMap.get(str);
        }

        @Override // org.apache.druid.client.InventoryView
        public Collection<DruidServer> getInventory() {
            return this.serverMap.values();
        }

        @Override // org.apache.druid.client.InventoryView
        public boolean isStarted() {
            return true;
        }

        @Override // org.apache.druid.client.InventoryView
        public boolean isSegmentLoadedByServer(String str, DataSegment dataSegment) {
            Set<DataSegment> set = this.segmentsMap.get(str);
            return set != null && set.contains(dataSegment);
        }
    }

    @Before
    public void setUp() throws Exception {
        this.tmpDir = this.temporaryFolder.newFolder();
        this.walker = new SpecificSegmentsQuerySegmentWalker(conglomerate);
        this.inventoryView = new TestServerInventoryView();
        this.serverView = newBrokerServerView(this.inventoryView);
        this.inventoryView.init();
        this.serverView.awaitInitialization();
        this.exec = Execs.multiThreaded(4, "DruidSchemaConcurrencyTest-%d");
    }

    @After
    public void tearDown() throws Exception {
        this.exec.shutdownNow();
        this.walker.close();
    }

    @Test(timeout = 30000)
    public void testSegmentMetadataRefreshAndInventoryViewAddSegmentAndBrokerServerViewGetTimeline() throws InterruptedException, ExecutionException, TimeoutException {
        this.schema = new SegmentMetadataCache(CalciteTests.createMockQueryLifecycleFactory(this.walker, conglomerate), this.serverView, this.segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new BrokerInternalQueryConfig(), new NoopServiceEmitter()) { // from class: org.apache.druid.sql.calcite.schema.SegmentDataCacheConcurrencyTest.1
            /* JADX INFO: Access modifiers changed from: package-private */
            @Override // org.apache.druid.sql.calcite.schema.SegmentMetadataCache
            public DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String str) {
                doInLock(() -> {
                    try {
                        Thread.sleep(ChangeRequestHttpSyncer.HTTP_TIMEOUT_EXTRA_MS);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
                return super.buildDruidTable(str);
            }
        };
        final CountDownLatch countDownLatch = new CountDownLatch(100);
        this.serverView.registerTimelineCallback(Execs.directExecutor(), new TimelineServerView.TimelineCallback() { // from class: org.apache.druid.sql.calcite.schema.SegmentDataCacheConcurrencyTest.2
            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction timelineInitialized() {
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                countDownLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction segmentRemoved(DataSegment dataSegment) {
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                return ServerView.CallbackAction.CONTINUE;
            }
        });
        addSegmentsToCluster(0, 19, 100);
        Assert.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        Future submit = this.exec.submit(() -> {
            this.schema.refresh((Set) this.walker.getSegments().stream().map((v0) -> {
                return v0.getId();
            }).collect(Collectors.toSet()), Sets.newHashSet("datasource"));
            return null;
        });
        addSegmentsToCluster(100, 19, 50);
        addReplicasToCluster(1, 19, 30);
        removeSegmentsFromCluster(19, 50);
        Assert.assertFalse(submit.isDone());
        for (int i = 0; i < 1000; i++) {
            Assert.assertTrue(((Boolean) this.exec.submit(() -> {
                return Boolean.valueOf(this.serverView.getTimeline(new TableDataSource("datasource").getAnalysis()).isPresent());
            }).get(100L, TimeUnit.MILLISECONDS)).booleanValue());
            Thread.sleep(2L);
        }
        submit.get(10L, TimeUnit.SECONDS);
    }

    @Test(timeout = 30000)
    public void testSegmentMetadataRefreshAndDruidSchemaGetSegmentMetadata() throws InterruptedException, ExecutionException, TimeoutException {
        this.schema = new SegmentMetadataCache(CalciteTests.createMockQueryLifecycleFactory(this.walker, conglomerate), this.serverView, this.segmentManager, new MapJoinableFactory(ImmutableSet.of(), ImmutableMap.of()), SEGMENT_CACHE_CONFIG_DEFAULT, new NoopEscalator(), new BrokerInternalQueryConfig(), new NoopServiceEmitter()) { // from class: org.apache.druid.sql.calcite.schema.SegmentDataCacheConcurrencyTest.3
            /* JADX INFO: Access modifiers changed from: package-private */
            @Override // org.apache.druid.sql.calcite.schema.SegmentMetadataCache
            public DatasourceTable.PhysicalDatasourceMetadata buildDruidTable(String str) {
                doInLock(() -> {
                    try {
                        Thread.sleep(ChangeRequestHttpSyncer.HTTP_TIMEOUT_EXTRA_MS);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                });
                return super.buildDruidTable(str);
            }
        };
        final CountDownLatch countDownLatch = new CountDownLatch(100);
        this.serverView.registerTimelineCallback(Execs.directExecutor(), new TimelineServerView.TimelineCallback() { // from class: org.apache.druid.sql.calcite.schema.SegmentDataCacheConcurrencyTest.4
            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction timelineInitialized() {
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction segmentAdded(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                countDownLatch.countDown();
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction segmentRemoved(DataSegment dataSegment) {
                return ServerView.CallbackAction.CONTINUE;
            }

            @Override // org.apache.druid.client.TimelineServerView.TimelineCallback
            public ServerView.CallbackAction serverSegmentRemoved(DruidServerMetadata druidServerMetadata, DataSegment dataSegment) {
                return ServerView.CallbackAction.CONTINUE;
            }
        });
        addSegmentsToCluster(0, 19, 100);
        Assert.assertTrue(countDownLatch.await(5L, TimeUnit.SECONDS));
        Future submit = this.exec.submit(() -> {
            this.schema.refresh((Set) this.walker.getSegments().stream().map((v0) -> {
                return v0.getId();
            }).collect(Collectors.toSet()), Sets.newHashSet("datasource"));
            return null;
        });
        Assert.assertFalse(submit.isDone());
        for (int i = 0; i < 1000; i++) {
            Assert.assertFalse(((Map) this.exec.submit(() -> {
                return this.schema.getSegmentMetadataSnapshot();
            }).get(100L, TimeUnit.MILLISECONDS)).isEmpty());
            Thread.sleep(2L);
        }
        submit.get(10L, TimeUnit.SECONDS);
    }

    private void addSegmentsToCluster(int i, int i2, int i3) {
        for (int i4 = 0; i4 < i3; i4++) {
            DataSegment newSegment = newSegment(i4 + i);
            this.walker.add(newSegment, newQueryableIndex(i4 + i));
            this.inventoryView.addServerSegment(newServer("server_" + (i4 % i2)), newSegment);
        }
    }

    private void addReplicasToCluster(int i, int i2, int i3) {
        for (int i4 = 0; i4 < i3; i4++) {
            int i5 = (i4 % i2) + i;
            this.inventoryView.addServerSegment(newServer("server_" + (i5 < i2 ? i5 : i5 - i2)), newSegment(i4));
        }
    }

    private void removeSegmentsFromCluster(int i, int i2) {
        for (int i3 = 0; i3 < i2; i3++) {
            this.inventoryView.removeServerSegment(newServer("server_" + (i3 % i)), newSegment(i3));
        }
    }

    private static BrokerServerView newBrokerServerView(FilteredServerInventoryView filteredServerInventoryView) {
        return new BrokerServerView((QueryToolChestWarehouse) EasyMock.createMock(QueryToolChestWarehouse.class), (QueryWatcher) EasyMock.createMock(QueryWatcher.class), new DefaultObjectMapper(), (HttpClient) EasyMock.createMock(HttpClient.class), filteredServerInventoryView, new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()), new NoopServiceEmitter(), new BrokerSegmentWatcherConfig());
    }

    private static DruidServer newServer(String str) {
        return new DruidServer(str, "host:8083", "host:8283", 1000L, ServerType.HISTORICAL, "tier", 0);
    }

    private DataSegment newSegment(int i) {
        return new DataSegment("datasource", Intervals.of("2012/2013"), "version1", null, ImmutableList.of(), ImmutableList.of(), new NumberedShardSpec(i, 0), null, 1, 100L, DataSegment.PruneSpecsHolder.DEFAULT);
    }

    private QueryableIndex newQueryableIndex(int i) {
        return IndexBuilder.create().tmpDir(new File(this.tmpDir, "" + i)).segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance()).schema(new IncrementalIndexSchema.Builder().withMetrics(new CountAggregatorFactory("cnt"), new DoubleSumAggregatorFactory("m1", "m1")).withRollup(false).build()).rows(ROWS1).buildMMappedIndex();
    }
}
