/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseException;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRecord;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;

/**
 * Helper around the ClickHouse client used by the sink to obtain requests and to read table and
 * cluster metadata from the {@code system.tables} / {@code system.clusters} tables.
 *
 * <p>One client and one request are created for the node passed to the constructor; additional
 * clients are created lazily per {@link Shard} and cached. All of them are released by
 * {@link #close()}.
 */
public class ClickhouseProxy {
    /** Number of leading characters of {@code engine_full} to skip: the length of {@code "Distributed("}. */
    private static final int DISTRIBUTED_PREFIX_LENGTH = "Distributed(".length();

    private final ClickHouseRequest<?> clickhouseRequest;
    private final ClickHouseClient client;
    private final Map<Shard, ClickHouseClient> shardToDataSource = new ConcurrentHashMap<>(16);

    /**
     * Creates a client for the protocol of {@code node} and a request connected to that node,
     * using the {@code RowBinaryWithNamesAndTypes} format.
     *
     * @param node the ClickHouse node this proxy talks to by default
     */
    public ClickhouseProxy(ClickHouseNode node) {
        this.client = ClickHouseClient.newInstance(node.getProtocol());
        this.clickhouseRequest = this.client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
    }

    /**
     * Returns the request that was created in the constructor.
     *
     * @return the request bound to the node given at construction time
     */
    public ClickHouseRequest<?> getClickhouseConnection() {
        return this.clickhouseRequest;
    }

    /**
     * Returns a request connected to the node of the given shard.
     *
     * <p>The client for a shard is created on first use and cached; a new request is built from
     * that client on every call.
     *
     * @param shard the shard whose node should be connected to
     * @return a request for the shard's node in {@code RowBinaryWithNamesAndTypes} format
     */
    public ClickHouseRequest<?> getClickhouseConnection(Shard shard) {
        ClickHouseClient shardClient = this.shardToDataSource.computeIfAbsent(
                shard, s -> ClickHouseClient.newInstance(s.getNode().getProtocol()));
        return shardClient.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
    }

