package io.debezium.connector.spanner.task;

import io.debezium.connector.spanner.db.model.Partition;
import io.debezium.connector.spanner.metrics.MetricsEventPublisher;
import io.debezium.connector.spanner.metrics.event.TaskStateChangeQueueUpdateMetricEvent;
import io.debezium.connector.spanner.task.state.NewPartitionsEvent;
import io.debezium.connector.spanner.task.state.TaskStateChangeEvent;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/task/TaskStateChangeEventProcessor.class */
/**
 * Buffers {@link TaskStateChangeEvent}s in a bounded queue and hands them, one at a time,
 * to a {@link TaskStateChangeEventHandler} on a dedicated worker thread.
 *
 * <p>Producers call {@link #processEvent(TaskStateChangeEvent)} to enqueue events; the worker
 * thread created by {@link #startProcessing()} dequeues them and invokes the handler while
 * holding the {@link TaskSyncContextHolder} lock. After every enqueue and dequeue the remaining
 * queue capacity is published as a {@link TaskStateChangeQueueUpdateMetricEvent}.
 */
public class TaskStateChangeEventProcessor {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskStateChangeEventProcessor.class);
    private final BlockingQueue<TaskStateChangeEvent> queue;
    private final TaskSyncContextHolder taskSyncContextHolder;
    private final TaskStateChangeEventHandler taskStateChangeEventHandler;
    private final Consumer<Throwable> errorHandler;
    private final MetricsEventPublisher metricsEventPublisher;
    // Worker thread; null while processing is stopped. Writes happen under this object's monitor.
    private volatile Thread thread;

    /**
     * Creates a processor; no thread is started until {@link #startProcessing()} is called.
     *
     * @param i capacity of the bounded event queue
     * @param taskSyncContextHolder holder used to await a new epoch, to lock around event handling,
     *        and to look up already known partition tokens
     * @param taskStateChangeEventHandler handler invoked for every dequeued event
     * @param consumer callback that receives any throwable that escapes the worker thread
     * @param metricsEventPublisher publisher for queue-capacity metric events
     */
    public TaskStateChangeEventProcessor(int i, TaskSyncContextHolder taskSyncContextHolder, TaskStateChangeEventHandler taskStateChangeEventHandler, Consumer<Throwable> consumer, MetricsEventPublisher metricsEventPublisher) {
        this.queue = new ArrayBlockingQueue<>(i);
        this.taskSyncContextHolder = taskSyncContextHolder;
        this.errorHandler = consumer;
        this.taskStateChangeEventHandler = taskStateChangeEventHandler;
        this.metricsEventPublisher = metricsEventPublisher;
    }

    /**
     * Builds (but does not start) the worker thread that runs {@link #runEventLoop()}.
     * Uncaught throwables from the thread are forwarded to the error handler.
     */
    private Thread createEventHandlerThread() {
        Thread worker = new Thread(this::runEventLoop, "SpannerConnector-TaskStateChangeEventProcessor");
        worker.setUncaughtExceptionHandler((failedThread, throwable) -> this.errorHandler.accept(throwable));
        return worker;
    }

    /**
     * Worker loop: takes events from the queue and passes them to the handler until the
     * current thread is interrupted.
     */
    private void runEventLoop() {
        // Thread.interrupted() clears the flag; that is fine because the loop exits right after.
        while (!Thread.interrupted()) {
            try {
                LOGGER.debug("createEventHandlerThread: Wait for sync event");
                TaskStateChangeEvent event = this.queue.take();
                this.metricsEventPublisher.publishMetricEvent(new TaskStateChangeQueueUpdateMetricEvent(this.queue.remainingCapacity()));
                LoggerUtils.debug(LOGGER, "createEventHandlerThread: Received sync event of type: {}, event: {}", event.getClass().getSimpleName(), event);
                this.taskSyncContextHolder.awaitNewEpoch();
                this.taskSyncContextHolder.lock();
                try {
                    this.taskStateChangeEventHandler.processEvent(event);
                } catch (InterruptedException e) {
                    LOGGER.info("Task {}, interrupting the event handler thread", this.taskSyncContextHolder.get().getTaskUid());
                    // Restore the flag so the loop condition sees it and terminates.
                    Thread.currentThread().interrupt();
                } finally {
                    // Always release the lock, even if the handler throws.
                    this.taskSyncContextHolder.unlock();
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting for an event or for a new epoch.
                Thread.currentThread().interrupt();
                LOGGER.info("Task {}, interrupting the event handler thread", this.taskSyncContextHolder.get().getTaskUid());
                return;
            }
        }
    }

    /**
     * Starts the worker thread if it is not already running; otherwise does nothing.
     *
     * <p>Synchronized with {@link #stopProcessing()} so that concurrent start/stop calls cannot
     * create two workers or dereference a field that was just cleared.
     */
    public synchronized void startProcessing() {
        if (this.thread != null) {
            return;
        }
        Thread worker = createEventHandlerThread();
        this.thread = worker;
        worker.start();
    }

    /**
     * Stops processing if a worker thread exists: discards all queued events, interrupts the
     * worker and forgets it so that {@link #startProcessing()} can create a new one.
     */
    public synchronized void stopProcessing() {
        Thread worker = this.thread;
        if (worker == null) {
            return;
        }
        this.queue.clear();
        worker.interrupt();
        this.thread = null;
    }

    /**
     * Enqueues an event for the worker thread, blocking while the queue is full.
     *
     * <p>For a {@link NewPartitionsEvent}, partitions whose token is already known to the current
     * task sync context are dropped first; if none remain, nothing is enqueued. The remaining
     * queue capacity is published as a metric in every case.
     *
     * @param taskStateChangeEvent the event to enqueue
     * @throws InterruptedException if the calling thread is interrupted while waiting for queue space
     */
    public void processEvent(TaskStateChangeEvent taskStateChangeEvent) throws InterruptedException {
        if (taskStateChangeEvent instanceof NewPartitionsEvent) {
            List<Partition> unknownPartitions = removeAlreadyExistingPartitions(((NewPartitionsEvent) taskStateChangeEvent).getPartitions());
            if (!unknownPartitions.isEmpty()) {
                this.queue.put(new NewPartitionsEvent(unknownPartitions));
            }
        } else {
            this.queue.put(taskStateChangeEvent);
        }
        this.metricsEventPublisher.publishMetricEvent(new TaskStateChangeQueueUpdateMetricEvent(this.queue.remainingCapacity()));
    }

    /**
     * Returns the partitions from {@code list} whose token is not among the partition tokens
     * reported by {@link TaskStateUtil#allPartitionTokens} for the current task sync context.
     */
    private List<Partition> removeAlreadyExistingPartitions(List<Partition> list) {
        Set<String> knownTokens = TaskStateUtil.allPartitionTokens(this.taskSyncContextHolder.get());
        return list.stream()
                .filter(partition -> !knownTokens.contains(partition.getToken()))
                .collect(Collectors.toList());
    }
}
