package org.apache.flink.streaming.runtime.translators;

import java.lang.invoke.SerializedLambda;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies;
import org.apache.flink.streaming.api.connector.sink2.SupportsPostCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreCommitTopology;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.CustomSinkOperatorUidHashes;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.class */
public class SinkTransformationTranslator<Input, Output> implements TransformationTranslator<Output, SinkTransformation<Input, Output>> {
    /** Name given to the committer operator; also matched against sub-transformation names when assigning uids and uid hashes. */
    private static final String COMMITTER_NAME = "Committer";
    /** Name given to the writer operator; also matched against sub-transformation names when assigning uids and uid hashes. */
    private static final String WRITER_NAME = "Writer";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator$SinkExpander.class */
    public static class SinkExpander<T> {
        /** The sink transformation being expanded; source of the uid, name, parallelism, etc. copied onto sub-transformations. */
        private final SinkTransformation<T, ?> transformation;
        /** The sink whose implemented interfaces decide which topology parts are added. */
        private final Sink<T> sink;
        /** Translation context used to translate the transformations created during expansion. */
        private final TransformationTranslator.Context context;
        /** Stream feeding the sink. */
        private final DataStream<T> inputStream;
        /** Environment of {@link #inputStream}; its transformation list is inspected and trimmed during expansion. */
        private final StreamExecutionEnvironment executionEnvironment;
        /** Value of {@code CoreOptions.DEFAULT_PARALLELISM} read from the environment config at construction time, if set. */
        private final Optional<Integer> environmentParallelism;
        /** Batch flag handed to the committer operator factory ({@code true} when reached via {@code translateForBatch}). */
        private final boolean isBatchMode;
        /** Whether checkpointing was enabled on the environment at construction time; handed to the committer operator factory. */
        private final boolean isCheckpointingEnabled;

        public SinkExpander(DataStream<T> dataStream, Sink<T> sink, SinkTransformation<T, ?> sinkTransformation, TransformationTranslator.Context context, boolean z) {
            this.inputStream = dataStream;
            this.executionEnvironment = dataStream.getExecutionEnvironment();
            this.environmentParallelism = this.executionEnvironment.getConfig().toConfiguration().getOptional(CoreOptions.DEFAULT_PARALLELISM);
            this.isCheckpointingEnabled = this.executionEnvironment.getCheckpointConfig().isCheckpointingEnabled();
            this.transformation = sinkTransformation;
            this.sink = sink;
            this.context = context;
            this.isBatchMode = z;
        }

        /**
         * Expands the sink into its concrete topology and translates the resulting transformations.
         *
         * <p>Steps, in order:
         *
         * <ol>
         *   <li>add the pre-write topology if the sink implements {@link SupportsPreWriteTopology};
         *   <li>verify that sinks with a pre-/post-commit topology also implement {@link SupportsCommitter};
         *   <li>add either the committing topology or a plain writer operator;
         *   <li>translate every transformation added since this method started, repeating until the
         *       collected translation result no longer changes;
         *   <li>disable unaligned checkpoints on the exchanges following the writer;
         *   <li>remove the added transformations from the environment again.
         * </ol>
         *
         * @throws IllegalArgumentException if the sink implements {@link SupportsPreCommitTopology} or
         *     {@link SupportsPostCommitTopology} without implementing {@link SupportsCommitter}
         */
        public void expand() {
            final int sizeBefore = this.executionEnvironment.getTransformations().size();

            DataStream<T> prewritten = this.inputStream;
            if (this.sink instanceof SupportsPreWriteTopology) {
                // A Sink<T> is not assignable to SupportsPreWriteTopology without an explicit cast.
                @SuppressWarnings("unchecked")
                final SupportsPreWriteTopology<T> preWritingSink = (SupportsPreWriteTopology<T>) this.sink;
                prewritten = adjustTransformations(prewritten, preWritingSink::addPreWriteTopology, true, this.sink instanceof SupportsConcurrentExecutionAttempts);
            }

            if (this.sink instanceof SupportsPreCommitTopology) {
                Preconditions.checkArgument(this.sink instanceof SupportsCommitter, "Sink with SupportsPreCommitTopology should implement SupportsCommitter");
            }
            if (this.sink instanceof SupportsPostCommitTopology) {
                Preconditions.checkArgument(this.sink instanceof SupportsCommitter, "Sink with SupportsPostCommitTopology should implement SupportsCommitter");
            }

            if (this.sink instanceof SupportsCommitter) {
                addCommittingTopology(this.sink, prewritten);
            } else {
                // No committer: the writer is the only operator and emits no committables.
                adjustTransformations(prewritten, input -> input.transform(SinkTransformationTranslator.WRITER_NAME, CommittableMessageTypeInfo.noOutput(), new SinkWriterOperatorFactory<>(this.sink)), false, this.sink instanceof SupportsConcurrentExecutionAttempts);
            }

            getSinkTransformations(sizeBefore).forEach(this.context::transform);

            // Translating may add further transformations, so re-translate until the result is stable.
            repeatUntilConverged(() -> getSinkTransformations(sizeBefore).stream().flatMap(t -> this.context.transform(t).stream()).collect(Collectors.toList()));

            disallowUnalignedCheckpoint(getSinkTransformations(sizeBefore));

            // Drop everything this expansion appended so the environment's list is back to its old size.
            final List<Transformation<?>> all = this.executionEnvironment.getTransformations();
            while (all.size() > sizeBefore) {
                all.remove(all.size() - 1);
            }
        }

