package io.automatiko.engine.addons.persistence.cassandra.job;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.servererrors.QueryExecutionException;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
import com.datastax.oss.driver.api.querybuilder.SchemaBuilder;
import com.datastax.oss.driver.api.querybuilder.delete.Delete;
import com.datastax.oss.driver.api.querybuilder.insert.RegularInsert;
import com.datastax.oss.driver.api.querybuilder.schema.CreateKeyspace;
import com.datastax.oss.driver.api.querybuilder.select.Select;
import com.datastax.oss.driver.api.querybuilder.update.Update;
import io.automatiko.engine.api.Application;
import io.automatiko.engine.api.Model;
import io.automatiko.engine.api.audit.Auditor;
import io.automatiko.engine.api.auth.IdentityProvider;
import io.automatiko.engine.api.auth.TrustedIdentityProvider;
import io.automatiko.engine.api.jobs.ExpirationTime;
import io.automatiko.engine.api.jobs.JobsService;
import io.automatiko.engine.api.jobs.ProcessInstanceJobDescription;
import io.automatiko.engine.api.jobs.ProcessJobDescription;
import io.automatiko.engine.api.uow.UnitOfWorkManager;
import io.automatiko.engine.api.workflow.Process;
import io.automatiko.engine.api.workflow.ProcessInstance;
import io.automatiko.engine.api.workflow.ProcessInstanceReadMode;
import io.automatiko.engine.api.workflow.Processes;
import io.automatiko.engine.services.time.TimerInstance;
import io.automatiko.engine.services.uow.UnitOfWorkExecutor;
import io.automatiko.engine.workflow.Sig;
import io.automatiko.engine.workflow.audit.BaseAuditEntry;
import io.automatiko.engine.workflow.base.core.timer.CronExpirationTime;
import io.automatiko.engine.workflow.base.core.timer.NoOpExpirationTime;
import io.quarkus.runtime.ShutdownEvent;
import io.quarkus.runtime.StartupEvent;
import jakarta.annotation.Priority;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.event.Observes;
import jakarta.inject.Inject;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ApplicationScoped
/* loaded from: input_file:io/automatiko/engine/addons/persistence/cassandra/job/CassandraJobService.class */
public class CassandraJobService implements JobsService {
    private static final Logger LOGGER = LoggerFactory.getLogger(CassandraJobService.class);

    // Column names of the jobs table (see createTable()).
    private static final String INSTANCE_ID_FIELD = "JobInstanceId";
    private static final String FIRE_AT_FIELD = "JobFireAt";
    private static final String OWNER_INSTANCE_ID_FIELD = "JobOwnerInstanceId";
    private static final String OWNER_DEF_ID_FIELD = "JobOwnerDefinitionId";
    private static final String TRIGGER_TYPE_FIELD = "JobTriggerType";
    private static final String STATUS_FIELD = "JobStatus";
    private static final String FIRE_LIMIT_FIELD = "JobFireLimit";
    private static final String REPEAT_INTERVAL_FIELD = "JobRepeatInterval";
    private static final String EXPRESSION_FIELD = "JobExpression";

    /** Session on which every statement of this service is executed. */
    protected final CqlSession cqlSession;
    /** Taken from the application; timer tasks do their work inside a unit of work. */
    protected final UnitOfWorkManager unitOfWorkManager;
    /** Receives the audit entries published when jobs are scheduled, executed or removed. */
    protected final Auditor auditor;
    /** Executes the timer tasks (threads named "automatiko-jobs-executor"). */
    protected final ScheduledThreadPoolExecutor scheduler;
    /** Executes the periodic load of jobs from the table (thread named "automatiko-jobs-loader"). */
    protected final ScheduledThreadPoolExecutor loadScheduler;
    /** Processes keyed by process id; filled once in the constructor. */
    protected Map<String, Process<? extends Model>> mappedProcesses = new HashMap<>();
    /** Futures of the timer tasks scheduled locally, keyed by job id. */
    protected ConcurrentHashMap<String, ScheduledFuture<?>> scheduledJobs = new ConcurrentHashMap<>();
    /** Name of the jobs table. */
    protected final String tableName = "ATK_JOBS";

