/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import org.apache.flink.state.forst.ForStDBOperation;
import org.apache.flink.state.forst.ForStDBPutRequest;
import org.apache.flink.state.forst.ForStDBWriteBatchWrapper;
import org.forstdb.RocksDB;
import org.forstdb.WriteOptions;

public class ForStWriteBatchOperation
implements ForStDBOperation {
    private final RocksDB db;
    private final List<ForStDBPutRequest<?, ?, ?>> batchRequest;
    private final WriteOptions writeOptions;
    private final Executor executor;
    private final Runnable subProcessFinished;

    ForStWriteBatchOperation(RocksDB db, List<ForStDBPutRequest<?, ?, ?>> batchRequest, WriteOptions writeOptions, Executor executor) {
        this(db, batchRequest, writeOptions, executor, null);
    }

    ForStWriteBatchOperation(RocksDB db, List<ForStDBPutRequest<?, ?, ?>> batchRequest, WriteOptions writeOptions, Executor executor, Runnable subProcessFinished) {
        this.db = db;
        this.batchRequest = batchRequest;
        this.writeOptions = writeOptions;
        this.executor = executor;
        this.subProcessFinished = subProcessFinished;
    }

    @Override
    public CompletableFuture<Void> process() {
        return CompletableFuture.runAsync(() -> {
            try (ForStDBWriteBatchWrapper writeBatch = new ForStDBWriteBatchWrapper(this.db, this.writeOptions, this.batchRequest.size());){
                for (ForStDBPutRequest<?, ?, ?> request : this.batchRequest) {
                    request.process(writeBatch, this.db);
                }
                writeBatch.flush();
                for (ForStDBPutRequest<?, ?, ?> request : this.batchRequest) {
                    request.completeStateFuture();
                }
            }
            catch (Exception e) {
                String msg = "Error while write batch data to ForStDB.";
                for (ForStDBPutRequest<?, ?, ?> request : this.batchRequest) {
                    request.completeStateFutureExceptionally(msg, e);
                }
                throw new CompletionException(msg, e);
            }
            finally {
                if (this.subProcessFinished != null) {
                    this.subProcessFinished.run();
                }
            }
        }, this.executor);
    }

    @Override
    public int subProcessCount() {
        return this.batchRequest.size();
    }
}

