package org.kie.kogito.persistence.kafka;

import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.jbpm.flow.serialization.ProcessInstanceMarshallerService;
import org.kie.kogito.process.MutableProcessInstances;
import org.kie.kogito.process.Process;
import org.kie.kogito.process.ProcessInstance;
import org.kie.kogito.process.ProcessInstanceDuplicatedException;
import org.kie.kogito.process.ProcessInstanceReadMode;
import org.kie.kogito.process.impl.AbstractProcessInstance;

/* loaded from: input_file:org/kie/kogito/persistence/kafka/KafkaProcessInstances.class */
/**
 * Process instance storage that writes marshalled instances to a Kafka topic through a
 * {@link KafkaProducer} and reads them back from a Kafka Streams {@link ReadOnlyKeyValueStore}.
 *
 * <p>Records are keyed as {@code "<processId>-<instanceId>"} (see
 * {@link #getKeyForProcessInstance(String)}). The read store is supplied after construction via
 * {@link #setStore(ReadOnlyKeyValueStore)}; readers that arrive earlier block for up to
 * {@value #STORE_WAIT_MINUTES} minute(s) waiting for it.
 */
public class KafkaProcessInstances implements MutableProcessInstances {
    /** Maximum time, in minutes, a reader waits for the store to be supplied. */
    private static final long STORE_WAIT_MINUTES = 1L;

    private final Process<?> process;
    private final KafkaProducer<String, byte[]> producer;
    /** Released by {@link #setStore(ReadOnlyKeyValueStore)}; readers wait on it until then. */
    private final CountDownLatch latch = new CountDownLatch(1);
    private final String topic = KafkaPersistenceUtils.topicName();
    /** Volatile because it is assigned by {@code setStore} and read without locking in {@code getStore}. */
    private volatile ReadOnlyKeyValueStore<String, byte[]> store;
    private ProcessInstanceMarshallerService marshaller;

    /**
     * Creates a store for the given process that publishes through {@code kafkaProducer} and uses a
     * marshaller built with the default object marshaller strategies.
     *
     * @param process the process whose instances are stored
     * @param kafkaProducer producer used to publish instance records
     */
    public KafkaProcessInstances(Process<?> process, KafkaProducer<String, byte[]> kafkaProducer) {
        this.process = process;
        this.producer = kafkaProducer;
        setMarshaller(ProcessInstanceMarshallerService.newBuilder().withDefaultObjectMarshallerStrategies().build());
    }

    /**
     * Returns the process this store belongs to.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally {@code protected};
     * it is kept {@code public} so existing callers keep compiling.
     *
     * @return the owning process
     */
    public Process<?> getProcess() {
        return this.process;
    }

    /**
     * Returns the read store, waiting for it to be supplied if it is not available yet.
     *
     * @return the key-value store, never {@code null}
     * @throws RuntimeException if the store is not supplied in time, is {@code null} once the wait
     *         ends, or the waiting thread is interrupted
     */
    protected ReadOnlyKeyValueStore<String, byte[]> getStore() {
        // Read the volatile field once so the null check and the returned value agree.
        ReadOnlyKeyValueStore<String, byte[]> current = this.store;
        return current != null ? current : getStoreAwait();
    }

    /**
     * Supplies the read store and releases any thread waiting in {@link #getStore()}.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally {@code protected};
     * it is kept {@code public} so existing callers keep compiling.
     *
     * @param readOnlyKeyValueStore the store to read process instances from
     */
    public void setStore(ReadOnlyKeyValueStore<String, byte[]> readOnlyKeyValueStore) {
        this.store = readOnlyKeyValueStore;
        this.latch.countDown();
    }

