package io.confluent.parallelconsumer.state;

import io.confluent.csid.utils.BackportUtils;
import io.confluent.csid.utils.JavaUtils;
import io.confluent.csid.utils.StringUtils;
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.internal.RateLimiter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/parallelconsumer/state/ProcessingShard.class */
public class ProcessingShard<K, V> {
    private static final Logger log = LoggerFactory.getLogger(ProcessingShard.class);
    private final ShardKey key;
    private final ParallelConsumerOptions<?, ?> options;
    private final PartitionStateManager<K, V> pm;
    private final NavigableMap<Long, WorkContainer<K, V>> entries = new ConcurrentSkipListMap();
    private final RateLimiter slowWarningRateLimit = new RateLimiter(5);

    public boolean workIsWaitingToBeProcessed() {
        return this.entries.values().parallelStream().anyMatch(workContainer -> {
            return workContainer.isAvailableToTakeAsWork();
        });
    }

    public void addWorkContainer(WorkContainer<K, V> workContainer) {
        long offset = workContainer.offset();
        if (this.entries.containsKey(Long.valueOf(offset))) {
            log.debug("Entry for {} already exists in shard queue, dropping record", workContainer);
        } else {
            this.entries.put(Long.valueOf(offset), workContainer);
        }
    }

    public void onSuccess(WorkContainer<?, ?> workContainer) {
        this.entries.remove(Long.valueOf(workContainer.offset()));
    }

    public boolean isEmpty() {
        return this.entries.isEmpty();
    }

    public long getCountOfWorkAwaitingSelection() {
        return this.entries.values().stream().filter((v0) -> {
            return v0.isAvailableToTakeAsWork();
        }).count();
    }

    public long getCountOfWorkTracked() {
        return this.entries.size();
    }

    public long getCountWorkInFlight() {
        return this.entries.values().stream().filter((v0) -> {
            return v0.isInFlight();
        }).count();
    }

    public WorkContainer<K, V> remove(long j) {
        return (WorkContainer) this.entries.remove(Long.valueOf(j));
    }

    public boolean removeStaleWorkContainersFromShard() {
        return this.entries.entrySet().removeIf(entry -> {
            return isWorkContainerStale((WorkContainer) entry.getValue());
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ArrayList<WorkContainer<K, V>> getWorkIfAvailable(int i) {
        log.trace("Looking for work on shardQueueEntry: {}", getKey());
        HashSet hashSet = new HashSet();
        ArrayList<WorkContainer<K, V>> arrayList = new ArrayList<>();
        Iterator<Map.Entry<Long, WorkContainer<K, V>>> it = this.entries.entrySet().iterator();
        while (true) {
            if (arrayList.size() >= i || !it.hasNext()) {
                break;
            }
            WorkContainer<K, V> value = it.next().getValue();
            if (!this.pm.couldBeTakenAsWork(value)) {
                log.trace("Partition for shard {} is blocked for work taking, stopping shard scan", this);
                break;
            }
            if (value.isAvailableToTakeAsWork()) {
                log.trace("Taking {} as work", value);
                value.onQueueingForExecution();
                arrayList.add(value);
            } else {
                log.trace("Skipping {} as work, not available to take as work", value);
                addToSlowWorkMaybe(hashSet, value);
            }
            if (isOrderRestricted()) {
                log.trace("Processing by {}, so have cannot get more messages on this ({}) shardEntry.", this.options.getOrdering(), getKey());
                break;
            }
        }
        if (arrayList.size() == i) {
            log.trace("Work taken ({}) exceeds max ({})", Integer.valueOf(arrayList.size()), Integer.valueOf(i));
        }
        logSlowWork(hashSet);
        return arrayList;
    }

    private void logSlowWork(Set<WorkContainer<?, ?>> set) {
        if (set.isEmpty()) {
            return;
        }
        List list = (List) set.parallelStream().map(workContainer -> {
            return workContainer.getTopicPartition().toString();
        }).distinct().collect(Collectors.toList());
        this.slowWarningRateLimit.performIfNotLimited(() -> {
            log.warn("Warning: {} records in the queue have been waiting longer than {}s for following topics {}.", new Object[]{Integer.valueOf(set.size()), Long.valueOf(BackportUtils.toSeconds(this.options.getThresholdForTimeSpendInQueueWarning())), list});
        });
    }

    private void addToSlowWorkMaybe(Set<WorkContainer<?, ?>> set, WorkContainer<?, ?> workContainer) {
        Duration timeInFlight = workContainer.getTimeInFlight();
        Duration thresholdForTimeSpendInQueueWarning = this.options.getThresholdForTimeSpendInQueueWarning();
        if (!JavaUtils.isGreaterThan(timeInFlight, thresholdForTimeSpendInQueueWarning)) {
            if (log.isTraceEnabled()) {
                log.trace(cantTakeAsWorkMsg(workContainer, timeInFlight));
            }
        } else {
            if (!set.contains(workContainer)) {
                this.pm.incrementSlowWorkCounter(workContainer.getTopicPartition());
            }
            set.add(workContainer);
            if (log.isTraceEnabled()) {
                log.trace("Work has spent over " + thresholdForTimeSpendInQueueWarning + " in queue! " + cantTakeAsWorkMsg(workContainer, timeInFlight));
            }
        }
    }

    private static String cantTakeAsWorkMsg(WorkContainer<?, ?> workContainer, Duration duration) {
        Object[] objArr = new Object[5];
        objArr[0] = workContainer;
        objArr[1] = Boolean.valueOf(workContainer.isDelayPassed());
        objArr[2] = Boolean.valueOf(workContainer.isNotInFlight());
        objArr[3] = Boolean.valueOf(!workContainer.isUserFunctionSucceeded());
        objArr[4] = duration;
        return StringUtils.msg("Can't take as work: Work ({}). Must all be true: Delay passed= {}. Is not in flight= {}. Has not succeeded already= {}. Time spent in execution queue: {}.", objArr);
    }

    private boolean isOrderRestricted() {
        return this.options.getOrdering() != ParallelConsumerOptions.ProcessingOrder.UNORDERED;
    }

    private boolean isWorkContainerStale(WorkContainer<K, V> workContainer) {
        return this.pm.getPartitionState(workContainer).checkIfWorkIsStale(workContainer);
    }

    public ProcessingShard(ShardKey shardKey, ParallelConsumerOptions<?, ?> parallelConsumerOptions, PartitionStateManager<K, V> partitionStateManager) {
        this.key = shardKey;
        this.options = parallelConsumerOptions;
        this.pm = partitionStateManager;
    }

    public NavigableMap<Long, WorkContainer<K, V>> getEntries() {
        return this.entries;
    }

    private ShardKey getKey() {
        return this.key;
    }
}