    // Optional settings injected through the constructor; defaults are applied where they are read.
    /** Whether createTable() also creates the keyspace (treated as true when absent). */
    private Optional<Boolean> createKeyspace;
    /** Whether the constructor calls createTable() (treated as true when absent). */
    private Optional<Boolean> createTables;
    /** Keyspace holding the jobs table ("automatiko" when absent). */
    private Optional<String> keyspace;
    /** Load interval and look-ahead window in minutes (10 when absent). */
    private Optional<Long> interval;
    /** Core pool size of the timer task executor (1 when absent). */
    private Optional<Integer> threads;

    /* loaded from: input_file:io/automatiko/engine/addons/persistence/cassandra/job/CassandraJobService$SignalProcessInstanceOnExpiredTimer.class */
    private class SignalProcessInstanceOnExpiredTimer implements Runnable {
        private final String id;
        private final String processId;
        private String processInstanceId;
        private final String trigger;
        private Integer limit;
        private ProcessInstanceJobDescription description;

        private SignalProcessInstanceOnExpiredTimer(String str, String str2, String str3, String str4, Integer num, ProcessInstanceJobDescription processInstanceJobDescription) {
            this.id = str;
            this.processId = str3;
            this.processInstanceId = str4;
            this.trigger = str2;
            this.limit = num;
            this.description = processInstanceJobDescription;
        }

