package org.apache.flink.state.forst;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.forstdb.ReadOptions;
import org.forstdb.RocksDB;

/* loaded from: input_file:org/apache/flink/state/forst/ForStGeneralMultiGetOperation.class */
public class ForStGeneralMultiGetOperation implements ForStDBOperation {
    private final RocksDB db;
    private final List<ForStDBGetRequest<?, ?, ?, ?>> batchRequest;
    List<List<ForStDBGetRequest<?, ?, ?, ?>>> splitRequests;
    List<ForStDBGetRequest<?, ?, ?, ?>> mapCheckRequests;
    private final Executor executor;
    private final Runnable subProcessFinished;
    private final int readIoParallelism;

    ForStGeneralMultiGetOperation(RocksDB rocksDB, List<ForStDBGetRequest<?, ?, ?, ?>> list, Executor executor) {
        this(rocksDB, list, executor, 1, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ForStGeneralMultiGetOperation(RocksDB rocksDB, List<ForStDBGetRequest<?, ?, ?, ?>> list, Executor executor, int i, Runnable runnable) {
        this.db = rocksDB;
        this.batchRequest = list;
        this.executor = executor;
        this.subProcessFinished = runnable;
        this.readIoParallelism = i;
        this.splitRequests = new ArrayList();
        this.mapCheckRequests = new ArrayList();
        classifyAndSplitRequests(this.splitRequests, this.mapCheckRequests);
    }

    @Override // org.apache.flink.state.forst.ForStDBOperation
    public CompletableFuture<Void> process() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        AtomicReference<Exception> atomicReference = new AtomicReference<>();
        AtomicInteger atomicInteger = new AtomicInteger(this.batchRequest.size());
        processOneByOne(this.mapCheckRequests, atomicReference, atomicInteger, completableFuture);
        for (List<ForStDBGetRequest<?, ?, ?, ?>> list : this.splitRequests) {
            this.executor.execute(() -> {
                Runnable runnable;
                try {
                    ReadOptions readOptions = new ReadOptions();
                    readOptions.setReadaheadSize(0L);
                    ArrayList arrayList = new ArrayList(list.size());
                    ArrayList arrayList2 = new ArrayList(list.size());
                    for (int i = 0; i < list.size(); i++) {
                        ForStDBGetRequest<?, ?, ?, ?> forStDBGetRequest = (ForStDBGetRequest) list.get(i);
                        try {
                            if (atomicReference.get() == null) {
                                arrayList.add(forStDBGetRequest.buildSerializedKey());
                                arrayList2.add(forStDBGetRequest.getColumnFamilyHandle());
                            } else {
                                completeExceptionallyRequest(forStDBGetRequest, "Error already occurred in other state request of the same group, failed the state request directly", (Exception) atomicReference.get());
                            }
                        } catch (IOException e) {
                            atomicReference.set(e);
                            completeExceptionallyRequest(forStDBGetRequest, "Error when execute ForStDb serialized get key", e);
                            completableFuture.completeExceptionally(e);
                        }
                    }
                    if (atomicReference.get() != null) {
                        if (runnable != null) {
                            return;
                        } else {
                            return;
                        }
                    }
                    List list2 = null;
                    try {
                        list2 = this.db.multiGetAsList(readOptions, arrayList2, arrayList);
                    } catch (Exception e2) {
                        atomicReference.set(e2);
                        completableFuture.completeExceptionally(e2);
                        for (int i2 = 0; i2 < list.size(); i2++) {
                            completeExceptionallyRequest((ForStDBGetRequest) list.get(i2), "Error occurred when multiGet", e2);
                        }
                    }
                    if (atomicReference.get() != null) {
                        if (this.subProcessFinished != null) {
                            this.subProcessFinished.run();
                            return;
                        }
                        return;
                    }
                    for (int i3 = 0; i3 < list.size(); i3++) {
                        ForStDBGetRequest<?, ?, ?, ?> forStDBGetRequest2 = (ForStDBGetRequest) list.get(i3);
                        try {
                            if (atomicReference.get() == null) {
                                forStDBGetRequest2.completeStateFuture((byte[]) list2.get(i3));
                            } else {
                                completeExceptionallyRequest(forStDBGetRequest2, "Error already occurred in other state request of the same group, failed the state request directly", (Exception) atomicReference.get());
                            }
                        } catch (Exception e3) {
                            atomicReference.set(e3);
                            completeExceptionallyRequest(forStDBGetRequest2, "Error when complete get future.", e3);
                            completableFuture.completeExceptionally(e3);
                        }
                    }
                    if (atomicInteger.addAndGet(-list.size()) == 0 && !completableFuture.isCompletedExceptionally()) {
                        completableFuture.complete(null);
                    }
                    if (this.subProcessFinished != null) {
                        this.subProcessFinished.run();
                    }
                } finally {
                    if (this.subProcessFinished != null) {
                        this.subProcessFinished.run();
                    }
                }
            });
        }
        return completableFuture;
    }

    private void completeExceptionallyRequest(ForStDBGetRequest<?, ?, ?, ?> forStDBGetRequest, String str, Exception exc) {
        forStDBGetRequest.completeStateFutureExceptionally(str, exc);
    }

    private void classifyAndSplitRequests(List<List<ForStDBGetRequest<?, ?, ?, ?>>> list, List<ForStDBGetRequest<?, ?, ?, ?>> list2) {
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < this.batchRequest.size(); i++) {
            ForStDBGetRequest<?, ?, ?, ?> forStDBGetRequest = this.batchRequest.get(i);
            if (forStDBGetRequest instanceof ForStDBMapCheckRequest) {
                list2.add(forStDBGetRequest);
            } else {
                arrayList.add(forStDBGetRequest);
            }
        }
        for (int i2 = 0; i2 < this.readIoParallelism; i2++) {
            int size = (arrayList.size() * i2) / this.readIoParallelism;
            int size2 = (arrayList.size() * (i2 + 1)) / this.readIoParallelism;
            if (size < size2) {
                list.add(new ArrayList());
            }
            for (int i3 = size; i3 < size2; i3++) {
                list.get(list.size() - 1).add((ForStDBGetRequest) arrayList.get(i3));
            }
        }
    }

