package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.AbstractPartitionGroup;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions;
import org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics;
import org.apache.kafka.streams.state.internals.ThreadCache;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTask.class */
public class StreamTask extends AbstractTask implements ProcessorNodePunctuator, Task {
    private final Time time;
    private final Consumer<byte[], byte[]> mainConsumer;
    private final boolean eosEnabled;
    private final int maxBufferedSize;
    private final AbstractPartitionGroup partitionGroup;
    private final RecordCollector recordCollector;
    private final AbstractPartitionGroup.RecordInfo recordInfo;
    private final Map<TopicPartition, Long> consumedOffsets;
    private final Map<TopicPartition, Long> committedOffsets;
    private final Map<TopicPartition, Long> highWatermark;
    private final Set<TopicPartition> resetOffsetsForPartitions;
    private final Set<TopicPartition> partitionsToResume;
    private final PunctuationQueue streamTimePunctuationQueue;
    private final PunctuationQueue systemTimePunctuationQueue;
    private final StreamsMetricsImpl streamsMetrics;
    private long processTimeMs;
    private final Sensor closeTaskSensor;
    private final Sensor processRatioSensor;
    private final Sensor processLatencySensor;
    private final Sensor restoreSensor;
    private final Sensor restoreRemainingSensor;
    private final Sensor punctuateLatencySensor;
    private final Sensor bufferedRecordsSensor;
    private final Sensor droppedRecordsSensor;
    private final Map<String, Sensor> e2eLatencySensors;
    private final RecordQueueCreator recordQueueCreator;
    protected final InternalProcessorContext processorContext;
    private final ProcessingExceptionHandler processingExceptionHandler;
    private StampedRecord record;
    private boolean commitNeeded;
    private boolean commitRequested;
    private boolean hasPendingTxCommit;
    private Optional<Long> timeCurrentIdlingStarted;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.streams.processor.internals.StreamTask$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTask$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$streams$processor$internals$Task$State;

