package org.apache.flink.streaming.runtime.operators;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.class */
/**
 * Base class for tests of {@link GenericWriteAheadSink} implementations.
 *
 * <p>Each test drives one or more sinks through a {@link OneInputStreamOperatorTestHarness}:
 * elements are fed in, checkpoints are taken and (optionally) confirmed, and the concrete
 * subclass then verifies what ended up in its sink via one of the {@code verifyResults*} hooks.
 *
 * @param <IN> type of the elements written to the sink
 * @param <S> concrete sink type under test
 */
public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink<IN>> extends TestLogger {
    /** Max parallelism used for every harness and every state repartitioning in the rescaling tests. */
    private static final int MAX_PARALLELISM = 10;

    /** Number of elements sent per batch in the non-rescaling tests. */
    private static final int BATCH_SIZE = 20;

    /**
     * Creates a fresh instance of the sink under test.
     *
     * @return a new sink; every call is expected to yield an independent operator instance
     *     because the tests wrap each returned sink in its own harness
     * @throws Exception if the sink cannot be created
     */
    protected abstract S createSink() throws Exception;

    /**
     * Returns the type information of the elements produced by {@link #generateValue(int, int)}.
     * Not used by this base class itself.
     */
    protected abstract TypeInformation<IN> createTypeInfo();

    /**
     * Generates one test element.
     *
     * @param counter running element counter; the tests start at 1 and increment it per element
     * @param batch tag of the batch the element belongs to (0, 1 or 2 in the tests below)
     * @return the element to send to the sink
     */
    protected abstract IN generateValue(int counter, int batch);

    /**
     * Verifies the sink contents after {@link #testIdealCircumstances()}.
     *
     * @param sink the sink that was driven by the test
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsIdealCircumstances(S sink) throws Exception;

    /**
     * Verifies the sink contents after {@link #testDataPersistenceUponMissedNotify()}.
     *
     * @param sink the sink that was driven by the test
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsDataPersistenceUponMissedNotify(S sink) throws Exception;

    /**
     * Verifies the sink contents after {@link #testDataDiscardingUponRestore()}.
     *
     * @param sink the sink that was restored from the snapshot and driven afterwards
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsDataDiscardingUponRestore(S sink) throws Exception;

    /**
     * Verifies the sink contents in the rescaling tests.
     *
     * <p>NOTE(review): judging by the call sites, the two ints look like an inclusive range of
     * element counters expected in the sink, with {@code (0, -1)} denoting "nothing" - confirm
     * against the implementations.
     *
     * @param sink the sink to inspect
     * @param startElementCounter first expected element counter
     * @param endElementCounter last expected element counter
     * @throws Exception if verification fails
     */
    protected abstract void verifyResultsWhenReScaling(S sink, int startElementCounter, int endElementCounter)
            throws Exception;

    /** Three batches, each followed by a snapshot and its completion notification. */
    @Test
    public void testIdealCircumstances() throws Exception {
        S sink = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness = new OneInputStreamOperatorTestHarness<>(sink);
        testHarness.open();

        int elementCounter = 1;
        for (int checkpointId = 0; checkpointId < 3; checkpointId++) {
            elementCounter = processElements(testHarness, BATCH_SIZE, elementCounter, checkpointId);
            testHarness.snapshot(checkpointId, 0L);
            testHarness.notifyOfCompletedCheckpoint(checkpointId);
        }

        verifyResultsIdealCircumstances(sink);
        testHarness.close();
    }

    /** Same as the ideal case, except that the notification for checkpoint 1 is never delivered. */
    @Test
    public void testDataPersistenceUponMissedNotify() throws Exception {
        S sink = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness = new OneInputStreamOperatorTestHarness<>(sink);
        testHarness.open();

        int elementCounter = 1;

        elementCounter = processElements(testHarness, BATCH_SIZE, elementCounter, 0);
        testHarness.snapshot(0, 0L);
        testHarness.notifyOfCompletedCheckpoint(0);

        elementCounter = processElements(testHarness, BATCH_SIZE, elementCounter, 1);
        // Checkpoint 1 is deliberately left unconfirmed.
        testHarness.snapshot(1, 0L);

        processElements(testHarness, BATCH_SIZE, elementCounter, 2);
        testHarness.snapshot(2, 0L);
        testHarness.notifyOfCompletedCheckpoint(2);

        verifyResultsDataPersistenceUponMissedNotify(sink);
        testHarness.close();
    }

    /**
     * Takes one confirmed checkpoint, sends a further batch that is never snapshotted, then
     * restores a new sink from the checkpoint and continues from there.
     */
    @Test
    public void testDataDiscardingUponRestore() throws Exception {
        OneInputStreamOperatorTestHarness<IN, IN> testHarness = new OneInputStreamOperatorTestHarness<>(createSink());
        testHarness.open();

        int elementCounter = 1;

        elementCounter = processElements(testHarness, BATCH_SIZE, elementCounter, 0);
        OperatorSubtaskState latestSnapshot = testHarness.snapshot(0, 0L);
        testHarness.notifyOfCompletedCheckpoint(0);

        // This batch is sent after the snapshot and the harness is closed without another one.
        elementCounter = processElements(testHarness, BATCH_SIZE, elementCounter, 1);
        testHarness.close();

        S restoredSink = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> restoredHarness =
                new OneInputStreamOperatorTestHarness<>(restoredSink);
        restoredHarness.setup();
        restoredHarness.initializeState(latestSnapshot);
        restoredHarness.open();

        processElements(restoredHarness, BATCH_SIZE, elementCounter, 2);
        restoredHarness.snapshot(1, 0L);
        restoredHarness.notifyOfCompletedCheckpoint(1);

        verifyResultsDataDiscardingUponRestore(restoredSink);
        restoredHarness.close();
    }

    /** Snapshots two parallel sub-tasks and restores their combined state into a single one. */
    @Test
    public void testScalingDown() throws Exception {
        OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
                new OneInputStreamOperatorTestHarness<>(createSink(), MAX_PARALLELISM, 2, 0);
        testHarness1.open();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
                new OneInputStreamOperatorTestHarness<>(createSink(), MAX_PARALLELISM, 2, 1);
        testHarness2.open();

        int elementCounter = 1;
        elementCounter = processElements(testHarness1, 10, elementCounter, 0);
        elementCounter = processElements(testHarness2, 11, elementCounter, 0);

        // Neither snapshot is confirmed before the sub-tasks are shut down.
        OperatorSubtaskState mergedStates = AbstractStreamOperatorTestHarness.repackageState(
                testHarness1.snapshot(0, 0L),
                testHarness2.snapshot(0, 0L));
        testHarness1.close();
        testHarness2.close();

        OperatorSubtaskState initState = AbstractStreamOperatorTestHarness.repartitionOperatorState(
                mergedStates, MAX_PARALLELISM, 2, 1, 0);

        S scaledDownSink = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> scaledDownHarness =
                new OneInputStreamOperatorTestHarness<>(scaledDownSink, MAX_PARALLELISM, 1, 0);
        scaledDownHarness.setup();
        scaledDownHarness.initializeState(initState);
        scaledDownHarness.open();

        processElements(scaledDownHarness, 12, elementCounter, 0);
        scaledDownHarness.snapshot(1, 1L);
        scaledDownHarness.notifyOfCompletedCheckpoint(1);

        // 10 + 11 restored elements plus 12 new ones: counters 1..33.
        verifyResultsWhenReScaling(scaledDownSink, 1, 33);
        scaledDownHarness.close();
    }

    /** Snapshots a single sub-task twice and restores the second snapshot into two sub-tasks. */
    @Test
    public void testScalingUp() throws Exception {
        S sink1 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness1 =
                new OneInputStreamOperatorTestHarness<>(sink1, MAX_PARALLELISM, 1, 0);
        testHarness1.open();

        int elementCounter = 1;
        int snapshotCount = 0;

        elementCounter = processElements(testHarness1, 10, elementCounter, 0);
        testHarness1.snapshot(++snapshotCount, 0L); // checkpoint 1, never confirmed

        elementCounter = processElements(testHarness1, 11, elementCounter, 0);
        OperatorSubtaskState latestSnapshot = testHarness1.snapshot(++snapshotCount, 0L); // checkpoint 2
        testHarness1.close();

        // No checkpoint was confirmed on the original sub-task.
        verifyResultsWhenReScaling(sink1, 0, -1);

        OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
                latestSnapshot, MAX_PARALLELISM, 1, 2, 0);
        OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(
                latestSnapshot, MAX_PARALLELISM, 1, 2, 1);

        snapshotCount++; // checkpoint 3 is used by both restored sub-tasks

        S sink2 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness2 =
                new OneInputStreamOperatorTestHarness<>(sink2, MAX_PARALLELISM, 2, 0);
        testHarness2.setup();
        testHarness2.initializeState(initState1);
        testHarness2.open();

        // Sub-task 0 receives no new elements and takes no snapshot of its own; only the
        // notification is delivered.
        testHarness2.notifyOfCompletedCheckpoint(snapshotCount);
        verifyResultsWhenReScaling(sink2, 1, 10);

        S sink3 = createSink();
        OneInputStreamOperatorTestHarness<IN, IN> testHarness3 =
                new OneInputStreamOperatorTestHarness<>(sink3, MAX_PARALLELISM, 2, 1);
        testHarness3.setup();
        testHarness3.initializeState(initState2);
        testHarness3.open();

        processElements(testHarness3, 10, elementCounter, 0);
        testHarness3.snapshot(snapshotCount, 1L);
        testHarness3.notifyOfCompletedCheckpoint(snapshotCount);

        verifyResultsWhenReScaling(sink3, 11, 31);

        testHarness2.close();
        testHarness3.close();
    }

    /**
     * Sends {@code count} generated elements through the given harness.
     *
     * @param harness harness wrapping the sink that receives the elements
     * @param count number of elements to send
     * @param firstCounter element counter of the first element; incremented by one per element
     * @param batch batch tag forwarded to {@link #generateValue(int, int)}
     * @return the element counter to use for the next element, i.e. {@code firstCounter + count}
     * @throws Exception if the harness fails to process an element
     */
    private int processElements(
            OneInputStreamOperatorTestHarness<IN, IN> harness, int count, int firstCounter, int batch)
            throws Exception {
        int counter = firstCounter;
        for (int sent = 0; sent < count; sent++) {
            harness.processElement(new StreamRecord<>(generateValue(counter, batch)));
            counter++;
        }
        return counter;
    }
}