    private void processOneByOne(List<ForStDBGetRequest<?, ?, ?, ?>> list, AtomicReference<Exception> atomicReference, AtomicInteger atomicInteger, CompletableFuture<Void> completableFuture) {
        for (int i = 0; i < list.size(); i++) {
            ForStDBGetRequest<?, ?, ?, ?> forStDBGetRequest = list.get(i);
            this.executor.execute(() -> {
                try {
                    try {
                        if (atomicReference.get() == null) {
                            forStDBGetRequest.process(this.db);
                        } else {
                            forStDBGetRequest.completeStateFutureExceptionally("Error already occurred in other state request of the same group, failed the state request directly", (Throwable) atomicReference.get());
                        }
                        if (atomicInteger.decrementAndGet() == 0 && !completableFuture.isCompletedExceptionally() && !completableFuture.isDone()) {
                            completableFuture.complete(null);
                        }
                        if (this.subProcessFinished != null) {
                            this.subProcessFinished.run();
                        }
                    } catch (Exception e) {
                        atomicReference.set(e);
                        forStDBGetRequest.completeStateFutureExceptionally("Error when execute ForStDb get operation", e);
                        completableFuture.completeExceptionally(e);
                        if (atomicInteger.decrementAndGet() == 0 && !completableFuture.isCompletedExceptionally() && !completableFuture.isDone()) {
                            completableFuture.complete(null);
                        }
                        if (this.subProcessFinished != null) {
                            this.subProcessFinished.run();
                        }
                    }
                } catch (Throwable th) {
                    if (atomicInteger.decrementAndGet() == 0 && !completableFuture.isCompletedExceptionally() && !completableFuture.isDone()) {
                        completableFuture.complete(null);
                    }
                    if (this.subProcessFinished != null) {
                        this.subProcessFinished.run();
                    }
                    throw th;
                }
            });
        }
    }

    @Override // org.apache.flink.state.forst.ForStDBOperation
    public int subProcessCount() {
        return this.mapCheckRequests.size() + this.splitRequests.size();
    }
}
