/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.s3;

import com.amazonaws.SdkClientException;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.connect.s3.S3SinkConnectorConfig;
import io.confluent.connect.s3.errors.FileExistsException;
import io.confluent.connect.s3.storage.S3Storage;
import io.confluent.connect.s3.util.FileRotationTracker;
import io.confluent.connect.s3.util.RetryUtil;
import io.confluent.connect.s3.util.TombstoneTimestampExtractor;
import io.confluent.connect.storage.common.util.StringUtils;
import io.confluent.connect.storage.errors.PartitionException;
import io.confluent.connect.storage.format.RecordWriter;
import io.confluent.connect.storage.format.RecordWriterProvider;
import io.confluent.connect.storage.partitioner.Partitioner;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.partitioner.TimestampExtractor;
import io.confluent.connect.storage.schema.SchemaCompatibilityResult;
import io.confluent.connect.storage.schema.StorageSchemaCompatibility;
import io.confluent.connect.storage.util.DateTimeUtils;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import org.apache.avro.SchemaParseException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
import org.apache.kafka.connect.errors.SchemaProjectorException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.apache.parquet.schema.InvalidSchemaException;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicPartitionWriter {
    private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
    // Encoded partition -> object key of the file currently being written (filled by getCommitFilename).
    private final Map<String, String> commitFiles;
    // Encoded partition -> open RecordWriter (filled by newWriter, removed by commitFile).
    private final Map<String, RecordWriter> writers;
    // Encoded partition -> value schema currently applied to records of that partition.
    private final Map<String, Schema> currentSchemas;
    private final TopicPartition tp;
    private final S3Storage storage;
    private final Partitioner<?> partitioner;
    // Non-null only when the partitioner is a TimeBasedPartitioner (see constructor).
    private TimestampExtractor timestampExtractor;
    private String topicsDir;
    // Current position in the write state machine driven by executeState().
    private State state;
    // Records handed over by buffer() that write() has not consumed yet.
    private final Queue<SinkRecord> buffer;
    private final SinkTaskContext context;
    private final boolean isTaggingEnabled;
    // Raw "key:value" strings from configuration; parsed into hashMapTag by getS3Tag().
    private final List<String> extraTagKeyValuePair;
    private HashMap<String, String> hashMapTag;
    private final boolean ignoreTaggingErrors;
    // Number of records written since the last commitFiles() call.
    private int recordCount;
    private final int flushSize;
    private final long rotateIntervalMs;
    private final long rotateScheduleIntervalMs;
    private long nextScheduledRotation;
    // Kafka offset of the last successfully written record; -1 until the first write.
    private long currentOffset;
    private Long currentTimestamp;
    private String currentEncodedPartition;
    // Timestamp of the first record seen since the last commit; reset to null by commitFiles().
    private Long baseRecordTimestamp;
    // Offset to report as committed; handed out once by getOffsetToCommitAndReset().
    private Long offsetToCommit;
    private final RecordWriterProvider<S3SinkConnectorConfig> writerProvider;
    // Kafka offset -> candidate commit filename; consulted by findNextAvailableFile().
    Map<Long, String> offsetToFilenameMap;
    // Per-encoded-partition first offset, last offset and record count of the open file.
    private final Map<String, Long> startOffsets;
    private final Map<String, Long> endOffsets;
    private final Map<String, Long> recordCounts;
    // Back-off window (ms) during which write() is a no-op after failureTime was set.
    private long timeoutMs;
    // Timestamp of the last reported failure, or -1 when there is none.
    private long failureTime;
    private final StorageSchemaCompatibility compatibility;
    private final String extension;
    private final String zeroPadOffsetFormat;
    private final String dirDelim;
    private final String fileDelim;
    private final Time time;
    // Only initialized when rotateScheduleIntervalMs > 0 (see constructor).
    private DateTimeZone timeZone;
    private final S3SinkConnectorConfig connectorConfig;
    private static final Time SYSTEM_TIME = new SystemTime();
    // May be null; when present, errant records are reported instead of failing the write.
    private ErrantRecordReporter reporter;
    private final long maxWriteDurationMs;
    // Absolute time (ms) after which write() stops processing; Long.MAX_VALUE means no deadline.
    private long writeDeadline;
    boolean isPaused = false;
    private final FileRotationTracker fileRotationTracker;

    /**
     * Creates a writer for one topic partition that uses the shared system clock.
     *
     * <p>Delegates to the package-private constructor, passing {@code SYSTEM_TIME} as the
     * time source.
     *
     * @param tp the topic partition this writer handles
     * @param storage the S3 storage used for existence checks and tagging
     * @param writerProvider factory for the per-file record writers
     * @param partitioner partitioner used to encode records into partition paths
     * @param connectorConfig connector configuration
     * @param context sink task context used to pause and resume the partition
     * @param reporter errant record reporter, or {@code null} if none is available
     */
    public TopicPartitionWriter(TopicPartition tp, S3Storage storage, RecordWriterProvider<S3SinkConnectorConfig> writerProvider, Partitioner<?> partitioner, S3SinkConnectorConfig connectorConfig, SinkTaskContext context, ErrantRecordReporter reporter) {
        this(tp, storage, writerProvider, partitioner, connectorConfig, context, SYSTEM_TIME, reporter);
    }

    /**
     * Creates a writer for one topic partition with an explicit time source.
     *
     * <p>Reads all rotation, tagging, naming and retry settings from {@code connectorConfig},
     * initializes the per-partition bookkeeping maps, and starts in
     * {@code State.WRITE_STARTED}.
     *
     * @param tp the topic partition this writer handles
     * @param storage the S3 storage used for existence checks and tagging
     * @param writerProvider factory for the per-file record writers
     * @param partitioner partitioner used to encode records into partition paths
     * @param connectorConfig connector configuration
     * @param context sink task context used to pause and resume the partition
     * @param time time source used for back-off, deadlines and scheduled rotation
     * @param reporter errant record reporter, or {@code null} if none is available
     */
    TopicPartitionWriter(TopicPartition tp, S3Storage storage, RecordWriterProvider<S3SinkConnectorConfig> writerProvider, Partitioner<?> partitioner, S3SinkConnectorConfig connectorConfig, SinkTaskContext context, Time time, ErrantRecordReporter reporter) {
        this.connectorConfig = connectorConfig;
        this.time = time;
        this.tp = tp;
        this.storage = storage;
        this.context = context;
        this.writerProvider = writerProvider;
        this.partitioner = partitioner;
        this.reporter = reporter;
        // A timestamp extractor is only available for time-based partitioners.
        this.timestampExtractor = null;
        if (partitioner instanceof TimeBasedPartitioner) {
            this.timestampExtractor = ((TimeBasedPartitioner)partitioner).getTimestampExtractor();
            if (connectorConfig.isTombstoneWriteEnabled()) {
                // Wrap the extractor when tombstone writing is enabled.
                this.timestampExtractor = new TombstoneTimestampExtractor(this.timestampExtractor);
            }
        }
        this.isTaggingEnabled = connectorConfig.getBoolean("s3.object.tagging");
        this.extraTagKeyValuePair = connectorConfig.getList("s3.object.tagging.key.value.pairs");
        // Must run after extraTagKeyValuePair is assigned: getS3Tag() parses that list.
        this.getS3Tag();
        this.ignoreTaggingErrors = connectorConfig.getString("s3.object.behavior.on.tagging.error").equalsIgnoreCase(S3SinkConnectorConfig.IgnoreOrFailBehavior.IGNORE.toString());
        this.flushSize = connectorConfig.getInt("flush.size");
        this.topicsDir = connectorConfig.getString("topics.dir");
        this.rotateIntervalMs = connectorConfig.getLong("rotate.interval.ms");
        if (this.rotateIntervalMs > 0L && this.timestampExtractor == null) {
            log.warn("Property '{}' is set to '{}ms' but partitioner is not an instance of '{}'. This property is ignored.", new Object[]{"rotate.interval.ms", this.rotateIntervalMs, TimeBasedPartitioner.class.getName()});
        }
        this.rotateScheduleIntervalMs = connectorConfig.getLong("rotate.schedule.interval.ms");
        if (this.rotateScheduleIntervalMs > 0L) {
            // The time zone is only needed (and only read) when scheduled rotation is enabled.
            this.timeZone = DateTimeZone.forID((String)connectorConfig.getString("timezone"));
        }
        this.timeoutMs = connectorConfig.getLong("retry.backoff.ms");
        this.compatibility = StorageSchemaCompatibility.getCompatibility((String)connectorConfig.getString("schema.compatibility"));
        this.buffer = new LinkedList<SinkRecord>();
        // LinkedHashMap: commitFiles() iterates the entries in insertion order.
        this.commitFiles = new LinkedHashMap<String, String>();
        this.writers = new HashMap<String, RecordWriter>();
        this.currentSchemas = new HashMap<String, Schema>();
        this.startOffsets = new HashMap<String, Long>();
        this.endOffsets = new HashMap<String, Long>();
        this.recordCounts = new HashMap<String, Long>();
        this.state = State.WRITE_STARTED;
        this.failureTime = -1L;
        this.currentOffset = -1L;
        this.dirDelim = connectorConfig.getString("directory.delim");
        this.fileDelim = connectorConfig.getString("file.delim");
        this.extension = writerProvider.getExtension();
        // Builds a format such as "%010d" for zero-padding offsets in file names.
        this.zeroPadOffsetFormat = "%0" + connectorConfig.getInt("filename.offset.zero.pad.width") + "d";
        this.fileRotationTracker = new FileRotationTracker();
        this.maxWriteDurationMs = connectorConfig.getLong("max.write.duration.ms");
        // No deadline until setWriteDeadline() is called.
        this.writeDeadline = Long.MAX_VALUE;
        this.offsetToFilenameMap = new HashMap<Long, String>();
        // Must run after time, timeZone and rotateScheduleIntervalMs are assigned.
        this.setNextScheduledRotation();
    }

    /**
     * Parses the configured {@code key:value} tagging pairs into {@link #hashMapTag}.
     *
     * <p>Each entry of {@code extraTagKeyValuePair} is split on {@code ':'}; the first token
     * becomes the tag key and the second the tag value. An empty list yields an empty map.
     *
     * @throws ConnectException if an entry does not contain both a key and a value
     */
    private void getS3Tag() {
        this.hashMapTag = new HashMap<>();
        for (String pair : this.extraTagKeyValuePair) {
            String[] singleKv = pair.split(":");
            if (singleKv.length < 2) {
                // Fail with a message that names the bad entry instead of a bare
                // ArrayIndexOutOfBoundsException.
                throw new ConnectException(String.format("Invalid S3 object tag '%s': expected the form key:value", pair));
            }
            this.hashMapTag.put(singleKv[0], singleKv[1]);
        }
    }

    /**
     * Drains the record buffer through the write state machine.
     *
     * <p>Does nothing while the retry back-off window after a reported failure is still open.
     * Otherwise processes buffered records until the buffer is empty or the write deadline is
     * exceeded, then performs a time-based commit if applicable and finally pauses or resumes
     * the partition depending on how many records remain buffered.
     *
     * @throws ConnectException if the worker is in an illegal state
     * @throws SchemaProjectorException if projection fails and no errant record reporter is set
     */
    public void write() {
        final long now = this.time.milliseconds();
        final boolean backingOff = this.failureTime > 0L && now - this.failureTime < this.timeoutMs;
        if (backingOff) {
            return;
        }
        this.failureTime = -1L;
        this.resetExpiredScheduledRotationIfNoPendingRecords(now);

        while (!this.buffer.isEmpty() && !this.isWriteDeadlineExceeded()) {
            try {
                this.executeState(now);
            } catch (IllegalWorkerStateException illegalState) {
                throw new ConnectException(illegalState);
            } catch (SchemaProjectorException projectionFailure) {
                if (this.reporter == null) {
                    throw projectionFailure;
                }
                // Drop the offending record from the buffer and hand it to the reporter.
                this.reporter.report(this.buffer.poll(), projectionFailure);
                log.warn("Errant record written to DLQ due to: {}", projectionFailure.getMessage());
            }
        }

        if (!this.isWriteDeadlineExceeded()) {
            this.commitOnTimeIfNoData(now);
        }
        this.pauseOrResumeOnBuffer();
    }

    /**
     * Pauses the partition when at least {@code max(flushSize, 1)} records are buffered;
     * otherwise resumes it if it is currently paused.
     */
    private void pauseOrResumeOnBuffer() {
        final int pauseThreshold = Math.max(this.flushSize, 1);
        if (this.buffer.size() >= pauseThreshold) {
            this.pause();
            return;
        }
        if (this.isPaused) {
            this.resume();
        }
    }

    /**
     * Sets the absolute write deadline to {@code currentTimeMs + maxWriteDurationMs}.
     *
     * <p>If the sum is negative (for example because the addition overflowed), the deadline
     * is set to {@link Long#MAX_VALUE}, i.e. effectively disabled.
     *
     * @param currentTimeMs the reference time in milliseconds
     */
    public void setWriteDeadline(long currentTimeMs) {
        final long deadline = currentTimeMs + this.maxWriteDurationMs;
        this.writeDeadline = deadline < 0L ? Long.MAX_VALUE : deadline;
    }

    /**
     * Checks whether the current time is past the write deadline, logging when it is.
     *
     * @return {@code true} if the current time is strictly greater than the deadline
     */
    protected boolean isWriteDeadlineExceeded() {
        if (this.time.milliseconds() > this.writeDeadline) {
            log.info("Deadline exceeded");
            return true;
        }
        return false;
    }

    /**
     * Runs one step of the write state machine for the record at the head of the buffer.
     *
     * <p>The {@code switch} cases intentionally fall through: starting from the current state,
     * execution continues through the following states until a {@code break} is hit.
     * <ul>
     *   <li>{@code WRITE_STARTED}: pauses the partition and advances the state.</li>
     *   <li>{@code WRITE_PARTITION_PAUSED}: encodes the partition of the head record and either
     *       appends it or decides that a rotation is needed.</li>
     *   <li>{@code SHOULD_ROTATE}: commits all open files unless the write deadline has passed.</li>
     *   <li>{@code FILE_COMMITTED}: returns to {@code WRITE_PARTITION_PAUSED}.</li>
     * </ul>
     *
     * @param now the timestamp in milliseconds captured by {@link #write()}
     * @throws PartitionException if the partition cannot be encoded and no reporter is set
     */
    private void executeState(long now) {
        switch (this.state) {
            case WRITE_STARTED: {
                this.pause();
                this.nextState();
            }
            // fall through
            case WRITE_PARTITION_PAUSED: {
                SinkRecord record = this.buffer.peek();
                if (this.timestampExtractor != null) {
                    this.currentTimestamp = this.timestampExtractor.extract(record, now);
                    if (this.baseRecordTimestamp == null) {
                        this.baseRecordTimestamp = this.currentTimestamp;
                    }
                }
                Schema valueSchema = record.valueSchema();
                String encodedPartition;
                try {
                    encodedPartition = this.partitioner.encodePartition(record, now);
                } catch (PartitionException e) {
                    if (this.reporter != null) {
                        // Report and drop the record; the state is left unchanged.
                        this.reporter.report(record, e);
                        this.buffer.poll();
                        break;
                    }
                    throw e;
                }
                if (this.offsetToFilenameMap.size() < this.connectorConfig.getInt("max.files.scan.limit")) {
                    this.offsetToFilenameMap.put(record.kafkaOffset(), this.getCommitFilename(record));
                }
                Schema currentValueSchema = this.currentSchemas.get(encodedPartition);
                // Rotate when a schema entry already exists for this partition and exactly one of
                // the stored and incoming schemas is null (switch between schemaless and schema'd).
                boolean shouldRotateForNullSchema = this.currentSchemas.containsKey(encodedPartition)
                        && (currentValueSchema == null ^ valueSchema == null);
                if (currentValueSchema == null || valueSchema == null) {
                    this.currentSchemas.put(encodedPartition, valueSchema);
                    currentValueSchema = valueSchema;
                }
                if (!this.checkRotationOrAppend(record, currentValueSchema, valueSchema, encodedPartition, now, shouldRotateForNullSchema)) {
                    break;
                }
            }
            // fall through: checkRotationOrAppend advanced the state to SHOULD_ROTATE
            case SHOULD_ROTATE: {
                if (this.isWriteDeadlineExceeded()) {
                    // Stay in SHOULD_ROTATE so that the commit is attempted on a later call.
                    break;
                }
                this.commitFiles();
                this.nextState();
            }
            // fall through
            case FILE_COMMITTED: {
                this.setState(State.WRITE_PARTITION_PAUSED);
                break;
            }
            default: {
                log.error("{} is not a valid state to write record for topic partition {}.", this.state, this.tp);
            }
        }
    }

    /**
     * Decides whether the open files must be rotated before the given record can be written,
     * and appends the record when no rotation is needed up front.
     *
     * <p>Rotation is requested (state advanced, {@code true} returned, record left in the buffer)
     * when {@code shouldRotateForNullSchema} is set, when time-based rotation applies, or when the
     * schema change is incompatible and records have already been written. Otherwise the record is
     * projected, written and removed from the buffer; a rotation is then requested only if the
     * flush size has been reached.
     *
     * @param record the record at the head of the buffer
     * @param currentValueSchema the schema currently associated with the encoded partition
     * @param valueSchema the value schema of {@code record}
     * @param encodedPartition the encoded partition of {@code record}
     * @param now the timestamp in milliseconds captured by {@link #write()}
     * @param shouldRotateForNullSchema whether the caller detected a null/non-null schema switch
     * @return {@code true} if the state was advanced towards rotation, {@code false} otherwise
     */
    private boolean checkRotationOrAppend(SinkRecord record, Schema currentValueSchema, Schema valueSchema, String encodedPartition, long now, boolean shouldRotateForNullSchema) {
        if (shouldRotateForNullSchema) {
            this.fileRotationTracker.incrementRotationByNullSchemaCount(encodedPartition);
            this.nextState();
            return true;
        }
        if (this.rotateOnTime(encodedPartition, this.currentTimestamp, now)) {
            this.setNextScheduledRotation();
            this.nextState();
            return true;
        }
        SchemaCompatibilityResult shouldChangeSchema = this.compatibility.shouldChangeSchema((ConnectRecord)record, null, currentValueSchema);
        // An incompatible schema only forces a rotation if something has been written already.
        if (shouldChangeSchema.isInCompatible() && this.recordCount > 0) {
            this.fileRotationTracker.incrementRotationBySchemaChangeCount(encodedPartition, shouldChangeSchema.getSchemaIncompatibilityType());
            log.trace("Incompatible change of schema detected for record '{}' with encoded partition '{}' and current offset: '{}'", new Object[]{record, encodedPartition, this.currentOffset});
            this.currentSchemas.put(encodedPartition, valueSchema);
            this.nextState();
            return true;
        }
        SinkRecord projectedRecord = this.compatibility.project(record, null, currentValueSchema);
        boolean validRecord = this.writeRecord(projectedRecord, encodedPartition, record);
        // The record leaves the buffer whether it was written or reported as errant.
        this.buffer.poll();
        if (!validRecord) {
            return false;
        }
        if (this.rotateOnSize()) {
            this.fileRotationTracker.incrementRotationByFlushSizeCount(encodedPartition);
            log.info("Starting commit and rotation for topic partition {} with start offset {}", (Object)this.tp, this.startOffsets);
            this.nextState();
            return true;
        }
        return false;
    }

    /**
     * When the buffer is empty, commits the open files if time-based rotation is due, then
     * resumes the partition and resets the state machine to {@code WRITE_STARTED}.
     *
     * @param now the timestamp in milliseconds captured by {@link #write()}
     */
    private void commitOnTimeIfNoData(long now) {
        if (!this.buffer.isEmpty()) {
            return;
        }
        final boolean hasUncommittedRecords = this.recordCount > 0;
        if (hasUncommittedRecords && this.rotateOnTime(this.currentEncodedPartition, this.currentTimestamp, now)) {
            log.info("Committing files after waiting for rotateIntervalMs time but less than flush.size records available.");
            this.setNextScheduledRotation();
            this.commitFiles();
        }
        this.resume();
        this.setState(State.WRITE_STARTED);
    }

    /**
     * Re-arms the scheduled rotation timer when it has expired while no records were written
     * since the last commit.
     *
     * @param now the timestamp in milliseconds captured by {@link #write()}
     */
    private void resetExpiredScheduledRotationIfNoPendingRecords(long now) {
        if (this.recordCount != 0) {
            return;
        }
        if (this.shouldApplyScheduledRotation(now)) {
            this.setNextScheduledRotation();
        }
    }

    /**
     * Closes every open record writer and forgets the writers and their start offsets.
     *
     * @throws ConnectException declared by the signature; the body does not throw it directly
     */
    public void close() throws ConnectException {
        log.debug("Closing TopicPartitionWriter {}", this.tp);
        this.writers.values().forEach(RecordWriter::close);
        this.writers.clear();
        this.startOffsets.clear();
    }

    /**
     * Appends a record to the in-memory queue that {@link #write()} drains.
     *
     * @param sinkRecord the record to enqueue
     */
    public void buffer(SinkRecord sinkRecord) {
        this.buffer.add(sinkRecord);
    }

    /**
     * Returns the pending offset to commit and clears it, so each value is handed out once.
     *
     * @return the offset recorded by the last file commit, or {@code null} if none is pending
     */
    public Long getOffsetToCommitAndReset() {
        Long latest = this.offsetToCommit;
        this.offsetToCommit = null;
        return latest;
    }

    /**
     * Returns the smallest start offset among the files currently being written.
     *
     * @return the minimum tracked start offset, or {@code null} if no file is open
     */
    public Long currentStartOffset() {
        return this.minStartOffset();
    }

    /**
     * Records the time of a failure; {@link #write()} becomes a no-op until {@code timeoutMs}
     * milliseconds have elapsed since this timestamp.
     *
     * @param when the failure timestamp in milliseconds
     */
    public void failureTime(long when) {
        this.failureTime = when;
    }

    /**
     * Returns the tracker that counts file rotations by cause.
     *
     * @return this writer's rotation tracker
     */
    public FileRotationTracker getFileRotationTracker() {
        return this.fileRotationTracker;
    }

    /**
     * Returns the schema compatibility mode read from the {@code schema.compatibility} setting.
     *
     * @return the configured schema compatibility
     */
    public StorageSchemaCompatibility getSchemaCompatibility() {
        return this.compatibility;
    }

    /**
     * Finds the smallest value in {@code startOffsets}.
     *
     * @return the minimum start offset, or {@code null} when no start offsets are tracked
     */
    private Long minStartOffset() {
        return this.startOffsets.values()
                .stream()
                .min(Comparator.naturalOrder())
                .orElse(null);
    }

    /**
     * Asks the partitioner for the partitioned path of this writer's topic and the given
     * encoded partition.
     *
     * @param encodedPartition the encoded partition
     * @return the path produced by {@code Partitioner.generatePartitionedPath}
     */
    private String getDirectoryPrefix(String encodedPartition) {
        return this.partitioner.generatePartitionedPath(this.tp.topic(), encodedPartition);
    }

    /** Advances the state machine to the state that follows the current one. */
    private void nextState() {
        this.state = this.state.next();
    }

    /**
     * Moves the state machine directly to the given state.
     *
     * @param state the new state
     */
    private void setState(State state) {
        this.state = state;
    }

    /**
     * Tells whether a rotation is required because the encoded partition changed.
     *
     * @param encodedPartition the encoded partition of the record being processed
     * @return {@code true} if the configuration enables rotation on partition change and
     *     {@code encodedPartition} differs from the partition of the last written record
     */
    private boolean rotateOnPartitionChange(String encodedPartition) {
        return this.connectorConfig.shouldRotateOnPartitionChange() && !encodedPartition.equals(this.currentEncodedPartition);
    }

    /**
     * Determines whether the open files should be rotated for time-based reasons and records
     * the cause in the rotation tracker.
     *
     * <p>Never rotates when nothing has been written since the last commit. Periodic rotation
     * requires a positive {@code rotateIntervalMs} and a timestamp extractor, and triggers when
     * the record timestamp is at least the interval past the base timestamp or when rotation on
     * partition change applies. If periodic rotation does not trigger, scheduled rotation is
     * checked instead.
     *
     * @param encodedPartition the encoded partition of the record being processed
     * @param recordTimestamp the extracted timestamp of that record
     * @param now the timestamp in milliseconds captured by {@link #write()}
     * @return {@code true} if periodic or scheduled rotation applies
     */
    private boolean rotateOnTime(String encodedPartition, Long recordTimestamp, long now) {
        if (this.recordCount <= 0) {
            return false;
        }
        boolean periodicRotation = false;
        if (this.rotateIntervalMs > 0L && this.timestampExtractor != null) {
            periodicRotation = recordTimestamp - this.baseRecordTimestamp >= this.rotateIntervalMs
                    || this.rotateOnPartitionChange(encodedPartition);
        }
        log.trace("Checking rotation on time for topic-partition '{}' with recordCount '{}' and encodedPartition '{}'", this.tp, this.recordCount, encodedPartition);
        log.trace("Should apply periodic time-based rotation for topic-partition '{}': (rotateIntervalMs: '{}', baseRecordTimestamp: '{}', timestamp: '{}', encodedPartition: '{}', currentEncodedPartition: '{}')? {}", this.tp, this.rotateIntervalMs, this.baseRecordTimestamp, recordTimestamp, encodedPartition, this.currentEncodedPartition, periodicRotation);
        if (periodicRotation) {
            this.fileRotationTracker.incrementRotationByRotationIntervalCount(encodedPartition);
            return true;
        }
        if (this.shouldApplyScheduledRotation(now)) {
            this.fileRotationTracker.incrementRotationByScheduledRotationIntervalCount(encodedPartition);
        }
        return this.shouldApplyScheduledRotation(now);
    }

    /**
     * Checks whether scheduled rotation is enabled and its next trigger time has been reached.
     *
     * @param now the current time in milliseconds
     * @return {@code true} if {@code rotateScheduleIntervalMs} is positive and
     *     {@code now >= nextScheduledRotation}
     */
    private boolean shouldApplyScheduledRotation(long now) {
        final boolean due = this.rotateScheduleIntervalMs > 0L && now >= this.nextScheduledRotation;
        log.trace("Should apply scheduled rotation for topic-partition '{}': (rotateScheduleIntervalMs: '{}', nextScheduledRotation: '{}', now: '{}')? {}", this.tp, this.rotateScheduleIntervalMs, this.nextScheduledRotation, now, due);
        return due;
    }

    /**
     * Recomputes the next scheduled rotation time from the current time, the schedule interval
     * and the configured time zone. Does nothing when scheduled rotation is disabled.
     */
    private void setNextScheduledRotation() {
        if (this.rotateScheduleIntervalMs <= 0L) {
            return;
        }
        final long now = this.time.milliseconds();
        this.nextScheduledRotation = DateTimeUtils.getNextTimeAdjustedByDay(now, this.rotateScheduleIntervalMs, this.timeZone);
        if (!log.isDebugEnabled()) {
            return;
        }
        final String nextRotationText = new DateTime(this.nextScheduledRotation).withZone(this.timeZone).toString();
        log.debug("Update scheduled rotation timer for topic-partition '{}': (rotateScheduleIntervalMs: '{}', nextScheduledRotation: '{}', now: '{}'). Next rotation will be at {}", this.tp, this.rotateScheduleIntervalMs, this.nextScheduledRotation, now, nextRotationText);
    }

    /**
     * Checks whether the number of records written since the last commit has reached the
     * configured flush size.
     *
     * @return {@code true} if {@code recordCount >= flushSize}
     */
    private boolean rotateOnSize() {
        final boolean flushSizeReached = this.recordCount >= this.flushSize;
        log.trace("Should apply size-based rotation for topic-partition '{}': (count {} >= flush size {})? {}", this.tp, this.recordCount, this.flushSize, flushSizeReached);
        return flushSizeReached;
    }

    /** Asks the task context to pause this topic partition and remembers the paused state. */
    private void pause() {
        log.trace("Pausing writer for topic-partition '{}'", this.tp);
        this.context.pause(this.tp);
        this.isPaused = true;
    }

    /** Asks the task context to resume this topic partition and clears the paused state. */
    private void resume() {
        log.trace("Resuming writer for topic-partition '{}'", this.tp);
        this.context.resume(this.tp);
        this.isPaused = false;
    }

    /**
     * Creates a record writer for the given encoded partition and registers it in
     * {@code writers}.
     *
     * @param record the record that triggered writer creation (not used by this method)
     * @param encodedPartition the encoded partition the writer is created for
     * @return the newly created writer
     * @throws ConnectException declared by the signature; the body does not throw it directly
     */
    private RecordWriter newWriter(SinkRecord record, String encodedPartition) throws ConnectException {
        String commitFilename = this.getCommitFilename(encodedPartition);
        log.debug("Creating new writer encodedPartition='{}' filename='{}'", encodedPartition, commitFilename);
        // Pass the config with its declared type: the provider is parameterized on
        // S3SinkConnectorConfig, so an Object-typed argument does not match the method.
        RecordWriter writer = this.writerProvider.getRecordWriter(this.connectorConfig, commitFilename);
        this.writers.put(encodedPartition, writer);
        return writer;
    }

    /**
     * Returns the object key of the file being written for the encoded partition, computing
     * and caching it from the partition's start offset on first use.
     *
     * @param encodedPartition the encoded partition
     * @return the cached or newly computed commit filename
     */
    private String getCommitFilename(String encodedPartition) {
        if (this.commitFiles.containsKey(encodedPartition)) {
            return this.commitFiles.get(encodedPartition);
        }
        final long firstOffset = this.startOffsets.get(encodedPartition);
        final String directoryPrefix = this.getDirectoryPrefix(encodedPartition);
        final String computedName = this.fileKeyToCommit(directoryPrefix, firstOffset);
        this.commitFiles.put(encodedPartition, computedName);
        return computedName;
    }

    /**
     * Computes the object key a file would have if it started at the given record's offset.
     *
     * @param sinkRecord the record whose partition and Kafka offset determine the key
     * @return the commit filename for that record
     */
    String getCommitFilename(SinkRecord sinkRecord) {
        final String encodedPartition = this.partitioner.encodePartition(sinkRecord);
        final String directoryPrefix = this.getDirectoryPrefix(encodedPartition);
        return this.fileKeyToCommit(directoryPrefix, sinkRecord.kafkaOffset());
    }

    /**
     * Joins the key prefix and file name with the directory delimiter, prepending the topics
     * prefix when it is not blank.
     *
     * @param topicsPrefix optional top-level prefix; skipped when blank
     * @param keyPrefix the directory prefix of the file
     * @param name the file name
     * @return the assembled object key
     */
    private String fileKey(String topicsPrefix, String keyPrefix, String name) {
        final String relativeKey = keyPrefix + this.dirDelim + name;
        if (StringUtils.isNotBlank(topicsPrefix)) {
            return topicsPrefix + this.dirDelim + relativeKey;
        }
        return relativeKey;
    }

    /**
     * Builds the object key for a file: topic, partition and zero-padded start offset joined by
     * the file delimiter, followed by the extension, placed under the topics directory and the
     * given directory prefix.
     *
     * @param dirPrefix the directory prefix of the file
     * @param startOffset the first Kafka offset contained in the file
     * @return the assembled object key
     */
    private String fileKeyToCommit(String dirPrefix, long startOffset) {
        final String fileName = new StringBuilder()
                .append(this.tp.topic())
                .append(this.fileDelim)
                .append(this.tp.partition())
                .append(this.fileDelim)
                .append(String.format(this.zeroPadOffsetFormat, startOffset))
                .append(this.extension)
                .toString();
        return this.fileKey(this.topicsDir, dirPrefix, fileName);
    }

    /**
     * Writes one projected record to the writer of its encoded partition, creating the writer
     * and its bookkeeping entries on first use.
     *
     * <p>If the write fails with a schema or data error and an errant record reporter is set,
     * any start offset, writer or commit filename that this call created is removed again, the
     * original record is reported, and {@code false} is returned. Without a reporter the error
     * is rethrown wrapped in a {@link ConnectException}.
     *
     * @param projectedRecord the record after schema projection; this is what gets written
     * @param encodedPartition the encoded partition of the record
     * @param originalRecord the unprojected record, used when reporting an errant record
     * @return {@code true} if the record was written, {@code false} if it was reported as errant
     * @throws ConnectException if the write fails and no reporter is available
     */
    private boolean writeRecord(SinkRecord projectedRecord, String encodedPartition, SinkRecord originalRecord) {
        RecordWriter writer = this.writers.get(encodedPartition);
        long currentOffsetIfSuccessful = projectedRecord.kafkaOffset();
        // These flags remember which entries this call created so they can be rolled back.
        boolean shouldRemoveWriter = false;
        boolean shouldRemoveStartOffset = false;
        boolean shouldRemoveCommitFilename = false;
        try {
            if (!this.startOffsets.containsKey(encodedPartition)) {
                log.trace("Setting writer's start offset for '{}' to {}", (Object)encodedPartition, (Object)currentOffsetIfSuccessful);
                this.startOffsets.put(encodedPartition, currentOffsetIfSuccessful);
                shouldRemoveStartOffset = true;
            }
            if (writer == null) {
                // newWriter() caches the commit filename, so note beforehand whether it is new.
                if (!this.commitFiles.containsKey(encodedPartition)) {
                    shouldRemoveCommitFilename = true;
                }
                writer = this.newWriter(projectedRecord, encodedPartition);
                shouldRemoveWriter = true;
            }
            writer.write(projectedRecord);
        }
        catch (SchemaParseException | DataException | InvalidSchemaException e) {
            if (this.reporter != null) {
                if (shouldRemoveStartOffset) {
                    this.startOffsets.remove(encodedPartition);
                }
                if (shouldRemoveWriter) {
                    this.writers.remove(encodedPartition);
                }
                if (shouldRemoveCommitFilename) {
                    this.commitFiles.remove(encodedPartition);
                }
                this.reporter.report(originalRecord, e);
                log.warn("Errant record written to DLQ due to: {}", (Object)e.getMessage());
                return false;
            }
            throw new ConnectException(e);
        }
        this.currentEncodedPartition = encodedPartition;
        this.currentOffset = projectedRecord.kafkaOffset();
        // shouldRemoveStartOffset doubles as "first record of a new file for this partition".
        if (shouldRemoveStartOffset) {
            log.trace("Setting writer's start offset for '{}' to {}", (Object)this.currentEncodedPartition, (Object)this.currentOffset);
            this.recordCounts.put(this.currentEncodedPartition, 0L);
            this.endOffsets.put(this.currentEncodedPartition, 0L);
        }
        ++this.recordCount;
        this.recordCounts.put(this.currentEncodedPartition, this.recordCounts.get(this.currentEncodedPartition) + 1L);
        this.endOffsets.put(this.currentEncodedPartition, this.currentOffset);
        return true;
    }

    /**
     * Commits every open file, optionally tags it, and resets the per-commit bookkeeping.
     *
     * <p>Files are processed in the insertion order of {@code commitFiles}. Tagging runs with
     * exponential back-off on {@link ConnectException}. Afterwards the offset to commit is set
     * to one past the last written offset and the record count, schemas, filenames and base
     * timestamp are cleared.
     */
    private void commitFiles() {
        for (Map.Entry<String, String> openFile : this.commitFiles.entrySet()) {
            final String encodedPartition = openFile.getKey();
            final String objectKey = openFile.getValue();
            this.commitFile(encodedPartition);
            if (this.isTaggingEnabled) {
                // Tag before the offset maps are cleared below: tagFile() reads them.
                RetryUtil.exponentialBackoffRetry(() -> this.tagFile(encodedPartition, objectKey, this.hashMapTag), ConnectException.class, this.connectorConfig.getInt("s3.part.retries"), this.connectorConfig.getLong("s3.retry.backoff.ms"));
            }
            this.startOffsets.remove(encodedPartition);
            this.endOffsets.remove(encodedPartition);
            this.recordCounts.remove(encodedPartition);
            log.debug("Committed {} for {}", objectKey, this.tp);
        }

        this.offsetToCommit = this.currentOffset + 1L;
        this.recordCount = 0;
        this.baseRecordTimestamp = null;
        this.commitFiles.clear();
        this.currentSchemas.clear();
        this.offsetToFilenameMap.clear();
        log.info("Files committed to S3. Target commit offset for {} is {}", this.tp, this.offsetToCommit);
    }

    /**
     * Commits the open file of one encoded partition and removes its writer.
     *
     * <p>Does nothing except log a warning when no start offset is tracked for the partition.
     * When no writer is registered for the partition, nothing is committed.
     *
     * @param encodedPartition the encoded partition whose file should be committed
     */
    private void commitFile(String encodedPartition) {
        if (!this.startOffsets.containsKey(encodedPartition)) {
            // Supply the argument for the "{}" placeholder so the partition is actually logged.
            log.warn("Tried to commit file with missing starting offset partition: {}. Ignoring.", encodedPartition);
            return;
        }
        if (this.writers.containsKey(encodedPartition)) {
            RecordWriter writer = this.writers.get(encodedPartition);
            this.tryCommitFile(writer, encodedPartition);
            this.writers.remove(encodedPartition);
            log.debug("Removed writer for '{}'", encodedPartition);
        }
    }

    /**
     * Commits the writer; if the target file already exists, moves the partition's start
     * offset to the next available one and rethrows.
     *
     * @param writer the writer to commit
     * @param encodedPartition the encoded partition the writer belongs to
     * @throws FileExistsException if the file to commit already exists
     */
    private void tryCommitFile(RecordWriter writer, String encodedPartition) {
        try {
            writer.commit();
        } catch (FileExistsException alreadyExists) {
            final long resumeOffset = this.findNextAvailableFile(encodedPartition);
            log.info("Next available offset for encoded partition {} is {}", encodedPartition, resumeOffset);
            this.startOffsets.put(encodedPartition, resumeOffset);
            throw alreadyExists;
        }
    }

    /**
     * Scans forward from the offset after the partition's start offset for the first offset
     * whose candidate file is unknown or does not exist in storage.
     *
     * <p>At most {@code max.files.scan.limit} offsets are examined (always at least one). A
     * 403 response from S3 ends the scan at the offset being checked.
     *
     * @param encodedPartition the encoded partition whose start offset anchors the scan
     * @return the first available offset, or the end of the scan window if none was found
     * @throws AmazonS3Exception if the existence check fails with a status other than 403
     */
    public long findNextAvailableFile(String encodedPartition) {
        final long firstCandidate = this.startOffsets.get(encodedPartition) + 1L;
        final long targetEndOffset = firstCandidate + (long) this.connectorConfig.getInt("max.files.scan.limit").intValue();
        log.info("Scanning for available files for start_offset:{} and file {}", firstCandidate, this.commitFiles.get(encodedPartition));

        long candidate = firstCandidate;
        while (true) {
            final String candidateFile = this.offsetToFilenameMap.get(candidate);
            try {
                if (!this.offsetToFilenameMap.containsKey(candidate)) {
                    log.info("Start offset {} not present in offsets map. Considering {} as next offset to process from", candidate, candidate);
                    return candidate;
                }
                if (!this.storage.exists(candidateFile)) {
                    log.info("File {} does not exist in S3. Next target offset to reset to is {}", candidateFile, candidate);
                    return candidate;
                }
                log.debug("File {} already exists, checking for next available file", candidateFile);
            } catch (AmazonS3Exception s3Failure) {
                if (s3Failure.getStatusCode() != 403) {
                    throw s3Failure;
                }
                log.warn("Connector failed with 403 error. Incrementing offset by 1", (Throwable) s3Failure);
                return candidate;
            }
            candidate++;
            if (candidate >= targetEndOffset) {
                break;
            }
        }

        log.info("Max scanning limit reached. Resetting offset to {}", targetEndOffset);
        return targetEndOffset;
    }

    /**
     * Tags a committed S3 object with its start offset, end offset and record count, plus any
     * extra tags supplied by the caller.
     *
     * <p>If any of the three tracked values is missing for the partition, a warning is logged
     * and nothing is tagged. When tagging fails and {@code ignoreTaggingErrors} is set, the
     * failure is logged and swallowed; otherwise it is rethrown as a {@link ConnectException}.
     *
     * @param encodedPartition the encoded partition whose offsets and count are used as tags
     * @param s3ObjectPath the key of the object to tag
     * @param extraHashMapTag additional tags to apply, or {@code null} for none
     * @throws ConnectException if tagging fails and tagging errors are not ignored
     */
    private void tagFile(String encodedPartition, String s3ObjectPath, Map<String, String> extraHashMapTag) {
        Long startOffset = this.startOffsets.get(encodedPartition);
        Long endOffset = this.endOffsets.get(encodedPartition);
        Long recordCount = this.recordCounts.get(encodedPartition);
        if (startOffset == null || endOffset == null || recordCount == null) {
            log.warn("Missing tags when attempting to tag file {}. Starting offset tag: {}, ending offset tag: {}, record count tag: {}. Ignoring.", new Object[]{encodedPartition, startOffset == null ? "missing" : startOffset, endOffset == null ? "missing" : endOffset, recordCount == null ? "missing" : recordCount});
            return;
        }
        log.debug("Object to tag is: {}", s3ObjectPath);
        Map<String, String> tags = new HashMap<String, String>();
        tags.put("startOffset", Long.toString(startOffset));
        tags.put("endOffset", Long.toString(endOffset));
        tags.put("recordCount", Long.toString(recordCount));
        if (extraHashMapTag != null) {
            tags.putAll(extraHashMapTag);
        }
        try {
            this.storage.addTags(s3ObjectPath, tags);
            log.info("Tagged S3 object {} with starting offset {}, ending offset {}, record count {}", new Object[]{s3ObjectPath, startOffset, endOffset, recordCount});
        } catch (SdkClientException e) {
            if (this.ignoreTaggingErrors) {
                // Honor the "ignore" behavior: log and return instead of throwing afterwards.
                log.warn("Unable to tag S3 object {}. Ignoring.", (Object) s3ObjectPath, (Object) e);
                return;
            }
            throw new ConnectException(String.format("Unable to tag S3 object %s", s3ObjectPath), e);
        } catch (Exception e) {
            if (this.ignoreTaggingErrors) {
                log.warn("Unrecoverable exception while attempting to tag S3 object {}. Ignoring.", (Object) s3ObjectPath, (Object) e);
                return;
            }
            throw new ConnectException(String.format("Unable to tag S3 object %s", s3ObjectPath), e);
        }
    }

    /**
     * States of the write state machine, in the order they are traversed. {@link #next()}
     * wraps from the last state back to the first.
     */
    private static enum State {
        WRITE_STARTED,
        WRITE_PARTITION_PAUSED,
        SHOULD_ROTATE,
        FILE_COMMITTED;

        // Cached once so that next() does not clone the values array on every call.
        private static final State[] VALS = State.values();

        /**
         * Returns the state that follows this one in declaration order, cyclically.
         *
         * @return the successor state
         */
        public State next() {
            final int successorIndex = (this.ordinal() + 1) % VALS.length;
            return VALS[successorIndex];
        }
    }
}

