package org.apache.flink.state.forst;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestContainer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.concurrent.FutureUtils;
import org.forstdb.RocksDB;
import org.forstdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/forst/ForStStateExecutor.class */
public class ForStStateExecutor implements StateExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ForStStateExecutor.class);
    private final ExecutorService coordinatorThread;
    private final ExecutorService readThreads;
    private final ExecutorService writeThreads;
    private final int readThreadCount;
    private final boolean sharedWriteThread;
    private final RocksDB db;
    private final WriteOptions writeOptions;
    private Throwable executionError;
    private final AtomicLong ongoing;
    private final ExecutorService directExecutor = Executors.newDirectExecutorService();

    public ForStStateExecutor(boolean z, boolean z2, int i, int i2, RocksDB rocksDB, WriteOptions writeOptions) {
        if (z2) {
            Preconditions.checkState(i > 0);
            this.coordinatorThread = z ? this.directExecutor : java.util.concurrent.Executors.newSingleThreadExecutor(new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator-And-Write"));
            this.readThreadCount = i;
            this.readThreads = java.util.concurrent.Executors.newFixedThreadPool(i, new ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
            this.writeThreads = this.directExecutor;
            this.sharedWriteThread = true;
        } else {
            Preconditions.checkState(i > 0 || i2 > 0);
            this.coordinatorThread = z ? this.directExecutor : java.util.concurrent.Executors.newSingleThreadExecutor(new ExecutorThreadFactory("ForSt-StateExecutor-Coordinator"));
            if (i <= 0 || i2 <= 0) {
                this.readThreadCount = Math.max(i, i2);
                this.readThreads = java.util.concurrent.Executors.newFixedThreadPool(this.readThreadCount, new ExecutorThreadFactory("ForSt-StateExecutor-IO"));
                this.writeThreads = this.readThreads;
                this.sharedWriteThread = true;
            } else {
                this.readThreadCount = i;
                this.readThreads = java.util.concurrent.Executors.newFixedThreadPool(i, new ExecutorThreadFactory("ForSt-StateExecutor-read-IO"));
                this.writeThreads = java.util.concurrent.Executors.newFixedThreadPool(i2, new ExecutorThreadFactory("ForSt-StateExecutor-write-IO"));
                this.sharedWriteThread = false;
            }
        }
        this.db = rocksDB;
        this.writeOptions = writeOptions;
        this.ongoing = new AtomicLong();
    }

    public CompletableFuture<Void> executeBatchRequests(StateRequestContainer stateRequestContainer) {
        checkState();
        Preconditions.checkArgument(stateRequestContainer instanceof ForStStateRequestClassifier);
        ForStStateRequestClassifier forStStateRequestClassifier = (ForStStateRequestClassifier) stateRequestContainer;
        List<ForStDBGetRequest<?, ?, ?, ?>> pollDbGetRequests = forStStateRequestClassifier.pollDbGetRequests();
        List<ForStDBIterRequest<?, ?, ?, ?, ?>> pollDbIterRequests = forStStateRequestClassifier.pollDbIterRequests();
        if (!pollDbGetRequests.isEmpty()) {
            this.ongoing.addAndGet(1L);
        }
        if (!pollDbIterRequests.isEmpty()) {
            this.ongoing.addAndGet(1L);
        }
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.coordinatorThread.execute(() -> {
            long currentTimeMillis = System.currentTimeMillis();
            ArrayList arrayList = new ArrayList(3);
            if (!pollDbGetRequests.isEmpty()) {
                RocksDB rocksDB = this.db;
                ExecutorService executorService = this.readThreads;
                int i = this.readThreadCount;
                AtomicLong atomicLong = this.ongoing;
                Objects.requireNonNull(atomicLong);
                ForStGeneralMultiGetOperation forStGeneralMultiGetOperation = new ForStGeneralMultiGetOperation(rocksDB, pollDbGetRequests, executorService, i, atomicLong::decrementAndGet);
                this.ongoing.addAndGet(forStGeneralMultiGetOperation.subProcessCount() - 1);
                arrayList.add(forStGeneralMultiGetOperation.process());
            }
            if (!pollDbIterRequests.isEmpty()) {
                RocksDB rocksDB2 = this.db;
                ExecutorService executorService2 = this.readThreads;
                AtomicLong atomicLong2 = this.ongoing;
                Objects.requireNonNull(atomicLong2);
                ForStIterateOperation forStIterateOperation = new ForStIterateOperation(rocksDB2, pollDbIterRequests, executorService2, atomicLong2::decrementAndGet);
                this.ongoing.addAndGet(forStIterateOperation.subProcessCount() - 1);
                arrayList.add(forStIterateOperation.process());
            }
            List<ForStDBPutRequest<?, ?, ?>> pollDbPutRequests = forStStateRequestClassifier.pollDbPutRequests();
            if (!pollDbPutRequests.isEmpty()) {
                arrayList.add(new ForStWriteBatchOperation(this.db, pollDbPutRequests, this.writeOptions, this.writeThreads).process());
            }
            FutureUtils.combineAll(arrayList).thenAcceptAsync(collection -> {
                LOG.debug("Complete executing a batch of state requests, putRequest size {}, getRequest size {}, iterRequest size {}, duration {} ms", new Object[]{Integer.valueOf(pollDbPutRequests.size()), Integer.valueOf(pollDbGetRequests.size()), Integer.valueOf(pollDbIterRequests.size()), Long.valueOf(System.currentTimeMillis() - currentTimeMillis)});
                completableFuture.complete(null);
            }, this.coordinatorThread).exceptionally(th -> {
                try {
                    Iterator it = pollDbIterRequests.iterator();
                    while (it.hasNext()) {
                        ((ForStDBIterRequest) it.next()).close();
                    }
                } catch (IOException e) {
                    LOG.error("Close iterRequests fail", e);
                }
                this.executionError = th;
                completableFuture.completeExceptionally(th);
                return null;
            });
        });
        return completableFuture;
    }

    public StateRequestContainer createStateRequestContainer() {
        checkState();
        return new ForStStateRequestClassifier();
    }

    public void executeRequestSync(StateRequest<?, ?, ?, ?> stateRequest) {
        ForStDBOperation forStWriteBatchOperation;
        checkState();
        Object convertRequests = ForStStateRequestClassifier.convertRequests(stateRequest);
        try {
            if (convertRequests instanceof ForStDBGetRequest) {
                forStWriteBatchOperation = new ForStGeneralMultiGetOperation(this.db, Collections.singletonList((ForStDBGetRequest) convertRequests), this.directExecutor, 1, null);
            } else if (convertRequests instanceof ForStDBIterRequest) {
                forStWriteBatchOperation = new ForStIterateOperation(this.db, Collections.singletonList((ForStDBIterRequest) convertRequests), this.directExecutor, null);
            } else {
                if (!(convertRequests instanceof ForStDBPutRequest)) {
                    throw new IllegalArgumentException("Unknown request type: " + convertRequests);
                }
                forStWriteBatchOperation = new ForStWriteBatchOperation(this.db, Collections.singletonList((ForStDBPutRequest) convertRequests), this.writeOptions, this.directExecutor);
            }
            forStWriteBatchOperation.process().exceptionally(th -> {
                this.executionError = th;
                return null;
            });
        } catch (Exception e) {
            this.executionError = e;
        }
        checkState();
    }

    public boolean fullyLoaded() {
        return this.ongoing.get() >= ((long) this.readThreadCount);
    }

    private void checkState() {
        if (this.executionError != null) {
            throw new IllegalStateException("previous state request already failed : ", this.executionError);
        }
    }

    public void shutdown() {
        shutdownAndWait(this.coordinatorThread);
        shutdownAndWait(this.readThreads);
        if (!this.sharedWriteThread) {
            shutdownAndWait(this.writeThreads);
        }
        LOG.info("Shutting down the ForStStateExecutor.");
    }

    private void shutdownAndWait(ExecutorService executorService) {
        try {
            executorService.shutdown();
            do {
            } while (!executorService.awaitTermination(60L, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            executorService.shutdownNow();
        }
    }
}
