package com.ververica.cdc.debezium.internal;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.utils.ThreadUtils;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.json.JsonConverterConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.runtime.WorkerConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetStorageWriter;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.util.Callback;
import io.debezium.embedded.EmbeddedEngine;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore.class */
/**
 * An {@link OffsetBackingStore} that keeps offsets in an in-memory map.
 *
 * <p>When the worker configuration contains {@link #OFFSET_STATE_VALUE}, {@link #configure}
 * deserializes that value into a {@code DebeziumOffset} and writes it into this store through an
 * {@link OffsetStorageWriter}, so that the store starts out holding the given offset.
 *
 * <p>{@link #get} and {@link #set} run their work on a single-thread executor that is created by
 * {@link #start()} and released by {@link #stop()}.
 */
public class FlinkOffsetBackingStore implements OffsetBackingStore {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkOffsetBackingStore.class);

    /** Configuration key under which the serialized offset state (a JSON string) is passed in. */
    public static final String OFFSET_STATE_VALUE = "offset.storage.flink.state.value";

    /** Maximum time, in seconds, that {@link #configure} waits for the initial offset flush. */
    public static final int FLUSH_TIMEOUT_SECONDS = 10;

    /** Backing map holding the stored offsets. */
    protected Map<ByteBuffer, ByteBuffer> data = new HashMap<>();

    /** Executor on which {@link #get} and {@link #set} tasks run; {@code null} until started. */
    protected ExecutorService executor;

    /**
     * Starts the executor and, if the configuration carries an offset state under
     * {@link #OFFSET_STATE_VALUE}, loads that offset into this store.
     *
     * <p>Failures while flushing the initial offset (interruption, execution failure, timeout)
     * are logged and the flush is cancelled; they are not rethrown.
     *
     * @param workerConfig worker configuration whose original properties are inspected
     * @throws RuntimeException wrapping the {@link IOException} if the offset state cannot be
     *     deserialized
     */
    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore
    public void configure(WorkerConfig workerConfig) {
        // Start eagerly: the OffsetStorageWriter below calls back into set(), which needs the executor.
        start();
        Map<String, Object> originals = workerConfig.originals();
        if (!originals.containsKey(OFFSET_STATE_VALUE)) {
            // No offset state supplied, nothing to restore.
            return;
        }
        String stateJson = (String) originals.get(OFFSET_STATE_VALUE);
        try {
            DebeziumOffset restoredOffset =
                    new DebeziumOffsetSerializer().deserialize(stateJson.getBytes(StandardCharsets.UTF_8));
            String engineName = (String) originals.get(EmbeddedEngine.ENGINE_NAME.name());

            JsonConverter keyConverter = new JsonConverter();
            JsonConverter valueConverter = new JsonConverter();
            keyConverter.configure(workerConfig.originals(), true);
            // The second converter is configured from a copy of the properties with schemas disabled.
            Map<String, Object> valueConverterConfig = new HashMap<>(originals);
            valueConverterConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
            valueConverter.configure(valueConverterConfig, true);

            OffsetStorageWriter offsetWriter =
                    new OffsetStorageWriter(this, engineName, keyConverter, valueConverter);
            offsetWriter.offset(restoredOffset.sourcePartition, restoredOffset.sourceOffset);
            if (!offsetWriter.beginFlush()) {
                LOG.warn("Initialize FlinkOffsetBackingStore from empty offset state, this shouldn't happen.");
                return;
            }
            try {
                offsetWriter
                        .doFlush((error, result) -> {
                            if (error != null) {
                                LOG.error("Failed to flush initial offset.", error);
                            } else {
                                LOG.debug("Successfully flush initial offset.");
                            }
                        })
                        .get(FLUSH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                LOG.info(
                        "Flush offsets successfully, partition: {}, offsets: {}",
                        restoredOffset.sourcePartition,
                        restoredOffset.sourceOffset);
            } catch (InterruptedException e) {
                LOG.warn("Flush offsets interrupted, cancelling.", e);
                offsetWriter.cancelFlush();
                // Restore the interrupt flag so callers further up can observe the interruption.
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                LOG.error("Flush offsets threw an unexpected exception.", e);
                offsetWriter.cancelFlush();
            } catch (TimeoutException e) {
                LOG.error("Timed out waiting to flush offsets to storage.", e);
                offsetWriter.cancelFlush();
            }
        } catch (IOException e) {
            LOG.error("Can't deserialize debezium offset state from JSON: {}", stateJson, e);
            throw new RuntimeException(e);
        }
    }

    /** Creates the single-thread executor if it does not exist yet; otherwise does nothing. */
    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore
    public void start() {
        if (this.executor == null) {
            this.executor =
                    Executors.newFixedThreadPool(
                            1, ThreadUtils.createThreadFactory(getClass().getSimpleName() + "-%d", false));
        }
    }

    /**
     * Shuts the executor down, waiting up to 30 seconds for submitted tasks to finish.
     *
     * @throws ConnectException if tasks were still pending when the executor was forced to stop
     */
    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore
    public void stop() {
        if (this.executor == null) {
            return;
        }
        this.executor.shutdown();
        try {
            this.executor.awaitTermination(30L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // shutdownNow() returns the tasks that never started; any leftover means an unclean stop.
        if (!this.executor.shutdownNow().isEmpty()) {
            throw new ConnectException("Failed to stop FlinkOffsetBackingStore. Exiting without cleanly shutting down pending tasks and/or callbacks.");
        }
        this.executor = null;
    }

    /**
     * Looks up the stored values for the given keys on the executor thread.
     *
     * @param collection keys to look up
     * @return a future yielding a map with one entry per requested key; keys without a stored
     *     value map to {@code null}
     */
    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore
    public Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> collection) {
        return this.executor.submit(() -> {
            Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
            for (ByteBuffer key : collection) {
                result.put(key, this.data.get(key));
            }
            return result;
        });
    }

    /**
     * Stores the given entries on the executor thread and then notifies the callback.
     *
     * @param map entries to store
     * @param callback invoked with {@code (null, null)} after the entries are stored; may be
     *     {@code null}
     * @return a future that completes once the entries are stored and the callback has run
     */
    @Override // com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.storage.OffsetBackingStore
    public Future<Void> set(Map<ByteBuffer, ByteBuffer> map, Callback<Void> callback) {
        return this.executor.submit(() -> {
            this.data.putAll(map);
            if (callback != null) {
                callback.onCompletion(null, null);
            }
            return null;
        });
    }
}
