package org.apache.flume.sink.hive;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flume.Event;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.hive.hcatalog.streaming.HiveEndPoint;
import org.apache.hive.hcatalog.streaming.RecordWriter;
import org.apache.hive.hcatalog.streaming.SerializationError;
import org.apache.hive.hcatalog.streaming.StreamingConnection;
import org.apache.hive.hcatalog.streaming.StreamingException;
import org.apache.hive.hcatalog.streaming.StreamingIOFailure;
import org.apache.hive.hcatalog.streaming.TransactionBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter.class */
public class HiveWriter {
    private static final Logger LOG = LoggerFactory.getLogger(HiveWriter.class);
    private final HiveEndPoint endPoint;
    private HiveEventSerializer serializer;
    private final StreamingConnection connection;
    private final int txnsPerBatch;
    private final RecordWriter recordWriter;
    private TransactionBatch txnBatch;
    private final ExecutorService callTimeoutPool;
    private final long callTimeout;
    private long lastUsed;
    private SinkCounter sinkCounter;
    private int batchCounter;
    private long eventCounter;
    private long processSize;
    protected boolean closed;
    private boolean autoCreatePartitions;
    private boolean hearbeatNeeded = false;
    private final int writeBatchSz = 1000;
    private ArrayList<Event> batch = new ArrayList<>(1000);

    /* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter$CallRunner.class */
    private interface CallRunner<T> {
        T call() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter$CallRunner1.class */
    public interface CallRunner1<T> {
        T call() throws StreamingException, InterruptedException, Failure;
    }

    /* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter$CommitException.class */
    public static class CommitException extends Failure {
        public CommitException(HiveEndPoint hiveEndPoint, Long l, Throwable th) {
            super("Commit of Txn " + l + " failed on EndPoint: " + hiveEndPoint, th);
        }
    }

    /* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter$ConnectException.class */
    public static class ConnectException extends Failure {
        public ConnectException(HiveEndPoint hiveEndPoint, Throwable th) {
            super("Failed connecting to EndPoint " + hiveEndPoint, th);
        }
    }

    /* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter$Failure.class */
    public static class Failure extends Exception {
        public Failure(String str, Throwable th) {
            super(str, th);
        }
    }

    /* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter$TxnBatchException.class */
    public static class TxnBatchException extends Failure {
        public TxnBatchException(HiveEndPoint hiveEndPoint, Throwable th) {
            super("Failed acquiring Transaction Batch from EndPoint: " + hiveEndPoint, th);
        }
    }

    /* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter$TxnFailure.class */
    private class TxnFailure extends Failure {
        public TxnFailure(TransactionBatch transactionBatch, Throwable th) {
            super("Failed switching to next Txn in TxnBatch " + transactionBatch, th);
        }
    }

