/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.utils;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.util.StateConfigUtil;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class StateConfigUtilTest {
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void testRocksDBWithHeapTimer() throws Exception {
        File tempDir = this.tempFolder.newFolder().getAbsoluteFile();
        Configuration conf = new Configuration();
        conf.setString("state.backend", "rocksdb");
        conf.setString("state.backend.rocksdb.timer-service.factory", "HEAP");
        conf.setString("state.checkpoints.dir", "file://" + tempDir.toString());
        this.assertIsStateImmutable(false, conf);
    }

    @Test
    public void testRocksDBWithDefaultTimer() throws Exception {
        File tempDir = this.tempFolder.newFolder().getAbsoluteFile();
        Configuration conf = new Configuration();
        conf.setString("state.backend", "rocksdb");
        conf.setString("state.checkpoints.dir", "file://" + tempDir.toString());
        this.assertIsStateImmutable(true, conf);
    }

    @Test
    public void testHeapState() throws Exception {
        Configuration conf = new Configuration();
        this.assertIsStateImmutable(false, conf);
    }

    private void assertIsStateImmutable(boolean result, Configuration conf) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)conf);
        env.setParallelism(1);
        env.fromElements((Object[])new String[]{"a", "b", "c"}).keyBy((KeySelector & Serializable)s -> s).transform("testing", (TypeInformation)BasicTypeInfo.BOOLEAN_TYPE_INFO, (OneInputStreamOperator)new TestingStateBackendOperator()).addSink((SinkFunction)new VerifyingSink());
        env.execute();
        Assert.assertEquals(Arrays.asList(result, result, result), (Object)VerifyingSink.RESULT);
    }

    @After
    public void before() {
        VerifyingSink.RESULT.clear();
    }

    private static final class VerifyingSink
    implements SinkFunction<Boolean> {
        private static final long serialVersionUID = 1L;
        private static final List<Boolean> RESULT = new ArrayList<Boolean>();

        private VerifyingSink() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke(Boolean value, SinkFunction.Context context) throws Exception {
            List<Boolean> list = RESULT;
            synchronized (list) {
                RESULT.add(value);
            }
        }
    }

    private static final class TestingStateBackendOperator
    extends AbstractStreamOperator<Boolean>
    implements OneInputStreamOperator<String, Boolean> {
        private static final long serialVersionUID = 1L;
        private transient Boolean result = null;

        private TestingStateBackendOperator() {
        }

        public void open() throws Exception {
            super.open();
            this.result = StateConfigUtil.isStateImmutableInStateBackend((KeyedStateBackend)this.getKeyedStateBackend());
        }

        public void processElement(StreamRecord<String> element) throws Exception {
            if (this.result != null) {
                this.output.collect((Object)new StreamRecord((Object)this.result));
            }
        }
    }
}

