package org.apache.flink.streaming.runtime.operators.sink;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/BatchGlobalCommitterOperatorTest.class */
/**
 * Tests for the operator created by {@link BatchGlobalCommitterOperatorFactory}, driven through a
 * {@link OneInputStreamOperatorTestHarness} with {@code String} input and output records.
 */
public class BatchGlobalCommitterOperatorTest extends TestLogger {

    /**
     * Initializing the state of a harness whose sink was built with a {@code null} global
     * committer is expected to fail with an {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void throwExceptionWithoutCommitter() throws Exception {
        final OneInputStreamOperatorTestHarness<String, String> testHarness =
                createTestHarness(null);
        testHarness.initializeEmptyState();
    }

    /**
     * Running one element through the full lifecycle with a
     * {@link TestSink.AlwaysRetryGlobalCommitter} is expected to fail with an
     * {@link UnsupportedOperationException}.
     */
    @Test(expected = UnsupportedOperationException.class)
    public void doNotSupportRetry() throws Exception {
        final OneInputStreamOperatorTestHarness<String, String> testHarness =
                createTestHarness(new TestSink.AlwaysRetryGlobalCommitter());

        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.processElement(new StreamRecord<>("hotel"));
        testHarness.endInput();
        testHarness.close();
    }

    /**
     * After all inputs were processed and {@code endInput()} was called, the committer's committed
     * data must consist of the combination of all inputs plus {@link TestSink#END_OF_INPUT_STR},
     * in any order.
     */
    @Test
    public void endOfInput() throws Exception {
        final TestSink.DefaultGlobalCommitter globalCommitter =
                new TestSink.DefaultGlobalCommitter();
        final OneInputStreamOperatorTestHarness<String, String> testHarness =
                createTestHarness(globalCommitter);
        final List<String> inputs = Arrays.asList("compete", "swear", "shallow");

        testHarness.initializeEmptyState();
        testHarness.open();

        // Wrap every input in a properly typed StreamRecord instead of going through raw types.
        final List<StreamRecord<String>> records =
                inputs.stream().map(StreamRecord<String>::new).collect(Collectors.toList());
        testHarness.processElements(records);

        testHarness.endInput();

        final List<String> expectedCommittedData =
                Arrays.asList(globalCommitter.combine(inputs), TestSink.END_OF_INPUT_STR);
        MatcherAssert.assertThat(
                globalCommitter.getCommittedData(),
                Matchers.containsInAnyOrder(expectedCommittedData.toArray()));

        testHarness.close();
    }

    /** Closing the harness must leave the global committer reporting itself as closed. */
    @Test
    public void close() throws Exception {
        final TestSink.DefaultGlobalCommitter globalCommitter =
                new TestSink.DefaultGlobalCommitter();
        final OneInputStreamOperatorTestHarness<String, String> testHarness =
                createTestHarness(globalCommitter);

        testHarness.initializeEmptyState();
        testHarness.open();
        testHarness.close();

        MatcherAssert.assertThat(globalCommitter.isClosed(), Matchers.is(true));
    }

    /**
     * Builds a test harness around a {@link BatchGlobalCommitterOperatorFactory} whose sink uses
     * the given global committer and {@link TestSink.StringCommittableSerializer}.
     *
     * @param globalCommitter the global committer to install in the sink; the tests also pass
     *     {@code null} to exercise the failure path
     * @return a harness using {@link StringSerializer} for its input records
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<String, String> createTestHarness(
            GlobalCommitter<String, String> globalCommitter) throws Exception {
        return new OneInputStreamOperatorTestHarness<>(
                new BatchGlobalCommitterOperatorFactory<>(
                        TestSink.newBuilder()
                                .setGlobalCommitter(globalCommitter)
                                .setGlobalCommittableSerializer(
                                        TestSink.StringCommittableSerializer.INSTANCE)
                                .build()),
                StringSerializer.INSTANCE);
    }
}
