/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.sequencedmultisetstate;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.LongStream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.SerializerFactory;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.HashFunction;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.sequencedmultisetstate.AdaptiveSequencedMultiSetState;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateContext;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.runtime.util.StreamRecordUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.apache.flink.util.function.TriFunctionWithException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class SequencedMultiSetStateTest {
    @Parameter(value=0)
    private SequencedMultiSetState.Strategy strategy;
    @Parameter(value=1)
    private long adaptiveLowThresholdOverride;
    @Parameter(value=2)
    private long adaptiveHighThresholdOverride;
    private static final LogicalType VARCHAR = DataTypes.VARCHAR((int)50).getLogicalType();
    public static final int KEY_POS = 0;

    @Parameters(name="strategy={0}, lowThreshold={1}, highThreshold={2}")
    public static Object[][] parameters() {
        return new Object[][]{{SequencedMultiSetState.Strategy.VALUE_STATE, -1, -1}, {SequencedMultiSetState.Strategy.MAP_STATE, -1, -1}, {SequencedMultiSetState.Strategy.ADAPTIVE, 0, 1}, {SequencedMultiSetState.Strategy.ADAPTIVE, 1, 2}, {SequencedMultiSetState.Strategy.ADAPTIVE, 0, 10}, {SequencedMultiSetState.Strategy.ADAPTIVE, 9, 10}};
    }

    @TestTemplate
    public void testBasicFlow() throws Exception {
        this.runTest((BiConsumerWithException<SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception>)((BiConsumerWithException)(state, keyContext) -> {
            keyContext.setCurrentKey((Object)"sk1");
            Assertions.assertTrue((boolean)state.isEmpty());
            state.add((Object)StreamRecordUtils.row("key", "value"), 1L);
            Assertions.assertFalse((boolean)state.isEmpty());
            keyContext.setCurrentKey((Object)"sk2");
            Assertions.assertTrue((boolean)state.isEmpty());
            keyContext.setCurrentKey((Object)"sk1");
            state.clear();
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, new Tuple2[0]);
        }));
    }

    @TestTemplate
    public void testAppend() throws Exception {
        this.runTest((ThrowingConsumer<SequencedMultiSetState<RowData>, Exception>)((ThrowingConsumer)state -> {
            state.append((Object)StreamRecordUtils.row("k1", "x"), 777L);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)777L));
            state.append((Object)StreamRecordUtils.row("k1", "x"), 778L);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)777L), Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)778L));
            state.append((Object)StreamRecordUtils.row("k2", "y"), 779L);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)777L), Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)778L), Tuple2.of((Object)StreamRecordUtils.row("k2", "y"), (Object)779L));
            state.append((Object)StreamRecordUtils.row("k1", "x"), 777L);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)777L), Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)778L), Tuple2.of((Object)StreamRecordUtils.row("k2", "y"), (Object)779L), Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)777L));
        }));
    }

    @TestTemplate
    public void testAdd() throws Exception {
        this.runTest((ThrowingConsumer<SequencedMultiSetState<RowData>, Exception>)((ThrowingConsumer)state -> {
            state.add((Object)StreamRecordUtils.row("k1", "x"), 777L);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("k1", "x"), 777L);
            state.add((Object)StreamRecordUtils.row("k2", "x"), 777L);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)777L), Tuple2.of((Object)StreamRecordUtils.row("k2", "x"), (Object)777L));
            state.add((Object)StreamRecordUtils.row("k2", "y"), 778L);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("k1", "x"), (Object)777L), Tuple2.of((Object)StreamRecordUtils.row("k2", "y"), (Object)778L));
            state.add((Object)StreamRecordUtils.row("k1", "y"), 778L);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("k1", "y"), (Object)778L), Tuple2.of((Object)StreamRecordUtils.row("k2", "y"), (Object)778L));
        }));
    }

    @TestTemplate
    public void testRemove() throws Exception {
        this.runTest((ThrowingConsumer<SequencedMultiSetState<RowData>, Exception>)((ThrowingConsumer)state -> {
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key1"), SequencedMultiSetState.StateChangeType.REMOVAL_NOT_FOUND, new RowData[0]);
            state.add((Object)StreamRecordUtils.row("key1", "value"), 777L);
            state.add((Object)StreamRecordUtils.row("key2", "value"), 777L);
            state.add((Object)StreamRecordUtils.row("key3", "value"), 777L);
            state.add((Object)StreamRecordUtils.row("key4", "value"), 777L);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key999"), SequencedMultiSetState.StateChangeType.REMOVAL_NOT_FOUND, new RowData[0]);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key4"), SequencedMultiSetState.StateChangeType.REMOVAL_LAST_ADDED, StreamRecordUtils.row("key3", "value"));
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key3"), SequencedMultiSetState.StateChangeType.REMOVAL_LAST_ADDED, StreamRecordUtils.row("key2", "value"));
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key1"), SequencedMultiSetState.StateChangeType.REMOVAL_OTHER, new RowData[0]);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key2", "value-to-return"), SequencedMultiSetState.StateChangeType.REMOVAL_ALL, StreamRecordUtils.row("key2", "value-to-return"));
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key1"), SequencedMultiSetState.StateChangeType.REMOVAL_NOT_FOUND, new RowData[0]);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key2"), SequencedMultiSetState.StateChangeType.REMOVAL_NOT_FOUND, new RowData[0]);
        }));
    }

    @TestTemplate
    public void testAddAfterRemovingTail() throws Exception {
        this.runTest((ThrowingConsumer<SequencedMultiSetState<RowData>, Exception>)((ThrowingConsumer)state -> {
            state.add((Object)StreamRecordUtils.row("key1", "value-1"), 777L);
            state.add((Object)StreamRecordUtils.row("key2", "value-2"), 777L);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key1"), SequencedMultiSetState.StateChangeType.REMOVAL_OTHER, StreamRecordUtils.row("key1", "value-1"));
            state.add((Object)StreamRecordUtils.row("key1", "value-1"), 777L);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key2"), SequencedMultiSetState.StateChangeType.REMOVAL_OTHER, StreamRecordUtils.row("key2", "value-2"));
            state.add((Object)StreamRecordUtils.row("key2", "value-2"), 777L);
        }));
    }

    @TestTemplate
    public void testRemoveFirstAppended() throws Exception {
        this.runTest((ThrowingConsumer<SequencedMultiSetState<RowData>, Exception>)((ThrowingConsumer)state -> {
            state.append((Object)StreamRecordUtils.row("key", "value-1"), 777L);
            state.append((Object)StreamRecordUtils.row("key", "value-2"), 778L);
            state.append((Object)StreamRecordUtils.row("key", "value-3"), 779L);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key"), SequencedMultiSetState.StateChangeType.REMOVAL_OTHER, new RowData[0]);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("key", "value-2"), (Object)778L), Tuple2.of((Object)StreamRecordUtils.row("key", "value-3"), (Object)779L));
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key"), SequencedMultiSetState.StateChangeType.REMOVAL_OTHER, new RowData[0]);
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("key", "value-3"), (Object)779L));
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key"), SequencedMultiSetState.StateChangeType.REMOVAL_ALL, StreamRecordUtils.row("key"));
            Assertions.assertTrue((boolean)state.isEmpty());
        }));
    }

    @TestTemplate
    public void testRemoveWithInterleavingRowAppended() throws Exception {
        this.runTest((ThrowingConsumer<SequencedMultiSetState<RowData>, Exception>)((ThrowingConsumer)state -> {
            state.append((Object)StreamRecordUtils.row("key1", "value"), 777L);
            state.append((Object)StreamRecordUtils.row("key2", "value"), 777L);
            state.append((Object)StreamRecordUtils.row("key2", "value"), 778L);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key2"), SequencedMultiSetState.StateChangeType.REMOVAL_OTHER, StreamRecordUtils.row("key2", "value"));
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key2"), SequencedMultiSetState.StateChangeType.REMOVAL_LAST_ADDED, StreamRecordUtils.row("key1", "value"));
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key1", "value-to-return"), SequencedMultiSetState.StateChangeType.REMOVAL_ALL, StreamRecordUtils.row("key1", "value-to-return"));
        }));
    }

    @TestTemplate
    public void testCaching() throws Exception {
        this.runTest((BiConsumerWithException<SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception>)((BiConsumerWithException)(state, ctx) -> {
            ctx.setCurrentKey((Object)"sk1");
            state.add((Object)StreamRecordUtils.row("key", "value-1"), 777L);
            state.clearCache();
            Assertions.assertFalse((boolean)state.isEmpty());
            ctx.setCurrentKey((Object)"sk2");
            state.loadCache();
            Assertions.assertTrue((boolean)state.isEmpty());
        }));
    }

    @TestTemplate
    public void testKeyExtraction() throws Exception {
        Function<RowData, RowData> keyExtractor = row -> ProjectedRowData.from((int[])new int[]{1}).replaceRow(row);
        this.runTest((BiConsumerWithException<SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception>)((BiConsumerWithException)(state, ctx) -> {
            ctx.setCurrentKey((Object)"sk1");
            state.add((Object)StreamRecordUtils.row("value-123", "key"), 777L);
            Assertions.assertFalse((boolean)state.isEmpty());
            SequencedMultiSetState.StateChangeInfo ret = state.remove((Object)StreamRecordUtils.row("value-456", "key"));
            Tuple3.of((Object)ret.getSizeAfter(), (Object)ret.getChangeType(), (Object)ret.getPayload());
            Assertions.assertTrue((boolean)state.isEmpty());
        }), keyExtractor, 0);
    }

    @TestTemplate
    public void testRowKindNormalization() throws Exception {
        this.runTest((ThrowingConsumer<SequencedMultiSetState<RowData>, Exception>)((ThrowingConsumer)state -> {
            for (RowKind firstKind : RowKind.values()) {
                for (RowKind secondKind : RowKind.values()) {
                    state.append((Object)StreamRecordUtils.rowOfKind(firstKind, "key", "value"), 778L);
                    state.remove((Object)StreamRecordUtils.rowOfKind(secondKind, "key", "value"));
                    Assertions.assertTrue((boolean)state.isEmpty());
                    state.add((Object)StreamRecordUtils.rowOfKind(firstKind, "key", "value"), 777L);
                    state.remove((Object)StreamRecordUtils.rowOfKind(secondKind, "key", "value"));
                    Assertions.assertTrue((boolean)state.isEmpty());
                    state.add((Object)StreamRecordUtils.rowOfKind(firstKind, "key", "value"), 777L);
                    state.add((Object)StreamRecordUtils.rowOfKind(secondKind, "key", "value"), 778L);
                    SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, Tuple2.of((Object)StreamRecordUtils.row("key", "value"), (Object)778L));
                    state.clear();
                }
            }
        }));
    }

    @TestTemplate
    public void testAdaptivity() throws Exception {
        Assumptions.assumeTrue((this.strategy == SequencedMultiSetState.Strategy.ADAPTIVE ? 1 : 0) != 0);
        long totalSize = this.adaptiveHighThresholdOverride * 2L;
        this.runTest((BiConsumerWithException<SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception>)((BiConsumerWithException)(state, ctx) -> {
            AdaptiveSequencedMultiSetState ad = (AdaptiveSequencedMultiSetState)state;
            int runningSize = 0;
            ctx.setCurrentKey((Object)"k1");
            Assertions.assertFalse((boolean)ad.isIsUsingLargeState(), (String)"should start with value state");
            while ((long)runningSize < totalSize) {
                Assertions.assertEquals((Object)((long)runningSize >= this.adaptiveHighThresholdOverride ? 1 : 0), (Object)ad.isIsUsingLargeState(), (String)"should switch after reaching high threshold");
                ad.append(StreamRecordUtils.row("key", "value"), (long)runningSize);
                ++runningSize;
            }
            ctx.setCurrentKey((Object)"k2");
            Assertions.assertFalse((boolean)ad.isIsUsingLargeState(), (String)"should not mix different context keys");
            ctx.setCurrentKey((Object)"k1");
            Assertions.assertTrue((boolean)ad.isIsUsingLargeState(), (String)"should not mix different context keys");
            while ((long)runningSize > this.adaptiveLowThresholdOverride + 1L) {
                ad.remove(StreamRecordUtils.row("key"));
                Assertions.assertTrue((boolean)ad.isIsUsingLargeState(), (String)"should switch back after reaching low threshold");
                --runningSize;
            }
            ad.remove(StreamRecordUtils.row("key"));
            Assertions.assertFalse((boolean)ad.isIsUsingLargeState(), (String)"should switch back after reaching low threshold");
            SequencedMultiSetStateTest.assertStateContents((SequencedMultiSetState<RowData>)state, (Tuple2[])LongStream.range(totalSize - (long)(--runningSize), totalSize).mapToObj(ts -> Tuple2.of((Object)StreamRecordUtils.row("key", "value"), (Object)ts)).toArray(Tuple2[]::new));
            while (runningSize > 0) {
                Assertions.assertFalse((boolean)ad.isIsUsingLargeState(), (String)"should switch back after reaching low threshold");
                ad.remove(StreamRecordUtils.row("key"));
                --runningSize;
            }
            Assertions.assertTrue((boolean)ad.isEmpty());
            Assertions.assertEquals((int)0, (int)runningSize);
            while ((long)runningSize < totalSize) {
                Assertions.assertEquals((Object)((long)runningSize >= this.adaptiveHighThresholdOverride ? 1 : 0), (Object)ad.isIsUsingLargeState(), (String)"should switch after reaching high threshold");
                ad.add(StreamRecordUtils.row(Integer.toString(runningSize), "value"), 777L);
                ++runningSize;
            }
            Assertions.assertTrue((boolean)ad.isIsUsingLargeState());
            state.clear();
            Assertions.assertFalse((boolean)ad.isIsUsingLargeState(), (String)"should switch to value state after clear");
        }));
    }

    @TestTemplate
    public void testAddReturnValues() throws Exception {
        this.testReturnValues((TriFunctionWithException<SequencedMultiSetState<RowData>, RowData, Long, SequencedMultiSetState.StateChangeInfo<RowData>, Exception>)((TriFunctionWithException)SequencedMultiSetState::add));
    }

    @TestTemplate
    public void testAppendReturnValues() throws Exception {
        this.testReturnValues((TriFunctionWithException<SequencedMultiSetState<RowData>, RowData, Long, SequencedMultiSetState.StateChangeInfo<RowData>, Exception>)((TriFunctionWithException)SequencedMultiSetState::append));
    }

    private void testReturnValues(TriFunctionWithException<SequencedMultiSetState<RowData>, RowData, Long, SequencedMultiSetState.StateChangeInfo<RowData>, Exception> updateFn) throws Exception {
        this.runTest((ThrowingConsumer<SequencedMultiSetState<RowData>, Exception>)((ThrowingConsumer)state -> {
            SequencedMultiSetState.StateChangeInfo ret = (SequencedMultiSetState.StateChangeInfo)updateFn.apply(state, (Object)StreamRecordUtils.row("key-1", "value"), (Object)777L);
            Assertions.assertEquals((Object)SequencedMultiSetState.StateChangeType.ADDITION, (Object)ret.getChangeType());
            Assertions.assertEquals((long)1L, (long)ret.getSizeAfter());
            Assertions.assertTrue((boolean)ret.wasEmpty());
            ret = (SequencedMultiSetState.StateChangeInfo)updateFn.apply(state, (Object)StreamRecordUtils.row("key-2", "value"), (Object)777L);
            Assertions.assertEquals((Object)SequencedMultiSetState.StateChangeType.ADDITION, (Object)ret.getChangeType());
            Assertions.assertEquals((long)2L, (long)ret.getSizeAfter());
            Assertions.assertFalse((boolean)ret.wasEmpty());
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key-1"), SequencedMultiSetState.StateChangeType.REMOVAL_OTHER, new RowData[0]);
            SequencedMultiSetStateTest.removeAndAssert((SequencedMultiSetState<RowData>)state, StreamRecordUtils.row("key-2"), SequencedMultiSetState.StateChangeType.REMOVAL_ALL, StreamRecordUtils.row("key-2"));
            ret = (SequencedMultiSetState.StateChangeInfo)updateFn.apply(state, (Object)StreamRecordUtils.row("key-3", "value"), (Object)777L);
            Assertions.assertEquals((Object)SequencedMultiSetState.StateChangeType.ADDITION, (Object)ret.getChangeType());
            Assertions.assertEquals((long)1L, (long)ret.getSizeAfter());
            Assertions.assertTrue((boolean)ret.wasEmpty());
        }));
    }

    private void runTest(ThrowingConsumer<SequencedMultiSetState<RowData>, Exception> test) throws Exception {
        this.runTest((BiConsumerWithException<SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception>)((BiConsumerWithException)(state, keyContext) -> {
            keyContext.setCurrentKey((Object)"key1");
            test.accept(state);
        }));
    }

    private void runTest(BiConsumerWithException<SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception> test) throws Exception {
        this.runTest(test, Function.identity(), 0);
    }

    private void runTest(BiConsumerWithException<SequencedMultiSetState<RowData>, InternalKeyContext<String>, Exception> test, Function<RowData, RowData> keyExtractor, int keyPos) throws Exception {
        SequencedMultiSetStateContext p = new SequencedMultiSetStateContext((TypeSerializer)new RowDataSerializer(new LogicalType[]{VARCHAR}), (GeneratedRecordEqualiser)new MyGeneratedEqualiser(keyPos), (GeneratedHashFunction)new MyGeneratedHashFunction(keyPos), (TypeSerializer)new RowDataSerializer(new LogicalType[]{VARCHAR, VARCHAR}), keyExtractor, new SequencedMultiSetStateConfig(this.strategy, Long.valueOf(this.adaptiveHighThresholdOverride), Long.valueOf(this.adaptiveLowThresholdOverride), StateTtlConfig.DISABLED, TimeDomain.EVENT_TIME));
        MockEnvironment env = new MockEnvironmentBuilder().build();
        AbstractKeyedStateBackend stateBackend = SequencedMultiSetStateTest.getKeyedStateBackend(env, StringSerializer.INSTANCE);
        StreamingRuntimeContext ctx = new StreamingRuntimeContext((Environment)env, Collections.emptyMap(), UnregisteredMetricsGroup.createOperatorMetricGroup(), new OperatorID(), (ProcessingTimeService)new TestProcessingTimeService(), SequencedMultiSetStateTest.getKeyedStateStore(stateBackend), ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES);
        test.accept((Object)SequencedMultiSetState.create((SequencedMultiSetStateContext)p, (RuntimeContext)ctx, (String)"hashmap"), stateBackend);
    }

    private static KeyedStateStore getKeyedStateStore(KeyedStateBackend<String> stateBackend) {
        return new DefaultKeyedStateStore(stateBackend, new SerializerFactory(){

            public <T> TypeSerializer<T> createSerializer(TypeInformation<T> ti) {
                return ti.createSerializer((SerializerConfig)new SerializerConfigImpl());
            }
        });
    }

    private static <T> AbstractKeyedStateBackend<T> getKeyedStateBackend(MockEnvironment env, TypeSerializer<T> keySerializer) throws IOException {
        String op = "test-operator";
        JobID jobId = new JobID();
        JobVertexID jobVertexId = new JobVertexID();
        KeyGroupRange emptyKeyGroupRange = KeyGroupRange.of((int)0, (int)10);
        int numberOfKeyGroups = emptyKeyGroupRange.getNumberOfKeyGroups();
        return new HashMapStateBackend().createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl((Environment)env, jobId, op, keySerializer, numberOfKeyGroups, emptyKeyGroupRange, new KvStateRegistry().createTaskRegistry(jobId, jobVertexId), TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), new CloseableRegistry()));
    }

    private static void assertStateContents(SequencedMultiSetState<RowData> state, RowData rowData, Long timestamp) throws Exception {
        SequencedMultiSetStateTest.assertStateContents(state, Tuple2.of((Object)rowData, (Object)timestamp));
    }

    @SafeVarargs
    private static void assertStateContents(SequencedMultiSetState<RowData> state, Tuple2<RowData, Long> ... expectedArr) throws Exception {
        ArrayList actual = new ArrayList();
        state.iterator().forEachRemaining(actual::add);
        Assertions.assertEquals((Object)(expectedArr.length == 0 ? 1 : 0), (Object)state.isEmpty());
        Assertions.assertEquals((int)expectedArr.length, (int)actual.size());
        Assertions.assertArrayEquals((Object[])expectedArr, (Object[])actual.toArray());
    }

    private static void removeAndAssert(SequencedMultiSetState<RowData> state, RowData key, SequencedMultiSetState.StateChangeType expectedResultType, RowData ... expectedReturnedRow) throws Exception {
        SequencedMultiSetState.StateChangeInfo ret = state.remove((Object)key);
        Assertions.assertEquals((Object)expectedResultType, (Object)ret.getChangeType());
        switch (ret.getChangeType()) {
            case REMOVAL_NOT_FOUND: {
                Assertions.assertEquals(Optional.empty(), (Object)ret.getPayload());
                break;
            }
            case REMOVAL_ALL: {
                Assertions.assertEquals((long)0L, (long)ret.getSizeAfter());
                Assertions.assertTrue((boolean)state.isEmpty(), (String)"state is expected to be empty");
                Assertions.assertEquals(Optional.of(expectedReturnedRow[0]), (Object)ret.getPayload());
                break;
            }
            case REMOVAL_OTHER: {
                Assertions.assertFalse((boolean)state.isEmpty(), (String)"state is expected to be non-empty");
                Assertions.assertEquals(Optional.empty(), (Object)ret.getPayload());
                break;
            }
            case REMOVAL_LAST_ADDED: {
                Assertions.assertFalse((boolean)state.isEmpty(), (String)"state is expected to be non-empty");
                Assertions.assertEquals(Optional.of(expectedReturnedRow[0]), (Object)ret.getPayload());
            }
        }
    }

    private static class TestRecordEqualiser
    implements RecordEqualiser,
    HashFunction {
        private final int keyPos;

        private TestRecordEqualiser(int keyPos) {
            this.keyPos = keyPos;
        }

        public boolean equals(RowData row1, RowData row2) {
            return row1.getRowKind() == row2.getRowKind() && row1.getString(this.keyPos).equals(row2.getString(this.keyPos));
        }

        public int hashCode(Object data) {
            RowData rd = (RowData)data;
            return Objects.hash(rd.getRowKind(), rd.getString(this.keyPos));
        }
    }

    private static class MyGeneratedHashFunction
    extends GeneratedHashFunction {
        private final int keyPos;

        public MyGeneratedHashFunction(int keyPos) {
            super("", "", new Object[0], (ReadableConfig)new Configuration());
            this.keyPos = keyPos;
        }

        public HashFunction newInstance(ClassLoader classLoader) {
            return new TestRecordEqualiser(this.keyPos);
        }
    }

    private static class MyGeneratedEqualiser
    extends GeneratedRecordEqualiser {
        private final int keyPos;

        public MyGeneratedEqualiser(int keyPos) {
            super("", "", new Object[0]);
            this.keyPos = keyPos;
        }

        public RecordEqualiser newInstance(ClassLoader classLoader) {
            return new TestRecordEqualiser(this.keyPos);
        }
    }
}

