/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink;

import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.java.typeutils.EitherTypeInfo;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.FileSinkCommittableSerializer;
import org.apache.flink.connector.file.sink.committer.FileCommitter;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinatorFactory;
import org.apache.flink.connector.file.sink.compactor.operator.CompactCoordinatorStateHandlerFactory;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorFactory;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandlerFactory;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequestTypeInfo;
import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
import org.apache.flink.connector.file.sink.writer.FileWriter;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableSupplierWithException;

@Experimental
public class FileSink<IN>
implements StatefulSink<IN, FileWriterBucketState>,
TwoPhaseCommittingSink<IN, FileSinkCommittable>,
StatefulSink.WithCompatibleState,
WithPreCommitTopology<IN, FileSinkCommittable> {
    private final BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder;

    /**
     * Creates a sink that delegates all of its work to the given builder.
     *
     * <p>The constructor is private; within this file it is only invoked from the builders'
     * {@code build()} methods.
     *
     * @param bucketsBuilder builder supplying the writer, committer, serializers and compaction
     *     settings; rejected by {@link Preconditions#checkNotNull(Object)} when {@code null}
     */
    private FileSink(BucketsBuilder<IN, ? extends BucketsBuilder<IN, ?>> bucketsBuilder) {
        this.bucketsBuilder = Preconditions.checkNotNull(bucketsBuilder);
    }

    /**
     * Creates a writer that starts without any recovered bucket state.
     *
     * <p>Equivalent to calling {@link #restoreWriter} with an empty collection.
     *
     * @param context initialization context handed to the underlying builder
     * @return a freshly initialized writer
     * @throws IOException if the writer cannot be created or initialized
     */
    public FileWriter<IN> createWriter(Sink.InitContext context) throws IOException {
        return restoreWriter(context, Collections.emptyList());
    }

    /**
     * Creates a writer through the builder and initializes it with previously recovered bucket
     * states.
     *
     * @param context initialization context handed to the underlying builder
     * @param recoveredState bucket states to hand to the new writer; may be empty
     * @return the writer after {@code initializeState} has been called on it
     * @throws IOException if the writer cannot be created or its state cannot be initialized
     */
    public FileWriter<IN> restoreWriter(
            Sink.InitContext context, Collection<FileWriterBucketState> recoveredState)
            throws IOException {
        final FileWriter<IN> restoredWriter = bucketsBuilder.createWriter(context);
        restoredWriter.initializeState(recoveredState);
        return restoredWriter;
    }

    /**
     * Returns the serializer for the writer's bucket state.
     *
     * <p>The builder may fail with an {@link IOException} while creating the serializer; since
     * this method declares no checked exception, the failure is rethrown unchecked with the
     * original exception attached as cause.
     *
     * @return serializer for {@link FileWriterBucketState}
     * @throws FlinkRuntimeException if the builder could not create the serializer
     */
    public SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer() {
        final SimpleVersionedSerializer<FileWriterBucketState> stateSerializer;
        try {
            stateSerializer = bucketsBuilder.getWriterStateSerializer();
        } catch (IOException cause) {
            throw new FlinkRuntimeException("Could not create writer state serializer.", cause);
        }
        return stateSerializer;
    }

    /**
     * Creates the committer for the committables produced by this sink's writers.
     *
     * @return the committer built by the underlying builder
     * @throws IOException if the builder fails to create the committer
     */
    public Committer<FileSinkCommittable> createCommitter() throws IOException {
        final Committer<FileSinkCommittable> committer = bucketsBuilder.createCommitter();
        return committer;
    }

    /**
     * Returns the serializer for the committables exchanged between writer and committer.
     *
     * <p>The builder may fail with an {@link IOException} while creating the serializer; since
     * this method declares no checked exception, the failure is rethrown unchecked with the
     * original exception attached as cause.
     *
     * @return serializer for {@link FileSinkCommittable}
     * @throws FlinkRuntimeException if the builder could not create the serializer
     */
    public SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() {
        final SimpleVersionedSerializer<FileSinkCommittable> committableSerializer;
        try {
            committableSerializer = bucketsBuilder.getCommittableSerializer();
        } catch (IOException cause) {
            throw new FlinkRuntimeException("Could not create committable serializer.", cause);
        }
        return committableSerializer;
    }

    /**
     * Returns the names of writer states this sink declares itself compatible with.
     *
     * @return an immutable single-element collection containing {@code "bucket-states"}
     */
    public Collection<String> getCompatibleWriterStateNames() {
        // NOTE(review): presumably the state name used by an earlier file sink implementation,
        // kept so that its state can still be restored -- confirm against that implementation.
        final String compatibleStateName = "bucket-states";
        return Collections.singleton(compatibleStateName);
    }

    /**
     * Starts building a sink that writes each record with the given {@link Encoder}.
     *
     * <p>The returned builder is pre-configured with a {@link DateTimeBucketAssigner}; all other
     * settings take the defaults of {@link RowFormatBuilder}.
     *
     * @param basePath base path handed to the builder
     * @param encoder encoder used to write individual records
     * @param <IN> type of the records written by the sink
     * @return a builder for further configuration
     */
    public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
            Path basePath, Encoder<IN> encoder) {
        return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
    }

    /**
     * Starts building a sink that writes records through the given {@link BulkWriter.Factory}.
     *
     * <p>The returned builder is pre-configured with a {@link DateTimeBucketAssigner}; all other
     * settings take the defaults of {@link BulkFormatBuilder}.
     *
     * @param basePath base path handed to the builder
     * @param bulkWriterFactory factory creating the bulk writers
     * @param <IN> type of the records written by the sink
     * @return a builder for further configuration
     */
    public static <IN> DefaultBulkFormatBuilder<IN> forBulkFormat(
            Path basePath, BulkWriter.Factory<IN> bulkWriterFactory) {
        return new DefaultBulkFormatBuilder<>(
                basePath, bulkWriterFactory, new DateTimeBucketAssigner<>());
    }

    /**
     * Optionally inserts compaction-related operators into the committable stream.
     *
     * <p>Three cases are distinguished by the builder's settings:
     *
     * <ul>
     *   <li>no compaction strategy and compaction not explicitly disabled: the stream is returned
     *       unchanged;
     *   <li>no compaction strategy but compaction explicitly disabled: two "PlaceHolder"
     *       operators built from the {@code ...StateHandlerFactory} classes are chained with
     *       forward partitioning at the parallelism of the incoming stream. They are registered
     *       under the same uids as the real compaction operators (presumably so that state left
     *       behind by a previous run with compaction can still be handled -- confirm against the
     *       handler factories);
     *   <li>a compaction strategy is set: a coordinator with parallelism 1 (fed through
     *       {@code rebalance()}) followed by compactor operators at the parallelism of the
     *       incoming stream.
     * </ul>
     *
     * @param committableStream stream of committables emitted by the writers
     * @return the stream to be consumed downstream of this topology
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public DataStream<CommittableMessage<FileSinkCommittable>> addPreCommitTopology(
            DataStream<CommittableMessage<FileSinkCommittable>> committableStream) {
        final FileCompactStrategy strategy = bucketsBuilder.getCompactStrategy();
        final boolean compactionEnabled = strategy != null;
        if (!compactionEnabled && !bucketsBuilder.isCompactDisabledExplicitly()) {
            return committableStream;
        }

        // Every operator below needs the committable serializer lazily, so a single serializable
        // supplier is created once and shared.
        final SerializableSupplierWithException<SimpleVersionedSerializer<FileSinkCommittable>, IOException>
                serializerSupplier = bucketsBuilder::getCommittableSerializer;
        final int writerParallelism = committableStream.getParallelism();

        // The intermediate element types are not nameable with this file's imports, so the
        // intermediate operators are deliberately kept as raw types.
        if (compactionEnabled) {
            final TypeInformation requestType = new CompactorRequestTypeInfo(serializerSupplier);
            final OneInputStreamOperatorFactory coordinatorFactory =
                    new CompactCoordinatorFactory(strategy, serializerSupplier);
            final SingleOutputStreamOperator coordinator =
                    committableStream
                            .rebalance()
                            .transform("CompactorCoordinator", requestType, coordinatorFactory)
                            .setParallelism(1)
                            .uid("FileSinkCompactorCoordinator");

            final TypeInformation committableType = committableStream.getType();
            final OneInputStreamOperatorFactory compactorFactory =
                    new CompactorOperatorFactory(
                            strategy,
                            bucketsBuilder.getFileCompactor(),
                            serializerSupplier,
                            bucketsBuilder::createBucketWriter);
            return coordinator
                    .transform("CompactorOperator", committableType, compactorFactory)
                    .setParallelism(writerParallelism)
                    .uid("FileSinkCompactorOperator");
        }

        // Compaction was explicitly disabled: add the placeholder state handlers instead.
        final TypeInformation coordinatorOutputType =
                new EitherTypeInfo(
                        committableStream.getType(),
                        new CompactorRequestTypeInfo(serializerSupplier));
        final OneInputStreamOperatorFactory coordinatorHandlerFactory =
                new CompactCoordinatorStateHandlerFactory(serializerSupplier);
        final SingleOutputStreamOperator coordinatorPlaceholder =
                committableStream
                        .forward()
                        .transform(
                                "CompactorCoordinatorPlaceHolder",
                                coordinatorOutputType,
                                coordinatorHandlerFactory)
                        .setParallelism(writerParallelism)
                        .uid("FileSinkCompactorCoordinator");

        final OneInputStreamOperatorFactory compactorHandlerFactory =
                new CompactorOperatorStateHandlerFactory(
                        serializerSupplier, bucketsBuilder::createBucketWriter);
        return coordinatorPlaceholder
                .forward()
                .transform(
                        "CompactorOperatorPlaceHolder",
                        committableStream.getType(),
                        compactorHandlerFactory)
                .setParallelism(writerParallelism)
                .uid("FileSinkCompactorOperator");
    }

    /**
     * Final {@link BulkFormatBuilder} whose self type is bound to itself; this is the builder
     * type returned by {@link FileSink#forBulkFormat}.
     *
     * @param <IN> type of the records written by the sink
     */
    public static final class DefaultBulkFormatBuilder<IN>
            extends BulkFormatBuilder<IN, DefaultBulkFormatBuilder<IN>> {

        private static final long serialVersionUID = 7493169281036370228L;

        /** Forwards all arguments unchanged to the three-argument super constructor. */
        private DefaultBulkFormatBuilder(
                Path basePath,
                BulkWriter.Factory<IN> writerFactory,
                BucketAssigner<IN, String> assigner) {
            super(basePath, writerFactory, assigner);
        }
    }

    /**
     * Builder that configures a {@link FileSink} writing records through a
     * {@link BulkWriter.Factory}.
     *
     * <p>Defaults applied by the three-argument constructor: a bucket check interval of
     * {@code 60000}, an {@link OnCheckpointRollingPolicy}, a
     * {@link DefaultFileWriterBucketFactory} and a default-built {@link OutputFileConfig}.
     * Compaction is not configured unless {@link #enableCompact} is called.
     *
     * @param <IN> type of the records written by the sink
     * @param <T> concrete builder type returned by the fluent setters
     */
    @PublicEvolving
    public static class BulkFormatBuilder<IN, T extends BulkFormatBuilder<IN, T>>
            extends BucketsBuilder<IN, T> {
        private static final long serialVersionUID = 1L;
        private final Path basePath;
        private long bucketCheckInterval;
        private final BulkWriter.Factory<IN> writerFactory;
        private final FileWriterBucketFactory<IN> bucketFactory;
        private BucketAssigner<IN, String> bucketAssigner;
        private CheckpointRollingPolicy<IN, String> rollingPolicy;
        private OutputFileConfig outputFileConfig;
        private boolean isCompactDisabledExplicitly = false;
        private FileCompactStrategy compactStrategy;
        private FileCompactor fileCompactor;

        /**
         * Creates a builder with default settings for everything except the base path, the
         * writer factory and the bucket assigner.
         */
        protected BulkFormatBuilder(
                Path basePath,
                BulkWriter.Factory<IN> writerFactory,
                BucketAssigner<IN, String> assigner) {
            this(
                    basePath,
                    60000L, // same value as BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL
                    writerFactory,
                    assigner,
                    OnCheckpointRollingPolicy.build(),
                    new DefaultFileWriterBucketFactory<>(),
                    OutputFileConfig.builder().build());
        }

        /**
         * Creates a fully specified builder.
         *
         * <p>Every reference argument is validated with
         * {@link Preconditions#checkNotNull(Object)}, so a misconfigured builder fails here
         * rather than later when a writer is created.
         *
         * @param basePath base path of the sink; also used to look up the {@link FileSystem}
         * @param bucketCheckInterval interval passed through to the {@link FileWriter}
         *     (presumably milliseconds -- TODO confirm against {@code FileWriter})
         * @param writerFactory factory creating the bulk writers
         * @param assigner bucket assigner
         * @param policy rolling policy
         * @param bucketFactory factory for the writer buckets
         * @param outputFileConfig naming configuration of the part files
         */
        protected BulkFormatBuilder(
                Path basePath,
                long bucketCheckInterval,
                BulkWriter.Factory<IN> writerFactory,
                BucketAssigner<IN, String> assigner,
                CheckpointRollingPolicy<IN, String> policy,
                FileWriterBucketFactory<IN> bucketFactory,
                OutputFileConfig outputFileConfig) {
            this.basePath = Preconditions.checkNotNull(basePath);
            this.bucketCheckInterval = bucketCheckInterval;
            // Previously unchecked, unlike every other reference argument of this constructor.
            this.writerFactory = Preconditions.checkNotNull(writerFactory);
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            this.rollingPolicy = Preconditions.checkNotNull(policy);
            this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
            this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
        }

        /** Sets the bucket check interval handed to the writer and returns this builder. */
        public T withBucketCheckInterval(long interval) {
            this.bucketCheckInterval = interval;
            return self();
        }

        /** Replaces the bucket assigner (must not be {@code null}) and returns this builder. */
        public T withBucketAssigner(BucketAssigner<IN, String> assigner) {
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            return self();
        }

        /** Replaces the rolling policy (must not be {@code null}) and returns this builder. */
        public T withRollingPolicy(CheckpointRollingPolicy<IN, String> rollingPolicy) {
            this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
            return self();
        }

        /**
         * Replaces the output file configuration and returns this builder.
         *
         * <p>{@code null} is rejected here, consistent with the constructor and the other
         * setters; otherwise the failure would only surface when a writer is created.
         */
        public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
            this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
            return self();
        }

        /**
         * Returns a new builder that uses the given bucket assigner and otherwise carries over
         * this builder's configuration, including its compaction settings.
         *
         * @param assigner the bucket assigner of the new builder; must not be {@code null}
         * @return a new builder; this builder is left unmodified
         * @throws IllegalStateException if the bucket factory is not exactly a
         *     {@link DefaultFileWriterBucketFactory}
         */
        public BulkFormatBuilder<IN, ? extends BulkFormatBuilder<IN, ?>> withNewBucketAssigner(
                BucketAssigner<IN, String> assigner) {
            Preconditions.checkState(
                    bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
                    "withNewBucketAssigner() cannot be called after specifying a customized bucket factory");
            final BulkFormatBuilder<IN, T> copy =
                    new BulkFormatBuilder<>(
                            basePath,
                            bucketCheckInterval,
                            writerFactory,
                            Preconditions.checkNotNull(assigner),
                            rollingPolicy,
                            bucketFactory,
                            outputFileConfig);
            // The constructor does not know about compaction; without copying these fields a
            // preceding enableCompact()/disableCompact() call would be silently lost.
            copy.compactStrategy = this.compactStrategy;
            copy.fileCompactor = this.fileCompactor;
            copy.isCompactDisabledExplicitly = this.isCompactDisabledExplicitly;
            return copy;
        }

        /**
         * Enables compaction with the given strategy and compactor (both must not be
         * {@code null}) and returns this builder.
         */
        public T enableCompact(FileCompactStrategy strategy, FileCompactor compactor) {
            this.compactStrategy = Preconditions.checkNotNull(strategy);
            this.fileCompactor = Preconditions.checkNotNull(compactor);
            return self();
        }

        /**
         * Marks compaction as explicitly disabled and returns this builder.
         *
         * <p>This only sets a flag; a strategy set through {@link #enableCompact} is not cleared.
         */
        public T disableCompact() {
            this.isCompactDisabledExplicitly = true;
            return self();
        }

        /** Creates the {@link FileSink} backed by this builder. */
        public FileSink<IN> build() {
            return new FileSink<>(this);
        }

        /**
         * Creates the writer for this configuration.
         *
         * <p>When a compaction strategy is set, the writer gets a derived file config whose part
         * prefix is the configured prefix with a leading {@code "."} (presumably so that files
         * awaiting compaction are hidden -- confirm with the compactor operators).
         */
        @Override
        FileWriter<IN> createWriter(Sink.InitContext context) throws IOException {
            final OutputFileConfig writerFileConfig;
            if (compactStrategy == null) {
                writerFileConfig = outputFileConfig;
            } else {
                writerFileConfig =
                        OutputFileConfig.builder()
                                .withPartPrefix("." + outputFileConfig.getPartPrefix())
                                .withPartSuffix(outputFileConfig.getPartSuffix())
                                .build();
            }
            return new FileWriter<IN>(
                    basePath,
                    context.metricGroup(),
                    bucketAssigner,
                    bucketFactory,
                    createBucketWriter(),
                    rollingPolicy,
                    writerFileConfig,
                    context.getProcessingTimeService(),
                    bucketCheckInterval);
        }

        /** Creates a committer operating on a newly created bucket writer. */
        @Override
        FileCommitter createCommitter() throws IOException {
            return new FileCommitter(createBucketWriter());
        }

        @Override
        boolean isCompactDisabledExplicitly() {
            return isCompactDisabledExplicitly;
        }

        /** Returns the compaction strategy, or {@code null} if compaction was not enabled. */
        @Override
        FileCompactStrategy getCompactStrategy() {
            return compactStrategy;
        }

        /** Returns the file compactor, or {@code null} if compaction was not enabled. */
        @Override
        FileCompactor getFileCompactor() {
            return fileCompactor;
        }

        /**
         * Builds the writer state serializer from the recoverable serializers exposed by a newly
         * created bucket writer.
         */
        @Override
        SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer()
                throws IOException {
            final BucketWriter<IN, String> bucketWriter = createBucketWriter();
            return new FileWriterBucketStateSerializer(
                    bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
                    bucketWriter.getProperties().getPendingFileRecoverableSerializer());
        }

        /**
         * Builds the committable serializer from the recoverable serializers exposed by a newly
         * created bucket writer. Note the argument order (pending first) differs from
         * {@link #getWriterStateSerializer()}.
         */
        @Override
        SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer()
                throws IOException {
            final BucketWriter<IN, String> bucketWriter = createBucketWriter();
            return new FileSinkCommittableSerializer(
                    bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
                    bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
        }

        /**
         * Creates a bulk bucket writer on top of the recoverable writer of the file system that
         * serves the base path.
         */
        @Override
        BucketWriter<IN, String> createBucketWriter() throws IOException {
            return new BulkBucketWriter<>(
                    FileSystem.get(basePath.toUri()).createRecoverableWriter(), writerFactory);
        }
    }

    /**
     * Final {@link RowFormatBuilder} whose self type is bound to itself; this is the builder
     * type returned by {@link FileSink#forRowFormat}.
     *
     * @param <IN> type of the records written by the sink
     */
    public static final class DefaultRowFormatBuilder<IN>
            extends RowFormatBuilder<IN, DefaultRowFormatBuilder<IN>> {

        private static final long serialVersionUID = -8503344257202146718L;

        /** Forwards all arguments unchanged to the three-argument super constructor. */
        private DefaultRowFormatBuilder(
                Path basePath, Encoder<IN> encoder, BucketAssigner<IN, String> bucketAssigner) {
            super(basePath, encoder, bucketAssigner);
        }
    }

    /**
     * Builder that configures a {@link FileSink} writing each record with an {@link Encoder}.
     *
     * <p>Defaults applied by the three-argument constructor: a bucket check interval of
     * {@code 60000}, a default-built {@link DefaultRollingPolicy}, a
     * {@link DefaultFileWriterBucketFactory} and a default-built {@link OutputFileConfig}.
     * Compaction is not configured unless {@link #enableCompact} is called.
     *
     * @param <IN> type of the records written by the sink
     * @param <T> concrete builder type returned by the fluent setters
     */
    public static class RowFormatBuilder<IN, T extends RowFormatBuilder<IN, T>>
            extends BucketsBuilder<IN, T> {
        private static final long serialVersionUID = 1L;
        private final Path basePath;
        private long bucketCheckInterval;
        private final Encoder<IN> encoder;
        private final FileWriterBucketFactory<IN> bucketFactory;
        private BucketAssigner<IN, String> bucketAssigner;
        private RollingPolicy<IN, String> rollingPolicy;
        private OutputFileConfig outputFileConfig;
        private boolean isCompactDisabledExplicitly = false;
        private FileCompactStrategy compactStrategy;
        private FileCompactor fileCompactor;

        /**
         * Creates a builder with default settings for everything except the base path, the
         * encoder and the bucket assigner.
         */
        protected RowFormatBuilder(
                Path basePath, Encoder<IN> encoder, BucketAssigner<IN, String> bucketAssigner) {
            this(
                    basePath,
                    60000L, // same value as BucketsBuilder.DEFAULT_BUCKET_CHECK_INTERVAL
                    encoder,
                    bucketAssigner,
                    DefaultRollingPolicy.builder().build(),
                    new DefaultFileWriterBucketFactory<>(),
                    OutputFileConfig.builder().build());
        }

        /**
         * Creates a fully specified builder; every reference argument is validated with
         * {@link Preconditions#checkNotNull(Object)}.
         *
         * @param basePath base path of the sink; also used to look up the {@link FileSystem}
         * @param bucketCheckInterval interval passed through to the {@link FileWriter}
         *     (presumably milliseconds -- TODO confirm against {@code FileWriter})
         * @param encoder encoder used to write individual records
         * @param assigner bucket assigner
         * @param policy rolling policy
         * @param bucketFactory factory for the writer buckets
         * @param outputFileConfig naming configuration of the part files
         */
        protected RowFormatBuilder(
                Path basePath,
                long bucketCheckInterval,
                Encoder<IN> encoder,
                BucketAssigner<IN, String> assigner,
                RollingPolicy<IN, String> policy,
                FileWriterBucketFactory<IN> bucketFactory,
                OutputFileConfig outputFileConfig) {
            this.basePath = Preconditions.checkNotNull(basePath);
            this.bucketCheckInterval = bucketCheckInterval;
            this.encoder = Preconditions.checkNotNull(encoder);
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            this.rollingPolicy = Preconditions.checkNotNull(policy);
            this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
            this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
        }

        /** Sets the bucket check interval handed to the writer and returns this builder. */
        public T withBucketCheckInterval(long interval) {
            this.bucketCheckInterval = interval;
            return self();
        }

        /** Replaces the bucket assigner (must not be {@code null}) and returns this builder. */
        public T withBucketAssigner(BucketAssigner<IN, String> assigner) {
            this.bucketAssigner = Preconditions.checkNotNull(assigner);
            return self();
        }

        /** Replaces the rolling policy (must not be {@code null}) and returns this builder. */
        public T withRollingPolicy(RollingPolicy<IN, String> policy) {
            this.rollingPolicy = Preconditions.checkNotNull(policy);
            return self();
        }

        /**
         * Replaces the output file configuration and returns this builder.
         *
         * <p>{@code null} is rejected here, consistent with the constructor and the other
         * setters; otherwise the failure would only surface when a writer is created.
         */
        public T withOutputFileConfig(OutputFileConfig outputFileConfig) {
            this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
            return self();
        }

        /**
         * Enables compaction with the given strategy and compactor (both must not be
         * {@code null}) and returns this builder.
         */
        public T enableCompact(FileCompactStrategy strategy, FileCompactor compactor) {
            this.compactStrategy = Preconditions.checkNotNull(strategy);
            this.fileCompactor = Preconditions.checkNotNull(compactor);
            return self();
        }

        /**
         * Marks compaction as explicitly disabled and returns this builder.
         *
         * <p>This only sets a flag; a strategy set through {@link #enableCompact} is not cleared.
         */
        public T disableCompact() {
            this.isCompactDisabledExplicitly = true;
            return self();
        }

        /** Creates the {@link FileSink} backed by this builder. */
        public FileSink<IN> build() {
            return new FileSink<>(this);
        }

        /**
         * Creates the writer for this configuration.
         *
         * <p>When a compaction strategy is set, the writer gets a derived file config whose part
         * prefix is the configured prefix with a leading {@code "."} (presumably so that files
         * awaiting compaction are hidden -- confirm with the compactor operators).
         */
        @Override
        FileWriter<IN> createWriter(Sink.InitContext context) throws IOException {
            final OutputFileConfig writerFileConfig;
            if (compactStrategy == null) {
                writerFileConfig = outputFileConfig;
            } else {
                writerFileConfig =
                        OutputFileConfig.builder()
                                .withPartPrefix("." + outputFileConfig.getPartPrefix())
                                .withPartSuffix(outputFileConfig.getPartSuffix())
                                .build();
            }
            return new FileWriter<IN>(
                    basePath,
                    context.metricGroup(),
                    bucketAssigner,
                    bucketFactory,
                    createBucketWriter(),
                    rollingPolicy,
                    writerFileConfig,
                    context.getProcessingTimeService(),
                    bucketCheckInterval);
        }

        /** Creates a committer operating on a newly created bucket writer. */
        @Override
        FileCommitter createCommitter() throws IOException {
            return new FileCommitter(createBucketWriter());
        }

        @Override
        boolean isCompactDisabledExplicitly() {
            return isCompactDisabledExplicitly;
        }

        /** Returns the compaction strategy, or {@code null} if compaction was not enabled. */
        @Override
        FileCompactStrategy getCompactStrategy() {
            return compactStrategy;
        }

        /** Returns the file compactor, or {@code null} if compaction was not enabled. */
        @Override
        FileCompactor getFileCompactor() {
            return fileCompactor;
        }

        /**
         * Builds the writer state serializer from the recoverable serializers exposed by a newly
         * created bucket writer.
         */
        @Override
        SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer()
                throws IOException {
            final BucketWriter<IN, String> bucketWriter = createBucketWriter();
            return new FileWriterBucketStateSerializer(
                    bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
                    bucketWriter.getProperties().getPendingFileRecoverableSerializer());
        }

        /**
         * Builds the committable serializer from the recoverable serializers exposed by a newly
         * created bucket writer. Note the argument order (pending first) differs from
         * {@link #getWriterStateSerializer()}.
         */
        @Override
        SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer()
                throws IOException {
            final BucketWriter<IN, String> bucketWriter = createBucketWriter();
            return new FileSinkCommittableSerializer(
                    bucketWriter.getProperties().getPendingFileRecoverableSerializer(),
                    bucketWriter.getProperties().getInProgressFileRecoverableSerializer());
        }

        /**
         * Creates a row-wise bucket writer on top of the recoverable writer of the file system
         * that serves the base path.
         */
        @Override
        BucketWriter<IN, String> createBucketWriter() throws IOException {
            return new RowWiseBucketWriter<>(
                    FileSystem.get(basePath.toUri()).createRecoverableWriter(), encoder);
        }
    }

    /**
     * Common base of the sink builders: declares the factory methods and accessors that
     * {@link FileSink} calls on its builder.
     *
     * @param <IN> type of the records written by the sink
     * @param <T> concrete builder type, used as the return type of {@link #self()}
     */
    @Internal
    private abstract static class BucketsBuilder<IN, T extends BucketsBuilder<IN, T>>
            implements Serializable {

        private static final long serialVersionUID = 1L;

        /** Bucket check interval available to subclasses as a default. */
        protected static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60000L;

        private BucketsBuilder() {
        }

        /**
         * Returns this builder typed as the concrete builder type.
         *
         * <p>The cast is unchecked; it holds as long as subclasses pass their own type as
         * {@code T}.
         */
        @SuppressWarnings("unchecked")
        protected T self() {
            return (T) this;
        }

        /** Creates the writer for the configured sink. */
        @Internal
        abstract FileWriter<IN> createWriter(Sink.InitContext context) throws IOException;

        /** Creates the committer for the configured sink. */
        @Internal
        abstract FileCommitter createCommitter() throws IOException;

        /** Creates the serializer for the writer's bucket state. */
        @Internal
        abstract SimpleVersionedSerializer<FileWriterBucketState> getWriterStateSerializer()
                throws IOException;

        /** Creates the serializer for the sink's committables. */
        @Internal
        abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer()
                throws IOException;

        /** Tells whether compaction has been disabled explicitly on this builder. */
        @Internal
        abstract boolean isCompactDisabledExplicitly();

        /** Returns the configured compaction strategy; callers in this file check it for null. */
        @Internal
        abstract FileCompactStrategy getCompactStrategy();

        /** Returns the configured file compactor. */
        @Internal
        abstract FileCompactor getFileCompactor();

        /** Creates a new bucket writer for the configured sink. */
        @Internal
        abstract BucketWriter<IN, String> createBucketWriter() throws IOException;
    }
}