    /**
     * Resolves the cluster, local database and local table behind a Distributed table, together
     * with the engine and DDL of that local table.
     *
     * @param connection request used to look up the Distributed table's {@code engine_full}
     * @param database database of the Distributed table
     * @param table name of the Distributed table
     * @return the parsed description of the Distributed engine
     * @throws ClickhouseConnectorException if the Distributed table or its local table is not
     *     found, or if the query fails
     */
    public DistributedEngine getClickhouseDistributedTable(ClickHouseRequest<?> connection, String database, String table) {
        String sql = String.format("select engine_full from system.tables where database = '%s' and name = '%s' and engine = 'Distributed'", database, table);
        try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
            List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
            if (records.isEmpty()) {
                throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get distributed table from clickhouse, resultSet is empty");
            }
            String engineFull = records.get(0).getValue(0).asString();
            // Drop the leading "Distributed(" and split the remaining argument list on commas,
            // stripping the single quotes around each argument.
            List<String> infos = Arrays.stream(engineFull.substring(DISTRIBUTED_PREFIX_LENGTH).split(","))
                    .map(s -> s.replace("'", "").trim())
                    .collect(Collectors.toList());
            String clusterName = infos.get(0);
            String localDatabase = infos.get(1);
            // When no sharding key follows, the table argument still carries the closing ")".
            // String.replace is literal (not a regex), so the parenthesis must not be escaped.
            String localTable = infos.get(2).replace(")", "").trim();

            String localTableSQL = String.format("select engine,create_table_query from system.tables where database = '%s' and name = '%s'", localDatabase, localTable);
            String localTableEngine;
            String localTableDDL;
            // NOTE(review): this lookup uses the proxy's own request rather than `connection`;
            // kept as in the original — confirm this is intended.
            try (ClickHouseResponse localTableResponse = this.clickhouseRequest.query(localTableSQL).executeAndWait()) {
                List<ClickHouseRecord> localTableRecords = localTableResponse.stream().collect(Collectors.toList());
                if (localTableRecords.isEmpty()) {
                    throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table from clickhouse, resultSet is empty");
                }
                ClickHouseRecord localTableRecord = localTableRecords.get(0);
                localTableEngine = localTableRecord.getValue(0).asString();
                localTableDDL = this.localizationEngine(localTableEngine, localTableRecord.getValue(1).asString());
            }
            return new DistributedEngine(clusterName, localDatabase, localTable, localTableEngine, localTableDDL);
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get distributed table from clickhouse", e);
        }
    }

    /**
     * Reads the schema of a table using the request created in the constructor.
     *
     * @param table table name passed to {@code desc}
     * @return column name to column type, in the order returned by ClickHouse
     * @throws ClickhouseConnectorException if the query fails
     */
    public Map<String, String> getClickhouseTableSchema(String table) {
        return this.getClickhouseTableSchema(this.getClickhouseConnection(), table);
    }

    /**
     * Reads the schema of a table by running {@code desc <table>} on the given request.
     *
     * @param request request to run the query on
     * @param table table name passed to {@code desc}
     * @return column name (first result column) to column type (second result column), in the
     *     order returned by ClickHouse
     * @throws ClickhouseConnectorException if the query fails
     */
    public Map<String, String> getClickhouseTableSchema(ClickHouseRequest<?> request, String table) {
        String sql = "desc " + table;
        // LinkedHashMap keeps the columns in the order they are returned.
        Map<String, String> schema = new LinkedHashMap<>();
        try (ClickHouseResponse response = request.query(sql).executeAndWait()) {
            response.records().forEach(r -> schema.put(r.getValue(0).asString(), r.getValue(1).asString()));
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, "Cannot get table schema from clickhouse", e);
        }
        return schema;
    }

    /**
     * Lists the shards of a cluster from {@code system.clusters}, considering only rows with
     * {@code replica_num = 1}.
     *
     * <p>The {@code port} column returned by the query is not used; every {@link Shard} is built
     * with the {@code port} argument instead.
     *
     * @param connection request to run the query on
     * @param clusterName name of the cluster to list
     * @param database database passed to every created shard
     * @param port port passed to every created shard
     * @param username user name passed to every created shard
     * @param password password passed to every created shard
     * @return one shard per returned row
     * @throws ClickhouseConnectorException if the query fails
     */
    public List<Shard> getClusterShardList(ClickHouseRequest<?> connection, String clusterName, String database, int port, String username, String password) {
        String sql = "select shard_num,shard_weight,replica_num,host_name,host_address,port from system.clusters where cluster = '" + clusterName + "' and replica_num=1";
        try (ClickHouseResponse response = connection.query(sql).executeAndWait()) {
            return response.stream()
                    .map(r -> new Shard(
                            r.getValue(0).asInteger(),
                            r.getValue(1).asInteger(),
                            r.getValue(2).asInteger(),
                            r.getValue(3).asString(),
                            r.getValue(4).asString(),
                            port,
                            database,
                            username,
                            password))
                    .collect(Collectors.toList());
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.CLUSTER_LIST_GET_FAILED, "Cannot get cluster shard list from clickhouse", e);
        }
    }

    /**
     * Loads the metadata of a table from {@code system.tables}.
     *
     * <p>For a table whose engine is {@code Distributed}, the Distributed engine details are
     * resolved as well and the returned DDL is the one of the underlying local table.
     *
     * @param database database of the table
     * @param table name of the table
     * @return the table description including its schema
     * @throws ClickhouseConnectorException if the table is not found or a query fails
     */
    public ClickhouseTable getClickhouseTable(String database, String table) {
        String sql = String.format("select engine,create_table_query,engine_full,data_paths,sorting_key from system.tables where database = '%s' and name = '%s'", database, table);
        try (ClickHouseResponse response = this.clickhouseRequest.query(sql).executeAndWait()) {
            List<ClickHouseRecord> records = response.stream().collect(Collectors.toList());
            if (records.isEmpty()) {
                throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get table from clickhouse, resultSet is empty");
            }
            ClickHouseRecord record = records.get(0);
            String engine = record.getValue(0).asString();
            String createTableDDL = record.getValue(1).asString();
            String engineFull = record.getValue(2).asString();
            List<String> dataPaths = record.getValue(3).asTuple().stream()
                    .map(Object::toString)
                    .collect(Collectors.toList());
            String sortingKey = record.getValue(4).asString();

            DistributedEngine distributedEngine = null;
            if ("Distributed".equals(engine)) {
                distributedEngine = this.getClickhouseDistributedTable(this.clickhouseRequest, database, table);
                createTableDDL = distributedEngine.getTableDDL();
            }
            // NOTE(review): the schema is read with an unqualified table name (no database
            // prefix) — confirm the request's default database matches `database`.
            Map<String, String> schema = this.getClickhouseTableSchema(this.clickhouseRequest, table);
            return new ClickhouseTable(database, table, distributedEngine, engine, createTableDDL, engineFull, dataPaths, sortingKey, schema);
        } catch (ClickHouseException e) {
            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot get clickhouse table", e);
        }
    }

    /**
     * Rewrites a table DDL so that a {@code ReplicatedMergeTree(...)} engine clause becomes
     * {@code MergeTree()}.
     *
     * @param engine engine name of the table; compared case-insensitively with
     *     {@code ReplicatedMergeTree}
     * @param ddl the create-table statement
     * @return the rewritten DDL, or {@code ddl} unchanged for any other engine
     */
    public String localizationEngine(String engine, String ddl) {
        if ("ReplicatedMergeTree".equalsIgnoreCase(engine)) {
            return ddl.replaceAll("ReplicatedMergeTree(\\([^\\)]*\\))", "MergeTree()");
        }
        return ddl;
    }

    /**
     * Closes the client created in the constructor and every cached per-shard client.
     *
     * <p>The shard clients are closed even if closing the primary client throws.
     */
    public void close() {
        try {
            if (this.client != null) {
                this.client.close();
            }
        } finally {
            this.shardToDataSource.values().forEach(ClickHouseClient::close);
        }
    }
}