        @Override // java.lang.Runnable
        public void run() {
            CassandraJobService.LOGGER.debug("Job {} started", this.id);
            try {
                if (!CassandraJobService.this.cqlSession.execute(((Update) ((Update) QueryBuilder.update(CassandraJobService.this.keyspace.orElse("automatiko"), "ATK_JOBS").setColumn(CassandraJobService.STATUS_FIELD, QueryBuilder.literal("taken")).whereColumn(CassandraJobService.INSTANCE_ID_FIELD).isEqualTo(QueryBuilder.literal(this.id))).ifColumn(CassandraJobService.STATUS_FIELD).isEqualTo(QueryBuilder.literal("scheduled"))).build()).wasApplied()) {
                    CassandraJobService.this.scheduledJobs.remove(this.id).cancel(true);
                    return;
                }
                Process<? extends Model> process = CassandraJobService.this.mappedProcesses.get(this.processId);
                if (process == null) {
                    CassandraJobService.LOGGER.warn("No process found for process id {}", this.processId);
                    return;
                }
                IdentityProvider.set(new TrustedIdentityProvider("System<timer>"));
                CassandraJobService.this.auditor.publish(() -> {
                    return BaseAuditEntry.timer(this.description).add("message", "Executing timer job for existing workflow instance");
                });
                UnitOfWorkExecutor.executeInUnitOfWork(CassandraJobService.this.unitOfWorkManager, () -> {
                    Optional findById = process.instances().findById(this.processInstanceId, ProcessInstanceReadMode.MUTABLE_WITH_LOCK);
                    if (!findById.isPresent()) {
                        CassandraJobService.this.scheduledJobs.remove(this.id).cancel(false);
                        CassandraJobService.this.removeScheduledJob(this.id);
                        return null;
                    }
                    ((ProcessInstance) findById.get()).send(Sig.of(this.trigger, TimerInstance.with(Long.parseLong(this.id.split("_")[1]), this.id, this.limit)));
                    CassandraJobService.this.scheduledJobs.remove(this.id).cancel(false);
                    if (this.description.expirationTime().next() != null) {
                        CassandraJobService.this.removeScheduledJob(this.id);
                        CassandraJobService.this.scheduleProcessInstanceJob(this.description);
                        return null;
                    }
                    if (this.limit.intValue() > 0) {
                        CassandraJobService.this.updateRepeatableJob(this.id);
                        return null;
                    }
                    CassandraJobService.this.removeScheduledJob(this.id);
                    return null;
                });
                CassandraJobService.LOGGER.debug("Job {} completed", this.id);
            } catch (QueryExecutionException e) {
                CassandraJobService.this.scheduledJobs.remove(this.id).cancel(true);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/automatiko/engine/addons/persistence/cassandra/job/CassandraJobService$StartProcessOnExpiredTimer.class */
    public class StartProcessOnExpiredTimer implements Runnable {
        private final String id;
        private final String processId;
        private Integer limit;
        private ProcessJobDescription description;

        private StartProcessOnExpiredTimer(String str, String str2, Integer num, ProcessJobDescription processJobDescription) {
            this.id = str;
            this.processId = str2;
            this.limit = num;
            this.description = processJobDescription;
        }

        @Override // java.lang.Runnable
        public void run() {
            CassandraJobService.LOGGER.debug("Job {} started", this.id);
            try {
                if (!CassandraJobService.this.cqlSession.execute(((Update) ((Update) QueryBuilder.update(CassandraJobService.this.keyspace.orElse("automatiko"), "ATK_JOBS").setColumn(CassandraJobService.STATUS_FIELD, QueryBuilder.literal("taken")).whereColumn(CassandraJobService.INSTANCE_ID_FIELD).isEqualTo(QueryBuilder.literal(this.id))).ifColumn(CassandraJobService.STATUS_FIELD).isEqualTo(QueryBuilder.literal("scheduled"))).build()).wasApplied()) {
                    CassandraJobService.this.scheduledJobs.remove(this.id).cancel(true);
                    return;
                }
                Process<? extends Model> process = CassandraJobService.this.mappedProcesses.get(this.processId);
                if (process == null) {
                    CassandraJobService.LOGGER.warn("No process found for process id {}", this.processId);
                    return;
                }
                IdentityProvider.set(new TrustedIdentityProvider("System<timer>"));
                CassandraJobService.this.auditor.publish(() -> {
                    return BaseAuditEntry.timer(this.description).add("message", "Executing timer job to create new workflow instance");
                });
                UnitOfWorkExecutor.executeInUnitOfWork(CassandraJobService.this.unitOfWorkManager, () -> {
                    ProcessInstance createInstance = process.createInstance(process.createModel());
                    if (createInstance != null) {
                        createInstance.start("timer", (String) null, (Object) null);
                    }
                    CassandraJobService.this.scheduledJobs.remove(this.id).cancel(false);
                    Integer num = this.limit;
                    this.limit = Integer.valueOf(this.limit.intValue() - 1);
                    if (this.description.expirationTime().next() != null) {
                        CassandraJobService.this.removeScheduledJob(this.id);
                        CassandraJobService.this.scheduleProcessJob(this.description);
                        return null;
                    }
                    if (this.limit.intValue() > 0) {
                        CassandraJobService.this.updateRepeatableJob(this.id);
                        return null;
                    }
                    CassandraJobService.this.removeScheduledJob(this.id);
                    return null;
                });
                CassandraJobService.LOGGER.debug("Job {} completed", this.id);
            } catch (QueryExecutionException e) {
                CassandraJobService.this.scheduledJobs.remove(this.id).cancel(true);
            }
        }
    }

    @Inject
    public CassandraJobService(CqlSession cqlSession, Processes processes, Application application, Auditor auditor, @ConfigProperty(name = "quarkus.automatiko.jobs.cassandra.create-keyspace") Optional<Boolean> optional, @ConfigProperty(name = "quarkus.automatiko.jobs.cassandra.create-tables") Optional<Boolean> optional2, @ConfigProperty(name = "quarkus.automatiko.jobs.cassandra.keyspace") Optional<String> optional3, @ConfigProperty(name = "quarkus.automatiko.jobs.cassandra.interval") Optional<Long> optional4, @ConfigProperty(name = "quarkus.automatiko.jobs.cassandra.threads") Optional<Integer> optional5) {
        this.cqlSession = cqlSession;
        this.createKeyspace = optional;
        this.createTables = optional2;
        this.keyspace = optional3;
        this.interval = optional4;
        this.threads = optional5;
        processes.processIds().forEach(str -> {
            this.mappedProcesses.put(str, processes.processById(str));
        });
        if (this.createTables.orElse(Boolean.TRUE).booleanValue()) {
            createTable();
        }
        this.unitOfWorkManager = application.unitOfWorkManager();
        this.auditor = auditor;
        this.scheduler = new ScheduledThreadPoolExecutor(this.threads.orElse(1).intValue(), runnable -> {
            return new Thread(runnable, "automatiko-jobs-executor");
        });
        this.loadScheduler = new ScheduledThreadPoolExecutor(1, runnable2 -> {
            return new Thread(runnable2, "automatiko-jobs-loader");
        });
    }

    /**
     * Starts the periodic job loader on application startup.
     *
     * <p>One second after startup, and then once per configured interval (10 minutes when not
     * set), every row whose fire time lies before "now + interval" is read and a timer task is
     * scheduled for it, unless a task for that job id is already registered. Rows without an
     * owner instance id start a new workflow instance; all others signal an existing one.
     * Failures during a load are logged and do not stop later loads.
     *
     * @param startupEvent the observed startup event
     */
    public void start(@Priority(3000) @Observes StartupEvent startupEvent) {
        long periodSeconds = this.interval.orElse(10L).longValue() * 60;
        Runnable loader = () -> {
            try {
                LocalDateTime horizon = LocalDateTime.now().plus(Duration.ofMinutes(this.interval.orElse(10L).longValue()));
                long loadBefore = horizon.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();

                Select dueJobs = QueryBuilder.selectFrom(this.keyspace.orElse("automatiko"), "ATK_JOBS").all()
                        .whereColumn(FIRE_AT_FIELD).isLessThan(QueryBuilder.literal(Long.valueOf(loadBefore)))
                        .allowFiltering();
                List<Row> rows = this.cqlSession.execute(dueJobs.build()).all();
                LOGGER.debug("Loaded jobs ({}) to be executed before {}", Integer.valueOf(rows.size()), Long.valueOf(loadBefore));

                for (Row row : rows) {
                    String jobId = row.getString(INSTANCE_ID_FIELD);
                    String definitionId = row.getString(OWNER_DEF_ID_FIELD);
                    String ownerInstanceId = row.getString(OWNER_INSTANCE_ID_FIELD);

                    if (ownerInstanceId != null) {
                        String triggerType = row.getString(TRIGGER_TYPE_FIELD);
                        ProcessInstanceJobDescription instanceJob = ProcessInstanceJobDescription.of(jobId, triggerType,
                                build(row.getString(EXPRESSION_FIELD)), ownerInstanceId, definitionId, (String) null);
                        this.scheduledJobs.computeIfAbsent(jobId, key -> {
                            Runnable task = new SignalProcessInstanceOnExpiredTimer(jobId, triggerType, definitionId,
                                    ownerInstanceId, Integer.valueOf(row.getInt(FIRE_LIMIT_FIELD)), instanceJob);
                            ZonedDateTime fireAt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(row.getLong(FIRE_AT_FIELD)), ZoneId.systemDefault());
                            long delayMillis = Duration.between(LocalDateTime.now(), fireAt).toMillis();
                            return log(jobId, this.scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
                        });
                        continue;
                    }

                    ProcessJobDescription processJob = ProcessJobDescription.of(build(row.getString(EXPRESSION_FIELD)), (String) null, definitionId);
                    this.scheduledJobs.computeIfAbsent(jobId, key -> {
                        Runnable task = new StartProcessOnExpiredTimer(jobId, definitionId, -1, processJob);
                        ZonedDateTime fireAt = ZonedDateTime.ofInstant(Instant.ofEpochMilli(row.getLong(FIRE_AT_FIELD)), ZoneId.systemDefault());
                        long delayMillis = Duration.between(LocalDateTime.now(), fireAt).toMillis();
                        return log(jobId, this.scheduler.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
                    });
                }
            } catch (Exception e) {
                LOGGER.error("Error while loading jobs from cassandra", e);
            }
        };
        this.loadScheduler.scheduleAtFixedRate(loader, 1L, periodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Stops both executors on application shutdown: the loader via {@code shutdownNow()},
     * the timer task executor via {@code shutdown()}.
     *
     * @param shutdownEvent the observed shutdown event
     */
    public void shutdown(@Observes ShutdownEvent shutdownEvent) {
        loadScheduler.shutdownNow();
        scheduler.shutdown();
    }

    /**
     * Persists a job that creates new workflow instances and, when it fires within the next
     * load interval, also schedules it locally.
     *
     * <p>The fire time is read once from the expiration and stored as its true instant, so the
     * persisted value, the look-ahead check and the local delay all refer to the same moment,
     * regardless of the zone the expiration carries.
     *
     * @param processJobDescription description of the job to schedule
     * @return the id of the job
     */
    public String scheduleProcessJob(ProcessJobDescription processJobDescription) {
        LOGGER.debug("ScheduleProcessJob: {}", processJobDescription);

        ExpirationTime expiration = processJobDescription.expirationTime();
        ZonedDateTime fireTime = expiration.get();
        boolean repeatable = expiration.repeatInterval() != null;

        RegularInsert insert = QueryBuilder.insertInto(this.keyspace.orElse("automatiko"), "ATK_JOBS")
                .value(INSTANCE_ID_FIELD, QueryBuilder.literal(processJobDescription.id()))
                .value(OWNER_DEF_ID_FIELD, QueryBuilder.literal(processJobDescription.processId() + version(processJobDescription.processVersion())))
                .value(STATUS_FIELD, QueryBuilder.literal("scheduled"))
                .value(FIRE_AT_FIELD, QueryBuilder.literal(Long.valueOf(fireTime.toInstant().toEpochMilli())))
                .value(FIRE_LIMIT_FIELD, QueryBuilder.literal(expiration.repeatLimit()));
        if (repeatable) {
            // The interval column is written for repeatable jobs only.
            insert = insert.value(REPEAT_INTERVAL_FIELD, QueryBuilder.literal(expiration.repeatInterval()));
        }
        insert = insert.value(EXPRESSION_FIELD, QueryBuilder.literal(expiration.expression()));

        String auditMessage = repeatable
                ? "Scheduled repeatable timer job that creates new workflow instances"
                : "Scheduled one time timer job that creates new workflow instances";
        this.auditor.publish(() -> BaseAuditEntry.timer(processJobDescription).add("message", auditMessage));

        this.cqlSession.execute(insert.build());

        // Jobs further out are left to the periodic loader started in start().
        if (fireTime.isBefore(ZonedDateTime.now().plusMinutes(this.interval.orElse(10L).longValue()))) {
            this.scheduledJobs.computeIfAbsent(processJobDescription.id(), jobId ->
                    log(jobId, this.scheduler.schedule(processJobByDescription(processJobDescription), calculateDelay(fireTime), TimeUnit.MILLISECONDS)));
        }
        return processJobDescription.id();
    }

    /**
     * Persists a job that signals an existing workflow instance and, when it fires within the
     * next load interval, also schedules it locally.
     *
     * <p>The fire time is read once from the expiration and stored as its true instant, so the
     * persisted value, the look-ahead check and the local delay all refer to the same moment,
     * regardless of the zone the expiration carries.
     *
     * @param processInstanceJobDescription description of the job to schedule
     * @return the id of the job
     */
    public String scheduleProcessInstanceJob(ProcessInstanceJobDescription processInstanceJobDescription) {
        ExpirationTime expiration = processInstanceJobDescription.expirationTime();
        ZonedDateTime fireTime = expiration.get();
        boolean repeatable = expiration.repeatInterval() != null;
        String definitionId = processInstanceJobDescription.processId() + version(processInstanceJobDescription.processVersion());

        RegularInsert insert = QueryBuilder.insertInto(this.keyspace.orElse("automatiko"), "ATK_JOBS")
                .value(INSTANCE_ID_FIELD, QueryBuilder.literal(processInstanceJobDescription.id()))
                .value(TRIGGER_TYPE_FIELD, QueryBuilder.literal(processInstanceJobDescription.triggerType()))
                .value(OWNER_DEF_ID_FIELD, QueryBuilder.literal(definitionId))
                .value(OWNER_INSTANCE_ID_FIELD, QueryBuilder.literal(processInstanceJobDescription.processInstanceId()))
                .value(STATUS_FIELD, QueryBuilder.literal("scheduled"))
                .value(FIRE_AT_FIELD, QueryBuilder.literal(Long.valueOf(fireTime.toInstant().toEpochMilli())))
                .value(FIRE_LIMIT_FIELD, QueryBuilder.literal(expiration.repeatLimit()));
        if (repeatable) {
            // The interval column is written for repeatable jobs only.
            insert = insert.value(REPEAT_INTERVAL_FIELD, QueryBuilder.literal(expiration.repeatInterval()));
        }
        insert = insert.value(EXPRESSION_FIELD, QueryBuilder.literal(expiration.expression()));

        String auditMessage = repeatable
                ? "Scheduled repeatable timer job for existing workflow instance"
                : "Scheduled one time timer job for existing workflow instance";
        this.auditor.publish(() -> BaseAuditEntry.timer(processInstanceJobDescription).add("message", auditMessage));

        this.cqlSession.execute(insert.build());

        // Jobs further out are left to the periodic loader started in start().
        if (fireTime.isBefore(ZonedDateTime.now().plusMinutes(this.interval.orElse(10L).longValue()))) {
            this.scheduledJobs.computeIfAbsent(processInstanceJobDescription.id(), jobId -> {
                Runnable task = new SignalProcessInstanceOnExpiredTimer(processInstanceJobDescription.id(),
                        processInstanceJobDescription.triggerType(), definitionId,
                        processInstanceJobDescription.processInstanceId(), expiration.repeatLimit(),
                        processInstanceJobDescription);
                return log(jobId, this.scheduler.schedule(task, calculateDelay(fireTime), TimeUnit.MILLISECONDS));
            });
        }
        return processInstanceJobDescription.id();
    }

    /**
     * Cancels a job by deleting its row through {@link #removeScheduledJob(String)}.
     *
     * @param str id of the job to cancel
     * @return always {@code true}
     */
    public boolean cancelJob(String str) {
        this.removeScheduledJob(str);
        return true;
    }

    /**
     * Looks up the stored fire time of a job.
     *
     * @param str id of the job
     * @return the fire time in the system default zone, or {@code null} when no row exists
     */
    public ZonedDateTime getScheduledTime(String str) {
        Select lookup = QueryBuilder.selectFrom(this.keyspace.orElse("automatiko"), "ATK_JOBS")
                .column(FIRE_AT_FIELD)
                .whereColumn(INSTANCE_ID_FIELD).isEqualTo(QueryBuilder.literal(str));
        Row row = this.cqlSession.execute(lookup.build()).one();
        if (row == null) {
            return null;
        }
        Instant fireAt = Instant.ofEpochMilli(row.getLong(FIRE_AT_FIELD));
        return ZonedDateTime.ofInstant(fireAt, ZoneId.systemDefault());
    }

    /**
     * Computes how long to wait until the given time.
     *
     * @param zonedDateTime the moment the job should fire
     * @return milliseconds from now until that moment; negative when it lies in the past
     */
    protected long calculateDelay(ZonedDateTime zonedDateTime) {
        ZonedDateTime now = ZonedDateTime.now();
        Duration untilFire = Duration.between(now, zonedDateTime);
        return untilFire.toMillis();
    }

    /**
     * Creates the timer task that starts a new workflow instance for the given description.
     *
     * @param processJobDescription description carrying the job id, process and expiration
     * @return the task to hand to the scheduler
     */
    protected Runnable processJobByDescription(ProcessJobDescription processJobDescription) {
        String jobId = processJobDescription.id();
        String targetProcessId = processJobDescription.process().id();
        Integer repeatLimit = processJobDescription.expirationTime().repeatLimit();
        return new StartProcessOnExpiredTimer(jobId, targetProcessId, repeatLimit, processJobDescription);
    }

    /**
     * Turns a process version into the suffix appended to a process id.
     *
     * @param str the version, may be {@code null}
     * @return an empty string for a {@code null} or blank version, otherwise {@code "_"}
     *         followed by the version with every dot replaced by an underscore
     */
    protected String version(String str) {
        if (str == null || str.trim().isEmpty()) {
            return "";
        }
        return "_" + str.replace(".", "_");
    }

    /**
     * Publishes a cancellation audit entry for the job and deletes its row if it exists.
     * The audit entry carries the job's stored details when the row can still be read,
     * and only the job id otherwise.
     *
     * @param str id of the job to remove
     */
    protected void removeScheduledJob(String str) {
        this.auditor.publish(() -> {
            Select lookup = QueryBuilder.selectFrom(this.keyspace.orElse("automatiko"), "ATK_JOBS")
                    .columns(EXPRESSION_FIELD, REPEAT_INTERVAL_FIELD, FIRE_LIMIT_FIELD, OWNER_DEF_ID_FIELD, OWNER_INSTANCE_ID_FIELD, TRIGGER_TYPE_FIELD)
                    .whereColumn(INSTANCE_ID_FIELD).isEqualTo(QueryBuilder.literal(str));
            Row row = this.cqlSession.execute(lookup.build()).one();
            if (row == null) {
                return BaseAuditEntry.timer()
                        .add("message", "Cancelled job for existing workflow instance")
                        .add("jobId", str);
            }
            return BaseAuditEntry.timer()
                    .add("message", "Cancelled job for existing workflow instance")
                    .add("jobId", str)
                    .add("timerExpression", row.getString(EXPRESSION_FIELD))
                    .add("timerInterval", Long.valueOf(row.getLong(REPEAT_INTERVAL_FIELD)))
                    .add("timerRepeatLimit", Integer.valueOf(row.getInt(FIRE_LIMIT_FIELD)))
                    .add("workflowDefinitionId", row.getString(OWNER_DEF_ID_FIELD))
                    .add("workflowInstanceId", row.getString(OWNER_INSTANCE_ID_FIELD))
                    .add("triggerType", row.getString(TRIGGER_TYPE_FIELD));
        });

        Delete removal = QueryBuilder.deleteFrom(this.keyspace.orElse("automatiko"), "ATK_JOBS")
                .whereColumn(INSTANCE_ID_FIELD).isEqualTo(QueryBuilder.literal(str))
                .ifExists();
        this.cqlSession.execute(removal.build());
    }

    /**
     * Moves a repeatable job to its next execution: decrements the stored repeat limit, sets
     * the fire time to the previous fire time plus the repeat interval, resets the status to
     * {@code scheduled}, and schedules a local timer task for the new fire time. Does nothing
     * when the job row no longer exists.
     *
     * @param str id of the job to re-schedule
     */
    protected void updateRepeatableJob(String str) {
        // All columns are needed below; selecting only the fire time made the later reads fail.
        Select lookup = QueryBuilder.selectFrom(this.keyspace.orElse("automatiko"), "ATK_JOBS").all()
                .whereColumn(INSTANCE_ID_FIELD).isEqualTo(QueryBuilder.literal(str));
        Row row = this.cqlSession.execute(lookup.build()).one();
        if (row == null) {
            return;
        }

        Integer remainingLimit = Integer.valueOf(row.getInt(FIRE_LIMIT_FIELD) - 1);
        long repeatMillis = row.getLong(REPEAT_INTERVAL_FIELD);
        ZonedDateTime previousFire = ZonedDateTime.ofInstant(Instant.ofEpochMilli(row.getLong(FIRE_AT_FIELD)), ZoneId.systemDefault());
        ZonedDateTime nextFire = previousFire.plus(repeatMillis, ChronoUnit.MILLIS);

        Update reschedule = QueryBuilder.update(this.keyspace.orElse("automatiko"), "ATK_JOBS")
                .setColumn(STATUS_FIELD, QueryBuilder.literal("scheduled"))
                .setColumn(FIRE_LIMIT_FIELD, QueryBuilder.literal(remainingLimit))
                .setColumn(FIRE_AT_FIELD, QueryBuilder.literal(Long.valueOf(nextFire.toInstant().toEpochMilli())))
                .whereColumn(INSTANCE_ID_FIELD).isEqualTo(QueryBuilder.literal(str));
        this.cqlSession.execute(reschedule.build());

        String jobId = row.getString(INSTANCE_ID_FIELD);
        String definitionId = row.getString(OWNER_DEF_ID_FIELD);
        String ownerInstanceId = row.getString(OWNER_INSTANCE_ID_FIELD);

        // The local timer waits for the same next fire time that was just persisted; using the
        // previous fire time would yield a non-positive delay and re-fire the job at once.
        if (ownerInstanceId == null) {
            ProcessJobDescription processJob = ProcessJobDescription.of(build(row.getString(EXPRESSION_FIELD)), (String) null, definitionId);
            this.scheduledJobs.computeIfAbsent(jobId, key -> {
                Runnable task = new StartProcessOnExpiredTimer(jobId, definitionId, remainingLimit, processJob);
                return log(jobId, this.scheduler.schedule(task, calculateDelay(nextFire), TimeUnit.MILLISECONDS));
            });
        } else {
            String triggerType = row.getString(TRIGGER_TYPE_FIELD);
            ProcessInstanceJobDescription instanceJob = ProcessInstanceJobDescription.of(jobId, triggerType,
                    build(row.getString(EXPRESSION_FIELD)), ownerInstanceId, definitionId, (String) null);
            this.scheduledJobs.computeIfAbsent(jobId, key -> {
                Runnable task = new SignalProcessInstanceOnExpiredTimer(jobId, triggerType, definitionId, ownerInstanceId, remainingLimit, instanceJob);
                return log(jobId, this.scheduler.scheduleAtFixedRate(task, calculateDelay(nextFire), repeatMillis, TimeUnit.MILLISECONDS));
            });
        }
    }

    /**
     * Logs at debug level how many seconds remain until the given future fires.
     *
     * @param str id of the job the future belongs to
     * @param scheduledFuture the future returned by the scheduler
     * @return the same future, so the call can wrap a scheduling expression
     */
    protected ScheduledFuture<?> log(String str, ScheduledFuture<?> scheduledFuture) {
        long secondsUntilFire = scheduledFuture.getDelay(TimeUnit.SECONDS);
        LOGGER.debug("Next fire of job {} is in {} seconds ", str, Long.valueOf(secondsUntilFire));
        return scheduledFuture;
    }

    /**
     * Rebuilds an expiration from the expression stored with a job.
     *
     * @param str the stored expression, may be {@code null}
     * @return a cron expiration for a non-null expression, otherwise a no-op expiration
     */
    protected ExpirationTime build(String str) {
        if (str == null) {
            return new NoOpExpirationTime();
        }
        return CronExpirationTime.of(str);
    }

    /**
     * Creates the schema if it is missing: optionally the keyspace (simple strategy,
     * replication factor 1), then the jobs table keyed by job id, then an index on the
     * fire time column.
     */
    protected void createTable() {
        String keyspaceName = this.keyspace.orElse("automatiko");

        boolean keyspaceRequested = this.createKeyspace.orElse(Boolean.TRUE);
        if (keyspaceRequested) {
            CreateKeyspace keyspaceDdl = SchemaBuilder.createKeyspace(keyspaceName)
                    .ifNotExists()
                    .withSimpleStrategy(1);
            this.cqlSession.execute(keyspaceDdl.build());
        }

        this.cqlSession.execute(
                SchemaBuilder.createTable(keyspaceName, "ATK_JOBS")
                        .ifNotExists()
                        .withPartitionKey(INSTANCE_ID_FIELD, DataTypes.TEXT)
                        .withColumn(FIRE_AT_FIELD, DataTypes.BIGINT)
                        .withColumn(OWNER_INSTANCE_ID_FIELD, DataTypes.TEXT)
                        .withColumn(OWNER_DEF_ID_FIELD, DataTypes.TEXT)
                        .withColumn(TRIGGER_TYPE_FIELD, DataTypes.TEXT)
                        .withColumn(STATUS_FIELD, DataTypes.TEXT)
                        .withColumn(FIRE_LIMIT_FIELD, DataTypes.INT)
                        .withColumn(REPEAT_INTERVAL_FIELD, DataTypes.BIGINT)
                        .withColumn(EXPRESSION_FIELD, DataTypes.TEXT)
                        .build());

        this.cqlSession.execute(
                SchemaBuilder.createIndex("ATK_JOBS_IDX")
                        .ifNotExists()
                        .onTable(keyspaceName, "ATK_JOBS")
                        .andColumn(FIRE_AT_FIELD)
                        .build());
    }
}
