/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.dynamic.source.enumerator;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StoppableKafkaEnumContextProxy
implements SplitEnumeratorContext<KafkaPartitionSplit>,
AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(StoppableKafkaEnumContextProxy.class);
    private final String kafkaClusterId;
    private final KafkaMetadataService kafkaMetadataService;
    private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
    private final ScheduledExecutorService subEnumeratorWorker;
    private final Runnable signalNoMoreSplitsCallback;
    private boolean noMoreSplits = false;
    private volatile boolean isClosing;

    public StoppableKafkaEnumContextProxy(String kafkaClusterId, KafkaMetadataService kafkaMetadataService, SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext, @Nullable Runnable signalNoMoreSplitsCallback) {
        this.kafkaClusterId = kafkaClusterId;
        this.kafkaMetadataService = kafkaMetadataService;
        this.enumContext = enumContext;
        this.subEnumeratorWorker = Executors.newScheduledThreadPool(1, (ThreadFactory)new ExecutorThreadFactory(kafkaClusterId + "-enum-worker"));
        this.signalNoMoreSplitsCallback = signalNoMoreSplitsCallback;
        this.isClosing = false;
    }

    public SplitEnumeratorMetricGroup metricGroup() {
        return this.enumContext.metricGroup();
    }

    public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
        this.enumContext.sendEventToSourceReader(subtaskId, event);
    }

    public int currentParallelism() {
        return this.enumContext.currentParallelism();
    }

    public Map<Integer, ReaderInfo> registeredReaders() {
        return this.enumContext.registeredReaders();
    }

    public void assignSplits(SplitsAssignment<KafkaPartitionSplit> newSplitAssignments) {
        if (logger.isInfoEnabled()) {
            logger.info("Assigning {} splits for cluster {}: {}", new Object[]{newSplitAssignments.assignment().values().stream().mapToLong(Collection::size).sum(), this.kafkaClusterId, newSplitAssignments});
        }
        HashMap readerToSplitsMap = new HashMap();
        newSplitAssignments.assignment().forEach((subtask, splits) -> readerToSplitsMap.put(subtask, splits.stream().map(split -> new DynamicKafkaSourceSplit(this.kafkaClusterId, (KafkaPartitionSplit)split)).collect(Collectors.toList())));
        if (!readerToSplitsMap.isEmpty()) {
            this.enumContext.assignSplits(new SplitsAssignment(readerToSplitsMap));
        }
    }

    public void signalNoMoreSplits(int subtask) {
        this.noMoreSplits = true;
        if (this.signalNoMoreSplitsCallback != null) {
            this.signalNoMoreSplitsCallback.run();
        }
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler) {
        this.enumContext.callAsync(this.wrapCallAsyncCallable(callable), this.wrapCallAsyncCallableHandler(handler));
    }

    public <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period) {
        this.subEnumeratorWorker.scheduleAtFixedRate(() -> this.callAsync(callable, handler), initialDelay, period, TimeUnit.MILLISECONDS);
    }

    public void runInCoordinatorThread(Runnable runnable) {
        this.enumContext.runInCoordinatorThread(runnable);
    }

    public boolean isNoMoreSplits() {
        return this.noMoreSplits;
    }

    @Override
    public void close() throws Exception {
        logger.info("Closing enum context for {}", (Object)this.kafkaClusterId);
        if (this.subEnumeratorWorker != null) {
            this.isClosing = true;
            this.subEnumeratorWorker.shutdown();
            this.subEnumeratorWorker.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        }
    }

    protected <T> Callable<T> wrapCallAsyncCallable(Callable<T> callable) {
        return () -> {
            try {
                return callable.call();
            }
            catch (Exception e) {
                if (this.isClosing) {
                    throw new HandledFlinkKafkaException(e, this.kafkaClusterId);
                }
                Optional throwable = ExceptionUtils.findThrowable((Throwable)e, KafkaException.class);
                if (throwable.isPresent() && !this.kafkaMetadataService.isClusterActive(this.kafkaClusterId)) {
                    throw new HandledFlinkKafkaException((Throwable)throwable.get(), this.kafkaClusterId);
                }
                throw e;
            }
        };
    }

    protected <T> BiConsumer<T, Throwable> wrapCallAsyncCallableHandler(BiConsumer<T, Throwable> mainHandler) {
        return (result, t) -> {
            Optional throwable = ExceptionUtils.findThrowable((Throwable)t, HandledFlinkKafkaException.class);
            if (throwable.isPresent()) {
                logger.warn("Swallowed handled exception for {}.", (Object)this.kafkaClusterId, throwable.get());
                return;
            }
            mainHandler.accept((Object)result, (Throwable)t);
        };
    }

    @Internal
    public static interface StoppableKafkaEnumContextProxyFactory {
        public StoppableKafkaEnumContextProxy create(SplitEnumeratorContext<DynamicKafkaSourceSplit> var1, String var2, KafkaMetadataService var3, Runnable var4);

        public static StoppableKafkaEnumContextProxyFactory getDefaultFactory() {
            return (enumContext, kafkaClusterId, kafkaMetadataService, signalNoMoreSplitsCallback) -> new StoppableKafkaEnumContextProxy(kafkaClusterId, kafkaMetadataService, (SplitEnumeratorContext<DynamicKafkaSourceSplit>)enumContext, signalNoMoreSplitsCallback);
        }
    }

    @Internal
    public static class HandledFlinkKafkaException
    extends RuntimeException {
        private static final String ERROR_MESSAGE = "An error occurred with %s";
        private final String kafkaClusterId;

        public HandledFlinkKafkaException(Throwable cause, String kafkaClusterId) {
            super(cause);
            this.kafkaClusterId = kafkaClusterId;
        }

        @Override
        public String getMessage() {
            return String.format(ERROR_MESSAGE, this.kafkaClusterId);
        }
    }
}

