/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManagerImpl;
import org.apache.flink.streaming.runtime.operators.sink.committables.SubtaskCommittableManager;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

class CheckpointCommittableManagerImplTest {
    private static final SinkCommitterMetricGroup METRIC_GROUP = MetricsGroupTestUtils.mockCommitterMetricGroup();
    private static final int MAX_RETRIES = 1;

    CheckpointCommittableManagerImplTest() {
    }

    @Test
    void testAddSummary() {
        CheckpointCommittableManagerImpl checkpointCommittables = new CheckpointCommittableManagerImpl(new HashMap(), 1, 1L, METRIC_GROUP);
        Assertions.assertThat((Collection)checkpointCommittables.getSubtaskCommittableManagers()).isEmpty();
        CommittableSummary first = new CommittableSummary(1, 1, 1L, 1, 0);
        checkpointCommittables.addSummary(first);
        ((ObjectAssert)((ObjectAssert)Assertions.assertThat((Collection)checkpointCommittables.getSubtaskCommittableManagers()).singleElement()).returns((Object)1, SubtaskCommittableManager::getSubtaskId)).returns((Object)1L, SubtaskCommittableManager::getCheckpointId);
        CommittableSummary third = new CommittableSummary(2, 1, 2L, 2, 1);
        checkpointCommittables.addSummary(third);
        Assertions.assertThat((Collection)checkpointCommittables.getSubtaskCommittableManagers()).hasSize(2);
    }

    @Test
    void testCommit() throws IOException, InterruptedException {
        CheckpointCommittableManagerImpl checkpointCommittables = new CheckpointCommittableManagerImpl(new HashMap(), 1, 1L, METRIC_GROUP);
        checkpointCommittables.addSummary(new CommittableSummary(1, 1, 1L, 1, 0));
        checkpointCommittables.addSummary(new CommittableSummary(2, 1, 1L, 2, 0));
        checkpointCommittables.addCommittable(new CommittableWithLineage((Object)3, 1L, 1));
        checkpointCommittables.addCommittable(new CommittableWithLineage((Object)4, 1L, 2));
        NoOpCommitter committer = new NoOpCommitter();
        Assertions.assertThatCode(() -> checkpointCommittables.commit(committer, 1)).hasMessageContaining("Trying to commit incomplete batch of committables");
        Assertions.assertThatCode(() -> checkpointCommittables.commit(committer, 1)).hasMessageContaining("Trying to commit incomplete batch of committables");
        checkpointCommittables.addCommittable(new CommittableWithLineage((Object)5, 1L, 2));
        Assertions.assertThatCode(() -> checkpointCommittables.commit(committer, 1)).doesNotThrowAnyException();
        ((AbstractCollectionAssert)Assertions.assertThat((Collection)checkpointCommittables.getSuccessfulCommittables()).hasSize(3)).containsExactlyInAnyOrder((Object[])new Integer[]{3, 4, 5});
    }

    @Test
    void testUpdateCommittableSummary() {
        CheckpointCommittableManagerImpl checkpointCommittables = new CheckpointCommittableManagerImpl(new HashMap(), 1, 1L, METRIC_GROUP);
        checkpointCommittables.addSummary(new CommittableSummary(1, 1, 1L, 1, 0));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> checkpointCommittables.addSummary(new CommittableSummary(1, 1, 1L, 2, 0))).isInstanceOf(UnsupportedOperationException.class)).hasMessageContaining("FLINK-25920");
    }

    @ParameterizedTest(name="subtaskId = {0}, numberOfSubtasks = {1}, checkpointId = {2}")
    @CsvSource(value={"1, 10, 100", "2, 20, 200", "3, 30, 300"})
    public void testCopy(int subtaskId, int numberOfSubtasks, long checkpointId) {
        CheckpointCommittableManagerImpl original = new CheckpointCommittableManagerImpl(new HashMap(), numberOfSubtasks, checkpointId, METRIC_GROUP);
        original.addSummary(new CommittableSummary(subtaskId, numberOfSubtasks, checkpointId, 1, 0));
        CheckpointCommittableManagerImpl copy = original.copy();
        Assertions.assertThat((long)copy.getCheckpointId()).isEqualTo(checkpointId);
        ((ObjectAssert)Assertions.assertThat((Object)copy).returns((Object)numberOfSubtasks, CheckpointCommittableManagerImpl::getNumberOfSubtasks)).returns((Object)checkpointId, CheckpointCommittableManagerImpl::getCheckpointId);
    }

    private static class NoOpCommitter
    implements Committer<Integer> {
        private NoOpCommitter() {
        }

        public void commit(Collection<Committer.CommitRequest<Integer>> committables) throws IOException, InterruptedException {
        }

        public void close() throws Exception {
        }
    }
}

