/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.topic;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import kafka.server.LogDirFailureChannel;
import kafka.tier.TierTopicManagerCommitter;
import kafka.tier.TopicIdPartition;
import kafka.tier.client.TierTopicConsumerSupplier;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierRecordType;
import kafka.tier.exceptions.TierMetadataFatalException;
import kafka.tier.exceptions.TierMetadataRetriableException;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStatus;
import kafka.tier.topic.InitializedTierTopic;
import kafka.tier.topic.TierCatchupConsumer;
import kafka.tier.topic.TierTopicListeners;
import kafka.tier.topic.TierTopicManager;
import kafka.tier.topic.TierTopicManagerConfig;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.KafkaThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TierTopicConsumer
implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(TierTopicConsumer.class);
    private final TierTopicManagerConfig config;
    private final TierTopicListeners resultListeners = new TierTopicListeners();
    private final Map<TopicIdPartition, ClientCtx> immigratingPartitions = new HashMap<TopicIdPartition, ClientCtx>();
    private final Map<TopicIdPartition, ClientCtx> onlinePartitions = new HashMap<TopicIdPartition, ClientCtx>();
    private final Map<TopicIdPartition, ClientCtx> catchingUpPartitions = new HashMap<TopicIdPartition, ClientCtx>();
    private final Thread consumerThread = new KafkaThread("TierTopicConsumer", (Runnable)this, false);
    private final Supplier<Consumer<byte[], byte[]>> primaryConsumerSupplier;
    private final TierTopicManagerCommitter committer;
    private volatile Consumer<byte[], byte[]> primaryConsumer;
    private volatile boolean ready = true;
    private volatile boolean shutdown = false;
    private InitializedTierTopic tierTopic;
    private TierCatchupConsumer catchupConsumer;

    public TierTopicConsumer(TierTopicManagerConfig config, LogDirFailureChannel logDirFailureChannel) {
        this(config, new TierTopicConsumerSupplier(config, "primary"), new TierTopicConsumerSupplier(config, "catchup"), new TierTopicManagerCommitter(config, logDirFailureChannel));
    }

    public TierTopicConsumer(TierTopicManagerConfig config, Supplier<Consumer<byte[], byte[]>> primaryConsumerSupplier, Supplier<Consumer<byte[], byte[]>> catchupConsumerSupplier, TierTopicManagerCommitter committer) {
        this.config = config;
        this.committer = committer;
        this.primaryConsumerSupplier = primaryConsumerSupplier;
        this.catchupConsumer = new TierCatchupConsumer(catchupConsumerSupplier);
    }

    public synchronized void register(TopicIdPartition partition, ClientCtx clientCtx) {
        if (this.immigratingPartitions.containsKey(partition) || this.onlinePartitions.containsKey(partition) || this.catchingUpPartitions.containsKey(partition)) {
            throw new IllegalStateException("Duplicate registration for " + partition);
        }
        this.immigratingPartitions.put(partition, clientCtx);
    }

    public synchronized void register(Map<TopicIdPartition, ClientCtx> partitionsToRegister) {
        for (Map.Entry<TopicIdPartition, ClientCtx> partitionToRegister : partitionsToRegister.entrySet()) {
            this.register(partitionToRegister.getKey(), partitionToRegister.getValue());
        }
    }

    public synchronized void deregister(TopicIdPartition partition) {
        this.immigratingPartitions.remove(partition);
        this.onlinePartitions.remove(partition);
        this.catchingUpPartitions.remove(partition);
    }

    public void trackMaterialization(AbstractTierMetadata metadata, CompletableFuture<TierPartitionState.AppendResult> future) {
        this.resultListeners.addTracked(metadata, future);
    }

    public void cancelTracked(AbstractTierMetadata metadata) {
        this.resultListeners.getAndRemoveTracked(metadata);
    }

    public void startConsume(boolean startConsumeThread, InitializedTierTopic tierTopic) {
        Set<TopicPartition> tierTopicPartitions = TierTopicManager.partitions(tierTopic.topicName(), tierTopic.numPartitions().getAsInt());
        this.primaryConsumer = this.primaryConsumerSupplier.get();
        this.primaryConsumer.assign(tierTopicPartitions);
        for (TopicPartition topicPartition : tierTopicPartitions) {
            Long position = this.committer.positionFor(topicPartition.partition());
            if (position != null) {
                log.info("seeking primary consumer to committed offset {} for partition {}", (Object)position, (Object)topicPartition);
                this.primaryConsumer.seek(topicPartition, position.longValue());
                continue;
            }
            log.info("primary consumer missing committed offset for partition {}. Seeking to beginning", (Object)topicPartition);
            this.primaryConsumer.seekToBeginning(Collections.singletonList(topicPartition));
        }
        if (startConsumeThread) {
            this.consumerThread.start();
        }
        this.tierTopic = tierTopic;
        this.ready = true;
    }

    public void commitPositions(Iterator<TierPartitionState> tierPartitionStateIterator) {
        this.committer.flush(tierPartitionStateIterator);
    }

    public boolean isReady() {
        return this.ready;
    }

    public void shutdown() {
        this.shutdown = true;
        if (this.primaryConsumer != null) {
            this.primaryConsumer.wakeup();
        }
        this.catchupConsumer.wakeup();
        try {
            this.consumerThread.join();
        }
        catch (InterruptedException e) {
            log.error("Shutdown interrupted", (Throwable)e);
        }
        this.resultListeners.shutdown();
    }

    public void cleanup() {
        if (this.primaryConsumer != null) {
            this.primaryConsumer.close();
        }
        this.catchupConsumer.close();
    }

    @Override
    public void run() {
        try {
            while (!this.shutdown) {
                this.doWork();
            }
        }
        catch (Exception e) {
            if (this.shutdown) {
                log.debug("Exception caught during shutdown", (Throwable)e);
            } else {
                log.error("Fatal exception in TierTopicConsumer", (Throwable)e);
            }
        }
        finally {
            this.ready = false;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void doWork() {
        if (this.catchupConsumer.tryComplete(this.primaryConsumer)) {
            TierTopicConsumer tierTopicConsumer = this;
            synchronized (tierTopicConsumer) {
                for (ClientCtx ctx : this.catchingUpPartitions.values()) {
                    ctx.completeCatchup();
                }
                this.onlinePartitions.putAll(this.catchingUpPartitions);
                this.catchingUpPartitions.clear();
            }
        }
        this.processPendingImmigrations();
        this.processRecords((ConsumerRecords<byte[], byte[]>)this.primaryConsumer.poll(this.config.pollDuration), TierPartitionStatus.ONLINE, true);
        this.processRecords(this.catchupConsumer.poll(this.config.pollDuration), TierPartitionStatus.CATCHUP, false);
    }

    InitializedTierTopic tierTopic() {
        return this.tierTopic;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processPendingImmigrations() {
        HashMap<TopicIdPartition, ClientCtx> newCatchupPartitions = new HashMap<TopicIdPartition, ClientCtx>();
        HashMap<TopicIdPartition, ClientCtx> newOnlinePartitions = new HashMap<TopicIdPartition, ClientCtx>();
        if (!this.catchupConsumer.active()) {
            TierTopicConsumer tierTopicConsumer = this;
            synchronized (tierTopicConsumer) {
                for (Map.Entry<TopicIdPartition, ClientCtx> entry2 : this.immigratingPartitions.entrySet()) {
                    TopicIdPartition partition = entry2.getKey();
                    ClientCtx clientCtx = entry2.getValue();
                    TierPartitionStatus status = clientCtx.status();
                    if (status == TierPartitionStatus.INIT || status == TierPartitionStatus.CATCHUP) {
                        newCatchupPartitions.put(partition, clientCtx);
                        continue;
                    }
                    if (status == TierPartitionStatus.ONLINE) {
                        newOnlinePartitions.put(partition, clientCtx);
                        continue;
                    }
                    log.debug("Ignoring immigration of partition {} in state {}", (Object)partition, (Object)status);
                }
                this.catchingUpPartitions.putAll(newCatchupPartitions);
                this.onlinePartitions.putAll(newOnlinePartitions);
                this.immigratingPartitions.clear();
            }
        }
        if (!newCatchupPartitions.isEmpty()) {
            this.beginCatchup(newCatchupPartitions);
        }
    }

    private void beginCatchup(Map<TopicIdPartition, ClientCtx> partitionsToCatchup) {
        for (ClientCtx ctx : partitionsToCatchup.values()) {
            ctx.beginCatchup();
        }
        Set<TopicPartition> tierTopicPartitions = this.tierTopic.toTierTopicPartitions(partitionsToCatchup.keySet());
        this.catchupConsumer.doStart(tierTopicPartitions);
    }

    private void processRecords(ConsumerRecords<byte[], byte[]> records, TierPartitionStatus requiredState, boolean commitPositions) {
        if (records == null) {
            return;
        }
        for (ConsumerRecord record2 : records) {
            try {
                Optional<AbstractTierMetadata> entryOpt = AbstractTierMetadata.deserialize((byte[])record2.key(), (byte[])record2.value());
                if (entryOpt.isPresent()) {
                    AbstractTierMetadata entry2 = entryOpt.get();
                    log.trace("Read {} at offset {} of partition {} requiredState {}", new Object[]{entry2, record2.offset(), record2.partition(), requiredState});
                    this.processEntry(entry2, record2.partition(), record2.offset(), requiredState);
                    if (!commitPositions) continue;
                    this.committer.updatePosition(record2.partition(), record2.offset() + 1L);
                    continue;
                }
                throw new TierMetadataFatalException(String.format("Fatal Exception message for %s and unknown type: %d cannot be deserialized (requiredState:%s).", new Object[]{AbstractTierMetadata.deserializeKey((byte[])record2.key()).toString(), AbstractTierMetadata.getTypeId((byte[])record2.value()), requiredState}));
            }
            catch (Exception e) {
                throw new TierMetadataFatalException(String.format("Unable to process message at offset %d of partition %d, requiredState %s", new Object[]{record2.offset(), record2.partition(), requiredState}), e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processEntry(AbstractTierMetadata entry2, int partition, long offset2, TierPartitionStatus requiredState) throws TierMetadataFatalException {
        try {
            ClientCtx clientCtx;
            TopicIdPartition topicIdPartition = entry2.topicIdPartition();
            TierTopicConsumer tierTopicConsumer = this;
            synchronized (tierTopicConsumer) {
                clientCtx = this.onlinePartitions.containsKey(topicIdPartition) ? this.onlinePartitions.get(topicIdPartition) : this.catchingUpPartitions.get(topicIdPartition);
            }
            if (clientCtx != null) {
                TierPartitionStatus currentState = clientCtx.status();
                switch (currentState) {
                    case DISK_OFFLINE: {
                        this.resultListeners.getAndRemoveTracked(entry2).ifPresent(c -> c.completeExceptionally(new TierMetadataFatalException("Partition " + topicIdPartition + " is offline")));
                        break;
                    }
                    default: {
                        if (currentState == requiredState) {
                            TierPartitionState.AppendResult result = clientCtx.process(entry2, offset2);
                            this.resultListeners.getAndRemoveTracked(entry2).ifPresent(c -> c.complete(result));
                            break;
                        }
                        log.debug("Ignoring metadata {}. currentState: {} requiredState: {}", new Object[]{entry2, currentState, requiredState});
                        break;
                    }
                }
            } else if (entry2.type() == TierRecordType.PartitionDeleteInitiate || entry2.type() == TierRecordType.PartitionDeleteComplete) {
                this.resultListeners.getAndRemoveTracked(entry2).ifPresent(c -> c.complete(TierPartitionState.AppendResult.ACCEPTED));
            } else {
                this.resultListeners.getAndRemoveTracked(entry2).ifPresent(c -> c.completeExceptionally((Throwable)((Object)new TierMetadataRetriableException("Tier partition state for " + topicIdPartition + " does not exist"))));
            }
        }
        catch (Exception e) {
            throw new TierMetadataFatalException(String.format("Error processing message %s at offset %d, partition %d, requiredState %s", new Object[]{entry2, offset2, partition, requiredState}), e);
        }
    }

    synchronized Map<TopicIdPartition, ClientCtx> immigratingPartitions() {
        return new HashMap<TopicIdPartition, ClientCtx>(this.immigratingPartitions);
    }

    synchronized Map<TopicIdPartition, ClientCtx> onlinePartitions() {
        return new HashMap<TopicIdPartition, ClientCtx>(this.onlinePartitions);
    }

    synchronized Map<TopicIdPartition, ClientCtx> catchingUpPartitions() {
        return new HashMap<TopicIdPartition, ClientCtx>(this.catchingUpPartitions);
    }

    synchronized long numListeners() {
        return this.resultListeners.numListeners();
    }

    public static interface ClientCtx {
        public TierPartitionState.AppendResult process(AbstractTierMetadata var1, long var2);

        public TierPartitionStatus status();

        public void beginCatchup();

        public void completeCatchup();
    }
}

