/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.s3;

import com.amazonaws.AmazonClientException;
import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.connect.s3.S3SinkConnectorConfig;
import io.confluent.connect.s3.TopicPartitionWriter;
import io.confluent.connect.s3.format.KeyValueHeaderRecordWriterProvider;
import io.confluent.connect.s3.format.RecordViewSetter;
import io.confluent.connect.s3.format.RecordViews;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.s3.util.SchemaPartitioner;
import io.confluent.connect.s3.util.TombstoneSupportedPartitioner;
import io.confluent.connect.s3.util.Version;
import io.confluent.connect.storage.StorageFactory;
import io.confluent.connect.storage.format.Format;
import io.confluent.connect.storage.format.RecordWriterProvider;
import io.confluent.connect.storage.partitioner.Partitioner;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Kafka Connect sink task that buffers incoming records into one
 * {@link TopicPartitionWriter} per assigned topic partition and asks each
 * writer to write to S3 storage on every {@link #put(Collection)} call.
 *
 * <p>Offsets are handed back to the framework from
 * {@link #preCommit(Map)}; {@link #flush(Map)} is a no-op.
 */
public class S3SinkTask extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(S3SinkTask.class);

    private S3SinkConnectorConfig connectorConfig;
    private String url;
    private long timeoutMs;
    private S3Storage storage;
    private final Map<TopicPartition, TopicPartitionWriter> topicPartitionWriters = new HashMap<>();
    private Partitioner<?> partitioner;
    private Format<S3SinkConnectorConfig, String> format;
    private RecordWriterProvider<S3SinkConnectorConfig> writerProvider;
    private final Time time;
    private ErrantRecordReporter reporter;

    /**
     * Creates a task that uses the system clock. All other state is
     * initialised later by {@link #start(Map)}.
     */
    public S3SinkTask() {
        this.time = new SystemTime();
    }

    /**
     * Creates a fully wired task from pre-built collaborators and immediately
     * opens writers for {@code context.assignment()}.
     *
     * <p>Unlike {@link #start(Map)}, this constructor does not read
     * {@code retry.backoff.ms} and does not look up an errant record reporter.
     *
     * @param connectorConfig connector configuration; {@code store.url} is read from it
     * @param context         sink task context supplying the current assignment
     * @param storage         storage the writers will use
     * @param partitioner     partitioner handed to every writer
     * @param format          format whose record writer provider is used
     * @param time            clock used for timestamps
     * @throws Exception declared by the original signature
     */
    S3SinkTask(S3SinkConnectorConfig connectorConfig, SinkTaskContext context, S3Storage storage, Partitioner<?> partitioner, Format<S3SinkConnectorConfig, String> format, Time time) throws Exception {
        this.connectorConfig = connectorConfig;
        this.context = context;
        this.storage = storage;
        this.partitioner = partitioner;
        this.format = format;
        this.time = time;
        this.url = connectorConfig.getString("store.url");
        this.writerProvider = this.format.getRecordWriterProvider();
        this.open(context.assignment());
        log.info("Started S3 connector task with assigned partitions {}", this.topicPartitionWriters.keySet());
    }

    /**
     * Configures the task from {@code props}: builds the connector config,
     * creates the storage, verifies that the bucket exists, creates the record
     * writer provider and partitioner, looks up the errant record reporter and
     * opens a writer for every currently assigned partition.
     *
     * @param props raw connector properties
     * @throws ConnectException if the bucket does not exist, if a configured
     *                          class cannot be instantiated reflectively, or if
     *                          an {@link AmazonClientException} is raised
     */
    @Override
    public void start(Map<String, String> props) {
        try {
            this.connectorConfig = new S3SinkConnectorConfig(props);
            this.url = this.connectorConfig.getString("store.url");
            this.timeoutMs = this.connectorConfig.getLong("retry.backoff.ms");

            @SuppressWarnings("unchecked")
            Class<? extends S3Storage> storageClass =
                (Class<? extends S3Storage>) this.connectorConfig.getClass("storage.class");
            this.storage = StorageFactory.createStorage(storageClass, S3SinkConnectorConfig.class, this.connectorConfig, this.url);
            if (!this.storage.bucketExists()) {
                throw new ConnectException("Non-existent S3 bucket: " + this.connectorConfig.getBucketName());
            }

            this.writerProvider = this.newRecordWriterProvider(this.connectorConfig);
            log.info("Created S3 sink record writer provider.");
            this.partitioner = this.newPartitioner(this.connectorConfig);
            log.info("Created S3 sink partitioner.");

            // Look the reporter up BEFORE opening partitions: open() constructs the
            // writers and passes them this.reporter, so doing it afterwards would
            // leave the initially created writers with a null reporter.
            try {
                this.reporter = this.context.errantRecordReporter();
                if (this.reporter == null) {
                    log.info("Errant record reporter not configured.");
                }
            } catch (NoClassDefFoundError | NoSuchMethodError | UnsupportedOperationException e) {
                log.warn("Connect versions prior to Apache Kafka 2.6 do not support the errant record reporter", e);
            }

            this.open(this.context.assignment());
            log.info("Started S3 connector task with assigned partitions: {}", this.topicPartitionWriters.keySet());
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new ConnectException("Reflection exception: ", e);
        } catch (AmazonClientException e) {
            throw new ConnectException(e);
        }
    }

    /**
     * @return the value of {@link Version#getVersion()}
     */
    @Override
    public String version() {
        return Version.getVersion();
    }

    /**
     * Creates a new writer for each given partition and registers it,
     * replacing any writer already registered for the same partition.
     *
     * @param partitions partitions to create writers for
     */
    @Override
    public void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            this.topicPartitionWriters.put(tp, this.newTopicPartitionWriter(tp));
        }
        log.info("Assigned topic partitions: {}", this.topicPartitionWriters.keySet());
    }

    /**
     * Instantiates the format class named by the given configuration key via
     * its single-argument {@code (S3Storage)} constructor.
     *
     * @param formatClassConfig configuration key whose value is the format class
     * @return a new format bound to this task's storage
     */
    @SuppressWarnings("unchecked")
    private Format<S3SinkConnectorConfig, String> newFormat(String formatClassConfig) throws ClassNotFoundException, IllegalAccessException, InstantiationException, InvocationTargetException, NoSuchMethodException {
        Class<Format<S3SinkConnectorConfig, String>> formatClass =
            (Class<Format<S3SinkConnectorConfig, String>>) this.connectorConfig.getClass(formatClassConfig);
        return formatClass.getConstructor(S3Storage.class).newInstance(this.storage);
    }

    /**
     * Builds the composite record writer provider. The value provider comes
     * from {@code format.class}; key and header providers are created only when
     * {@code store.kafka.keys} / {@code store.kafka.headers} are enabled and are
     * otherwise passed as {@code null}.
     *
     * @param config connector configuration consulted for the two boolean flags
     * @return a {@link KeyValueHeaderRecordWriterProvider} wrapping the providers
     */
    RecordWriterProvider<S3SinkConnectorConfig> newRecordWriterProvider(S3SinkConnectorConfig config) throws ClassNotFoundException, InvocationTargetException, InstantiationException, NoSuchMethodException, IllegalAccessException {
        RecordWriterProvider<S3SinkConnectorConfig> valueWriterProvider =
            this.newFormat("format.class").getRecordWriterProvider();

        RecordWriterProvider<S3SinkConnectorConfig> keyWriterProvider = null;
        if (config.getBoolean("store.kafka.keys")) {
            keyWriterProvider = this.newFormat("keys.format.class").getRecordWriterProvider();
            ((RecordViewSetter) keyWriterProvider).setRecordView(new RecordViews.KeyRecordView());
        }

        RecordWriterProvider<S3SinkConnectorConfig> headerWriterProvider = null;
        if (config.getBoolean("store.kafka.headers")) {
            headerWriterProvider = this.newFormat("headers.format.class").getRecordWriterProvider();
            ((RecordViewSetter) headerWriterProvider).setRecordView(new RecordViews.HeaderRecordView());
        }

        return new KeyValueHeaderRecordWriterProvider(valueWriterProvider, keyWriterProvider, headerWriterProvider);
    }

    /**
     * Instantiates the partitioner named by {@code partitioner.class},
     * optionally wraps it, and configures the outermost partitioner.
     *
     * <p>The configuration map passed to {@code configure} is
     * {@code config.plainValues()} plus every entry of
     * {@code config.originals()} whose key is not already present.
     *
     * @param config connector configuration
     * @return the configured (possibly wrapped) partitioner
     */
    private Partitioner<?> newPartitioner(S3SinkConnectorConfig config) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        @SuppressWarnings("unchecked")
        Class<? extends Partitioner<?>> partitionerClass =
            (Class<? extends Partitioner<?>>) config.getClass("partitioner.class");
        // Typed as Partitioner<?>: the configured class is an arbitrary partitioner,
        // so casting it to SchemaPartitioner would fail for any other implementation.
        Partitioner<?> partitioner = partitionerClass.newInstance();

        Map<String, Object> plainValues = new HashMap<>(config.plainValues());
        Map<String, ?> originals = config.originals();
        for (Map.Entry<String, ?> original : originals.entrySet()) {
            if (!plainValues.containsKey(original.getKey())) {
                plainValues.put(original.getKey(), original.getValue());
            }
        }

        if (config.getSchemaPartitionAffixType() != S3SinkConnectorConfig.AffixType.NONE) {
            partitioner = new SchemaPartitioner<>(partitioner);
        }
        if (config.isTombstoneWriteEnabled()) {
            String tombstonePartition = config.getTombstoneEncodedPartition();
            partitioner = new TombstoneSupportedPartitioner<>(partitioner, tombstonePartition);
        }
        partitioner.configure(plainValues);
        return partitioner;
    }

    /**
     * Buffers each record into the writer of its topic partition, then asks
     * every registered writer to write.
     *
     * <p>Null-valued records that {@link #maybeSkipOnNullValue(SinkRecord)}
     * says to skip are not buffered; they are sent to the errant record
     * reporter when one is present and
     * {@code connectorConfig.reportNullRecordsToDlq()} is true.
     *
     * <p>When a writer throws {@link RetriableException}, the error is logged,
     * the consumer offset for that partition is reset to the writer's
     * {@code currentStartOffset()} (if non-null), the context timeout is set to
     * the configured backoff, and the writer is replaced by a fresh one stamped
     * with the failure time.
     *
     * @param records records delivered by the framework
     * @throws ConnectException if a record arrives for a partition that has no
     *                          registered writer, or if null-value handling
     *                          rejects a record
     */
    @Override
    public void put(Collection<SinkRecord> records) throws ConnectException {
        long putStartTime = this.time.milliseconds();

        for (SinkRecord record : records) {
            String topic = record.topic();
            int partition = record.kafkaPartition();
            TopicPartition tp = new TopicPartition(topic, partition);

            if (this.maybeSkipOnNullValue(record)) {
                if (this.reporter != null && this.connectorConfig.reportNullRecordsToDlq()) {
                    this.reporter.report(record, new DataException("Skipping null value record."));
                }
                continue;
            }

            TopicPartitionWriter writer = this.topicPartitionWriters.get(tp);
            if (writer == null) {
                // Fail with context instead of an anonymous NullPointerException.
                throw new ConnectException("No TopicPartitionWriter is assigned for " + tp);
            }
            writer.buffer(record);
        }

        if (log.isDebugEnabled()) {
            log.debug("Read {} records from Kafka", (Object) records.size());
        }

        // Visit partitions in random order on each call.
        // NOTE(review): presumably so no partition is consistently served last — confirm.
        ArrayList<TopicPartition> shuffledList = new ArrayList<>(this.topicPartitionWriters.keySet());
        Collections.shuffle(shuffledList);

        for (TopicPartition tp : shuffledList) {
            TopicPartitionWriter writer = this.topicPartitionWriters.get(tp);
            try {
                writer.setWriteDeadline(putStartTime);
                writer.write();
                if (log.isDebugEnabled()) {
                    log.debug("TopicPartition: {}, SchemaCompatibility:{}, FileRotations: {}", new Object[]{tp.toString(), writer.getSchemaCompatibility(), writer.getFileRotationTracker().toString()});
                }
            } catch (RetriableException e) {
                log.error("Exception on topic partition {}: ", tp, e);
                Long currentStartOffset = writer.currentStartOffset();
                if (currentStartOffset != null) {
                    this.context.offset(tp, currentStartOffset.longValue());
                }
                this.context.timeout(this.timeoutMs);

                TopicPartitionWriter replacement = this.newTopicPartitionWriter(tp);
                replacement.failureTime(this.time.milliseconds());
                this.topicPartitionWriters.put(tp, replacement);
            }
        }
    }

    /**
     * Decides whether a record should be skipped because its value is null.
     *
     * @param record record to inspect
     * @return {@code true} if the value is null and the configured null-value
     *         behaviour is IGNORE; {@code false} if the value is non-null or the
     *         behaviour is WRITE
     * @throws ConnectException if the value is null and the configured
     *                          behaviour is neither IGNORE nor WRITE
     */
    private boolean maybeSkipOnNullValue(SinkRecord record) {
        if (record.value() != null) {
            return false;
        }

        String behavior = this.connectorConfig.nullValueBehavior();
        if (behavior.equalsIgnoreCase(S3SinkConnectorConfig.OutputWriteBehavior.IGNORE.toString())) {
            log.debug("Null valued record from topic '{}', partition {} and offset {} was skipped.", new Object[]{record.topic(), record.kafkaPartition(), record.kafkaOffset()});
            return true;
        }
        if (behavior.equalsIgnoreCase(S3SinkConnectorConfig.OutputWriteBehavior.WRITE.toString())) {
            log.debug("Null valued record from topic '{}', partition {} and offset {} was written in the partition {}.", new Object[]{record.topic(), record.kafkaPartition(), record.kafkaOffset(), this.connectorConfig.getTombstoneEncodedPartition()});
            return false;
        }
        throw new ConnectException("Null valued records are not writeable with current behavior.on.null.values 'settings.");
    }

    /**
     * No-op: this task reports committable offsets from
     * {@link #preCommit(Map)} instead.
     *
     * @param offsets ignored
     */
    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    /**
     * Collects, for every registered writer, the offset returned by
     * {@code getOffsetToCommitAndReset()}; writers that return {@code null}
     * contribute nothing.
     *
     * @param offsets offsets proposed by the framework; not consulted
     * @return the offsets this task wants committed, keyed by partition
     */
    @Override
    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
        for (Map.Entry<TopicPartition, TopicPartitionWriter> entry : this.topicPartitionWriters.entrySet()) {
            TopicPartition tp = entry.getKey();
            Long offset = entry.getValue().getOffsetToCommitAndReset();
            if (offset == null) {
                continue;
            }
            log.trace("Forwarding to framework request to commit offset: {} for {}", offset, tp);
            offsetsToCommit.put(tp, new OffsetAndMetadata(offset.longValue()));
        }
        return offsetsToCommit;
    }

    /**
     * Closes every registered writer and clears the registry. A
     * {@link ConnectException} from one writer is logged (with its stack trace)
     * and does not prevent the remaining writers from being closed.
     *
     * @param partitions not consulted; all registered writers are closed
     */
    @Override
    public void close(Collection<TopicPartition> partitions) {
        for (Map.Entry<TopicPartition, TopicPartitionWriter> entry : this.topicPartitionWriters.entrySet()) {
            try {
                entry.getValue().close();
            } catch (ConnectException e) {
                // Trailing throwable argument makes SLF4J log the stack trace too.
                log.error("Error closing writer for {}. Error: {}", entry.getKey(), e.getMessage(), e);
            }
        }
        this.topicPartitionWriters.clear();
    }

    /**
     * Closes the storage if one was created.
     *
     * @throws ConnectException wrapping any exception raised while closing
     */
    @Override
    public void stop() {
        try {
            if (this.storage != null) {
                this.storage.close();
            }
        } catch (Exception e) {
            throw new ConnectException(e);
        }
    }

    /**
     * @param tp topic partition to look up
     * @return the writer registered for {@code tp}, or {@code null} if none
     */
    TopicPartitionWriter getTopicPartitionWriter(TopicPartition tp) {
        return this.topicPartitionWriters.get(tp);
    }

    /**
     * Builds a writer for {@code tp} from the task's current storage, writer
     * provider, partitioner, config, context, clock and reporter.
     *
     * @param tp topic partition the writer will serve
     * @return a new, unregistered writer
     */
    private TopicPartitionWriter newTopicPartitionWriter(TopicPartition tp) {
        return new TopicPartitionWriter(tp, this.storage, this.writerProvider, this.partitioner, this.connectorConfig, this.context, this.time, this.reporter);
    }
}

