/*
 * Decompiled with CFR 0.152.
 */
package org.kie.kogito.persistence.postgresql;

import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.pgclient.PgPool;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.Tuple;
import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceOptimisticLockingException;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.impl.AbstractProcessInstance;
import org.kie.kogito.serialization.process.ProcessInstanceMarshallerService;

/**
 * Store of process instances for a single {@link Process}, kept in the
 * {@code process_instances} table and accessed through a Vert.x {@link PgPool}.
 *
 * <p>Every statement is scoped to the id and version of the wrapped process. When the process
 * has no version, the {@code process_version is null} predicate is used instead of a bound
 * parameter. Each operation blocks the calling thread until the query completes or
 * {@code queryTimeoutMillis} elapses.
 *
 * <p>When {@code lock} is enabled, updates go through a version-checked statement and fail with
 * {@link ProcessInstanceOptimisticLockingException} if no row matches the expected version.
 */
public class PostgresqlProcessInstances implements MutableProcessInstances {
    private static final String VERSION = "version";
    private static final String PAYLOAD = "payload";
    private static final String IS_NULL = "is null";
    // The statements below (INSERT excepted) end with "process_version " on purpose: the
    // predicate for the process version is appended by withVersionClause(String, int).
    private static final String INSERT = "INSERT INTO process_instances (id, payload, process_id, process_version, version) VALUES ($1, $2, $3, $4, $5)";
    private static final String UPDATE = "UPDATE process_instances SET payload = $1 WHERE process_id = $2 and id = $3 and process_version ";
    private static final String DELETE = "DELETE FROM process_instances WHERE process_id = $1 and id = $2 and process_version ";
    private static final String FIND_BY_ID = "SELECT payload, version FROM process_instances WHERE process_id = $1 and id = $2 and process_version ";
    private static final String FIND_ALL = "SELECT payload, version FROM process_instances WHERE process_id = $1 and process_version ";
    private static final String UPDATE_WITH_LOCK = "UPDATE process_instances SET payload = $1, version = $2 WHERE process_id = $3 and id = $4 and version = $5 and process_version ";

    private final Process<?> process;
    private final PgPool client;
    private final ProcessInstanceMarshallerService marshaller;
    private final Long queryTimeoutMillis;
    private final boolean lock;

    /**
     * Creates a store bound to one process definition.
     *
     * @param process the process whose instances are stored; its id and version scope every query
     * @param client the pool used to run all statements
     * @param queryTimeoutMillis maximum time, in milliseconds, to wait for each query to complete
     * @param lock {@code true} to use version-checked (optimistic locking) updates
     */
    public PostgresqlProcessInstances(Process<?> process, PgPool client, Long queryTimeoutMillis, boolean lock) {
        this.process = process;
        this.client = client;
        this.queryTimeoutMillis = queryTimeoutMillis;
        this.marshaller = ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build();
        this.lock = lock;
    }

    /**
     * Tells whether an instance with the given id can be found for this process.
     *
     * @param id the process instance id
     * @return {@code true} if {@code findById(id)} yields an instance
     */
    public boolean exists(String id) {
        return this.findById(id).isPresent();
    }

    /**
     * Inserts the marshalled instance under the given id. An instance that is not active is not
     * written; it is only disconnected.
     *
     * @param id the process instance id
     * @param instance the instance to persist
     * @throws RuntimeException if the insert fails, times out or is interrupted
     */
    public void create(String id, ProcessInstance instance) {
        if (!this.isActive(instance)) {
            disconnect(instance);
            return;
        }
        insertInternal(id, marshaller.marshallProcessInstance(instance));
    }

    /**
     * Overwrites the stored payload of the given instance and then disconnects it, whether or not
     * the write succeeded. An instance that is not active is not written; it is only disconnected.
     *
     * @param id the process instance id
     * @param instance the instance to persist
     * @throws ProcessInstanceOptimisticLockingException if locking is enabled and no row matched
     *         the instance's current version
     * @throws RuntimeException if the update fails, times out or is interrupted
     */
    public void update(String id, ProcessInstance instance) {
        if (!this.isActive(instance)) {
            disconnect(instance);
            return;
        }
        try {
            byte[] payload = marshaller.marshallProcessInstance(instance);
            if (lock) {
                updateWithLock(id, payload, instance.version());
            } else {
                updateInternal(id, payload);
            }
        } finally {
            disconnect(instance);
        }
    }

