package ru.ivi.opensource.flinkclickhousesink.applied;

import com.google.common.collect.Lists;
import io.netty.handler.codec.http.HttpHeaderNames;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Request;
import org.asynchttpclient.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseRequestBlank;
import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkCommonParams;
import ru.ivi.opensource.flinkclickhousesink.util.FutureUtil;
import ru.ivi.opensource.flinkclickhousesink.util.ThreadUtil;

/* loaded from: input_file:ru/ivi/opensource/flinkclickhousesink/applied/ClickHouseWriter.class */
/**
 * Buffers {@link ClickHouseRequestBlank} batches in a bounded queue and sends them to ClickHouse
 * over HTTP using a fixed pool of {@link WriterTask} workers.
 *
 * <p>Each batch taken from the queue gets a {@link CompletableFuture} that is appended to the
 * {@code futures} list supplied by the caller. {@link #close()} blocks until the unprocessed
 * request counter is back to zero and every registered future has completed.
 *
 * <p>NOTE(review): the {@code futures} list is appended to from writer threads and pruned from the
 * thread calling {@link #close()}; presumably the caller supplies a thread-safe list — confirm.
 */
public class ClickHouseWriter implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(ClickHouseWriter.class);
    private ExecutorService service;
    private ExecutorService callbackService;
    private List<WriterTask> tasks;
    private final BlockingQueue<ClickHouseRequestBlank> commonQueue;
    /** Number of batches that were queued (or re-queued) and whose response callback has not finished yet. */
    private final AtomicLong unprocessedRequestsCounter;
    private final AsyncHttpClient asyncHttpClient;
    private final List<CompletableFuture<Boolean>> futures;
    private final ClickHouseSinkCommonParams sinkParams;

    /**
     * Worker that polls the shared queue and posts each batch to ClickHouse asynchronously.
     *
     * <p>The worker keeps running until {@link #setStopWorking()} has been called and the queue
     * is empty.
     */
    public static class WriterTask implements Runnable {
        private static final Logger logger = LoggerFactory.getLogger(WriterTask.class);
        private static final int HTTP_OK = 200;
        /** How long one poll waits for a batch before the stop flag is re-checked. */
        private static final long POLL_TIMEOUT_MS = 300L;
        private final BlockingQueue<ClickHouseRequestBlank> queue;
        private final AtomicLong queueCounter;
        private final ClickHouseSinkCommonParams sinkSettings;
        private final AsyncHttpClient asyncHttpClient;
        private final ExecutorService callbackService;
        private final List<CompletableFuture<Boolean>> futures;
        private final int id;
        // Initialised here rather than in run() so that a stop request issued before the
        // worker thread actually starts is not overwritten.
        private volatile boolean isWorking = true;

        WriterTask(int i, AsyncHttpClient asyncHttpClient, BlockingQueue<ClickHouseRequestBlank> blockingQueue, ClickHouseSinkCommonParams clickHouseSinkCommonParams, ExecutorService executorService, List<CompletableFuture<Boolean>> list, AtomicLong atomicLong) {
            this.id = i;
            this.sinkSettings = clickHouseSinkCommonParams;
            this.queue = blockingQueue;
            this.callbackService = executorService;
            this.asyncHttpClient = asyncHttpClient;
            this.futures = list;
            this.queueCounter = atomicLong;
        }

        /**
         * Polls the queue until the task is stopped and the queue is drained, registering a
         * future for every batch and sending it.
         *
         * @throws RuntimeException wrapping any exception raised while polling or sending
         */
        @Override
        public void run() {
            try {
                logger.info("Start writer task, id = {}", id);
                while (isWorking || !queue.isEmpty()) {
                    ClickHouseRequestBlank blank = queue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                    if (blank != null) {
                        CompletableFuture<Boolean> future = new CompletableFuture<>();
                        futures.add(future);
                        send(blank, future);
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can observe the interruption.
                Thread.currentThread().interrupt();
                logger.error("Error while inserting data", e);
                throw new RuntimeException(e);
            } catch (Exception e) {
                logger.error("Error while inserting data", e);
                throw new RuntimeException(e);
            } finally {
                logger.info("Task id = {} is finished", id);
            }
        }

        /** Builds the HTTP request for the batch, executes it and registers the response callback. */
        private void send(ClickHouseRequestBlank requestBlank, CompletableFuture<Boolean> future) {
            Request request = buildRequest(requestBlank);
            logger.info("Ready to load data to {}, size = {}", requestBlank.getTargetTable(), requestBlank.getValues().size());
            ListenableFuture<Response> whenResponse = asyncHttpClient.executeRequest(request);
            whenResponse.addListener(responseCallback(whenResponse, requestBlank, future), callbackService);
        }

        /**
         * Builds a POST request whose body is an {@code INSERT INTO <table> VALUES ...} statement
         * made of the batch values joined with {@code " , "}. A Basic authorization header is
         * added when the cluster settings require authorization.
         */
        private Request buildRequest(ClickHouseRequestBlank requestBlank) {
            BoundRequestBuilder builder = asyncHttpClient
                    .preparePost(sinkSettings.getClickHouseClusterSettings().getRandomHostUrl())
                    .setHeader(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=utf-8");
            String query = String.format("INSERT INTO %s VALUES %s", requestBlank.getTargetTable(), String.join(" , ", requestBlank.getValues()));
            builder.setBody(query);
            if (sinkSettings.getClickHouseClusterSettings().isAuthorizationRequired()) {
                builder.setHeader(HttpHeaderNames.AUTHORIZATION, "Basic " + sinkSettings.getClickHouseClusterSettings().getCredentials());
            }
            return builder.build();
        }

        /**
         * Creates the callback run once the HTTP response (or failure) is available.
         *
         * <p>A 200 response completes the future with {@code true}; any other status or an
         * exception is routed to {@link #handleUnsuccessfulResponse}. The unprocessed-request
         * counter is decremented exactly once per callback, whatever the outcome.
         */
        private Runnable responseCallback(ListenableFuture<Response> whenResponse, ClickHouseRequestBlank requestBlank, CompletableFuture<Boolean> future) {
            return () -> {
                try {
                    Response response = whenResponse.get();
                    if (response.getStatusCode() != HTTP_OK) {
                        handleUnsuccessfulResponse(response, requestBlank, future);
                    } else {
                        logger.info("Successful send data to ClickHouse, batch size = {}, target table = {}, current attempt = {}", requestBlank.getValues().size(), requestBlank.getTargetTable(), requestBlank.getAttemptCounter());
                        future.complete(true);
                    }
                } catch (Exception e) {
                    logger.error("Error while executing callback, params = {}", sinkSettings, e);
                    requestBlank.setException(e);
                    try {
                        handleUnsuccessfulResponse(null, requestBlank, future);
                    } catch (Exception error) {
                        logger.error("Error while handle unsuccessful response", error);
                        future.completeExceptionally(error);
                    }
                } finally {
                    queueCounter.decrementAndGet();
                }
            };
        }

        /**
         * Re-queues the batch while attempts remain; otherwise dumps it to disk and fails the future.
         *
         * @param response the ClickHouse response, or {@code null} when the request itself failed
         * @throws Exception if re-queueing is interrupted or the batch cannot be written to disk
         */
        private void handleUnsuccessfulResponse(Response response, ClickHouseRequestBlank requestBlank, CompletableFuture<Boolean> future) throws Exception {
            if (requestBlank.getAttemptCounter() < sinkSettings.getMaxRetries()) {
                requestBlank.incrementCounter();
                logger.warn("Next attempt to send data to ClickHouse, table = {}, buffer size = {}, current attempt num = {}, max attempt num = {}, response = {}", requestBlank.getTargetTable(), requestBlank.getValues().size(), requestBlank.getAttemptCounter(), sinkSettings.getMaxRetries(), response);
                queueCounter.incrementAndGet();
                try {
                    queue.put(requestBlank);
                } catch (InterruptedException e) {
                    // The batch never reached the queue: undo the increment so the counter
                    // cannot stay above zero forever and block close().
                    queueCounter.decrementAndGet();
                    throw e;
                }
                future.complete(false);
                return;
            }
            logger.warn("Failed to send data to ClickHouse, cause: limit of attempts is exceeded. ClickHouse response = {}. Ready to flush data on disk.", response, requestBlank.getException());
            logFailedRecords(requestBlank);
            String responseBody = response != null ? response.getResponseBody() : null;
            future.completeExceptionally(new RuntimeException(String.format("Failed to send data to ClickHouse, cause: limit of attempts is exceeded. ClickHouse response: %s. Cause: %s", responseBody, requestBlank.getException())));
        }

        /**
         * Writes every value of the batch, one per line, to
         * {@code <failedRecordsPath>/<targetTable>_<currentTimeMillis>}.
         *
         * @throws Exception if the file cannot be opened or an I/O error occurs while writing
         */
        private void logFailedRecords(ClickHouseRequestBlank requestBlank) throws Exception {
            String filePath = String.format("%s/%s_%s", sinkSettings.getFailedRecordsPath(), requestBlank.getTargetTable(), System.currentTimeMillis());
            try (PrintWriter writer = new PrintWriter(filePath)) {
                List<String> records = requestBlank.getValues();
                records.forEach(writer::println);
                writer.flush();
                // PrintWriter swallows IOExceptions; checkError() is the only way to learn
                // that the dump did not actually reach the disk.
                if (writer.checkError()) {
                    throw new IOException("Failed to write failed records to " + filePath);
                }
            }
            logger.info("Successful send data on disk, path = {}, size = {} ", filePath, requestBlank.getValues().size());
        }

        /** Asks the task to finish once the queue has been drained. */
        void setStopWorking() {
            this.isWorking = false;
        }
    }

    /**
     * Creates a writer backed by a default {@link AsyncHttpClient}.
     *
     * @param clickHouseSinkCommonParams sink configuration
     * @param list list that receives one future per batch taken from the queue
     */
    public ClickHouseWriter(ClickHouseSinkCommonParams clickHouseSinkCommonParams, List<CompletableFuture<Boolean>> list) {
        this(clickHouseSinkCommonParams, list, Dsl.asyncHttpClient());
    }

    /**
     * Creates a writer, ensures the failed-records directory exists and starts the writer tasks.
     *
     * @param clickHouseSinkCommonParams sink configuration
     * @param list list that receives one future per batch taken from the queue
     * @param asyncHttpClient HTTP client used for all requests; closed by {@link #close()}
     * @throws RuntimeException if the directory or the executors cannot be set up
     */
    public ClickHouseWriter(ClickHouseSinkCommonParams clickHouseSinkCommonParams, List<CompletableFuture<Boolean>> list, AsyncHttpClient asyncHttpClient) {
        this.unprocessedRequestsCounter = new AtomicLong();
        this.sinkParams = clickHouseSinkCommonParams;
        this.futures = list;
        this.commonQueue = new LinkedBlockingQueue<>(clickHouseSinkCommonParams.getQueueMaxCapacity());
        this.asyncHttpClient = asyncHttpClient;
        initDirAndExecutors();
    }

    /** Creates the failed-records directory and starts the executors, wrapping any failure. */
    private void initDirAndExecutors() {
        try {
            initDir(sinkParams.getFailedRecordsPath());
            buildComponents();
        } catch (Exception e) {
            logger.error("Error while starting CH writer", e);
            throw new RuntimeException(e);
        }
    }

    /** Creates the directory (and missing parents) at the given path. */
    private static void initDir(String path) throws IOException {
        Files.createDirectories(Paths.get(path));
    }

    /** Creates the writer and callback executors and submits one {@link WriterTask} per configured writer. */
    private void buildComponents() {
        logger.info("Building components");
        int numWriters = sinkParams.getNumWriters();
        service = Executors.newFixedThreadPool(numWriters, ThreadUtil.threadFactory("clickhouse-writer"));
        callbackService = Executors.newCachedThreadPool(ThreadUtil.threadFactory("clickhouse-writer-callback-executor"));
        tasks = Lists.newArrayListWithCapacity(numWriters);
        for (int i = 0; i < numWriters; i++) {
            WriterTask task = new WriterTask(i, asyncHttpClient, commonQueue, sinkParams, callbackService, futures, unprocessedRequestsCounter);
            tasks.add(task);
            service.submit(task);
        }
    }

    /**
     * Queues a batch for sending, blocking while the queue is full.
     *
     * @param clickHouseRequestBlank the batch to send
     * @throws RuntimeException if the calling thread is interrupted while waiting for queue space;
     *     the interrupt flag is restored before throwing
     */
    public void put(ClickHouseRequestBlank clickHouseRequestBlank) {
        unprocessedRequestsCounter.incrementAndGet();
        try {
            commonQueue.put(clickHouseRequestBlank);
        } catch (InterruptedException e) {
            // The batch was never queued: undo the increment, otherwise close() would wait
            // forever for a request that does not exist.
            unprocessedRequestsCounter.decrementAndGet();
            logger.error("Interrupted error while putting data to queue", e);
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Blocks until no request is pending and every registered future has completed successfully.
     *
     * <p>Writers are stopped and the futures list is cleared only once, after the wait loop ends
     * (normally or with an exception).
     *
     * @throws RuntimeException if waiting fails or any future completed exceptionally
     */
    private void waitUntilAllFuturesDone() {
        logger.info("Wait until all futures are done or completed exceptionally. Futures size: {}", futures.size());
        try {
            while (unprocessedRequestsCounter.get() > 0 || !futures.isEmpty()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Futures size: {}.", futures.size());
                }
                try {
                    FutureUtil.allOf(futures).get();
                    futures.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
                    if (logger.isDebugEnabled()) {
                        logger.debug("Futures size after removing: {}", futures.size());
                    }
                } catch (Exception e) {
                    if (e instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    throw new RuntimeException(e);
                }
            }
        } finally {
            stopWriters();
            futures.clear();
        }
    }

    /** Signals every writer task to finish once the queue is drained. */
    private void stopWriters() {
        logger.info("Stopping writers.");
        if (tasks != null) {
            tasks.forEach(WriterTask::setStopWorking);
        }
        logger.info("Writers stopped.");
    }

    /**
     * Waits for all pending batches, then shuts down both executors and closes the HTTP client.
     * Each shutdown step runs even if an earlier one throws.
     *
     * @throws Exception if waiting for the futures or releasing a resource fails
     */
    @Override
    public void close() throws Exception {
        logger.info("ClickHouseWriter is shutting down.");
        try {
            waitUntilAllFuturesDone();
        } finally {
            try {
                ThreadUtil.shutdownExecutorService(service);
            } finally {
                try {
                    ThreadUtil.shutdownExecutorService(callbackService);
                } finally {
                    asyncHttpClient.close();
                    logger.info("{} shutdown complete.", ClickHouseWriter.class.getSimpleName());
                }
            }
        }
    }
}
