/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.FileStoreSink;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.table.AppendOnlyFileStoreTable;

/**
 * A {@link FileStoreSink} for an {@link AppendOnlyFileStoreTable}.
 *
 * <p>In addition to the regular write/commit pipeline, {@link #sinkFrom(DataStream, String)}
 * unions the written stream with the stream produced by
 * {@link UnawareBucketCompactionTopoBuilder#fetchUncommitted(String)} when the table is not
 * write-only and the job runs in {@link RuntimeExecutionMode#STREAMING} mode.
 */
public class UnawareBucketWriteSink
extends FileStoreSink {
    /** {@code true} when the table's core options do not declare it write-only. */
    private final boolean enableCompaction;
    /** The table being written; also handed to the compaction topology builder. */
    private final AppendOnlyFileStoreTable table;
    /** Parallelism forwarded to {@code doWrite}; stored exactly as supplied by the caller. */
    private final Integer parallelism;

    /**
     * Creates the sink.
     *
     * @param table the append-only table to write to
     * @param overwritePartitions forwarded unchanged to the {@link FileStoreSink} constructor
     * @param logSinkFunction forwarded unchanged to the {@link FileStoreSink} constructor
     * @param parallelism parallelism passed to the write stage in {@link #sinkFrom}
     */
    public UnawareBucketWriteSink(AppendOnlyFileStoreTable table, Map<String, String> overwritePartitions, LogSinkFunction logSinkFunction, Integer parallelism) {
        super(table, overwritePartitions, logSinkFunction);
        this.table = table;
        this.enableCompaction = !table.coreOptions().writeOnly();
        this.parallelism = parallelism;
    }

    /**
     * Builds the sink topology: write, optionally union with the compaction stream, then commit.
     *
     * @param input the rows to write
     * @param initialCommitUser commit user passed to the write, compaction and commit stages
     * @return the sink returned by {@code doCommit}
     */
    @Override
    public DataStreamSink<?> sinkFrom(DataStream<RowData> input, String initialCommitUser) {
        // Raw type kept on purpose: the element type returned by doWrite is declared in the
        // parent class, which is not visible here.
        DataStream written = this.doWrite(input, initialCommitUser, this.parallelism);

        final boolean isStreamingMode =
                input.getExecutionEnvironment().getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
                        == RuntimeExecutionMode.STREAMING;

        // The compaction topology is only attached for streaming jobs on tables that are not
        // write-only.
        if (this.enableCompaction && isStreamingMode) {
            UnawareBucketCompactionTopoBuilder builder =
                    new UnawareBucketCompactionTopoBuilder(
                            input.getExecutionEnvironment(), this.table.name(), this.table);
            builder.withContinuousMode(true);
            written = written.union(builder.fetchUncommitted(initialCommitUser));
        }

        return this.doCommit(written, initialCommitUser);
    }
}

