package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;

import com.clickhouse.client.ClickHouseValues;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.CKFileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSinkState;
import org.apache.sshd.common.util.OsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.class */
/**
 * Sink writer that buffers rows into one temporary delimited text file per shard, converts each
 * file into ClickHouse part directories by invoking the configured clickhouse-local command, and
 * copies the resulting directories into the {@code detached/} folder of one of the shard's data
 * paths.
 *
 * <p>Per checkpoint the flow is: {@link #write(SeaTunnelRow)} appends rows to the shard's temp
 * file; {@link #prepareCommit()} closes the files, runs the conversion, transfers the generated
 * directories, removes the local working directory and returns the transferred paths per shard.
 */
public class ClickhouseFileSinkWriter implements SinkWriter<SeaTunnelRow, CKFileCommitInfo, ClickhouseSinkState> {
    private static final Logger log = LoggerFactory.getLogger(ClickhouseFileSinkWriter.class);
    private static final String CK_LOCAL_CONFIG_TEMPLATE = "<yandex><path> %s </path> <users><default><password/> <profile>default</profile> <quota>default</quota><access_management>1</access_management></default></users><profiles><default/></profiles><quotas><default/></quotas></yandex>";
    private static final String CLICKHOUSE_SETTINGS_KEY = "SETTINGS";
    /** Comma-separated setting names that are stripped from the table DDL before it is replayed locally. */
    private static final String CLICKHOUSE_DDL_SETTING_FILTER = "storage_policy";
    private static final String CLICKHOUSE_LOCAL_FILE_SUFFIX = "/local_data.log";
    /** Number of leading UUID characters used as the name of a per-shard working directory. */
    private static final int UUID_LENGTH = 10;
    private final FileReaderOption readerOption;
    private final ShardRouter shardRouter;
    private final ClickhouseProxy proxy;
    private final ClickhouseTable clickhouseTable;
    /** Data paths of the local table on each shard, looked up once in the constructor. */
    private final Map<Shard, List<String>> shardLocalDataPaths;
    private final SinkWriter.Context context;
    /** Open channel of the temp data file of each shard that received at least one row. */
    private final Map<Shard, FileChannel> rowCache = new HashMap<>(16);
    /** Path of the temp data file of each shard that received at least one row. */
    private final Map<Shard, String> shardTempFile = new HashMap<>();

    /**
     * Creates the writer, resolves the target table and the data paths of every shard.
     *
     * @param fileReaderOption sink configuration (shard metadata, temp path, credentials, ...)
     * @param context writer context; its subtask index is appended to generated directory names
     * @throws ClickhouseConnectorException if passwords are required and one is missing for a shard
     */
    public ClickhouseFileSinkWriter(FileReaderOption fileReaderOption, SinkWriter.Context context) {
        this.readerOption = fileReaderOption;
        this.context = context;
        this.proxy = new ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
        this.shardRouter = new ShardRouter(this.proxy, this.readerOption.getShardMetadata());
        this.clickhouseTable = this.proxy.getClickhouseTable(
                this.readerOption.getShardMetadata().getDatabase(),
                this.readerOption.getShardMetadata().getTable());
        nodePasswordCheck();
        this.shardLocalDataPaths = this.shardRouter.getShards().values().stream()
                .collect(Collectors.toMap(
                        Function.identity(),
                        shard -> this.proxy
                                .getClickhouseTable(
                                        shard.getNode().getDatabase().get(),
                                        this.clickhouseTable.getLocalTableName())
                                .getDataPaths()));
    }