    /**
     * Deletes the row of the given instance, if any.
     *
     * @param id the process instance id
     * @throws RuntimeException if the delete fails, times out or is interrupted
     */
    public void remove(String id) {
        deleteInternal(id);
    }

    /**
     * Loads and unmarshalls a single instance.
     *
     * @param id the process instance id
     * @param mode the read mode handed to the marshaller
     * @return the instance, with its version taken from the row, or empty if no row matches
     * @throws RuntimeException if the query fails, times out or is interrupted
     */
    public Optional<ProcessInstance> findById(String id, ProcessInstanceReadMode mode) {
        return findByIdInternal(id).map(row -> unmarshall(row, mode));
    }

    /**
     * Loads all instances of this process. The query runs eagerly; rows are unmarshalled lazily
     * as the returned stream is consumed.
     *
     * @param mode the read mode handed to the marshaller
     * @return a sequential stream of instances, empty when the query yields no result
     * @throws RuntimeException if the query fails, times out or is interrupted
     */
    public Stream<ProcessInstance> stream(ProcessInstanceReadMode mode) {
        try {
            Future<RowSet<Row>> future = client.preparedQuery(withVersionClause(FIND_ALL, 2)).execute(tuple(process.id()));
            return getResultFromFuture(future)
                    .map(rows -> StreamSupport.stream(rows.spliterator(), false))
                    .orElse(Stream.empty())
                    .map(row -> unmarshall(row, mode));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw uncheckedException(e, "Error finding all process instances, for processId %s", process.id());
        } catch (ExecutionException | TimeoutException e) {
            throw uncheckedException(e, "Error finding all process instances, for processId %s", process.id());
        }
    }

    /** Rebuilds an instance from the {@code payload} column and stamps it with the row's {@code version}. */
    private ProcessInstance<?> unmarshall(Row r, ProcessInstanceReadMode mode) {
        AbstractProcessInstance<?> instance = (AbstractProcessInstance<?>) marshaller.unmarshallProcessInstance(r.getBuffer(PAYLOAD).getBytes(), process, mode);
        instance.setVersion(r.getLong(VERSION));
        return instance;
    }

    /**
     * @return {@code true} if this store performs version-checked updates
     */
    public boolean lock() {
        return this.lock;
    }

    /**
     * Hands the instance a reload function built by the marshaller. When invoked, the supplier
     * re-reads the row, copies its version onto the instance and returns the stored payload; it
     * throws {@link java.util.NoSuchElementException} if the row no longer exists.
     */
    private void disconnect(ProcessInstance instance) {
        ((AbstractProcessInstance) instance).internalRemoveProcessInstance(marshaller.createdReloadFunction(() -> findByIdInternal(instance.id()).map(r -> {
            ((AbstractProcessInstance) instance).setVersion(r.getLong(VERSION));
            return r.getBuffer(PAYLOAD).getBytes();
        }).orElseThrow()));
    }

    /** Inserts a new row with version {@code 0}; returns whether exactly one row was written. */
    private boolean insertInternal(String id, byte[] payload) {
        try {
            Future<RowSet<Row>> future = client.preparedQuery(INSERT)
                    .execute(Tuple.of(id, Buffer.buffer(payload), process.id(), process.version(), 0L));
            return getExecutedResult(future);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw uncheckedException(e, "Error inserting process instance %s", id);
        } catch (Exception e) {
            throw uncheckedException(e, "Error inserting process instance %s", id);
        }
    }

    /** Wraps {@code ex} in a {@link RuntimeException} whose message is {@code message} formatted with {@code param}. */
    private RuntimeException uncheckedException(Exception ex, String message, Object... param) {
        return new RuntimeException(String.format(message, param), ex);
    }

    /** Overwrites the payload without checking the row version; returns whether exactly one row changed. */
    private boolean updateInternal(String id, byte[] payload) {
        try {
            Future<RowSet<Row>> future = client.preparedQuery(withVersionClause(UPDATE, 4))
                    .execute(tuple(Buffer.buffer(payload), process.id(), id));
            return getExecutedResult(future);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw uncheckedException(e, "Error updating process instance %s", id);
        } catch (Exception e) {
            throw uncheckedException(e, "Error updating process instance %s", id);
        }
    }

