package org.apache.seatunnel.connectors.seatunnel.starrocks.client;

import com.starrocks.shade.org.apache.thrift.protocol.TMultiplexedProtocol;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.protocol.HTTP;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.class */
public class StarRocksStreamLoadVisitor {
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class);
    private final HttpHelper httpHelper = new HttpHelper();
    private static final int MAX_SLEEP_TIME = 5;
    private final SinkConfig sinkConfig;
    private long pos;
    private static final String RESULT_FAILED = "Fail";
    private static final String RESULT_SUCCESS = "Success";
    private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
    private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
    private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
    private static final String RESULT_LABEL_PREPARE = "PREPARE";
    private static final String RESULT_LABEL_ABORTED = "ABORTED";
    private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
    private List<String> fieldNames;

    public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List<String> list) {
        this.sinkConfig = sinkConfig;
        this.fieldNames = list;
    }

    public Boolean doStreamLoad(StarRocksFlushTuple starRocksFlushTuple) throws IOException {
        String availableHost = getAvailableHost();
        if (null == availableHost) {
            throw new StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "None of the host in `load_url` could be connected.");
        }
        String str = availableHost + "/api/" + this.sinkConfig.getDatabase() + "/" + this.sinkConfig.getTable() + "/_stream_load";
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", Integer.valueOf(starRocksFlushTuple.getRows().size()), starRocksFlushTuple.getBytes(), starRocksFlushTuple.getLabel()));
        }
        Map<String, Object> doHttpPut = this.httpHelper.doHttpPut(str, joinRows(starRocksFlushTuple.getRows(), starRocksFlushTuple.getBytes().intValue()), getStreamLoadHttpHeader(starRocksFlushTuple.getLabel()));
        if (null == doHttpPut || !doHttpPut.containsKey("Status")) {
            LOG.error("unknown result status. {}", doHttpPut);
            throw new StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, "Unable to flush data to StarRocks: unknown result status. " + doHttpPut);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("StreamLoad response:\n" + JsonUtils.toJsonString(doHttpPut));
        }
        if (!RESULT_FAILED.equals(doHttpPut.get("Status"))) {
            if (RESULT_LABEL_EXISTED.equals(doHttpPut.get("Status"))) {
                LOG.debug("StreamLoad response:\n" + JsonUtils.toJsonString(doHttpPut));
                checkLabelState(availableHost, starRocksFlushTuple.getLabel());
            }
            return Boolean.valueOf(RESULT_SUCCESS.equals(doHttpPut.get("Status")));
        }
        StringBuilder sb = new StringBuilder("Failed to flush data to StarRocks \n");
        sb.append(this.sinkConfig.getDatabase()).append("/").append(this.sinkConfig.getTable()).append(StringUtils.LF);
        if (doHttpPut.containsKey("Message")) {
            sb.append(doHttpPut.get("Message"));
            sb.append('\n');
        }
        if (doHttpPut.containsKey("ErrorURL")) {
            LOG.error("StreamLoad response: {}", doHttpPut);
            try {
                sb.append(this.httpHelper.doHttpGet(doHttpPut.get("ErrorURL").toString()));
                sb.append('\n');
            } catch (IOException e) {
                LOG.warn("Get Error URL failed. {} ", doHttpPut.get("ErrorURL"), e);
            }
        } else {
            sb.append(JsonUtils.toJsonString(doHttpPut));
            sb.append('\n');
        }
        throw new StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, sb.toString());
    }

    private String getAvailableHost() {
        List<String> nodeUrls = this.sinkConfig.getNodeUrls();
        long size = this.pos + nodeUrls.size();
        while (this.pos < size) {
            String str = "http://" + nodeUrls.get((int) (this.pos % nodeUrls.size()));
            if (this.httpHelper.tryHttpConnection(str)) {
                return str;
            }
            this.pos++;
        }
        return null;
    }

    private byte[] joinRows(List<byte[]> list, int i) {
        if (SinkConfig.StreamLoadFormat.CSV.equals(this.sinkConfig.getLoadFormat())) {
            byte[] bytes = StarRocksDelimiterParser.parse((String) this.sinkConfig.getStreamLoadProps().get("row_delimiter"), StringUtils.LF).getBytes(StandardCharsets.UTF_8);
            ByteBuffer allocate = ByteBuffer.allocate(i + (list.size() * bytes.length));
            Iterator<byte[]> it = list.iterator();
            while (it.hasNext()) {
                allocate.put(it.next());
                allocate.put(bytes);
            }
            return allocate.array();
        }
        if (!SinkConfig.StreamLoadFormat.JSON.equals(this.sinkConfig.getLoadFormat())) {
            throw new StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, "Failed to join rows data, unsupported `format` from stream load properties:");
        }
        ByteBuffer allocate2 = ByteBuffer.allocate(i + (list.isEmpty() ? 2 : list.size() + 1));
        allocate2.put("[".getBytes(StandardCharsets.UTF_8));
        byte[] bytes2 = ",".getBytes(StandardCharsets.UTF_8);
        boolean z = true;
        for (byte[] bArr : list) {
            if (!z) {
                allocate2.put(bytes2);
            }
            allocate2.put(bArr);
            z = false;
        }
        allocate2.put("]".getBytes(StandardCharsets.UTF_8));
        return allocate2.array();
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:10:0x00c7. Please report as an issue. */
    private void checkLabelState(String str, String str2) throws IOException {
        int i = 0;
        while (true) {
            try {
                i++;
                TimeUnit.SECONDS.sleep(Math.min(i, 5));
                try {
                    Map<String, Object> doHttpGet = this.httpHelper.doHttpGet(str + "/api/" + this.sinkConfig.getDatabase() + "/get_load_state?label=" + str2, getLoadStateHttpHeader(str2));
                    if (doHttpGet == null) {
                        throw new StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, String.format("Failed to flush data to StarRocks, Error could not get the final state of label[%s].\n", str2), (Throwable) null);
                    }
                    String str3 = (String) doHttpGet.get("state");
                    if (null == str3) {
                        throw new StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, String.format("Failed to flush data to StarRocks, Error could not get the final state of label[%s]. response[%s]\n", str2, JsonUtils.toJsonString(doHttpGet)), (Throwable) null);
                    }
                    LOG.info(String.format("Checking label[%s] state[%s]\n", str2, str3));
                    boolean z = -1;
                    switch (str3.hashCode()) {
                        case -476794961:
                            if (str3.equals(RESULT_LABEL_ABORTED)) {
                                z = 3;
                                break;
                            }
                            break;
                        case 399612135:
                            if (str3.equals(RESULT_LABEL_PREPARE)) {
                                z = 2;
                                break;
                            }
                            break;
                        case 433141802:
                            if (str3.equals(RESULT_LABEL_UNKNOWN)) {
                                z = 4;
                                break;
                            }
                            break;
                        case 1184726098:
                            if (str3.equals(LAEBL_STATE_VISIBLE)) {
                                z = false;
                                break;
                            }
                            break;
                        case 1295451996:
                            if (str3.equals(LAEBL_STATE_COMMITTED)) {
                                z = true;
                                break;
                            }
                            break;
                    }
                    switch (z) {
                        case false:
                        case true:
                            return;
                        case true:
                        case true:
                            throw new StarRocksConnectorException((SeaTunnelErrorCode) StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, String.format("Failed to flush data to StarRocks, Error label[%s] state[%s]\n", str2, str3), true);
                        case true:
                        default:
                            throw new StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, String.format("Failed to flush data to StarRocks, Error label[%s] state[%s]\n", str2, str3));
                    }
                } catch (IOException e) {
                    throw new StarRocksConnectorException(StarRocksConnectorErrorCode.FLUSH_DATA_FAILED, e);
                }
            } catch (InterruptedException e2) {
                return;
            }
        }
    }

    private String getBasicAuthHeader(String str, String str2) {
        return "Basic " + new String(Base64.encodeBase64((str + TMultiplexedProtocol.SEPARATOR + str2).getBytes(StandardCharsets.UTF_8)));
    }

    private Map<String, String> getStreamLoadHttpHeader(String str) {
        HashMap hashMap = new HashMap();
        if (null != this.fieldNames && !this.fieldNames.isEmpty() && SinkConfig.StreamLoadFormat.CSV.equals(this.sinkConfig.getLoadFormat())) {
            hashMap.put("columns", String.join(",", (Iterable<? extends CharSequence>) this.fieldNames.stream().map(str2 -> {
                return String.format("`%s`", str2);
            }).collect(Collectors.toList())));
        }
        if (null != this.sinkConfig.getStreamLoadProps()) {
            for (Map.Entry<String, Object> entry : this.sinkConfig.getStreamLoadProps().entrySet()) {
                hashMap.put(entry.getKey(), String.valueOf(entry.getValue()));
            }
        }
        hashMap.put("strip_outer_array", "true");
        hashMap.put("Expect", HTTP.EXPECT_CONTINUE);
        hashMap.put("label", str);
        hashMap.put("Content-Type", URLEncodedUtils.CONTENT_TYPE);
        hashMap.put("format", this.sinkConfig.getLoadFormat().name().toUpperCase());
        hashMap.put("Authorization", getBasicAuthHeader(this.sinkConfig.getUsername(), this.sinkConfig.getPassword()));
        return hashMap;
    }

    private Map<String, String> getLoadStateHttpHeader(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("Authorization", getBasicAuthHeader(this.sinkConfig.getUsername(), this.sinkConfig.getPassword()));
        hashMap.put("Connection", "close");
        return hashMap;
    }
}
