/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.table.action.commit;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.execution.JavaLazyInsertIterable;
import org.apache.hudi.io.CreateHandleFactory;
import org.apache.hudi.io.HoodieConcatHandle;
import org.apache.hudi.io.HoodieMergeHandle;
import org.apache.hudi.io.HoodieSortedMergeHandle;
import org.apache.hudi.io.WriteHandleFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.hudi.table.action.HoodieWriteMetadata;
import org.apache.hudi.table.action.commit.BaseCommitActionExecutor;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
import org.apache.hudi.table.action.commit.JavaMergeHelper;
import org.apache.hudi.table.action.commit.JavaUpsertPartitioner;
import org.apache.hudi.table.action.commit.Partitioner;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public abstract class BaseJavaCommitActionExecutor<T extends HoodieRecordPayload>
extends BaseCommitActionExecutor<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>, HoodieWriteMetadata> {
    private static final Logger LOG = LogManager.getLogger(BaseJavaCommitActionExecutor.class);

    public BaseJavaCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType) {
        super(context, config, table, instantTime, operationType, Option.empty());
    }

    public BaseJavaCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable table, String instantTime, WriteOperationType operationType, Option extraMetadata) {
        super(context, config, table, instantTime, operationType, extraMetadata);
    }

    public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
        HoodieWriteMetadata result = new HoodieWriteMetadata();
        WorkloadProfile workloadProfile = null;
        if (this.isWorkloadProfileNeeded()) {
            workloadProfile = new WorkloadProfile(this.buildProfile(inputRecords), this.table.getIndex().canIndexLogFiles());
            LOG.info((Object)("Input workload profile :" + workloadProfile));
        }
        Partitioner partitioner = this.getPartitioner(workloadProfile);
        try {
            this.saveWorkloadProfileMetadataToInflight(workloadProfile, this.instantTime);
        }
        catch (Exception e) {
            HoodieTableMetaClient metaClient = this.table.getMetaClient();
            HoodieInstant inflightInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), this.instantTime);
            try {
                if (!metaClient.getFs().exists(new Path(metaClient.getMetaPath(), inflightInstant.getFileName()))) {
                    throw new HoodieCommitException("Failed to commit " + this.instantTime + " unable to save inflight metadata ", (Throwable)e);
                }
            }
            catch (IOException ex) {
                LOG.error((Object)"Check file exists failed");
                throw new HoodieCommitException("Failed to commit " + this.instantTime + " unable to save inflight metadata ", (Throwable)ex);
            }
        }
        Map<Integer, List<HoodieRecord<T>>> partitionedRecords = this.partition(inputRecords, partitioner);
        LinkedList<WriteStatus> writeStatuses = new LinkedList<WriteStatus>();
        partitionedRecords.forEach((partition, records) -> {
            if (WriteOperationType.isChangingRecords((WriteOperationType)this.operationType)) {
                this.handleUpsertPartition(this.instantTime, (Integer)partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
            } else {
                this.handleInsertPartition(this.instantTime, (Integer)partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);
            }
        });
        this.updateIndex(writeStatuses, (HoodieWriteMetadata<List<WriteStatus>>)result);
        this.updateIndexAndCommitIfNeeded(writeStatuses, result);
        return result;
    }

    protected List<WriteStatus> updateIndex(List<WriteStatus> writeStatuses, HoodieWriteMetadata<List<WriteStatus>> result) {
        Instant indexStartTime = Instant.now();
        List statuses = this.table.getIndex().updateLocation((HoodieData)HoodieListData.eager(writeStatuses), this.context, this.table).collectAsList();
        result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
        result.setWriteStatuses((Object)statuses);
        return statuses;
    }

    protected String getCommitActionType() {
        return this.table.getMetaClient().getCommitActionType();
    }

    private Partitioner getPartitioner(WorkloadProfile profile) {
        if (WriteOperationType.isChangingRecords((WriteOperationType)this.operationType)) {
            return this.getUpsertPartitioner(profile);
        }
        return this.getInsertPartitioner(profile);
    }

    private Map<Integer, List<HoodieRecord<T>>> partition(List<HoodieRecord<T>> dedupedRecords, Partitioner partitioner) {
        Map<Integer, List<Pair>> partitionedMidRecords = dedupedRecords.stream().map(record -> Pair.of((Object)Pair.of((Object)record.getKey(), (Object)Option.ofNullable((Object)record.getCurrentLocation())), (Object)record)).collect(Collectors.groupingBy(x -> partitioner.getPartition(x.getLeft())));
        LinkedHashMap results = new LinkedHashMap();
        partitionedMidRecords.forEach((key, value) -> results.put((Integer)key, value.stream().map(x -> (HoodieRecord)x.getRight()).collect(Collectors.toList())));
        return results;
    }

    protected Pair<HashMap<String, WorkloadStat>, WorkloadStat> buildProfile(List<HoodieRecord<T>> inputRecords) {
        HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<String, WorkloadStat>();
        WorkloadStat globalStat = new WorkloadStat();
        Map<Pair, Long> partitionLocationCounts = inputRecords.stream().map(record -> Pair.of((Object)Pair.of((Object)record.getPartitionPath(), (Object)Option.ofNullable((Object)record.getCurrentLocation())), (Object)record)).collect(Collectors.groupingBy(Pair::getLeft, Collectors.counting()));
        for (Map.Entry<Pair, Long> e : partitionLocationCounts.entrySet()) {
            String partitionPath = (String)e.getKey().getLeft();
            Long count = e.getValue();
            Option locOption = (Option)e.getKey().getRight();
            if (!partitionPathStatMap.containsKey(partitionPath)) {
                partitionPathStatMap.put(partitionPath, new WorkloadStat());
            }
            if (locOption.isPresent()) {
                ((WorkloadStat)partitionPathStatMap.get(partitionPath)).addUpdates((HoodieRecordLocation)locOption.get(), count.longValue());
                globalStat.addUpdates((HoodieRecordLocation)locOption.get(), count.longValue());
                continue;
            }
            ((WorkloadStat)partitionPathStatMap.get(partitionPath)).addInserts(count.longValue());
            globalStat.addInserts(count.longValue());
        }
        return Pair.of(partitionPathStatMap, (Object)globalStat);
    }

    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result) {
        this.commit(extraMetadata, result, ((List)result.getWriteStatuses()).stream().map(WriteStatus::getStat).collect(Collectors.toList()));
    }

    protected void setCommitMetadata(HoodieWriteMetadata<List<WriteStatus>> result) {
        result.setCommitMetadata(Option.of((Object)CommitUtils.buildMetadata(((List)result.getWriteStatuses()).stream().map(WriteStatus::getStat).collect(Collectors.toList()), (Map)result.getPartitionToReplaceFileIds(), (Option)this.extraMetadata, (WriteOperationType)this.operationType, (String)this.getSchemaToStoreInCommit(), (String)this.getCommitActionType())));
    }

    protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata<List<WriteStatus>> result, List<HoodieWriteStat> writeStats) {
        String actionType = this.getCommitActionType();
        LOG.info((Object)("Committing " + this.instantTime + ", action Type " + actionType));
        result.setCommitted(true);
        result.setWriteStats(writeStats);
        this.finalizeWrite(this.instantTime, writeStats, result);
        try {
            LOG.info((Object)("Committing " + this.instantTime + ", action Type " + this.getCommitActionType()));
            HoodieActiveTimeline activeTimeline = this.table.getActiveTimeline();
            HoodieCommitMetadata metadata = (HoodieCommitMetadata)result.getCommitMetadata().get();
            this.writeTableMetadata(metadata, actionType);
            activeTimeline.saveAsComplete(new HoodieInstant(true, this.getCommitActionType(), this.instantTime), Option.of((Object)metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
            LOG.info((Object)("Committed " + this.instantTime));
            result.setCommitMetadata(Option.of((Object)metadata));
        }
        catch (IOException e) {
            throw new HoodieCommitException("Failed to complete commit " + this.config.getBasePath() + " at time " + this.instantTime, (Throwable)e);
        }
    }

    protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieWriteMetadata<List<WriteStatus>> writeMetadata) {
        return Collections.emptyMap();
    }

    protected boolean isWorkloadProfileNeeded() {
        return true;
    }

    protected Iterator<List<WriteStatus>> handleUpsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        JavaUpsertPartitioner javaUpsertPartitioner = (JavaUpsertPartitioner)partitioner;
        BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);
        BucketType btype = binfo.bucketType;
        try {
            if (btype.equals((Object)BucketType.INSERT)) {
                return this.handleInsert(binfo.fileIdPrefix, recordItr);
            }
            if (btype.equals((Object)BucketType.UPDATE)) {
                return this.handleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);
            }
            throw new HoodieUpsertException("Unknown bucketType " + btype + " for partition :" + partition);
        }
        catch (Throwable t) {
            String msg = "Error upserting bucketType " + btype + " for partition :" + partition;
            LOG.error((Object)msg, t);
            throw new HoodieUpsertException(msg, t);
        }
    }

    protected Iterator<List<WriteStatus>> handleInsertPartition(String instantTime, Integer partition, Iterator recordItr, Partitioner partitioner) {
        return this.handleUpsertPartition(instantTime, partition, recordItr, partitioner);
    }

    public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) throws IOException {
        if (!recordItr.hasNext()) {
            LOG.info((Object)("Empty partition with fileId => " + fileId));
            return Collections.singletonList(Collections.EMPTY_LIST).iterator();
        }
        HoodieMergeHandle upsertHandle = this.getUpdateHandle(partitionPath, fileId, recordItr);
        return this.handleUpdateInternal(upsertHandle, fileId);
    }

    protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId) throws IOException {
        if (upsertHandle.getOldFilePath() == null) {
            throw new HoodieUpsertException("Error in finding the old file path at commit " + this.instantTime + " for fileId: " + fileId);
        }
        JavaMergeHelper.newInstance().runMerge(this.table, upsertHandle);
        List statuses = upsertHandle.writeStatuses();
        if (upsertHandle.getPartitionPath() == null) {
            LOG.info((Object)("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", " + statuses));
        }
        return Collections.singletonList(statuses).iterator();
    }

    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
        if (this.table.requireSortedRecords()) {
            return new HoodieSortedMergeHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, Option.empty());
        }
        if (!WriteOperationType.isChangingRecords((WriteOperationType)this.operationType) && this.config.allowDuplicateInserts()) {
            return new HoodieConcatHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, Option.empty());
        }
        return new HoodieMergeHandle(this.config, this.instantTime, this.table, recordItr, partitionPath, fileId, this.taskContextSupplier, Option.empty());
    }

    protected HoodieMergeHandle getUpdateHandle(String partitionPath, String fileId, Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
        return new HoodieMergeHandle(this.config, this.instantTime, this.table, keyToNewRecords, partitionPath, fileId, dataFileToBeMerged, this.taskContextSupplier, Option.empty());
    }

    public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
        if (!recordItr.hasNext()) {
            LOG.info((Object)"Empty partition");
            return Collections.singletonList(Collections.EMPTY_LIST).iterator();
        }
        return new JavaLazyInsertIterable<T>(recordItr, true, this.config, this.instantTime, this.table, idPfx, this.taskContextSupplier, (WriteHandleFactory)new CreateHandleFactory());
    }

    public Partitioner getUpsertPartitioner(WorkloadProfile profile) {
        if (profile == null) {
            throw new HoodieUpsertException("Need workload profile to construct the upsert partitioner.");
        }
        return new JavaUpsertPartitioner(profile, this.context, this.table, this.config);
    }

    public Partitioner getInsertPartitioner(WorkloadProfile profile) {
        return this.getUpsertPartitioner(profile);
    }

    public void updateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses, HoodieWriteMetadata result) {
        Instant indexStartTime = Instant.now();
        List statuses = this.table.getIndex().updateLocation((HoodieData)HoodieListData.eager(writeStatuses), this.context, this.table).collectAsList();
        result.setIndexUpdateDuration(Duration.between(indexStartTime, Instant.now()));
        result.setWriteStatuses((Object)statuses);
        result.setPartitionToReplaceFileIds(this.getPartitionToReplacedFileIds((HoodieWriteMetadata<List<WriteStatus>>)result));
        this.commitOnAutoCommit(result);
    }
}