    /* loaded from: input_file:org/apache/flume/sink/hive/HiveWriter$WriteException.class */
    public static class WriteException extends Failure {
        public WriteException(HiveEndPoint hiveEndPoint, Long l, Throwable th) {
            super("Failed writing to : " + hiveEndPoint + ". TxnID : " + l, th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public HiveWriter(HiveEndPoint hiveEndPoint, int i, boolean z, long j, ExecutorService executorService, String str, HiveEventSerializer hiveEventSerializer, SinkCounter sinkCounter) throws ConnectException, InterruptedException {
        try {
            this.autoCreatePartitions = z;
            this.sinkCounter = sinkCounter;
            this.callTimeout = j;
            this.callTimeoutPool = executorService;
            this.endPoint = hiveEndPoint;
            this.connection = newConnection(str);
            this.txnsPerBatch = i;
            this.serializer = hiveEventSerializer;
            this.recordWriter = hiveEventSerializer.createRecordWriter(hiveEndPoint);
            this.txnBatch = nextTxnBatch(this.recordWriter);
            this.txnBatch.beginNextTransaction();
            this.closed = false;
            this.lastUsed = System.currentTimeMillis();
        } catch (InterruptedException e) {
            throw e;
        } catch (RuntimeException e2) {
            throw e2;
        } catch (Exception e3) {
            throw new ConnectException(hiveEndPoint, e3);
        }
    }

    public String toString() {
        return this.endPoint.toString();
    }

    private void resetCounters() {
        this.eventCounter = 0L;
        this.processSize = 0L;
        this.batchCounter = 0;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setHearbeatNeeded() {
        this.hearbeatNeeded = true;
    }

    public int getRemainingTxns() {
        return this.txnBatch.remainingTransactions();
    }

    public synchronized void write(Event event) throws WriteException, InterruptedException {
        if (this.closed) {
            throw new IllegalStateException("Writer closed. Cannot write to : " + this.endPoint);
        }
        this.batch.add(event);
        if (this.batch.size() == 1000) {
            writeEventBatchToSerializer();
        }
        this.processSize += event.getBody().length;
        this.eventCounter++;
    }

    private void writeEventBatchToSerializer() throws InterruptedException, WriteException {
        try {
            timedCall(new CallRunner1<Void>() { // from class: org.apache.flume.sink.hive.HiveWriter.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public Void call() throws InterruptedException, StreamingException {
                    try {
                        Iterator it = HiveWriter.this.batch.iterator();
                        while (it.hasNext()) {
                            Event event = (Event) it.next();
                            try {
                                HiveWriter.this.serializer.write(HiveWriter.this.txnBatch, event);
                            } catch (SerializationError e) {
                                HiveWriter.LOG.info("Parse failed : {}  : {}", e.getMessage(), new String(event.getBody()));
                            }
                        }
                        return null;
                    } catch (IOException e2) {
                        throw new StreamingIOFailure(e2.getMessage(), e2);
                    }
                }
            });
            this.batch.clear();
        } catch (StreamingException e) {
            throw new WriteException(this.endPoint, this.txnBatch.getCurrentTxnId(), e);
        } catch (TimeoutException e2) {
            throw new WriteException(this.endPoint, this.txnBatch.getCurrentTxnId(), e2);
        }
    }

    public void flush(boolean z) throws CommitException, TxnBatchException, TxnFailure, InterruptedException, WriteException {
        if (!this.batch.isEmpty()) {
            writeEventBatchToSerializer();
            this.batch.clear();
        }
        if (this.hearbeatNeeded) {
            this.hearbeatNeeded = false;
            heartBeat();
        }
        this.lastUsed = System.currentTimeMillis();
        try {
            commitTxn();
            if (this.txnBatch.remainingTransactions() == 0) {
                closeTxnBatch();
                this.txnBatch = null;
                if (z) {
                    this.txnBatch = nextTxnBatch(this.recordWriter);
                }
            }
            if (z) {
                LOG.debug("Switching to next Txn for {}", this.endPoint);
                this.txnBatch.beginNextTransaction();
            }
        } catch (StreamingException e) {
            throw new TxnFailure(this.txnBatch, e);
        }
    }

    public void abort() throws InterruptedException {
        this.batch.clear();
        abortTxn();
    }

    public void heartBeat() throws InterruptedException {
        try {
            timedCall(new CallRunner1<Void>() { // from class: org.apache.flume.sink.hive.HiveWriter.2
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public Void call() throws StreamingException {
                    HiveWriter.LOG.info("Sending heartbeat on batch " + HiveWriter.this.txnBatch);
                    HiveWriter.this.txnBatch.heartbeat();
                    return null;
                }
            });
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e2) {
            LOG.warn("Unable to send heartbeat on Txn Batch " + this.txnBatch, e2);
        }
    }

    public void close() throws InterruptedException {
        this.batch.clear();
        abortRemainingTxns();
        closeTxnBatch();
        closeConnection();
        this.closed = true;
    }

    private void abortRemainingTxns() throws InterruptedException {
        try {
            if (!isClosed(this.txnBatch.getCurrentTransactionState())) {
                abortCurrTxnHelper();
            }
            if (this.txnBatch.remainingTransactions() > 0) {
                timedCall(new CallRunner1<Void>() { // from class: org.apache.flume.sink.hive.HiveWriter.3
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                    public Void call() throws StreamingException, InterruptedException {
                        HiveWriter.this.txnBatch.beginNextTransaction();
                        return null;
                    }
                });
                abortRemainingTxns();
            }
        } catch (TimeoutException e) {
            LOG.warn("Timed out when aborting remaining transactions in batch " + this.txnBatch, e);
        } catch (StreamingException e2) {
            LOG.warn("Error when aborting remaining transactions in batch " + this.txnBatch, e2);
        }
    }

    private void abortCurrTxnHelper() throws TimeoutException, InterruptedException {
        try {
            timedCall(new CallRunner1<Void>() { // from class: org.apache.flume.sink.hive.HiveWriter.4
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public Void call() throws StreamingException, InterruptedException {
                    HiveWriter.this.txnBatch.abort();
                    HiveWriter.LOG.info("Aborted txn " + HiveWriter.this.txnBatch.getCurrentTxnId());
                    return null;
                }
            });
        } catch (StreamingException e) {
            LOG.warn("Unable to abort transaction " + this.txnBatch.getCurrentTxnId(), e);
        }
    }

    private boolean isClosed(TransactionBatch.TxnState txnState) {
        return txnState == TransactionBatch.TxnState.COMMITTED || txnState == TransactionBatch.TxnState.ABORTED;
    }

    public void closeConnection() throws InterruptedException {
        LOG.info("Closing connection to EndPoint : {}", this.endPoint);
        try {
            timedCall(new CallRunner1<Void>() { // from class: org.apache.flume.sink.hive.HiveWriter.5
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public Void call() {
                    HiveWriter.this.connection.close();
                    return null;
                }
            });
            this.sinkCounter.incrementConnectionClosedCount();
        } catch (Exception e) {
            LOG.warn("Error closing connection to EndPoint : " + this.endPoint, e);
        }
    }

    private void commitTxn() throws CommitException, InterruptedException {
        if (LOG.isInfoEnabled()) {
            LOG.info("Committing Txn " + this.txnBatch.getCurrentTxnId() + " on EndPoint: " + this.endPoint);
        }
        try {
            timedCall(new CallRunner1<Void>() { // from class: org.apache.flume.sink.hive.HiveWriter.6
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public Void call() throws StreamingException, InterruptedException {
                    HiveWriter.this.txnBatch.commit();
                    return null;
                }
            });
        } catch (Exception e) {
            throw new CommitException(this.endPoint, this.txnBatch.getCurrentTxnId(), e);
        }
    }

    private void abortTxn() throws InterruptedException {
        LOG.info("Aborting Txn id {} on End Point {}", this.txnBatch.getCurrentTxnId(), this.endPoint);
        try {
            timedCall(new CallRunner1<Void>() { // from class: org.apache.flume.sink.hive.HiveWriter.7
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public Void call() throws StreamingException, InterruptedException {
                    HiveWriter.this.txnBatch.abort();
                    return null;
                }
            });
        } catch (InterruptedException e) {
            throw e;
        } catch (TimeoutException e2) {
            LOG.warn("Timeout while aborting Txn " + this.txnBatch.getCurrentTxnId() + " on EndPoint: " + this.endPoint, e2);
        } catch (Exception e3) {
            LOG.warn("Error aborting Txn " + this.txnBatch.getCurrentTxnId() + " on EndPoint: " + this.endPoint, e3);
        }
    }

    private StreamingConnection newConnection(String str) throws InterruptedException, ConnectException {
        try {
            return (StreamingConnection) timedCall(new CallRunner1<StreamingConnection>() { // from class: org.apache.flume.sink.hive.HiveWriter.8
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public StreamingConnection call() throws InterruptedException, StreamingException {
                    return HiveWriter.this.endPoint.newConnection(HiveWriter.this.autoCreatePartitions);
                }
            });
        } catch (Exception e) {
            throw new ConnectException(this.endPoint, e);
        }
    }

    private TransactionBatch nextTxnBatch(final RecordWriter recordWriter) throws InterruptedException, TxnBatchException {
        LOG.debug("Fetching new Txn Batch for {}", this.endPoint);
        try {
            TransactionBatch transactionBatch = (TransactionBatch) timedCall(new CallRunner1<TransactionBatch>() { // from class: org.apache.flume.sink.hive.HiveWriter.9
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public TransactionBatch call() throws InterruptedException, StreamingException {
                    return HiveWriter.this.connection.fetchTransactionBatch(HiveWriter.this.txnsPerBatch, recordWriter);
                }
            });
            LOG.info("Acquired Transaction batch {}", transactionBatch);
            return transactionBatch;
        } catch (Exception e) {
            throw new TxnBatchException(this.endPoint, e);
        }
    }

    private void closeTxnBatch() throws InterruptedException {
        try {
            LOG.info("Closing Txn Batch {}.", this.txnBatch);
            timedCall(new CallRunner1<Void>() { // from class: org.apache.flume.sink.hive.HiveWriter.10
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.apache.flume.sink.hive.HiveWriter.CallRunner1
                public Void call() throws InterruptedException, StreamingException {
                    HiveWriter.this.txnBatch.close();
                    return null;
                }
            });
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e2) {
            LOG.warn("Error closing Txn Batch " + this.txnBatch, e2);
        }
    }

    private <T> T timedCall(final CallRunner1<T> callRunner1) throws TimeoutException, InterruptedException, StreamingException {
        Future<T> submit = this.callTimeoutPool.submit(new Callable<T>() { // from class: org.apache.flume.sink.hive.HiveWriter.11
            @Override // java.util.concurrent.Callable
            public T call() throws StreamingException, InterruptedException, Failure {
                return (T) callRunner1.call();
            }
        });
        try {
            return this.callTimeout > 0 ? submit.get(this.callTimeout, TimeUnit.MILLISECONDS) : submit.get();
        } catch (ExecutionException e) {
            this.sinkCounter.incrementConnectionFailedCount();
            StreamingException cause = e.getCause();
            if (cause instanceof IOException) {
                throw new StreamingException("I/O Failure", (IOException) cause);
            }
            if (cause instanceof StreamingException) {
                throw cause;
            }
            if (cause instanceof TimeoutException) {
                throw new StreamingException("Operation Timed Out.", (TimeoutException) cause);
            }
            if (cause instanceof RuntimeException) {
                throw ((RuntimeException) cause);
            }
            if (cause instanceof InterruptedException) {
                throw ((InterruptedException) cause);
            }
            throw new StreamingException(e.getMessage(), e);
        } catch (TimeoutException e2) {
            submit.cancel(true);
            this.sinkCounter.incrementConnectionFailedCount();
            throw e2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long getLastUsed() {
        return this.lastUsed;
    }
}