        static {
            try {
                $SwitchMap$org$apache$kafka$streams$processor$PunctuationType[PunctuationType.STREAM_TIME.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$processor$PunctuationType[PunctuationType.WALL_CLOCK_TIME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            $SwitchMap$org$apache$kafka$streams$processor$internals$Task$State = new int[Task.State.values().length];
            try {
                $SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[Task.State.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[Task.State.RESTORING.ordinal()] = 2;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[Task.State.CREATED.ordinal()] = 3;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[Task.State.SUSPENDED.ordinal()] = 4;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[Task.State.CLOSED.ordinal()] = 5;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamTask$RecordQueueCreator.class */
    public class RecordQueueCreator {
        private final LogContext logContext;
        private final TimestampExtractor defaultTimestampExtractor;
        private final DeserializationExceptionHandler defaultDeserializationExceptionHandler;

        private RecordQueueCreator(LogContext logContext, TimestampExtractor timestampExtractor, DeserializationExceptionHandler deserializationExceptionHandler) {
            this.logContext = logContext;
            this.defaultTimestampExtractor = timestampExtractor;
            this.defaultDeserializationExceptionHandler = deserializationExceptionHandler;
        }

        public RecordQueue createQueue(TopicPartition topicPartition) {
            SourceNode<?, ?> source = StreamTask.this.topology.source(topicPartition.topic());
            if (source == null) {
                throw new TopologyException("Topic " + topicPartition.topic() + " is unknown to the topology. This may happen if different KafkaStreams instances of the same application execute different Topologies. Note that Topologies are only identical if all operators are added in the same order.");
            }
            TimestampExtractor timestampExtractor = source.getTimestampExtractor();
            return new RecordQueue(topicPartition, source, timestampExtractor != null ? timestampExtractor : this.defaultTimestampExtractor, this.defaultDeserializationExceptionHandler, StreamTask.this.processorContext, this.logContext);
        }

        /* synthetic */ RecordQueueCreator(StreamTask streamTask, LogContext logContext, TimestampExtractor timestampExtractor, DeserializationExceptionHandler deserializationExceptionHandler, AnonymousClass1 anonymousClass1) {
            this(logContext, timestampExtractor, deserializationExceptionHandler);
        }
    }

    public StreamTask(TaskId taskId, Set<TopicPartition> set, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, TopologyConfig.TaskConfig taskConfig, StreamsMetricsImpl streamsMetricsImpl, StateDirectory stateDirectory, ThreadCache threadCache, Time time, ProcessorStateManager processorStateManager, RecordCollector recordCollector, InternalProcessorContext internalProcessorContext, LogContext logContext, boolean z) {
        super(taskId, processorTopology, stateDirectory, processorStateManager, set, taskConfig, "task", StreamTask.class);
        this.processTimeMs = 0L;
        this.e2eLatencySensors = new HashMap();
        this.commitNeeded = false;
        this.commitRequested = false;
        this.hasPendingTxCommit = false;
        this.mainConsumer = consumer;
        this.processorContext = internalProcessorContext;
        internalProcessorContext.transitionToActive(this, recordCollector, threadCache);
        this.time = time;
        this.recordCollector = recordCollector;
        this.eosEnabled = taskConfig.eosEnabled;
        String name = Thread.currentThread().getName();
        this.streamsMetrics = streamsMetricsImpl;
        this.closeTaskSensor = ThreadMetrics.closeTaskSensor(name, streamsMetricsImpl);
        String taskId2 = taskId.toString();
        this.restoreSensor = TaskMetrics.restoreSensor(name, taskId2, streamsMetricsImpl, new Sensor[0]);
        this.restoreRemainingSensor = TaskMetrics.restoreRemainingRecordsSensor(name, taskId2, streamsMetricsImpl);
        this.processRatioSensor = TaskMetrics.activeProcessRatioSensor(name, taskId2, streamsMetricsImpl);
        this.processLatencySensor = TaskMetrics.processLatencySensor(name, taskId2, streamsMetricsImpl);
        this.punctuateLatencySensor = TaskMetrics.punctuateSensor(name, taskId2, streamsMetricsImpl);
        this.bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(name, taskId2, streamsMetricsImpl);
        this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(name, taskId2, streamsMetricsImpl);
        for (String str : processorTopology.terminalNodes()) {
            this.e2eLatencySensors.put(str, ProcessorNodeMetrics.e2ELatencySensor(name, taskId2, str, streamsMetricsImpl));
        }
        Iterator<SourceNode<?, ?>> it = processorTopology.sources().iterator();
        while (it.hasNext()) {
            String name2 = it.next().name();
            this.e2eLatencySensors.put(name2, ProcessorNodeMetrics.e2ELatencySensor(name, taskId2, name2, streamsMetricsImpl));
        }
        this.streamTimePunctuationQueue = new PunctuationQueue();
        this.systemTimePunctuationQueue = new PunctuationQueue();
        this.maxBufferedSize = taskConfig.maxBufferedSize;
        this.consumedOffsets = new HashMap();
        this.resetOffsetsForPartitions = new HashSet();
        this.partitionsToResume = new HashSet();
        this.recordQueueCreator = new RecordQueueCreator(this, this.logContext, taskConfig.timestampExtractor, taskConfig.deserializationExceptionHandler, null);
        this.recordInfo = new AbstractPartitionGroup.RecordInfo();
        Sensor enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(name, taskId2, streamsMetricsImpl, new Sensor[0]);
        long j = taskConfig.maxTaskIdleMs;
        if (z) {
            Map<TopicPartition, RecordQueue> createPartitionQueues = createPartitionQueues();
            consumer.getClass();
            this.partitionGroup = new SynchronizedPartitionGroup(new PartitionGroup(logContext, createPartitionQueues, consumer::currentLag, TaskMetrics.recordLatenessSensor(name, taskId2, streamsMetricsImpl), enforcedProcessingSensor, j));
        } else {
            Map<TopicPartition, RecordQueue> createPartitionQueues2 = createPartitionQueues();
            consumer.getClass();
            this.partitionGroup = new PartitionGroup(logContext, createPartitionQueues2, consumer::currentLag, TaskMetrics.recordLatenessSensor(name, taskId2, streamsMetricsImpl), enforcedProcessingSensor, j);
        }
        processorStateManager.registerGlobalStateStores(processorTopology.globalStateStores());
        this.committedOffsets = new HashMap();
        this.highWatermark = new HashMap();
        for (TopicPartition topicPartition : set) {
            this.committedOffsets.put(topicPartition, -1L);
            this.highWatermark.put(topicPartition, -1L);
        }
        this.timeCurrentIdlingStarted = Optional.empty();
        this.processingExceptionHandler = taskConfig.processingExceptionHandler;
    }

    private Map<TopicPartition, RecordQueue> createPartitionQueues() {
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : inputPartitions()) {
            hashMap.put(topicPartition, this.recordQueueCreator.createQueue(topicPartition));
        }
        return hashMap;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean isActive() {
        return true;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void recordRestoration(Time time, long j, boolean z) {
        if (z) {
            StreamsMetricsImpl.maybeRecordSensor(j, time, this.restoreRemainingSensor);
        } else {
            StreamsMetricsImpl.maybeRecordSensor(j, time, this.restoreSensor);
            StreamsMetricsImpl.maybeRecordSensor((-1) * j, time, this.restoreRemainingSensor);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void initializeIfNeeded() {
        if (state() == Task.State.CREATED) {
            this.recordCollector.initialize();
            StateManagerUtil.registerStateStores(this.log, this.logPrefix, this.topology, this.stateMgr, this.stateDirectory, this.processorContext);
            this.offsetSnapshotSinceLastFlush = Collections.emptyMap();
            transitionTo(Task.State.RESTORING);
            this.log.info("Initialized");
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void addPartitionsForOffsetReset(Set<TopicPartition> set) {
        this.mainConsumer.pause(set);
        this.resetOffsetsForPartitions.addAll(set);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void completeRestoration(java.util.function.Consumer<Set<TopicPartition>> consumer) {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[state().ordinal()]) {
            case 1:
                return;
            case 2:
                resetOffsetsIfNeededAndInitializeMetadata(consumer);
                initializeTopology();
                this.processorContext.initialize();
                if (!this.eosEnabled) {
                    maybeCheckpoint(true);
                }
                transitionTo(Task.State.RUNNING);
                this.log.info("Restored and ready to run");
                return;
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
            case RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION /* 4 */:
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                throw new IllegalStateException("Illegal state " + state() + " while completing restoration for active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while completing restoration for active task " + this.id);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void suspend() {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[state().ordinal()]) {
            case 1:
                try {
                    closeTopology();
                    this.partitionGroup.clear();
                    return;
                } finally {
                    transitToSuspend();
                }
            case 2:
                return;
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
                return;
            case RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION /* 4 */:
                this.log.info("Skip suspending since state is {}", state());
                return;
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                throw new IllegalStateException("Illegal state " + state() + " while suspending active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while suspending active task " + this.id);
        }
    }

    private void closeTopology() {
        this.log.trace("Closing processor topology");
        RuntimeException runtimeException = null;
        for (ProcessorNode<?, ?, ?, ?> processorNode : this.topology.processors()) {
            this.processorContext.setCurrentNode(processorNode);
            try {
                processorNode.close();
                this.processorContext.setCurrentNode(null);
            } catch (RuntimeException e) {
                runtimeException = e;
                this.processorContext.setCurrentNode(null);
            } catch (Throwable th) {
                this.processorContext.setCurrentNode(null);
                throw th;
            }
        }
        if (runtimeException != null) {
            throw runtimeException;
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void resume() {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[state().ordinal()]) {
            case 1:
            case 2:
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
                this.log.trace("Skip resuming since state is {}", state());
                break;
            case RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION /* 4 */:
                try {
                    this.stateMgr.deleteCheckPointFileIfEOSEnabled();
                    this.log.debug("Deleted check point file upon resuming with EOS enabled");
                } catch (IOException e) {
                    this.log.error("Encountered error while deleting the checkpoint file due to this exception", e);
                }
                transitionTo(Task.State.RESTORING);
                this.log.info("Resumed to restoring state");
                break;
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                throw new IllegalStateException("Illegal state " + state() + " while resuming active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while resuming active task " + this.id);
        }
        this.timeCurrentIdlingStarted = Optional.empty();
    }

    public void flush() {
        this.stateMgr.flushCache();
        this.recordCollector.flush();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[state().ordinal()]) {
            case 1:
            case 2:
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
            case RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION /* 4 */:
                if (!this.commitNeeded) {
                    this.log.debug("Skipped preparing {} task for commit since there is nothing to commit", state());
                    return Collections.emptyMap();
                }
                flush();
                this.hasPendingTxCommit = this.eosEnabled;
                this.log.debug("Prepared {} task for committing", state());
                return committableOffsetsAndMetadata();
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                throw new IllegalStateException("Illegal state " + state() + " while preparing active task " + this.id + " for committing");
            default:
                throw new IllegalStateException("Unknown state " + state() + " while preparing active task " + this.id + " for committing");
        }
    }

    private Long findOffset(TopicPartition topicPartition) {
        Long headRecordOffset = this.partitionGroup.headRecordOffset(topicPartition);
        if (headRecordOffset == null) {
            try {
                headRecordOffset = Long.valueOf(this.mainConsumer.position(topicPartition));
            } catch (TimeoutException e) {
                throw new IllegalStateException((Throwable) e);
            } catch (KafkaException e2) {
                throw new StreamsException((Throwable) e2);
            }
        }
        return headRecordOffset;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v33, types: [java.util.Map] */
    private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
        HashMap hashMap;
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[state().ordinal()]) {
            case 1:
            case RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION /* 4 */:
                Map<TopicPartition, Long> extractPartitionTimes = extractPartitionTimes();
                Set<TopicPartition> inputPartitions = this.processorContext.getProcessorMetadata().needsCommit() ? inputPartitions() : this.consumedOffsets.keySet();
                hashMap = new HashMap(inputPartitions.size());
                for (TopicPartition topicPartition : inputPartitions) {
                    hashMap.put(topicPartition, new OffsetAndMetadata(findOffset(topicPartition).longValue(), new TopicPartitionMetadata(extractPartitionTimes.get(topicPartition).longValue(), this.processorContext.getProcessorMetadata()).encode()));
                }
                break;
            case 2:
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
                hashMap = Collections.emptyMap();
                break;
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + this.id);
        }
        return hashMap;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void postCommit(boolean z) {
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[state().ordinal()]) {
            case 1:
                if (z || !this.eosEnabled) {
                    maybeCheckpoint(z);
                }
                this.log.debug("Finalized commit for {} task with eos {} enforce checkpoint {}", new Object[]{state(), Boolean.valueOf(this.eosEnabled), Boolean.valueOf(z)});
                break;
            case 2:
            case RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION /* 4 */:
                maybeCheckpoint(z);
                this.log.debug("Finalized commit for {} task with enforce checkpoint {}", state(), Boolean.valueOf(z));
                break;
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
                this.log.debug("Skipped writing checkpoint for {} task", state());
                break;
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                throw new IllegalStateException("Illegal state " + state() + " while post committing active task " + this.id);
            default:
                throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + this.id);
        }
        clearCommitStatuses();
    }

    private void clearCommitStatuses() {
        this.commitNeeded = false;
        this.commitRequested = false;
        this.hasPendingTxCommit = false;
        this.processorContext.getProcessorMetadata().setNeedsCommit(false);
    }

    private Map<TopicPartition, Long> extractPartitionTimes() {
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : this.partitionGroup.partitions()) {
            hashMap.put(topicPartition, Long.valueOf(this.partitionGroup.partitionTimestamp(topicPartition)));
        }
        return hashMap;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void closeClean() {
        validateClean();
        removeAllSensors();
        clearCommitStatuses();
        close(true);
        this.log.info("Closed clean");
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void closeDirty() {
        removeAllSensors();
        clearCommitStatuses();
        close(false);
        this.log.info("Closed dirty");
    }

    @Override // org.apache.kafka.streams.processor.internals.AbstractTask, org.apache.kafka.streams.processor.internals.Task
    public void updateInputPartitions(Set<TopicPartition> set, Map<String, List<String>> map) {
        super.updateInputPartitions(set, map);
        AbstractPartitionGroup abstractPartitionGroup = this.partitionGroup;
        RecordQueueCreator recordQueueCreator = this.recordQueueCreator;
        recordQueueCreator.getClass();
        abstractPartitionGroup.updatePartitions(set, recordQueueCreator::createQueue);
        this.processorContext.getProcessorMetadata().setNeedsCommit(true);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void prepareRecycle() {
        validateClean();
        removeAllSensors();
        clearCommitStatuses();
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[state().ordinal()]) {
            case 1:
            case 2:
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                throw new IllegalStateException("Illegal state " + state() + " while recycling active task " + this.id);
            case RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION /* 4 */:
                this.stateMgr.recycle();
                this.partitionGroup.close();
                this.recordCollector.closeClean();
                this.closeTaskSensor.record();
                transitionTo(Task.State.CLOSED);
                this.log.info("Closed and recycled state");
                return;
            default:
                throw new IllegalStateException("Unknown state " + state() + " while recycling active task " + this.id);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void resumePollingForPartitionsWithAvailableSpace() {
        if (this.partitionsToResume.isEmpty()) {
            return;
        }
        this.mainConsumer.resume(this.partitionsToResume);
        this.partitionsToResume.clear();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void updateLags() {
        if (state() == Task.State.RUNNING) {
            this.partitionGroup.updateLags();
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.AbstractTask, org.apache.kafka.streams.processor.internals.Task
    public void maybeCheckpoint(boolean z) {
        if (this.commitNeeded || z) {
            this.stateMgr.updateChangelogOffsets(checkpointableOffsets());
        }
        super.maybeCheckpoint(z);
    }

    private void validateClean() {
        if (this.commitNeeded) {
            this.log.debug("Tried to close clean but there was pending uncommitted data, this means we failed to commit and should close as dirty instead");
            throw new TaskMigratedException("Tried to close dirty task as clean");
        }
    }

    private void removeAllSensors() {
        this.streamsMetrics.removeAllTaskLevelSensors(Thread.currentThread().getName(), this.id.toString());
        Iterator<String> it = this.e2eLatencySensors.keySet().iterator();
        while (it.hasNext()) {
            this.streamsMetrics.removeAllNodeLevelSensors(Thread.currentThread().getName(), this.id.toString(), it.next());
        }
    }

    private void close(boolean z) {
        Runnable runnable;
        switch (AnonymousClass1.$SwitchMap$org$apache$kafka$streams$processor$internals$Task$State[state().ordinal()]) {
            case 1:
            case 2:
            case StreamsAssignmentProtocolVersions.EARLIEST_PROBEABLE_VERSION /* 3 */:
                throw new IllegalStateException("Illegal state " + state() + " while closing active task " + this.id);
            case RackAwareTaskAssignor.STANDBY_OPTIMIZER_MAX_ITERATION /* 4 */:
                AbstractPartitionGroup abstractPartitionGroup = this.partitionGroup;
                abstractPartitionGroup.getClass();
                TaskManager.executeAndMaybeSwallow(z, abstractPartitionGroup::close, "partition group close", this.log);
                TaskManager.executeAndMaybeSwallow(z, () -> {
                    StateManagerUtil.closeStateManager(this.log, this.logPrefix, z, this.eosEnabled, this.stateMgr, this.stateDirectory, Task.TaskType.ACTIVE);
                }, "state manager close", this.log);
                if (z) {
                    RecordCollector recordCollector = this.recordCollector;
                    recordCollector.getClass();
                    runnable = recordCollector::closeClean;
                } else {
                    RecordCollector recordCollector2 = this.recordCollector;
                    recordCollector2.getClass();
                    runnable = recordCollector2::closeDirty;
                }
                TaskManager.executeAndMaybeSwallow(z, runnable, "record collector close", this.log);
                this.record = null;
                this.closeTaskSensor.record();
                this.partitionsToResume.clear();
                transitionTo(Task.State.CLOSED);
                return;
            case StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE /* 5 */:
                this.log.trace("Skip closing since state is {}", state());
                return;
            default:
                throw new IllegalStateException("Unknown state " + state() + " while closing active task " + this.id);
        }
    }

    public boolean isProcessable(long j) {
        if (state() == Task.State.CLOSED) {
            this.log.info("Stream task {} is already in {} state, skip processing it.", id(), state());
            return false;
        }
        if (this.hasPendingTxCommit) {
            return false;
        }
        boolean readyToProcess = this.partitionGroup.readyToProcess(j);
        if (readyToProcess) {
            this.timeCurrentIdlingStarted = Optional.empty();
        } else if (!this.timeCurrentIdlingStarted.isPresent()) {
            this.timeCurrentIdlingStarted = Optional.of(Long.valueOf(j));
        }
        return readyToProcess;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean process(long j) {
        if (this.record == null) {
            if (!isProcessable(j)) {
                return false;
            }
            this.record = this.partitionGroup.nextRecord(this.recordInfo, j);
            if (this.record == null) {
                return false;
            }
        }
        try {
            try {
                try {
                    try {
                        TopicPartition partition = this.recordInfo.partition();
                        if (!(this.record instanceof CorruptedRecord)) {
                            doProcess(j);
                        }
                        this.consumedOffsets.put(partition, Long.valueOf(this.record.offset()));
                        this.commitNeeded = true;
                        if (this.recordInfo.queue().size() <= this.maxBufferedSize) {
                            this.partitionsToResume.add(partition);
                        }
                        this.record = null;
                        this.processorContext.setCurrentNode(null);
                        return true;
                    } catch (RuntimeException e) {
                        handleException(e);
                        this.processorContext.setCurrentNode(null);
                        return true;
                    }
                } catch (FailedProcessingException e2) {
                    handleException(e2.getMessage(), e2.getCause());
                    this.processorContext.setCurrentNode(null);
                    return true;
                }
            } catch (StreamsException e3) {
                this.record = null;
                throw e3;
            } catch (TimeoutException e4) {
                if (!this.eosEnabled) {
                    throw e4;
                }
                this.record = null;
                throw new TaskCorruptedException(Collections.singleton(this.id));
            }
        } catch (Throwable th) {
            this.processorContext.setCurrentNode(null);
            throw th;
        }
    }

    private void handleException(Throwable th) {
        handleException(String.format("Exception caught in process. taskId=%s, processor=%s, topic=%s, partition=%d, offset=%d", id(), this.processorContext.currentNode().name(), this.record.topic(), Integer.valueOf(this.record.partition()), Long.valueOf(this.record.offset())), th);
    }

    private void handleException(String str, Throwable th) {
        if (str == null) {
            handleException(th);
        }
        KafkaException streamsException = new StreamsException(str, th);
        this.record = null;
        throw streamsException;
    }

    private void doProcess(long j) {
        ProcessorNode<?, ?, ?, ?> node = this.recordInfo.node();
        this.log.trace("Start processing one record [{}]", this.record);
        updateProcessorContext(node, j, new ProcessorRecordContext(this.record.timestamp, this.record.offset(), this.record.partition(), this.record.topic(), this.record.headers()));
        maybeRecordE2ELatency(this.record.timestamp, j, node.name());
        Record record = new Record(this.record.key(), this.record.value(), this.processorContext.timestamp(), this.processorContext.headers());
        StreamsMetricsImpl.maybeMeasureLatency(() -> {
            node.process(record);
        }, this.time, this.processLatencySensor);
        this.log.trace("Completed processing one record [{}]", this.record);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void recordProcessBatchTime(long j) {
        this.processTimeMs += j;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void recordProcessTimeRatioAndBufferSize(long j, long j2) {
        this.bufferedRecordsSensor.record(this.partitionGroup.numBuffered());
        this.processRatioSensor.record(this.processTimeMs / j, j2);
        this.processTimeMs = 0L;
    }

    @Override // org.apache.kafka.streams.processor.internals.ProcessorNodePunctuator
    public void punctuate(ProcessorNode<?, ?, ?, ?> processorNode, long j, PunctuationType punctuationType, Punctuator punctuator) {
        if (this.processorContext.currentNode() != null) {
            throw new IllegalStateException(String.format("%sCurrent node is not null", this.logPrefix));
        }
        ProcessorRecordContext processorRecordContext = new ProcessorRecordContext(j, -1L, -1, null, new RecordHeaders());
        updateProcessorContext(processorNode, this.time.milliseconds(), processorRecordContext);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Punctuating processor {} with timestamp {} and punctuation type {}", new Object[]{processorNode.name(), Long.valueOf(j), punctuationType});
        }
        try {
            try {
                try {
                    try {
                        StreamsMetricsImpl.maybeMeasureLatency(() -> {
                            punctuator.punctuate(j);
                        }, this.time, this.punctuateLatencySensor);
                        this.processorContext.setCurrentNode(null);
                    } catch (FailedProcessingException e) {
                        throw createStreamsException(processorNode.name(), e.getCause());
                    }
                } catch (TaskCorruptedException | TaskMigratedException e2) {
                    throw e2;
                }
            } catch (Exception e3) {
                DefaultErrorHandlerContext defaultErrorHandlerContext = new DefaultErrorHandlerContext(null, processorRecordContext.topic(), processorRecordContext.partition(), processorRecordContext.offset(), processorRecordContext.headers(), processorNode.name(), id(), processorRecordContext.timestamp());
                try {
                    if (((ProcessingExceptionHandler.ProcessingHandlerResponse) Objects.requireNonNull(this.processingExceptionHandler.handle(defaultErrorHandlerContext, null, e3), "Invalid ProcessingExceptionHandler response.")) == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
                        this.log.error("Processing exception handler is set to fail upon a processing error. If you would rather have the streaming pipeline continue after a processing error, please set the processing.exception.handler appropriately.");
                        throw createStreamsException(processorNode.name(), e3);
                    }
                    this.droppedRecordsSensor.record();
                    this.processorContext.setCurrentNode(null);
                } catch (Exception e4) {
                    this.log.error("Processing error callback failed after processing error for record: {}", defaultErrorHandlerContext, e3);
                    throw new FailedProcessingException("Fatal user code error in processing error callback", e4);
                }
            } catch (TimeoutException e5) {
                if (!this.eosEnabled) {
                    throw e5;
                }
                this.record = null;
                throw new TaskCorruptedException(Collections.singleton(this.id));
            }
        } catch (Throwable th) {
            this.processorContext.setCurrentNode(null);
            throw th;
        }
    }

    private StreamsException createStreamsException(String str, Throwable th) {
        return new StreamsException(String.format("%sException caught while punctuating processor '%s'", this.logPrefix, str), th);
    }

    private void updateProcessorContext(ProcessorNode<?, ?, ?, ?> processorNode, long j, ProcessorRecordContext processorRecordContext) {
        this.processorContext.setRecordContext(processorRecordContext);
        this.processorContext.setCurrentNode(processorNode);
        this.processorContext.setSystemTimeMs(j);
    }

    private Map<TopicPartition, Long> checkpointableOffsets() {
        HashMap hashMap = new HashMap(this.recordCollector.offsets());
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            hashMap.putIfAbsent(entry.getKey(), entry.getValue());
        }
        this.log.debug("Checkpointable offsets {}", hashMap);
        return hashMap;
    }

    private void resetOffsetsIfNeededAndInitializeMetadata(java.util.function.Consumer<Set<TopicPartition>> consumer) {
        OffsetAndMetadata offsetAndMetadata;
        try {
            Map committed = this.mainConsumer.committed(inputPartitions());
            for (Map.Entry entry : committed.entrySet()) {
                if (this.resetOffsetsForPartitions.contains(entry.getKey()) && (offsetAndMetadata = (OffsetAndMetadata) entry.getValue()) != null) {
                    this.mainConsumer.seek((TopicPartition) entry.getKey(), offsetAndMetadata);
                    this.resetOffsetsForPartitions.remove(entry.getKey());
                }
            }
            consumer.accept(this.resetOffsetsForPartitions);
            this.resetOffsetsForPartitions.clear();
            initializeTaskTimeAndProcessorMetadata((Map) committed.entrySet().stream().filter(entry2 -> {
                return entry2.getValue() != null;
            }).collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, (v0) -> {
                return v0.getValue();
            })));
        } catch (TimeoutException e) {
            this.log.warn("Encountered {} while trying to fetch committed offsets, will retry initializing the metadata in the next loop.\nConsider overwriting consumer config {} to a larger value to avoid timeout errors", this.time.toString(), "default.api.timeout.ms");
            throw e;
        } catch (KafkaException e2) {
            throw new StreamsException(String.format("task [%s] Failed to initialize offsets for %s", this.id, inputPartitions()), (Throwable) e2);
        }
    }

    private void initializeTaskTimeAndProcessorMetadata(Map<TopicPartition, OffsetAndMetadata> map) {
        ProcessorMetadata processorMetadata = new ProcessorMetadata();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            OffsetAndMetadata value = entry.getValue();
            if (value != null) {
                TopicPartitionMetadata decode = TopicPartitionMetadata.decode(value.metadata());
                long partitionTime = decode.partitionTime();
                this.partitionGroup.setPartitionTime(key, partitionTime);
                this.log.debug("A committed timestamp was detected: setting the partition time of partition {} to {} in stream task {}", new Object[]{key, Long.valueOf(partitionTime), this.id});
                processorMetadata.update(decode.processorMetadata());
            } else {
                this.log.debug("No committed timestamp was found in metadata for partition {}", key);
            }
        }
        this.processorContext.setProcessorMetadata(processorMetadata);
        HashSet hashSet = new HashSet(inputPartitions());
        hashSet.removeAll(map.keySet());
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            this.log.debug("No committed offset for partition {}, therefore no timestamp can be found for this partition", (TopicPartition) it.next());
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, Long> purgeableOffsets() {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : this.committedOffsets.entrySet()) {
            TopicPartition key = entry.getKey();
            if (this.topology.isRepartitionTopic(key.topic()) && entry.getValue().longValue() > -1) {
                hashMap.put(key, entry.getValue());
            }
        }
        return hashMap;
    }

    private void initializeTopology() {
        this.log.trace("Initializing processor nodes of the topology");
        for (ProcessorNode<?, ?, ?, ?> processorNode : this.topology.processors()) {
            this.processorContext.setCurrentNode(processorNode);
            try {
                processorNode.init(this.processorContext, this.processingExceptionHandler);
            } finally {
                this.processorContext.setCurrentNode(null);
            }
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public void addRecords(TopicPartition topicPartition, Iterable<ConsumerRecord<byte[], byte[]>> iterable) {
        int addRawRecords = this.partitionGroup.addRawRecords(topicPartition, iterable);
        if (this.log.isTraceEnabled()) {
            this.log.trace("Added records into the buffered queue of partition {}, new queue size is {}", topicPartition, Integer.valueOf(addRawRecords));
        }
        if (addRawRecords > this.maxBufferedSize) {
            this.mainConsumer.pause(Collections.singleton(topicPartition));
        }
    }

    public Cancellable schedule(long j, PunctuationType punctuationType, Punctuator punctuator) {
        switch (punctuationType) {
            case STREAM_TIME:
                return schedule(0L, j, punctuationType, punctuator);
            case WALL_CLOCK_TIME:
                return schedule(this.time.milliseconds() + j, j, punctuationType, punctuator);
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + punctuationType);
        }
    }

    private Cancellable schedule(long j, long j2, PunctuationType punctuationType, Punctuator punctuator) {
        if (this.processorContext.currentNode() == null) {
            throw new IllegalStateException(String.format("%sCurrent node is null", this.logPrefix));
        }
        PunctuationSchedule punctuationSchedule = new PunctuationSchedule(this.processorContext.currentNode(), j, j2, punctuator);
        switch (punctuationType) {
            case STREAM_TIME:
                return this.streamTimePunctuationQueue.schedule(punctuationSchedule);
            case WALL_CLOCK_TIME:
                return this.systemTimePunctuationQueue.schedule(punctuationSchedule);
            default:
                throw new IllegalArgumentException("Unrecognized PunctuationType: " + punctuationType);
        }
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean maybePunctuateStreamTime() {
        long streamTime = this.partitionGroup.streamTime();
        if (streamTime == -1) {
            return false;
        }
        boolean maybePunctuate = this.streamTimePunctuationQueue.maybePunctuate(streamTime, PunctuationType.STREAM_TIME, this);
        if (maybePunctuate) {
            this.commitNeeded = true;
        }
        return maybePunctuate;
    }

    public boolean canPunctuateStreamTime() {
        return this.streamTimePunctuationQueue.canPunctuate(this.partitionGroup.streamTime());
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean maybePunctuateSystemTime() {
        boolean maybePunctuate = this.systemTimePunctuationQueue.maybePunctuate(this.time.milliseconds(), PunctuationType.WALL_CLOCK_TIME, this);
        if (maybePunctuate) {
            this.commitNeeded = true;
        }
        return maybePunctuate;
    }

    public boolean canPunctuateSystemTime() {
        return this.systemTimePunctuationQueue.canPunctuate(this.time.milliseconds());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void maybeRecordE2ELatency(long j, long j2, String str) {
        Sensor sensor = this.e2eLatencySensors.get(str);
        if (sensor == null) {
            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + str);
        }
        if (sensor.shouldRecord() && sensor.hasMetrics()) {
            sensor.record(j2 - j, j2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void requestCommit() {
        this.commitRequested = true;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean commitRequested() {
        return this.commitRequested;
    }

    public InternalProcessorContext processorContext() {
        return this.processorContext;
    }

    public String toString() {
        return toString("");
    }

    public String toString(String str) {
        StringBuilder sb = new StringBuilder();
        sb.append(str);
        sb.append("TaskId: ");
        sb.append(this.id);
        sb.append("\n");
        if (this.topology != null) {
            sb.append(str).append(this.topology.toString(str + "\t"));
        }
        Set<TopicPartition> inputPartitions = inputPartitions();
        if (inputPartitions != null && !inputPartitions.isEmpty()) {
            sb.append(str).append("Partitions [");
            Iterator<TopicPartition> it = inputPartitions.iterator();
            while (it.hasNext()) {
                sb.append(it.next()).append(", ");
            }
            sb.setLength(sb.length() - 2);
            sb.append("]\n");
        }
        return sb.toString();
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public boolean commitNeeded() {
        if (this.commitNeeded) {
            return true;
        }
        for (Map.Entry<TopicPartition, Long> entry : this.consumedOffsets.entrySet()) {
            TopicPartition key = entry.getKey();
            try {
                long position = this.mainConsumer.position(key);
                if (position > entry.getValue().longValue() + 1) {
                    this.commitNeeded = true;
                    entry.setValue(Long.valueOf(position - 1));
                }
            } catch (TimeoutException e) {
                this.log.debug(String.format("Could not get consumer position for partition %s", key), e);
            } catch (KafkaException e2) {
                throw new StreamsException((Throwable) e2);
            }
        }
        return this.commitNeeded;
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, Long> changelogOffsets() {
        return state() == Task.State.RUNNING ? (Map) changelogPartitions().stream().collect(Collectors.toMap(Function.identity(), topicPartition -> {
            return -2L;
        })) : Collections.unmodifiableMap(this.stateMgr.changelogOffsets());
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, Long> committedOffsets() {
        return Collections.unmodifiableMap(this.committedOffsets);
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Map<TopicPartition, Long> highWaterMark() {
        return Collections.unmodifiableMap(this.highWatermark);
    }

    private void transitToSuspend() {
        this.log.info("Suspended from {}", state());
        transitionTo(Task.State.SUSPENDED);
        this.timeCurrentIdlingStarted = Optional.of(Long.valueOf(System.currentTimeMillis()));
    }

    @Override // org.apache.kafka.streams.processor.internals.Task
    public Optional<Long> timeCurrentIdlingStarted() {
        return this.timeCurrentIdlingStarted;
    }

    public void updateCommittedOffsets(TopicPartition topicPartition, Long l) {
        this.committedOffsets.put(topicPartition, l);
    }

    public void updateEndOffsets(TopicPartition topicPartition, Long l) {
        this.highWatermark.put(topicPartition, l);
    }

    public boolean hasRecordsQueued() {
        return numBuffered() > 0;
    }

    RecordCollector recordCollector() {
        return this.recordCollector;
    }

    int numBuffered() {
        return this.partitionGroup.numBuffered();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long streamTime() {
        return this.partitionGroup.streamTime();
    }
}