        /**
         * Invokes {@code supplier} repeatedly until two consecutive results are equal.
         *
         * <p>The supplier is always called at least twice. Results are compared with
         * {@link Object#equals(Object)}, so the supplier must not return {@code null} first.
         *
         * @param supplier action to repeat; its return value is the convergence criterion
         */
        private <R> void repeatUntilConverged(Supplier<R> supplier) {
            R previous = supplier.get();
            for (R current = supplier.get(); !previous.equals(current); current = supplier.get()) {
                previous = current;
            }
        }

        /**
         * Returns a view of the environment's transformations starting at index {@code i}.
         *
         * @param i index of the first transformation to include
         * @return sub-list view from {@code i} to the current end of the environment's transformation list
         */
        private List<Transformation<?>> getSinkTransformations(int i) {
            final int end = this.executionEnvironment.getTransformations().size();
            return this.executionEnvironment.getTransformations().subList(i, end);
        }

        /**
         * Disables unaligned checkpoints on every {@link PartitionTransformation} that lies between the
         * writer and the transformations following it in {@code list}.
         *
         * <p>Starting from all transformations listed after the writer, inputs are followed
         * transitively. The writer's id is placed in the visited set up front, so the walk stops at the
         * writer and does not continue into the transformations feeding it.
         *
         * @param list the transformations created by the sink expansion, in creation order
         * @throws IllegalStateException if {@code list} contains no writer transformation
         */
        private void disallowUnalignedCheckpoint(List<Transformation<?>> list) {
            final Optional<Transformation<?>> writerOpt = list.stream().filter(SinkExpander::isWriter).findFirst();
            Preconditions.checkState(writerOpt.isPresent(), "Writer transformation not found.");
            final Transformation<?> writer = writerOpt.get();
            final int indexOfWriter = list.indexOf(writer);

            // Seed the visited set with the writer id. Note that `new HashSet<>(id)` would only set the
            // initial capacity and leave the set empty, letting the traversal run past the writer.
            final HashSet<Integer> seen = new HashSet<>();
            seen.add(writer.getId());

            final ArrayDeque<Transformation<?>> pending = new ArrayDeque<>(list.subList(indexOfWriter + 1, list.size()));
            while (!pending.isEmpty()) {
                final Transformation<?> current = pending.poll();
                seen.add(current.getId());
                for (Transformation<?> input : current.getInputs()) {
                    if (input instanceof PartitionTransformation) {
                        ((PartitionTransformation<?>) input).getPartitioner().disableUnalignedCheckpoints();
                    }
                    // Only enqueue inputs that have not been visited yet.
                    if (seen.add(input.getId())) {
                        pending.add(input);
                    }
                }
            }
        }

        /**
         * Tells whether {@code transformation} is the sink writer, i.e. a {@link OneInputTransformation}
         * whose operator factory is a {@link SinkWriterOperatorFactory}.
         */
        private static boolean isWriter(Transformation<?> transformation) {
            return transformation instanceof OneInputTransformation
                    && ((OneInputTransformation) transformation).getOperatorFactory() instanceof SinkWriterOperatorFactory;
        }

