/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexing.kafka.supervisor;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.common.task.utils.RandomIdUtils;
import org.apache.druid.indexing.kafka.KafkaDataSourceMetadata;
import org.apache.druid.indexing.kafka.KafkaIndexTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskClientFactory;
import org.apache.druid.indexing.kafka.KafkaIndexTaskIOConfig;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
import org.apache.druid.indexing.kafka.KafkaRecordSupplier;
import org.apache.druid.indexing.kafka.KafkaSequenceNumber;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorReportPayload;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.joda.time.DateTime;

/**
 * Supervisor for Kafka indexing tasks, built on {@link SeekableStreamSupervisor} with
 * {@code Integer} partition ids and {@code Long} offsets.
 *
 * <p>Besides the hooks it supplies to the superclass, this class schedules two periodic
 * reporting jobs (see {@link #scheduleReporting}): one that refreshes current/latest offsets
 * and one that emits lag metrics computed from the most recently fetched latest offsets.
 */
public class KafkaSupervisor
extends SeekableStreamSupervisor<Integer, Long> {
    /** Jackson type token used to serialize the checkpoint map that is placed in each task's context. */
    public static final TypeReference<TreeMap<Integer, Map<Integer, Long>>> CHECKPOINTS_TYPE_REF = new TypeReference<TreeMap<Integer, Map<Integer, Long>>>(){};
    private static final EmittingLogger log = new EmittingLogger(KafkaSupervisor.class);
    /** Lower bound applied to the configured offset fetch period. */
    private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000L;
    /** Added to the configured start delay before the first offset refresh runs. */
    private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000L;
    /** Added to the configured start delay before the first lag emission runs. */
    private static final long INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS = 25000L;
    /** Per-partition lag value reported when either the latest or the current offset is unknown. */
    private static final long UNKNOWN_LAG = Integer.MIN_VALUE;
    private static final Long NOT_SET = -1L;
    private static final Long END_OF_PARTITION = Long.MAX_VALUE;
    private final ServiceEmitter emitter;
    private final DruidMonitorSchedulerConfig monitorSchedulerConfig;
    /**
     * Latest offset per partition as last fetched by {@link #updateLatestSequenceFromStream};
     * {@code null} until the first fetch. The reference is replaced wholesale on every refresh,
     * so readers should copy it into a local before doing more than one lookup.
     */
    private volatile Map<Integer, Long> latestSequenceFromStream;
    private final KafkaSupervisorSpec spec;

    /**
     * Creates the supervisor and captures the emitter and monitor scheduler config from {@code spec}.
     *
     * <p>The first superclass argument is {@code "KafkaSupervisor-" + dataSource}.
     * NOTE(review): the trailing {@code false} is passed through to the superclass unchanged; its
     * meaning is defined there — confirm before altering.
     */
    public KafkaSupervisor(TaskStorage taskStorage, TaskMaster taskMaster, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, KafkaIndexTaskClientFactory taskClientFactory, ObjectMapper mapper, KafkaSupervisorSpec spec, RowIngestionMetersFactory rowIngestionMetersFactory) {
        super(
            StringUtils.format("KafkaSupervisor-%s", spec.getDataSchema().getDataSource()),
            taskStorage,
            taskMaster,
            indexerMetadataStorageCoordinator,
            taskClientFactory,
            mapper,
            spec,
            rowIngestionMetersFactory,
            false
        );
        this.spec = spec;
        this.emitter = spec.getEmitter();
        this.monitorSchedulerConfig = spec.getMonitorSchedulerConfig();
    }

    /** Builds a {@link KafkaRecordSupplier} from the spec's consumer properties and the sorting mapper. */
    protected RecordSupplier<Integer, Long> setupRecordSupplier() {
        return new KafkaRecordSupplier(this.spec.getIoConfig().getConsumerProperties(), this.sortingMapper);
    }

    /**
     * Schedules the two fixed-rate reporting jobs on {@code reportingExec}.
     *
     * <ul>
     *   <li>Offset refresh: first run after {@code startDelay + INITIAL_GET_OFFSET_DELAY_MILLIS},
     *       then every {@code max(offsetFetchPeriod, MINIMUM_GET_OFFSET_PERIOD_MILLIS)}.</li>
     *   <li>Lag emission: first run after {@code startDelay + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS},
     *       then every emitter period from the monitor scheduler config.</li>
     * </ul>
     */
    protected void scheduleReporting(ScheduledExecutorService reportingExec) {
        KafkaSupervisorIOConfig ioConfig = this.spec.getIoConfig();
        KafkaSupervisorTuningConfig tuningConfig = this.spec.getTuningConfig();
        long startDelayMillis = ioConfig.getStartDelay().getMillis();

        reportingExec.scheduleAtFixedRate(
            this.updateCurrentAndLatestOffsets(),
            startDelayMillis + INITIAL_GET_OFFSET_DELAY_MILLIS,
            Math.max(tuningConfig.getOffsetFetchPeriod().getMillis(), MINIMUM_GET_OFFSET_PERIOD_MILLIS),
            TimeUnit.MILLISECONDS
        );
        reportingExec.scheduleAtFixedRate(
            this.emitLag(),
            startDelayMillis + INITIAL_EMIT_LAG_METRIC_DELAY_MILLIS,
            this.monitorSchedulerConfig.getEmitterPeriod().getMillis(),
            TimeUnit.MILLISECONDS
        );
    }

    /** Maps a partition to a task group as {@code partitionId % taskCount}. */
    protected int getTaskGroupIdForPartition(Integer partitionId) {
        return partitionId % this.spec.getIoConfig().getTaskCount();
    }

    /** Returns whether {@code metadata} is a {@link KafkaDataSourceMetadata}. */
    protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata) {
        return metadata instanceof KafkaDataSourceMetadata;
    }

    /** Returns whether {@code task} is a {@link KafkaIndexTask}. */
    protected boolean doesTaskTypeMatchSupervisor(Task task) {
        return task instanceof KafkaIndexTask;
    }

    /**
     * Builds the supervisor report payload.
     *
     * <p>When {@code includeOffsets} is {@code false}, the latest offsets, per-partition lag,
     * aggregate lag and last-updated fields are all passed as {@code null}. The aggregate lag
     * sums per-partition lags after clamping each at zero, so unknown or negative lags add nothing.
     *
     * @param numPartitions  partition count to report
     * @param includeOffsets whether offset and lag details are included
     */
    protected SeekableStreamSupervisorReportPayload<Integer, Long> createReportPayload(int numPartitions, boolean includeOffsets) {
        KafkaSupervisorIOConfig ioConfig = this.spec.getIoConfig();
        Map<Integer, Long> partitionLag = this.getLagPerPartition(this.getHighestCurrentOffsets());
        return new KafkaSupervisorReportPayload(
            this.spec.getDataSchema().getDataSource(),
            ioConfig.getTopic(),
            numPartitions,
            ioConfig.getReplicas(),
            // Task duration is reported in seconds.
            ioConfig.getTaskDuration().getMillis() / 1000L,
            includeOffsets ? this.latestSequenceFromStream : null,
            includeOffsets ? partitionLag : null,
            includeOffsets ? Long.valueOf(partitionLag.values().stream().mapToLong(lag -> Math.max(lag, 0L)).sum()) : null,
            includeOffsets ? this.sequenceLastUpdated : null,
            this.spec.isSuspended(),
            this.stateManager.isHealthy(),
            this.stateManager.getSupervisorState().getBasicState(),
            this.stateManager.getSupervisorState(),
            this.stateManager.getExceptionEvents()
        );
    }

    /**
     * Builds the IO config for a Kafka index task.
     *
     * <p>{@code exclusiveStartSequenceNumberPartitions} is not used: the start sequence numbers
     * are always created with an empty exclusive-partition set. The input format is obtained from
     * {@code ioConfig}, passing the data schema's parse spec, or {@code null} when the schema has
     * no parser.
     *
     * @param ioConfig must be a {@link KafkaSupervisorIOConfig}; it is cast unconditionally
     */
    protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(int groupId, Map<Integer, Long> startPartitions, Map<Integer, Long> endPartitions, String baseSequenceName, DateTime minimumMessageTime, DateTime maximumMessageTime, Set<Integer> exclusiveStartSequenceNumberPartitions, SeekableStreamSupervisorIOConfig ioConfig) {
        KafkaSupervisorIOConfig kafkaIoConfig = (KafkaSupervisorIOConfig)ioConfig;
        String topic = kafkaIoConfig.getTopic();
        return new KafkaIndexTaskIOConfig(
            groupId,
            baseSequenceName,
            new SeekableStreamStartSequenceNumbers<>(topic, startPartitions, Collections.emptySet()),
            new SeekableStreamEndSequenceNumbers<>(topic, endPartitions),
            kafkaIoConfig.getConsumerProperties(),
            kafkaIoConfig.getPollTimeout(),
            // NOTE(review): presumably the useTransaction flag — confirm against KafkaIndexTaskIOConfig.
            true,
            minimumMessageTime,
            maximumMessageTime,
            ioConfig.getInputFormat(this.spec.getDataSchema().getParser() == null ? null : this.spec.getDataSchema().getParser().getParseSpec())
        );
    }

    /**
     * Creates {@code replicas} Kafka index tasks that share one task context.
     *
     * <p>The context starts from {@code createBaseTaskContexts()} and gains two entries: the
     * JSON-serialized {@code sequenceOffsets} under {@code "checkpoints"} and {@code true} under
     * {@code "IS_INCREMENTAL_HANDOFF_SUPPORTED"}. Each task id is
     * {@code baseSequenceName + "_" + randomId}; every task uses a {@link TaskResource} keyed by
     * {@code baseSequenceName}.
     *
     * @throws JsonProcessingException if the checkpoints cannot be serialized
     */
    protected List<SeekableStreamIndexTask<Integer, Long>> createIndexTasks(int replicas, String baseSequenceName, ObjectMapper sortingMapper, TreeMap<Integer, Map<Integer, Long>> sequenceOffsets, SeekableStreamIndexTaskIOConfig taskIoConfig, SeekableStreamIndexTaskTuningConfig taskTuningConfig, RowIngestionMetersFactory rowIngestionMetersFactory) throws JsonProcessingException {
        String checkpoints = sortingMapper.writerFor(CHECKPOINTS_TYPE_REF).writeValueAsString(sequenceOffsets);
        Map<String, Object> context = this.createBaseTaskContexts();
        context.put("checkpoints", checkpoints);
        context.put("IS_INCREMENTAL_HANDOFF_SUPPORTED", true);

        List<SeekableStreamIndexTask<Integer, Long>> taskList = new ArrayList<>();
        for (int i = 0; i < replicas; ++i) {
            String taskId = Joiner.on("_").join(baseSequenceName, RandomIdUtils.getRandomId());
            taskList.add(
                new KafkaIndexTask(
                    taskId,
                    new TaskResource(baseSequenceName, 1),
                    this.spec.getDataSchema(),
                    (KafkaIndexTaskTuningConfig)taskTuningConfig,
                    (KafkaIndexTaskIOConfig)taskIoConfig,
                    context,
                    // NOTE(review): the null arguments are optional collaborators of KafkaIndexTask
                    // that the supervisor does not supply — confirm against its constructor.
                    null,
                    null,
                    rowIngestionMetersFactory,
                    sortingMapper,
                    null
                )
            );
        }
        return taskList;
    }

    /**
     * Computes lag per partition as {@code latestOffset - currentOffset}.
     *
     * <p>A partition maps to {@code Integer.MIN_VALUE} (as a {@code long}) when the latest offsets
     * have not been fetched yet, when they hold no entry for that partition, or when the current
     * offset is {@code null}.
     *
     * @param currentOffsets current offset per partition; values may be {@code null}
     * @return a new map with one entry per key of {@code currentOffsets}
     */
    protected Map<Integer, Long> getLagPerPartition(Map<Integer, Long> currentOffsets) {
        // Read the volatile field once: the reference may be swapped by a concurrent refresh, and
        // a null check against one map followed by a lookup in another could unbox a null.
        final Map<Integer, Long> latestOffsets = this.latestSequenceFromStream;
        return currentOffsets.entrySet().stream().collect(
            Collectors.toMap(
                Map.Entry::getKey,
                entry -> {
                    Long latest = latestOffsets == null ? null : latestOffsets.get(entry.getKey());
                    Long current = entry.getValue();
                    return latest != null && current != null ? latest - current : UNKNOWN_LAG;
                }
            )
        );
    }

    /** Wraps {@code map} as end sequence numbers for {@code topic} inside a {@link KafkaDataSourceMetadata}. */
    protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<Integer, Long> map) {
        return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
    }

    /** Wraps {@code seq} in a {@link KafkaSequenceNumber}; {@code isExclusive} is ignored. */
    protected OrderedSequenceNumber<Long> makeSequenceNumber(Long seq, boolean isExclusive) {
        return KafkaSequenceNumber.of(seq);
    }

    /**
     * Returns the job that emits {@code ingest/kafka/lag} (total), {@code ingest/kafka/maxLag} and
     * {@code ingest/kafka/avgLag}, each tagged with the {@code dataSource} dimension.
     *
     * <p>Per-partition lags are clamped at zero before aggregation, matching the aggregate lag in
     * {@link #createReportPayload}; otherwise the "unknown lag" sentinel from
     * {@link #getLagPerPartition} would drag the total and average far below zero. The average
     * divides by the number of partitions returned by {@link #getLagPerPartition}.
     *
     * <p>Every exception is caught and logged at warn level so that nothing propagates out of the
     * {@link Runnable}; a fixed-rate task that throws would have its later executions suppressed.
     */
    private Runnable emitLag() {
        return () -> {
            try {
                Map<Integer, Long> highestCurrentOffsets = this.getHighestCurrentOffsets();
                String dataSource = this.spec.getDataSchema().getDataSource();
                // Single volatile read so the null check and the key comparison see the same map.
                Map<Integer, Long> latestOffsets = this.latestSequenceFromStream;
                if (latestOffsets == null) {
                    throw new ISE("Latest offsets from Kafka have not been fetched");
                }
                if (!latestOffsets.keySet().equals(highestCurrentOffsets.keySet())) {
                    log.warn("Lag metric: Kafka partitions %s do not match task partitions %s", latestOffsets.keySet(), highestCurrentOffsets.keySet());
                }

                Map<Integer, Long> partitionLags = this.getLagPerPartition(highestCurrentOffsets);
                long maxLag = 0L;
                long totalLag = 0L;
                for (long lag : partitionLags.values()) {
                    long effectiveLag = Math.max(lag, 0L);
                    maxLag = Math.max(maxLag, effectiveLag);
                    totalLag += effectiveLag;
                }
                long avgLag = partitionLags.isEmpty() ? 0L : totalLag / partitionLags.size();

                this.emitKafkaLagMetric(dataSource, "ingest/kafka/lag", totalLag);
                this.emitKafkaLagMetric(dataSource, "ingest/kafka/maxLag", maxLag);
                this.emitKafkaLagMetric(dataSource, "ingest/kafka/avgLag", avgLag);
            }
            catch (Exception e) {
                log.warn(e, "Unable to compute Kafka lag");
            }
        };
    }

    /** Emits a single metric event named {@code metric} with the {@code dataSource} dimension set. */
    private void emitKafkaLagMetric(String dataSource, String metric, long value) {
        this.emitter.emit(ServiceMetricEvent.builder().setDimension("dataSource", dataSource).build(metric, value));
    }

    /** Returns the marker offset {@code -1}. */
    protected Long getNotSetMarker() {
        return NOT_SET;
    }

    /** Returns the marker offset {@code Long.MAX_VALUE}. */
    protected Long getEndOfPartitionMarker() {
        return END_OF_PARTITION;
    }

    /** Always {@code false} for this supervisor. */
    protected boolean isEndOfShard(Long seqNum) {
        return false;
    }

    /** Always {@code false} for this supervisor. */
    protected boolean isShardExpirationMarker(Long seqNum) {
        return false;
    }

    /** Always {@code false} for this supervisor. */
    protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() {
        return false;
    }

    /**
     * Replaces {@link #latestSequenceFromStream} with a fresh map from partition id to the
     * position {@code recordSupplier} reports for that partition.
     */
    protected void updateLatestSequenceFromStream(RecordSupplier<Integer, Long> recordSupplier, Set<StreamPartition<Integer>> partitions) {
        this.latestSequenceFromStream = partitions.stream().collect(
            Collectors.toMap(StreamPartition::getPartitionId, recordSupplier::getPosition)
        );
    }

    /** Returns the task name prefix {@code "index_kafka"}. */
    protected String baseTaskName() {
        return "index_kafka";
    }

    /** Returns the IO config from the supervisor spec. */
    @VisibleForTesting
    public KafkaSupervisorIOConfig getIoConfig() {
        return this.spec.getIoConfig();
    }

    /** Returns the tuning config from the supervisor spec. */
    @VisibleForTesting
    public KafkaSupervisorTuningConfig getTuningConfig() {
        return this.spec.getTuningConfig();
    }
}

