package io.debezium.connector.spanner.kafka.internal;

import com.google.protobuf.InvalidProtocolBufferException;
import io.debezium.connector.spanner.exception.SpannerConnectorException;
import io.debezium.connector.spanner.function.BlockingBiConsumer;
import io.debezium.connector.spanner.kafka.event.proto.SyncEventProtos;
import io.debezium.connector.spanner.kafka.internal.model.SyncEventMetadata;
import io.debezium.connector.spanner.kafka.internal.model.TaskSyncEvent;
import io.debezium.connector.spanner.kafka.internal.proto.SyncEventFromProtoMapper;
import io.debezium.connector.spanner.task.LoggerUtils;
import java.lang.Thread;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/debezium/connector/spanner/kafka/internal/TaskSyncEventListener.class */
/**
 * Reads {@link TaskSyncEvent}s from partition 0 of the sync topic on a dedicated thread and
 * forwards each of them, together with a {@link SyncEventMetadata}, to every subscribed consumer.
 *
 * <p>Lifecycle: register consumers with {@link #subscribe}, call {@link #start()} once, and call
 * {@link #shutdown()} to interrupt the listener thread and wait for it to finish.
 *
 * <p>NOTE(review): the subscriber list is a plain {@code ArrayList} that is iterated by the
 * listener thread; presumably subscribers are registered before {@link #start()} — confirm
 * against callers.
 */
