package org.apache.druid.indexing.seekablestream.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.MapperFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskRunner;
import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
import org.joda.time.ReadableDuration;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.class */
public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType> implements Supervisor {
    // NOTE(review): presumably a context key under which checkpoints are stored; its use is not visible in this part of the file.
    public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
    // Lower bound (ms) applied to the notice poll timeout and to the scheduled run period in tryInit().
    private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
    // Floor used by the constructor when computing futureTimeoutInSeconds.
    private static final long MINIMUM_FUTURE_TIMEOUT_IN_SECONDS = 120;
    // Maximum number of tries handed to RetryUtils.retry() when start() falls back to retrying tryInit().
    private static final int MAX_INITIALIZATION_RETRIES = 20;
    private static final EmittingLogger log = new EmittingLogger(SeekableStreamSupervisor.class);
    // Copy of the injected ObjectMapper with SORT_PROPERTIES_ALPHABETICALLY enabled (see constructor).
    protected final ObjectMapper sortingMapper;
    protected final SeekableStreamSupervisorStateManager stateManager;
    protected volatile DateTime sequenceLastUpdated;
    private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
    // Taken from the supervisor spec's data schema in the constructor.
    protected final String dataSource;
    private final TaskStorage taskStorage;
    private final TaskMaster taskMaster;
    private final SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> taskClient;
    private final SeekableStreamSupervisorSpec spec;
    private final SeekableStreamSupervisorIOConfig ioConfig;
    private final SeekableStreamSupervisorTuningConfig tuningConfig;
    // Derived from tuningConfig via convertToTaskTuningConfig().
    private final SeekableStreamIndexTaskTuningConfig taskTuningConfig;
    private final String supervisorId;
    private final TaskInfoProvider taskInfoProvider;
    // max(MINIMUM_FUTURE_TIMEOUT_IN_SECONDS, chatRetries * (httpTimeout seconds + 10)); computed in the constructor.
    private final long futureTimeoutInSeconds;
    private final RowIngestionMetersFactory rowIngestionMetersFactory;
    // Single-threaded executor: runs the notice-handling loop and, if needed, the initialization retry loop.
    private final ExecutorService exec;
    // Periodically enqueues RunNotice instances (scheduled in tryInit()).
    private final ScheduledExecutorService scheduledExec;
    // Passed to scheduleReporting() in tryInit().
    private final ScheduledExecutorService reportingExec;
    // Multi-threaded worker pool sized from tuningConfig.getWorkerThreads() or min(10, taskCount).
    private final ListeningExecutorService workerExec;
    private final boolean useExclusiveStartingSequence;
    // Millisecond timestamp compared against System.currentTimeMillis() by RunNotice to throttle runs.
    private long lastRunTime;
    // Set in tryInit() to "now + ioConfig.getStartDelay()".
    private volatile DateTime firstRunTime;
    // Created by setupRecordSupplier() in tryInit(); closed by ShutdownNotice or on a failed init.
    private volatile RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;
    private final ConcurrentHashMap<Integer, SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup> activelyReadingTaskGroups = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<Integer, CopyOnWriteArrayList<SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup>> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
    protected final ConcurrentHashMap<PartitionIdType, SequenceOffsetType> partitionOffsets = new ConcurrentHashMap<>();
    protected final ConcurrentHashMap<Integer, Set<PartitionIdType>> partitionGroups = new ConcurrentHashMap<>();
    protected final List<PartitionIdType> partitionIds = new CopyOnWriteArrayList();
    private final Set<PartitionIdType> subsequentlyDiscoveredPartitions = new HashSet();
    // Work queue drained by the loop that tryInit() submits to exec.
    private final BlockingQueue<Notice> notices = new LinkedBlockingDeque();
    // Monitor used by stop() to wait for, and by ShutdownNotice to signal, the "stopped" flag.
    private final Object stopLock = new Object();
    // Held by start(), stop() and tryInit() while they change lifecycle state.
    private final Object stateChangeLock = new Object();
    private final Object recordSupplierLock = new Object();
    private boolean listenerRegistered = false;
    // Incremented each time tryInit() fails.
    private int initRetryCounter = 0;
    private volatile DateTime earlyStopTime = null;
    // True once tryInit() has completed successfully; reset to false at the end of stop().
    private volatile boolean started = false;
    // Set by ShutdownNotice, or by stop() itself when the shutdown wait times out.
    private volatile boolean stopped = false;
    // Set at the end of start(); stop() requires it to be true.
    private volatile boolean lifecycleStarted = false;

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$CheckpointNotice.class */
    protected class CheckpointNotice implements Notice {
        private final int taskGroupId;
        private final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> checkpointMetadata;

        CheckpointNotice(int i, SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> seekableStreamDataSourceMetadata) {
            this.taskGroupId = i;
            this.checkpointMetadata = seekableStreamDataSourceMetadata;
        }

        @Override // org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.Notice
        public void handle() throws ExecutionException, InterruptedException {
            SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup = (TaskGroup) SeekableStreamSupervisor.this.activelyReadingTaskGroups.get(Integer.valueOf(this.taskGroupId));
            if (isValidTaskGroup(this.taskGroupId, taskGroup)) {
                TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> treeMap = taskGroup.checkpointSequences;
                int size = treeMap.size();
                Iterator<Integer> it = treeMap.descendingKeySet().iterator();
                while (it.hasNext() && !treeMap.get(Integer.valueOf(it.next().intValue())).equals(this.checkpointMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap())) {
                    size--;
                }
                if (size == 0) {
                    throw new ISE("No such previous checkpoint [%s] found", new Object[]{this.checkpointMetadata});
                }
                if (size < treeMap.size()) {
                    Preconditions.checkState(size == treeMap.size() - 1, "checkpoint consistency failure");
                    SeekableStreamSupervisor.log.info("Already checkpointed with sequences [%s]", new Object[]{treeMap.lastEntry().getValue()});
                } else {
                    Map<PartitionIdType, SequenceOffsetType> map = (Map) SeekableStreamSupervisor.this.checkpointTaskGroup(taskGroup, false).get();
                    taskGroup.addNewCheckpoint(map);
                    SeekableStreamSupervisor.log.info("Handled checkpoint notice, new checkpoint is [%s] for taskGroup [%s]", new Object[]{map, Integer.valueOf(this.taskGroupId)});
                }
            }
        }

        boolean isValidTaskGroup(int i, @Nullable SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup) {
            if (taskGroup != null) {
                return true;
            }
            if (SeekableStreamSupervisor.this.pendingCompletionTaskGroups.containsKey(Integer.valueOf(i))) {
                SeekableStreamSupervisor.log.warn("Ignoring checkpoint request because taskGroup[%d] has already stopped indexing and is waiting for publishing segments", new Object[]{Integer.valueOf(i)});
                return false;
            }
            if (!SeekableStreamSupervisor.this.partitionGroups.containsKey(Integer.valueOf(i))) {
                throw new ISE("WTH?! cannot find taskGroup [%s] among all activelyReadingTaskGroups [%s]", new Object[]{Integer.valueOf(i), SeekableStreamSupervisor.this.activelyReadingTaskGroups});
            }
            SeekableStreamSupervisor.log.warn("Ignoring checkpoint request because taskGroup[%d] is inactive", new Object[]{Integer.valueOf(i)});
            return false;
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$GracefulShutdownNotice.class */
    /**
     * Shutdown notice that first runs the supervisor's graceful shutdown routine and then
     * performs the regular {@link ShutdownNotice} handling.
     */
    private class GracefulShutdownNotice extends ShutdownNotice {
        private GracefulShutdownNotice() {
            super();
        }

        /**
         * Invokes {@code gracefulShutdownInternal()} on the enclosing supervisor, then delegates
         * to the parent notice. If the graceful step throws, the parent handling is not reached.
         */
        @Override // org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.ShutdownNotice, org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.Notice
        public void handle() throws InterruptedException, ExecutionException, TimeoutException {
            gracefulShutdownInternal();
            super.handle();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$Notice.class */
    /**
     * A unit of work placed on the supervisor's {@code notices} queue; the loop started by
     * {@code tryInit()} takes notices off the queue and calls {@link #handle()} on each.
     */
    public interface Notice {
        /**
         * Performs the work represented by this notice.
         *
         * @throws ExecutionException   declared for implementations that wait on futures
         * @throws InterruptedException declared for implementations that block
         * @throws TimeoutException     declared for implementations that wait with a timeout
         */
        void handle() throws ExecutionException, InterruptedException, TimeoutException;
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$ResetNotice.class */
    /**
     * Notice that forwards a reset request, together with its metadata, to the supervisor.
     */
    private class ResetNotice implements Notice {
        final DataSourceMetadata dataSourceMetadata;

        ResetNotice(DataSourceMetadata dataSourceMetadata) {
            this.dataSourceMetadata = dataSourceMetadata;
        }

        /** Calls {@code resetInternal} on the enclosing supervisor with the stored metadata. */
        @Override // org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.Notice
        public void handle() {
            resetInternal(dataSourceMetadata);
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$RunNotice.class */
    /**
     * Notice that triggers one supervisor run, rate-limited so that runs are at least
     * {@code MAX_RUN_FREQUENCY_MILLIS} apart.
     */
    private class RunNotice implements Notice {
        private RunNotice() {
        }

        /**
         * Runs the supervisor unless the previous run started less than
         * {@code MAX_RUN_FREQUENCY_MILLIS} milliseconds ago, in which case the notice is dropped.
         *
         * <p>Reconstructed from the decompiler's register-level dump of this method: it reads the
         * enclosing instance's {@code long} field through a synthetic accessor, compares the elapsed
         * time with 1000, stores the current time back through the matching synthetic setter and
         * finally calls {@code runInternal()}. {@code lastRunTime} is the only non-final
         * {@code long} instance field of the enclosing class, so it is the field being accessed.
         */
        @Override // org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.Notice
        public void handle() {
            final long nowTime = System.currentTimeMillis();
            if (nowTime - SeekableStreamSupervisor.this.lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
                // Too soon after the previous run; skip this notice.
                return;
            }
            SeekableStreamSupervisor.this.lastRunTime = nowTime;
            SeekableStreamSupervisor.this.runInternal();
        }
    }

    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$ShutdownNotice.class */
    /**
     * Notice that closes the record supplier and marks the supervisor as stopped, waking up
     * any thread waiting on {@code stopLock}.
     */
    private class ShutdownNotice implements Notice {
        private ShutdownNotice() {
        }

        /**
         * Closes {@code recordSupplier}, then sets {@code stopped} and notifies all waiters
         * while holding {@code stopLock}.
         */
        @Override // org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.Notice
        public void handle() throws InterruptedException, ExecutionException, TimeoutException {
            recordSupplier.close();

            final Object lock = stopLock;
            synchronized (lock) {
                stopped = true;
                lock.notifyAll();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$StatsFromTaskResult.class */
    /**
     * Immutable holder pairing a task's statistics map with the ids of the task and its group.
     * The stats map is stored and returned by reference (no defensive copy).
     */
    public static class StatsFromTaskResult {
        private final String groupId;
        private final String taskId;
        private final Map<String, Object> stats;

        /**
         * @param i   numeric group id; kept as its decimal string form
         * @param str task id
         * @param map statistics reported for the task
         */
        public StatsFromTaskResult(int i, String str, Map<String, Object> map) {
            groupId = Integer.toString(i);
            taskId = str;
            stats = map;
        }

        /** @return the group id as a decimal string */
        public String getGroupId() {
            return groupId;
        }

        /** @return the task id supplied at construction */
        public String getTaskId() {
            return taskId;
        }

        /** @return the same map instance supplied at construction */
        public Map<String, Object> getStats() {
            return stats;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$TaskData.class */
    /**
     * Mutable per-task bookkeeping: the task's last known status, its start time and the
     * sequences it has currently reached. All fields are volatile.
     */
    public class TaskData {
        volatile TaskStatus status;
        volatile DateTime startTime;
        volatile Map<PartitionIdType, SequenceOffsetType> currentSequences;

        private TaskData() {
            this.currentSequences = new HashMap<>();
        }

        /**
         * @return a debug string listing every field under its own name; the sequences were
         *         previously printed under the label "checkpointSequences", which is the name of
         *         a different field on {@code TaskGroup}
         */
        @Override
        public String toString() {
            return "TaskData{status=" + this.status + ", startTime=" + this.startTime + ", currentSequences=" + this.currentSequences + '}';
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor$TaskGroup.class */
    /**
     * State kept for one group of tasks: the sequences the group started from, the tasks that
     * belong to it, its message-time bounds and the ordered history of checkpoints.
     */
    public class TaskGroup {
        final int groupId;
        final ImmutableMap<PartitionIdType, SequenceOffsetType> startingSequences;
        // Falls back to startingSequences when no unfiltered map is supplied.
        final ImmutableMap<PartitionIdType, SequenceOffsetType> unfilteredStartingSequencesForSequenceName;
        final ConcurrentHashMap<String, SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskData> tasks;
        final Optional<DateTime> minimumMessageTime;
        final Optional<DateTime> maximumMessageTime;
        // Never null: an absent set is replaced by an empty one.
        final Set<PartitionIdType> exclusiveStartSequenceNumberPartitions;
        // Checkpoint id -> sequences; id 0 always holds the starting sequences.
        final TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> checkpointSequences;
        final String baseSequenceName;
        DateTime completionTimeout;

        /**
         * Creates a group whose base sequence name is generated by the given supervisor from the
         * unfiltered starting sequences (or the starting sequences when those are absent).
         *
         * <p>The {@code @Nullable} markers are placed on the parameters the code actually
         * null-checks ({@code immutableMap2} and {@code set}).
         *
         * @param seekableStreamSupervisor supervisor used to generate the sequence name
         * @param i             group id
         * @param immutableMap  starting sequences
         * @param immutableMap2 unfiltered starting sequences, or {@code null} to reuse {@code immutableMap}
         * @param optional      minimum message time
         * @param optional2     maximum message time
         * @param set           partitions with exclusive start sequence numbers, or {@code null} for none
         */
        TaskGroup(SeekableStreamSupervisor seekableStreamSupervisor, int i, ImmutableMap<PartitionIdType, SequenceOffsetType> immutableMap, @Nullable ImmutableMap<PartitionIdType, SequenceOffsetType> immutableMap2, Optional<DateTime> optional, Optional<DateTime> optional2, @Nullable Set<PartitionIdType> set) {
            this(i, immutableMap, immutableMap2, optional, optional2, set, seekableStreamSupervisor.generateSequenceName(immutableMap2 == null ? immutableMap : immutableMap2, optional, optional2, seekableStreamSupervisor.spec.getDataSchema(), seekableStreamSupervisor.taskTuningConfig));
        }

        /**
         * Creates a group with an explicit base sequence name.
         *
         * @param i             group id
         * @param immutableMap  starting sequences; also recorded as checkpoint 0
         * @param immutableMap2 unfiltered starting sequences, or {@code null} to reuse {@code immutableMap}
         * @param optional      minimum message time
         * @param optional2     maximum message time
         * @param set           partitions with exclusive start sequence numbers, or {@code null} for none
         * @param str           base sequence name
         */
        TaskGroup(int i, ImmutableMap<PartitionIdType, SequenceOffsetType> immutableMap, @Nullable ImmutableMap<PartitionIdType, SequenceOffsetType> immutableMap2, Optional<DateTime> optional, Optional<DateTime> optional2, @Nullable Set<PartitionIdType> set, String str) {
            this.tasks = new ConcurrentHashMap<>();
            this.checkpointSequences = new TreeMap<>();
            this.groupId = i;
            this.startingSequences = immutableMap;
            this.unfilteredStartingSequencesForSequenceName = immutableMap2 == null ? immutableMap : immutableMap2;
            this.minimumMessageTime = optional;
            this.maximumMessageTime = optional2;
            this.checkpointSequences.put(0, immutableMap);
            this.exclusiveStartSequenceNumberPartitions = set != null ? set : Collections.emptySet();
            this.baseSequenceName = str;
        }

        /**
         * Appends a checkpoint under the next id (current highest id + 1).
         *
         * @param map sequences of the new checkpoint
         * @return the id assigned to the new checkpoint
         */
        int addNewCheckpoint(Map<PartitionIdType, SequenceOffsetType> map) {
            final int nextId = this.checkpointSequences.lastKey() + 1;
            this.checkpointSequences.put(nextId, map);
            return nextId;
        }

        /** @return a live view of the ids of the tasks in this group */
        Set<String> taskIds() {
            return this.tasks.keySet();
        }
    }

    /**
     * Wires up the supervisor: stores collaborators and configuration, creates the executors,
     * the state manager, the task info provider and the task client.
     *
     * @param str                                  supervisor id; also used as the prefix of executor thread names
     * @param taskStorage                          source of task statuses
     * @param taskMaster                           used to reach the task runner
     * @param indexerMetadataStorageCoordinator    metadata storage coordinator
     * @param seekableStreamIndexTaskClientFactory factory for the task client
     * @param objectMapper                         mapper that is copied and configured to sort properties alphabetically
     * @param seekableStreamSupervisorSpec         supervisor spec supplying data schema, IO config and tuning config
     * @param rowIngestionMetersFactory            row ingestion meters factory
     * @param z                                    value stored as {@code useExclusiveStartingSequence}
     */
    public SeekableStreamSupervisor(String str, final TaskStorage taskStorage, final TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, SeekableStreamIndexTaskClientFactory<? extends SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType>> seekableStreamIndexTaskClientFactory, ObjectMapper objectMapper, SeekableStreamSupervisorSpec seekableStreamSupervisorSpec, RowIngestionMetersFactory rowIngestionMetersFactory, boolean z) {
        this.taskStorage = taskStorage;
        this.taskMaster = taskMaster;
        this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator;
        this.sortingMapper = objectMapper.copy().configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true);
        this.spec = seekableStreamSupervisorSpec;
        this.rowIngestionMetersFactory = rowIngestionMetersFactory;
        this.useExclusiveStartingSequence = z;
        this.dataSource = seekableStreamSupervisorSpec.getDataSchema().getDataSource();
        this.ioConfig = seekableStreamSupervisorSpec.getIoConfig();
        this.tuningConfig = seekableStreamSupervisorSpec.getTuningConfig();
        this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
        this.supervisorId = str;

        // Executors: one for notices, two schedulers, and a worker pool created further below.
        this.exec = Execs.singleThreaded(str);
        this.scheduledExec = Execs.scheduledSingleThreaded(str + "-Scheduler-%d");
        this.reportingExec = Execs.scheduledSingleThreaded(str + "-Reporting-%d");
        this.stateManager = new SeekableStreamSupervisorStateManager(seekableStreamSupervisorSpec.getSupervisorStateManagerConfig(), seekableStreamSupervisorSpec.isSuspended());

        // Worker pool size: explicit setting if present, otherwise min(10, taskCount).
        final int workerThreads;
        if (this.tuningConfig.getWorkerThreads() == null) {
            workerThreads = Math.min(10, this.ioConfig.getTaskCount().intValue());
        } else {
            workerThreads = this.tuningConfig.getWorkerThreads().intValue();
        }
        this.workerExec = MoreExecutors.listeningDecorator(Execs.multiThreaded(workerThreads, str + "-Worker-%d"));
        log.info("Created worker pool with [%d] threads for dataSource [%s]", workerThreads, this.dataSource);

        this.taskInfoProvider = new TaskInfoProvider() { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.1
            /**
             * Returns the location of the running task with the given id, or
             * {@code TaskLocation.unknown()} when there is no task runner or no such running task.
             */
            @Override // org.apache.druid.indexing.common.TaskInfoProvider
            public TaskLocation getTaskLocation(String str2) {
                Preconditions.checkNotNull(str2, "id");
                Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
                if (!taskRunner.isPresent()) {
                    SeekableStreamSupervisor.log.error("Failed to get task runner because I'm not the leader!");
                    return TaskLocation.unknown();
                }
                Optional match = Iterables.tryFind(taskRunner.get().getRunningTasks(), workItem -> {
                    return str2.equals(workItem.getTaskId());
                });
                if (!match.isPresent()) {
                    return TaskLocation.unknown();
                }
                return ((TaskRunnerWorkItem) match.get()).getLocation();
            }

            /** Delegates to the task storage. */
            @Override // org.apache.druid.indexing.common.TaskInfoProvider
            public Optional<TaskStatus> getTaskStatus(String str2) {
                return taskStorage.getStatus(str2);
            }
        };

        // Future timeout: chatRetries * (httpTimeout seconds + 10), but never below the minimum.
        this.futureTimeoutInSeconds = Math.max(MINIMUM_FUTURE_TIMEOUT_IN_SECONDS, this.tuningConfig.getChatRetries().longValue() * (this.tuningConfig.getHttpTimeout().getStandardSeconds() + 10));

        // Chat thread count: explicit setting if present, otherwise min(10, taskCount * replicas).
        final int chatThreads;
        if (this.tuningConfig.getChatThreads() == null) {
            chatThreads = Math.min(10, this.ioConfig.getTaskCount().intValue() * this.ioConfig.getReplicas().intValue());
        } else {
            chatThreads = this.tuningConfig.getChatThreads().intValue();
        }
        this.taskClient = seekableStreamIndexTaskClientFactory.build(this.taskInfoProvider, this.dataSource, chatThreads, this.tuningConfig.getHttpTimeout(), this.tuningConfig.getChatRetries().longValue());
        log.info("Created taskClient with dataSource[%s] chatThreads[%d] httpTimeout[%s] chatRetries[%d]", this.dataSource, chatThreads, this.tuningConfig.getHttpTimeout(), this.tuningConfig.getChatRetries());
    }

    /**
     * Starts the supervisor lifecycle. A first initialization is attempted inline; if it fails
     * and the supervisor is still not started, initialization is retried on {@code exec} up to
     * {@code MAX_INITIALIZATION_RETRIES} times. The lifecycle is marked as started either way.
     *
     * @throws IllegalStateException if the lifecycle was already started or {@code exec} is shut down
     */
    public void start() {
        synchronized (stateChangeLock) {
            Preconditions.checkState(!lifecycleStarted, "already started");
            Preconditions.checkState(!exec.isShutdown(), "already stopped");

            try {
                tryInit();
            } catch (Exception firstAttemptFailure) {
                if (!started) {
                    log.warn("First initialization attempt failed for SeekableStreamSupervisor[%s], starting retries...", dataSource);

                    final Runnable retryingInit = () -> {
                        try {
                            RetryUtils.retry(() -> {
                                tryInit();
                                return 0;
                            }, failure -> !started, 0, MAX_INITIALIZATION_RETRIES, (RetryUtils.CleanupAfterFailure) null, (String) null);
                        } catch (Exception retriesExhausted) {
                            log.makeAlert("Failed to initialize after %s retries, aborting. Please resubmit the supervisor spec to restart this supervisor [%s]", MAX_INITIALIZATION_RETRIES, supervisorId).emit();
                            throw new RuntimeException(retriesExhausted);
                        }
                    };
                    exec.submit(retryingInit);
                }
            }

            lifecycleStarted = true;
        }
    }

    /**
     * Stops the supervisor. Schedulers are shut down first; if the supervisor had started, the
     * task-runner listener is unregistered and a shutdown notice is posted, after which this
     * method waits (bounded by the configured shutdown timeout) for the notice to be handled.
     * Finally the task client and the remaining executors are shut down. Any exception raised
     * along the way is recorded and reported as an alert instead of being propagated.
     *
     * @param z {@code true} to post a {@code GracefulShutdownNotice}, {@code false} to post a plain
     *          {@code ShutdownNotice}
     * @throws IllegalStateException if the lifecycle was never started
     */
    public void stop(boolean z) {
        synchronized (stateChangeLock) {
            Preconditions.checkState(lifecycleStarted, "lifecycle not started");
            log.info("Beginning shutdown of [%s]", supervisorId);
            stateManager.maybeSetState(SupervisorStateManager.BasicState.STOPPING);

            try {
                scheduledExec.shutdownNow();
                reportingExec.shutdownNow();

                if (started) {
                    final Optional<TaskRunner> runner = taskMaster.getTaskRunner();
                    if (runner.isPresent()) {
                        runner.get().unregisterListener(supervisorId);
                    }

                    synchronized (stopLock) {
                        if (z) {
                            log.info("Posting GracefulShutdownNotice, signalling managed tasks to complete and publish");
                            notices.add(new GracefulShutdownNotice());
                        } else {
                            log.info("Posting ShutdownNotice");
                            notices.add(new ShutdownNotice());
                        }

                        // Wait until the notice flips "stopped" or the shutdown timeout elapses.
                        final long timeoutMillis = tuningConfig.getShutdownTimeout().getMillis();
                        final long deadline = System.currentTimeMillis() + timeoutMillis;
                        while (!stopped) {
                            final long remaining = deadline - System.currentTimeMillis();
                            if (remaining <= 0) {
                                log.info("Timed out while waiting for shutdown (timeout [%,dms])", timeoutMillis);
                                stopped = true;
                                break;
                            }
                            stopLock.wait(remaining);
                        }
                    }
                    log.info("Shutdown notice handled");
                }

                taskClient.close();
                workerExec.shutdownNow();
                exec.shutdownNow();
                started = false;
                log.info("[%s] has stopped", supervisorId);
            } catch (Exception stopFailure) {
                stateManager.recordThrowableEvent(stopFailure);
                log.makeAlert(stopFailure, "Exception stopping [%s]", supervisorId).emit();
            }
        }
    }

    /**
     * Queues a reset request; the reset itself happens later, when the notice is handled.
     *
     * @param dataSourceMetadata metadata handed to the reset notice
     */
    public void reset(DataSourceMetadata dataSourceMetadata) {
        log.info("Posting ResetNotice");
        final Notice resetNotice = new ResetNotice(dataSourceMetadata);
        notices.add(resetNotice);
    }

    /**
     * Performs one initialization attempt: sets up the record supplier, starts the
     * notice-handling loop on {@code exec}, schedules periodic run notices and reporting, and
     * marks the supervisor as started. Does nothing if the supervisor is already started or
     * already stopped.
     *
     * @throws RuntimeException wrapping any exception raised during initialization; before
     *         rethrowing, the failure is recorded, the record supplier (if any) is closed, the
     *         retry counter is incremented and an alert is emitted
     */
    @VisibleForTesting
    public void tryInit() {
        synchronized (stateChangeLock) {
            if (started) {
                log.warn("Supervisor was already started, skipping init");
                return;
            }
            if (stopped) {
                log.warn("Supervisor was already stopped, skipping init.");
                return;
            }

            try {
                recordSupplier = setupRecordSupplier();

                // Notice loop: polls the queue until interrupted or stopped; a failing notice is
                // recorded and reported but does not end the loop.
                final Runnable noticeLoop = () -> {
                    try {
                        final long pollTimeoutMillis = Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS);
                        while (!Thread.currentThread().isInterrupted() && !stopped) {
                            final Notice next = notices.poll(pollTimeoutMillis, TimeUnit.MILLISECONDS);
                            if (next == null) {
                                continue;
                            }
                            try {
                                next.handle();
                            } catch (Throwable noticeFailure) {
                                stateManager.recordThrowableEvent(noticeFailure);
                                log.makeAlert(noticeFailure, "SeekableStreamSupervisor[%s] failed to handle notice", dataSource).addData("noticeClass", next.getClass().getSimpleName()).emit();
                            }
                        }
                    } catch (InterruptedException interrupted) {
                        stateManager.recordThrowableEvent(interrupted);
                        log.info("SeekableStreamSupervisor[%s] interrupted, exiting", dataSource);
                    }
                };
                exec.submit(noticeLoop);

                firstRunTime = DateTimes.nowUtc().plus(ioConfig.getStartDelay());
                scheduledExec.scheduleAtFixedRate(buildRunTask(), ioConfig.getStartDelay().getMillis(), Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS), TimeUnit.MILLISECONDS);
                scheduleReporting(reportingExec);
                started = true;
                log.info("Started SeekableStreamSupervisor[%s], first run in [%s], with spec: [%s]", dataSource, ioConfig.getStartDelay(), spec.toString());
            } catch (Exception initFailure) {
                stateManager.recordThrowableEvent(initFailure);
                if (recordSupplier != null) {
                    recordSupplier.close();
                }
                initRetryCounter++;
                log.makeAlert(initFailure, "Exception starting SeekableStreamSupervisor[%s]", dataSource).emit();
                throw new RuntimeException(initFailure);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the periodic run task.
     *
     * @return a {@link Runnable} that enqueues a new {@code RunNotice} onto the notice queue each time it runs
     */
    public Runnable buildRunTask() {
        return () -> this.notices.add(new RunNotice());
    }

    /**
     * Returns the current supervisor report, generated with the detail flag set to {@code true}.
     *
     * @return the report produced by {@code generateReport(true)}
     */
    public SupervisorReport getStatus() {
        final boolean detailed = true;
        return generateReport(detailed);
    }

    /**
     * Returns the supervisor state currently held by the state manager.
     *
     * @return the value of {@code stateManager.getSupervisorState()}
     */
    public SupervisorStateManager.State getState() {
        final SupervisorStateManager.State currentState = this.stateManager.getSupervisorState();
        return currentState;
    }

    /**
     * Reports whether the state manager considers this supervisor healthy.
     *
     * @return the boxed result of {@code stateManager.isHealthy()}
     */
    public Boolean isHealthy() {
        final boolean healthy = this.stateManager.isHealthy();
        return healthy;
    }

    /**
     * Builds a supervisor report containing one {@code TaskReportData} entry per task in the actively reading
     * and pending-completion task groups.
     *
     * <p>Task entries are first collected into a local list and only added to the payload once every entry
     * has been built. If anything throws while collecting, a warning is logged and the report is returned
     * without task entries.
     *
     * @param z when {@code true}, starting sequences, current sequences and (for active tasks) per-partition
     *          lag are included in each task entry; when {@code false} those fields are {@code null}. The flag
     *          is also forwarded to {@code createReportPayload}.
     * @return the report for this supervisor's datasource, timestamped with the current UTC time
     */
    private SupervisorReport<? extends SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType>> generateReport(boolean z) {
        final int partitionCount = this.partitionGroups.values().stream().mapToInt(group -> group.size()).sum();
        final SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType> payload = createReportPayload(partitionCount, z);
        final SupervisorReport<? extends SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType>> report =
                new SupervisorReport<>(this.dataSource, DateTimes.nowUtc(), payload);
        final List<TaskReportData> taskReports = new ArrayList<>();
        try {
            for (TaskGroup group : this.activelyReadingTaskGroups.values()) {
                for (Map.Entry<String, TaskData> taskEntry : group.tasks.entrySet()) {
                    final String taskId = taskEntry.getKey();
                    final DateTime startTime = taskEntry.getValue().startTime;
                    final Map<PartitionIdType, SequenceOffsetType> currentOffsets = taskEntry.getValue().currentSequences;
                    Long remainingSeconds = null;
                    if (startTime != null) {
                        // Time left of the configured task duration, clamped at zero.
                        final long elapsedMillis = System.currentTimeMillis() - startTime.getMillis();
                        final long remainingMillis = Math.max(0L, this.ioConfig.getTaskDuration().getMillis() - elapsedMillis);
                        remainingSeconds = TimeUnit.MILLISECONDS.toSeconds(remainingMillis);
                    }
                    taskReports.add(new TaskReportData(
                            taskId,
                            z ? group.startingSequences : null,
                            z ? currentOffsets : null,
                            startTime,
                            remainingSeconds,
                            TaskReportData.TaskType.ACTIVE,
                            z ? getLagPerPartition(currentOffsets) : null
                    ));
                }
            }
            for (List<TaskGroup> pendingGroups : this.pendingCompletionTaskGroups.values()) {
                for (TaskGroup group : pendingGroups) {
                    for (Map.Entry<String, TaskData> taskEntry : group.tasks.entrySet()) {
                        final String taskId = taskEntry.getKey();
                        final DateTime startTime = taskEntry.getValue().startTime;
                        final Map<PartitionIdType, SequenceOffsetType> currentOffsets = taskEntry.getValue().currentSequences;
                        Long remainingSeconds = null;
                        if (group.completionTimeout != null) {
                            // Convert with TimeUnit, as the active-task branch does, instead of dividing by
                            // the unrelated run-frequency constant.
                            final long remainingMillis = Math.max(0L, group.completionTimeout.getMillis() - System.currentTimeMillis());
                            remainingSeconds = TimeUnit.MILLISECONDS.toSeconds(remainingMillis);
                        }
                        taskReports.add(new TaskReportData(
                                taskId,
                                z ? group.startingSequences : null,
                                z ? currentOffsets : null,
                                startTime,
                                remainingSeconds,
                                TaskReportData.TaskType.PUBLISHING,
                                null
                        ));
                    }
                }
            }
            taskReports.forEach(payload::addTask);
        } catch (Exception e) {
            log.warn(e, "Failed to generate status report");
        }
        return report;
    }

    /**
     * Returns the statistics gathered by {@code getCurrentTotalStats()}, translating its checked exceptions
     * into unchecked ones.
     *
     * @return the map produced by {@code getCurrentTotalStats()}
     * @throws RuntimeException wrapping an {@link InterruptedException} (after re-interrupting the current
     *                          thread and logging), an {@link ExecutionException} or a {@link TimeoutException}
     */
    public Map<String, Map<String, Object>> getStats() {
        final Map<String, Map<String, Object>> stats;
        try {
            stats = getCurrentTotalStats();
        } catch (ExecutionException | TimeoutException failure) {
            throw new RuntimeException(failure);
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            log.error(interrupted, "getStats() interrupted.");
            throw new RuntimeException(interrupted);
        }
        return stats;
    }

    /**
     * Requests moving-average statistics from every task in the actively reading and pending-completion task
     * groups and collects the results.
     *
     * <p>All requests are issued asynchronously and awaited together for at most
     * {@code futureTimeoutInSeconds} seconds. A task whose request did not succeed is logged as an error and
     * left out of the result.
     *
     * <p>Both group maps are walked through their entry sets, so a group removed while this method runs
     * cannot cause a null dereference on a second lookup.
     *
     * @return a map keyed by the group id reported by each result, whose values map task id to that task's stats
     * @throws InterruptedException if interrupted while waiting for the task responses
     * @throws ExecutionException   if waiting on the combined future fails
     * @throws TimeoutException     if the responses do not arrive within {@code futureTimeoutInSeconds} seconds
     */
    private Map<String, Map<String, Object>> getCurrentTotalStats() throws InterruptedException, ExecutionException, TimeoutException {
        final Map<String, Map<String, Object>> statsByGroup = new HashMap<>();
        final List<ListenableFuture<StatsFromTaskResult>> futures = new ArrayList<>();
        // Kept index-aligned with 'futures' so that a failed request can be attributed to its group and task.
        final List<Pair<Integer, String>> groupAndTaskIds = new ArrayList<>();

        for (Map.Entry<Integer, TaskGroup> groupEntry : this.activelyReadingTaskGroups.entrySet()) {
            final int groupId = groupEntry.getKey();
            for (String taskId : groupEntry.getValue().taskIds()) {
                futures.add(Futures.transform(
                        this.taskClient.getMovingAveragesAsync(taskId),
                        stats -> new StatsFromTaskResult(groupId, taskId, stats)
                ));
                groupAndTaskIds.add(new Pair<>(groupId, taskId));
            }
        }

        for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingEntry : this.pendingCompletionTaskGroups.entrySet()) {
            final int groupId = pendingEntry.getKey();
            for (TaskGroup pendingGroup : pendingEntry.getValue()) {
                for (String taskId : pendingGroup.taskIds()) {
                    futures.add(Futures.transform(
                            this.taskClient.getMovingAveragesAsync(taskId),
                            stats -> new StatsFromTaskResult(groupId, taskId, stats)
                    ));
                    groupAndTaskIds.add(new Pair<>(groupId, taskId));
                }
            }
        }

        // successfulAsList yields null in the slot of every future that failed.
        final List<StatsFromTaskResult> results = Futures.successfulAsList(futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int i = 0; i < results.size(); i++) {
            final StatsFromTaskResult result = results.get(i);
            if (result != null) {
                statsByGroup.computeIfAbsent(result.getGroupId(), groupKey -> new HashMap<>())
                        .put(result.getTaskId(), result.getStats());
            } else {
                final Pair<Integer, String> failed = groupAndTaskIds.get(i);
                log.error("Failed to get stats for group[%d]-task[%s]", failed.lhs, failed.rhs);
            }
        }
        return statsByGroup;
    }

    /**
     * Creates a task group with the given id and registers it in the actively reading task groups.
     *
     * <p>Every id in {@code set} is added to the group's task map with a fresh {@code TaskData}.
     *
     * @param i            id of the task group to create
     * @param immutableMap second constructor argument of the new {@code TaskGroup} (partition to sequence map)
     * @param optional     first optional {@code DateTime} passed to the {@code TaskGroup} constructor
     * @param optional2    second optional {@code DateTime} passed to the {@code TaskGroup} constructor
     * @param set          ids of the tasks that belong to the new group
     * @param set2         partition id set passed as the last {@code TaskGroup} constructor argument
     * @throws ISE if a group with id {@code i} is already present among the actively reading task groups
     */
    @VisibleForTesting
    public void addTaskGroupToActivelyReadingTaskGroup(int i, ImmutableMap<PartitionIdType, SequenceOffsetType> immutableMap, Optional<DateTime> optional, Optional<DateTime> optional2, Set<String> set, Set<PartitionIdType> set2) {
        SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup = new TaskGroup(this, i, immutableMap, null, optional, optional2, set2);
        taskGroup.tasks.putAll((Map) set.stream().collect(Collectors.toMap(taskId -> taskId, taskId -> new TaskData())));
        if (this.activelyReadingTaskGroups.putIfAbsent(Integer.valueOf(i), taskGroup) != null) {
            // The value reported here is the task group id; the message names it as such.
            throw new ISE("trying to add taskGroup with ID [%s] to actively reading task groups, but group already exists.", new Object[]{Integer.valueOf(i)});
        }
    }

    /**
     * Creates a task group with the given id and appends it to the pending-completion groups for that id,
     * creating the list for the id first if none exists yet.
     *
     * <p>Every id in {@code set} is added to the group's task map with a fresh {@code TaskData}.
     *
     * @param i            id of the task group to create
     * @param immutableMap second constructor argument of the new {@code TaskGroup} (partition to sequence map)
     * @param optional     first optional {@code DateTime} passed to the {@code TaskGroup} constructor
     * @param optional2    second optional {@code DateTime} passed to the {@code TaskGroup} constructor
     * @param set          ids of the tasks that belong to the new group
     * @param set2         partition id set passed as the last {@code TaskGroup} constructor argument
     */
    @VisibleForTesting
    public void addTaskGroupToPendingCompletionTaskGroup(int i, ImmutableMap<PartitionIdType, SequenceOffsetType> immutableMap, Optional<DateTime> optional, Optional<DateTime> optional2, Set<String> set, Set<PartitionIdType> set2) {
        final SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup pendingGroup =
                new TaskGroup(this, i, immutableMap, null, optional, optional2, set2);
        final Map taskDataById = (Map) set.stream().collect(Collectors.toMap(taskId -> taskId, taskId -> new TaskData()));
        pendingGroup.tasks.putAll(taskDataById);
        this.pendingCompletionTaskGroups
                .computeIfAbsent(Integer.valueOf(i), groupId -> new CopyOnWriteArrayList())
                .add(pendingGroup);
    }

    /**
     * Executes one supervisor run.
     *
     * <p>After registering the task runner listener (if needed) and refreshing partition data from the
     * stream, the run proceeds only when that refresh reports success or at least one earlier run was
     * successful. It then discovers tasks, updates their status, checks task duration, pending-completion
     * tasks and current task state, and either shuts tasks down gracefully (suspended spec) or creates new
     * tasks. A report is logged at debug level (detailed) when debug logging is enabled, otherwise at info
     * level (non-detailed).
     *
     * <p>Any exception is recorded with the state manager and logged as a warning; the run is always marked
     * as finished.
     */
    @VisibleForTesting
    public void runInternal() {
        try {
            possiblyRegisterListener();
            this.stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CONNECTING_TO_STREAM);

            final boolean canProceed = updatePartitionDataFromStream() || this.stateManager.isAtLeastOneSuccessfulRun();
            if (!canProceed) {
                // The finally block below still marks the run as finished.
                return;
            }

            this.stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.DISCOVERING_INITIAL_TASKS);
            discoverTasks();
            updateTaskStatus();
            checkTaskDuration();
            checkPendingCompletionTasks();
            checkCurrentTaskState();

            if (!this.spec.isSuspended()) {
                log.info("[%s] supervisor is running.", this.dataSource);
                this.stateManager.maybeSetState(SeekableStreamSupervisorStateManager.SeekableStreamState.CREATING_TASKS);
                createNewTasks();
            } else {
                log.info("[%s] supervisor is suspended.", this.dataSource);
                gracefulShutdownInternal();
            }

            final boolean debugEnabled = log.isDebugEnabled();
            final String reportText = generateReport(debugEnabled).toString();
            if (debugEnabled) {
                log.debug(reportText);
            } else {
                log.info(reportText);
            }
        } catch (Exception runFailure) {
            this.stateManager.recordThrowableEvent(runFailure);
            log.warn(runFailure, "Exception in supervisor run loop for dataSource [%s]", this.dataSource);
        } finally {
            this.stateManager.markRunFinished();
        }
    }

    /**
     * Registers a task runner listener once.
     *
     * <p>Nothing happens if a listener was already registered or if the task master currently has no task
     * runner. The listener identifies itself with this supervisor's id, ignores location changes and enqueues
     * a {@code RunNotice} on every task status change.
     */
    private void possiblyRegisterListener() {
        if (this.listenerRegistered) {
            return;
        }
        final Optional<TaskRunner> maybeRunner = this.taskMaster.getTaskRunner();
        if (!maybeRunner.isPresent()) {
            return;
        }
        final TaskRunnerListener statusListener = new TaskRunnerListener() {
            @Override
            public String getListenerId() {
                return SeekableStreamSupervisor.this.supervisorId;
            }

            @Override
            public void locationChanged(String str, TaskLocation taskLocation) {
                // Location changes are intentionally ignored.
            }

            @Override
            public void statusChanged(String str, TaskStatus taskStatus) {
                SeekableStreamSupervisor.this.notices.add(new RunNotice());
            }
        };
        maybeRunner.get().registerListener(statusListener, Execs.directExecutor());
        this.listenerRegistered = true;
    }

    /**
     * Prepares all actively reading tasks for shutdown and then runs the task duration check.
     *
     * <p>A task whose location is unknown is killed. Every other task has its start time set to
     * {@code DateTimes.EPOCH} before {@code checkTaskDuration()} is invoked.
     *
     * @throws ExecutionException   propagated from {@code checkTaskDuration()}
     * @throws InterruptedException propagated from {@code checkTaskDuration()}
     * @throws TimeoutException     propagated from {@code checkTaskDuration()}
     */
    @VisibleForTesting
    public void gracefulShutdownInternal() throws ExecutionException, InterruptedException, TimeoutException {
        for (TaskGroup group : this.activelyReadingTaskGroups.values()) {
            for (Map.Entry<String, TaskData> taskEntry : group.tasks.entrySet()) {
                final String taskId = taskEntry.getKey();
                final boolean locationKnown = !this.taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown());
                if (locationKnown) {
                    taskEntry.getValue().startTime = DateTimes.EPOCH;
                } else {
                    killTask(taskId, "Killing task for graceful shutdown");
                }
            }
        }
        checkTaskDuration();
    }

    /**
     * Resets this supervisor's stored offsets.
     *
     * <p>With a {@code null} argument the datasource metadata entry is deleted, all tasks in the actively
     * reading groups are killed and the active groups, partition groups and partition offsets are cleared.
     *
     * <p>Otherwise the partitions named in {@code dataSourceMetadata} are reset: the stored metadata (if any)
     * is replaced by itself minus the reset metadata, the task groups for the affected partitions are killed
     * and removed, and the partitions' offsets are set to the not-set marker. The request is ignored when the
     * reset metadata refers to a different stream, or when none of its partitions has an offset in the stored
     * metadata or matches the starting offset of an active task group.
     *
     * @param dataSourceMetadata the metadata describing the partitions to reset, or {@code null} to reset everything
     * @throws IAE              if the given or the stored metadata fails {@code checkSourceMetadataMatch}
     * @throws ISE              if the metadata store reports that the reset did not succeed
     * @throws RuntimeException wrapping an {@link IOException} raised while resetting the stored metadata
     */
    @VisibleForTesting
    @SuppressWarnings("unchecked")
    public void resetInternal(DataSourceMetadata dataSourceMetadata) {
        if (dataSourceMetadata == null) {
            final boolean deleted = this.indexerMetadataStorageCoordinator.deleteDataSourceMetadata(this.dataSource);
            log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", this.dataSource, deleted);
            this.activelyReadingTaskGroups.values().forEach(
                    group -> killTasksInGroup(group, "DataSourceMetadata is not found while reset")
            );
            this.activelyReadingTaskGroups.clear();
            this.partitionGroups.clear();
            this.partitionOffsets.clear();
            return;
        }
        if (!checkSourceMetadataMatch(dataSourceMetadata)) {
            throw new IAE("Datasource metadata instance does not match required, found instance of [%s]", dataSourceMetadata.getClass());
        }
        log.info("Reset dataSource[%s] with metadata[%s]", this.dataSource, dataSourceMetadata);
        final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> resetMetadata =
                (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) dataSourceMetadata;
        if (!resetMetadata.getSeekableStreamSequenceNumbers().getStream().equals(this.ioConfig.getStream())) {
            log.warn(
                    "Reset metadata stream [%s] and supervisor's stream name [%s] do not match",
                    resetMetadata.getSeekableStreamSequenceNumbers().getStream(),
                    this.ioConfig.getStream()
            );
            return;
        }
        final DataSourceMetadata storedMetadata = this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
        if (storedMetadata != null && !checkSourceMetadataMatch(storedMetadata)) {
            throw new IAE("Datasource metadata instance does not match required, found instance of [%s]", storedMetadata.getClass());
        }
        final SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata =
                (SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType>) storedMetadata;

        // Decide whether there is anything to reset: some partition must either have a stored offset or be
        // read by an active group that started from exactly the requested offset.
        boolean doReset = false;
        for (Map.Entry<PartitionIdType, SequenceOffsetType> resetEntry : resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().entrySet()) {
            final SequenceOffsetType storedOffset = currentMetadata == null
                    ? null
                    : currentMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().get(resetEntry.getKey());
            final TaskGroup activeGroup = this.activelyReadingTaskGroups.get(Integer.valueOf(getTaskGroupIdForPartition(resetEntry.getKey())));
            // The active group may have no starting offset for this partition; treat that as "not the same
            // offset" instead of failing with a NullPointerException.
            final SequenceOffsetType groupStartOffset = activeGroup == null ? null : activeGroup.startingSequences.get(resetEntry.getKey());
            final boolean isSameOffset = groupStartOffset != null && groupStartOffset.equals(resetEntry.getValue());
            if (storedOffset != null || isSameOffset) {
                doReset = true;
                break;
            }
        }
        if (!doReset) {
            log.info("Ignoring duplicate reset request [%s]", dataSourceMetadata);
            return;
        }

        final boolean metadataUpdateSuccess;
        if (currentMetadata == null) {
            metadataUpdateSuccess = true;
        } else {
            try {
                metadataUpdateSuccess = this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(this.dataSource, currentMetadata.minus(resetMetadata));
            } catch (IOException e) {
                log.error("Resetting DataSourceMetadata failed [%s]", e.getMessage());
                throw new RuntimeException(e);
            }
        }
        if (!metadataUpdateSuccess) {
            throw new ISE("Unable to reset metadata");
        }

        resetMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap().keySet().forEach(partition -> {
            final int groupId = getTaskGroupIdForPartition(partition);
            killTaskGroupForPartitions(ImmutableSet.of(partition), "DataSourceMetadata is updated while reset");
            this.activelyReadingTaskGroups.remove(Integer.valueOf(groupId));
            // Re-create an empty partition group for the id that was just removed.
            this.partitionGroups.computeIfAbsent(Integer.valueOf(groupId), id -> new HashSet<>());
            this.partitionOffsets.put(partition, getNotSetMarker());
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the task queue to shut down the given task.
     *
     * <p>If the task master has no task queue, an error is logged and nothing else happens.
     *
     * @param str    id of the task to shut down
     * @param str2   reason format string forwarded to the task queue
     * @param objArr format arguments forwarded to the task queue
     */
    public void killTask(String str, String str2, Object... objArr) {
        final Optional<TaskQueue> maybeQueue = this.taskMaster.getTaskQueue();
        if (!maybeQueue.isPresent()) {
            log.error("Failed to get task queue because I'm not the leader!");
            return;
        }
        maybeQueue.get().shutdown(str, str2, objArr);
    }

    /**
     * Asks the task queue to shut down the given task via {@code shutdownWithSuccess}.
     *
     * <p>If the task master has no task queue, an error is logged and nothing else happens.
     *
     * @param str    id of the task to shut down
     * @param str2   reason format string forwarded to the task queue
     * @param objArr format arguments forwarded to the task queue
     */
    private void killTaskWithSuccess(String str, String str2, Object... objArr) {
        final Optional<TaskQueue> maybeQueue = this.taskMaster.getTaskQueue();
        if (!maybeQueue.isPresent()) {
            log.error("Failed to get task queue because I'm not the leader!");
            return;
        }
        maybeQueue.get().shutdownWithSuccess(str, str2, objArr);
    }

    /**
     * Kills every task of the given group; a {@code null} group is ignored.
     *
     * @param taskGroup the group whose tasks should be killed, may be {@code null}
     * @param str       reason format string passed to {@code killTask}
     * @param objArr    format arguments passed to {@code killTask}
     */
    private void killTasksInGroup(SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup, String str, Object... objArr) {
        if (taskGroup == null) {
            return;
        }
        for (String taskId : taskGroup.tasks.keySet()) {
            killTask(taskId, str, objArr);
        }
    }

    /**
     * For each given partition, kills the tasks of the actively reading group that owns it and removes that
     * group id from both the partition groups and the actively reading task groups.
     *
     * @param set    partitions whose task groups should be killed
     * @param str    reason format string passed on to {@code killTasksInGroup}
     * @param objArr format arguments passed on to {@code killTasksInGroup}
     */
    private void killTaskGroupForPartitions(Set<PartitionIdType> set, String str, Object... objArr) {
        for (PartitionIdType partition : set) {
            final Integer groupId = getTaskGroupIdForPartition(partition);
            killTasksInGroup(this.activelyReadingTaskGroups.get(groupId), str, objArr);
            this.partitionGroups.remove(groupId);
            this.activelyReadingTaskGroups.remove(groupId);
        }
    }

    /**
     * Checks whether any pending-completion task group contains the given task.
     *
     * @param str id of the task to look for
     * @return {@code true} if some pending-completion group's task map has {@code str} as a key
     */
    private boolean isTaskInPendingCompletionGroups(String str) {
        for (CopyOnWriteArrayList<SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup> pendingGroups : this.pendingCompletionTaskGroups.values()) {
            for (SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup pendingGroup : pendingGroups) {
                if (pendingGroup.tasks.containsKey(str)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Scans the active tasks stored for this datasource and asynchronously queries the status of every
     * matching task that is not yet tracked in a pending-completion or actively reading task group.
     *
     * <p>Tasks whose status request yields no result within {@code futureTimeoutInSeconds} seconds are killed.
     * Finally {@code verifyAndMergeCheckpoints} is invoked on the values of a local group map.
     *
     * <p>NOTE(review): the body of the status callback could not be decompiled (see the JADX warnings below),
     * so its exact behavior is not visible here. The recovered fragments suggest it stops mismatching tasks
     * and registers the remaining ones in {@code activelyReadingTaskGroups} and in the local group map -
     * confirm against the original source.
     *
     * @throws ExecutionException   if waiting on the combined status future fails
     * @throws InterruptedException if interrupted while waiting for the status responses
     * @throws TimeoutException     if the responses do not arrive within {@code futureTimeoutInSeconds} seconds
     */
    private void discoverTasks() throws ExecutionException, InterruptedException, TimeoutException {
        // Number of active tasks whose type matches this supervisor; only used for the debug log below.
        int i = 0;
        // arrayList holds task ids and arrayList2 the matching status futures; the two are index-aligned.
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        List<Task> activeTasksByDatasource = this.taskStorage.getActiveTasksByDatasource(this.dataSource);
        // Passed to verifyAndMergeCheckpoints at the end; presumably filled by the status callback - TODO confirm.
        final HashMap hashMap = new HashMap();
        for (Task task : activeTasksByDatasource) {
            if (doesTaskTypeMatchSupervisor(task)) {
                i++;
                final SeekableStreamIndexTask seekableStreamIndexTask = (SeekableStreamIndexTask) task;
                final String id = task.getId();
                if (supportsPartitionExpiration()) {
                    // Partitions the task starts from that are not among the supervisor's current partition ids.
                    Set<PartitionIdType> keySet = seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet();
                    Sets.SetView difference = Sets.difference(keySet, new HashSet(this.partitionIds));
                    if (!difference.isEmpty()) {
                        // NOTE(review): processing of this task continues after the stop request - confirm that is intended.
                        killTaskWithSuccess(id, "Task [%s] with partition set [%s] has inactive partitions [%s], stopping task.", id, keySet, difference);
                    }
                }
                // The task group id is derived from the first partition of the task's start sequence map.
                Iterator<PartitionIdType> it = seekableStreamIndexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap().keySet().iterator();
                final Integer valueOf = it.hasNext() ? Integer.valueOf(getTaskGroupIdForPartition(it.next())) : null;
                if (valueOf != null) {
                    SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup = this.activelyReadingTaskGroups.get(valueOf);
                    // Only query tasks that are tracked neither as pending completion nor in the active group.
                    if (!isTaskInPendingCompletionGroups(id) && (taskGroup == null || !taskGroup.tasks.containsKey(id))) {
                        arrayList.add(id);
                        arrayList2.add(Futures.transform(this.taskClient.getStatusAsync(id), new Function<SeekableStreamIndexTaskRunner.Status, Boolean>() { // from class: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.3
                            /* JADX WARN: Code restructure failed: missing block: B:42:0x018c, code lost:
                            
                                org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.log.warn("Stopping task [%s] which does not match the expected partition allocation", new java.lang.Object[]{r5});
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:44:0x019f, code lost:
                            
                                r8.this$0.stopTask(r5, false).get(r8.this$0.futureTimeoutInSeconds, java.util.concurrent.TimeUnit.SECONDS);
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:48:0x01be, code lost:
                            
                                r12 = move-exception;
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:49:0x01c0, code lost:
                            
                                r8.this$0.stateManager.recordThrowableEvent(r12);
                                org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.log.warn(r12, "Exception while stopping task", new java.lang.Object[0]);
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:52:0x01f4, code lost:
                            
                                if (r8.this$0.isTaskCurrent(r7.intValue(), r5) != false) goto L46;
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:53:0x01f7, code lost:
                            
                                org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.log.info("Stopping task [%s] which does not match the expected parameters and ingestion spec", new java.lang.Object[]{r5});
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:55:0x020a, code lost:
                            
                                r8.this$0.stopTask(r5, false).get(r8.this$0.futureTimeoutInSeconds, java.util.concurrent.TimeUnit.SECONDS);
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:59:0x0229, code lost:
                            
                                r10 = move-exception;
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:60:0x022a, code lost:
                            
                                r8.this$0.stateManager.recordThrowableEvent(r10);
                                org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.log.warn(r10, "Exception while stopping task", new java.lang.Object[0]);
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:61:0x0247, code lost:
                            
                                r0 = r8.this$0.activelyReadingTaskGroups;
                                r1 = r7;
                                r3 = r7;
                                r4 = r6;
                                r0 = (org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.TaskGroup) r0.computeIfAbsent(r1, (v3) -> { // java.util.function.Function.apply(java.lang.Object):java.lang.Object
                                    return lambda$apply$1(r3, r4, v3);
                                });
                                r8.put(r7, r0);
                                r0 = r0.tasks.putIfAbsent(r5, new org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.TaskData(r8.this$0, null));
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:62:0x0292, code lost:
                            
                                if (r0 == null) goto L50;
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:64:0x02ad, code lost:
                            
                                throw new org.apache.druid.java.util.common.ISE("WTH? a taskGroup[%s] already exists for new task[%s]", new java.lang.Object[]{r0, r5});
                             */
                            /* JADX WARN: Code restructure failed: missing block: B:65:0x02ae, code lost:
                            
                                r8.this$0.verifySameSequenceNameForAllTasksInGroup(r7.intValue());
                             */
                            /*
                                Code decompiled incorrectly, please refer to instructions dump.
                                To view partially-correct add '--show-bad-code' argument
                            */
                            // Placeholder emitted by the decompiler: as written, this callback always throws.
                            public java.lang.Boolean apply(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status r9) {
                                /*
                                    Method dump skipped, instructions count: 739
                                    To view this dump add '--comments-level debug' option
                                */
                                throw new UnsupportedOperationException("Method not decompiled: org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor.AnonymousClass3.apply(org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner$Status):java.lang.Boolean");
                            }
                        }, this.workerExec));
                    }
                }
            }
        }
        // Wait for all status callbacks; a null slot means no result was produced for that task.
        List list = (List) Futures.successfulAsList(arrayList2).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int i2 = 0; i2 < list.size(); i2++) {
            if (list.get(i2) == null) {
                String str = (String) arrayList.get(i2);
                killTask(str, "Task [%s] failed to return status, killing task", str);
            }
        }
        log.debug("Found [%d] seekablestream indexing tasks for dataSource [%s]", new Object[]{Integer.valueOf(i), this.dataSource});
        verifyAndMergeCheckpoints(hashMap.values());
    }

    /**
     * Runs {@code verifyAndMergeCheckpoints(TaskGroup)} for every given group on {@code workerExec} and waits
     * for all of them to finish, for at most {@code futureTimeoutInSeconds} seconds.
     *
     * @param collection the task groups to verify
     * @throws RuntimeException wrapping the {@link InterruptedException}, {@link ExecutionException} or
     *                          {@link TimeoutException} raised while waiting; on interruption the current
     *                          thread's interrupt flag is restored first
     */
    private void verifyAndMergeCheckpoints(Collection<SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup> collection) {
        final List<ListenableFuture<?>> futures = new ArrayList<>();
        for (SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup : collection) {
            futures.add(this.workerExec.submit(() -> {
                verifyAndMergeCheckpoints(taskGroup);
            }));
        }
        try {
            Futures.allAsList(futures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers that poll it can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RuntimeException(e);
        }
    }

    private void verifyAndMergeCheckpoints(SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup) {
        int i = taskGroup.groupId;
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        for (String str : taskGroup.taskIds()) {
            arrayList2.add(this.taskClient.getCheckpointsAsync(str, true));
            arrayList3.add(str);
        }
        try {
            List list = (List) Futures.successfulAsList(arrayList2).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
            for (int i2 = 0; i2 < list.size(); i2++) {
                TreeMap treeMap = (TreeMap) list.get(i2);
                String str2 = (String) arrayList3.get(i2);
                if (treeMap == null) {
                    try {
                        ((ListenableFuture) arrayList2.get(i2)).get();
                    } catch (Exception e) {
                        this.stateManager.recordThrowableEvent(e);
                        log.error(e, "Problem while getting checkpoints for task [%s], killing the task", new Object[]{str2});
                        killTask(str2, "Exception[%s] while getting checkpoints", e.getClass());
                        taskGroup.tasks.remove(str2);
                    }
                } else if (treeMap.isEmpty()) {
                    log.warn("Ignoring task [%s], as probably it is not started running yet", new Object[]{str2});
                } else {
                    arrayList.add(new Pair(str2, treeMap));
                }
            }
            DataSourceMetadata dataSourceMetadata = this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
            if (dataSourceMetadata != null && !checkSourceMetadataMatch(dataSourceMetadata)) {
                throw new IAE("Datasource metadata instance does not match required, found instance of [%s]", new Object[]{dataSourceMetadata.getClass()});
            }
            SeekableStreamDataSourceMetadata seekableStreamDataSourceMetadata = (SeekableStreamDataSourceMetadata) dataSourceMetadata;
            Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap = seekableStreamDataSourceMetadata != null && seekableStreamDataSourceMetadata.getSeekableStreamSequenceNumbers() != null && this.ioConfig.getStream().equals(seekableStreamDataSourceMetadata.getSeekableStreamSequenceNumbers().getStream()) ? seekableStreamDataSourceMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap() : null;
            arrayList.sort((pair, pair2) -> {
                return ((Integer) ((TreeMap) pair2.rhs).firstKey()).compareTo((Integer) ((TreeMap) pair.rhs).firstKey());
            });
            HashSet hashSet = new HashSet();
            AtomicInteger atomicInteger = new AtomicInteger(-1);
            for (int i3 = 0; i3 < arrayList.size(); i3++) {
                TreeMap treeMap2 = (TreeMap) ((Pair) arrayList.get(i3)).rhs;
                String str3 = (String) ((Pair) arrayList.get(i3)).lhs;
                if (atomicInteger.get() == -1) {
                    Map<PartitionIdType, SequenceOffsetType> map = partitionSequenceNumberMap;
                    if (treeMap2.entrySet().stream().anyMatch(entry -> {
                        return ((Map) entry.getValue()).entrySet().stream().allMatch(entry -> {
                            return makeSequenceNumber(entry.getValue()).compareTo(makeSequenceNumber(map == null ? entry.getValue() : map.getOrDefault(entry.getKey(), entry.getValue()))) == 0;
                        }) && atomicInteger.compareAndSet(-1, ((Integer) entry.getKey()).intValue());
                    }) || (this.pendingCompletionTaskGroups.getOrDefault(Integer.valueOf(i), new CopyOnWriteArrayList<>()).size() > 0 && atomicInteger.compareAndSet(-1, ((Integer) treeMap2.firstKey()).intValue()))) {
                        TreeMap treeMap3 = new TreeMap(treeMap2.tailMap(Integer.valueOf(atomicInteger.get())));
                        log.info("Setting taskGroup sequences to [%s] for group [%d]", new Object[]{treeMap3, Integer.valueOf(i)});
                        taskGroup.checkpointSequences.clear();
                        taskGroup.checkpointSequences.putAll(treeMap3);
                    } else {
                        log.debug("Adding task [%s] to kill list, checkpoints[%s], latestoffsets from DB [%s]", new Object[]{str3, treeMap2, partitionSequenceNumberMap});
                        hashSet.add(str3);
                    }
                } else if (treeMap2.get(taskGroup.checkpointSequences.firstKey()) == null || !((Map) treeMap2.get(taskGroup.checkpointSequences.firstKey())).equals(taskGroup.checkpointSequences.firstEntry().getValue()) || treeMap2.tailMap(taskGroup.checkpointSequences.firstKey()).size() != taskGroup.checkpointSequences.size()) {
                    log.debug("Adding task [%s] to kill list, checkpoints[%s], taskgroup checkpoints [%s]", new Object[]{str3, treeMap2, taskGroup.checkpointSequences});
                    hashSet.add(str3);
                }
            }
            if ((hashSet.size() > 0 && hashSet.size() == taskGroup.tasks.size()) || (taskGroup.tasks.size() == 0 && this.pendingCompletionTaskGroups.getOrDefault(Integer.valueOf(i), new CopyOnWriteArrayList<>()).size() == 0)) {
                log.warn("Clearing task group [%d] information as no valid tasks left the group", new Object[]{Integer.valueOf(i)});
                this.activelyReadingTaskGroups.remove(Integer.valueOf(i));
                UnmodifiableIterator it = taskGroup.startingSequences.keySet().iterator();
                while (it.hasNext()) {
                    this.partitionOffsets.put(it.next(), getNotSetMarker());
                }
            }
            Map<PartitionIdType, SequenceOffsetType> map2 = partitionSequenceNumberMap;
            arrayList.stream().filter(pair3 -> {
                return hashSet.contains(pair3.lhs);
            }).forEach(pair4 -> {
                killTask((String) pair4.lhs, "Killing task [%s], as its checkpoints [%s] are not consistent with group checkpoints[%s] or latest persisted sequences in metadata store [%s]", pair4.lhs, pair4.rhs, taskGroup.checkpointSequences, map2);
                taskGroup.tasks.remove(pair4.lhs);
            });
        } catch (Exception e2) {
            throw new RuntimeException(e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records a discovered task under the pending-completion task groups of the given group ID.
     * <p>
     * If a pending group with the same starting sequences already exists, the task is added to it (only if it is not
     * already present). Otherwise a new pending group is created for the task, with its completion timeout set to
     * now plus the configured completion timeout.
     *
     * @param i   task group ID
     * @param str ID of the discovered task
     * @param map starting sequences of the discovered task
     */
    public void addDiscoveredTaskToPendingCompletionTaskGroups(int i, String str, Map<PartitionIdType, SequenceOffsetType> map) {
        CopyOnWriteArrayList<SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup> pendingGroups = this.pendingCompletionTaskGroups.computeIfAbsent(Integer.valueOf(i), groupKey -> {
            return new CopyOnWriteArrayList();
        });
        for (SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup candidate : pendingGroups) {
            if (!candidate.startingSequences.equals(map)) {
                continue;
            }
            // The first group with matching starting sequences wins, whether or not the task was already in it.
            boolean newlyAdded = candidate.tasks.putIfAbsent(str, new TaskData()) == null;
            if (newlyAdded) {
                log.info("Added discovered task [%s] to existing pending task group [%s]", new Object[]{str, Integer.valueOf(i)});
            }
            return;
        }
        // No pending group matched the starting sequences: create one that holds just this task.
        log.info("Creating new pending completion task group [%s] for discovered task [%s]", new Object[]{Integer.valueOf(i), str});
        SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup createdGroup = new TaskGroup(this, i, ImmutableMap.copyOf(map), null, Optional.absent(), Optional.absent(), null);
        createdGroup.tasks.put(str, new TaskData());
        createdGroup.completionTimeout = DateTimes.nowUtc().plus(this.ioConfig.getCompletionTimeout());
        pendingGroups.add(createdGroup);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Verifies that every task in the actively reading task group reports the group's base sequence name.
     * <p>
     * A task that is missing from task storage, or whose type does not match this supervisor, counts as a mismatch.
     *
     * @param i task group ID
     * @throws ISE if any task's base sequence name differs from the group's
     */
    public void verifySameSequenceNameForAllTasksInGroup(int i) {
        String expectedBaseName = this.activelyReadingTaskGroups.get(Integer.valueOf(i)).baseSequenceName;
        boolean everyTaskMatches = this.activelyReadingTaskGroups.get(Integer.valueOf(i)).tasks.keySet().stream().map(taskId -> {
            Optional<Task> storedTask = this.taskStorage.getTask(taskId);
            if (!storedTask.isPresent() || !doesTaskTypeMatchSupervisor((Task) storedTask.get())) {
                // Boolean.FALSE never equals a String, so this forces a mismatch below.
                return false;
            }
            return ((SeekableStreamIndexTask) storedTask.get()).getIOConfig().getBaseSequenceName();
        }).allMatch(reportedName -> {
            return reportedName.equals(expectedBaseName);
        });
        if (everyTaskMatches) {
            return;
        }
        throw new ISE("Base sequence names do not match for the tasks in the task group with ID [%s]", new Object[]{Integer.valueOf(i)});
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the task to stop via the task client and kills it if the stop request does not report success.
     *
     * @param str ID of the task to stop
     * @param z   flag passed through to {@code taskClient.stopAsync}
     * @return a future that completes (with {@code null}) once the stop result has been handled
     */
    public ListenableFuture<Void> stopTask(final String str, boolean z) {
        Function<Boolean, Void> killUnlessStopped = new Function<Boolean, Void>() {
            @Nullable
            public Void apply(@Nullable Boolean stopResult) {
                // A null or false result both mean the task did not confirm that it stopped.
                if (stopResult == null || !stopResult.booleanValue()) {
                    SeekableStreamSupervisor.log.info("Task [%s] failed to stop in a timely manner, killing task", new Object[]{str});
                    SeekableStreamSupervisor.this.killTask(str, "Task [%s] failed to stop in a timely manner, killing task", str);
                }
                return null;
            }
        };
        return Futures.transform(this.taskClient.stopAsync(str, z), killUnlessStopped);
    }

    /**
     * Checks whether a stored task still matches what this supervisor would currently generate.
     * <p>
     * The task's own sequence name (built from its IO config, data schema and tuning config) is compared with the
     * name generated from the supervisor's current data schema and tuning config. If an actively reading task group
     * exists for the group ID, that group's starting sequences and message-time bounds are used; otherwise the
     * task's own are used.
     *
     * @param i   task group ID
     * @param str task ID
     * @return {@code false} if the task is not in task storage or its type does not match this supervisor;
     *         otherwise whether the two sequence names are equal
     */
    @VisibleForTesting
    public boolean isTaskCurrent(int i, String str) {
        Optional<Task> storedTask = this.taskStorage.getTask(str);
        boolean usable = storedTask.isPresent() && doesTaskTypeMatchSupervisor((Task) storedTask.get());
        if (!usable) {
            return false;
        }
        SeekableStreamIndexTask indexTask = (SeekableStreamIndexTask) storedTask.get();
        String taskSequenceName = generateSequenceName(indexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), indexTask.getIOConfig().getMinimumMessageTime(), indexTask.getIOConfig().getMaximumMessageTime(), indexTask.getDataSchema(), indexTask.getTuningConfig());
        String expectedSequenceName;
        if (this.activelyReadingTaskGroups.get(Integer.valueOf(i)) != null) {
            SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup activeGroup = this.activelyReadingTaskGroups.get(Integer.valueOf(i));
            expectedSequenceName = generateSequenceName(activeGroup.startingSequences, activeGroup.minimumMessageTime, activeGroup.maximumMessageTime, this.spec.getDataSchema(), this.taskTuningConfig);
        } else {
            // No active group: reuse the task's own sequences and time bounds with the supervisor's schema and tuning.
            expectedSequenceName = generateSequenceName(indexTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap(), indexTask.getIOConfig().getMinimumMessageTime(), indexTask.getIOConfig().getMaximumMessageTime(), this.spec.getDataSchema(), this.taskTuningConfig);
        }
        return expectedSequenceName.equals(taskSequenceName);
    }

    /**
     * Builds a sequence name of the form {@code <baseTaskName>_<dataSource>_<hash>}.
     * <p>
     * The hash is the first 15 hex characters of the SHA-1 of the concatenation of: the serialized data schema, the
     * serialized tuning config, the partition/offset pairs rendered as {@code key(value)} joined by {@code +}, and
     * the millisecond values of the two optional timestamps (each omitted when absent).
     *
     * @param map                                starting sequence per partition
     * @param optional                           first optional timestamp folded into the hash
     * @param optional2                          second optional timestamp folded into the hash
     * @param dataSchema                         data schema serialized into the hash
     * @param seekableStreamIndexTaskTuningConfig tuning config serialized into the hash
     * @return the generated sequence name
     * @throws RuntimeException wrapping a {@link JsonProcessingException} if serialization fails
     */
    @VisibleForTesting
    protected String generateSequenceName(Map<PartitionIdType, SequenceOffsetType> map, Optional<DateTime> optional, Optional<DateTime> optional2, DataSchema dataSchema, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig) {
        StringBuilder partitionOffsetText = new StringBuilder();
        for (Map.Entry<PartitionIdType, SequenceOffsetType> partitionOffset : map.entrySet()) {
            partitionOffsetText.append(StringUtils.format("+%s(%s)", new Object[]{partitionOffset.getKey().toString(), partitionOffset.getValue().toString()}));
        }
        try {
            Joiner underscoreJoiner = Joiner.on("_");
            String namePrefix = baseTaskName();
            String schemaJson = this.sortingMapper.writeValueAsString(dataSchema);
            String tuningJson = this.sortingMapper.writeValueAsString(seekableStreamIndexTaskTuningConfig);
            // Drop the leading '+' that the loop above put in front of the first pair.
            String partitionPart = map.size() == 0 ? "" : partitionOffsetText.toString().substring(1);
            String firstTimePart = optional.isPresent() ? String.valueOf(((DateTime) optional.get()).getMillis()) : "";
            String secondTimePart = optional2.isPresent() ? String.valueOf(((DateTime) optional2.get()).getMillis()) : "";
            String shortHash = DigestUtils.sha1Hex(schemaJson + tuningJson + partitionPart + firstTimePart + secondTimePart).substring(0, 15);
            return underscoreJoiner.join(namePrefix, this.dataSource, new Object[]{shortHash});
        } catch (JsonProcessingException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    /**
     * Returns the leading component of the sequence names built by {@code generateSequenceName}, which joins this
     * value, the datasource and a hash with underscores.
     */
    protected abstract String baseTaskName();

    /**
     * Tells {@code updatePartitionDataFromStream} whether to run {@code cleanupClosedAndExpiredPartitions}; that
     * cleanup is only invoked when this returns {@code true}.
     *
     * @return {@code false} in this base implementation
     */
    protected boolean supportsPartitionExpiration() {
        return false;
    }

    /**
     * Refreshes this supervisor's partition bookkeeping from the record supplier.
     * <p>
     * Fetches the stream's partition IDs, filters out partitions whose offset from
     * {@code getOffsetsFromMetadataStorage()} is an expiration marker or an end-of-shard marker, registers newly seen
     * partitions in {@code partitionIds}, {@code partitionGroups} and {@code partitionOffsets}, and, if the partition
     * list changed while some actively reading task group still has tasks, sets {@code earlyStopTime}.
     *
     * @return {@code true} if partition data was refreshed; {@code false} if the supplier returned no partitions, if
     *         no active partitions remained after filtering, or if any exception was thrown (the exception is
     *         recorded on {@code stateManager} and logged, not rethrown)
     */
    private boolean updatePartitionDataFromStream() {
        Set<PartitionIdType> partitionIds;
        // Snapshot of the previously known partitions, used at the end to detect a change.
        ArrayList arrayList = new ArrayList(this.partitionIds);
        try {
            synchronized (this.recordSupplierLock) {
                partitionIds = this.recordSupplier.getPartitionIds(this.ioConfig.getStream());
            }
            if (partitionIds == null || partitionIds.size() == 0) {
                String format = StringUtils.format("No partitions found for stream [%s]", new Object[]{this.ioConfig.getStream()});
                this.stateManager.recordThrowableEvent(new StreamException(new ISE(format, new Object[0])));
                log.warn(format, new Object[0]);
                return false;
            }
            log.debug("Found [%d] partitions for stream [%s]", new Object[]{Integer.valueOf(partitionIds.size()), this.ioConfig.getStream()});
            Map<PartitionIdType, SequenceOffsetType> offsetsFromMetadataStorage = getOffsetsFromMetadataStorage();
            Set<PartitionIdType> keySet = offsetsFromMetadataStorage.keySet();
            // set: partitions whose stored offset satisfies isEndOfShard (treated as closed below).
            Set set = (Set) offsetsFromMetadataStorage.entrySet().stream().filter(entry -> {
                return isEndOfShard(entry.getValue());
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toSet());
            // set2: partitions whose stored offset satisfies isShardExpirationMarker (previously expired).
            Set<PartitionIdType> set2 = (Set) offsetsFromMetadataStorage.entrySet().stream().filter(entry2 -> {
                return isShardExpirationMarker(entry2.getValue());
            }).map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toSet());
            // difference: supplier partitions minus previously expired ones.
            Sets.SetView difference = Sets.difference(partitionIds, set2);
            // difference2: additionally minus closed partitions, i.e. the active partitions.
            Sets.SetView difference2 = Sets.difference(difference, set);
            // intersection: closed partitions that were still in the previously known partition list.
            Sets.SetView intersection = Sets.intersection(set, new HashSet(arrayList));
            log.debug("active partitions from supplier: " + difference2, new Object[0]);
            if (difference.size() != partitionIds.size()) {
                log.warn("Previously expired partitions [%s] were present in the current list [%s] from the record supplier.", new Object[]{set2, partitionIds});
            }
            if (difference2.size() == 0) {
                String format2 = StringUtils.format("No active partitions found for stream [%s] after removing closed and previously expired partitions", new Object[]{this.ioConfig.getStream()});
                this.stateManager.recordThrowableEvent(new StreamException(new ISE(format2, new Object[0])));
                log.warn(format2, new Object[0]);
                return false;
            }
            // When no partitions were known before this call, new ones are not recorded as subsequently discovered.
            boolean isEmpty = this.partitionIds.isEmpty();
            for (Object obj : difference) {
                if (set.contains(obj)) {
                    log.info("partition [%s] is closed and has no more data, skipping.", new Object[]{obj});
                } else if (!this.partitionIds.contains(obj)) {
                    this.partitionIds.add(obj);
                    if (!isEmpty) {
                        this.subsequentlyDiscoveredPartitions.add(obj);
                    }
                }
            }
            if (supportsPartitionExpiration()) {
                cleanupClosedAndExpiredPartitions(keySet, intersection, difference2, set2, partitionIds);
            }
            // Make sure every active partition belongs to a partition group and has an offset entry.
            for (PartitionIdType partitionidtype : difference2) {
                int taskGroupIdForPartition = getTaskGroupIdForPartition(partitionidtype);
                this.partitionGroups.computeIfAbsent(Integer.valueOf(taskGroupIdForPartition), num -> {
                    return new HashSet();
                }).add(partitionidtype);
                // putIfAbsent returns null only when the partition had no offset entry yet, i.e. it is new.
                if (this.partitionOffsets.putIfAbsent(partitionidtype, getNotSetMarker()) == null) {
                    log.info("New partition [%s] discovered for stream [%s], added to task group [%d]", new Object[]{partitionidtype, this.ioConfig.getStream(), Integer.valueOf(taskGroupIdForPartition)});
                }
            }
            if (this.partitionIds.equals(arrayList)) {
                return true;
            }
            // The partition list changed: if any active group has tasks, schedule an early stop.
            Iterator<SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup> it = this.activelyReadingTaskGroups.values().iterator();
            while (it.hasNext()) {
                if (!it.next().taskIds().isEmpty()) {
                    this.earlyStopTime = DateTimes.nowUtc().plus(this.tuningConfig.getRepartitionTransitionDuration());
                    log.info("Previous partition set [%s] has changed to [%s] - requesting that tasks stop after [%s] at [%s]", new Object[]{arrayList, this.partitionIds, this.tuningConfig.getRepartitionTransitionDuration(), this.earlyStopTime});
                    return true;
                }
            }
            return true;
        } catch (Exception e) {
            // Any failure is recorded and reported as "not updated" rather than propagated.
            this.stateManager.recordThrowableEvent(e);
            log.warn("Could not fetch partitions for topic/stream [%s]: %s", new Object[]{this.ioConfig.getStream(), e.getMessage()});
            log.debug(e, "full stack trace", new Object[0]);
            return false;
        }
    }

    /**
     * Handles partitions that have newly expired or newly closed.
     * <p>
     * Newly expired partitions are those in {@code set} that are in neither {@code set4} nor {@code set5}. When any
     * exist, new datasource metadata is created for them, validated, and written via
     * {@code resetDataSourceMetadata}. If there are newly expired or newly closed partitions, the partition groups
     * are recomputed from {@code set3}, validated, and used to replace {@code partitionIds} and the value of every
     * existing key in {@code partitionGroups} (keys absent from the recomputed map get an empty set).
     *
     * @param set  partition IDs that have stored offsets
     * @param set2 newly closed partitions
     * @param set3 partitions that remain active
     * @param set4 partitions already marked as expired
     * @param set5 partition IDs currently reported by the record supplier
     * @throws RuntimeException wrapping an {@link IOException} thrown while resetting the datasource metadata
     */
    private void cleanupClosedAndExpiredPartitions(Set<PartitionIdType> set, Set<PartitionIdType> set2, Set<PartitionIdType> set3, Set<PartitionIdType> set4, Set<PartitionIdType> set5) {
        Sets.SetView newlyExpired = Sets.difference(Sets.difference(set, set4), set5);
        if (!newlyExpired.isEmpty()) {
            log.info("Detected newly expired partitions: " + newlyExpired, new Object[0]);
            SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> currentMetadata = (SeekableStreamDataSourceMetadata) this.indexerMetadataStorageCoordinator.getDataSourceMetadata(this.dataSource);
            SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> cleanedMetadata = createDataSourceMetadataWithExpiredPartitions(currentMetadata, newlyExpired);
            log.info("New metadata after partition expiration: " + cleanedMetadata, new Object[0]);
            validateMetadataPartitionExpiration(newlyExpired, currentMetadata, cleanedMetadata);
            try {
                boolean resetSucceeded = this.indexerMetadataStorageCoordinator.resetDataSourceMetadata(this.dataSource, cleanedMetadata);
                if (!resetSucceeded) {
                    log.error("Failed to update datasource metadata[%s] with expired partitions removed", new Object[]{cleanedMetadata});
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        boolean anyNewlyClosed = !set2.isEmpty();
        if (anyNewlyClosed) {
            log.info("Detected newly closed partitions: " + set2, new Object[0]);
        }
        // Nothing closed and nothing expired: the existing grouping stays as it is.
        if (!anyNewlyClosed && newlyExpired.isEmpty()) {
            return;
        }
        Map<Integer, Set<PartitionIdType>> regrouped = recomputePartitionGroupsForExpiration(set3);
        validatePartitionGroupReassignments(set3, regrouped);
        log.info("New partition groups after removing closed and expired partitions: " + regrouped, new Object[0]);
        this.partitionIds.clear();
        this.partitionIds.addAll(set3);
        // Only existing group keys are rewritten; no new keys are introduced here.
        for (Object groupKey : this.partitionGroups.keySet()) {
            Integer groupId = (Integer) groupKey;
            Set<PartitionIdType> replacement = regrouped.containsKey(groupId) ? regrouped.get(groupId) : new HashSet<PartitionIdType>();
            this.partitionGroups.put(groupId, replacement);
        }
    }

    /**
     * Computes the partition-group assignment (group ID to partition IDs) for the given partitions. It is called by
     * {@code cleanupClosedAndExpiredPartitions}, which uses the result to replace the values in
     * {@code partitionGroups}.
     * <p>
     * This base implementation always throws.
     *
     * @param set partitions to assign to groups
     * @throws UnsupportedOperationException always, in this base implementation
     */
    protected Map<Integer, Set<PartitionIdType>> recomputePartitionGroupsForExpiration(Set<PartitionIdType> set) {
        throw new UnsupportedOperationException("This supervisor type does not support partition expiration.");
    }

    /**
     * Creates datasource metadata that reflects the given newly expired partitions. It is called by
     * {@code cleanupClosedAndExpiredPartitions}, which passes the result to
     * {@code validateMetadataPartitionExpiration} and then writes it with {@code resetDataSourceMetadata}.
     * <p>
     * This base implementation always throws.
     *
     * @param seekableStreamDataSourceMetadata the currently stored metadata
     * @param set                              newly expired partition IDs
     * @throws UnsupportedOperationException always, in this base implementation
     */
    protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithExpiredPartitions(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> seekableStreamDataSourceMetadata, Set<PartitionIdType> set) {
        throw new UnsupportedOperationException("This supervisor type does not support partition expiration.");
    }

    /**
     * Extension point for building datasource metadata from existing metadata and a set of partition IDs.
     * <p>
     * This base implementation always throws.
     * NOTE(review): no caller is visible in this part of the file; presumably the set holds closed partitions and
     * subclasses that support them override this. Confirm against callers.
     *
     * @param seekableStreamDataSourceMetadata existing metadata
     * @param set                              partition IDs (presumably the closed ones)
     * @throws UnsupportedOperationException always, in this base implementation
     */
    protected SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetadataWithClosedPartitions(SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> seekableStreamDataSourceMetadata, Set<PartitionIdType> set) {
        throw new UnsupportedOperationException("This supervisor type does not support partition closing.");
    }

    /**
     * Checks the cleaned metadata against the original metadata it was derived from.
     * <p>
     * For every partition in the cleaned map: it must also exist in the original map; if it is one of the newly
     * expired partitions its cleaned value must satisfy {@code isShardExpirationMarker}; otherwise its cleaned value
     * must equal the original value.
     *
     * @param set                               newly expired partition IDs
     * @param seekableStreamDataSourceMetadata  original metadata
     * @param seekableStreamDataSourceMetadata2 cleaned metadata to validate
     * @throws IAE if any of the checks above fails
     */
    private void validateMetadataPartitionExpiration(Set<PartitionIdType> set, SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> seekableStreamDataSourceMetadata, SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> seekableStreamDataSourceMetadata2) {
        Map<PartitionIdType, SequenceOffsetType> originalOffsets = seekableStreamDataSourceMetadata.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap();
        Map<PartitionIdType, SequenceOffsetType> cleanedOffsets = seekableStreamDataSourceMetadata2.getSeekableStreamSequenceNumbers().getPartitionSequenceNumberMap();
        for (Map.Entry<PartitionIdType, SequenceOffsetType> cleanedEntry : cleanedOffsets.entrySet()) {
            PartitionIdType partitionId = cleanedEntry.getKey();
            if (!originalOffsets.containsKey(partitionId)) {
                throw new IAE("Cleaned partition map [%s] contains unexpected partition ID [%s], original partition map: [%s]", new Object[]{cleanedOffsets, partitionId, originalOffsets});
            }
            SequenceOffsetType originalOffset = originalOffsets.get(partitionId);
            if (set.contains(partitionId)) {
                if (!isShardExpirationMarker(cleanedEntry.getValue())) {
                    // The first placeholder used to be "[%]", which is not a valid conversion and dropped the partition ID.
                    throw new IAE("Newly expired partition [%s] was not marked as expired in the cleaned partition map [%s], original partition map: [%s]", new Object[]{partitionId, cleanedOffsets, originalOffsets});
                }
            } else if (!originalOffset.equals(cleanedEntry.getValue())) {
                throw new IAE("Cleaned partition map [%s] has offset mismatch for partition ID [%s], original partition map: [%s]", new Object[]{cleanedOffsets, partitionId, originalOffsets});
            }
        }
    }

    /**
     * Ensures the recomputed partition groups only reference partitions from the given set.
     *
     * @param set allowed partition IDs
     * @param map recomputed partition groups (group ID to partition IDs)
     * @throws IAE if any group contains a partition ID that is not in {@code set}
     */
    private void validatePartitionGroupReassignments(Set<PartitionIdType> set, Map<Integer, Set<PartitionIdType>> map) {
        for (Set<PartitionIdType> groupMembers : map.values()) {
            for (PartitionIdType member : groupMembers) {
                if (set.contains(member)) {
                    continue;
                }
                throw new IAE("Recomputed partition groups [%s] contains unexpected partition ID [%s], old partition groups: [%s]", new Object[]{map, member, this.partitionGroups});
            }
        }
    }

    /**
     * Refreshes the stored status of every task in the actively reading and pending-completion task groups, and
     * asynchronously fetches the start time of active tasks that do not have one yet.
     * <p>
     * When a start time arrives and part of the task duration is still left, another supervisor run is scheduled
     * for after the remaining time. Tasks whose start-time request failed (a {@code null} entry in the combined
     * result) are killed.
     *
     * @throws ExecutionException   from waiting on the combined start-time future
     * @throws InterruptedException from waiting on the combined start-time future
     * @throws TimeoutException     if the start-time requests do not finish within {@code futureTimeoutInSeconds}
     */
    private void updateTaskStatus() throws ExecutionException, InterruptedException, TimeoutException {
        ArrayList startTimeFutures = new ArrayList();
        ArrayList startTimeTaskIds = new ArrayList();
        for (SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup activeGroup : this.activelyReadingTaskGroups.values()) {
            for (Map.Entry<String, SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskData> taskEntry : activeGroup.tasks.entrySet()) {
                String taskId = taskEntry.getKey();
                final SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskData taskData = taskEntry.getValue();
                if (taskData.startTime == null) {
                    // The two lists are kept index-aligned so a failed future can be traced back to its task.
                    startTimeTaskIds.add(taskId);
                    startTimeFutures.add(Futures.transform(this.taskClient.getStartTimeAsync(taskId), new Function<DateTime, Boolean>() {
                        public Boolean apply(@Nullable DateTime reportedStart) {
                            if (reportedStart == null) {
                                return false;
                            }
                            taskData.startTime = reportedStart;
                            long durationMillis = SeekableStreamSupervisor.this.ioConfig.getTaskDuration().getMillis();
                            long elapsedMillis = System.currentTimeMillis() - taskData.startTime.getMillis();
                            long remainingMillis = durationMillis - elapsedMillis;
                            if (remainingMillis > 0) {
                                SeekableStreamSupervisor.this.scheduledExec.schedule(SeekableStreamSupervisor.this.buildRunTask(), remainingMillis + SeekableStreamSupervisor.MAX_RUN_FREQUENCY_MILLIS, TimeUnit.MILLISECONDS);
                            }
                            return true;
                        }
                    }, this.workerExec));
                }
                taskData.status = (TaskStatus) this.taskStorage.getStatus(taskId).get();
            }
        }
        for (CopyOnWriteArrayList<SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup> pendingGroups : this.pendingCompletionTaskGroups.values()) {
            for (SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup pendingGroup : pendingGroups) {
                for (Map.Entry<String, SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskData> pendingEntry : pendingGroup.tasks.entrySet()) {
                    pendingEntry.getValue().status = (TaskStatus) this.taskStorage.getStatus(pendingEntry.getKey()).get();
                }
            }
        }
        List startTimeResults = (List) Futures.successfulAsList(startTimeFutures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int idx = 0; idx < startTimeResults.size(); idx++) {
            if (startTimeResults.get(idx) != null) {
                continue;
            }
            String failedTaskId = (String) startTimeTaskIds.get(idx);
            log.warn("Task [%s] failed to return start time, killing task", new Object[]{failedTaskId});
            killTask(failedTaskId, "Task [%s] failed to return start time, killing task", failedTaskId);
        }
    }

    /**
     * Checkpoints actively reading task groups that have run for the configured task duration, or all of them once
     * the early-stop deadline has passed, and moves them out of {@code activelyReadingTaskGroups}.
     * <p>
     * A group's age is measured from the earliest known start time among its tasks. For each checkpointed group:
     * <ul>
     *   <li>if checkpointing produced end offsets, the group is added to {@code pendingCompletionTaskGroups} and
     *       {@code partitionOffsets} is updated. If any returned offset equals the end-of-partition marker, every
     *       returned partition is reset to the not-set marker instead of taking the returned offsets;</li>
     *   <li>otherwise all tasks of the group are killed and its partitions are reset to the not-set marker.</li>
     * </ul>
     *
     * @throws ExecutionException   from waiting on the combined checkpoint future
     * @throws InterruptedException from waiting on the combined checkpoint future
     * @throws TimeoutException     if checkpointing does not finish within {@code futureTimeoutInSeconds}
     */
    private void checkTaskDuration() throws ExecutionException, InterruptedException, TimeoutException {
        List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> checkpointFutures = new ArrayList<>();
        List<Integer> checkpointedGroupIds = new ArrayList<>();
        // Remembered across groups: the deadline is cleared when first observed, and resetting this flag per group
        // would early-stop only the first group that happens to be iterated.
        boolean stopEarly = false;
        for (Map.Entry<Integer, SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup> groupEntry : this.activelyReadingTaskGroups.entrySet()) {
            Integer groupId = groupEntry.getKey();
            SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup group = groupEntry.getValue();
            // Earliest start time among the group's tasks; stays at "now" when none is known.
            DateTime earliestStart = DateTimes.nowUtc();
            for (SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskData taskData : group.tasks.values()) {
                if (taskData.startTime != null && earliestStart.isAfter(taskData.startTime)) {
                    earliestStart = taskData.startTime;
                }
            }
            if (!stopEarly && this.earlyStopTime != null && (this.earlyStopTime.isBeforeNow() || this.earlyStopTime.isEqualNow())) {
                log.info("Early stop requested - signalling tasks to complete", new Object[0]);
                this.earlyStopTime = null;
                stopEarly = true;
            }
            if (earliestStart.plus(this.ioConfig.getTaskDuration()).isBeforeNow() || stopEarly) {
                log.info("Task group [%d] has run for [%s]", new Object[]{groupId, this.ioConfig.getTaskDuration()});
                // Kept index-aligned with checkpointFutures.
                checkpointedGroupIds.add(groupId);
                checkpointFutures.add(checkpointTaskGroup(group, true));
            }
        }
        List<Map<PartitionIdType, SequenceOffsetType>> checkpointResults = Futures.successfulAsList(checkpointFutures).get(this.futureTimeoutInSeconds, TimeUnit.SECONDS);
        for (int i = 0; i < checkpointResults.size(); i++) {
            Integer groupId = checkpointedGroupIds.get(i);
            SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup group = this.activelyReadingTaskGroups.get(groupId);
            Map<PartitionIdType, SequenceOffsetType> endOffsets = checkpointResults.get(i);
            if (endOffsets != null) {
                group.completionTimeout = DateTimes.nowUtc().plus(this.ioConfig.getCompletionTimeout());
                this.pendingCompletionTaskGroups.computeIfAbsent(groupId, key -> {
                    return new CopyOnWriteArrayList();
                }).add(group);
                boolean sawEndOfPartition = false;
                for (Map.Entry<PartitionIdType, SequenceOffsetType> offsetEntry : endOffsets.entrySet()) {
                    if (offsetEntry.getValue().equals(getEndOfPartitionMarker())) {
                        log.info("Got end of partition marker for partition [%s] in checkTaskDuration, not updating partition offset.", new Object[]{offsetEntry.getKey()});
                        sawEndOfPartition = true;
                    }
                }
                // One end-of-partition marker resets every partition of the group, not just the marked one.
                for (Map.Entry<PartitionIdType, SequenceOffsetType> offsetEntry : endOffsets.entrySet()) {
                    this.partitionOffsets.put(offsetEntry.getKey(), sawEndOfPartition ? getNotSetMarker() : offsetEntry.getValue());
                }
            } else {
                for (String taskId : group.taskIds()) {
                    killTask(taskId, "All tasks in group [%s] failed to transition to publishing state", groupId);
                }
                for (PartitionIdType partitionId : group.startingSequences.keySet()) {
                    this.partitionOffsets.put(partitionId, getNotSetMarker());
                }
            }
            this.activelyReadingTaskGroups.remove(groupId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Pauses every task in the group, works out the highest offset reported per partition, and sets those offsets
     * as the end offsets on all remaining tasks.
     * <p>
     * When {@code z} is {@code true} the tasks are checked first: if any task has already succeeded, all tasks in
     * the group are stopped and the returned future yields {@code null}; tasks that are runnable but have no known
     * location are killed and removed from the group.
     * <p>
     * Tasks whose pause request failed or returned empty offsets, and tasks that do not acknowledge the new end
     * offsets, are killed and removed from the group.
     *
     * @param taskGroup group to checkpoint
     * @param z         whether to run the pre-checks above; also passed through to {@code setEndOffsetsAsync}
     * @return a future yielding the end offsets that were set, or {@code null} if no task in the group is left (or
     *         a task had already succeeded)
     */
    public ListenableFuture<Map<PartitionIdType, SequenceOffsetType>> checkpointTaskGroup(final SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup, final boolean z) {
        if (z) {
            Iterator<Map.Entry<String, SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskData>> taskIterator = taskGroup.tasks.entrySet().iterator();
            while (taskIterator.hasNext()) {
                Map.Entry<String, SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskData> taskEntry = taskIterator.next();
                String taskId = taskEntry.getKey();
                SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskData taskData = taskEntry.getValue();
                if (taskData.status != null) {
                    if (taskData.status.isSuccess()) {
                        // One task already succeeded: stop the whole group and report "no offsets".
                        this.stateManager.recordCompletedTaskState(TaskState.SUCCESS);
                        return Futures.transform(stopTasksInGroup(taskGroup, "task[%s] succeeded in the taskGroup", taskData.status.getId()), new Function<Object, Map<PartitionIdType, SequenceOffsetType>>() {
                            @Nullable
                            public Map<PartitionIdType, SequenceOffsetType> apply(@Nullable Object ignored) {
                                return null;
                            }
                        });
                    }
                    if (taskData.status.isRunnable() && this.taskInfoProvider.getTaskLocation(taskId).equals(TaskLocation.unknown())) {
                        killTask(taskId, "Killing task [%s] which hasn't been assigned to a worker", taskId);
                        taskIterator.remove();
                    }
                }
            }
        }
        final ArrayList pauseFutures = new ArrayList();
        // Snapshot of the task IDs, index-aligned with pauseFutures.
        final ImmutableList pausedTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
        Iterator pausedIdIterator = pausedTaskIds.iterator();
        while (pausedIdIterator.hasNext()) {
            pauseFutures.add(this.taskClient.pauseAsync((String) pausedIdIterator.next()));
        }
        return Futures.transform(Futures.successfulAsList(pauseFutures), new Function<List<Map<PartitionIdType, SequenceOffsetType>>, Map<PartitionIdType, SequenceOffsetType>>() {
            @Nullable
            public Map<PartitionIdType, SequenceOffsetType> apply(List<Map<PartitionIdType, SequenceOffsetType>> list) {
                HashMap<PartitionIdType, SequenceOffsetType> endOffsets = new HashMap<>();
                for (int i = 0; i < list.size(); i++) {
                    Map<PartitionIdType, SequenceOffsetType> pausedOffsets = list.get(i);
                    String taskId = (String) pausedTaskIds.get(i);
                    if (pausedOffsets == null) {
                        // successfulAsList yields null for a failed future; get() on the original future is
                        // expected to throw so the failure can be recorded and the task killed.
                        try {
                            throw new ISE("WTH? The pause request for task [%s] is supposed to fail, but returned [%s]", new Object[]{taskId, (Map) ((ListenableFuture) pauseFutures.get(i)).get()});
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        } catch (ExecutionException e2) {
                            SeekableStreamSupervisor.this.stateManager.recordThrowableEvent(e2);
                            SeekableStreamSupervisor.this.killTask(taskId, "An exception occured while waiting for task [%s] to pause: [%s]", taskId, e2.getCause() == null ? e2 : e2.getCause());
                            taskGroup.tasks.remove(taskId);
                        }
                    } else if (pausedOffsets.isEmpty()) {
                        SeekableStreamSupervisor.this.killTask(taskId, "Task [%s] returned empty offsets after pause", taskId);
                        taskGroup.tasks.remove(taskId);
                    } else {
                        // Keep the highest sequence number reported for each partition.
                        for (Map.Entry<PartitionIdType, SequenceOffsetType> offsetEntry : pausedOffsets.entrySet()) {
                            if (!endOffsets.containsKey(offsetEntry.getKey()) || SeekableStreamSupervisor.this.makeSequenceNumber(endOffsets.get(offsetEntry.getKey())).compareTo(SeekableStreamSupervisor.this.makeSequenceNumber(offsetEntry.getValue())) < 0) {
                                endOffsets.put(offsetEntry.getKey(), offsetEntry.getValue());
                            }
                        }
                    }
                }
                ArrayList setEndOffsetFutures = new ArrayList();
                // Re-read the task IDs: failed tasks were removed from the group above.
                ImmutableList remainingTaskIds = ImmutableList.copyOf(taskGroup.taskIds());
                if (remainingTaskIds.isEmpty()) {
                    SeekableStreamSupervisor.log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", new Object[]{Integer.valueOf(taskGroup.groupId)});
                    return null;
                }
                try {
                    if (endOffsets.equals(taskGroup.checkpointSequences.lastEntry().getValue())) {
                        SeekableStreamSupervisor.log.warn("Checkpoint [%s] is same as the start sequences [%s] of latest sequence for the task group [%d]", new Object[]{endOffsets, taskGroup.checkpointSequences.lastEntry().getValue(), Integer.valueOf(taskGroup.groupId)});
                    }
                    SeekableStreamSupervisor.log.info("Setting endOffsets for tasks in taskGroup [%d] to %s and resuming", new Object[]{Integer.valueOf(taskGroup.groupId), endOffsets});
                    Iterator remainingIdIterator = remainingTaskIds.iterator();
                    while (remainingIdIterator.hasNext()) {
                        setEndOffsetFutures.add(SeekableStreamSupervisor.this.taskClient.setEndOffsetsAsync((String) remainingIdIterator.next(), endOffsets, z));
                    }
                    List setEndOffsetResults = (List) Futures.successfulAsList(setEndOffsetFutures).get(SeekableStreamSupervisor.this.futureTimeoutInSeconds, TimeUnit.SECONDS);
                    for (int i2 = 0; i2 < setEndOffsetResults.size(); i2++) {
                        if (setEndOffsetResults.get(i2) == null || !((Boolean) setEndOffsetResults.get(i2)).booleanValue()) {
                            String unresponsiveTaskId = (String) remainingTaskIds.get(i2);
                            SeekableStreamSupervisor.this.killTask(unresponsiveTaskId, "Task [%s] failed to respond to [set end offsets] in a timely manner, killing task", unresponsiveTaskId);
                            taskGroup.tasks.remove(unresponsiveTaskId);
                        }
                    }
                    if (!taskGroup.tasks.isEmpty()) {
                        return endOffsets;
                    }
                    SeekableStreamSupervisor.log.info("All tasks in taskGroup [%d] have failed, tasks will be re-created", new Object[]{Integer.valueOf(taskGroup.groupId)});
                    return null;
                } catch (Exception e3) {
                    SeekableStreamSupervisor.log.error("Something bad happened [%s]", new Object[]{e3.getMessage()});
                    throw new RuntimeException(e3);
                }
            }
        }, this.workerExec);
    }

    /**
     * Requests a stop for every task of the given group that has not completed yet.
     *
     * <p>Tasks whose status is unknown to the supervisor are killed instead of stopped.
     *
     * @param taskGroup group whose tasks should be stopped; {@code null} yields an already-completed future
     * @param str       format string describing why the group is being stopped (used for logging only)
     * @param objArr    arguments for {@code str}
     * @return a future that completes once all issued stop requests have completed (successfully or not)
     */
    private ListenableFuture<?> stopTasksInGroup(@Nullable SeekableStreamSupervisor<PartitionIdType, SequenceOffsetType>.TaskGroup taskGroup, String str, Object... objArr) {
        if (taskGroup == null) {
            return Futures.immediateFuture((Object) null);
        }

        final String reason = StringUtils.format(str, objArr);
        log.info("Stopping all tasks in taskGroup[%s] because: [%s]", new Object[]{Integer.valueOf(taskGroup.groupId), reason});

        final List<ListenableFuture<?>> stopFutures = new ArrayList<>();
        for (Map.Entry<String, TaskData> taskEntry : taskGroup.tasks.entrySet()) {
            final String taskId = taskEntry.getKey();
            final TaskData taskData = taskEntry.getValue();

            if (taskData.status == null) {
                // Nothing is known about this task, so it cannot be stopped gracefully.
                killTask(taskId, "Killing task since task status is not known to supervisor", new Object[0]);
                continue;
            }
            if (!taskData.status.isComplete()) {
                stopFutures.add(stopTask(taskId, false));
            }
        }
        return Futures.successfulAsList(stopFutures);
    }

    /**
     * Walks every pending-completion task group and reacts to the state of its tasks.
     *
     * <ul>
     *   <li>Failed tasks are dropped from their group.</li>
     *   <li>As soon as one task of a group has succeeded, the remaining tasks of that group are stopped and the
     *       group is removed from the pending list.</li>
     *   <li>If every task of a group failed, or no task succeeded before the group's completion timeout, the
     *       partition offsets of the group are reset to the "not set" marker, the group and the matching actively
     *       reading group are killed, and all later pending groups for the same id are stopped.</li>
     * </ul>
     *
     * Finally waits (up to {@code futureTimeoutInSeconds}) for all issued stop requests.
     */
    private void checkPendingCompletionTasks() throws ExecutionException, InterruptedException, TimeoutException {
        final List<ListenableFuture<?>> stopFutures = new ArrayList<>();

        for (Map.Entry<Integer, CopyOnWriteArrayList<TaskGroup>> pendingEntry : pendingCompletionTaskGroups.entrySet()) {
            final Integer groupId = pendingEntry.getKey();
            final CopyOnWriteArrayList<TaskGroup> pendingGroups = pendingEntry.getValue();
            final List<TaskGroup> groupsToRemove = new ArrayList<>();
            // Once an earlier group has timed out / failed, every later group for the same id is stopped.
            boolean stopRemainingGroups = false;

            for (TaskGroup group : pendingGroups) {
                if (stopRemainingGroups) {
                    stopFutures.add(stopTasksInGroup(group, "one of earlier groups that was handling the same partition set timed out before publishing segments", new Object[0]));
                    groupsToRemove.add(group);
                    continue;
                }

                boolean foundSuccess = false;
                boolean entireGroupFailed = false;

                final Iterator<Map.Entry<String, TaskData>> taskIterator = group.tasks.entrySet().iterator();
                while (taskIterator.hasNext()) {
                    final Map.Entry<String, TaskData> taskEntry = taskIterator.next();
                    final String taskId = taskEntry.getKey();
                    final TaskData taskData = taskEntry.getValue();
                    Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", new Object[]{taskId});

                    if (taskData.status.isFailure()) {
                        stateManager.recordCompletedTaskState(TaskState.FAILED);
                        taskIterator.remove();
                        if (group.tasks.isEmpty()) {
                            entireGroupFailed = true;
                            break;
                        }
                    }

                    if (taskData.status.isSuccess()) {
                        // One success is enough: the other replicas are redundant and can be stopped.
                        log.info("Task [%s] completed successfully, stopping tasks %s", new Object[]{taskId, group.taskIds()});
                        stateManager.recordCompletedTaskState(TaskState.SUCCESS);
                        stopFutures.add(stopTasksInGroup(group, "Task [%s] completed successfully, stopping tasks %s", taskId, group.taskIds()));
                        foundSuccess = true;
                        groupsToRemove.add(group);
                        break;
                    }
                }

                if ((!foundSuccess && group.completionTimeout.isBeforeNow()) || entireGroupFailed) {
                    if (entireGroupFailed) {
                        log.warn("All tasks in group [%d] failed to publish, killing all tasks for these partitions", new Object[]{groupId});
                    } else {
                        log.makeAlert("No task in [%s] for taskGroup [%d] succeeded before the completion timeout elapsed [%s]!", new Object[]{group.taskIds(), groupId, ioConfig.getCompletionTimeout()}).emit();
                    }

                    // Forget the in-memory offsets of every partition this group was responsible for.
                    for (PartitionIdType partition : group.startingSequences.keySet()) {
                        partitionOffsets.put(partition, getNotSetMarker());
                    }

                    killTasksInGroup(group, "No task in pending completion taskGroup[%d] succeeded before completion timeout elapsed", groupId);
                    stopRemainingGroups = true;
                    killTasksInGroup(activelyReadingTaskGroups.remove(groupId), "No task in the corresponding pending completion taskGroup[%d] succeeded before completion timeout elapsed", groupId);
                    groupsToRemove.add(group);
                }
            }

            pendingGroups.removeAll(groupsToRemove);
        }

        Futures.successfulAsList(stopFutures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    /**
     * Prunes the actively reading task groups based on the state of their tasks.
     *
     * <p>For each task of each group:
     * <ul>
     *   <li>a task that is not "current" for its group (per {@code isTaskCurrent}) is stopped and removed;</li>
     *   <li>a failed task is removed from the group;</li>
     *   <li>a successful task causes the remaining tasks of the group to be stopped and the whole group to be
     *       removed from {@code activelyReadingTaskGroups}.</li>
     * </ul>
     *
     * Finally waits (up to {@code futureTimeoutInSeconds}) for all issued stop requests.
     */
    private void checkCurrentTaskState() throws ExecutionException, InterruptedException, TimeoutException {
        final List<ListenableFuture<?>> stopFutures = new ArrayList<>();

        final Iterator<Map.Entry<Integer, TaskGroup>> groupIterator = activelyReadingTaskGroups.entrySet().iterator();
        while (groupIterator.hasNext()) {
            final Map.Entry<Integer, TaskGroup> groupEntry = groupIterator.next();
            final Integer groupId = groupEntry.getKey();
            final TaskGroup taskGroup = groupEntry.getValue();
            log.debug("Task group [%d] pre-pruning: %s", new Object[]{groupId, taskGroup.taskIds()});

            final Iterator<Map.Entry<String, TaskData>> taskIterator = taskGroup.tasks.entrySet().iterator();
            // The loop must end when the tasks are exhausted; the previous `while (true)` form only exited on a
            // successful task and otherwise spun forever.
            while (taskIterator.hasNext()) {
                final Map.Entry<String, TaskData> taskEntry = taskIterator.next();
                final String taskId = taskEntry.getKey();
                final TaskData taskData = taskEntry.getValue();

                if (!isTaskCurrent(groupId.intValue(), taskId)) {
                    log.info("Stopping task [%s] which does not match the expected sequence range and ingestion spec", new Object[]{taskId});
                    stopFutures.add(stopTask(taskId, false));
                    taskIterator.remove();
                    continue;
                }

                Preconditions.checkNotNull(taskData.status, "WTH? task[%s] has a null status", new Object[]{taskId});
                if (taskData.status.isFailure()) {
                    stateManager.recordCompletedTaskState(TaskState.FAILED);
                    taskIterator.remove();
                } else if (taskData.status.isSuccess()) {
                    // One replica finished: stop the others and retire the whole group.
                    stateManager.recordCompletedTaskState(TaskState.SUCCESS);
                    stopFutures.add(stopTasksInGroup(taskGroup, "task[%s] succeeded in the same taskGroup", taskData.status.getId()));
                    groupIterator.remove();
                    break;
                }
            }
            log.debug("Task group [%d] post-pruning: %s", new Object[]{groupId, taskGroup.taskIds()});
        }

        Futures.successfulAsList(stopFutures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    /**
     * Hook for removing expired partitions from a set of starting offsets.
     *
     * <p>This base implementation returns {@code map} unchanged. Within this class it is only invoked from
     * {@code createNewTasks} when {@code supportsPartitionExpiration()} returns true.
     *
     * @param map starting offsets keyed by partition
     * @return the offsets to actually start from; here, the same map instance that was passed in
     */
    protected Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> filterExpiredPartitionsFromStartingOffsets(Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> map) {
        return map;
    }

    /**
     * Ensures that every partition group has an actively reading task group with the configured number of
     * replica tasks.
     *
     * <ol>
     *   <li>Verifies/merges checkpoints for groups that currently have fewer tasks than configured replicas.</li>
     *   <li>Creates a new {@code TaskGroup} for every partition group that has none, deriving its starting
     *       sequences and the optional minimum/maximum message times from the IO config.</li>
     *   <li>Creates the missing tasks for every group that has something to read.</li>
     *   <li>If any task was created and the first scheduled run is already in the past, schedules another run
     *       5 seconds from now.</li>
     * </ol>
     *
     * @throws JsonProcessingException propagated from {@code createTasksForGroup}
     */
    private void createNewTasks() throws JsonProcessingException {
        verifyAndMergeCheckpoints(
            activelyReadingTaskGroups.values()
                                     .stream()
                                     .filter(taskGroup -> taskGroup.tasks.size() < ioConfig.getReplicas().intValue())
                                     .collect(Collectors.toList())
        );

        for (Integer groupId : partitionGroups.keySet()) {
            if (activelyReadingTaskGroups.containsKey(groupId)) {
                continue;
            }
            log.info("Creating new task group [%d] for partitions %s", new Object[]{groupId, partitionGroups.get(groupId)});

            // An explicit start date wins over a rejection period; with neither there is no lower bound.
            final Optional<DateTime> minimumMessageTime;
            if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) {
                minimumMessageTime = Optional.of(ioConfig.getLateMessageRejectionStartDateTime().get());
            } else if (ioConfig.getLateMessageRejectionPeriod().isPresent()) {
                minimumMessageTime = Optional.of(DateTimes.nowUtc().minus((ReadableDuration) ioConfig.getLateMessageRejectionPeriod().get()));
            } else {
                minimumMessageTime = Optional.absent();
            }

            final Optional<DateTime> maximumMessageTime;
            if (ioConfig.getEarlyMessageRejectionPeriod().isPresent()) {
                maximumMessageTime = Optional.of(DateTimes.nowUtc().plus(ioConfig.getTaskDuration()).plus((ReadableDuration) ioConfig.getEarlyMessageRejectionPeriod().get()));
            } else {
                maximumMessageTime = Optional.absent();
            }

            final Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> unfilteredStartingOffsets =
                generateStartingSequencesForPartitionGroup(groupId.intValue());
            final Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> startingOffsets =
                supportsPartitionExpiration()
                ? filterExpiredPartitionsFromStartingOffsets(unfilteredStartingOffsets)
                : unfilteredStartingOffsets;

            final ImmutableMap<PartitionIdType, SequenceOffsetType> simpleStartingOffsets =
                unwrapNonNullSequences(startingOffsets);
            final ImmutableMap<PartitionIdType, SequenceOffsetType> simpleUnfilteredStartingOffsets =
                supportsPartitionExpiration()
                ? unwrapNonNullSequences(unfilteredStartingOffsets)
                : simpleStartingOffsets;

            final Set<PartitionIdType> exclusiveStartPartitions;
            if (!useExclusiveStartingSequence) {
                exclusiveStartPartitions = Collections.emptySet();
            } else {
                exclusiveStartPartitions = startingOffsets
                    .entrySet()
                    .stream()
                    .filter(entry -> entry.getValue().get() != null && entry.getValue().isExclusive())
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toSet());
            }

            activelyReadingTaskGroups.put(
                groupId,
                new TaskGroup(this, groupId.intValue(), simpleStartingOffsets, simpleUnfilteredStartingOffsets, minimumMessageTime, maximumMessageTime, exclusiveStartPartitions)
            );
        }

        boolean createdTask = false;
        for (Map.Entry<Integer, TaskGroup> groupEntry : activelyReadingTaskGroups.entrySet()) {
            final TaskGroup taskGroup = groupEntry.getValue();
            final Integer groupId = groupEntry.getKey();

            // A missing sequence is represented by null (not 0): comparing a generic sequence value with an int
            // literal is not valid Java and does not express "no offset".
            final boolean nothingToRead =
                taskGroup.startingSequences == null
                || taskGroup.startingSequences.size() == 0
                || taskGroup.startingSequences.values().stream().allMatch(sequence -> sequence == null || isEndOfShard(sequence));

            if (nothingToRead) {
                log.debug("Nothing to read in any partition for taskGroup [%d], skipping task creation", new Object[]{groupId});
            } else if (ioConfig.getReplicas().intValue() > taskGroup.tasks.size()) {
                log.info("Number of tasks [%d] does not match configured numReplicas [%d] in task group [%d], creating more tasks", new Object[]{Integer.valueOf(taskGroup.tasks.size()), ioConfig.getReplicas(), groupId});
                createTasksForGroup(groupId.intValue(), ioConfig.getReplicas().intValue() - taskGroup.tasks.size());
                createdTask = true;
            }
        }

        if (createdTask && firstRunTime.isBeforeNow()) {
            // Re-run shortly so the freshly submitted tasks are picked up.
            scheduledExec.schedule(buildRunTask(), 5000L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Converts a map of ordered sequence numbers into an immutable map of their raw sequence values, dropping
     * every entry whose raw value is {@code null}.
     */
    private ImmutableMap<PartitionIdType, SequenceOffsetType> unwrapNonNullSequences(Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> sequences) {
        return sequences
            .entrySet()
            .stream()
            .filter(entry -> entry.getValue().get() != null)
            .collect(
                Collectors.collectingAndThen(
                    Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().get()),
                    ImmutableMap::copyOf
                )
            );
    }

    /**
     * Appends the given notice to the supervisor's {@code notices} collection.
     *
     * @param notice notice to enqueue
     */
    private void addNotice(Notice notice) {
        notices.add(notice);
    }

    /**
     * Removes the actively reading task group with the given id, if any, and appends it to the list of
     * pending-completion groups for the same id (creating that list on first use).
     *
     * @param i id of the task group to move
     */
    @VisibleForTesting
    public void moveTaskGroupToPendingCompletion(int i) {
        final TaskGroup movedGroup = activelyReadingTaskGroups.remove(Integer.valueOf(i));
        if (movedGroup == null) {
            return;
        }
        pendingCompletionTaskGroups
            .computeIfAbsent(Integer.valueOf(i), ignoredId -> new CopyOnWriteArrayList<>())
            .add(movedGroup);
    }

    /**
     * @return the current number of elements in {@code notices}
     */
    @VisibleForTesting
    public int getNoticesQueueSize() {
        return notices.size();
    }

    /**
     * Builds the starting sequence for every partition of the given partition group.
     *
     * <p>For a partition whose in-memory offset equals the "not set" marker, the offset is resolved through
     * {@link #getOffsetFromStorageForPartition}. Otherwise the in-memory offset is used, unless it marks the end
     * of the shard, in which case the partition is left out of the result.
     *
     * @param i partition group id
     * @return immutable map from partition to its starting sequence number
     */
    private Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generateStartingSequencesForPartitionGroup(int i) {
        final ImmutableMap.Builder<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> startingSequences = ImmutableMap.builder();

        for (PartitionIdType partition : partitionGroups.get(Integer.valueOf(i))) {
            final SequenceOffsetType knownOffset = partitionOffsets.get(partition);

            if (!getNotSetMarker().equals(knownOffset)) {
                // An in-memory offset exists; use it unless the shard has already been read to its end.
                if (!isEndOfShard(knownOffset)) {
                    startingSequences.put(partition, makeSequenceNumber(knownOffset, useExclusiveStartSequenceNumberForNonFirstSequence()));
                }
                continue;
            }

            final OrderedSequenceNumber<SequenceOffsetType> resolvedOffset = getOffsetFromStorageForPartition(partition);
            if (resolvedOffset != null) {
                startingSequences.put(partition, resolvedOffset);
            }
        }

        return startingSequences.build();
    }

    /**
     * Resolves the sequence number a new task should start reading from for the given partition.
     *
     * <p>If metadata storage holds a sequence for the partition, it is returned when either the availability
     * check is skipped by the tuning config or the sequence is still available in the stream. If it is not
     * available, a {@code StreamException} is thrown; when automatic reset is enabled the stored sequence is
     * reset first.
     *
     * <p>If nothing is stored, the sequence is fetched from the stream: the earliest one when configured so or
     * when the partition is in {@code subsequentlyDiscoveredPartitions}, the latest one otherwise.
     *
     * @param partitionidtype partition to resolve
     * @return the starting sequence number (never {@code null}; failures are reported by exception)
     */
    private OrderedSequenceNumber<SequenceOffsetType> getOffsetFromStorageForPartition(PartitionIdType partitionidtype) {
        final SequenceOffsetType storedSequence = getOffsetsFromMetadataStorage().get(partitionidtype);

        if (storedSequence == null) {
            // Nothing persisted yet: ask the stream itself.
            boolean startFromEarliest = ioConfig.isUseEarliestSequenceNumber();
            if (subsequentlyDiscoveredPartitions.contains(partitionidtype)) {
                log.info("Overriding useEarliestSequenceNumber and starting from beginning of newly discovered partition [%s] (which is probably from a split or merge)", new Object[]{partitionidtype});
                startFromEarliest = true;
            }

            final SequenceOffsetType streamSequence = getOffsetFromStreamForPartition(partitionidtype, startFromEarliest);
            if (streamSequence == null) {
                throw new ISE("unable to fetch sequence number for partition[%s] from stream", new Object[]{partitionidtype});
            }
            log.info("Getting sequence number [%s] for partition [%s]", new Object[]{streamSequence, partitionidtype});
            return makeSequenceNumber(streamSequence, false);
        }

        log.debug("Getting sequence [%s] from metadata storage for partition [%s]", new Object[]{storedSequence, partitionidtype});
        if (taskTuningConfig.isSkipSequenceNumberAvailabilityCheck() || checkOffsetAvailability(partitionidtype, storedSequence)) {
            return makeSequenceNumber(storedSequence, useExclusiveStartSequenceNumberForNonFirstSequence());
        }

        if (taskTuningConfig.isResetOffsetAutomatically()) {
            resetInternal(createDataSourceMetaDataForReset(ioConfig.getStream(), ImmutableMap.of(partitionidtype, storedSequence)));
            throw new StreamException(new ISE("Previous sequenceNumber [%s] is no longer available for partition [%s] - automatically resetting sequence", new Object[]{storedSequence, partitionidtype}));
        }
        throw new StreamException(new ISE("Previous sequenceNumber [%s] is no longer available for partition [%s]. You can clear the previous sequenceNumber and start reading from a valid message by using the supervisor's reset API.", new Object[]{storedSequence, partitionidtype}));
    }

    /**
     * Loads the per-partition sequence numbers stored in metadata storage for this supervisor's data source.
     *
     * @return the stored partition-to-sequence map, or an empty map when the stored metadata is missing, is of an
     *         unexpected type, does not match this supervisor (per {@code checkSourceMetadataMatch}), refers to a
     *         different stream, or carries no partition map
     */
    private Map<PartitionIdType, SequenceOffsetType> getOffsetsFromMetadataStorage() {
        final DataSourceMetadata storedMetadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
        if (!(storedMetadata instanceof SeekableStreamDataSourceMetadata) || !checkSourceMetadataMatch(storedMetadata)) {
            return Collections.emptyMap();
        }

        final SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> storedSequences =
            ((SeekableStreamDataSourceMetadata) storedMetadata).getSeekableStreamSequenceNumbers();
        if (storedSequences == null) {
            return Collections.emptyMap();
        }

        if (!ioConfig.getStream().equals(storedSequences.getStream())) {
            log.warn("Topic/stream in metadata storage [%s] doesn't match spec topic/stream [%s], ignoring stored sequences", new Object[]{storedSequences.getStream(), ioConfig.getStream()});
            return Collections.emptyMap();
        }

        if (storedSequences.getPartitionSequenceNumberMap() == null) {
            return Collections.emptyMap();
        }
        return storedSequences.getPartitionSequenceNumberMap();
    }

    /**
     * Fetches the earliest or latest sequence number of a partition directly from the stream.
     *
     * <p>Runs while holding {@code recordSupplierLock}. If the partition is not currently assigned to the record
     * supplier, the supplier is (re)assigned to exactly this partition and positioned at its earliest record
     * before the sequence number is queried.
     *
     * @param partitionidtype partition to query
     * @param z               {@code true} to return the earliest sequence number, {@code false} for the latest
     * @return the requested sequence number as reported by the record supplier, possibly {@code null}
     */
    @Nullable
    private SequenceOffsetType getOffsetFromStreamForPartition(PartitionIdType partitionidtype, boolean z) {
        synchronized (recordSupplierLock) {
            final StreamPartition<PartitionIdType> target = new StreamPartition<>(ioConfig.getStream(), partitionidtype);

            if (!recordSupplier.getAssignment().contains(target)) {
                final Set<StreamPartition<PartitionIdType>> assignment = Collections.singleton(target);
                recordSupplier.assign(assignment);
                try {
                    recordSupplier.seekToEarliest(assignment);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }

            if (z) {
                return recordSupplier.getEarliestSequenceNumber(target);
            }
            return recordSupplier.getLatestSequenceNumber(target);
        }
    }

    /**
     * Creates {@code i2} new index tasks for the actively reading task group {@code i} and submits them to the
     * task queue.
     *
     * <p>Each task starts at the group's starting sequences and is given the end-of-partition marker as the end
     * sequence for every partition. If no task queue is available (this node is not the leader) an error is
     * logged for each task instead.
     *
     * @param i  task group id
     * @param i2 number of tasks to create
     * @throws JsonProcessingException propagated from {@code createIndexTasks}
     */
    private void createTasksForGroup(int i, int i2) throws JsonProcessingException {
        final TaskGroup group = activelyReadingTaskGroups.get(Integer.valueOf(i));
        final ImmutableMap<PartitionIdType, SequenceOffsetType> startSequences = group.startingSequences;

        // Tasks are open-ended: every partition ends at the end-of-partition marker.
        final Map<PartitionIdType, SequenceOffsetType> endSequences = new HashMap<>();
        for (PartitionIdType partition : startSequences.keySet()) {
            endSequences.put(partition, getEndOfPartitionMarker());
        }

        final SeekableStreamIndexTaskIOConfig taskIoConfig = createTaskIoConfig(
            i,
            startSequences,
            endSequences,
            group.baseSequenceName,
            (DateTime) group.minimumMessageTime.orNull(),
            (DateTime) group.maximumMessageTime.orNull(),
            activelyReadingTaskGroups.get(Integer.valueOf(i)).exclusiveStartSequenceNumberPartitions,
            ioConfig
        );

        final List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> newTasks = createIndexTasks(
            i2,
            group.baseSequenceName,
            sortingMapper,
            group.checkpointSequences,
            taskIoConfig,
            taskTuningConfig,
            rowIngestionMetersFactory
        );

        for (SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> newTask : newTasks) {
            final Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
            if (!taskQueue.isPresent()) {
                log.error("Failed to get task queue because I'm not the leader!", new Object[0]);
                continue;
            }
            try {
                taskQueue.get().add(newTask);
            } catch (EntryExistsException e) {
                stateManager.recordThrowableEvent(e);
                log.error("Tried to add task [%s] but it already exists", new Object[]{newTask.getId()});
            }
        }
    }

    /**
     * Builds a runnable that refreshes the current task offsets and the latest stream offsets and then records
     * the refresh time in {@code sequenceLastUpdated}.
     *
     * <p>Any exception raised during the refresh is logged as a warning and not rethrown.
     *
     * @return the refresh runnable
     */
    @VisibleForTesting
    public Runnable updateCurrentAndLatestOffsets() {
        return () -> {
            try {
                updateCurrentOffsets();
                updateLatestOffsetsFromStream();
                sequenceLastUpdated = DateTimes.nowUtc();
            } catch (Exception refreshFailure) {
                log.warn(refreshFailure, "Exception while getting current/latest sequences", new Object[0]);
            }
        };
    }

    /**
     * Asks every task of every actively reading and pending-completion task group for its current offsets and
     * stores each non-empty answer in the task's {@code currentSequences}.
     *
     * <p>Waits up to {@code futureTimeoutInSeconds} for all requests to finish.
     */
    private void updateCurrentOffsets() throws InterruptedException, ExecutionException, TimeoutException {
        final Stream<Map.Entry<String, TaskData>> activeTasks = activelyReadingTaskGroups
            .values()
            .stream()
            .flatMap(group -> group.tasks.entrySet().stream());

        final Stream<Map.Entry<String, TaskData>> pendingTasks = pendingCompletionTaskGroups
            .values()
            .stream()
            .flatMap(Collection::stream)
            .flatMap(group -> group.tasks.entrySet().stream());

        final List<ListenableFuture<Void>> offsetFutures = Stream
            .concat(activeTasks, pendingTasks)
            .map(taskEntry -> Futures.transform(
                taskClient.getCurrentOffsetsAsync(taskEntry.getKey(), false),
                // Explicit Guava Function type keeps the lambda unambiguous between transform() overloads.
                (Function<Map<PartitionIdType, SequenceOffsetType>, Void>) offsets -> {
                    if (offsets != null && !offsets.isEmpty()) {
                        taskEntry.getValue().currentSequences = offsets;
                    }
                    return null;
                }
            ))
            .collect(Collectors.toList());

        Futures.successfulAsList(offsetFutures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
    }

    /**
     * Assigns the record supplier to all partitions of the configured stream, seeks each to its latest position
     * and hands the supplier to {@link #updateLatestSequenceFromStream}.
     *
     * <p>Runs while holding {@code recordSupplierLock}. Any failure is logged and rethrown wrapped in a
     * {@code StreamException}.
     */
    private void updateLatestOffsetsFromStream() throws InterruptedException {
        synchronized (recordSupplierLock) {
            try {
                final Set<StreamPartition<PartitionIdType>> allPartitions = recordSupplier
                    .getPartitionIds(ioConfig.getStream())
                    .stream()
                    .map(partitionId -> new StreamPartition<>(ioConfig.getStream(), partitionId))
                    .collect(Collectors.toSet());

                recordSupplier.assign(allPartitions);
                recordSupplier.seekToLatest(allPartitions);
                updateLatestSequenceFromStream(recordSupplier, allPartitions);
            } catch (Exception fetchFailure) {
                log.warn("Could not fetch partitions for topic/stream [%s]", new Object[]{ioConfig.getStream()});
                throw new StreamException(fetchFailure);
            }
        }
    }

    /**
     * Implementation hook for recording the latest sequence numbers of the stream.
     *
     * <p>Within this class it is called from {@code updateLatestOffsetsFromStream} while holding
     * {@code recordSupplierLock}, after the supplier has been assigned to {@code set} and
     * {@code seekToLatest(set)} has been invoked.
     *
     * @param recordSupplier the supervisor's record supplier
     * @param set            all partitions of the configured stream
     */
    protected abstract void updateLatestSequenceFromStream(RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier, Set<StreamPartition<PartitionIdType>> set);

    /**
     * Collects, per partition, the highest current sequence reported by any task of the actively reading task
     * groups. Sequences are compared through {@link #makeSequenceNumber(Object)}.
     *
     * @return map from partition to the highest current sequence seen across all actively reading tasks
     */
    protected Map<PartitionIdType, SequenceOffsetType> getHighestCurrentOffsets() {
        final Map<PartitionIdType, SequenceOffsetType> highestOffsets = new HashMap<>();

        for (TaskGroup group : activelyReadingTaskGroups.values()) {
            for (Map.Entry<String, TaskData> taskEntry : group.tasks.entrySet()) {
                for (Map.Entry<PartitionIdType, SequenceOffsetType> offsetEntry : taskEntry.getValue().currentSequences.entrySet()) {
                    // On a key collision keep whichever sequence orders higher.
                    highestOffsets.merge(
                        offsetEntry.getKey(),
                        offsetEntry.getValue(),
                        (known, candidate) -> makeSequenceNumber(known).compareTo(makeSequenceNumber(candidate)) > 0 ? known : candidate
                    );
                }
            }
        }

        return highestOffsets;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Convenience overload of {@link #makeSequenceNumber(Object, boolean)} that always passes {@code false} as
     * the second argument.
     *
     * @param sequenceoffsettype raw sequence value to wrap
     * @return the ordered sequence number produced by the two-argument overload
     */
    public OrderedSequenceNumber<SequenceOffsetType> makeSequenceNumber(SequenceOffsetType sequenceoffsettype) {
        return makeSequenceNumber(sequenceoffsettype, false);
    }

    /**
     * @return the current value of the {@code started} flag
     */
    @VisibleForTesting
    public boolean isStarted() {
        return started;
    }

    /**
     * @return the current value of the {@code lifecycleStarted} flag
     */
    @VisibleForTesting
    public boolean isLifecycleStarted() {
        return lifecycleStarted;
    }

    /**
     * @return the current value of {@code initRetryCounter}
     */
    @VisibleForTesting
    public int getInitRetryCounter() {
        return initRetryCounter;
    }

    /**
     * @return the supervisor IO config held in {@code ioConfig}
     */
    @VisibleForTesting
    public SeekableStreamSupervisorIOConfig getIoConfig() {
        return ioConfig;
    }

    /**
     * Validates a checkpoint request and queues a {@code CheckpointNotice} for it.
     *
     * @param i                  id of the task group being checkpointed
     * @param dataSourceMetadata checkpoint metadata; must be non-null, must refer to the supervisor's stream and
     *                           must carry {@code SeekableStreamStartSequenceNumbers}
     * @throws NullPointerException     if {@code dataSourceMetadata} is null
     * @throws IllegalArgumentException if the stream does not match or the sequence numbers are of the wrong kind
     */
    public void checkpoint(int i, DataSourceMetadata dataSourceMetadata) {
        Preconditions.checkNotNull(dataSourceMetadata, "checkpointMetadata");
        final SeekableStreamDataSourceMetadata checkpointMetadata = (SeekableStreamDataSourceMetadata) dataSourceMetadata;

        Preconditions.checkArgument(
            spec.getIoConfig().getStream().equals(checkpointMetadata.getSeekableStreamSequenceNumbers().getStream()),
            "Supervisor stream [%s] and stream in checkpoint [%s] does not match",
            new Object[]{spec.getIoConfig().getStream(), checkpointMetadata.getSeekableStreamSequenceNumbers().getStream()}
        );
        Preconditions.checkArgument(
            checkpointMetadata.getSeekableStreamSequenceNumbers() instanceof SeekableStreamStartSequenceNumbers,
            "checkpointMetadata must be SeekableStreamStartSequenceNumbers"
        );

        log.info("Checkpointing [%s] for taskGroup [%s]", new Object[]{dataSourceMetadata, Integer.valueOf(i)});
        addNotice(new CheckpointNotice(i, checkpointMetadata));
    }

    /**
     * Creates a fresh, mutable context map pre-populated with the entries of the supervisor spec's context, if
     * the spec has one.
     *
     * @return a new map; empty when the spec has no context
     */
    @VisibleForTesting
    public Map<String, Object> createBaseTaskContexts() {
        final Map<String, Object> baseContexts = new HashMap<>();
        if (spec.getContext() != null) {
            baseContexts.putAll(spec.getContext());
        }
        return baseContexts;
    }

    /**
     * @return the live {@code partitionGroups} map (task group id to the partitions it covers); not a copy
     */
    @VisibleForTesting
    public ConcurrentHashMap<Integer, Set<PartitionIdType>> getPartitionGroups() {
        return partitionGroups;
    }

    /**
     * @return {@code true} when {@code partitionIds} currently holds no elements
     */
    @VisibleForTesting
    public boolean isPartitionIdsEmpty() {
        return partitionIds.isEmpty();
    }

    /**
     * @return the live {@code partitionOffsets} map (partition to its in-memory sequence); not a copy
     */
    public ConcurrentHashMap<PartitionIdType, SequenceOffsetType> getPartitionOffsets() {
        return partitionOffsets;
    }

    /**
     * Replaces the contents of {@code partitionIds} with the given list (test hook).
     *
     * @param list partition ids to install, in order
     */
    @VisibleForTesting
    public void setPartitionIdsForTests(List<PartitionIdType> list) {
        partitionIds.clear();
        partitionIds.addAll(list);
    }

    /**
     * Builds the IO config for the index tasks of one task group.
     *
     * <p>As called from {@code createTasksForGroup}: {@code i} is the task group id, {@code map} the group's
     * starting sequences, {@code map2} the end sequences (every partition mapped to the end-of-partition marker),
     * {@code str} the group's base sequence name, {@code dateTime}/{@code dateTime2} the group's minimum/maximum
     * message time (either may be {@code null}), {@code set} the group's exclusive-start partitions and
     * {@code seekableStreamSupervisorIOConfig} the supervisor's IO config.
     */
    protected abstract SeekableStreamIndexTaskIOConfig createTaskIoConfig(int i, Map<PartitionIdType, SequenceOffsetType> map, Map<PartitionIdType, SequenceOffsetType> map2, String str, DateTime dateTime, DateTime dateTime2, Set<PartitionIdType> set, SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig);

    /**
     * Creates the index tasks for one task group.
     *
     * <p>As called from {@code createTasksForGroup}: {@code i} is the number of tasks to create, {@code str} the
     * group's base sequence name, {@code objectMapper} the supervisor's sorting mapper, {@code treeMap} the
     * group's checkpoint sequences, followed by the task IO config, the task tuning config and the row ingestion
     * meters factory. Every returned task is added to the task queue.
     */
    protected abstract List<SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>> createIndexTasks(int i, String str, ObjectMapper objectMapper, TreeMap<Integer, Map<PartitionIdType, SequenceOffsetType>> treeMap, SeekableStreamIndexTaskIOConfig seekableStreamIndexTaskIOConfig, SeekableStreamIndexTaskTuningConfig seekableStreamIndexTaskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) throws JsonProcessingException;

    /** Presumably maps a partition to the id of the task group responsible for it (no call site in this section; verify against callers). */
    protected abstract int getTaskGroupIdForPartition(PartitionIdType partitionidtype);

    /**
     * Decides whether stored data source metadata belongs to this supervisor. Sequences from metadata storage
     * are only used by {@code getOffsetsFromMetadataStorage} when this returns {@code true}.
     */
    protected abstract boolean checkSourceMetadataMatch(DataSourceMetadata dataSourceMetadata);

    /** Presumably reports whether the given task is of the type managed by this supervisor (no call site in this section; verify against callers). */
    protected abstract boolean doesTaskTypeMatchSupervisor(Task task);

    /**
     * Builds the metadata object passed to {@code resetInternal} when a stored sequence is no longer available.
     * As called from {@code getOffsetFromStorageForPartition}: {@code str} is the configured stream and
     * {@code map} holds the single affected partition with its stored sequence.
     */
    protected abstract SeekableStreamDataSourceMetadata<PartitionIdType, SequenceOffsetType> createDataSourceMetaDataForReset(String str, Map<PartitionIdType, SequenceOffsetType> map);

    /**
     * Wraps a raw sequence value into an {@code OrderedSequenceNumber}.
     *
     * <p>Call sites in this class pass either {@code useExclusiveStartSequenceNumberForNonFirstSequence()} or
     * {@code false} for {@code z}; presumably it marks the sequence as exclusive (verify in implementations).
     */
    protected abstract OrderedSequenceNumber<SequenceOffsetType> makeSequenceNumber(SequenceOffsetType sequenceoffsettype, boolean z);

    /** Presumably schedules periodic reporting on the given executor (no call site in this section; verify against callers). */
    protected abstract void scheduleReporting(ScheduledExecutorService scheduledExecutorService);

    /** Presumably computes the per-partition lag for the given current offsets (no call site in this section; verify against callers). */
    protected abstract Map<PartitionIdType, SequenceOffsetType> getLagPerPartition(Map<PartitionIdType, SequenceOffsetType> map);

    /** Presumably creates the record supplier used to talk to the stream (no call site in this section; verify against callers). */
    protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType> setupRecordSupplier();

    /** Presumably creates the payload of the supervisor status report (no call site in this section; parameter meanings not visible here). */
    protected abstract SeekableStreamSupervisorReportPayload<PartitionIdType, SequenceOffsetType> createReportPayload(int i, boolean z);

    /**
     * Checks whether a sequence is still available in the stream, i.e. whether the earliest sequence currently
     * offered by the stream for the partition is not greater than the given one.
     *
     * @param partitionidtype    partition to check
     * @param sequenceoffsettype sequence whose availability is checked
     * @return {@code true} if the earliest stream sequence could be fetched and is less than or equal to
     *         {@code sequenceoffsettype}; {@code false} otherwise
     */
    private boolean checkOffsetAvailability(@NotNull PartitionIdType partitionidtype, @NotNull SequenceOffsetType sequenceoffsettype) {
        final SequenceOffsetType earliestInStream = getOffsetFromStreamForPartition(partitionidtype, true);
        if (earliestInStream == null) {
            return false;
        }
        return makeSequenceNumber(earliestInStream).compareTo(makeSequenceNumber(sequenceoffsettype)) <= 0;
    }

    /**
     * Marker value stored in {@code partitionOffsets} for a partition whose offset is not set. When a partition
     * carries this marker, {@code generateStartingSequencesForPartitionGroup} resolves its offset from metadata
     * storage or the stream instead.
     */
    protected abstract SequenceOffsetType getNotSetMarker();

    /** Marker value used by {@code createTasksForGroup} as the end sequence of every partition of a new task. */
    protected abstract SequenceOffsetType getEndOfPartitionMarker();

    /**
     * Tells whether a sequence value marks the end of a shard. Partitions with such an offset are skipped when
     * starting sequences are generated, and a group whose starting sequences are all null or end-of-shard gets no
     * new tasks.
     */
    protected abstract boolean isEndOfShard(SequenceOffsetType sequenceoffsettype);

    /** Presumably tells whether a sequence value marks an expired shard (no call site in this section; verify against callers). */
    protected abstract boolean isShardExpirationMarker(SequenceOffsetType sequenceoffsettype);

    /**
     * Value passed as the second argument of {@code makeSequenceNumber} for sequences taken from
     * {@code partitionOffsets} or from metadata storage.
     */
    protected abstract boolean useExclusiveStartSequenceNumberForNonFirstSequence();

    /**
     * Synthetic accessor that assigns {@code lastRunTime} on the given supervisor and returns the assigned value.
     *
     * <p>The decompiler could not decode the original bytecode (it failed on a {@code dup2_x1} instruction) and
     * emitted a body that always threw {@code UnsupportedOperationException}. The partially decoded listing
     * showed the assignment {@code r0.lastRunTime = r1}; this body restores that assignment and returns the
     * stored value, which is what a {@code dup2_x1} followed by the field store leaves on the stack. Presumably
     * this accessor exists so a nested class can write the private field (verify against callers).
     *
     * @param r6 supervisor instance whose {@code lastRunTime} is updated
     * @param r7 new value for {@code lastRunTime}
     * @return {@code r7}, the value that was stored
     */
    static /* synthetic */ long access$202(org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor r6, long r7) {
        r6.lastRunTime = r7;
        return r7;
    }

    // Empty static initializer block: it contains no statements.
    static {
    }
}