    /**
     * Blocks until {@link #setStore(ReadOnlyKeyValueStore)} has been called or the wait times out.
     *
     * @return the supplied store, never {@code null}
     * @throws RuntimeException on timeout, on a {@code null} store, or on interruption (in which
     *         case the thread's interrupt flag is restored before throwing)
     */
    private ReadOnlyKeyValueStore<String, byte[]> getStoreAwait() {
        boolean released;
        try {
            released = this.latch.await(STORE_WAIT_MINUTES, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to obtain Kafka Store for process: " + this.process.id(), e);
        }
        if (!released) {
            throw new RuntimeException("Timeout waiting to obtain Kafka Store for process: " + this.process.id());
        }
        ReadOnlyKeyValueStore<String, byte[]> current = this.store;
        if (current == null) {
            // The latch was released by setStore(null).
            throw new RuntimeException("Failed to obtain Kafka Store for process: " + this.process.id());
        }
        return current;
    }

    /**
     * Replaces the marshaller used to serialize and deserialize process instances.
     *
     * @param processInstanceMarshallerService the marshaller to use from now on
     */
    protected void setMarshaller(ProcessInstanceMarshallerService processInstanceMarshallerService) {
        this.marshaller = processInstanceMarshallerService;
    }

    /**
     * Tells whether the store currently holds a record for the given instance id.
     *
     * @param str the process instance id
     * @return {@code true} if a record is present in the store
     */
    public boolean exists(String str) {
        return getProcessInstanceById(str).isPresent();
    }

    /**
     * Looks up the raw marshalled bytes of a process instance.
     *
     * @param str the process instance id
     * @return the stored bytes, or empty if the store has no value for the key
     */
    protected Optional<byte[]> getProcessInstanceById(String str) {
        return Optional.ofNullable(getStore().get(getKeyForProcessInstance(str)));
    }

    /**
     * Builds the record key for a process instance: {@code "<processId>-<instanceId>"}.
     *
     * @param str the process instance id
     * @return the key used both when publishing and when reading
     */
    protected String getKeyForProcessInstance(String str) {
        return String.format("%s-%s", getProcess().id(), str);
    }

    /**
     * Publishes a record for the given instance and waits for the send to complete.
     *
     * @param str the process instance id
     * @param bArr the marshalled instance, or {@code null} as sent by {@link #remove(String)}
     * @throws ExecutionException if the send fails
     * @throws InterruptedException if the thread is interrupted while waiting for the send
     */
    protected void sendKafkaRecord(String str, byte[] bArr) throws ExecutionException, InterruptedException {
        this.producer.send(new ProducerRecord<>(this.topic, getKeyForProcessInstance(str), bArr)).get();
    }

    /**
     * Persists a new process instance. Instances for which {@code isActive} is false are ignored.
     *
     * @param str the process instance id
     * @param processInstance the instance to persist
     * @throws ProcessInstanceDuplicatedException if the store already holds a record for the id
     * @throws RuntimeException if marshalling or publishing fails
     */
    public void create(String str, ProcessInstance processInstance) {
        if (!isActive(processInstance)) {
            return;
        }
        if (getProcessInstanceById(str).isPresent()) {
            throw new ProcessInstanceDuplicatedException(str);
        }
        try {
            sendKafkaRecord(str, this.marshaller.marshallProcessInstance(processInstance));
        } catch (Exception e) {
            throw failure("Unable to persist process instance id: " + str, e);
        }
    }

    /**
     * Publishes the current state of an instance and then disconnects it (see
     * {@link #disconnect(ProcessInstance)}). Instances for which {@code isActive} is false are
     * ignored.
     *
     * @param str the process instance id
     * @param processInstance the instance to persist
     * @throws RuntimeException if marshalling, publishing or disconnecting fails
     */
    public void update(String str, ProcessInstance processInstance) {
        if (!isActive(processInstance)) {
            return;
        }
        try {
            sendKafkaRecord(str, this.marshaller.marshallProcessInstance(processInstance));
            disconnect(processInstance);
        } catch (Exception e) {
            throw failure("Unable to update process instance id: " + str, e);
        }
    }

    /**
     * Publishes a record with a {@code null} value for the given instance id.
     *
     * <p>NOTE(review): presumably the store treats a null value as a delete for the key; confirm
     * against how the store is materialised from the topic.
     *
     * @param str the process instance id
     * @throws RuntimeException if publishing fails
     */
    public void remove(String str) {
        try {
            sendKafkaRecord(str, null);
        } catch (Exception e) {
            throw failure("Unable to remove process instance id: " + str, e);
        }
    }

    /**
     * Loads a process instance by id.
     *
     * @param str the process instance id
     * @param processInstanceReadMode read mode handed to the unmarshaller
     * @return the unmarshalled instance, or empty if the store has no record for the id
     */
    public Optional<ProcessInstance<?>> findById(String str, ProcessInstanceReadMode processInstanceReadMode) {
        return getProcessInstanceById(str).map(this.marshaller.createUnmarshallFunction(this.process, processInstanceReadMode));
    }

    /**
     * Streams the stored instances of this process.
     *
     * <p>The scan prefix is {@code "<processId>-"}, i.e. the key prefix produced by
     * {@link #getKeyForProcessInstance(String)}, so instances of a process whose id merely starts
     * with this process id (for example {@code orders} versus {@code order}) are not returned. A
     * process id that itself contains {@code '-'} can still share a prefix with another process.
     *
     * <p>The returned stream holds an open store iterator that is released only by the stream's
     * close handler; callers should close the stream, e.g. with try-with-resources.
     *
     * @param processInstanceReadMode read mode handed to the unmarshaller
     * @return a sequential stream of unmarshalled instances
     */
    public Stream<ProcessInstance<?>> stream(ProcessInstanceReadMode processInstanceReadMode) {
        // An empty instance id yields exactly the "<processId>-" prefix shared by this process's keys.
        String keyPrefix = getKeyForProcessInstance("");
        KeyValueIterator<String, byte[]> iterator = getStore().prefixScan(keyPrefix, Serdes.String().serializer());
        Stream<ProcessInstance<?>> instances = StreamSupport
                .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
                .map(keyValue -> keyValue.value)
                .map(this.marshaller.createUnmarshallFunction(this.process, processInstanceReadMode));
        return instances.onClose(iterator::close);
    }

    /**
     * Hands the instance a reload function, built by the marshaller, that re-reads the instance's
     * bytes from the store and fails if they are no longer present.
     *
     * @param processInstance the instance to disconnect; cast to {@link AbstractProcessInstance}
     */
    protected void disconnect(ProcessInstance<?> processInstance) {
        ((AbstractProcessInstance) processInstance).internalRemoveProcessInstance(
                this.marshaller.createdReloadFunction(() -> getProcessInstanceById(processInstance.id()).orElseThrow()));
    }

    /**
     * Wraps a persistence failure, restoring the interrupt flag when the cause is an interruption.
     *
     * @param message message for the wrapping exception
     * @param cause the original failure
     * @return the exception for the caller to throw
     */
    private static RuntimeException failure(String message, Exception cause) {
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new RuntimeException(message, cause);
    }
}
