package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException: Cannot invoke "java.util.List.forEach(java.util.function.Consumer)" because "blocks" is null
    	at jadx.core.utils.BlockUtils.collectAllInsns(BlockUtils.java:1017)
    	at jadx.core.dex.visitors.ClassModifier.removeBridgeMethod(ClassModifier.java:239)
    	at jadx.core.dex.visitors.ClassModifier.removeSyntheticMethods(ClassModifier.java:154)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.ClassModifier.visit(ClassModifier.java:64)
    */
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread.class */
public class StreamThread extends Thread {
    private static final Logger log = LoggerFactory.getLogger(StreamThread.class);
    private static final AtomicInteger STREAM_THREAD_ID_SEQUENCE = new AtomicInteger(1);
    private volatile State state;
    private StateListener stateListener;
    final PartitionGrouper partitionGrouper;
    private final StreamsMetadataState streamsMetadataState;
    public final String applicationId;
    public final String clientId;
    public final UUID processId;
    protected final StreamsConfig config;
    protected final TopologyBuilder builder;
    Producer<byte[], byte[]> threadProducer;
    private final KafkaClientSupplier clientSupplier;
    protected final Consumer<byte[], byte[]> consumer;
    final Consumer<byte[], byte[]> restoreConsumer;
    private final String logPrefix;
    private final String threadClientId;
    private final Pattern sourceTopicPattern;
    private final Map<TaskId, StreamTask> activeTasks;
    private final Map<TaskId, StandbyTask> standbyTasks;
    private final Map<TopicPartition, StreamTask> activeTasksByPartition;
    private final Map<TopicPartition, StandbyTask> standbyTasksByPartition;
    private final Set<TaskId> prevActiveTasks;
    private final Map<TaskId, StreamTask> suspendedTasks;
    private final Map<TaskId, StandbyTask> suspendedStandbyTasks;
    private final Time time;
    private final int rebalanceTimeoutMs;
    private final long pollTimeMs;
    private final long cleanTimeMs;
    private final long commitTimeMs;
    private final StreamsMetricsThreadImpl streamsMetrics;
    final StateDirectory stateDirectory;
    private String originalReset;
    private StreamPartitionAssignor partitionAssignor;
    private long timerStartedMs;
    private long lastCleanMs;
    private long lastCommitMs;
    private Throwable rebalanceException;
    private final boolean eosEnabled;
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
    private boolean processStandbyRecords;
    private final ThreadCache cache;
    private StoreChangelogReader storeChangelogReader;
    private final TaskCreator taskCreator;
    final ConsumerRebalanceListener rebalanceListener;
    private static final int UNLIMITED_RECORDS = -1;

    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$1 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$1.class */
    public class AnonymousClass1 implements StreamTaskAction {
        private String name;

