/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBMemoryControllerUtils;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.rocksdb.Cache;
import org.rocksdb.WriteBufferManager;

/**
 * Integration test checking that RocksDB memory is shared across all jobs and tasks running on a
 * single task manager when {@link RocksDBOptions#FIX_PER_TM_MEMORY_SIZE} is configured.
 *
 * <p>The test starts a mini cluster with one task manager, submits {@link #NUMBER_OF_JOBS} jobs of
 * parallelism {@link #PARALLELISM} to it, and asserts that exactly one {@link Cache} and exactly
 * one {@link WriteBufferManager} were created through the instrumented memory factory.
 */
public class TaskManagerWideRocksDbMemorySharingITCase extends TestLogger {
    /** Parallelism of every submitted job. */
    private static final int PARALLELISM = 4;

    /** Number of jobs submitted concurrently to the single task manager. */
    private static final int NUMBER_OF_JOBS = 5;

    /** Total number of slots required so that all jobs can run at the same time. */
    private static final int NUMBER_OF_TASKS = NUMBER_OF_JOBS * PARALLELISM;

    /** Value configured for {@link RocksDBOptions#FIX_PER_TM_MEMORY_SIZE}. */
    private static final MemorySize SHARED_MEMORY = MemorySize.ofMebiBytes(500);

    /** Checkpoint interval of 24 hours, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 24L * 60 * 60 * 1000;

    private MiniClusterWithClientResource cluster;

    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /** Starts a mini cluster with one task manager offering {@link #NUMBER_OF_TASKS} slots. */
    @Before
    public void init() throws Exception {
        cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(getConfiguration())
                                .setNumberTaskManagers(1)
                                .setNumberSlotsPerTaskManager(NUMBER_OF_TASKS)
                                .build());
        cluster.before();
    }

    /** Shuts the mini cluster down after each test. */
    @After
    public void destroy() {
        // init() may have failed before the field was assigned; avoid masking that failure
        // with a NullPointerException here.
        if (cluster != null) {
            cluster.after();
        }
    }

    /**
     * Submits {@link #NUMBER_OF_JOBS} jobs, waits until all of their tasks are running, and
     * verifies that only a single {@link Cache} and a single {@link WriteBufferManager} were
     * created by the memory factory. All submitted jobs are cancelled afterwards on a best-effort
     * basis.
     */
    @Test
    public void testBlockCache() throws Exception {
        List<Cache> createdCaches = new CopyOnWriteArrayList<>();
        List<WriteBufferManager> createdWriteBufferManagers = new CopyOnWriteArrayList<>();
        // NOTE(review): the lists are registered with the SharedObjects rule, presumably so the
        // serialized factory can still reach these exact instances from inside the tasks.
        TestingRocksDBMemoryFactory memoryFactory =
                new TestingRocksDBMemoryFactory(
                        sharedObjects.add(createdCaches),
                        sharedObjects.add(createdWriteBufferManagers));

        List<JobID> jobIDs = new ArrayList<>(NUMBER_OF_JOBS);
        try {
            for (int i = 0; i < NUMBER_OF_JOBS; i++) {
                jobIDs.add(cluster.getRestClusterClient().submitJob(dag(memoryFactory)).get());
            }
            for (JobID jid : jobIDs) {
                CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jid, false);
            }
            Assert.assertEquals(1, createdCaches.size());
            Assert.assertEquals(1, createdWriteBufferManagers.size());
        } finally {
            for (JobID jobID : jobIDs) {
                try {
                    cluster.getRestClusterClient().cancel(jobID).get();
                } catch (Exception e) {
                    // Best effort: a failed cancellation must not hide the test outcome.
                    log.warn("Can not cancel job {}", jobID, e);
                }
            }
        }
    }

    /**
     * Builds the job graph of a never-ending keyed job that continuously appends small byte
     * arrays to RocksDB-backed list state.
     *
     * @param memoryFactory factory installed on the RocksDB state backend of the job
     * @return the job graph ready for submission
     */
    private JobGraph dag(RocksDBMemoryControllerUtils.RocksDBMemoryFactory memoryFactory) {
        Configuration configuration = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.setParallelism(PARALLELISM);

        EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
        backend.setRocksDBMemoryFactory(memoryFactory);
        env.setStateBackend(backend);

        // NOTE(review): the one-day interval presumably keeps checkpoints from triggering
        // while the test is running.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());

        DataStreamSource<Long> src = env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE);
        src.keyBy(number -> number)
                .map(
                        new RichMapFunction<Long, Long>() {
                            private ListState<byte[]> state;
                            private int payloadSize;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                state =
                                        getRuntimeContext()
                                                .getListState(
                                                        new ListStateDescriptor<>(
                                                                "state", byte[].class));
                                // Each function instance picks its own payload size between
                                // 4 and 10 bytes (inclusive).
                                payloadSize = 4 + new Random().nextInt(7);
                            }

                            @Override
                            public Long map(Long value) throws Exception {
                                state.add(new byte[payloadSize]);
                                // Throttle so the state grows slowly.
                                Thread.sleep(1L);
                                return value;
                            }
                        })
                .addSink(new DiscardingSink<>());
        return env.getStreamGraph().getJobGraph();
    }

    /**
     * Creates the cluster configuration: a fixed per-task-manager RocksDB memory size of {@link
     * #SHARED_MEMORY} with managed memory usage switched off.
     */
    private static Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(RocksDBOptions.FIX_PER_TM_MEMORY_SIZE, SHARED_MEMORY);
        configuration.set(RocksDBOptions.USE_MANAGED_MEMORY, false);
        return configuration;
    }

    /**
     * Memory factory that delegates to {@link
     * RocksDBMemoryControllerUtils.RocksDBMemoryFactory#DEFAULT} and records every created
     * {@link Cache} and {@link WriteBufferManager} in the supplied shared lists.
     */
    private static class TestingRocksDBMemoryFactory
            implements RocksDBMemoryControllerUtils.RocksDBMemoryFactory {
        private final SharedReference<List<Cache>> createdCaches;
        private final SharedReference<List<WriteBufferManager>> createdWriteBufferManagers;

        private TestingRocksDBMemoryFactory(
                SharedReference<List<Cache>> createdCaches,
                SharedReference<List<WriteBufferManager>> createdWriteBufferManagers) {
            this.createdCaches = createdCaches;
            this.createdWriteBufferManagers = createdWriteBufferManagers;
        }

        /** Creates a cache via the default factory and records it. */
        @Override
        public Cache createCache(long cacheCapacity, double highPriorityPoolRatio) {
            Cache cache =
                    RocksDBMemoryControllerUtils.RocksDBMemoryFactory.DEFAULT.createCache(
                            cacheCapacity, highPriorityPoolRatio);
            createdCaches.get().add(cache);
            return cache;
        }

        /** Creates a write buffer manager via the default factory and records it. */
        @Override
        public WriteBufferManager createWriteBufferManager(
                long writeBufferManagerCapacity, Cache cache) {
            WriteBufferManager writeBufferManager =
                    RocksDBMemoryControllerUtils.RocksDBMemoryFactory.DEFAULT
                            .createWriteBufferManager(writeBufferManagerCapacity, cache);
            createdWriteBufferManagers.get().add(writeBufferManager);
            return writeBufferManager;
        }
    }
}

