package org.kie.kogito.jobs.service.repository.postgresql;

import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.pgclient.PgPool;
import io.vertx.mutiny.sqlclient.Row;
import io.vertx.mutiny.sqlclient.SqlClient;
import io.vertx.mutiny.sqlclient.Tuple;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.io.Serializable;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.kie.kogito.jobs.service.model.JobServiceManagementInfo;
import org.kie.kogito.jobs.service.repository.JobServiceManagementRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JobServiceManagementRepository} implementation that keeps its state in the
 * {@code job_service_management} PostgreSQL table (columns {@code id}, {@code token},
 * {@code last_heartbeat}) and talks to it through the reactive {@link PgPool} client.
 *
 * <p>All operations are asynchronous and return a {@link Uni}. Several of them resolve to a
 * {@code null} item when no matching row exists; see the individual methods.
 */
@ApplicationScoped
public class PostgreSqlJobServiceManagementRepository implements JobServiceManagementRepository {
    private static final Logger LOGGER = LoggerFactory.getLogger(PostgreSqlJobServiceManagementRepository.class);

    /** Reads one row by id and locks it for the rest of the enclosing transaction. */
    private static final String SELECT_FOR_UPDATE_SQL =
            "SELECT id, token, last_heartbeat FROM job_service_management WHERE id = $1 FOR UPDATE ";

    /** Inserts a row, or overwrites token and heartbeat when the id already exists. */
    private static final String UPSERT_SQL =
            "INSERT INTO job_service_management (id, token, last_heartbeat) VALUES ($1, $2, $3) ON CONFLICT (id) DO UPDATE SET token = $2, last_heartbeat = $3 RETURNING id, token, last_heartbeat";

    /** Refreshes the heartbeat of the row matching both id and token. */
    private static final String HEARTBEAT_SQL =
            "UPDATE job_service_management SET last_heartbeat = now() WHERE id = $1 AND token = $2 RETURNING id, token, last_heartbeat";

    /** Clears token and heartbeat of the row matching both id and token. */
    private static final String RELEASE_SQL =
            "UPDATE job_service_management SET token = null, last_heartbeat = null WHERE id = $1 AND token = $2 RETURNING id, token, last_heartbeat";

    private final PgPool client;

    /**
     * Creates the repository.
     *
     * @param pgPool reactive PostgreSQL pool used for every query issued by this repository
     */
    @Inject
    public PostgreSqlJobServiceManagementRepository(PgPool pgPool) {
        this.client = pgPool;
    }

    /**
     * Reads the row with the given id under {@code FOR UPDATE}, passes it to {@code function}
     * and upserts whatever the function returns, all inside a single transaction.
     *
     * @param str id of the row to read
     * @param function computes the value to store; it receives {@code null} when no row with
     *        that id exists, and may itself return {@code null} to skip the write
     * @return the row as stored after the upsert, or a {@code null} item when {@code function}
     *         returned {@code null}
     */
    public Uni<JobServiceManagementInfo> getAndUpdate(String str, Function<JobServiceManagementInfo, JobServiceManagementInfo> function) {
        LOGGER.info("get {}", str);
        return this.client.withTransaction(sqlConnection -> sqlConnection
                .preparedQuery(SELECT_FOR_UPDATE_SQL)
                .execute(Tuple.of(str))
                .onItem().transform(rows -> rows.iterator())
                // Only the first row matters; an empty result is represented as a null item.
                .onItem().transform(rowIterator -> rowIterator.hasNext() ? from(rowIterator.next()) : null)
                .onItem().invoke(current -> LOGGER.trace("got {}", current))
                // The write goes through the same connection so it is part of this transaction.
                .onItem().transformToUni(current -> update(sqlConnection, function.apply(current))));
    }

    /**
     * Maps a result row to a {@link JobServiceManagementInfo}.
     *
     * @param row row exposing the {@code id}, {@code token} and {@code last_heartbeat} columns
     * @return a new info object built from those three columns
     */
    JobServiceManagementInfo from(Row row) {
        return new JobServiceManagementInfo(row.getString("id"), row.getString("token"), row.getOffsetDateTime("last_heartbeat"));
    }

    /**
     * Upserts the given info using the pool directly (no explicit transaction).
     *
     * @param jobServiceManagementInfo value to store; {@code null} results in no write
     * @return the row as stored, or a {@code null} item when the argument was {@code null}
     */
    public Uni<JobServiceManagementInfo> set(JobServiceManagementInfo jobServiceManagementInfo) {
        LOGGER.info("set {}", jobServiceManagementInfo);
        return update(this.client, jobServiceManagementInfo);
    }

    /**
     * Runs the upsert statement on the supplied client.
     *
     * @param sqlClient client (pool or transactional connection) to execute on
     * @param jobServiceManagementInfo value to store, may be {@code null}
     * @return the row returned by the statement, or a {@code null} item when the value is
     *         {@code null} or the statement returned no row
     */
    private Uni<JobServiceManagementInfo> update(SqlClient sqlClient, JobServiceManagementInfo jobServiceManagementInfo) {
        if (jobServiceManagementInfo == null) {
            return Uni.createFrom().nullItem();
        }
        // Tuple.of binds the three parameters directly; no intermediate list or raw cast needed.
        Tuple parameters = Tuple.of(
                jobServiceManagementInfo.getId(),
                jobServiceManagementInfo.getToken(),
                jobServiceManagementInfo.getLastHeartbeat());
        return sqlClient
                .preparedQuery(UPSERT_SQL)
                .execute(parameters)
                .onItem().transform(rows -> rows.iterator())
                .onItem().transform(rowIterator -> rowIterator.hasNext() ? from(rowIterator.next()) : null);
    }

    /**
     * Sets {@code last_heartbeat} to the database's {@code now()} for the row whose id and
     * token both match the given info.
     *
     * @param jobServiceManagementInfo supplies the id and token to match
     * @return the updated row, or a {@code null} item when no row matched
     */
    public Uni<JobServiceManagementInfo> heartbeat(JobServiceManagementInfo jobServiceManagementInfo) {
        return this.client.withTransaction(sqlConnection -> sqlConnection
                .preparedQuery(HEARTBEAT_SQL)
                .execute(Tuple.of(jobServiceManagementInfo.getId(), jobServiceManagementInfo.getToken()))
                .onItem().transform(rows -> rows.iterator())
                .onItem().transform(rowIterator -> rowIterator.hasNext() ? from(rowIterator.next()) : null)
                .onItem().invoke(updated -> LOGGER.trace("Heartbeat {}", updated)));
    }

    /**
     * Clears {@code token} and {@code last_heartbeat} on the row whose id and token both match
     * the given info.
     *
     * @param jobServiceManagementInfo supplies the id and token to match
     * @return {@code true} when the statement returned at least one row, {@code false} otherwise
     */
    public Uni<Boolean> release(JobServiceManagementInfo jobServiceManagementInfo) {
        return this.client.withTransaction(sqlConnection -> sqlConnection
                .preparedQuery(RELEASE_SQL)
                .execute(Tuple.of(jobServiceManagementInfo.getId(), jobServiceManagementInfo.getToken()))
                .onItem().transform(rows -> rows.iterator())
                .onItem().transform(rowIterator -> rowIterator.hasNext()));
    }
}
