package org.apache.seatunnel.flink.clickhouse.sink;

import com.google.common.collect.Lists;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.flink.clickhouse.ConfigKey;
import org.apache.seatunnel.flink.clickhouse.pojo.IntHolder;
import org.apache.seatunnel.flink.clickhouse.pojo.Shard;
import org.apache.seatunnel.flink.clickhouse.pojo.ShardMetadata;
import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseBatchStatement;
import org.apache.seatunnel.flink.clickhouse.sink.client.ClickhouseClient;
import org.apache.seatunnel.flink.clickhouse.sink.client.ShardRouter;
import org.apache.seatunnel.flink.clickhouse.sink.inject.ArrayInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.BigDecimalInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.ClickhouseFieldInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.DateInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.DateTimeInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.DoubleInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.FloatInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.IntInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.LongInjectFunction;
import org.apache.seatunnel.flink.clickhouse.sink.inject.StringInjectFunction;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import ru.yandex.clickhouse.ClickHouseConnectionImpl;
import ru.yandex.clickhouse.ClickHousePreparedStatementImpl;
import ru.yandex.clickhouse.ClickHouseStatement;

/* loaded from: input_file:org/apache/seatunnel/flink/clickhouse/sink/ClickhouseOutputFormat.class */
public class ClickhouseOutputFormat extends RichOutputFormat<Row> {
    private static final long serialVersionUID = -1;
    private final Config config;
    private final List<String> fields;
    private final Map<String, String> tableSchema;
    private final ShardMetadata shardMetadata;
    private final int batchSize;
    private transient RetryUtils.RetryMaterial retryMaterial;
    private transient ShardRouter shardRouter;
    private transient ClickhouseClient clickhouseClient;
    private transient String prepareSql;
    private transient Map<Shard, ClickhouseBatchStatement> statementMap;
    private transient Map<String, ClickhouseFieldInjectFunction> fieldInjectFunctionMap;
    private static final ClickhouseFieldInjectFunction DEFAULT_INJECT_FUNCTION = new StringInjectFunction();

    public ClickhouseOutputFormat(Config config, ShardMetadata shardMetadata, List<String> list, Map<String, String> map) {
        this.config = config;
        this.shardMetadata = shardMetadata;
        this.fields = list;
        this.tableSchema = map;
        this.batchSize = config.getInt(ConfigKey.BULK_SIZE);
    }

    public void configure(Configuration configuration) {
    }

    public void open(int i, int i2) {
        List<Integer> intList = this.config.getIntList(ConfigKey.RETRY_CODES);
        this.retryMaterial = new RetryUtils.RetryMaterial(this.config.getInt(ConfigKey.RETRY), true, exc -> {
            if (exc instanceof SQLException) {
                return intList.contains(Integer.valueOf(((SQLException) exc).getErrorCode()));
            }
            return false;
        });
        this.clickhouseClient = new ClickhouseClient(this.config);
        this.fieldInjectFunctionMap = initFieldInjectFunctionMap();
        this.shardRouter = new ShardRouter(this.clickhouseClient, this.shardMetadata);
        this.prepareSql = initPrepareSQL();
        this.statementMap = initStatementMap();
    }

    public void writeRecord(Row row) {
        ClickhouseBatchStatement clickhouseBatchStatement = this.statementMap.get(this.shardRouter.getShard(row));
        ClickHousePreparedStatementImpl preparedStatement = clickhouseBatchStatement.getPreparedStatement();
        IntHolder intHolder = clickhouseBatchStatement.getIntHolder();
        addIntoBatch(row, preparedStatement);
        intHolder.setValue(intHolder.getValue() + 1);
        if (intHolder.getValue() >= this.batchSize) {
            flush(preparedStatement);
            intHolder.setValue(0);
        }
    }

