package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest.class */
class CheckpointCommittableManagerImplTest {
    private static final SinkCommitterMetricGroup METRIC_GROUP = MetricsGroupTestUtils.mockCommitterMetricGroup();
    private static final int MAX_RETRIES = 1;

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImplTest$NoOpCommitter.class */
    private static class NoOpCommitter implements Committer<Integer> {
        private NoOpCommitter() {
        }

        public void commit(Collection<Committer.CommitRequest<Integer>> collection) throws IOException, InterruptedException {
        }

        public void close() throws Exception {
        }
    }

    CheckpointCommittableManagerImplTest() {
    }

    @Test
    void testAddSummary() {
        CheckpointCommittableManagerImpl checkpointCommittableManagerImpl = new CheckpointCommittableManagerImpl(new HashMap(), 1, 1L, METRIC_GROUP);
        Assertions.assertThat(checkpointCommittableManagerImpl.getSubtaskCommittableManagers()).isEmpty();
        checkpointCommittableManagerImpl.addSummary(new CommittableSummary(1, 1, 1L, 1, 0));
        Assertions.assertThat(checkpointCommittableManagerImpl.getSubtaskCommittableManagers()).singleElement().returns(1, (v0) -> {
            return v0.getSubtaskId();
        }).returns(1L, (v0) -> {
            return v0.getCheckpointId();
        });
        checkpointCommittableManagerImpl.addSummary(new CommittableSummary(2, 1, 2L, 2, 1));
        Assertions.assertThat(checkpointCommittableManagerImpl.getSubtaskCommittableManagers()).hasSize(2);
    }

    @Test
    void testCommit() throws IOException, InterruptedException {
        CheckpointCommittableManagerImpl checkpointCommittableManagerImpl = new CheckpointCommittableManagerImpl(new HashMap(), 1, 1L, METRIC_GROUP);
        checkpointCommittableManagerImpl.addSummary(new CommittableSummary(1, 1, 1L, 1, 0));
        checkpointCommittableManagerImpl.addSummary(new CommittableSummary(2, 1, 1L, 2, 0));
        checkpointCommittableManagerImpl.addCommittable(new CommittableWithLineage(3, 1L, 1));
        checkpointCommittableManagerImpl.addCommittable(new CommittableWithLineage(4, 1L, 2));
        NoOpCommitter noOpCommitter = new NoOpCommitter();
        Assertions.assertThatCode(() -> {
            checkpointCommittableManagerImpl.commit(noOpCommitter, 1);
        }).hasMessageContaining("Trying to commit incomplete batch of committables");
        Assertions.assertThatCode(() -> {
            checkpointCommittableManagerImpl.commit(noOpCommitter, 1);
        }).hasMessageContaining("Trying to commit incomplete batch of committables");
        checkpointCommittableManagerImpl.addCommittable(new CommittableWithLineage(5, 1L, 2));
        Assertions.assertThatCode(() -> {
            checkpointCommittableManagerImpl.commit(noOpCommitter, 1);
        }).doesNotThrowAnyException();
        Assertions.assertThat(checkpointCommittableManagerImpl.getSuccessfulCommittables()).hasSize(3).containsExactlyInAnyOrder(new Integer[]{3, 4, 5});
    }

    @Test
    void testUpdateCommittableSummary() {
        CheckpointCommittableManagerImpl checkpointCommittableManagerImpl = new CheckpointCommittableManagerImpl(new HashMap(), 1, 1L, METRIC_GROUP);
        checkpointCommittableManagerImpl.addSummary(new CommittableSummary(1, 1, 1L, 1, 0));
        Assertions.assertThatThrownBy(() -> {
            checkpointCommittableManagerImpl.addSummary(new CommittableSummary(1, 1, 1L, 2, 0));
        }).isInstanceOf(UnsupportedOperationException.class).hasMessageContaining("FLINK-25920");
    }

    @ParameterizedTest(name = "subtaskId = {0}, numberOfSubtasks = {1}, checkpointId = {2}")
    @CsvSource({"1, 10, 100", "2, 20, 200", "3, 30, 300"})
    public void testCopy(int i, int i2, long j) {
        CheckpointCommittableManagerImpl checkpointCommittableManagerImpl = new CheckpointCommittableManagerImpl(new HashMap(), i2, j, METRIC_GROUP);
        checkpointCommittableManagerImpl.addSummary(new CommittableSummary(i, i2, j, 1, 0));
        CheckpointCommittableManagerImpl copy = checkpointCommittableManagerImpl.copy();
        Assertions.assertThat(copy.getCheckpointId()).isEqualTo(j);
        Assertions.assertThat(copy).returns(Integer.valueOf(i2), (v0) -> {
            return v0.getNumberOfSubtasks();
        }).returns(Long.valueOf(j), (v0) -> {
            return v0.getCheckpointId();
        });
    }
}