    /** Deletes the row for {@code id}; returns whether exactly one row was removed. */
    private boolean deleteInternal(String id) {
        try {
            Future<RowSet<Row>> future = client.preparedQuery(withVersionClause(DELETE, 3))
                    .execute(tuple(process.id(), id));
            return getExecutedResult(future);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw uncheckedException(e, "Error deleting process instance %s", id);
        } catch (Exception e) {
            throw uncheckedException(e, "Error deleting process instance %s", id);
        }
    }

    /**
     * Waits for a write statement and reports whether it affected exactly one row.
     * Interruption is already flagged on the thread by {@link #getResultFromFuture(Future)}.
     */
    private boolean getExecutedResult(Future<RowSet<Row>> future) throws ExecutionException, TimeoutException, InterruptedException {
        return getResultFromFuture(future).map(SqlResult::rowCount).map(count -> count == 1).orElse(false);
    }

    /**
     * Blocks for at most {@code queryTimeoutMillis} milliseconds until the future completes.
     *
     * @return the query result, or empty if the future completed with {@code null}
     * @throws InterruptedException if the wait is interrupted; the interrupt flag is restored first
     */
    private Optional<RowSet<Row>> getResultFromFuture(Future<RowSet<Row>> future) throws ExecutionException, TimeoutException, InterruptedException {
        try {
            return Optional.ofNullable(future.toCompletionStage().toCompletableFuture().get(queryTimeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /** Fetches the first row matching {@code id} for this process, if any. */
    private Optional<Row> findByIdInternal(String id) {
        try {
            Future<RowSet<Row>> future = client.preparedQuery(withVersionClause(FIND_BY_ID, 3))
                    .execute(tuple(process.id(), id));
            return getResultFromFuture(future).map(RowSet::iterator).filter(Iterator::hasNext).map(Iterator::next);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw uncheckedException(e, "Error finding process instance %s", id);
        } catch (Exception e) {
            throw uncheckedException(e, "Error finding process instance %s", id);
        }
    }

    /**
     * Completes a statement that ends with {@code "process_version "}.
     *
     * @param sql the statement prefix
     * @param position index of the positional parameter that carries the process version
     * @return {@code sql} followed by {@code "is null"} when the process has no version,
     *         otherwise by {@code "= $<position>"}
     */
    private String withVersionClause(String sql, int position) {
        return sql + (process.version() == null ? IS_NULL : "= $" + position);
    }

    /**
     * Builds the parameter tuple for a statement completed by {@link #withVersionClause(String, int)}:
     * the given values followed by the process version, which is appended only when it is non-null
     * so that it lines up with the trailing positional parameter.
     */
    private Tuple tuple(Object... parameters) {
        Tuple tuple = Tuple.from(parameters);
        if (process.version() != null) {
            tuple.addValue(process.version());
        }
        return tuple;
    }

    /**
     * Overwrites the payload and bumps the row version from {@code version} to {@code version + 1},
     * provided the row still carries {@code version}.
     *
     * @return {@code true}; a failed version check is reported by exception, never by {@code false}
     * @throws ProcessInstanceOptimisticLockingException if no row matched the expected version
     * @throws RuntimeException if the update fails, times out or is interrupted
     */
    private boolean updateWithLock(String id, byte[] payload, long version) {
        boolean updated;
        try {
            Future<RowSet<Row>> future = client.preparedQuery(withVersionClause(UPDATE_WITH_LOCK, 6))
                    .execute(tuple(Buffer.buffer(payload), version + 1L, process.id(), id, version));
            updated = getExecutedResult(future);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Surface the interruption like updateInternal does; returning false here would let
            // update() finish as if the write had happened.
            throw uncheckedException(e, "Error updating process instance %s", id);
        } catch (Exception e) {
            throw uncheckedException(e, "Error updating process instance %s", id);
        }
        if (!updated) {
            throw new ProcessInstanceOptimisticLockingException(id);
        }
        return true;
    }
}