        AnonymousClass1() {
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
        public String name() {
            return this.name;
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
        public void apply(StreamTask streamTask) {
            this.name = "punctuate";
            StreamThread.this.maybePunctuate(streamTask);
            if (streamTask.commitNeeded()) {
                this.name = "commit";
                long milliseconds = StreamThread.this.time.milliseconds();
                StreamThread.this.commitOne(streamTask);
                if (StreamThread.log.isDebugEnabled()) {
                    StreamThread.log.debug("{} Committed active task {} per user request in {}ms", new Object[]{StreamThread.this.logPrefix, streamTask.id(), Long.valueOf(StreamThread.this.timerStartedMs - milliseconds)});
                }
            }
        }
    }

    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$2 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$2.class */
    public class AnonymousClass2 implements StreamTaskAction {
        AnonymousClass2() {
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
        public String name() {
            return "commit";
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
        public void apply(StreamTask streamTask) {
            StreamThread.this.commitOne(streamTask);
        }
    }

    /* renamed from: org.apache.kafka.streams.processor.internals.StreamThread$3 */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$3.class */
    public class AnonymousClass3 implements StreamTaskAction {
        AnonymousClass3() {
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
        public String name() {
            return "suspend";
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
        public void apply(StreamTask streamTask) {
            try {
                streamTask.suspend();
            } catch (Exception e) {
                try {
                    streamTask.close(false);
                } catch (Exception e2) {
                    StreamThread.log.error("{} Closing task {} failed: ", new Object[]{StreamThread.this.logPrefix, streamTask.id, e2});
                }
                throw e;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$AbstractTaskCreator.class */
    public abstract class AbstractTaskCreator {
        static final long MAX_BACKOFF_TIME_MS = 1000;

        AbstractTaskCreator() {
        }

        void retryWithBackoff(Map<TaskId, Set<TopicPartition>> map, long j) {
            long j2 = 50;
            HashSet hashSet = new HashSet();
            while (true) {
                Iterator<Map.Entry<TaskId, Set<TopicPartition>>> it = map.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<TaskId, Set<TopicPartition>> next = it.next();
                    TaskId key = next.getKey();
                    try {
                        createTask(key, next.getValue());
                        it.remove();
                        j2 = 50;
                        hashSet.remove(key);
                    } catch (LockException e) {
                        if (!hashSet.contains(key)) {
                            StreamThread.log.warn("{} Could not create task {}. Will retry: ", new Object[]{StreamThread.this.logPrefix, key, e});
                            hashSet.add(key);
                        }
                    }
                }
                if (map.isEmpty() || StreamThread.this.time.milliseconds() - j > StreamThread.this.rebalanceTimeoutMs) {
                    return;
                }
                try {
                    Thread.sleep(j2);
                    j2 = Math.min(j2 << 1, MAX_BACKOFF_TIME_MS);
                } catch (InterruptedException e2) {
                }
            }
        }

        abstract void createTask(TaskId taskId, Set<TopicPartition> set);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$RebalanceListener.class */
    public class RebalanceListener implements ConsumerRebalanceListener {
        private final Time time;
        private final int requestTimeOut;

        RebalanceListener(Time time, int i) {
            this.time = time;
            this.requestTimeOut = i;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            StreamThread.log.debug("{} at state {}: new partitions {} assigned at the end of consumer rebalance.\n\tassigned active tasks: {}\n\tassigned standby tasks: {}\n\tcurrent suspended active tasks: {}\n\tcurrent suspended standby tasks: {}\n\tprevious active tasks: {}", new Object[]{StreamThread.this.logPrefix, StreamThread.this.state, collection, StreamThread.this.partitionAssignor.activeTasks().keySet(), StreamThread.this.partitionAssignor.standbyTasks().keySet(), StreamThread.this.suspendedTasks.keySet(), StreamThread.this.suspendedStandbyTasks.keySet(), StreamThread.this.prevActiveTasks});
            long milliseconds = this.time.milliseconds();
            try {
                try {
                    StreamThread.this.storeChangelogReader = new StoreChangelogReader(StreamThread.this.getName(), StreamThread.this.restoreConsumer, this.time, this.requestTimeOut);
                    StreamThread.this.setStateWhenNotInPendingShutdown(State.ASSIGNING_PARTITIONS);
                    StreamThread.this.closeNonAssignedSuspendedStandbyTasks();
                    StreamThread.this.closeNonAssignedSuspendedTasks();
                    StreamThread.this.addStreamTasks(collection, milliseconds);
                    StreamThread.this.storeChangelogReader.restore();
                    StreamThread.this.addStandbyTasks(milliseconds);
                    StreamThread.this.streamsMetadataState.onChange(StreamThread.this.partitionAssignor.getPartitionsByHostState(), StreamThread.this.partitionAssignor.clusterMetadata());
                    StreamThread.access$1402(StreamThread.this, this.time.milliseconds());
                    StreamThread.this.setStateWhenNotInPendingShutdown(State.RUNNING);
                    StreamThread.log.info("{} partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{StreamThread.this.logPrefix, Long.valueOf(this.time.milliseconds() - milliseconds), StreamThread.this.activeTasks.keySet(), StreamThread.this.standbyTasks.keySet(), StreamThread.this.prevActiveTasks});
                } catch (Throwable th) {
                    StreamThread.this.rebalanceException = th;
                    throw th;
                }
            } catch (Throwable th2) {
                StreamThread.log.info("{} partition assignment took {} ms.\n\tcurrent active tasks: {}\n\tcurrent standby tasks: {}\n\tprevious active tasks: {}\n", new Object[]{StreamThread.this.logPrefix, Long.valueOf(this.time.milliseconds() - milliseconds), StreamThread.this.activeTasks.keySet(), StreamThread.this.standbyTasks.keySet(), StreamThread.this.prevActiveTasks});
                throw th2;
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            StreamThread.log.debug("{} at state {}: partitions {} revoked at the beginning of consumer rebalance.\n\tcurrent assigned active tasks: {}\n\tcurrent assigned standby tasks: {}\n", new Object[]{StreamThread.this.logPrefix, StreamThread.this.state, collection, StreamThread.this.activeTasks.keySet(), StreamThread.this.standbyTasks.keySet()});
            long milliseconds = this.time.milliseconds();
            try {
                try {
                    StreamThread.this.setStateWhenNotInPendingShutdown(State.PARTITIONS_REVOKED);
                    StreamThread.access$1402(StreamThread.this, Long.MAX_VALUE);
                    StreamThread.this.suspendTasksAndState();
                    StreamThread.this.streamsMetadataState.onChange(Collections.emptyMap(), StreamThread.this.partitionAssignor.clusterMetadata());
                    StreamThread.this.removeStreamTasks();
                    StreamThread.this.removeStandbyTasks();
                    StreamThread.log.info("{} partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}", new Object[]{StreamThread.this.logPrefix, Long.valueOf(this.time.milliseconds() - milliseconds), StreamThread.this.suspendedTasks.keySet(), StreamThread.this.suspendedStandbyTasks.keySet()});
                } catch (Throwable th) {
                    StreamThread.this.rebalanceException = th;
                    throw th;
                }
            } catch (Throwable th2) {
                StreamThread.this.streamsMetadataState.onChange(Collections.emptyMap(), StreamThread.this.partitionAssignor.clusterMetadata());
                StreamThread.this.removeStreamTasks();
                StreamThread.this.removeStandbyTasks();
                StreamThread.log.info("{} partition revocation took {} ms.\n\tsuspended active tasks: {}\n\tsuspended standby tasks: {}", new Object[]{StreamThread.this.logPrefix, Long.valueOf(this.time.milliseconds() - milliseconds), StreamThread.this.suspendedTasks.keySet(), StreamThread.this.suspendedStandbyTasks.keySet()});
                throw th2;
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$StandbyTaskCreator.class */
    public class StandbyTaskCreator extends AbstractTaskCreator {
        private final Map<TopicPartition, Long> checkpointedOffsets;

        StandbyTaskCreator(Map<TopicPartition, Long> map) {
            super();
            this.checkpointedOffsets = map;
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskCreator
        void createTask(TaskId taskId, Set<TopicPartition> set) {
            StreamThread.this.updateStandByTaskMaps(this.checkpointedOffsets, taskId, set, StreamThread.this.createStandbyTask(taskId, set));
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$State.class */
    public enum State {
        CREATED(1),
        RUNNING(1, 2, 4),
        PARTITIONS_REVOKED(3, 4),
        ASSIGNING_PARTITIONS(1, 4),
        PENDING_SHUTDOWN(5),
        DEAD(new Integer[0]);

        private final Set<Integer> validTransitions = new HashSet();

        State(Integer... numArr) {
            this.validTransitions.addAll(Arrays.asList(numArr));
        }

        public boolean isRunning() {
            return (equals(PENDING_SHUTDOWN) || equals(CREATED) || equals(DEAD)) ? false : true;
        }

        public boolean isValidTransition(State state) {
            return this.validTransitions.contains(Integer.valueOf(state.ordinal()));
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$StateListener.class */
    public interface StateListener {
        void onChange(StreamThread streamThread, State state, State state2);
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$StreamTaskAction.class */
    public interface StreamTaskAction {
        String name();

        void apply(StreamTask streamTask);
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$StreamsMetricsThreadImpl.class */
    public class StreamsMetricsThreadImpl extends StreamsMetricsImpl {
        final Sensor commitTimeSensor;
        final Sensor pollTimeSensor;
        final Sensor processTimeSensor;
        final Sensor punctuateTimeSensor;
        final Sensor taskCreatedSensor;
        final Sensor tasksClosedSensor;
        final Sensor skippedRecordsSensor;

        StreamsMetricsThreadImpl(Metrics metrics, String str, String str2, Map<String, String> map) {
            super(metrics, str, map);
            this.commitTimeSensor = metrics.sensor(str2 + ".commit-latency", Sensor.RecordingLevel.INFO);
            this.commitTimeSensor.add(metrics.metricName("commit-latency-avg", this.groupName, "The average commit time in ms", this.tags), new Avg());
            this.commitTimeSensor.add(metrics.metricName("commit-latency-max", this.groupName, "The maximum commit time in ms", this.tags), new Max());
            this.commitTimeSensor.add(metrics.metricName("commit-rate", this.groupName, "The average per-second number of commit calls", this.tags), new Rate(new Count()));
            this.pollTimeSensor = metrics.sensor(str2 + ".poll-latency", Sensor.RecordingLevel.INFO);
            this.pollTimeSensor.add(metrics.metricName("poll-latency-avg", this.groupName, "The average poll time in ms", this.tags), new Avg());
            this.pollTimeSensor.add(metrics.metricName("poll-latency-max", this.groupName, "The maximum poll time in ms", this.tags), new Max());
            this.pollTimeSensor.add(metrics.metricName("poll-rate", this.groupName, "The average per-second number of record-poll calls", this.tags), new Rate(new Count()));
            this.processTimeSensor = metrics.sensor(str2 + ".process-latency", Sensor.RecordingLevel.INFO);
            this.processTimeSensor.add(metrics.metricName("process-latency-avg", this.groupName, "The average process time in ms", this.tags), new Avg());
            this.processTimeSensor.add(metrics.metricName("process-latency-max", this.groupName, "The maximum process time in ms", this.tags), new Max());
            this.processTimeSensor.add(metrics.metricName("process-rate", this.groupName, "The average per-second number of process calls", this.tags), new Rate(new Count()));
            this.punctuateTimeSensor = metrics.sensor(str2 + ".punctuate-latency", Sensor.RecordingLevel.INFO);
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-latency-avg", this.groupName, "The average punctuate time in ms", this.tags), new Avg());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-latency-max", this.groupName, "The maximum punctuate time in ms", this.tags), new Max());
            this.punctuateTimeSensor.add(metrics.metricName("punctuate-rate", this.groupName, "The average per-second number of punctuate calls", this.tags), new Rate(new Count()));
            this.taskCreatedSensor = metrics.sensor(str2 + ".task-created", Sensor.RecordingLevel.INFO);
            this.taskCreatedSensor.add(metrics.metricName("task-created-rate", this.groupName, "The average per-second number of newly created tasks", this.tags), new Rate(new Count()));
            this.tasksClosedSensor = metrics.sensor(str2 + ".task-closed", Sensor.RecordingLevel.INFO);
            this.tasksClosedSensor.add(metrics.metricName("task-closed-rate", this.groupName, "The average per-second number of closed tasks", this.tags), new Rate(new Count()));
            this.skippedRecordsSensor = metrics.sensor(str2 + ".skipped-records");
            this.skippedRecordsSensor.add(metrics.metricName("skipped-records-rate", this.groupName, "The average per-second number of skipped records.", this.tags), new Rate(new Sum()));
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamsMetricsImpl, org.apache.kafka.streams.StreamsMetrics
        public void recordLatency(Sensor sensor, long j, long j2) {
            sensor.record(j2 - j, StreamThread.this.timerStartedMs);
        }

        void removeAllSensors() {
            removeSensor(this.commitTimeSensor);
            removeSensor(this.pollTimeSensor);
            removeSensor(this.processTimeSensor);
            removeSensor(this.punctuateTimeSensor);
            removeSensor(this.taskCreatedSensor);
            removeSensor(this.tasksClosedSensor);
            removeSensor(this.skippedRecordsSensor);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThread$TaskCreator.class */
    public class TaskCreator extends AbstractTaskCreator {
        TaskCreator() {
            super();
        }

        @Override // org.apache.kafka.streams.processor.internals.StreamThread.AbstractTaskCreator
        void createTask(TaskId taskId, Set<TopicPartition> set) {
            StreamTask createStreamTask = StreamThread.this.createStreamTask(taskId, set);
            StreamThread.this.activeTasks.put(taskId, createStreamTask);
            Iterator<TopicPartition> it = set.iterator();
            while (it.hasNext()) {
                StreamThread.this.activeTasksByPartition.put(it.next(), createStreamTask);
            }
        }
    }

    public StreamThread(TopologyBuilder topologyBuilder, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, String str, String str2, UUID uuid, Metrics metrics, Time time, StreamsMetadataState streamsMetadataState, long j) {
        super(str2 + "-StreamThread-" + STREAM_THREAD_ID_SEQUENCE.getAndIncrement());
        this.state = State.CREATED;
        this.stateListener = null;
        this.rebalanceException = null;
        this.processStandbyRecords = false;
        this.taskCreator = new TaskCreator();
        this.applicationId = str;
        this.config = streamsConfig;
        this.builder = topologyBuilder;
        this.clientSupplier = kafkaClientSupplier;
        this.sourceTopicPattern = topologyBuilder.sourceTopicPattern();
        this.clientId = str2;
        this.processId = uuid;
        this.partitionGrouper = (PartitionGrouper) streamsConfig.getConfiguredInstance(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, PartitionGrouper.class);
        this.streamsMetadataState = streamsMetadataState;
        this.threadClientId = getName();
        this.logPrefix = String.format("stream-thread [%s]", this.threadClientId);
        this.streamsMetrics = new StreamsMetricsThreadImpl(metrics, "stream-metrics", "thread." + this.threadClientId, Collections.singletonMap("client-id", this.threadClientId));
        if (streamsConfig.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG).longValue() < 0) {
            log.warn("{} Negative cache size passed in thread. Reverting to cache size of 0 bytes.", this.logPrefix);
        }
        this.cache = new ThreadCache(this.threadClientId, j, this.streamsMetrics);
        this.eosEnabled = StreamsConfig.EXACTLY_ONCE.equals(streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG));
        log.info("{} Creating consumer client", this.logPrefix);
        Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(this, str, this.threadClientId);
        if (!topologyBuilder.latestResetTopicsPattern().pattern().equals("") || !topologyBuilder.earliestResetTopicsPattern().pattern().equals("")) {
            this.originalReset = (String) consumerConfigs.get("auto.offset.reset");
            log.info("{} Custom offset resets specified updating configs original auto offset reset {}", this.logPrefix, this.originalReset);
            consumerConfigs.put("auto.offset.reset", "none");
        }
        this.consumer = kafkaClientSupplier.getConsumer(consumerConfigs);
        log.info("{} Creating restore consumer client", this.logPrefix);
        this.restoreConsumer = kafkaClientSupplier.getRestoreConsumer(streamsConfig.getRestoreConsumerConfigs(this.threadClientId));
        this.activeTasks = new ConcurrentHashMap();
        this.standbyTasks = new HashMap();
        this.activeTasksByPartition = new HashMap();
        this.standbyTasksByPartition = new HashMap();
        this.prevActiveTasks = new HashSet();
        this.suspendedTasks = new HashMap();
        this.suspendedStandbyTasks = new HashMap();
        this.standbyRecords = new HashMap();
        this.stateDirectory = new StateDirectory(str, this.threadClientId, streamsConfig.getString(StreamsConfig.STATE_DIR_CONFIG), time);
        this.rebalanceTimeoutMs = ((Integer) ConfigDef.parseType("max.poll.interval.ms", consumerConfigs.get("max.poll.interval.ms"), ConfigDef.Type.INT)).intValue();
        this.pollTimeMs = streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG).longValue();
        this.commitTimeMs = streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG).longValue();
        this.cleanTimeMs = streamsConfig.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG).longValue();
        this.time = time;
        this.timerStartedMs = time.milliseconds();
        this.lastCleanMs = Long.MAX_VALUE;
        this.lastCommitMs = this.timerStartedMs;
        this.rebalanceListener = new RebalanceListener(time, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG).intValue());
        setState(State.RUNNING);
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        log.info("{} Starting", this.logPrefix);
        boolean z = false;
        try {
            try {
                try {
                    runLoop();
                    z = true;
                    shutdown(true);
                } catch (KafkaException e) {
                    throw e;
                }
            } catch (Exception e2) {
                log.error("{} Streams application error during processing: ", this.logPrefix, e2);
                throw e2;
            }
        } catch (Throwable th) {
            shutdown(z);
            throw th;
        }
    }

    private void runLoop() {
        long j = -1;
        this.consumer.subscribe(this.sourceTopicPattern, this.rebalanceListener);
        while (stillRunning()) {
            this.timerStartedMs = this.time.milliseconds();
            ConsumerRecords<byte[], byte[]> pollRequests = pollRequests();
            if (pollRequests != null && !pollRequests.isEmpty() && !this.activeTasks.isEmpty()) {
                this.streamsMetrics.pollTimeSensor.record(computeLatency(), this.timerStartedMs);
                addRecordsToTasks(pollRequests);
                long processAndPunctuate = processAndPunctuate(this.activeTasks, j);
                if (processAndPunctuate > 0) {
                    long computeLatency = computeLatency();
                    this.streamsMetrics.processTimeSensor.record(computeLatency / processAndPunctuate, this.timerStartedMs);
                    j = adjustRecordsProcessedBeforeCommit(j, processAndPunctuate, computeLatency, this.commitTimeMs);
                }
            }
            maybeCommit(this.timerStartedMs);
            maybeUpdateStandbyTasks(this.timerStartedMs);
            maybeClean(this.timerStartedMs);
        }
        log.info("{} Shutting down at user request", this.logPrefix);
    }

    private ConsumerRecords<byte[], byte[]> pollRequests() {
        ConsumerRecords<byte[], byte[]> consumerRecords = null;
        try {
            consumerRecords = this.consumer.poll(this.pollTimeMs);
        } catch (InvalidOffsetException e) {
            resetInvalidOffsets(e);
        }
        if (this.rebalanceException == null || (this.rebalanceException instanceof ProducerFencedException)) {
            return consumerRecords;
        }
        throw new StreamsException(this.logPrefix + " Failed to rebalance.", this.rebalanceException);
    }

    private void resetInvalidOffsets(InvalidOffsetException invalidOffsetException) {
        Set<TopicPartition> partitions = invalidOffsetException.partitions();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        HashSet hashSet3 = new HashSet();
        for (TopicPartition topicPartition : partitions) {
            if (this.builder.earliestResetTopicsPattern().matcher(topicPartition.topic()).matches()) {
                addToResetList(topicPartition, hashSet2, "{} Setting topic '{}' to consume from {} offset", "earliest", hashSet);
            } else if (this.builder.latestResetTopicsPattern().matcher(topicPartition.topic()).matches()) {
                addToResetList(topicPartition, hashSet3, "{} Setting topic '{}' to consume from {} offset", "latest", hashSet);
            } else {
                if (this.originalReset == null || !(this.originalReset.equals("earliest") || this.originalReset.equals("latest"))) {
                    setState(State.PENDING_SHUTDOWN);
                    throw new StreamsException(String.format("No valid committed offset found for input topic %s (partition %s) and no valid reset policy configured. You need to set configuration parameter \"auto.offset.reset\" or specify a topic specific reset policy via KStreamBuilder#stream(StreamsConfig.AutoOffsetReset offsetReset, ...) or KStreamBuilder#table(StreamsConfig.AutoOffsetReset offsetReset, ...)", topicPartition.topic(), Integer.valueOf(topicPartition.partition())), invalidOffsetException);
                }
                if (this.originalReset.equals("earliest")) {
                    addToResetList(topicPartition, hashSet2, "{} No custom setting defined for topic '{}' using original config '{}' for offset reset", "earliest", hashSet);
                } else if (this.originalReset.equals("latest")) {
                    addToResetList(topicPartition, hashSet3, "{} No custom setting defined for topic '{}' using original config '{}' for offset reset", "latest", hashSet);
                }
            }
        }
        if (!hashSet2.isEmpty()) {
            this.consumer.seekToBeginning(hashSet2);
        }
        if (hashSet3.isEmpty()) {
            return;
        }
        this.consumer.seekToEnd(hashSet3);
    }

    private void addToResetList(TopicPartition topicPartition, Set<TopicPartition> set, String str, String str2, Set<String> set2) {
        String str3 = topicPartition.topic();
        if (set2.add(str3)) {
            log.info(str, new Object[]{this.logPrefix, str3, str2});
        }
        set.add(topicPartition);
    }

    private void addRecordsToTasks(ConsumerRecords<byte[], byte[]> consumerRecords) {
        if (consumerRecords == null || consumerRecords.isEmpty()) {
            return;
        }
        int i = 0;
        for (TopicPartition topicPartition : consumerRecords.partitions()) {
            i += this.activeTasksByPartition.get(topicPartition).addRecords(topicPartition, consumerRecords.records(topicPartition));
        }
        this.streamsMetrics.skippedRecordsSensor.record(consumerRecords.count() - i, this.timerStartedMs);
    }

    private long processAndPunctuate(Map<TaskId, StreamTask> map, long j) {
        long j2;
        long j3 = 0;
        do {
            j2 = 0;
            Iterator<Map.Entry<TaskId, StreamTask>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                StreamTask value = it.next().getValue();
                try {
                    if (value.process()) {
                        j2++;
                        j3++;
                    }
                } catch (ProducerFencedException e) {
                    closeZombieTask(value);
                    it.remove();
                }
            }
            if (j != -1 && j3 >= j) {
                j3 = 0;
                this.streamsMetrics.processTimeSensor.record(computeLatency() / 0, this.timerStartedMs);
                maybeCommit(this.timerStartedMs);
            }
        } while (j2 != 0);
        RuntimeException performOnStreamTasks = performOnStreamTasks(new StreamTaskAction() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.1
            private String name;

            AnonymousClass1() {
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
            public String name() {
                return this.name;
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
            public void apply(StreamTask streamTask) {
                this.name = "punctuate";
                StreamThread.this.maybePunctuate(streamTask);
                if (streamTask.commitNeeded()) {
                    this.name = "commit";
                    long milliseconds = StreamThread.this.time.milliseconds();
                    StreamThread.this.commitOne(streamTask);
                    if (StreamThread.log.isDebugEnabled()) {
                        StreamThread.log.debug("{} Committed active task {} per user request in {}ms", new Object[]{StreamThread.this.logPrefix, streamTask.id(), Long.valueOf(StreamThread.this.timerStartedMs - milliseconds)});
                    }
                }
            }
        });
        if (performOnStreamTasks != null) {
            throw performOnStreamTasks;
        }
        return j3;
    }

    public void maybePunctuate(StreamTask streamTask) {
        try {
            if (streamTask.maybePunctuate()) {
                this.streamsMetrics.punctuateTimeSensor.record(computeLatency(), this.timerStartedMs);
            }
        } catch (KafkaException e) {
            log.error("{} Failed to punctuate active task {}: ", new Object[]{this.logPrefix, streamTask.id(), e});
            throw e;
        }
    }

    private long adjustRecordsProcessedBeforeCommit(long j, long j2, long j3, long j4) {
        long j5 = -1;
        if (j3 > 0 && j3 > j4) {
            j5 = Math.max(1L, (j4 * j2) / j3);
            log.debug("{} processing latency {} > commit time {} for {} records. Adjusting down recordsProcessedBeforeCommit={}", new Object[]{this.logPrefix, Long.valueOf(j3), Long.valueOf(j4), Long.valueOf(j2), Long.valueOf(j5)});
        } else if (j != -1 && j3 > 0) {
            j5 = Math.max(1L, (j4 * j2) / j3);
            log.debug("{} processing latency {} < commit time {} for {} records. Adjusting up recordsProcessedBeforeCommit={}", new Object[]{this.logPrefix, Long.valueOf(j3), Long.valueOf(j4), Long.valueOf(j2), Long.valueOf(j5)});
        }
        return j5;
    }

    protected void maybeCommit(long j) {
        if (this.commitTimeMs < 0 || this.lastCommitMs + this.commitTimeMs >= j) {
            return;
        }
        if (log.isTraceEnabled()) {
            log.trace("{} Committing all active tasks {} and standby tasks {} since {}ms has elapsed (commit interval is {}ms)", new Object[]{this.logPrefix, this.activeTasks.keySet(), this.standbyTasks.keySet(), Long.valueOf(j - this.lastCommitMs), Long.valueOf(this.commitTimeMs)});
        }
        commitAll();
        if (log.isDebugEnabled()) {
            log.info("{} Committed all active tasks {} and standby tasks {} in {}ms", new Object[]{this.logPrefix, this.activeTasks.keySet(), this.standbyTasks.keySet(), Long.valueOf(this.timerStartedMs - j)});
        }
        this.lastCommitMs = j;
        this.processStandbyRecords = true;
    }

    private void commitAll() {
        RuntimeException performOnStreamTasks = performOnStreamTasks(new StreamTaskAction() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.2
            AnonymousClass2() {
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
            public String name() {
                return "commit";
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
            public void apply(StreamTask streamTask) {
                StreamThread.this.commitOne(streamTask);
            }
        });
        if (performOnStreamTasks != null) {
            throw performOnStreamTasks;
        }
        Iterator<StandbyTask> it = this.standbyTasks.values().iterator();
        while (it.hasNext()) {
            commitOne(it.next());
        }
    }

    public void commitOne(AbstractTask abstractTask) {
        try {
            abstractTask.commit();
        } catch (KafkaException e) {
            log.error("{} Failed to commit {} {} state: ", new Object[]{this.logPrefix, abstractTask.getClass().getSimpleName(), abstractTask.id(), e});
            throw e;
        } catch (CommitFailedException e2) {
            log.warn("{} Failed to commit {} {} state: ", new Object[]{this.logPrefix, abstractTask.getClass().getSimpleName(), abstractTask.id(), e2});
        }
        this.streamsMetrics.commitTimeSensor.record(computeLatency(), this.timerStartedMs);
    }

    private void maybeUpdateStandbyTasks(long j) {
        if (this.standbyTasks.isEmpty()) {
            return;
        }
        if (this.processStandbyRecords) {
            if (!this.standbyRecords.isEmpty()) {
                HashMap hashMap = new HashMap();
                for (Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> entry : this.standbyRecords.entrySet()) {
                    TopicPartition key = entry.getKey();
                    List<ConsumerRecord<byte[], byte[]>> value = entry.getValue();
                    if (value != null) {
                        List<ConsumerRecord<byte[], byte[]>> update = this.standbyTasksByPartition.get(key).update(key, value);
                        if (update != null) {
                            hashMap.put(key, update);
                        } else {
                            this.restoreConsumer.resume(Collections.singleton(key));
                        }
                    }
                }
                this.standbyRecords = hashMap;
                log.debug("{} Updated standby tasks {} in {}ms", new Object[]{this.logPrefix, this.standbyTasks.keySet(), Long.valueOf(this.time.milliseconds() - j)});
            }
            this.processStandbyRecords = false;
        }
        ConsumerRecords poll = this.restoreConsumer.poll(0L);
        if (poll.isEmpty()) {
            return;
        }
        for (TopicPartition topicPartition : poll.partitions()) {
            StandbyTask standbyTask = this.standbyTasksByPartition.get(topicPartition);
            if (standbyTask == null) {
                throw new StreamsException(this.logPrefix + " Missing standby task for partition " + topicPartition);
            }
            List<ConsumerRecord<byte[], byte[]>> update2 = standbyTask.update(topicPartition, poll.records(topicPartition));
            if (update2 != null) {
                this.restoreConsumer.pause(Collections.singleton(topicPartition));
                this.standbyRecords.put(topicPartition, update2);
            }
        }
    }

    protected void maybeClean(long j) {
        if (j > this.lastCleanMs + this.cleanTimeMs) {
            this.stateDirectory.cleanRemovedTasks(this.cleanTimeMs);
            this.lastCleanMs = j;
        }
    }

    private long computeLatency() {
        long j = this.timerStartedMs;
        this.timerStartedMs = this.time.milliseconds();
        return Math.max(this.timerStartedMs - j, 0L);
    }

    public synchronized void close() {
        log.info("{} Informed thread to shut down", this.logPrefix);
        setState(State.PENDING_SHUTDOWN);
    }

    public synchronized boolean isInitialized() {
        return this.state == State.RUNNING;
    }

    public synchronized boolean stillRunning() {
        return this.state.isRunning();
    }

    public Map<TaskId, StreamTask> tasks() {
        return Collections.unmodifiableMap(this.activeTasks);
    }

    public Set<TaskId> prevActiveTasks() {
        return Collections.unmodifiableSet(this.prevActiveTasks);
    }

    public Set<TaskId> cachedTasks() {
        HashSet hashSet = new HashSet();
        File[] listTaskDirectories = this.stateDirectory.listTaskDirectories();
        if (listTaskDirectories != null) {
            for (File file : listTaskDirectories) {
                try {
                    TaskId parse = TaskId.parse(file.getName());
                    if (new File(file, ".checkpoint").exists()) {
                        hashSet.add(parse);
                    }
                } catch (TaskIdFormatException e) {
                }
            }
        }
        return hashSet;
    }

    public void setStateListener(StateListener stateListener) {
        this.stateListener = stateListener;
    }

    public synchronized State state() {
        return this.state;
    }

    public synchronized void setStateWhenNotInPendingShutdown(State state) {
        if (this.state == State.PENDING_SHUTDOWN) {
            return;
        }
        setState(state);
    }

    private synchronized void setState(State state) {
        State state2 = this.state;
        if (this.state.isValidTransition(state)) {
            log.info("{} State transition from {} to {}.", new Object[]{this.logPrefix, state2, state});
        } else {
            log.warn("{} Unexpected state transition from {} to {}.", new Object[]{this.logPrefix, state2, state});
        }
        this.state = state;
        if (this.stateListener != null) {
            this.stateListener.onChange(this, this.state, state2);
        }
    }

    @Override // java.lang.Thread
    public String toString() {
        return toString("");
    }

    public String toString(String str) {
        StringBuilder append = new StringBuilder().append(str).append("StreamsThread appId: ").append(this.applicationId).append("\n").append(str).append("\tStreamsThread clientId: ").append(this.clientId).append("\n").append(str).append("\tStreamsThread threadId: ").append(getName()).append("\n");
        if (this.activeTasks != null) {
            append.append(str).append("\tActive tasks:\n");
            Iterator<Map.Entry<TaskId, StreamTask>> it = this.activeTasks.entrySet().iterator();
            while (it.hasNext()) {
                append.append(str).append(it.next().getValue().toString(str + "\t\t"));
            }
        }
        if (this.standbyTasks != null) {
            append.append(str).append("\tStandby tasks:\n");
            Iterator<StandbyTask> it2 = this.standbyTasks.values().iterator();
            while (it2.hasNext()) {
                append.append(str).append(it2.next().toString(str + "\t\t"));
            }
            append.append("\n");
        }
        return append.toString();
    }

    String threadClientId() {
        return this.threadClientId;
    }

    public void setPartitionAssignor(StreamPartitionAssignor streamPartitionAssignor) {
        this.partitionAssignor = streamPartitionAssignor;
    }

    private void shutdown(boolean z) {
        log.info("{} Shutting down", this.logPrefix);
        shutdownTasksAndState(z);
        if (this.threadProducer != null) {
            try {
                this.threadProducer.close();
            } catch (Throwable th) {
                log.error("{} Failed to close producer: ", this.logPrefix, th);
            }
        }
        try {
            this.consumer.close();
        } catch (Throwable th2) {
            log.error("{} Failed to close consumer: ", this.logPrefix, th2);
        }
        try {
            this.restoreConsumer.close();
        } catch (Throwable th3) {
            log.error("{} Failed to close restore consumer: ", this.logPrefix, th3);
        }
        try {
            this.partitionAssignor.close();
        } catch (Throwable th4) {
            log.error("{} Failed to close KafkaStreamClient: ", this.logPrefix, th4);
        }
        removeStreamTasks();
        removeStandbyTasks();
        log.info("{} Stream thread shutdown complete", this.logPrefix);
        setState(State.DEAD);
        this.streamsMetrics.removeAllSensors();
    }

    private void shutdownTasksAndState(boolean z) {
        log.debug("{} Shutting down all active tasks {}, standby tasks {}, suspended tasks {}, and suspended standby tasks {}", new Object[]{this.logPrefix, this.activeTasks.keySet(), this.standbyTasks.keySet(), this.suspendedTasks.keySet(), this.suspendedStandbyTasks.keySet()});
        for (AbstractTask abstractTask : allTasks()) {
            try {
                abstractTask.close(z);
            } catch (RuntimeException e) {
                log.error("{} Failed while closing {} {}: ", new Object[]{this.logPrefix, abstractTask.getClass().getSimpleName(), abstractTask.id(), e});
            }
        }
        unAssignChangeLogPartitions();
    }

    public void suspendTasksAndState() {
        log.debug("{} Suspending all active tasks {} and standby tasks {}", new Object[]{this.logPrefix, this.activeTasks.keySet(), this.standbyTasks.keySet()});
        AtomicReference atomicReference = new AtomicReference(null);
        atomicReference.compareAndSet(null, performOnStreamTasks(new StreamTaskAction() { // from class: org.apache.kafka.streams.processor.internals.StreamThread.3
            AnonymousClass3() {
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
            public String name() {
                return "suspend";
            }

            @Override // org.apache.kafka.streams.processor.internals.StreamThread.StreamTaskAction
            public void apply(StreamTask streamTask) {
                try {
                    streamTask.suspend();
                } catch (Exception e) {
                    try {
                        streamTask.close(false);
                    } catch (Exception e2) {
                        StreamThread.log.error("{} Closing task {} failed: ", new Object[]{StreamThread.this.logPrefix, streamTask.id, e2});
                    }
                    throw e;
                }
            }
        }));
        for (StandbyTask standbyTask : this.standbyTasks.values()) {
            try {
                try {
                    standbyTask.suspend();
                } catch (Exception e) {
                    try {
                        standbyTask.close(false);
                    } catch (Exception e2) {
                        log.error("{} Closing standby task {} failed: ", new Object[]{this.logPrefix, standbyTask.id, e2});
                    }
                    throw e;
                    break;
                }
            } catch (RuntimeException e3) {
                atomicReference.compareAndSet(null, e3);
            }
        }
        atomicReference.compareAndSet(null, unAssignChangeLogPartitions());
        updateSuspendedTasks();
        if (atomicReference.get() != null) {
            throw new StreamsException(this.logPrefix + " failed to suspend stream tasks", (Throwable) atomicReference.get());
        }
    }

    private RuntimeException unAssignChangeLogPartitions() {
        try {
            this.restoreConsumer.assign(Collections.emptyList());
            return null;
        } catch (RuntimeException e) {
            log.error("{} Failed to un-assign change log partitions: ", this.logPrefix, e);
            return e;
        }
    }

    private List<AbstractTask> allTasks() {
        List<AbstractTask> activeAndStandbytasks = activeAndStandbytasks();
        activeAndStandbytasks.addAll(suspendedAndSuspendedStandbytasks());
        return activeAndStandbytasks;
    }

    private List<AbstractTask> activeAndStandbytasks() {
        ArrayList arrayList = new ArrayList(this.activeTasks.values());
        arrayList.addAll(this.standbyTasks.values());
        return arrayList;
    }

    private List<AbstractTask> suspendedAndSuspendedStandbytasks() {
        ArrayList arrayList = new ArrayList(this.suspendedTasks.values());
        arrayList.addAll(this.suspendedStandbyTasks.values());
        return arrayList;
    }

    private StreamTask findMatchingSuspendedTask(TaskId taskId, Set<TopicPartition> set) {
        if (!this.suspendedTasks.containsKey(taskId)) {
            return null;
        }
        StreamTask streamTask = this.suspendedTasks.get(taskId);
        if (streamTask.partitions.equals(set)) {
            return streamTask;
        }
        return null;
    }

    private StandbyTask findMatchingSuspendedStandbyTask(TaskId taskId, Set<TopicPartition> set) {
        if (!this.suspendedStandbyTasks.containsKey(taskId)) {
            return null;
        }
        StandbyTask standbyTask = this.suspendedStandbyTasks.get(taskId);
        if (standbyTask.partitions.equals(set)) {
            return standbyTask;
        }
        return null;
    }

    public void closeNonAssignedSuspendedTasks() {
        Map<TaskId, Set<TopicPartition>> activeTasks = this.partitionAssignor.activeTasks();
        Iterator<Map.Entry<TaskId, StreamTask>> it = this.suspendedTasks.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, StreamTask> next = it.next();
            StreamTask value = next.getValue();
            if (!value.partitions().equals(activeTasks.get(next.getKey()))) {
                log.debug("{} Closing suspended non-assigned active task {}", this.logPrefix, value.id());
                try {
                    try {
                        value.closeSuspended(true, null);
                        it.remove();
                    } catch (Exception e) {
                        log.error("{} Failed to remove suspended task {}: ", new Object[]{this.logPrefix, next.getKey(), e});
                        it.remove();
                    }
                } catch (Throwable th) {
                    it.remove();
                    throw th;
                }
            }
        }
    }

    public void closeNonAssignedSuspendedStandbyTasks() {
        Set<TaskId> keySet = this.partitionAssignor.standbyTasks().keySet();
        Iterator<Map.Entry<TaskId, StandbyTask>> it = this.suspendedStandbyTasks.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<TaskId, StandbyTask> next = it.next();
            if (!keySet.contains(next.getKey())) {
                StandbyTask value = next.getValue();
                log.debug("{} Closing suspended non-assigned standby task {}", this.logPrefix, value.id());
                try {
                    try {
                        value.close(true);
                        it.remove();
                    } catch (Exception e) {
                        log.error("{} Failed to remove suspended standby task {}: ", new Object[]{this.logPrefix, value.id(), e});
                        it.remove();
                    }
                } catch (Throwable th) {
                    it.remove();
                    throw th;
                }
            }
        }
    }

    protected StreamTask createStreamTask(TaskId taskId, Collection<TopicPartition> collection) {
        this.streamsMetrics.taskCreatedSensor.record();
        try {
            StreamTask streamTask = new StreamTask(taskId, this.applicationId, collection, this.builder.build(Integer.valueOf(taskId.topicGroupId)), this.consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory, this.cache, this.time, createProducer(taskId));
            log.trace("{} Created active task {} with assigned partitions {}", new Object[]{this.logPrefix, taskId, collection});
            return streamTask;
        } catch (Throwable th) {
            log.trace("{} Created active task {} with assigned partitions {}", new Object[]{this.logPrefix, taskId, collection});
            throw th;
        }
    }

    private Producer<byte[], byte[]> createProducer(TaskId taskId) {
        Producer<byte[], byte[]> producer;
        if (this.eosEnabled) {
            Map<String, Object> producerConfigs = this.config.getProducerConfigs(this.threadClientId + "-" + taskId);
            log.info("{} Creating producer client for task {}", this.logPrefix, taskId);
            producerConfigs.put("transactional.id", this.applicationId + "-" + taskId);
            producer = this.clientSupplier.getProducer(producerConfigs);
        } else {
            if (this.threadProducer == null) {
                Map<String, Object> producerConfigs2 = this.config.getProducerConfigs(this.threadClientId);
                log.info("{} Creating shared producer client", this.logPrefix);
                this.threadProducer = this.clientSupplier.getProducer(producerConfigs2);
            }
            producer = this.threadProducer;
        }
        return producer;
    }

    public void addStreamTasks(Collection<TopicPartition> collection, long j) {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException(this.logPrefix + " Partition assignor has not been initialized while adding stream tasks: this should not happen.");
        }
        HashMap hashMap = new HashMap();
        log.debug("{} Adding assigned tasks as active: {}", this.logPrefix, this.partitionAssignor.activeTasks());
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : this.partitionAssignor.activeTasks().entrySet()) {
            TaskId key = entry.getKey();
            Set<TopicPartition> value = entry.getValue();
            if (collection.containsAll(value)) {
                try {
                    StreamTask findMatchingSuspendedTask = findMatchingSuspendedTask(key, value);
                    if (findMatchingSuspendedTask != null) {
                        this.suspendedTasks.remove(key);
                        findMatchingSuspendedTask.resume();
                        this.activeTasks.put(key, findMatchingSuspendedTask);
                        Iterator<TopicPartition> it = value.iterator();
                        while (it.hasNext()) {
                            this.activeTasksByPartition.put(it.next(), findMatchingSuspendedTask);
                        }
                    } else {
                        hashMap.put(key, value);
                    }
                } catch (StreamsException e) {
                    log.error("{} Failed to create an active task {}: ", new Object[]{this.logPrefix, key, e});
                    throw e;
                }
            } else {
                log.warn("{} Task {} owned partitions {} are not contained in the assignment {}", new Object[]{this.logPrefix, key, value, collection});
            }
        }
        log.trace("{} New active tasks to be created: {}", this.logPrefix, hashMap);
        this.taskCreator.retryWithBackoff(hashMap, j);
    }

    protected StandbyTask createStandbyTask(TaskId taskId, Collection<TopicPartition> collection) {
        this.streamsMetrics.taskCreatedSensor.record();
        ProcessorTopology build = this.builder.build(Integer.valueOf(taskId.topicGroupId));
        if (build.stateStores().isEmpty()) {
            log.trace("{} Skipped standby task {} with assigned partitions {} since it does not have any state stores to materialize", new Object[]{this.logPrefix, taskId, collection});
            return null;
        }
        try {
            StandbyTask standbyTask = new StandbyTask(taskId, this.applicationId, collection, build, this.consumer, this.storeChangelogReader, this.config, this.streamsMetrics, this.stateDirectory);
            log.trace("{} Created standby task {} with assigned partitions {}", new Object[]{this.logPrefix, taskId, collection});
            return standbyTask;
        } catch (Throwable th) {
            log.trace("{} Created standby task {} with assigned partitions {}", new Object[]{this.logPrefix, taskId, collection});
            throw th;
        }
    }

    public void addStandbyTasks(long j) {
        if (this.partitionAssignor == null) {
            throw new IllegalStateException(this.logPrefix + " Partition assignor has not been initialized while adding standby tasks: this should not happen.");
        }
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        log.debug("{} Adding assigned standby tasks {}", this.logPrefix, this.partitionAssignor.standbyTasks());
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : this.partitionAssignor.standbyTasks().entrySet()) {
            TaskId key = entry.getKey();
            Set<TopicPartition> value = entry.getValue();
            StandbyTask findMatchingSuspendedStandbyTask = findMatchingSuspendedStandbyTask(key, value);
            if (findMatchingSuspendedStandbyTask != null) {
                this.suspendedStandbyTasks.remove(key);
                findMatchingSuspendedStandbyTask.resume();
            } else {
                hashMap2.put(key, value);
            }
            updateStandByTaskMaps(hashMap, key, value, findMatchingSuspendedStandbyTask);
        }
        log.trace("{} New standby tasks to be created: {}", this.logPrefix, hashMap2);
        new StandbyTaskCreator(hashMap).retryWithBackoff(hashMap2, j);
        this.restoreConsumer.assign(new ArrayList(hashMap.keySet()));
        for (Map.Entry<TopicPartition, Long> entry2 : hashMap.entrySet()) {
            TopicPartition key2 = entry2.getKey();
            long longValue = entry2.getValue().longValue();
            if (longValue >= 0) {
                this.restoreConsumer.seek(key2, longValue);
            } else {
                this.restoreConsumer.seekToBeginning(Collections.singleton(key2));
            }
        }
    }

    public void updateStandByTaskMaps(Map<TopicPartition, Long> map, TaskId taskId, Set<TopicPartition> set, StandbyTask standbyTask) {
        if (standbyTask != null) {
            this.standbyTasks.put(taskId, standbyTask);
            Iterator<TopicPartition> it = set.iterator();
            while (it.hasNext()) {
                this.standbyTasksByPartition.put(it.next(), standbyTask);
            }
            Iterator<TopicPartition> it2 = standbyTask.checkpointedOffsets().keySet().iterator();
            while (it2.hasNext()) {
                this.standbyTasksByPartition.put(it2.next(), standbyTask);
            }
            map.putAll(standbyTask.checkpointedOffsets());
        }
    }

    private void updateSuspendedTasks() {
        this.suspendedTasks.clear();
        this.suspendedTasks.putAll(this.activeTasks);
        this.suspendedStandbyTasks.putAll(this.standbyTasks);
    }

    public void removeStreamTasks() {
        log.debug("{} Removing all active tasks {}", this.logPrefix, this.activeTasks.keySet());
        try {
            this.prevActiveTasks.clear();
            this.prevActiveTasks.addAll(this.activeTasks.keySet());
            this.activeTasks.clear();
            this.activeTasksByPartition.clear();
        } catch (Exception e) {
            log.error("{} Failed to remove stream tasks: ", this.logPrefix, e);
        }
    }

    public void removeStandbyTasks() {
        log.debug("{} Removing all standby tasks {}", this.logPrefix, this.standbyTasks.keySet());
        this.standbyTasks.clear();
        this.standbyTasksByPartition.clear();
        this.standbyRecords.clear();
    }

    private void closeZombieTask(StreamTask streamTask) {
        log.warn("{} Producer of task {} fenced; closing zombie task.", this.logPrefix, streamTask.id);
        try {
            streamTask.close(false);
        } catch (Exception e) {
            if (!log.isDebugEnabled() && !log.isTraceEnabled()) {
                log.warn("{} Failed to close zombie task: {}", this.logPrefix, e.getMessage());
            }
            log.debug("{} Failed to close zombie task: ", this.logPrefix, e);
        }
        this.activeTasks.remove(streamTask.id);
    }

    private RuntimeException performOnStreamTasks(StreamTaskAction streamTaskAction) {
        RuntimeException runtimeException = null;
        Iterator<Map.Entry<TaskId, StreamTask>> it = this.activeTasks.entrySet().iterator();
        while (it.hasNext()) {
            StreamTask value = it.next().getValue();
            try {
                streamTaskAction.apply(value);
            } catch (RuntimeException e) {
                log.error("{} Failed to {} stream task {}: ", new Object[]{this.logPrefix, streamTaskAction.name(), value.id(), e});
                if (runtimeException == null) {
                    runtimeException = e;
                }
            } catch (ProducerFencedException e2) {
                closeZombieTask(value);
                it.remove();
            }
        }
        return runtimeException;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.kafka.streams.processor.internals.StreamThread.access$1402(org.apache.kafka.streams.processor.internals.StreamThread, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$1402(org.apache.kafka.streams.processor.internals.StreamThread r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.lastCleanMs = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.streams.processor.internals.StreamThread.access$1402(org.apache.kafka.streams.processor.internals.StreamThread, long):long");
    }

    static {
    }
}
