package io.kestra.plugin.debezium;

import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.data.Envelope;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.FileSerde;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.plugin.debezium.AbstractDebeziumTask;
import io.kestra.plugin.debezium.models.Envelope;
import io.kestra.plugin.debezium.models.Message;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.HeaderFrom;

/* loaded from: input_file:io/kestra/plugin/debezium/ChangeConsumer.class */
public class ChangeConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
    private final AbstractDebeziumTask abstractDebeziumTask;
    private final RunContext runContext;
    private final AtomicInteger count;
    private ZonedDateTime lastRecord;
    private final Map<String, Pair<File, OutputStream>> records = new HashMap();
    private final Map<String, AtomicInteger> recordsCount = new ConcurrentHashMap();

    public ChangeConsumer(AbstractDebeziumTask abstractDebeziumTask, RunContext runContext, AtomicInteger atomicInteger, ZonedDateTime zonedDateTime) {
        this.abstractDebeziumTask = abstractDebeziumTask;
        this.runContext = runContext;
        this.count = atomicInteger;
        this.lastRecord = zonedDateTime;
    }

    @Override // io.debezium.engine.DebeziumEngine.ChangeConsumer
    public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> list, DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> recordCommitter) {
        this.lastRecord = ZonedDateTime.now();
        for (ChangeEvent<SourceRecord, SourceRecord> changeEvent : list) {
            Pair<Message, Message> convert = MapConverter.convert(changeEvent.value());
            Map<String, Object> handle = handle(convert);
            if (handle != null) {
                write(handle, convert.getValue().getSource());
            }
            recordCommitter.markProcessed(changeEvent);
        }
        recordCommitter.markBatchFinished();
    }

    private Map<String, Object> handle(Pair<Message, Message> pair) {
        if (isFilter(pair)) {
            return null;
        }
        switch (this.abstractDebeziumTask.getFormat()) {
            case RAW:
                return handleFormatRaw(pair);
            case INLINE:
                return handleFormatInline(pair);
            case WRAP:
                return handleFormatWrap(pair);
            default:
                throw new IllegalArgumentException("Invalid Format '" + this.abstractDebeziumTask.getFormat());
        }
    }

    private void write(Map<String, Object> map, Message.Source source) throws IOException {
        String db;
        switch (this.abstractDebeziumTask.getSplitTable()) {
            case OFF:
                db = CloudEventsMaker.FieldName.DATA;
                break;
            case TABLE:
                db = source.getDb() + "." + source.getTable();
                break;
            case DATABASE:
                db = source.getDb();
                break;
            default:
                throw new IllegalArgumentException("Invalid SplitTable '" + this.abstractDebeziumTask.getSplitTable());
        }
        if (!this.records.containsKey(db)) {
            Path tempFile = this.runContext.tempFile(db);
            this.records.put(db, Pair.of(tempFile.toFile(), new FileOutputStream(tempFile.toFile())));
        }
        this.recordsCount.computeIfAbsent(db, str -> {
            return new AtomicInteger();
        }).incrementAndGet();
        int incrementAndGet = this.count.incrementAndGet();
        if (incrementAndGet > 0 && incrementAndGet % 5000 == 0) {
            this.runContext.logger().debug("Received {} records: {}", this.count, this.recordsCount);
        }
        FileSerde.write(this.records.get(db).getRight(), map);
    }

    private boolean isFilter(Pair<Message, Message> pair) {
        if (!(pair.getValue() instanceof Envelope) && this.abstractDebeziumTask.getIgnoreDdl().booleanValue()) {
            return true;
        }
        if (pair.getValue() == null && this.abstractDebeziumTask.getDeleted() == AbstractDebeziumTask.Deleted.DROP) {
            return true;
        }
        return ((pair.getValue() instanceof Envelope) || this.abstractDebeziumTask.getFormat() == AbstractDebeziumTask.Format.RAW) ? false : true;
    }

    private Map<String, Object> handleFormatRaw(Pair<Message, Message> pair) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("key", pair.getKey());
        linkedHashMap.put(VariableScaleDecimal.VALUE_FIELD, pair.getValue());
        addDeleted(linkedHashMap, pair);
        return linkedHashMap;
    }

    private Map<String, Object> handleFormatInline(Pair<Message, Message> pair) {
        Envelope envelope = (Envelope) pair.getValue();
        Map<String, Object> formatInlineWithoutAdditional = formatInlineWithoutAdditional(envelope);
        addDeleted(formatInlineWithoutAdditional, pair);
        addKey(formatInlineWithoutAdditional, pair);
        addMetadata(formatInlineWithoutAdditional, envelope);
        return formatInlineWithoutAdditional;
    }

    private Map<String, Object> handleFormatWrap(Pair<Message, Message> pair) {
        Envelope envelope = (Envelope) pair.getValue();
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("record", formatInlineWithoutAdditional(envelope));
        addDeleted(linkedHashMap, pair);
        addKey(linkedHashMap, pair);
        addMetadata(linkedHashMap, envelope);
        return linkedHashMap;
    }

    private Map<String, Object> formatInlineWithoutAdditional(Envelope envelope) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        if (envelope.getOperation() == Envelope.Operation.DELETE) {
            linkedHashMap.putAll(envelope.getBefore());
        } else {
            linkedHashMap.putAll(envelope.getAfter());
        }
        return linkedHashMap;
    }

    private void addDeleted(Map<String, Object> map, Pair<Message, Message> pair) {
        if (this.abstractDebeziumTask.getDeleted() == AbstractDebeziumTask.Deleted.ADD_FIELD && (pair.getValue() instanceof io.kestra.plugin.debezium.models.Envelope)) {
            Envelope.Operation operation = ((io.kestra.plugin.debezium.models.Envelope) pair.getValue()).getOperation();
            map.put(this.abstractDebeziumTask.getDeletedFieldName(), Boolean.valueOf(operation == Envelope.Operation.DELETE || operation == Envelope.Operation.TRUNCATE));
        }
    }

    private void addKey(Map<String, Object> map, Pair<Message, Message> pair) {
        if (this.abstractDebeziumTask.getKey() != AbstractDebeziumTask.Key.ADD_FIELD || pair.getKey() == null) {
            return;
        }
        map.putAll(JacksonMapper.toMap(pair.getKey()));
    }

    private void addMetadata(Map<String, Object> map, io.kestra.plugin.debezium.models.Envelope envelope) {
        if (this.abstractDebeziumTask.getMetadata() == AbstractDebeziumTask.Metadata.ADD_FIELD) {
            HashMap hashMap = new HashMap();
            if (envelope.getProperties() != null) {
                hashMap.putAll(envelope.getProperties());
            }
            if (envelope.getOperation() != null) {
                hashMap.put(HeaderFrom.OPERATION_FIELD, envelope.getOperation());
            }
            if (envelope.getTransaction() != null) {
                hashMap.put("transaction", envelope.getTransaction());
            }
            if (envelope.getSource() != null) {
                hashMap.put("source", envelope.getSource());
            }
            if (envelope.getTimestamp() != null) {
                hashMap.put("timestamp", envelope.getTimestamp());
            }
            map.put(this.abstractDebeziumTask.getMetadataFieldName(), hashMap);
        }
    }

    @Override // io.debezium.engine.DebeziumEngine.ChangeConsumer
    public boolean supportsTombstoneEvents() {
        return super.supportsTombstoneEvents();
    }

    @Generated
    public Map<String, Pair<File, OutputStream>> getRecords() {
        return this.records;
    }

    @Generated
    public Map<String, AtomicInteger> getRecordsCount() {
        return this.recordsCount;
    }
}