        /**
         * Adds writer, optional pre-commit topology, committer and optional post-commit topology for a
         * sink that implements {@link SupportsCommitter}.
         *
         * <p>Bound method references null-check their receiver when evaluated, so no explicit
         * {@code Objects.requireNonNull} calls are needed around them.
         *
         * @param sink the sink; must implement {@link SupportsCommitter}
         * @param dataStream the (possibly pre-written) input of the writer
         */
        @SuppressWarnings("unchecked")
        private <CommT, WriteResultT> void addCommittingTopology(Sink<T> sink, DataStream<T> dataStream) {
            final SupportsCommitter<CommT> committingSink = (SupportsCommitter<CommT>) sink;
            final TypeInformation<CommittableMessage<CommT>> committableType = CommittableMessageTypeInfo.of(committingSink::getCommittableSerializer);

            final DataStream<CommittableMessage<CommT>> precommitted;
            if (sink instanceof SupportsPreCommitTopology) {
                final SupportsPreCommitTopology<WriteResultT, CommT> preCommittingSink = (SupportsPreCommitTopology<WriteResultT, CommT>) sink;
                final TypeInformation<CommittableMessage<WriteResultT>> writeResultType = CommittableMessageTypeInfo.of(preCommittingSink::getWriteResultSerializer);
                final DataStream<CommittableMessage<WriteResultT>> written = addWriter(sink, dataStream, writeResultType);
                precommitted = adjustTransformations(written, preCommittingSink::addPreCommitTopology, true, false);
            } else {
                // Without a pre-commit topology the writer emits committables directly.
                precommitted = addWriter(sink, dataStream, committableType);
            }

            final DataStream<CommittableMessage<CommT>> committed = adjustTransformations(
                    precommitted,
                    pc -> pc.transform(SinkTransformationTranslator.COMMITTER_NAME, committableType, new CommitterOperatorFactory<>(committingSink, this.isBatchMode, this.isCheckpointingEnabled)),
                    false,
                    false);

            if (sink instanceof SupportsPostCommitTopology) {
                final DataStream<CommittableMessage<CommT>> postCommitInput = addFailOverRegion(committed);
                adjustTransformations(
                        postCommitInput,
                        pc -> {
                            ((SupportsPostCommitTopology<CommT>) sink).addPostCommitTopology(pc);
                            return null;
                        },
                        true,
                        false);
            }
        }

        /**
         * Appends the writer operator to {@code dataStream} and wraps its output with
         * {@link #addFailOverRegion(DataStream)}.
         *
         * @param sink the sink the writer operator is created for
         * @param dataStream input of the writer
         * @param typeInformation type of the messages the writer emits
         * @return the writer output behind the fail-over-region exchange
         */
        private <WriteResultT> DataStream<CommittableMessage<WriteResultT>> addWriter(Sink<T> sink, DataStream<T> dataStream, TypeInformation<CommittableMessage<WriteResultT>> typeInformation) {
            final boolean concurrentAttempts = sink instanceof SupportsConcurrentExecutionAttempts;
            final DataStream<CommittableMessage<WriteResultT>> written = adjustTransformations(
                    dataStream,
                    input -> input.transform(SinkTransformationTranslator.WRITER_NAME, typeInformation, new SinkWriterOperatorFactory<>(sink)),
                    false,
                    concurrentAttempts);
            return addFailOverRegion(written);
        }

        /**
         * Wraps {@code dataStream} in a forward-partitioned exchange with {@link StreamExchangeMode#BATCH}.
         *
         * @param dataStream the stream to wrap
         * @return a new stream on the same environment whose transformation is the added partition
         */
        private <I> DataStream<I> addFailOverRegion(DataStream<I> dataStream) {
            final PartitionTransformation<I> exchange = new PartitionTransformation<>(dataStream.getTransformation(), new ForwardPartitioner<>(), StreamExchangeMode.BATCH);
            return new DataStream<>(this.executionEnvironment, exchange);
        }

