/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;

import com.clickhouse.client.ClickHouseNode;
import com.google.auto.service.AutoService;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKAggCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigMergeable;

@AutoService(value={SeaTunnelSink.class})
public class ClickhouseSink
implements SeaTunnelSink<SeaTunnelRow, ClickhouseSinkState, CKCommitInfo, CKAggCommitInfo> {
    private ReaderOption option;

    public String getPluginName() {
        return "Clickhouse";
    }

    public void prepare(Config config) throws PrepareFailException {
        boolean isCredential;
        CheckResult result = CheckConfigUtil.checkAllExists((Config)config, (String[])new String[]{ClickhouseConfig.HOST.key(), ClickhouseConfig.DATABASE.key(), ClickhouseConfig.TABLE.key()});
        boolean bl = isCredential = config.hasPath(ClickhouseConfig.USERNAME.key()) || config.hasPath(ClickhouseConfig.PASSWORD.key());
        if (isCredential) {
            result = CheckConfigUtil.checkAllExists((Config)config, (String[])new String[]{ClickhouseConfig.USERNAME.key(), ClickhouseConfig.PASSWORD.key()});
        }
        if (!result.isSuccess()) {
            throw new ClickhouseConnectorException((SeaTunnelErrorCode)SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, String.format("PluginName: %s, PluginType: %s, Message: %s", this.getPluginName(), PluginType.SINK, result.getMsg()));
        }
        ImmutableMap defaultConfig = ImmutableMap.builder().put((Object)ClickhouseConfig.BULK_SIZE.key(), ClickhouseConfig.BULK_SIZE.defaultValue()).put((Object)ClickhouseConfig.SPLIT_MODE.key(), ClickhouseConfig.SPLIT_MODE.defaultValue()).build();
        config = config.withFallback((ConfigMergeable)ConfigFactory.parseMap((Map)defaultConfig));
        List<ClickHouseNode> nodes = !isCredential ? ClickhouseUtil.createNodes(config.getString(ClickhouseConfig.HOST.key()), config.getString(ClickhouseConfig.DATABASE.key()), null, null) : ClickhouseUtil.createNodes(config.getString(ClickhouseConfig.HOST.key()), config.getString(ClickhouseConfig.DATABASE.key()), config.getString(ClickhouseConfig.USERNAME.key()), config.getString(ClickhouseConfig.PASSWORD.key()));
        Properties clickhouseProperties = new Properties();
        if (CheckConfigUtil.isValidParam((Config)config, (String)ClickhouseConfig.CLICKHOUSE_CONFIG.key())) {
            config.getObject(ClickhouseConfig.CLICKHOUSE_CONFIG.key()).forEach((key, value) -> clickhouseProperties.put(key, String.valueOf(value.unwrapped())));
        }
        if (isCredential) {
            clickhouseProperties.put("user", config.getString(ClickhouseConfig.USERNAME.key()));
            clickhouseProperties.put("password", config.getString(ClickhouseConfig.PASSWORD.key()));
        }
        ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
        Map<String, String> tableSchema = proxy.getClickhouseTableSchema(config.getString(ClickhouseConfig.TABLE.key()));
        String shardKey = null;
        String shardKeyType = null;
        ClickhouseTable table = proxy.getClickhouseTable(config.getString(ClickhouseConfig.DATABASE.key()), config.getString(ClickhouseConfig.TABLE.key()));
        if (config.getBoolean(ClickhouseConfig.SPLIT_MODE.key())) {
            if (!"Distributed".equals(table.getEngine())) {
                throw new ClickhouseConnectorException((SeaTunnelErrorCode)CommonErrorCode.ILLEGAL_ARGUMENT, "split mode only support table which engine is 'Distributed' engine at now");
            }
            if (config.hasPath(ClickhouseConfig.SHARDING_KEY.key())) {
                shardKey = config.getString(ClickhouseConfig.SHARDING_KEY.key());
                shardKeyType = tableSchema.get(shardKey);
            }
        }
        ShardMetadata metadata = isCredential ? new ShardMetadata(shardKey, shardKeyType, table.getSortingKey(), config.getString(ClickhouseConfig.DATABASE.key()), config.getString(ClickhouseConfig.TABLE.key()), table.getEngine(), config.getBoolean(ClickhouseConfig.SPLIT_MODE.key()), new Shard(1, 1, nodes.get(0)), config.getString(ClickhouseConfig.USERNAME.key()), config.getString(ClickhouseConfig.PASSWORD.key())) : new ShardMetadata(shardKey, shardKeyType, table.getSortingKey(), config.getString(ClickhouseConfig.DATABASE.key()), config.getString(ClickhouseConfig.TABLE.key()), table.getEngine(), config.getBoolean(ClickhouseConfig.SPLIT_MODE.key()), new Shard(1, 1, nodes.get(0)));
        proxy.close();
        String[] primaryKeys = null;
        if (config.hasPath(ClickhouseConfig.PRIMARY_KEY.key())) {
            String primaryKey = config.getString(ClickhouseConfig.PRIMARY_KEY.key());
            if (shardKey != null && !Objects.equals(primaryKey, shardKey)) {
                throw new ClickhouseConnectorException((SeaTunnelErrorCode)CommonErrorCode.ILLEGAL_ARGUMENT, "sharding_key and primary_key must be consistent to ensure correct processing of cdc events");
            }
            primaryKeys = new String[]{primaryKey};
        }
        boolean supportUpsert = (Boolean)ClickhouseConfig.SUPPORT_UPSERT.defaultValue();
        if (config.hasPath(ClickhouseConfig.SUPPORT_UPSERT.key())) {
            supportUpsert = config.getBoolean(ClickhouseConfig.SUPPORT_UPSERT.key());
        }
        boolean allowExperimentalLightweightDelete = (Boolean)ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.defaultValue();
        if (config.hasPath(ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key())) {
            allowExperimentalLightweightDelete = config.getBoolean(ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE.key());
        }
        this.option = ReaderOption.builder().shardMetadata(metadata).properties(clickhouseProperties).tableEngine(table.getEngine()).tableSchema(tableSchema).bulkSize(config.getInt(ClickhouseConfig.BULK_SIZE.key())).primaryKeys(primaryKeys).supportUpsert(supportUpsert).allowExperimentalLightweightDelete(allowExperimentalLightweightDelete).build();
    }

    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> createWriter(SinkWriter.Context context) throws IOException {
        return new ClickhouseSinkWriter(this.option, context);
    }

    public SinkWriter<SeaTunnelRow, CKCommitInfo, ClickhouseSinkState> restoreWriter(SinkWriter.Context context, List<ClickhouseSinkState> states) throws IOException {
        return super.restoreWriter(context, states);
    }

    public Optional<Serializer<ClickhouseSinkState>> getWriterStateSerializer() {
        return Optional.of(new DefaultSerializer());
    }

    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
        this.option.setSeaTunnelRowType(seaTunnelRowType);
    }

    public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
        return this.option.getSeaTunnelRowType();
    }
}

