package com.starrocks.connector.flink.manager;

import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionOptions;
import com.starrocks.connector.flink.connection.StarRocksJdbcConnectionProvider;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.starrocks.connector.flink.table.sink.StarRocksSinkSemantic;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.constraints.UniqueConstraint;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/manager/StarRocksSinkManager.class */
public class StarRocksSinkManager implements Serializable {
    private static final long serialVersionUID = 1;
    private final StarRocksJdbcConnectionProvider jdbcConnProvider;
    private final StarRocksQueryVisitor starrocksQueryVisitor;
    private StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
    private final StarRocksSinkOptions sinkOptions;
    private transient Counter totalFlushBytes;
    private transient Counter totalFlushRows;
    private transient Counter totalFlushTime;
    private transient Counter totalFlushTimeWithoutRetries;
    private transient Counter totalFlushSucceededTimes;
    private transient Counter totalFlushFailedTimes;
    private transient Histogram flushTimeNs;
    private transient Histogram offerTimeNs;
    private transient Counter totalFilteredRows;
    private transient Histogram commitAndPublishTimeMs;
    private transient Histogram streamLoadPutTimeMs;
    private transient Histogram readDataTimeMs;
    private transient Histogram writeDataTimeMs;
    private transient Histogram loadTimeMs;
    private static final String COUNTER_TOTAL_FLUSH_BYTES = "totalFlushBytes";
    private static final String COUNTER_TOTAL_FLUSH_ROWS = "totalFlushRows";
    private static final String COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES = "totalFlushTimeNsWithoutRetries";
    private static final String COUNTER_TOTAL_FLUSH_COST_TIME = "totalFlushTimeNs";
    private static final String COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES = "totalFlushSucceededTimes";
    private static final String COUNTER_TOTAL_FLUSH_FAILED_TIMES = "totalFlushFailedTimes";
    private static final String HISTOGRAM_FLUSH_TIME = "flushTimeNs";
    private static final String HISTOGRAM_OFFER_TIME_NS = "offerTimeNs";
    private static final String COUNTER_NUMBER_FILTERED_ROWS = "totalFilteredRows";
    private static final String HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS = "commitAndPublishTimeMs";
    private static final String HISTOGRAM_STREAM_LOAD_PUT_TIME_MS = "streamLoadPutTimeMs";
    private static final String HISTOGRAM_READ_DATA_TIME_MS = "readDataTimeMs";
    private static final String HISTOGRAM_WRITE_DATA_TIME_MS = "writeDataTimeMs";
    private static final String HISTOGRAM_LOAD_TIME_MS = "loadTimeMs";
    private volatile Throwable flushException;
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> scheduledFuture;
    private transient long startTimeNano;
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksSinkManager.class);
    private static final Map<String, List<LogicalTypeRoot>> typesMap = new HashMap();
    final LinkedBlockingDeque<StarRocksSinkBufferEntity> flushQueue = new LinkedBlockingDeque<>(1);
    private final Map<String, StarRocksSinkBufferEntity> bufferMap = new ConcurrentHashMap();
    private long FLUSH_QUEUE_POLL_TIMEOUT = 3000;
    private volatile boolean closed = false;
    private volatile boolean flushThreadAlive = false;

    public StarRocksSinkManager(StarRocksSinkOptions starRocksSinkOptions, TableSchema tableSchema) {
        this.sinkOptions = starRocksSinkOptions;
        this.jdbcConnProvider = new StarRocksJdbcConnectionProvider(new StarRocksJdbcConnectionOptions(starRocksSinkOptions.getJdbcUrl(), starRocksSinkOptions.getUsername(), starRocksSinkOptions.getPassword()));
        this.starrocksQueryVisitor = new StarRocksQueryVisitor(this.jdbcConnProvider, starRocksSinkOptions.getDatabaseName(), starRocksSinkOptions.getTableName());
        init(tableSchema);
    }

    public StarRocksSinkManager(StarRocksSinkOptions starRocksSinkOptions, TableSchema tableSchema, StarRocksJdbcConnectionProvider starRocksJdbcConnectionProvider, StarRocksQueryVisitor starRocksQueryVisitor) {
        this.sinkOptions = starRocksSinkOptions;
        this.jdbcConnProvider = starRocksJdbcConnectionProvider;
        this.starrocksQueryVisitor = starRocksQueryVisitor;
        init(tableSchema);
    }

    protected void init(TableSchema tableSchema) {
        validateTableStructure(tableSchema);
        String starRocksVersion = this.starrocksQueryVisitor.getStarRocksVersion();
        this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(this.sinkOptions, null == tableSchema ? new String[0] : tableSchema.getFieldNames(), starRocksVersion.length() > 0 && !starRocksVersion.trim().startsWith("1."));
    }

    public void setRuntimeContext(RuntimeContext runtimeContext) {
        this.startTimeNano = System.nanoTime();
        this.totalFlushBytes = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_BYTES);
        this.totalFlushRows = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_ROWS);
        this.totalFlushTime = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME);
        this.totalFlushTimeWithoutRetries = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_COST_TIME_WITHOUT_RETRIES);
        this.totalFlushSucceededTimes = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_SUCCEEDED_TIMES);
        this.totalFlushFailedTimes = runtimeContext.getMetricGroup().counter(COUNTER_TOTAL_FLUSH_FAILED_TIMES);
        this.flushTimeNs = runtimeContext.getMetricGroup().histogram(HISTOGRAM_FLUSH_TIME, new DescriptiveStatisticsHistogram(this.sinkOptions.getSinkHistogramWindowSize()));
        this.offerTimeNs = runtimeContext.getMetricGroup().histogram(HISTOGRAM_OFFER_TIME_NS, new DescriptiveStatisticsHistogram(this.sinkOptions.getSinkHistogramWindowSize()));
        this.totalFilteredRows = runtimeContext.getMetricGroup().counter(COUNTER_NUMBER_FILTERED_ROWS);
        this.commitAndPublishTimeMs = runtimeContext.getMetricGroup().histogram(HISTOGRAM_COMMIT_AND_PUBLISH_TIME_MS, new DescriptiveStatisticsHistogram(this.sinkOptions.getSinkHistogramWindowSize()));
        this.streamLoadPutTimeMs = runtimeContext.getMetricGroup().histogram(HISTOGRAM_STREAM_LOAD_PUT_TIME_MS, new DescriptiveStatisticsHistogram(this.sinkOptions.getSinkHistogramWindowSize()));
        this.readDataTimeMs = runtimeContext.getMetricGroup().histogram(HISTOGRAM_READ_DATA_TIME_MS, new DescriptiveStatisticsHistogram(this.sinkOptions.getSinkHistogramWindowSize()));
        this.writeDataTimeMs = runtimeContext.getMetricGroup().histogram(HISTOGRAM_WRITE_DATA_TIME_MS, new DescriptiveStatisticsHistogram(this.sinkOptions.getSinkHistogramWindowSize()));
        this.loadTimeMs = runtimeContext.getMetricGroup().histogram(HISTOGRAM_LOAD_TIME_MS, new DescriptiveStatisticsHistogram(this.sinkOptions.getSinkHistogramWindowSize()));
    }

    public void startAsyncFlushing() {
        Thread thread = new Thread(() -> {
            while (true) {
                try {
                } catch (Exception e) {
                    this.flushException = e;
                }
                if (!asyncFlush()) {
                    LOG.info("StarRocks flush thread is about to exit.");
                    this.flushThreadAlive = false;
                    return;
                }
                continue;
            }
        });
        thread.setUncaughtExceptionHandler((thread2, th) -> {
            LOG.error("StarRocks flush thread uncaught exception occurred: " + th.getMessage(), th);
            this.flushException = th;
            this.flushThreadAlive = false;
        });
        thread.setName("starrocks-flush");
        thread.setDaemon(true);
        thread.start();
        this.flushThreadAlive = true;
    }

    public void startScheduler() throws IOException {
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
            return;
        }
        stopScheduler();
        this.scheduler = Executors.newScheduledThreadPool(1, new ExecutorThreadFactory("starrocks-interval-sink"));
        this.scheduledFuture = this.scheduler.schedule(() -> {
            synchronized (this) {
                if (!this.closed) {
                    try {
                        LOG.info("StarRocks interval Sinking triggered.");
                        if (this.bufferMap.isEmpty()) {
                            startScheduler();
                        }
                        flush(null, false);
                    } catch (Exception e) {
                        this.flushException = e;
                    }
                }
            }
        }, this.sinkOptions.getSinkMaxFlushInterval(), TimeUnit.MILLISECONDS);
    }

    public void stopScheduler() {
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }

    public final synchronized void writeRecords(String str, String str2, String... strArr) throws IOException {
        checkFlushException();
        try {
            if (0 == strArr.length) {
                return;
            }
            String format = String.format("%s,%s", str, str2);
            StarRocksSinkBufferEntity computeIfAbsent = this.bufferMap.computeIfAbsent(format, str3 -> {
                return new StarRocksSinkBufferEntity(str, str2, this.sinkOptions.getLabelPrefix());
            });
            for (String str4 : strArr) {
                computeIfAbsent.addToBuffer(str4.getBytes(StandardCharsets.UTF_8));
            }
            if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
                return;
            }
            if (computeIfAbsent.getBatchCount() >= this.sinkOptions.getSinkMaxRows() || computeIfAbsent.getBatchSize() >= this.sinkOptions.getSinkMaxBytes()) {
                LOG.info(String.format("StarRocks buffer Sinking triggered: db: [%s] table: [%s] rows[%d] label[%s].", str, str2, Integer.valueOf(computeIfAbsent.getBatchCount()), computeIfAbsent.getLabel()));
                flush(format, false);
            }
        } catch (Exception e) {
            throw new IOException("Writing records to StarRocks failed.", e);
        }
    }

    public synchronized void flush(String str, boolean z) throws Exception {
        if (this.bufferMap.isEmpty()) {
            flushInternal(null, z);
        } else {
            if (null != str) {
                flushInternal(str, z);
                return;
            }
            Iterator<String> it = this.bufferMap.keySet().iterator();
            while (it.hasNext()) {
                flushInternal(it.next(), z);
            }
        }
    }

    private synchronized void flushInternal(String str, boolean z) throws Exception {
        checkFlushException();
        if (null == str || this.bufferMap.isEmpty() || !this.bufferMap.containsKey(str)) {
            if (z) {
                waitAsyncFlushingDone();
            }
        } else {
            offer(this.bufferMap.get(str));
            this.bufferMap.remove(str);
            if (z) {
                waitAsyncFlushingDone();
            }
        }
    }

    public synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            LOG.info("StarRocks Sink is about to close, loadMetrics: {}.", metricsToString());
            this.bufferMap.clear();
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            if (this.jdbcConnProvider != null) {
                this.jdbcConnProvider.close();
            }
            offerEOF();
        }
        checkFlushException();
    }

    public Map<String, StarRocksSinkBufferEntity> getBufferedBatchMap() {
        HashMap hashMap = new HashMap();
        hashMap.putAll(this.bufferMap);
        return hashMap;
    }

    public void setBufferedBatchMap(Map<String, StarRocksSinkBufferEntity> map) throws IOException {
        if (StarRocksSinkSemantic.EXACTLY_ONCE.equals(this.sinkOptions.getSemantic())) {
            this.bufferMap.clear();
            this.bufferMap.putAll(map);
        }
    }

    private boolean asyncFlush() throws Exception {
        StarRocksSinkBufferEntity poll = this.flushQueue.poll(this.FLUSH_QUEUE_POLL_TIMEOUT, TimeUnit.MILLISECONDS);
        if (poll == null) {
            return true;
        }
        if (0 == poll.getBatchCount() && !poll.EOF()) {
            return true;
        }
        if (poll.EOF()) {
            return false;
        }
        stopScheduler();
        LOG.info(String.format("Async stream load: db[%s] table[%s] rows[%d] bytes[%d] label[%s].", poll.getDatabase(), poll.getTable(), Integer.valueOf(poll.getBatchCount()), Long.valueOf(poll.getBatchSize()), poll.getLabel()));
        long nanoTime = System.nanoTime();
        for (int i = 0; i <= this.sinkOptions.getSinkMaxRetries(); i++) {
            try {
                long nanoTime2 = System.nanoTime();
                Map<String, Object> doStreamLoad = this.starrocksStreamLoadVisitor.doStreamLoad(poll);
                LOG.info(String.format("Async stream load finished: label[%s].", poll.getLabel()));
                if (null != this.totalFlushBytes) {
                    this.totalFlushBytes.inc(poll.getBatchSize());
                    this.totalFlushRows.inc(poll.getBatchCount());
                    this.totalFlushTime.inc(System.nanoTime() - nanoTime);
                    this.totalFlushTimeWithoutRetries.inc(System.nanoTime() - nanoTime2);
                    this.totalFlushSucceededTimes.inc();
                    this.flushTimeNs.update(System.nanoTime() - nanoTime2);
                    updateMetricsFromStreamLoadResult(doStreamLoad);
                }
                startScheduler();
                return true;
            } catch (Exception e) {
                if (this.totalFlushFailedTimes != null) {
                    this.totalFlushFailedTimes.inc();
                }
                LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", Integer.valueOf(i), e);
                if (i >= this.sinkOptions.getSinkMaxRetries()) {
                    throw e;
                }
                if ((e instanceof StarRocksStreamLoadFailedException) && ((StarRocksStreamLoadFailedException) e).needReCreateLabel()) {
                    String label = poll.getLabel();
                    poll.reGenerateLabel();
                    LOG.warn(String.format("Batch label changed from [%s] to [%s]", label, poll.getLabel()));
                }
                try {
                    Thread.sleep(1000 * Math.min(i + 1, 10));
                } catch (InterruptedException e2) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Unable to flush, interrupted while doing another attempt", e);
                }
            }
        }
        return true;
    }

    private void waitAsyncFlushingDone() throws InterruptedException {
        offer(new StarRocksSinkBufferEntity(null, null, null));
        offer(new StarRocksSinkBufferEntity(null, null, null));
        checkFlushException();
    }

    void offer(StarRocksSinkBufferEntity starRocksSinkBufferEntity) throws InterruptedException {
        if (!this.flushThreadAlive) {
            LOG.info(String.format("Flush thread already exit, ignore offer request for label[%s]", starRocksSinkBufferEntity.getLabel()));
            return;
        }
        long nanoTime = System.nanoTime();
        if (!this.flushQueue.offer(starRocksSinkBufferEntity, this.sinkOptions.getSinkOfferTimeout(), TimeUnit.MILLISECONDS)) {
            throw new RuntimeException("Timeout while offering data to flushQueue, exceed " + this.sinkOptions.getSinkOfferTimeout() + " ms, see " + StarRocksSinkOptions.SINK_BATCH_OFFER_TIMEOUT.key());
        }
        long nanoTime2 = System.nanoTime() - nanoTime;
        LOG.debug("Offer wait time {} nanos", Long.valueOf(nanoTime2));
        if (this.offerTimeNs != null) {
            this.offerTimeNs.update(nanoTime2);
        }
    }

    private void offerEOF() {
        try {
            offer(new StarRocksSinkBufferEntity(null, null, null).asEOF());
        } catch (Exception e) {
            LOG.warn("Writing EOF failed.", e);
        }
    }

    private void checkFlushException() {
        if (this.flushException != null) {
            StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
            for (int i = 0; i < stackTrace.length; i++) {
                LOG.info(stackTrace[i].getClassName() + "." + stackTrace[i].getMethodName() + " line:" + stackTrace[i].getLineNumber());
            }
            throw new RuntimeException("Writing records to StarRocks failed.", this.flushException);
        }
    }

    private void validateTableStructure(TableSchema tableSchema) {
        if (null == tableSchema) {
            return;
        }
        Optional primaryKey = tableSchema.getPrimaryKey();
        List<Map<String, Object>> tableColumnsMetaData = this.starrocksQueryVisitor.getTableColumnsMetaData();
        if (tableColumnsMetaData == null || tableColumnsMetaData.isEmpty()) {
            throw new IllegalArgumentException("Couldn't get the sink table's column info.");
        }
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < tableColumnsMetaData.size(); i++) {
            if ("PRI".equals(tableColumnsMetaData.get(i).get("COLUMN_KEY").toString())) {
                arrayList.add(tableColumnsMetaData.get(i).get("COLUMN_NAME").toString().toLowerCase());
            }
        }
        if (!arrayList.isEmpty()) {
            if (!primaryKey.isPresent()) {
                throw new IllegalArgumentException("Primary keys not defined in the sink `TableSchema`.");
            }
            if (((UniqueConstraint) primaryKey.get()).getColumns().size() != arrayList.size() || !((UniqueConstraint) primaryKey.get()).getColumns().stream().allMatch(str -> {
                return arrayList.contains(str.toLowerCase());
            })) {
                throw new IllegalArgumentException("Primary keys of the flink `TableSchema` do not match with the ones from starrocks table.");
            }
            this.sinkOptions.enableUpsertDelete();
        }
        if (this.sinkOptions.hasColumnMappingProperty()) {
            return;
        }
        if (tableSchema.getFieldCount() != tableColumnsMetaData.size()) {
            throw new IllegalArgumentException("Fields count of " + this.sinkOptions.getTableName() + " mismatch. \nflinkSchema[" + tableSchema.getFieldNames().length + "]:" + ((String) Arrays.asList(tableSchema.getFieldNames()).stream().collect(Collectors.joining(","))) + "\n realTab[" + tableColumnsMetaData.size() + "]:" + ((String) tableColumnsMetaData.stream().map(map -> {
                return String.valueOf(map.get("COLUMN_NAME"));
            }).collect(Collectors.joining(","))));
        }
        List tableColumns = tableSchema.getTableColumns();
        for (int i2 = 0; i2 < tableColumnsMetaData.size(); i2++) {
            String lowerCase = tableColumnsMetaData.get(i2).get("COLUMN_NAME").toString().toLowerCase();
            String lowerCase2 = tableColumnsMetaData.get(i2).get("DATA_TYPE").toString().toLowerCase();
            if (((List) tableColumns.stream().filter(tableColumn -> {
                return tableColumn.getName().toLowerCase().equals(lowerCase) && (!typesMap.containsKey(lowerCase2) || typesMap.get(lowerCase2).contains(tableColumn.getType().getLogicalType().getTypeRoot()));
            }).collect(Collectors.toList())).isEmpty()) {
                throw new IllegalArgumentException("Fields name or type mismatch for:" + lowerCase);
            }
        }
    }

    private void updateMetricsFromStreamLoadResult(Map<String, Object> map) {
        if (map != null) {
            updateHisto(map, "CommitAndPublishTimeMs", this.commitAndPublishTimeMs);
            updateHisto(map, "StreamLoadPutTimeMs", this.streamLoadPutTimeMs);
            updateHisto(map, "ReadDataTimeMs", this.readDataTimeMs);
            updateHisto(map, "WriteDataTimeMs", this.writeDataTimeMs);
            updateHisto(map, "LoadTimeMs", this.loadTimeMs);
            updateCounter(map, "NumberFilteredRows", this.totalFilteredRows);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}", metricsToString());
        }
    }

    private void updateCounter(Map<String, Object> map, String str, Counter counter) {
        Object obj;
        if (!map.containsKey(str) || (obj = map.get(str)) == null) {
            return;
        }
        try {
            counter.inc(Long.parseLong(obj.toString()));
        } catch (Exception e) {
            LOG.warn("Parse stream load result metric error", e);
        }
    }

    private void updateHisto(Map<String, Object> map, String str, Histogram histogram) {
        Object obj;
        if (!map.containsKey(str) || (obj = map.get(str)) == null) {
            return;
        }
        try {
            histogram.update(Long.parseLong(obj.toString()));
        } catch (Exception e) {
            LOG.warn("Parse stream load result metric error", e);
        }
    }

    private String metricsToString() {
        return "LoadMetrics{startTimeNano=" + this.startTimeNano + ", totalRunningTimeNano=" + (System.nanoTime() - this.startTimeNano) + ", numberOfSuccessLoad=" + this.totalFlushSucceededTimes.getCount() + ", totalSuccessLoadBytes=" + this.totalFlushBytes.getCount() + ", totalSuccessLoadRows=" + this.totalFlushRows.getCount() + ", totalSuccessLoadTimeNano=" + this.totalFlushTime.getCount() + '}';
    }

    static {
        typesMap.put("bigint", Arrays.asList(LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        typesMap.put("largeint", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        typesMap.put("char", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR));
        typesMap.put("date", Arrays.asList(LogicalTypeRoot.DATE, LogicalTypeRoot.VARCHAR));
        typesMap.put("datetime", Arrays.asList(LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE, LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE, LogicalTypeRoot.VARCHAR));
        typesMap.put("decimal", Arrays.asList(LogicalTypeRoot.DECIMAL, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.DOUBLE, LogicalTypeRoot.FLOAT));
        typesMap.put("double", Arrays.asList(LogicalTypeRoot.DOUBLE, LogicalTypeRoot.BIGINT, LogicalTypeRoot.INTEGER));
        typesMap.put("float", Arrays.asList(LogicalTypeRoot.FLOAT, LogicalTypeRoot.INTEGER));
        typesMap.put("int", Arrays.asList(LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        typesMap.put("tinyint", Arrays.asList(LogicalTypeRoot.TINYINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY, LogicalTypeRoot.BOOLEAN));
        typesMap.put("smallint", Arrays.asList(LogicalTypeRoot.SMALLINT, LogicalTypeRoot.INTEGER, LogicalTypeRoot.BINARY));
        typesMap.put("varchar", Arrays.asList(LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
        typesMap.put("string", Arrays.asList(LogicalTypeRoot.CHAR, LogicalTypeRoot.VARCHAR, LogicalTypeRoot.ARRAY, LogicalTypeRoot.MAP, LogicalTypeRoot.ROW));
    }
}