        /**
         * Applies {@code function} to {@code dataStream} and post-processes every transformation the
         * function added to the environment so that it inherits settings from the sink transformation.
         *
         * <p>The environment parallelism is set to {@code -1} before the function runs and restored
         * afterwards (to the snapshot taken at construction, or reset if none was configured). A
         * sub-transformation still at {@code -1} afterwards receives the sink's parallelism.
         *
         * @param dataStream input handed to {@code function}
         * @param function builds part of the sink topology and returns its result
         * @param z when {@code true}, a sub-transformation with a non-empty uid requires the sink
         *     transformation to have a non-empty uid as well
         * @param z2 value written to {@code setSupportsConcurrentExecutionAttempts} on every physical
         *     sub-transformation
         * @return whatever {@code function} returned
         * @throws IllegalStateException if {@code z} is set, a sub-transformation has a uid and the sink
         *     transformation has none
         */
        private <I, R> R adjustTransformations(DataStream<I> dataStream, Function<DataStream<I>, R> function, boolean z, boolean z2) {
            this.executionEnvironment.setParallelism(-1);

            final int sizeBefore = this.executionEnvironment.getTransformations().size();
            final R result = function.apply(dataStream);

            final List<Transformation<?>> all = this.executionEnvironment.getTransformations();
            final List<Transformation<?>> added = all.subList(sizeBefore, all.size());
            final CustomSinkOperatorUidHashes uidHashes = this.transformation.getSinkOperatorsUidHashes();

            for (Transformation<?> sub : added) {
                final String subUid = sub.getUid();
                if (z && subUid != null && !subUid.isEmpty()) {
                    final boolean sinkHasUid = this.transformation.getUid() != null && !this.transformation.getUid().isEmpty();
                    Preconditions.checkState(sinkHasUid, "Sink " + this.transformation.getName() + " requires to set a uid since its customized topology has set uid for some operators.");
                }

                // User-provided uid hashes apply to the well-known operators only.
                setOperatorUidHashIfPossible(sub, SinkTransformationTranslator.WRITER_NAME, uidHashes.getWriterUidHash());
                setOperatorUidHashIfPossible(sub, SinkTransformationTranslator.COMMITTER_NAME, uidHashes.getCommitterUidHash());
                setOperatorUidHashIfPossible(sub, StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME, uidHashes.getGlobalCommitterUidHash());

                // The uid is derived from the still-unprefixed name, so this must precede the name concat.
                concatUid(sub, sub.getName());
                concatProperty(sub, Transformation::getCoLocationGroupKey, Transformation::setCoLocationGroupKey);
                concatProperty(sub, Transformation::getName, Transformation::setName);
                concatProperty(sub, Transformation::getDescription, Transformation::setDescription);

                final String sinkCoLocationKey = this.transformation.getCoLocationGroupKey();
                if (sinkCoLocationKey != null && sub.getCoLocationGroupKey() == null) {
                    sub.setCoLocationGroupKey(sinkCoLocationKey);
                }

                final Optional<SlotSharingGroup> sinkSlotSharingGroup = this.transformation.getSlotSharingGroup();
                if (sinkSlotSharingGroup.isPresent() && !sub.getSlotSharingGroup().isPresent()) {
                    sub.setSlotSharingGroup(sinkSlotSharingGroup.get());
                }

                // -1 means the topology did not pick a parallelism itself; inherit the sink's.
                if (sub.getParallelism() == -1) {
                    sub.setParallelism(this.transformation.getParallelism(), this.transformation.isParallelismConfigured());
                }

                if (sub.getMaxParallelism() < 0 && this.transformation.getMaxParallelism() > 0) {
                    sub.setMaxParallelism(this.transformation.getMaxParallelism());
                }

                if (sub instanceof PhysicalTransformation) {
                    final PhysicalTransformation<?> physical = (PhysicalTransformation<?>) sub;
                    if (this.transformation.getChainingStrategy() != null) {
                        physical.setChainingStrategy(this.transformation.getChainingStrategy());
                    }
                    physical.setSupportsConcurrentExecutionAttempts(z2);
                }
            }

            // Put the environment parallelism back to what it was when this expander was created.
            if (this.environmentParallelism.isPresent()) {
                this.executionEnvironment.getConfig().setParallelism(this.environmentParallelism.get().intValue());
            } else {
                this.executionEnvironment.getConfig().resetParallelism();
            }
            return result;
        }

        /**
         * Sets {@code str2} as uid hash on {@code transformation} when a hash is given and the
         * transformation's name equals {@code str}; otherwise does nothing.
         *
         * @param transformation candidate transformation
         * @param str operator name the hash belongs to
         * @param str2 uid hash to set, or {@code null} if none was provided
         */
        private void setOperatorUidHashIfPossible(Transformation<?> transformation, String str, @Nullable String str2) {
            if (str2 != null && transformation.getName().equals(str)) {
                transformation.setUidHash(str2);
            }
        }

        /**
         * Derives the uid of a sub-transformation from the sink transformation's uid.
         *
         * <p>When the sink has a uid and {@code str} names the committer, writer or global committer, a
         * fixed uid is assigned (the writer reuses the sink uid unchanged). In every other case the uid
         * is handled like any other property via {@link #concatProperty}.
         *
         * @param transformation the sub-transformation to update
         * @param str the sub-transformation's name, may be {@code null}
         */
        private void concatUid(Transformation<?> transformation, @Nullable String str) {
            if (str != null) {
                final String sinkUid = this.transformation.getUid();
                if (sinkUid != null) {
                    String derived = null;
                    if (str.equals(SinkTransformationTranslator.COMMITTER_NAME)) {
                        derived = String.format("Sink Committer: %s", sinkUid);
                    } else if (str.equals(SinkTransformationTranslator.WRITER_NAME)) {
                        derived = sinkUid;
                    } else if (str.equals(StandardSinkTopologies.GLOBAL_COMMITTER_TRANSFORMATION_NAME)) {
                        derived = String.format("Sink %s Global Committer", sinkUid);
                    }
                    if (derived != null) {
                        transformation.setUid(derived);
                        return;
                    }
                }
            }
            concatProperty(transformation, Transformation::getUid, Transformation::setUid);
        }