    /**
     * Routes the row to its shard and appends it to that shard's temp data file, creating the
     * working directory and file on first use.
     *
     * @param seaTunnelRow row to buffer
     * @throws IOException if the row cannot be written to the temp file
     * @throws ClickhouseConnectorException if the temp file cannot be created
     */
    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        Shard shard = this.shardRouter.getShard(seaTunnelRow);
        FileChannel channel = this.rowCache.computeIfAbsent(shard, key -> {
            try {
                String dirName = UUID.randomUUID().toString().substring(0, UUID_LENGTH).replaceAll("-", "_");
                String workDir = String.format("%s/%s", this.readerOption.getFileTempPath(), dirName);
                FileUtils.forceMkdir(new File(workDir));
                String tempFilePath = workDir + CLICKHOUSE_LOCAL_FILE_SUFFIX;
                this.shardTempFile.put(shard, tempFilePath);
                return FileChannel.open(
                        Paths.get(tempFilePath),
                        StandardOpenOption.WRITE,
                        StandardOpenOption.READ,
                        StandardOpenOption.CREATE_NEW);
            } catch (IOException e) {
                throw new ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "can't create new file to save tmp data", e);
            }
        });
        saveDataToFile(channel, seaTunnelRow);
    }

    /**
     * Unless password-free access is configured, verifies that a password is configured for every
     * shard node, keyed either by resolved host name or by the node's host string.
     */
    private void nodePasswordCheck() {
        if (this.readerOption.isNodeFreePass()) {
            return;
        }
        this.shardRouter.getShards().values().forEach(shard -> {
            String hostName = shard.getNode().getAddress().getHostName();
            boolean knownByHostName = this.readerOption.getNodePassword().containsKey(hostName);
            boolean knownByHost = this.readerOption.getNodePassword().containsKey(shard.getNode().getHost());
            if (!knownByHostName && !knownByHost) {
                throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.PASSWORD_NOT_FOUND_IN_SHARD_NODE, "Cannot find password of shard " + shard.getNode().getAddress().getHostName());
            }
        });
    }

    /**
     * Closes all temp files, converts each one with clickhouse-local, transfers the generated
     * directories to the owning shard and deletes the local working directory.
     *
     * @return commit info holding, per shard, the local paths of the directories that were transferred
     * @throws IOException if a temp file channel cannot be closed
     * @throws ClickhouseConnectorException if generation, transfer or cleanup fails for a shard
     */
    public Optional<CKFileCommitInfo> prepareCommit() throws IOException {
        for (FileChannel channel : this.rowCache.values()) {
            channel.close();
        }
        Map<Shard, List<String>> detachedFiles = new HashMap<>();
        this.shardTempFile.forEach((shard, tempFilePath) -> {
            List<String> localFiles = null;
            try {
                localFiles = generateClickhouseLocalFiles(tempFilePath);
                moveClickhouseLocalFileToServer(shard, localFiles);
                detachedFiles.put(shard, localFiles);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the owning thread can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
            } catch (Exception e) {
                throw new ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data into clickhouse file error", e);
            } finally {
                // The working directory is derived from the first generated path, so cleanup is
                // only possible when at least one directory was produced.
                if (localFiles != null && !localFiles.isEmpty()) {
                    clearLocalFileDirectory(localFiles);
                }
            }
        });
        this.rowCache.clear();
        this.shardTempFile.clear();
        return Optional.of(new CKFileCommitInfo(detachedFiles));
    }

    /** No-op: this writer keeps no state that needs to be rolled back here. */
    public void abortPrepare() {
    }

    /**
     * Closes every temp file channel that is still open.
     *
     * @throws IOException if a channel cannot be closed
     */
    public void close() throws IOException {
        for (FileChannel channel : this.rowCache.values()) {
            channel.close();
        }
    }

    /**
     * Appends one row to the temp file as a single delimiter-joined line terminated by a line feed.
     *
     * @param fileChannel channel of the shard's temp file
     * @param seaTunnelRow row whose configured fields are written, in configured field order
     * @throws IOException if the file region cannot be mapped
     */
    private void saveDataToFile(FileChannel fileChannel, SeaTunnelRow seaTunnelRow) throws IOException {
        String line = this.readerOption.getFields().stream()
                .map(field -> {
                    Object value = seaTunnelRow.getField(this.readerOption.getSeaTunnelRowType().indexOf(field));
                    // A null field is written as an empty value instead of failing with a NullPointerException.
                    return value == null ? "" : value.toString();
                })
                .collect(Collectors.joining(this.readerOption.getFileFieldsDelimiter()))
                + StringUtils.LF;
        byte[] bytes = line.getBytes(StandardCharsets.UTF_8);
        // Map a region starting at the current end of file so the row is appended.
        fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(), bytes.length).put(bytes);
    }

    /**
     * Runs the clickhouse-local command over a temp data file and returns the generated directories.
     *
     * @param tempFilePath path of the temp data file, of the form {@code <tempPath>/<dir>/local_data.log}
     * @return absolute paths of the generated directories (excluding {@code detached}), each renamed
     *     with a {@code _<subtaskIndex>} suffix when the rename succeeds
     * @throws IOException if the process cannot be started or its output cannot be read
     * @throws InterruptedException if interrupted while waiting for the process to finish
     * @throws ClickhouseConnectorException if the config file cannot be written or the expected
     *     output directory does not exist
     */
    private List<String> generateClickhouseLocalFiles(String tempFilePath) throws IOException, InterruptedException {
        String[] segments = tempFilePath.split("/");
        // The parent directory name doubles as a unique suffix for the temporary table name.
        String dirName = segments[segments.length - 2];
        String workDir = tempFilePath.substring(0, tempFilePath.length() - CLICKHOUSE_LOCAL_FILE_SUFFIX.length());

        List<String> command = buildClickhouseLocalCommand(tempFilePath, dirName, workDir);
        String commandLine = String.join(StringUtils.SPACE, command);
        log.info("Generate clickhouse local file command: {}", commandLine);

        // NOTE(review): the arguments carry embedded quotes and are interpreted by bash, so all
        // values placed in the command line are expected to come from trusted configuration.
        Process process = new ProcessBuilder("bash", "-c", commandLine).start();
        // NOTE(review): stdout is drained completely before stderr; a process that writes a large
        // amount to stderr first could stall on a full pipe buffer.
        logProcessOutput(process.getInputStream(), false);
        logProcessOutput(process.getErrorStream(), true);
        int exitCode = process.waitFor();
        if (exitCode != 0) {
            log.error("clickhouse local command exited with code {}", exitCode);
        }

        File tableDir = new File(workDir + "/data/_local/" + this.clickhouseTable.getLocalTableName());
        if (!tableDir.exists()) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FILE_NOT_EXISTS, "clickhouse local file not exists");
        }
        File[] children = tableDir.listFiles();
        if (children == null) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.FILE_NOT_EXISTS, "clickhouse local file not exists");
        }
        return Arrays.stream(children)
                .filter(File::isDirectory)
                .filter(child -> !"detached".equals(child.getName()))
                .map(this::renameWithSubtaskIndex)
                .map(File::getAbsolutePath)
                .collect(Collectors.toList());
    }

    /** Assembles the clickhouse-local argument list (including the config file in compatible mode). */
    private List<String> buildClickhouseLocalCommand(String tempFilePath, String dirName, String workDir) {
        List<String> executable = Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(StringUtils.SPACE))
                .collect(Collectors.toList());
        List<String> command = new ArrayList<>(executable);
        if (executable.size() == 1) {
            // A single token is taken to be the bare binary, so the "local" sub-command is appended.
            command.add("local");
        }
        command.add("--file");
        command.add(tempFilePath);
        command.add("--format_csv_delimiter");
        command.add("\"" + this.readerOption.getFileFieldsDelimiter() + "\"");
        command.add("-S");
        String structure = this.readerOption.getFields().stream()
                .map(field -> field + StringUtils.SPACE + this.readerOption.getTableSchema().get(field))
                .collect(Collectors.joining(","));
        command.add("\"" + structure + "\"");
        command.add("-N");
        command.add("\"temp_table" + dirName + "\"");
        command.add("-q");
        // Columns of the target table that are not among the written fields are selected as NULL.
        String selectColumns = this.readerOption.getTableSchema().keySet().stream()
                .map(column -> this.readerOption.getFields().contains(column) ? column : ClickHouseValues.NULL_EXPR)
                .collect(Collectors.joining(","));
        command.add(String.format(
                "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
                adjustClickhouseDDL(),
                this.clickhouseTable.getLocalTableName(),
                selectColumns,
                dirName));
        if (this.readerOption.isCompatibleMode()) {
            String configPath = String.format("%s/%s/config.xml", this.readerOption.getFileTempPath(), dirName);
            try (FileWriter writer = new FileWriter(configPath)) {
                writer.write(String.format(CK_LOCAL_CONFIG_TEMPLATE, workDir));
            } catch (IOException e) {
                throw new ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Error occurs when create ck local config", e);
            }
            command.add("--config-file");
            command.add("\"" + configPath + "\"");
        } else {
            command.add("--path");
            command.add("\"" + workDir + "\"");
        }
        return command;
    }

    /** Logs every line of a process stream (error level when {@code asError}) and closes the stream. */
    private static void logProcessOutput(InputStream stream, boolean asError) throws IOException {
        // Decoded with the platform default charset, as process output normally is.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (asError) {
                    log.error(line);
                } else {
                    log.info(line);
                }
            }
        }
    }

    /** Renames a generated directory to {@code <name>_<subtaskIndex>}; returns the original on failure. */
    private File renameWithSubtaskIndex(File directory) {
        File renamed = new File(directory.getParent() + "/" + directory.getName() + "_" + this.context.getIndexOfSubtask());
        if (directory.renameTo(renamed)) {
            return renamed;
        }
        log.warn("rename file failed, will continue move file, but maybe cause file conflict");
        return directory;
    }

    /**
     * Copies the generated directories into the {@code detached/} folder of a randomly chosen data
     * path of the shard, always closing the transfer afterwards.
     *
     * @param shard target shard
     * @param list local paths of the directories to transfer
     */
    private void moveClickhouseLocalFileToServer(Shard shard, List<String> list) {
        String host = shard.getNode().getHost();
        List<String> dataPaths = this.shardLocalDataPaths.get(shard);
        String targetDir = dataPaths.get(ThreadLocalRandom.current().nextInt(dataPaths.size())) + "detached/";
        FileTransfer fileTransfer = FileTransferFactory.createFileTransfer(
                this.readerOption.getCopyMethod(),
                host,
                this.readerOption.getNodeUser().getOrDefault(host, OsUtils.ROOT_USER),
                this.readerOption.getNodePassword().getOrDefault(host, null));
        fileTransfer.init();
        try {
            fileTransfer.transferAndChown(list, targetDir);
        } finally {
            // Release the transfer even when copying fails.
            fileTransfer.close();
        }
    }

    /**
     * Deletes the per-shard working directory that contains the given generated paths.
     *
     * @param list generated paths; the first one is used to derive {@code <tempPath>/<dir>}
     * @throws ClickhouseConnectorException if the directory cannot be deleted
     */
    private void clearLocalFileDirectory(List<String> list) {
        // Working directory = temp path + "/" + directory name of UUID_LENGTH characters.
        String workDir = list.get(0).substring(0, this.readerOption.getFileTempPath().length() + UUID_LENGTH + 1);
        try {
            File directory = new File(workDir);
            if (directory.exists()) {
                FileUtils.deleteDirectory(directory);
            }
        } catch (IOException e) {
            throw new ClickhouseConnectorException(ClickhouseConnectorErrorCode.DELETE_DIRECTORY_FIELD, "Unable to delete directory " + workDir, e);
        }
    }

    /**
     * Returns the table's CREATE statement with the database prefix and backticks removed and the
     * filtered settings stripped from its {@code SETTINGS} clause.
     */
    private String adjustClickhouseDDL() {
        String ddl = this.clickhouseTable.getCreateTableDDL()
                .replace(this.clickhouseTable.getDatabase() + ".", "")
                .replaceAll("`", "");
        if (!ddl.contains(CLICKHOUSE_SETTINGS_KEY)) {
            return ddl;
        }
        List<String> filteredNames = Arrays.stream(CLICKHOUSE_DDL_SETTING_FILTER.split(","))
                .collect(Collectors.toList());
        int settingsIndex = ddl.indexOf(CLICKHOUSE_SETTINGS_KEY);
        String keptSettings = Arrays.stream(ddl.substring(settingsIndex + CLICKHOUSE_SETTINGS_KEY.length()).split(","))
                .filter(setting -> !filteredNames.contains(setting.split("=")[0].trim()))
                .collect(Collectors.joining(","));
        String head = ddl.substring(0, settingsIndex);
        if (keptSettings.trim().isEmpty()) {
            // Every setting was filtered out: drop the keyword too, a bare SETTINGS is not valid DDL.
            return head;
        }
        return head + CLICKHOUSE_SETTINGS_KEY + keptSettings;
    }
}
