/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.state.forst.ForStDBGetRequest;
import org.apache.flink.state.forst.ForStDBMapCheckRequest;
import org.apache.flink.state.forst.ForStDBOperation;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ReadOptions;
import org.forstdb.RocksDB;

public class ForStGeneralMultiGetOperation
implements ForStDBOperation {
    private final RocksDB db;
    private final List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest;
    List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests;
    List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests;
    private final Executor executor;
    private final Runnable subProcessFinished;
    private final int readIoParallelism;

    ForStGeneralMultiGetOperation(RocksDB db, List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest, Executor executor) {
        this(db, batchRequest, executor, 1, null);
    }

    ForStGeneralMultiGetOperation(RocksDB db, List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest, Executor executor, int readIoParallelism, Runnable subProcessFinished) {
        this.db = db;
        this.batchRequest = batchRequest;
        this.executor = executor;
        this.subProcessFinished = subProcessFinished;
        this.readIoParallelism = readIoParallelism;
        this.splitRequests = new ArrayList();
        this.mapCheckRequests = new ArrayList();
        this.classifyAndSplitRequests(this.splitRequests, this.mapCheckRequests);
    }

    @Override
    public CompletableFuture<Void> process() {
        CompletableFuture<Void> future = new CompletableFuture<Void>();
        AtomicReference<Exception> error = new AtomicReference<Exception>();
        AtomicInteger counter = new AtomicInteger(this.batchRequest.size());
        this.processOneByOne(this.mapCheckRequests, error, counter, future);
        for (List<ForStDBGetRequest<?, ?, ?, ?>> getRequests : this.splitRequests) {
            this.executor.execute(() -> {
                try {
                    ReadOptions readOptions = new ReadOptions();
                    readOptions.setReadaheadSize(0L);
                    ArrayList<byte[]> keys = new ArrayList<byte[]>(getRequests.size());
                    ArrayList<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<ColumnFamilyHandle>(getRequests.size());
                    for (int i = 0; i < getRequests.size(); ++i) {
                        ForStDBGetRequest request = (ForStDBGetRequest)getRequests.get(i);
                        try {
                            if (error.get() == null) {
                                byte[] key = request.buildSerializedKey();
                                keys.add(key);
                                columnFamilyHandles.add(request.getColumnFamilyHandle());
                                continue;
                            }
                            this.completeExceptionallyRequest(request, "Error already occurred in other state request of the same group, failed the state request directly", (Exception)error.get());
                            continue;
                        }
                        catch (IOException e) {
                            error.set(e);
                            this.completeExceptionallyRequest(request, "Error when execute ForStDb serialized get key", e);
                            future.completeExceptionally(e);
                        }
                    }
                    if (error.get() != null) {
                        return;
                    }
                    List values = null;
                    try {
                        values = this.db.multiGetAsList(readOptions, columnFamilyHandles, keys);
                    }
                    catch (Exception e) {
                        error.set(e);
                        future.completeExceptionally(e);
                        for (int i = 0; i < getRequests.size(); ++i) {
                            this.completeExceptionallyRequest((ForStDBGetRequest)getRequests.get(i), "Error occurred when multiGet", e);
                        }
                    }
                    if (error.get() != null) {
                        return;
                    }
                    for (int i = 0; i < getRequests.size(); ++i) {
                        ForStDBGetRequest request = (ForStDBGetRequest)getRequests.get(i);
                        try {
                            if (error.get() == null) {
                                request.completeStateFuture((byte[])values.get(i));
                                continue;
                            }
                            this.completeExceptionallyRequest(request, "Error already occurred in other state request of the same group, failed the state request directly", (Exception)error.get());
                            continue;
                        }
                        catch (Exception e) {
                            error.set(e);
                            this.completeExceptionallyRequest(request, "Error when complete get future.", e);
                            future.completeExceptionally(e);
                        }
                    }
                    if (counter.addAndGet(-getRequests.size()) == 0 && !future.isCompletedExceptionally()) {
                        future.complete(null);
                    }
                }
                finally {
                    if (this.subProcessFinished != null) {
                        this.subProcessFinished.run();
                    }
                }
            });
        }
        return future;
    }

    private void completeExceptionallyRequest(ForStDBGetRequest<?, ?, ?, ?> request, String message, Exception e) {
        request.completeStateFutureExceptionally(message, e);
    }

    private void classifyAndSplitRequests(List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests, List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests) {
        ArrayList getRequests = new ArrayList();
        for (int i = 0; i < this.batchRequest.size(); ++i) {
            ForStDBGetRequest<?, ?, ?, ?> request = this.batchRequest.get(i);
            if (request instanceof ForStDBMapCheckRequest) {
                mapCheckRequests.add(request);
                continue;
            }
            getRequests.add(request);
        }
        for (int p = 0; p < this.readIoParallelism; ++p) {
            int endIndex;
            int startIndex = getRequests.size() * p / this.readIoParallelism;
            if (startIndex < (endIndex = getRequests.size() * (p + 1) / this.readIoParallelism)) {
                splitRequests.add(new ArrayList());
            }
            for (int i = startIndex; i < endIndex; ++i) {
                splitRequests.get(splitRequests.size() - 1).add((ForStDBGetRequest)getRequests.get(i));
            }
        }
    }

    private void processOneByOne(List<ForStDBGetRequest<?, ?, ?, ?>> requests, AtomicReference<Exception> error, AtomicInteger counter, CompletableFuture<Void> future) {
        for (int i = 0; i < requests.size(); ++i) {
            ForStDBGetRequest<?, ?, ?, ?> request = requests.get(i);
            this.executor.execute(() -> {
                try {
                    if (error.get() == null) {
                        request.process(this.db);
                    } else {
                        request.completeStateFutureExceptionally("Error already occurred in other state request of the same group, failed the state request directly", (Throwable)error.get());
                    }
                }
                catch (Exception e) {
                    error.set(e);
                    request.completeStateFutureExceptionally("Error when execute ForStDb get operation", e);
                    future.completeExceptionally(e);
                }
                finally {
                    if (counter.decrementAndGet() == 0 && !future.isCompletedExceptionally() && !future.isDone()) {
                        future.complete(null);
                    }
                    if (this.subProcessFinished != null) {
                        this.subProcessFinished.run();
                    }
                }
            });
        }
    }

    @Override
    public int subProcessCount() {
        return this.mapCheckRequests.size() + this.splitRequests.size();
    }
}