    /* JADX WARN: Finally extract failed */
    public void close() {
        for (ClickhouseBatchStatement clickhouseBatchStatement : this.statementMap.values()) {
            try {
                ClickHouseConnectionImpl clickHouseConnection = clickhouseBatchStatement.getClickHouseConnection();
                Throwable th = null;
                try {
                    ClickHousePreparedStatementImpl preparedStatement = clickhouseBatchStatement.getPreparedStatement();
                    Throwable th2 = null;
                    try {
                        try {
                            IntHolder intHolder = clickhouseBatchStatement.getIntHolder();
                            if (intHolder.getValue() > 0) {
                                flush(preparedStatement);
                                intHolder.setValue(0);
                            }
                            if (preparedStatement != null) {
                                if (0 != 0) {
                                    try {
                                        preparedStatement.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    preparedStatement.close();
                                }
                            }
                            if (clickHouseConnection != null) {
                                if (0 != 0) {
                                    try {
                                        clickHouseConnection.close();
                                    } catch (Throwable th4) {
                                        th.addSuppressed(th4);
                                    }
                                } else {
                                    clickHouseConnection.close();
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                } catch (Throwable th5) {
                    if (clickHouseConnection != null) {
                        if (0 != 0) {
                            try {
                                clickHouseConnection.close();
                            } catch (Throwable th6) {
                                th.addSuppressed(th6);
                            }
                        } else {
                            clickHouseConnection.close();
                        }
                    }
                    throw th5;
                }
            } catch (SQLException e) {
                throw new RuntimeException("Failed to close prepared statement.", e);
            }
        }
    }

    private void addIntoBatch(Row row, ClickHousePreparedStatementImpl clickHousePreparedStatementImpl) {
        for (int i = 0; i < this.fields.size(); i++) {
            try {
                String str = this.fields.get(i);
                Object field = row.getField(str);
                if (field == null) {
                    clickHousePreparedStatementImpl.setObject(i + 1, null);
                } else {
                    this.fieldInjectFunctionMap.getOrDefault(this.tableSchema.get(str), DEFAULT_INJECT_FUNCTION).injectFields(clickHousePreparedStatementImpl, i + 1, field);
                }
            } catch (SQLException e) {
                throw new RuntimeException("Add row data into batch error", e);
            }
        }
        clickHousePreparedStatementImpl.addBatch();
    }

    private void flush(ClickHouseStatement clickHouseStatement) {
        try {
            RetryUtils.retryWithException(() -> {
                clickHouseStatement.executeBatch();
                return null;
            }, this.retryMaterial);
        } catch (Exception e) {
            throw new RuntimeException("Clickhouse execute batch statement error", e);
        }
    }

    private String initPrepareSQL() {
        String[] strArr = new String[this.fields.size()];
        Arrays.fill(strArr, "?");
        return String.format("INSERT INTO %s (%s) VALUES (%s)", this.shardRouter.getShardTable(), String.join(",", this.fields), String.join(",", strArr));
    }

    private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
        HashMap hashMap = new HashMap(16);
        this.shardRouter.getShards().forEach((num, shard) -> {
            try {
                ClickHouseConnectionImpl clickhouseConnection = this.clickhouseClient.getClickhouseConnection();
                hashMap.put(shard, new ClickhouseBatchStatement(clickhouseConnection, (ClickHousePreparedStatementImpl) clickhouseConnection.prepareStatement(this.prepareSql), new IntHolder()));
            } catch (SQLException e) {
                throw new RuntimeException("Clickhouse prepare statement error", e);
            }
        });
        return hashMap;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Map<String, ClickhouseFieldInjectFunction> initFieldInjectFunctionMap() {
        HashMap hashMap = new HashMap(16);
        ArrayList newArrayList = Lists.newArrayList(new ArrayInjectFunction(), new BigDecimalInjectFunction(), new DateInjectFunction(), new DateTimeInjectFunction(), new DoubleInjectFunction(), new FloatInjectFunction(), new IntInjectFunction(), new LongInjectFunction(), new StringInjectFunction());
        StringInjectFunction stringInjectFunction = new StringInjectFunction();
        Iterator<String> it = this.fields.iterator();
        while (it.hasNext()) {
            StringInjectFunction stringInjectFunction2 = stringInjectFunction;
            String str = this.tableSchema.get(it.next());
            Iterator it2 = newArrayList.iterator();
            while (true) {
                if (it2.hasNext()) {
                    ClickhouseFieldInjectFunction clickhouseFieldInjectFunction = (ClickhouseFieldInjectFunction) it2.next();
                    if (clickhouseFieldInjectFunction.isCurrentFieldType(str)) {
                        stringInjectFunction2 = clickhouseFieldInjectFunction;
                        break;
                    }
                }
            }
            hashMap.put(str, stringInjectFunction2);
        }
        return hashMap;
    }
}
