/*
 * Decompiled with CFR 0.152.
 */
package io.delta.flink.sink.internal.committer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.delta.flink.internal.ConnectorUtils;
import io.delta.flink.internal.lang.Lazy;
import io.delta.flink.sink.internal.SchemaConverter;
import io.delta.flink.sink.internal.committables.DeltaCommittable;
import io.delta.flink.sink.internal.committables.DeltaGlobalCommittable;
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.VersionLog;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.actions.SetTransaction;
import io.delta.standalone.types.StructType;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.StringJoiner;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.DeltaPendingFile;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DeltaGlobalCommitter
implements GlobalCommitter<DeltaCommittable, DeltaGlobalCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(DeltaGlobalCommitter.class);
    private static final String APPEND_MODE = "Append";
    private final Configuration conf;
    private final RowType rowType;
    private final boolean mergeSchema;
    private final transient DeltaLog deltaLog;
    private transient boolean firstCommit = true;

    public DeltaGlobalCommitter(Configuration configuration, Path path, RowType rowType, boolean bl) {
        this.conf = configuration;
        this.rowType = rowType;
        this.mergeSchema = bl;
        this.deltaLog = DeltaLog.forTable((Configuration)configuration, (org.apache.hadoop.fs.Path)new org.apache.hadoop.fs.Path(path.toUri()));
    }

    public List<DeltaGlobalCommittable> filterRecoveredCommittables(List<DeltaGlobalCommittable> list) {
        return list;
    }

    public DeltaGlobalCommittable combine(List<DeltaCommittable> list) {
        if (LOG.isTraceEnabled()) {
            for (DeltaCommittable deltaCommittable : list) {
                LOG.trace("Creating global committable object with committable for: appId=" + deltaCommittable.getAppId() + " checkpointId=" + deltaCommittable.getCheckpointId() + " deltaPendingFile=" + deltaCommittable.getDeltaPendingFile());
            }
        }
        return new DeltaGlobalCommittable(list);
    }

    @Nullable
    private String resolveAppId(List<DeltaGlobalCommittable> list) {
        for (DeltaGlobalCommittable deltaGlobalCommittable : list) {
            Iterator<DeltaCommittable> iterator = deltaGlobalCommittable.getDeltaCommittables().iterator();
            if (!iterator.hasNext()) continue;
            DeltaCommittable deltaCommittable = iterator.next();
            return deltaCommittable.getAppId();
        }
        return null;
    }

    public List<DeltaGlobalCommittable> commit(List<DeltaGlobalCommittable> list) {
        String string = this.resolveAppId(list);
        if (string != null) {
            SortedMap<Long, List<CheckpointData>> sortedMap = this.getCommittablesPerCheckpoint(string, list, this.deltaLog);
            for (List<CheckpointData> list2 : sortedMap.values()) {
                this.doCommit(this.deltaLog.startTransaction(), list2, this.deltaLog.tableExists());
            }
        }
        this.firstCommit = false;
        return Collections.emptyList();
    }

    private SortedMap<Long, List<CheckpointData>> getCommittablesPerCheckpoint(String string, List<DeltaGlobalCommittable> list, DeltaLog deltaLog) {
        Lazy<Long> lazy = new Lazy<Long>(() -> deltaLog.startTransaction().txnVersion(string));
        if (!this.firstCommit || lazy.get() < 0L) {
            return this.groupCommittablesByCheckpointInterval(list);
        }
        Collection<CheckpointData> collection = this.deduplicateFiles(list, deltaLog, lazy.get());
        return this.groupCommittablesByCheckpointInterval(collection);
    }

    private Collection<CheckpointData> deduplicateFiles(List<DeltaGlobalCommittable> list, DeltaLog deltaLog, long l) {
        Object object;
        LOG.info("Processing what it seems like, a first commit. This can be first commit ever for this job or first commit after recovery.");
        HashMap<String, CheckpointData> hashMap = new HashMap<String, CheckpointData>();
        try {
            for (DeltaGlobalCommittable object22 : list) {
                for (DeltaCommittable iOException : object22.getDeltaCommittables()) {
                    object = iOException.getDeltaPendingFile().toAddFile();
                    hashMap.put(ConnectorUtils.tryRelativizePath(deltaLog.getPath().getFileSystem(this.conf), deltaLog.getPath(), new org.apache.hadoop.fs.Path(object.getPath())), new CheckpointData(iOException, (AddFile)object));
                }
            }
        }
        catch (IOException iOException) {
            throw new RuntimeException(String.format("Exception in Delta Sink, during iterating over Committable data for table path {%s}", deltaLog.getPath().toUri().toString()), iOException);
        }
        Iterator iterator = deltaLog.getChanges(l, true);
        StringJoiner stringJoiner = new StringJoiner(", ");
        while (iterator.hasNext()) {
            VersionLog versionLog = (VersionLog)iterator.next();
            try {
                DeltaCommittable iOException;
                iOException = versionLog.getActionsIterator();
                object = null;
                try {
                    iOException.forEachRemaining(action -> {
                        CheckpointData checkpointData;
                        if (action instanceof AddFile && (checkpointData = (CheckpointData)hashMap.remove(((AddFile)action).getPath())) != null) {
                            stringJoiner.add(checkpointData.addFile.getPath());
                        }
                    });
                }
                catch (Throwable throwable) {
                    object = throwable;
                    throw throwable;
                }
                finally {
                    if (iOException == null) continue;
                    if (object != null) {
                        try {
                            iOException.close();
                        }
                        catch (Throwable throwable) {
                            ((Throwable)object).addSuppressed(throwable);
                        }
                        continue;
                    }
                    iOException.close();
                }
            }
            catch (IOException iOException) {
                throw new RuntimeException(String.format("Exception in Delta Sink, during iterating over Delta table changes for table path {%s}", deltaLog.getPath().toUri().toString()), iOException);
            }
        }
        LOG.info("Files ignored after deduplication for first commit [" + stringJoiner + "]");
        return hashMap.values();
    }

    private void doCommit(OptimisticTransaction optimisticTransaction, List<CheckpointData> list, boolean bl) {
        DeltaPendingFile deltaPendingFile;
        String string = list.get(0).committable.getAppId();
        long l = list.get(0).committable.getCheckpointId();
        ArrayList<Object> arrayList = new ArrayList<Object>(list.size() + 1);
        arrayList.add(this.prepareSetTransactionAction(string, optimisticTransaction.readVersion()));
        Set<String> set = null;
        long l2 = 0L;
        long l3 = 0L;
        StringJoiner stringJoiner = new StringJoiner(", ");
        for (CheckpointData object2 : list) {
            boolean bl2;
            if (LOG.isDebugEnabled()) {
                stringJoiner.add(object2.addFile.getPath());
            }
            arrayList.add(object2.addFile);
            deltaPendingFile = object2.committable.getDeltaPendingFile();
            Set<String> set2 = deltaPendingFile.getPartitionSpec().keySet();
            if (set == null) {
                set = set2;
            }
            if (!(bl2 = this.compareKeysOfLinkedSets(set2, set))) {
                throw new RuntimeException("Partition columns cannot differ for files in the same checkpointId. checkpointId = " + l + ", file = " + deltaPendingFile.getFileName() + ", partition columns = " + String.join((CharSequence)",", deltaPendingFile.getPartitionSpec().keySet()) + " does not comply with partition columns from other checkpointData: " + String.join((CharSequence)",", set));
            }
            l2 += deltaPendingFile.getRecordCount();
            l3 += deltaPendingFile.getFileSize();
        }
        this.logGlobalCommitterData(string, l, stringJoiner);
        ArrayList arrayList2 = set == null ? Collections.emptyList() : new ArrayList(set);
        this.handleMetadataUpdate(bl, optimisticTransaction, arrayList2);
        Map<String, String> map = this.prepareOperationMetrics(arrayList.size() - 1, l2, l3);
        deltaPendingFile = this.prepareDeltaLogOperation(arrayList2, map);
        LOG.info(String.format("Attempting to commit transaction (appId='%s', checkpointId='%s')", string, l));
        optimisticTransaction.commit(arrayList, (Operation)deltaPendingFile, "flink-engine/1.16.1 flink-delta-connector/3.0.0");
        LOG.info(String.format("Successfully committed transaction (appId='%s', checkpointId='%s')", string, l));
    }

    private void logGlobalCommitterData(String string, long l, StringJoiner stringJoiner) {
        if (LOG.isInfoEnabled()) {
            LOG.info(stringJoiner.length() + " files to be committed to the Delta table for appId=" + string + " checkpointId=" + l + ".");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Files to be committed to the Delta table: appId=" + string + " checkpointId=" + l + " files [" + stringJoiner + "].");
        }
    }

    private void handleMetadataUpdate(boolean bl, OptimisticTransaction optimisticTransaction, List<String> list) {
        Metadata metadata = optimisticTransaction.metadata();
        if (bl && !list.equals(metadata.getPartitionColumns())) {
            throw new RuntimeException("Stream's partition columns are different from table's partitions columns. \nColumns in data files: " + Arrays.toString(list.toArray()) + "\nColumns in table: " + Arrays.toString(metadata.getPartitionColumns().toArray()));
        }
        StructType structType = metadata.getSchema();
        StructType structType2 = SchemaConverter.toDeltaDataType(this.rowType);
        boolean bl2 = this.areSchemasEqual(structType, structType2);
        if (!bl || !bl2 && this.mergeSchema) {
            Metadata metadata2 = metadata.copyBuilder().schema(structType2).partitionColumns(list).build();
            optimisticTransaction.updateMetadata(metadata2);
        } else if (!bl2) {
            String string = structType == null ? "null" : structType.toPrettyJson();
            String string2 = structType2.toPrettyJson();
            throw new RuntimeException("DataStream's schema is different from current table's schema. \nprovided: " + string + "\nis different from: " + string2);
        }
    }

    private boolean areSchemasEqual(@Nullable StructType structType, @Nullable StructType structType2) {
        if (structType == null || structType2 == null) {
            return false;
        }
        return structType.toJson().equals(structType2.toJson());
    }

    private SetTransaction prepareSetTransactionAction(String string, long l) {
        return new SetTransaction(string, l + 1L, Optional.of(System.currentTimeMillis()));
    }

    private Operation prepareDeltaLogOperation(List<String> list, Map<String, String> map) {
        HashMap<String, String> hashMap = new HashMap<String, String>();
        try {
            ObjectMapper objectMapper = new ObjectMapper();
            hashMap.put("mode", objectMapper.writeValueAsString((Object)APPEND_MODE));
            hashMap.put("partitionBy", objectMapper.writeValueAsString((Object)objectMapper.writeValueAsString(list)));
        }
        catch (JsonProcessingException jsonProcessingException) {
            throw new RuntimeException("Cannot map object to JSON", jsonProcessingException);
        }
        return new Operation(Operation.Name.STREAMING_UPDATE, hashMap, map);
    }

    private SortedMap<Long, List<CheckpointData>> groupCommittablesByCheckpointInterval(List<DeltaGlobalCommittable> list) {
        TreeMap<Long, List<CheckpointData>> treeMap = new TreeMap<Long, List<CheckpointData>>();
        for (DeltaGlobalCommittable deltaGlobalCommittable : list) {
            for (DeltaCommittable deltaCommittable : deltaGlobalCommittable.getDeltaCommittables()) {
                long l = deltaCommittable.getCheckpointId();
                AddFile addFile = deltaCommittable.getDeltaPendingFile().toAddFile();
                CheckpointData checkpointData = new CheckpointData(deltaCommittable, addFile);
                if (treeMap.containsKey(l)) {
                    ((List)treeMap.get(l)).add(checkpointData);
                    continue;
                }
                LinkedList<CheckpointData> linkedList = new LinkedList<CheckpointData>();
                linkedList.add(checkpointData);
                treeMap.put(l, linkedList);
            }
        }
        return treeMap;
    }

    private SortedMap<Long, List<CheckpointData>> groupCommittablesByCheckpointInterval(Collection<CheckpointData> collection) {
        TreeMap<Long, List<CheckpointData>> treeMap = new TreeMap<Long, List<CheckpointData>>();
        for (CheckpointData checkpointData : collection) {
            long l = checkpointData.committable.getCheckpointId();
            if (treeMap.containsKey(l)) {
                ((List)treeMap.get(l)).add(checkpointData);
                continue;
            }
            LinkedList<CheckpointData> linkedList = new LinkedList<CheckpointData>();
            linkedList.add(checkpointData);
            treeMap.put(l, linkedList);
        }
        return treeMap;
    }

    private Map<String, String> prepareOperationMetrics(int n, long l, long l2) {
        HashMap<String, String> hashMap = new HashMap<String, String>();
        hashMap.put("numRemovedFiles", "0");
        hashMap.put("numAddedFiles", String.valueOf(n));
        hashMap.put("numOutputRows", String.valueOf(l));
        hashMap.put("numOutputBytes", String.valueOf(l2));
        return hashMap;
    }

    private boolean compareKeysOfLinkedSets(Set<String> set, Set<String> set2) {
        Iterator<String> iterator = set.iterator();
        Iterator<String> iterator2 = set2.iterator();
        while (iterator.hasNext() && iterator2.hasNext()) {
            if (iterator.next().equals(iterator2.next())) continue;
            return false;
        }
        return true;
    }

    public void endOfInput() {
    }

    public void close() {
    }

    private static class CheckpointData {
        private final AddFile addFile;
        private final DeltaCommittable committable;

        private CheckpointData(DeltaCommittable deltaCommittable, AddFile addFile) {
            this.addFile = addFile;
            this.committable = deltaCommittable;
        }
    }
}