public class TaskSyncEventListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(TaskSyncEventListener.class);

    /** Name given to the background polling thread. */
    private static final String LISTENER_THREAD_NAME = "SpannerConnector-TaskSyncEventListener";

    private final String consumerGroup;
    private final String topic;
    private final boolean seekBackToPreviousEpoch;
    private final Duration pollDuration;
    private final Duration commitOffsetsTimeout;
    private final long commitOffsetsInterval;
    private final SyncEventConsumerFactory<String, byte[]> consumerFactory;
    private final List<BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata>> eventConsumers = new ArrayList<>();
    private final Consumer<RuntimeException> errorHandler;
    private volatile Thread thread;

    /**
     * Creates a listener; nothing is read from Kafka until {@link #start()} is called.
     *
     * @param consumerGroup consumer group passed to the consumer factory (also used in log messages)
     * @param topic name of the sync topic; only partition 0 is read
     * @param consumerFactory creates the Kafka consumer and supplies poll/commit settings
     * @param seekBackToPreviousEpoch whether {@link #start()} should rewind to the epoch offset
     *        recorded in the last message of the topic
     * @param errorHandler receives failures raised while seeking or polling
     */
    public TaskSyncEventListener(String consumerGroup, String topic, SyncEventConsumerFactory<String, byte[]> consumerFactory, boolean seekBackToPreviousEpoch, Consumer<RuntimeException> errorHandler) {
        this.consumerGroup = consumerGroup;
        this.topic = topic;
        this.seekBackToPreviousEpoch = seekBackToPreviousEpoch;
        this.pollDuration = Duration.ofMillis(consumerFactory.getConfig().syncPollDuration());
        this.commitOffsetsTimeout = Duration.ofMillis(consumerFactory.getConfig().syncCommitOffsetsTimeout());
        this.commitOffsetsInterval = consumerFactory.getConfig().syncCommitOffsetsInterval();
        this.consumerFactory = consumerFactory;
        this.errorHandler = errorHandler;
    }

    /**
     * Registers a consumer that will be invoked for every sync event.
     *
     * @param eventConsumer callback receiving the event and its metadata
     */
    public void subscribe(BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata> eventConsumer) {
        this.eventConsumers.add(eventConsumer);
    }

    /**
     * Removes a previously registered consumer, matched via {@code equals}.
     *
     * <p>NOTE(review): the parameter type differs from the one accepted by {@link #subscribe};
     * an argument only matches if it is the same object that was subscribed — confirm callers.
     *
     * @param eventConsumer the callback to remove
     */
    public void unsubscribe(BiConsumer<TaskSyncEvent, SyncEventMetadata> eventConsumer) {
        this.eventConsumers.remove(eventConsumer);
    }

    /**
     * Positions a new Kafka consumer on the sync topic and starts the background polling thread.
     *
     * <p>If the topic is empty, every subscriber is immediately notified with a {@code null} event
     * and metadata that allows rebalancing. Otherwise the consumer is positioned on the last
     * message (and optionally rewound to the previous epoch). A failure while seeking is reported
     * to the error handler and the listener thread is not started.
     *
     * @throws InterruptedException if the calling thread is interrupted while seeking or while a
     *         subscriber is being notified
     */
    public void start() throws InterruptedException {
        final TopicPartition topicPartition = new TopicPartition(this.topic, 0);
        final List<TopicPartition> partitions = List.of(topicPartition);
        final org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer = this.consumerFactory.createConsumer(this.consumerGroup);
        try {
            // Everything that touches the consumer lives inside the try so that it is closed on failure.
            consumer.assign(partitions);
            final long endOffset = consumer.endOffsets(partitions).get(topicPartition);
            final long beginOffset = consumer.beginningOffsets(partitions).get(topicPartition);
            final long startOffset = Math.max(endOffset - 1, beginOffset);

            // endOffset == startOffset can only hold when beginOffset == endOffset, i.e. no messages.
            if (endOffset == startOffset) {
                LOGGER.info("task {}, listen: Sync topic is empty, so initial sync is finished", this.consumerGroup);
                for (BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata> eventConsumer : this.eventConsumers) {
                    eventConsumer.accept(null, SyncEventMetadata.builder().canInitiateRebalancing(true).build());
                }
            } else {
                LOGGER.info("Task {}, listen: read last message", this.consumerGroup);
                try {
                    consumer.seek(topicPartition, startOffset);
                    seekBackToPreviousEpoch(consumer, topicPartition, beginOffset);
                } catch (InterruptException e) {
                    // Must precede the generic handler: InterruptException is a RuntimeException.
                    throw new InterruptedException();
                } catch (Exception e) {
                    this.errorHandler.accept(new SpannerConnectorException("Error during seek back the Sync Topic", e));
                    // The listener thread is never started on this path, so release the consumer here.
                    shutdownConsumer(consumer);
                    return;
                }
            }

            Thread listener = new Thread(() -> listen(consumer, endOffset), LISTENER_THREAD_NAME);
            this.thread = listener;
            listener.start();
        } catch (Exception e) {
            shutdownConsumer(consumer);
            throw e;
        }
    }

    /**
     * Body of the listener thread: polls until interrupted or until an error occurs, committing
     * offsets manually when auto-commit is disabled. The consumer is always closed on exit.
     */
    private void listen(org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer, long endOffset) {
        try {
            long lastCommitMillis = System.currentTimeMillis();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    poll(consumer, endOffset);
                    if (!this.consumerFactory.isAutoCommitEnabled() && lastCommitMillis + this.commitOffsetsInterval < System.currentTimeMillis()) {
                        consumer.commitSync(this.commitOffsetsTimeout);
                        lastCommitMillis = System.currentTimeMillis();
                    }
                } catch (InterruptException | InterruptedException ignored) {
                    // Interruption is the regular shutdown signal; just leave the loop.
                    return;
                } catch (Exception e) {
                    this.errorHandler.accept(new SpannerConnectorException("Error during poll from the Sync Topic", e));
                    return;
                }
            }
        } finally {
            shutdownConsumer(consumer);
        }
    }

    /**
     * Polls once and dispatches every received record to all subscribers.
     *
     * @param consumer the Kafka consumer to poll
     * @param endOffset end offset of the topic captured at start; records at or beyond
     *        {@code endOffset - 1} are flagged as allowed to initiate rebalancing
     * @return number of records received by this poll
     */
    private int poll(org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer, long endOffset) throws InvalidProtocolBufferException, InterruptedException {
        ConsumerRecords<String, byte[]> records = consumer.poll(this.pollDuration);
        LOGGER.trace("listen: poll messages count: {}", Integer.valueOf(records.count()));
        if (records.isEmpty()) {
            return 0;
        }
        for (ConsumerRecord<String, byte[]> record : records) {
            TaskSyncEvent syncEvent = parseSyncEvent(record);
            LoggerUtils.debug(LOGGER, "Receive SyncEvent from Kafka topic: {}", syncEvent);
            for (BlockingBiConsumer<TaskSyncEvent, SyncEventMetadata> eventConsumer : this.eventConsumers) {
                eventConsumer.accept(syncEvent, SyncEventMetadata.builder().offset(record.offset()).canInitiateRebalancing(record.offset() >= endOffset - 1).build());
            }
        }
        return records.count();
    }

    /**
     * When enabled, reads the message at the current position and seeks to the epoch offset it
     * carries, but never before {@code beginOffset}. Logs a warning and leaves the position
     * untouched if the poll returns nothing.
     */
    private void seekBackToPreviousEpoch(org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer, TopicPartition topicPartition, long beginOffset) throws InvalidProtocolBufferException {
        if (!this.seekBackToPreviousEpoch) {
            return;
        }
        ConsumerRecords<String, byte[]> records = consumer.poll(this.pollDuration);
        if (records.isEmpty()) {
            LOGGER.warn("listen: fail to poll last message");
            return;
        }
        ConsumerRecord<String, byte[]> firstRecord = records.iterator().next();
        long epochOffset = Math.max(parseSyncEvent(firstRecord).getEpochOffset(), beginOffset);
        LOGGER.info("Task {}, listen: seek back to previous epoch offset: {}", this.consumerGroup, Long.valueOf(epochOffset));
        consumer.seek(topicPartition, epochOffset);
    }

    /** Decodes the protobuf payload of a record into a {@link TaskSyncEvent}. */
    private TaskSyncEvent parseSyncEvent(ConsumerRecord<String, byte[]> consumerRecord) throws InvalidProtocolBufferException {
        return SyncEventFromProtoMapper.mapFromProto(SyncEventProtos.SyncEvent.parseFrom(consumerRecord.value()));
    }

    /**
     * Unsubscribes and closes the consumer. {@code close()} is attempted even if
     * {@code unsubscribe()} fails; a Kafka {@link InterruptException} is translated into the
     * interrupt flag of the current thread instead of being propagated.
     */
    private void shutdownConsumer(org.apache.kafka.clients.consumer.Consumer<String, byte[]> consumer) {
        try {
            try {
                consumer.unsubscribe();
            } finally {
                consumer.close();
            }
        } catch (InterruptException e) {
            // Make sure the interruption stays visible to the caller; setting the flag is idempotent.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Interrupts the listener thread and blocks until it has terminated. Does nothing if the
     * listener was never started. If the calling thread is interrupted while waiting, the wait
     * continues and the interrupt flag is restored before returning.
     */
    public void shutdown() {
        // Read the volatile field once so that a concurrent shutdown cannot null it mid-way.
        final Thread listener = this.thread;
        if (listener == null) {
            return;
        }
        listener.interrupt();
        // Waiting for ourselves would never finish; the interrupt alone ends the polling loop.
        if (listener != Thread.currentThread()) {
            boolean interrupted = false;
            while (listener.isAlive()) {
                try {
                    listener.join();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        this.thread = null;
    }
}
