package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.configuration.SinkOptions;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkCommitterMetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;
import org.apache.flink.util.function.SerializableSupplier;

/**
 * Stream operator that collects {@link CommittableMessage}s in a {@link CommittableCollector} and
 * commits them through a {@link Committer}.
 *
 * <p>Commits are triggered either from {@link #notifyCheckpointComplete(long)} or directly from
 * {@link #processElement(StreamRecord)}, depending on the {@code commitOnInput} flag passed to the
 * constructor. The collector (plus any restored global committables) is written to operator list
 * state on every snapshot and merged back on restore.
 *
 * @param <CommT> type of the committables received from upstream
 * @param <GlobalCommT> type of the global committables carried in the restored state
 */
@Internal
public class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void> implements OneInputStreamOperator<CommittableMessage<CommT>, Void> {
    /** Descriptor of the raw (byte[]) operator list state that backs {@link #globalCommitterState}. */
    private static final ListStateDescriptor<byte[]> GLOBAL_COMMITTER_OPERATOR_RAW_STATES_DESC = new ListStateDescriptor<>("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

    /** Creates the committer; invoked in {@link #initializeState(StateInitializationContext)}. */
    private final SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory;

    /** Supplies the committable serializer; invoked in {@code setup}. */
    private final SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory;

    /**
     * If {@code true}, every incoming message triggers a commit attempt and checkpoint-complete
     * notifications do not; if {@code false}, only checkpoint-complete notifications commit.
     */
    private final boolean commitOnInput;

    private ListState<GlobalCommittableWrapper<CommT, GlobalCommT>> globalCommitterState;
    private Committer<CommT> committer;
    private CommittableCollector<CommT> committableCollector;
    private SimpleVersionedSerializer<CommT> committableSerializer;
    private SinkCommitterMetricGroup metricGroup;

    /** Retry budget handed to each commit call; read from {@link SinkOptions#COMMITTER_RETRIES}. */
    private int maxRetries;

    /** Never assigned inside this class; passed as-is (possibly {@code null}) to the state serializer. */
    @Nullable
    private SimpleVersionedSerializer<GlobalCommT> globalCommittableSerializer;

    /** Highest checkpoint id (or end-of-input marker) passed to {@link #commit(long)} so far. */
    private long lastCompletedCheckpointId = -1;

    /** Global committables recovered from restored state; re-written unchanged on every snapshot. */
    private List<GlobalCommT> sinkV1State = new ArrayList<>();

    /**
     * Creates the operator.
     *
     * @param committerFactory factory for the committer; must not be {@code null}
     * @param committableSerializerFactory supplier of the committable serializer; must not be
     *     {@code null}
     * @param commitOnInput whether to commit while processing input instead of on checkpoint
     *     completion
     * @throws NullPointerException if either factory is {@code null}
     */
    public GlobalCommitterOperator(SerializableFunction<CommitterInitContext, Committer<CommT>> committerFactory, SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory, boolean commitOnInput) {
        this.committerFactory = Preconditions.checkNotNull(committerFactory);
        this.committableSerializerFactory = Preconditions.checkNotNull(committableSerializerFactory);
        this.commitOnInput = commitOnInput;
    }

    /**
     * Initializes the metric group, an empty committable collector, the committable serializer and
     * the retry count from the stream configuration.
     */
    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<Void>> output) {
        super.setup(containingTask, config, output);
        this.metricGroup = InternalSinkCommitterMetricGroup.wrap(this.metrics);
        this.committableCollector = CommittableCollector.of(this.metricGroup);
        this.committableSerializer = this.committableSerializerFactory.get();
        this.maxRetries = config.getConfiguration().get(SinkOptions.COMMITTER_RETRIES);
    }

    /**
     * Replaces the operator state with a single wrapper holding a copy of the current collector
     * and a copy of the restored global committables.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // Copies are stored so that later mutations of the live objects do not leak into the snapshot.
        GlobalCommittableWrapper<CommT, GlobalCommT> snapshot = new GlobalCommittableWrapper<>(this.committableCollector.copy(), new ArrayList<>(this.sinkV1State));
        this.globalCommitterState.update(Collections.singletonList(snapshot));
    }

    /**
     * Creates the committer and the operator state. On restore, merges every stored wrapper back
     * into this operator and, if a restored checkpoint id is available, immediately attempts to
     * commit up to that checkpoint.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        OptionalLong restoredCheckpointId = context.getRestoredCheckpointId();
        this.committer = this.committerFactory.apply(new CommitterInitContextImpl(getRuntimeContext(), this.metricGroup, restoredCheckpointId));
        this.globalCommitterState = new SimpleVersionedListState<>(context.getOperatorStateStore().getListState(GLOBAL_COMMITTER_OPERATOR_RAW_STATES_DESC), getCommitterStateSerializer());
        if (context.isRestored()) {
            // Iterate with the element type intact; a raw Iterable would degrade elements to Object.
            for (GlobalCommittableWrapper<CommT, GlobalCommT> restored : this.globalCommitterState.get()) {
                this.sinkV1State.addAll(restored.getGlobalCommittables());
                this.committableCollector.merge(restored.getCommittableCollector());
            }
            if (restoredCheckpointId.isPresent()) {
                commit(restoredCheckpointId.getAsLong());
            }
        }
    }

    /**
     * Builds the serializer for the wrapper stored in operator state, using this subtask's index
     * and the maximum number of parallel subtasks from the runtime context.
     */
    private SimpleVersionedSerializer<GlobalCommittableWrapper<CommT, GlobalCommT>> getCommitterStateSerializer() {
        CommittableCollectorSerializer<CommT> collectorSerializer = new CommittableCollectorSerializer<>(this.committableSerializer, getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), getRuntimeContext().getTaskInfo().getMaxNumberOfParallelSubtasks(), this.metricGroup);
        return new GlobalCommitterSerializer<>(collectorSerializer, this.globalCommittableSerializer, this.metricGroup);
    }

    /**
     * Commits everything up to the completed checkpoint, unless this operator is configured to
     * commit on input, in which case the notification triggers no commit here.
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (this.commitOnInput) {
            return;
        }
        commit(checkpointId);
    }

    /**
     * Advances {@link #lastCompletedCheckpointId} to at least {@code checkpointIdOrEOI} and commits
     * the collected checkpoints up to that id, in the order returned by the collector.
     *
     * @param checkpointIdOrEOI checkpoint id (or end-of-input marker) that triggered the commit
     */
    private void commit(long checkpointIdOrEOI) throws IOException, InterruptedException {
        // The id never moves backwards, even if a smaller value arrives later.
        this.lastCompletedCheckpointId = Math.max(this.lastCompletedCheckpointId, checkpointIdOrEOI);
        for (CheckpointCommittableManager<CommT> checkpointManager : this.committableCollector.getCheckpointCommittablesUpTo(this.lastCompletedCheckpointId)) {
            if (!checkpointManager.hasGloballyReceivedAll()) {
                // Stop at the first incomplete checkpoint; it and all later ones stay in the collector.
                return;
            }
            checkpointManager.commit(this.committer, this.maxRetries);
            this.committableCollector.remove(checkpointManager);
        }
    }

    /**
     * Adds the incoming message to the collector and, when committing on input, attempts a commit
     * for the checkpoint id (or end-of-input marker) carried by the message.
     */
    @Override
    public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception {
        this.committableCollector.addMessage(element.getValue());
        if (this.commitOnInput) {
            commit(element.getValue().getCheckpointIdOrEOI());
        }
    }
}
