package org.apache.flink.connector.file.table.stream.compact;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages;
import org.apache.flink.connector.file.table.stream.compact.CompactReader;
import org.apache.flink.connector.file.table.stream.compact.CompactWriter;
import org.apache.flink.connector.file.table.utils.CompactFileUtils;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

@Internal
/* loaded from: input_file:org/apache/flink/connector/file/table/stream/compact/CompactOperator.class */
public class CompactOperator<T> extends AbstractStreamOperator<PartitionCommitInfo> implements OneInputStreamOperator<CompactMessages.CoordinatorOutput, PartitionCommitInfo>, BoundedOneInput {
    private static final long serialVersionUID = 1;
    public static final String UNCOMPACTED_PREFIX = ".uncompacted-";
    public static final String COMPACTED_PREFIX = "compacted-";
    private final SupplierWithException<FileSystem, IOException> fsFactory;
    private final CompactReader.Factory<T> readerFactory;
    private final CompactWriter.Factory<T> writerFactory;
    private transient FileSystem fileSystem;
    private transient ListState<Map<Long, List<Path>>> expiredFilesState;
    private transient TreeMap<Long, List<Path>> expiredFiles;
    private transient List<Path> currentExpiredFiles;
    private transient Set<String> partitions;

    public CompactOperator(SupplierWithException<FileSystem, IOException> supplierWithException, CompactReader.Factory<T> factory, CompactWriter.Factory<T> factory2) {
        this.fsFactory = supplierWithException;
        this.readerFactory = factory;
        this.writerFactory = factory2;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.partitions = new HashSet();
        this.fileSystem = (FileSystem) this.fsFactory.get();
        this.expiredFilesState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("expired-files", new MapSerializer(LongSerializer.INSTANCE, new ListSerializer(new KryoSerializer(Path.class, getExecutionConfig())))));
        this.expiredFiles = new TreeMap<>();
        this.currentExpiredFiles = new ArrayList();
        if (stateInitializationContext.isRestored()) {
            this.expiredFiles.putAll((Map) ((Iterable) this.expiredFilesState.get()).iterator().next());
        }
    }

    public void processElement(StreamRecord<CompactMessages.CoordinatorOutput> streamRecord) throws Exception {
        CompactMessages.CoordinatorOutput coordinatorOutput = (CompactMessages.CoordinatorOutput) streamRecord.getValue();
        if (!(coordinatorOutput instanceof CompactMessages.CompactionUnit)) {
            if (coordinatorOutput instanceof CompactMessages.EndCompaction) {
                endCompaction(((CompactMessages.EndCompaction) coordinatorOutput).getCheckpointId());
                return;
            }
            return;
        }
        CompactMessages.CompactionUnit compactionUnit = (CompactMessages.CompactionUnit) coordinatorOutput;
        if (compactionUnit.isTaskMessage(getRuntimeContext().getNumberOfParallelSubtasks(), getRuntimeContext().getIndexOfThisSubtask())) {
            String partition = compactionUnit.getPartition();
            List<Path> paths = compactionUnit.getPaths();
            CompactFileUtils.doCompact(this.fileSystem, partition, paths, createCompactedFile(paths), getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(), this.readerFactory, this.writerFactory);
            this.partitions.add(partition);
            this.currentExpiredFiles.addAll(paths);
        }
    }

    private void endCompaction(long j) {
        this.output.collect(new StreamRecord(new PartitionCommitInfo(j, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks(), new ArrayList(this.partitions))));
        this.partitions.clear();
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        snapshotState(stateSnapshotContext.getCheckpointId());
    }

    private void snapshotState(long j) throws Exception {
        this.expiredFilesState.clear();
        this.expiredFiles.put(Long.valueOf(j), new ArrayList(this.currentExpiredFiles));
        this.expiredFilesState.add(this.expiredFiles);
        this.currentExpiredFiles.clear();
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        super.notifyCheckpointComplete(j);
        clearExpiredFiles(j);
    }

    public void endInput() throws Exception {
        endCompaction(Long.MAX_VALUE);
        snapshotState(Long.MAX_VALUE);
        clearExpiredFiles(Long.MAX_VALUE);
    }

    private void clearExpiredFiles(long j) throws IOException {
        NavigableMap<Long, List<Path>> headMap = this.expiredFiles.headMap(Long.valueOf(j), true);
        Iterator<List<Path>> it = headMap.values().iterator();
        while (it.hasNext()) {
            Iterator<Path> it2 = it.next().iterator();
            while (it2.hasNext()) {
                this.fileSystem.delete(it2.next(), true);
            }
        }
        headMap.clear();
    }

    private static Path createCompactedFile(List<Path> list) {
        Path convertFromUncompacted = convertFromUncompacted(list.get(0));
        return new Path(convertFromUncompacted.getParent(), "compacted-" + convertFromUncompacted.getName());
    }

    public static String convertToUncompacted(String str) {
        return UNCOMPACTED_PREFIX + str;
    }

    public static Path convertFromUncompacted(Path path) {
        Preconditions.checkArgument(path.getName().startsWith(UNCOMPACTED_PREFIX), "This should be uncompacted file: " + path);
        return new Path(path.getParent(), path.getName().substring(UNCOMPACTED_PREFIX.length()));
    }
}
