package org.apache.seatunnel.connectors.seatunnel.jdbc.sink;

import java.io.IOException;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormat;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.JdbcOutputFormatBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.JdbcBatchStatementExecutor;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState;
import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.class */
/**
 * Sink writer that forwards {@link SeaTunnelRow} records to a JDBC target through a
 * {@link JdbcOutputFormat}.
 *
 * <p>The output format is opened lazily on first use. This writer keeps no checkpoint state
 * ({@link #snapshotState(long)} always returns an empty list) and never hands an {@link XidInfo}
 * to a committer: buffered rows are flushed and, when the connection is not in auto-commit mode,
 * committed directly inside {@link #prepareCommit()} and {@link #close()}.
 */
public class JdbcSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSinkState> {
    private final JdbcOutputFormat<SeaTunnelRow, JdbcBatchStatementExecutor<SeaTunnelRow>> outputFormat;
    private final SinkWriter.Context context;
    private final JdbcConnectionProvider connectionProvider;
    /** True once {@link #outputFormat} has been opened successfully. */
    private transient boolean isOpen;

    /**
     * Creates the writer and builds (but does not open) the underlying output format.
     *
     * @param context writer context supplied by the engine; stored but not otherwise used here
     * @param jdbcDialect dialect passed to the output format builder
     * @param jdbcSinkConfig sink configuration; its connection config backs the connection provider
     * @param seaTunnelRowType row type passed to the output format builder
     */
    public JdbcSinkWriter(SinkWriter.Context context, JdbcDialect jdbcDialect, JdbcSinkConfig jdbcSinkConfig, SeaTunnelRowType seaTunnelRowType) {
        this.context = context;
        this.connectionProvider = new SimpleJdbcConnectionProvider(jdbcSinkConfig.getJdbcConnectionConfig());
        this.outputFormat = new JdbcOutputFormatBuilder(jdbcDialect, this.connectionProvider, jdbcSinkConfig, seaTunnelRowType).build();
    }

    /**
     * Opens the output format on first use; later calls are no-ops.
     *
     * @throws IOException if opening the output format fails
     */
    private void tryOpen() throws IOException {
        if (this.isOpen) {
            return;
        }
        this.outputFormat.open();
        // Flip the flag only after open() returned normally, so a failed open is not
        // mistaken for an opened format by the next write/prepareCommit/close call.
        this.isOpen = true;
    }

    /**
     * Commits the current transaction unless the connection is in auto-commit mode.
     *
     * @throws SQLException if querying the auto-commit mode or committing fails
     */
    private void commitIfNotAutoCommit() throws SQLException {
        if (!this.connectionProvider.getConnection().getAutoCommit()) {
            this.connectionProvider.getConnection().commit();
        }
    }

    /**
     * Returns the writer state to checkpoint. This writer is stateless.
     *
     * @param j ignored (presumably the checkpoint id; confirm against the SinkWriter contract)
     * @return always an empty list
     */
    public List<JdbcSinkState> snapshotState(long j) {
        return Collections.emptyList();
    }

    /**
     * Hands one row to the output format, opening it first if necessary.
     *
     * @param seaTunnelRow row to write; a serialization clone is passed on, so the output format
     *     never holds a reference to the caller's instance
     * @throws IOException if opening the output format or writing the record fails
     */
    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        tryOpen();
        this.outputFormat.writeRecord(SerializationUtils.clone(seaTunnelRow));
    }

    /**
     * Flushes buffered rows and commits them when the connection is not in auto-commit mode.
     *
     * @return always {@link Optional#empty()}; no commit info is produced
     * @throws IOException if opening or flushing the output format fails
     * @throws JdbcConnectorException if the JDBC commit (or auto-commit lookup) fails
     */
    public Optional<XidInfo> prepareCommit() throws IOException {
        tryOpen();
        this.outputFormat.flush();
        try {
            commitIfNotAutoCommit();
            return Optional.empty();
        } catch (SQLException e) {
            throw new JdbcConnectorException(JdbcConnectorErrorCode.TRANSACTION_OPERATION_FAILED, "commit failed," + e.getMessage(), e);
        }
    }

    /** No-op: {@link #prepareCommit()} never returns anything that would need to be aborted. */
    public void abortPrepare() {
    }

    /**
     * Flushes and commits any remaining rows, then closes the output format.
     *
     * <p>The output format is closed even when the final flush or commit fails, so a failing
     * shutdown does not leave it unreleased.
     *
     * @throws IOException if opening or flushing the output format fails
     * @throws JdbcConnectorException if the final JDBC commit fails
     */
    public void close() throws IOException {
        tryOpen();
        try {
            this.outputFormat.flush();
            commitIfNotAutoCommit();
        } catch (SQLException e) {
            throw new JdbcConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, "unable to close JDBC sink write", e);
        } finally {
            // Release the output format regardless of whether flush/commit succeeded.
            this.outputFormat.close();
        }
    }
}