        /**
         * Prefixes a string property of {@code transformation} with the sink transformation's value of
         * the same property, separated by {@code ": "}. Nothing happens if either value is {@code null}.
         *
         * @param transformation the sub-transformation to update
         * @param function reads the property
         * @param biConsumer writes the property
         */
        private void concatProperty(Transformation<?> transformation, Function<Transformation<?>, String> function, BiConsumer<Transformation<?>, String> biConsumer) {
            final String sinkValue = function.apply(this.transformation);
            final String ownValue = function.apply(transformation);
            if (sinkValue != null && ownValue != null) {
                biConsumer.accept(transformation, sinkValue + ": " + ownValue);
            }
        }

        /**
         * Recreates the two serializable method references used by this class
         * ({@code SupportsCommitter::getCommittableSerializer} and
         * {@code SupportsPreCommitTopology::getWriteResultSerializer}) from their serialized form.
         *
         * <p>NOTE(review): this looks like the deserialization hook the Java compiler generates for
         * serializable lambdas; when this file is compiled from source it is presumably redundant and
         * may have to be removed — confirm before building.
         *
         * @param serializedLambda serialized form of the method reference
         * @return the recreated method reference with its captured receiver
         * @throws IllegalArgumentException if {@code serializedLambda} matches neither method reference
         */
        @SuppressWarnings("unchecked")
        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            switch (serializedLambda.getImplMethodName()) {
                case "getCommittableSerializer":
                    // 9 is the method-handle kind recorded for the serialized reference.
                    if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/api/connector/sink2/SupportsCommitter") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                        final SupportsCommitter<?> supportsCommitter = (SupportsCommitter<?>) serializedLambda.getCapturedArg(0);
                        // A method reference needs a functional-interface target type; Object is not one.
                        return (org.apache.flink.util.function.SerializableSupplier<Object>) supportsCommitter::getCommittableSerializer;
                    }
                    break;
                case "getWriteResultSerializer":
                    if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/util/function/SerializableSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/flink/streaming/api/connector/sink2/SupportsPreCommitTopology") && serializedLambda.getImplMethodSignature().equals("()Lorg/apache/flink/core/io/SimpleVersionedSerializer;")) {
                        final SupportsPreCommitTopology<?, ?> supportsPreCommitTopology = (SupportsPreCommitTopology<?, ?>) serializedLambda.getCapturedArg(0);
                        return (org.apache.flink.util.function.SerializableSupplier<Object>) supportsPreCommitTopology::getWriteResultSerializer;
                    }
                    break;
                default:
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /**
     * Expands the sink for batch execution.
     *
     * @return always an empty collection
     */
    @Override
    public Collection<Integer> translateForBatch(SinkTransformation<Input, Output> sinkTransformation, TransformationTranslator.Context context) {
        final boolean batchMode = true;
        return translateInternal(sinkTransformation, context, batchMode);
    }

    /**
     * Expands the sink for streaming execution.
     *
     * @return always an empty collection
     */
    @Override
    public Collection<Integer> translateForStreaming(SinkTransformation<Input, Output> sinkTransformation, TransformationTranslator.Context context) {
        final boolean batchMode = false;
        return translateInternal(sinkTransformation, context, batchMode);
    }

    /**
     * Builds a {@link SinkExpander} for the transformation and runs the expansion.
     *
     * @param sinkTransformation the sink transformation to expand
     * @param context translation context passed on to the expander
     * @param z {@code true} for batch execution, {@code false} for streaming
     * @return always an empty collection; the expander translates the sub-transformations itself
     */
    private Collection<Integer> translateInternal(SinkTransformation<Input, Output> sinkTransformation, TransformationTranslator.Context context, boolean z) {
        final SinkExpander<Input> expander = new SinkExpander<>(sinkTransformation.getInputStream(), sinkTransformation.getSink(), sinkTransformation, context, z);
        expander.expand();
        return Collections.emptyList();
    }
}
