package io.confluent.connect.jdbc.sink;

import io.confluent.connect.jdbc.dialect.DatabaseDialect;
import io.confluent.connect.jdbc.dialect.DatabaseDialects;
import io.confluent.connect.jdbc.util.LogUtil;
import io.confluent.connect.jdbc.util.Version;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/jdbc/sink/JdbcSinkTask.class */
public class JdbcSinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(JdbcSinkTask.class);
    ErrantRecordReporter reporter;
    DatabaseDialect dialect;
    JdbcSinkConfig config;
    JdbcDbWriter writer;
    int remainingRetries;
    boolean shouldTrimSensitiveLogs;

    public void start(Map<String, String> map) {
        log.info("Starting JDBC Sink task");
        this.config = new JdbcSinkConfig(map);
        initWriter();
        this.remainingRetries = this.config.maxRetries;
        this.shouldTrimSensitiveLogs = this.config.trimSensitiveLogsEnabled;
        try {
            this.reporter = this.context.errantRecordReporter();
        } catch (NoClassDefFoundError | NoSuchMethodError e) {
            this.reporter = null;
        }
    }

    void initWriter() {
        log.info("Initializing JDBC writer");
        if (this.config.dialectName == null || this.config.dialectName.trim().isEmpty()) {
            this.dialect = DatabaseDialects.findBestFor(this.config.connectionUrl, this.config);
        } else {
            this.dialect = DatabaseDialects.create(this.config.dialectName, this.config);
        }
        DbStructure dbStructure = new DbStructure(this.dialect);
        log.info("Initializing writer using SQL dialect: {}", this.dialect.getClass().getSimpleName());
        this.writer = new JdbcDbWriter(this.config, this.dialect, dbStructure);
        log.info("JDBC writer initialized");
    }

    /* JADX WARN: Type inference failed for: r12v1, types: [java.lang.Throwable, io.confluent.connect.jdbc.sink.TableAlterOrCreateException] */
    public void put(Collection<SinkRecord> collection) {
        if (collection.isEmpty()) {
            return;
        }
        SinkRecord next = collection.iterator().next();
        log.debug("Received {} records. First record kafka coordinates:({}-{}-{}). Writing them to the database...", new Object[]{Integer.valueOf(collection.size()), next.topic(), next.kafkaPartition(), Long.valueOf(next.kafkaOffset())});
        try {
            this.writer.write(collection);
        } catch (TableAlterOrCreateException e) {
            if (this.reporter == null) {
                log.error(e.toString());
                throw e;
            }
            unrollAndRetry(collection);
        } catch (SQLException e2) {
            SQLException trimSensitiveData = this.shouldTrimSensitiveLogs ? LogUtil.trimSensitiveData(e2) : e2;
            log.warn("Write of {} records failed, remainingRetries={}", new Object[]{Integer.valueOf(collection.size()), Integer.valueOf(this.remainingRetries), trimSensitiveData});
            int i = 0;
            Iterator<Throwable> it = e2.iterator();
            while (it.hasNext()) {
                it.next();
                i++;
            }
            SQLException allMessagesException = getAllMessagesException(e2);
            if (this.remainingRetries > 0) {
                this.writer.closeQuietly();
                initWriter();
                this.remainingRetries--;
                this.context.timeout(this.config.retryBackoffMs);
                log.debug(allMessagesException.toString());
                throw new RetriableException(allMessagesException);
            }
            if (this.reporter == null) {
                log.error("Failing task after exhausting retries; encountered {} exceptions on last write attempt. For complete details on each exception, please enable DEBUG logging.", Integer.valueOf(i));
                int i2 = 1;
                Iterator<Throwable> it2 = trimSensitiveData.iterator();
                while (it2.hasNext()) {
                    int i3 = i2;
                    i2++;
                    log.debug("Exception {}:", Integer.valueOf(i3), it2.next());
                }
                throw new ConnectException(allMessagesException);
            }
            unrollAndRetry(collection);
        }
        this.remainingRetries = this.config.maxRetries;
    }

    /* JADX WARN: Type inference failed for: r8v1, types: [java.lang.Throwable, io.confluent.connect.jdbc.sink.TableAlterOrCreateException] */
    private void unrollAndRetry(Collection<SinkRecord> collection) {
        this.writer.closeQuietly();
        initWriter();
        for (SinkRecord sinkRecord : collection) {
            try {
                this.writer.write(Collections.singletonList(sinkRecord));
            } catch (TableAlterOrCreateException e) {
                log.debug(e.toString());
                this.reporter.report(sinkRecord, (Throwable) e);
                this.writer.closeQuietly();
            } catch (SQLException e2) {
                SQLException allMessagesException = getAllMessagesException(e2);
                log.debug(allMessagesException.toString());
                this.reporter.report(sinkRecord, allMessagesException);
                this.writer.closeQuietly();
            }
        }
    }

    private SQLException getAllMessagesException(SQLException sQLException) {
        String str = "Exception chain:" + System.lineSeparator();
        SQLException trimSensitiveData = this.shouldTrimSensitiveLogs ? LogUtil.trimSensitiveData(sQLException) : sQLException;
        Iterator<Throwable> it = trimSensitiveData.iterator();
        while (it.hasNext()) {
            str = str + it.next() + System.lineSeparator();
        }
        SQLException sQLException2 = new SQLException(str);
        sQLException2.setNextException(trimSensitiveData);
        return sQLException2;
    }

    public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    }

    public void stop() {
        log.info("Stopping task");
        try {
            try {
                this.writer.closeQuietly();
                if (this.dialect != null) {
                    this.dialect.close();
                }
            } catch (Throwable th) {
                try {
                    try {
                        if (this.dialect != null) {
                            this.dialect.close();
                        }
                    } catch (Throwable th2) {
                        log.warn("Error while closing the {} dialect: ", this.dialect.name(), th2);
                        this.dialect = null;
                        throw th;
                    }
                    throw th;
                } finally {
                }
            }
        } catch (Throwable th3) {
            log.warn("Error while closing the {} dialect: ", this.dialect.name(), th3);
        } finally {
            this.dialect = null;
        }
    }

    public String version() {
        return Version.getVersion();
    }
}
