package org.apache.flink.table.filesystem.stream.compact;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Function;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.filesystem.stream.TaskTracker;
import org.apache.flink.table.filesystem.stream.compact.CompactMessages;
import org.apache.flink.table.runtime.util.BinPacking;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/table/filesystem/stream/compact/CompactCoordinator.class */
public class CompactCoordinator extends AbstractStreamOperator<CompactMessages.CoordinatorOutput> implements OneInputStreamOperator<CompactMessages.CoordinatorInput, CompactMessages.CoordinatorOutput> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(CompactCoordinator.class);
    private final SupplierWithException<FileSystem, IOException> fsFactory;
    private final long targetFileSize;
    private transient FileSystem fileSystem;
    private transient ListState<Map<Long, Map<String, List<Path>>>> inputFilesState;
    private transient TreeMap<Long, Map<String, List<Path>>> inputFiles;
    private transient Map<String, List<Path>> currentInputFiles;
    private transient TaskTracker inputTaskTracker;

    public CompactCoordinator(SupplierWithException<FileSystem, IOException> supplierWithException, long j) {
        this.fsFactory = supplierWithException;
        this.targetFileSize = j;
    }

    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.fileSystem = (FileSystem) this.fsFactory.get();
        this.inputFilesState = stateInitializationContext.getOperatorStateStore().getListState(new ListStateDescriptor("files-state", new MapSerializer(LongSerializer.INSTANCE, new MapSerializer(StringSerializer.INSTANCE, new ListSerializer(new KryoSerializer(Path.class, getExecutionConfig()))))));
        this.inputFiles = new TreeMap<>();
        this.currentInputFiles = new HashMap();
        if (stateInitializationContext.isRestored()) {
            this.inputFiles.putAll((Map) ((Iterable) this.inputFilesState.get()).iterator().next());
        }
    }

    public void processElement(StreamRecord<CompactMessages.CoordinatorInput> streamRecord) throws Exception {
        CompactMessages.CoordinatorInput coordinatorInput = (CompactMessages.CoordinatorInput) streamRecord.getValue();
        if (coordinatorInput instanceof CompactMessages.InputFile) {
            CompactMessages.InputFile inputFile = (CompactMessages.InputFile) coordinatorInput;
            this.currentInputFiles.computeIfAbsent(inputFile.getPartition(), str -> {
                return new ArrayList();
            }).add(inputFile.getFile());
        } else {
            if (!(coordinatorInput instanceof CompactMessages.EndCheckpoint)) {
                throw new UnsupportedOperationException("Unsupported input message: " + coordinatorInput);
            }
            CompactMessages.EndCheckpoint endCheckpoint = (CompactMessages.EndCheckpoint) coordinatorInput;
            if (this.inputTaskTracker == null) {
                this.inputTaskTracker = new TaskTracker(endCheckpoint.getNumberOfTasks());
            }
            if (this.inputTaskTracker.add(endCheckpoint.getCheckpointId(), endCheckpoint.getTaskId())) {
                commitUpToCheckpoint(endCheckpoint.getCheckpointId());
            }
        }
    }

    private void commitUpToCheckpoint(long j) {
        NavigableMap<Long, Map<String, List<Path>>> headMap = this.inputFiles.headMap(Long.valueOf(j), true);
        for (Map.Entry<Long, Map<String, List<Path>>> entry : headMap.entrySet()) {
            coordinate(entry.getKey().longValue(), entry.getValue());
        }
        headMap.clear();
    }

    private void coordinate(long j, Map<String, List<Path>> map) {
        Function function = path -> {
            try {
                return Long.valueOf(this.fileSystem.getFileStatus(path).getLen());
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
        HashMap hashMap = new HashMap();
        map.forEach((str, list) -> {
            list.sort(Comparator.comparing((v0) -> {
                return v0.getPath();
            }));
            hashMap.put(str, BinPacking.pack(list, function, this.targetFileSize));
        });
        int i = 0;
        for (Map.Entry entry : hashMap.entrySet()) {
            String str2 = (String) entry.getKey();
            Iterator it = ((List) entry.getValue()).iterator();
            while (it.hasNext()) {
                this.output.collect(new StreamRecord(new CompactMessages.CompactionUnit(i, str2, (List) it.next())));
                i++;
            }
        }
        LOG.debug("Coordinate checkpoint-{}, compaction units are: {}", Long.valueOf(j), hashMap);
        this.output.collect(new StreamRecord(new CompactMessages.EndCompaction(j)));
    }

    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.inputFilesState.clear();
        this.inputFiles.put(Long.valueOf(stateSnapshotContext.getCheckpointId()), new HashMap(this.currentInputFiles));
        this.inputFilesState.add(this.inputFiles);
        this.currentInputFiles.clear();
    }
}
