/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.SetupableStreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.paimon.Snapshot;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.utils.SerializableSupplier;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

/**
 * Decorator around a {@link CommitterOperator} that creates a tag for every snapshot committed
 * by a savepoint.
 *
 * <p>Checkpoint ids whose checkpoint type reports {@code isSavepoint()} are remembered in
 * {@link #snapshotState} and persisted in operator list state. When such a checkpoint is
 * reported complete (or when pending ids are found in restored state), the snapshots matching
 * those ids for the wrapped operator's commit user are looked up and a tag named
 * {@code SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier()} is created unless it already
 * exists. Every other operator callback is forwarded unchanged to the wrapped operator.
 *
 * @param <CommitT> type of the committables received from upstream and emitted downstream
 * @param <GlobalCommitT> global committable type of the wrapped {@link CommitterOperator}
 */
public class AutoTagForSavepointCommitterOperator<CommitT, GlobalCommitT>
        implements OneInputStreamOperator<CommitT, CommitT>,
                SetupableStreamOperator,
                BoundedOneInput {
    /** Prefix of every tag name created by this operator. */
    public static final String SAVEPOINT_TAG_PREFIX = "savepoint-";
    private static final long serialVersionUID = 1L;
    /** Name of the operator list state that persists savepoint ids still waiting for a tag. */
    private static final String IDENTIFIERS_STATE_NAME = "streaming_committer_for_tags_states";

    /** Wrapped operator that performs the actual commit work. */
    private final CommitterOperator<CommitT, GlobalCommitT> commitOperator;
    private final SerializableSupplier<SnapshotManager> snapshotManagerFactory;
    private final SerializableSupplier<TagManager> tagManagerFactory;
    /** Savepoint checkpoint ids that were snapshotted but neither completed nor aborted yet. */
    private final Set<Long> identifiersForTags;
    /** Obtained from {@code snapshotManagerFactory} during {@link #initializeState}. */
    protected SnapshotManager snapshotManager;
    /** Obtained from {@code tagManagerFactory} during {@link #initializeState}. */
    protected TagManager tagManager;
    private transient ListState<Long> identifiersForTagsState;

    /**
     * @param commitOperator operator every callback is delegated to
     * @param snapshotManagerFactory supplies the {@link SnapshotManager} used to find snapshots
     * @param tagManagerFactory supplies the {@link TagManager} used to create tags
     */
    public AutoTagForSavepointCommitterOperator(CommitterOperator<CommitT, GlobalCommitT> commitOperator, SerializableSupplier<SnapshotManager> snapshotManagerFactory, SerializableSupplier<TagManager> tagManagerFactory) {
        this.commitOperator = commitOperator;
        this.tagManagerFactory = tagManagerFactory;
        this.snapshotManagerFactory = snapshotManagerFactory;
        this.identifiersForTags = new HashSet<Long>();
    }

    /**
     * Initializes the wrapped operator, then restores pending savepoint ids and tags their
     * snapshots. The restore step runs whether or not the wrapped operator's initialization
     * succeeded.
     *
     * <p>If the wrapped operator fails, that failure is what propagates; a failure of the restore
     * step on that path is attached to it as a suppressed exception so the root cause is not
     * masked.
     *
     * @param streamTaskStateManager state initializer handed through to the wrapped operator
     * @throws Exception the wrapped operator's failure, or the restore step's failure when the
     *     wrapped operator initialized successfully
     */
    public void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {
        try {
            this.commitOperator.initializeState(streamTaskStateManager);
        } catch (Throwable primary) {
            // NOTE(review): tagging is attempted even on failure; presumably the wrapped operator
            // can fail on purpose after committing restored data - confirm against CommitterOperator.
            try {
                this.restoreIdentifiersAndCreateTags();
            } catch (Throwable secondary) {
                if (secondary != primary) {
                    primary.addSuppressed(secondary);
                }
            }
            throw primary;
        }
        this.restoreIdentifiersAndCreateTags();
    }

    /**
     * Builds the managers, binds the list state, drains the persisted ids from it and creates
     * tags for them. The drained ids are not put back into {@code identifiersForTags}.
     */
    private void restoreIdentifiersAndCreateTags() throws Exception {
        this.snapshotManager = this.snapshotManagerFactory.get();
        this.tagManager = this.tagManagerFactory.get();
        this.identifiersForTagsState = this.commitOperator.getOperatorStateBackend()
                .getListState(new ListStateDescriptor<Long>(IDENTIFIERS_STATE_NAME, LongSerializer.INSTANCE));

        List<Long> restored = new ArrayList<Long>();
        Iterable<Long> persisted = this.identifiersForTagsState.get();
        // Defensive: the state access contract allows null to stand for "no elements".
        if (persisted != null) {
            persisted.forEach(restored::add);
        }
        this.identifiersForTagsState.clear();
        this.createTagForIdentifiers(restored);
    }

    /**
     * Records {@code checkpointId} when the checkpoint is a savepoint, writes all pending ids to
     * the list state and then delegates the snapshot to the wrapped operator.
     *
     * @return the snapshot futures produced by the wrapped operator
     */
    public OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory storageLocation) throws Exception {
        if (checkpointOptions.getCheckpointType().isSavepoint()) {
            this.identifiersForTags.add(checkpointId);
        }
        this.identifiersForTagsState.update(new ArrayList<Long>(this.identifiersForTags));
        return this.commitOperator.snapshotState(checkpointId, timestamp, checkpointOptions, storageLocation);
    }

    /**
     * Forwards the notification to the wrapped operator first; afterwards, if the id was recorded
     * as a savepoint, forgets it and creates the tag for it.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        this.commitOperator.notifyCheckpointComplete(checkpointId);
        if (this.identifiersForTags.remove(checkpointId)) {
            this.createTagForIdentifiers(Collections.singletonList(checkpointId));
        }
    }

    /** Forwards the notification to the wrapped operator and forgets the id without tagging. */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        this.commitOperator.notifyCheckpointAborted(checkpointId);
        this.identifiersForTags.remove(checkpointId);
    }

    /**
     * Creates a {@code SAVEPOINT_TAG_PREFIX + commitIdentifier} tag for each snapshot the
     * snapshot manager finds for the wrapped operator's commit user and the given identifiers.
     * Tags that already exist are left alone.
     *
     * @param identifiers commit identifiers (checkpoint ids) to tag; may be empty
     */
    private void createTagForIdentifiers(List<Long> identifiers) {
        if (identifiers.isEmpty()) {
            // Nothing to tag: skip the snapshot lookup altogether.
            return;
        }
        List<Snapshot> snapshotForTags = this.snapshotManager.findSnapshotsForIdentifiers(this.commitOperator.getCommitUser(), identifiers);
        for (Snapshot snapshot : snapshotForTags) {
            String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier();
            if (!this.tagManager.tagExists(tagName)) {
                this.tagManager.createTag(snapshot, tagName);
            }
        }
    }

    // ------------------------------------------------------------------------------------------
    // Everything below is plain delegation: each call is passed to the wrapped operator as is.
    // ------------------------------------------------------------------------------------------

    public void open() throws Exception {
        this.commitOperator.open();
    }

    public void processElement(StreamRecord<CommitT> element) throws Exception {
        this.commitOperator.processElement(element);
    }

    public void processWatermark(Watermark watermark) throws Exception {
        this.commitOperator.processWatermark(watermark);
    }

    public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception {
        this.commitOperator.processWatermarkStatus(watermarkStatus);
    }

    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        this.commitOperator.processLatencyMarker(latencyMarker);
    }

    public void finish() throws Exception {
        this.commitOperator.finish();
    }

    public void close() throws Exception {
        this.commitOperator.close();
    }

    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        this.commitOperator.prepareSnapshotPreBarrier(checkpointId);
    }

    public void setKeyContextElement1(StreamRecord<?> record) throws Exception {
        this.commitOperator.setKeyContextElement1(record);
    }

    public void setKeyContextElement2(StreamRecord<?> record) throws Exception {
        this.commitOperator.setKeyContextElement2(record);
    }

    public OperatorMetricGroup getMetricGroup() {
        return this.commitOperator.getMetricGroup();
    }

    public OperatorID getOperatorID() {
        return this.commitOperator.getOperatorID();
    }

    public void setCurrentKey(Object key) {
        this.commitOperator.setCurrentKey(key);
    }

    public Object getCurrentKey() {
        return this.commitOperator.getCurrentKey();
    }

    public void setKeyContextElement(StreamRecord<CommitT> record) throws Exception {
        this.commitOperator.setKeyContextElement(record);
    }

    public void endInput() throws Exception {
        this.commitOperator.endInput();
    }

    public void setup(StreamTask containingTask, StreamConfig config, Output output) {
        this.commitOperator.setup(containingTask, config, output);
    }

    public ChainingStrategy getChainingStrategy() {
        return this.commitOperator.getChainingStrategy();
    }

    public void setChainingStrategy(ChainingStrategy strategy) {
        this.commitOperator.setChainingStrategy(strategy);
    }
}

