/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.state;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import kafka.log.Log;
import kafka.log.Log$;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.AbstractTierSegmentMetadata;
import kafka.tier.domain.TierObjectMetadata;
import kafka.tier.domain.TierSegmentDeleteComplete;
import kafka.tier.domain.TierSegmentDeleteInitiate;
import kafka.tier.domain.TierSegmentUploadComplete;
import kafka.tier.domain.TierSegmentUploadInitiate;
import kafka.tier.domain.TierTopicInitLeader;
import kafka.tier.exceptions.TierPartitionStateIllegalListenerException;
import kafka.tier.serdes.TierPartitionStateHeader;
import kafka.tier.state.FileTierPartitionIterator;
import kafka.tier.state.Header;
import kafka.tier.state.ReplicationMaterializationListener;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStatus;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileTierPartitionState
implements TierPartitionState,
AutoCloseable {
    // NOTE(review): presumably the on-disk format version used by default; the public
    // constructor passes the same value (2) to the versioned constructor.
    private static final byte CURRENT_VERSION = 2;
    // NOTE(review): presumably the byte width of the little-endian size prefix written before
    // the header and before each entry (both writers allocate a 2-byte prefix) - confirm.
    private static final int ENTRY_LENGTH_SIZE = 2;
    // NOTE(review): presumably the offset handed to Log.tierStateFile when deriving basePath - confirm.
    private static final long FILE_OFFSET = 0L;
    private static final Logger log = LoggerFactory.getLogger(FileTierPartitionState.class);
    // The single state reported by fencedSegments().
    private static final Set<TierObjectMetadata.State> FENCED_STATES = Collections.singleton(TierObjectMetadata.State.SEGMENT_FENCED);
    private final TopicPartition topicPartition;
    // Format version written into the header on flush and when (re)initializing the file.
    private final byte version;
    // Monitor held by the mutating operations (flush, delete, updateDir, closeHandlers, append, ...).
    private final Object lock = new Object();
    private File dir;
    // Absolute path from which the flushed/mutable/tmp state file paths are derived.
    private String basePath;
    // Cleared when handlers are closed or when the matching segment is fenced; compared against
    // the object id of an incoming upload-complete event.
    private TierObjectMetadata uploadInProgress;
    // Set whenever status, epoch or segment entries change; cleared by a successful flush().
    private boolean dirty = false;
    private volatile State state;
    // Null until a topic id is assigned via setTopicId or read from an existing header.
    private volatile TopicIdPartition topicIdPartition;
    private volatile boolean tieringEnabled;
    // At most one pending materialization listener; null when none is registered.
    private volatile ReplicationMaterializationListener materializationTracker;
    private volatile TierPartitionStatus status = TierPartitionStatus.CLOSED;

    /**
     * Creates the tier partition state for {@code topicPartition} under {@code dir} using the
     * current on-disk format version.
     *
     * @param dir directory that holds the state files
     * @param topicPartition partition this state belongs to
     * @param tieringEnabled whether tiering is enabled; the state file is only opened when it is
     * @throws IOException if opening or initializing the state file fails
     */
    public FileTierPartitionState(File dir, TopicPartition topicPartition, boolean tieringEnabled) throws IOException {
        // Pass the byte constant: an int literal is not implicitly narrowed to the byte
        // parameter in a constructor invocation, and the constant keeps the version in one place.
        this(dir, topicPartition, tieringEnabled, CURRENT_VERSION);
    }

    /**
     * Creates the tier partition state with an explicit on-disk format version and opens the
     * backing channel if tiering is enabled.
     *
     * @param dir directory that holds the state files
     * @param topicPartition partition this state belongs to
     * @param tieringEnabled whether tiering is enabled
     * @param version format version recorded in the file header
     * @throws IOException if opening or initializing the state file fails
     */
    FileTierPartitionState(File dir, TopicPartition topicPartition, boolean tieringEnabled, byte version) throws IOException {
        this.version = version;
        this.topicPartition = topicPartition;
        this.tieringEnabled = tieringEnabled;
        this.state = State.UNINITIALIZED_STATE;
        this.dir = dir;
        this.basePath = Log.tierStateFile(dir, 0L, "").getAbsolutePath();
        // All fields are assigned before this call because it reads them.
        maybeOpenChannel();
    }

    /** Returns the topic partition this state was created for. */
    @Override
    public TopicPartition topicPartition() {
        return topicPartition;
    }

    /** Returns the topic id partition, or empty if no topic id has been assigned yet. */
    @Override
    public Optional<TopicIdPartition> topicIdPartition() {
        TopicIdPartition assigned = this.topicIdPartition;
        return Optional.ofNullable(assigned);
    }

    /**
     * Assigns the topic id for this partition if none is set, then tries to open the channel.
     *
     * @param topicId topic id to assign
     * @return {@code true} if the id was newly assigned, {@code false} if the same id was already set
     * @throws IllegalStateException if a different topic id is already assigned
     * @throws IOException if opening the state file fails
     */
    @Override
    public boolean setTopicId(UUID topicId) throws IOException {
        if (topicIdPartition == null) {
            topicIdPartition = new TopicIdPartition(topicPartition.topic(), topicId, topicPartition.partition());
            log.info("Setting topicIdPartition {}", topicIdPartition);
            synchronized (lock) {
                maybeOpenChannel();
            }
            return true;
        }
        if (topicIdPartition.topicId().equals(topicId)) {
            return false;
        }
        throw new IllegalStateException("Illegal reassignment of topic id. Current: " + topicIdPartition + " Assigned: " + topicId);
    }

    /** Returns {@code true} only when tiering is enabled and a topic id has been assigned. */
    @Override
    public boolean isTieringEnabled() {
        if (!tieringEnabled) {
            return false;
        }
        return topicIdPartition != null;
    }

    /**
     * Marks tiering as enabled and tries to open the state file, all under the lock.
     *
     * @throws IOException if opening the state file fails
     */
    @Override
    public void enableTierConfig() throws IOException {
        synchronized (lock) {
            tieringEnabled = true;
            maybeOpenChannel();
        }
    }

    /** Returns the lowest key among the valid segments, or empty if there are none. */
    @Override
    public Optional<Long> startOffset() {
        Map.Entry<Long, ?> lowest = state.validSegments.firstEntry();
        if (lowest == null) {
            return Optional.empty();
        }
        return Optional.of(lowest.getKey());
    }

    /** Returns the end offset recorded in the current in-memory state. */
    @Override
    public long endOffset() {
        State current = state;
        return current.endOffset;
    }

    /** Returns the committed end offset as reported by the current state. */
    @Override
    public long committedEndOffset() {
        State current = state;
        return current.committedEndOffset();
    }

    /** Returns the valid-segments size tracked by the current state. */
    @Override
    public long totalSize() {
        State current = state;
        return current.validSegmentsSize;
    }

    /**
     * Persists pending changes: rewrites the header in the mutable file, forces it to disk, then
     * copies it to a temporary file that is moved over the flushed file. Does nothing unless the
     * state is dirty and the status is open for write.
     *
     * @throws IOException if writing, copying or moving the files fails
     */
    @Override
    public void flush() throws IOException {
        synchronized (lock) {
            if (!dirty || !status.isOpenForWrite()) {
                return;
            }
            FileChannel channel = state.channel;
            Header header = new Header(topicIdPartition.topicId(), version, state.currentEpoch, status, state.endOffset, state.consumptionInfo.localMaterializedOffset);
            FileTierPartitionState.writeHeader(channel, header);
            state.channel.force(true);
            // Copy to a temp file first so the flushed file is replaced by a single move.
            Path mutable = FileTierPartitionState.mutableFilePath(basePath);
            Path tmp = FileTierPartitionState.tmpFilePath(basePath);
            Files.copy(mutable, tmp, StandardCopyOption.REPLACE_EXISTING);
            Utils.atomicMoveWithFallback(FileTierPartitionState.tmpFilePath(basePath), FileTierPartitionState.flushedFilePath(basePath));
            state.committedEndOffset = state.endOffset;
            dirty = false;
        }
    }

    /** Returns the epoch recorded in the current in-memory state. */
    @Override
    public int tierEpoch() {
        State current = state;
        return current.currentEpoch;
    }

    /** Returns the directory currently associated with this state. */
    @Override
    public File dir() {
        return dir;
    }

    /**
     * Closes the handlers and deletes every state file variant derived from the base path.
     *
     * @throws IOException if closing the channel or deleting a file fails
     */
    @Override
    public void delete() throws IOException {
        synchronized (lock) {
            closeHandlers();
            for (StateFileType fileType : StateFileType.values()) {
                Path stateFile = fileType.filePath(basePath);
                Files.deleteIfExists(stateFile);
            }
        }
    }

    /**
     * Points this state at a new directory and recomputes the base path from it.
     *
     * @param dir the new directory
     */
    @Override
    public void updateDir(File dir) {
        synchronized (lock) {
            String relocatedBasePath = Log.tierStateFile(dir, 0L, "").getAbsolutePath();
            basePath = relocatedBasePath;
            this.dir = dir;
        }
    }

    /**
     * Closes the backing channel (if any) and resets the in-memory state. No-op when the status
     * is already CLOSED. The reset runs even if closing the channel throws, and any registered
     * materialization listener is completed exceptionally.
     *
     * @throws IOException if closing the channel fails
     */
    @Override
    public void closeHandlers() throws IOException {
        synchronized (lock) {
            if (status == TierPartitionStatus.CLOSED) {
                return;
            }
            try {
                FileChannel openChannel = state.channel;
                if (openChannel != null) {
                    openChannel.close();
                }
            } finally {
                state = State.UNINITIALIZED_STATE;
                uploadInProgress = null;
                if (materializationTracker != null) {
                    completeMaterializationTrackerExceptionally(new TierPartitionStateIllegalListenerException("Tier partition state for " + topicPartition + " has been closed."));
                }
                status = TierPartitionStatus.CLOSED;
            }
        }
    }

    /** Returns the current status of this tier partition state. */
    @Override
    public TierPartitionStatus status() {
        return status;
    }

    /**
     * Moves the status to CATCHUP.
     *
     * @throws IllegalStateException if tiering is disabled or the status is not open
     */
    @Override
    public void beginCatchup() {
        synchronized (lock) {
            if (tieringEnabled && status.isOpen()) {
                setStatus(TierPartitionStatus.CATCHUP);
                return;
            }
            throw new IllegalStateException("Illegal state " + status + " for tier partition. tieringEnabled: " + tieringEnabled + " basePath: " + basePath);
        }
    }

    /**
     * Moves the status to ONLINE.
     *
     * @throws IllegalStateException if tiering is disabled or the status is not open
     */
    @Override
    public void onCatchUpComplete() {
        synchronized (lock) {
            if (tieringEnabled && status.isOpen()) {
                setStatus(TierPartitionStatus.ONLINE);
                return;
            }
            throw new IllegalStateException("Illegal state " + status + " for tier partition. tieringEnabled: " + tieringEnabled + " basePath: " + basePath);
        }
    }

    /** Returns the number of valid segment offsets. */
    @Override
    public int numSegments() {
        NavigableSet<Long> offsets = segmentOffsets();
        return offsets.size();
    }

    /**
     * Registers a listener that completes once metadata covering {@code targetOffset} has been
     * materialized. A previously registered listener is completed exceptionally. If the state is
     * not open, the new listener is completed exceptionally straight away; if suitable metadata
     * is already present, completion is attempted before returning.
     *
     * @param targetOffset offset the caller is waiting for
     * @return the future backing the newly registered listener
     * @throws IllegalStateException if the metadata found ends before {@code targetOffset}
     * @throws IOException if reading metadata or flushing fails
     */
    @Override
    public Future<TierObjectMetadata> materializationListener(long targetOffset) throws IOException {
        synchronized (lock) {
            if (materializationTracker != null) {
                completeMaterializationTrackerExceptionally(new IllegalStateException("Duplicate materialization listener registration for " + topicIdPartition));
            }
            materializationTracker = new ReplicationMaterializationListener(log, topicIdPartition, targetOffset);
            Future<TierObjectMetadata> promise = materializationTracker.promise();
            if (!status.isOpen()) {
                completeMaterializationTrackerExceptionally(new TierPartitionStateIllegalListenerException("Tier partition state for " + topicPartition + " is not open."));
                return promise;
            }
            long uncommittedEndOffset = endOffset();
            Optional<TierObjectMetadata> candidate = Optional.empty();
            // Only look up metadata when the target lies within what has been materialized.
            if (uncommittedEndOffset != -1L && targetOffset <= uncommittedEndOffset) {
                candidate = metadata(targetOffset);
            }
            if (candidate.isPresent()) {
                TierObjectMetadata found = candidate.get();
                if (found.endOffset() < targetOffset) {
                    throw new IllegalStateException("Metadata lookup for offset " + targetOffset + " returned unexpected segment " + candidate + " for " + topicIdPartition);
                }
                maybeCompleteMaterializationTracker(found);
            }
            return promise;
        }
    }

    /**
     * Flushes pending changes, then closes the handlers. The handlers are closed and the closure
     * is logged even when the flush fails.
     *
     * @throws IOException if flushing or closing fails
     */
    @Override
    public void close() throws IOException {
        synchronized (lock) {
            try {
                flush();
            } finally {
                closeHandlers();
                String partitionName = topicIdPartition().map(TopicIdPartition::toString).orElse(topicPartition.toString());
                log.info("Tier partition state for {} closed.", partitionName);
            }
        }
    }

    /**
     * Applies a metadata entry to this state and records the offset it was consumed from.
     *
     * @param metadata entry to apply
     * @param tierTopicPartitionOffset offset the entry was consumed from
     * @return the result of applying the entry, or NOT_TIERABLE when the state is not open for write
     * @throws IOException if updating the state file fails
     */
    @Override
    public TierPartitionState.AppendResult append(AbstractTierMetadata metadata, long tierTopicPartitionOffset) throws IOException {
        synchronized (lock) {
            if (status.isOpenForWrite()) {
                TierPartitionState.AppendResult outcome = appendMetadata(metadata);
                updateLocalMaterializedOffset(tierTopicPartitionOffset);
                log.debug("Processed append for {} with result {} consumed from offset {}", metadata, outcome, tierTopicPartitionOffset);
                return outcome;
            }
            log.debug("Skipping processing for {} from offset {} as file is not open for write", metadata, tierTopicPartitionOffset);
            return TierPartitionState.AppendResult.NOT_TIERABLE;
        }
    }

    /** Returns the key set of the valid segments in the current state. */
    @Override
    public NavigableSet<Long> segmentOffsets() {
        State current = state;
        return current.validSegments.keySet();
    }

    /**
     * Returns the keys of the valid segments selected by {@code Log.logSegments} for the given
     * bounds.
     *
     * @param from lower bound passed to {@code Log.logSegments}
     * @param to upper bound passed to {@code Log.logSegments}
     */
    @Override
    public NavigableSet<Long> segmentOffsets(long from, long to) {
        return Log$.MODULE$.logSegments(state.validSegments, from, to, lock).keySet();
    }

    /**
     * Looks up metadata for {@code targetOffset}, starting the file scan at the valid segment with
     * the greatest key at or below that offset.
     *
     * @param targetOffset offset to look up
     * @return the matching metadata, or empty if no valid segment starts at or below the offset
     *         or none qualifies
     * @throws IOException if reading the state file fails
     */
    @Override
    public Optional<TierObjectMetadata> metadata(long targetOffset) throws IOException {
        // Work against one snapshot of the volatile state for the whole lookup.
        State snapshot = this.state;
        Map.Entry<Long, ?> floor = snapshot.validSegments.floorEntry(targetOffset);
        if (floor == null) {
            return Optional.empty();
        }
        return FileTierPartitionState.readValidObjectMetadata(topicIdPartition, snapshot, snapshot.position((UUID) floor.getValue()), targetOffset);
    }

    /** Returns the local materialized offset recorded in the current state. */
    long lastConsumedSrcOffset() {
        State current = state;
        return current.consumptionInfo.localMaterializedOffset;
    }

    /** Returns the absolute path of the flushed state file. */
    String flushedPath() {
        Path flushed = FileTierPartitionState.flushedFilePath(basePath);
        return flushed.toFile().getAbsolutePath();
    }

    /** Returns the metadata of every segment whose tracked state is SEGMENT_FENCED. */
    @Override
    public Collection<TierObjectMetadata> fencedSegments() {
        State snapshot = this.state;
        List<TierObjectMetadata> fenced = FileTierPartitionState.metadataForStates(topicIdPartition, snapshot, FENCED_STATES);
        return fenced;
    }

    /** Returns a summary of this state; offsets and counters are included only when tiering is enabled. */
    @Override
    public String toString() {
        if (!tieringEnabled) {
            return "FileTierPartitionState(topicIdPartition=" + topicIdPartition + ", tieringEnabled=" + tieringEnabled + ")";
        }
        return "FileTierPartitionState(topicIdPartition=" + topicIdPartition
                + ", startOffset=" + startOffset()
                + ", endOffset=" + endOffset()
                + ", committedEndOffset=" + committedEndOffset()
                + ", numSegments=" + numSegments()
                + ", tierEpoch=" + tierEpoch()
                + ", lastMaterializedOffset=" + lastConsumedSrcOffset()
                + ")";
    }

    /**
     * Reads the header from the start of {@code channel}: a 2-byte size prefix followed by the
     * header payload.
     *
     * @param channel channel positioned over a tier partition state file
     * @return the parsed header, or empty if the size prefix or the full payload could not be read
     * @throws IOException if reading from the channel fails
     */
    public static Optional<Header> readHeader(FileChannel channel) throws IOException {
        Optional<Short> sizePrefix = FileTierPartitionState.readHeaderSize(channel);
        if (sizePrefix.isPresent()) {
            short headerSize = sizePrefix.get();
            ByteBuffer headerBuf = ByteBuffer.allocate(headerSize);
            // The payload starts right after the 2-byte size prefix.
            Utils.readFully(channel, headerBuf, 2L);
            headerBuf.flip();
            if (headerBuf.limit() == headerSize) {
                return Optional.of(new Header(TierPartitionStateHeader.getRootAsTierPartitionStateHeader(headerBuf)));
            }
        }
        return Optional.empty();
    }

    /**
     * Creates an iterator over the entries that follow the header in {@code channel}.
     *
     * @param topicPartition partition the file belongs to; combined with the topic id from the header
     * @param channel channel over a tier partition state file
     * @return an iterator starting just past the header, or empty if no header could be read
     * @throws IOException if reading from the channel fails
     */
    public static Optional<FileTierPartitionIterator> iterator(TopicPartition topicPartition, FileChannel channel) throws IOException {
        Optional<Header> headerOpt = FileTierPartitionState.readHeader(channel);
        if (headerOpt.isPresent()) {
            TopicIdPartition idPartition = new TopicIdPartition(topicPartition.topic(), headerOpt.get().topicId(), topicPartition.partition());
            return Optional.of(FileTierPartitionState.iterator(idPartition, channel, headerOpt.get().size()));
        }
        return Optional.empty();
    }

    /** Returns the format version this instance writes into the file header. */
    byte version() {
        return version;
    }

    /**
     * Completes and clears the registered listener if it accepts {@code lastMaterializedSegment}.
     * The state is flushed before the listener is completed.
     *
     * @throws IOException if the flush fails
     */
    private void maybeCompleteMaterializationTracker(TierObjectMetadata lastMaterializedSegment) throws IOException {
        if (!materializationTracker.canComplete(lastMaterializedSegment)) {
            return;
        }
        flush();
        materializationTracker.complete(lastMaterializedSegment);
        materializationTracker = null;
    }

    /** Fails the registered listener with {@code e} and then clears it. */
    private void completeMaterializationTrackerExceptionally(Exception e) {
        ReplicationMaterializationListener tracker = materializationTracker;
        tracker.completeExceptionally(e);
        materializationTracker = null;
    }

    /** Creates an iterator over {@code channel} that starts reading at byte {@code position}. */
    private static FileTierPartitionIterator iterator(TopicIdPartition topicIdPartition, FileChannel channel, long position) throws IOException {
        FileTierPartitionIterator entries = new FileTierPartitionIterator(topicIdPartition, channel, position);
        return entries;
    }

    /** Records the new status, marks the state dirty so the next flush persists it, and logs the change. */
    private void setStatus(TierPartitionStatus status) {
        this.status = status;
        dirty = true;
        log.info("Status updated to {} for {}", status, topicIdPartition());
    }

    /** Advances the local materialized offset to {@code offset}; offsets that do not move it forward are ignored. */
    private void updateLocalMaterializedOffset(long offset) {
        if (offset > state.consumptionInfo.localMaterializedOffset) {
            state.consumptionInfo.localMaterializedOffset = offset;
            return;
        }
        log.debug("Ignoring previous TierTopicPartition offset {} for {}", offset, topicIdPartition());
    }

    /**
     * Reads from the state file the metadata of every tracked segment whose state is in
     * {@code states}.
     *
     * @throws KafkaStorageException wrapping any {@link IOException} raised while reading an entry
     */
    private static List<TierObjectMetadata> metadataForStates(TopicIdPartition topicIdPartition, State currentState, Set<TierObjectMetadata.State> states) {
        return currentState.allSegments.values().stream()
                .filter(tracked -> states.contains(((SegmentState) tracked).state))
                .map(tracked -> {
                    try {
                        // Each tracked segment records the byte position of its entry in the file.
                        return (TierObjectMetadata) FileTierPartitionState.iterator(topicIdPartition, currentState.channel, ((SegmentState) tracked).position).next();
                    } catch (IOException ioFailure) {
                        throw new KafkaStorageException(ioFailure);
                    }
                })
                .collect(Collectors.toList());
    }

    /**
     * Opens the state file if tiering is enabled and the status is not already open. The flushed
     * file is created if missing and copied over the mutable file, which is then opened and
     * scanned. If no channel is returned the status is set to CLOSED. If the scan reports
     * corruption, the handlers are closed, the flushed file is deleted, the open is retried and
     * catch-up is started.
     *
     * @throws IOException if any file operation fails
     */
    private void maybeOpenChannel() throws IOException {
        if (!tieringEnabled || status.isOpen()) {
            return;
        }
        Path flushed = FileTierPartitionState.flushedFilePath(basePath);
        Path mutable = FileTierPartitionState.mutableFilePath(basePath);
        if (!Files.exists(flushed)) {
            Files.createFile(flushed);
        }
        Files.copy(flushed, mutable, StandardCopyOption.REPLACE_EXISTING);
        FileChannel channel = FileTierPartitionState.getChannelMaybeReinitialize(topicPartition, topicIdPartition, basePath, version);
        if (channel == null) {
            status = TierPartitionStatus.CLOSED;
            return;
        }
        try {
            scanAndInitialize(channel);
        } catch (StateCorruptedException corrupted) {
            closeHandlers();
            Files.delete(flushed);
            maybeOpenChannel();
            beginCatchup();
        }
    }

    /**
     * Scans entries from {@code initialBytePosition} and returns the first one in state
     * SEGMENT_UPLOAD_COMPLETE whose end offset is at or beyond {@code targetOffset}.
     *
     * @return the matching metadata, or empty if there are no valid segments or nothing matches
     * @throws IllegalStateException if no entry can be read at {@code initialBytePosition}
     * @throws IOException if reading the state file fails
     */
    private static Optional<TierObjectMetadata> readValidObjectMetadata(TopicIdPartition topicIdPartition, State state, long initialBytePosition, long targetOffset) throws IOException {
        if (state.validSegments.isEmpty()) {
            return Optional.empty();
        }
        FileTierPartitionIterator entries = FileTierPartitionState.iterator(topicIdPartition, state.channel, initialBytePosition);
        if (!entries.hasNext()) {
            throw new IllegalStateException("Could not read entry at " + initialBytePosition + " for partition " + topicIdPartition);
        }
        while (entries.hasNext()) {
            TierObjectMetadata candidate = (TierObjectMetadata) entries.next();
            boolean coversTarget = candidate.endOffset() >= targetOffset;
            if (coversTarget && candidate.state().equals(TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE)) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    /**
     * Dispatches an entry by type: leader initialization, segment transitions, or partition
     * deletion markers (which are accepted without further processing).
     *
     * @throws IllegalStateException if the entry type is not one of the handled types
     * @throws IOException if updating the state file fails
     */
    private TierPartitionState.AppendResult appendMetadata(AbstractTierMetadata entry) throws IOException {
        switch (entry.type()) {
            case InitLeader:
                return handleInitLeader((TierTopicInitLeader) entry);
            case SegmentUploadInitiate:
            case SegmentUploadComplete:
            case SegmentDeleteInitiate:
            case SegmentDeleteComplete:
                return maybeTransitionSegment((AbstractTierSegmentMetadata) entry);
            case PartitionDeleteInitiate:
            case PartitionDeleteComplete:
                return TierPartitionState.AppendResult.ACCEPTED;
            default:
                throw new IllegalStateException("Attempt to append unknown type " + entry.type() + " to " + topicIdPartition);
        }
    }

    /**
     * Applies a leader-initialization entry. An older epoch is fenced and the current epoch is
     * accepted unchanged. A newer epoch first fences every segment in SEGMENT_UPLOAD_INITIATE or
     * SEGMENT_DELETE_INITIATE, then becomes the current epoch.
     *
     * @throws IOException if fencing a segment fails
     */
    private TierPartitionState.AppendResult handleInitLeader(TierTopicInitLeader initLeader) throws IOException {
        if (initLeader.tierEpoch() < state.currentEpoch) {
            return TierPartitionState.AppendResult.FENCED;
        }
        if (initLeader.tierEpoch() > state.currentEpoch) {
            Set<TierObjectMetadata.State> inFlightStates = new HashSet<>(Arrays.asList(TierObjectMetadata.State.SEGMENT_UPLOAD_INITIATE, TierObjectMetadata.State.SEGMENT_DELETE_INITIATE));
            for (TierObjectMetadata inFlight : FileTierPartitionState.metadataForStates(topicIdPartition, state, inFlightStates)) {
                fenceSegment(inFlight);
            }
            state.currentEpoch = initLeader.tierEpoch();
            dirty = true;
        }
        return TierPartitionState.AppendResult.ACCEPTED;
    }

    /**
     * Validates a segment transition and dispatches it to its handler. Entries from a different
     * epoch are fenced, a repeat of the current state is accepted as a duplicate, and a
     * transition the current state does not allow is fenced.
     *
     * @throws IllegalStateException if the segment is unknown and the entry is not an upload
     *         initiate, or if the entry's state has no handler
     * @throws IOException if updating the state file fails
     */
    private TierPartitionState.AppendResult maybeTransitionSegment(AbstractTierSegmentMetadata metadata) throws IOException {
        SegmentState currentState = state.getState(metadata.objectId());
        if (metadata.tierEpoch() != state.currentEpoch) {
            log.info("Fenced {} as currentEpoch={} ({})", metadata, state.currentEpoch, topicIdPartition);
            return TierPartitionState.AppendResult.FENCED;
        }
        if (currentState == null) {
            // Only an upload initiate may introduce a segment that is not tracked yet.
            if (metadata.state() != TierObjectMetadata.State.SEGMENT_UPLOAD_INITIATE) {
                throw new IllegalStateException("Cannot complete transition for non-existent segment " + metadata + " for " + topicIdPartition);
            }
        } else if (currentState.state.equals(metadata.state())) {
            log.debug("Accepting duplicate transition for {} ({})", metadata, topicIdPartition);
            return TierPartitionState.AppendResult.ACCEPTED;
        } else if (!currentState.state.canTransitionTo(metadata.state())) {
            log.info("Fencing already processed transition for {} with currentState={} ({})", metadata, currentState, topicIdPartition);
            return TierPartitionState.AppendResult.FENCED;
        }
        switch (metadata.state()) {
            case SEGMENT_UPLOAD_INITIATE:
                return handleUploadInitiate((TierSegmentUploadInitiate) metadata);
            case SEGMENT_UPLOAD_COMPLETE:
                return handleUploadComplete((TierSegmentUploadComplete) metadata);
            case SEGMENT_DELETE_INITIATE:
                return handleDeleteInitiate((TierSegmentDeleteInitiate) metadata);
            case SEGMENT_DELETE_COMPLETE:
                return handleDeleteComplete((TierSegmentDeleteComplete) metadata);
            default:
                throw new IllegalStateException("Unexpected state " + metadata.state() + " for " + topicIdPartition);
        }
    }

    /**
     * Rewrites the stored entry for {@code objectId} with {@code newState}, overwriting its
     * payload in place, and refreshes the in-memory segment tracking.
     *
     * @return the metadata carrying the new state
     * @throws IllegalStateException if the object is not tracked, or the entry read back has a different id
     * @throws IOException if reading or writing the state file fails
     */
    private TierObjectMetadata updateState(UUID objectId, TierObjectMetadata.State newState) throws IOException {
        SegmentState tracked = state.getState(objectId);
        if (tracked == null) {
            throw new IllegalStateException("No metadata found for " + objectId + " in " + topicIdPartition);
        }
        FileTierPartitionIterator entries = FileTierPartitionState.iterator(topicIdPartition, state.channel, tracked.position);
        TierObjectMetadata stored = (TierObjectMetadata) entries.next();
        if (!objectId.equals(stored.objectId())) {
            throw new IllegalStateException("id mismatch. Expected: " + objectId + " Got: " + stored.objectId() + " Partition: " + topicIdPartition);
        }
        stored.mutateState(newState);
        // Skip the 2-byte size prefix so that only the payload is overwritten.
        long payloadPosition = tracked.position + 2L;
        Utils.writeFully(state.channel, payloadPosition, stored.payloadBuffer());
        addSegmentMetadata(stored, tracked.position);
        dirty = true;
        return stored;
    }

    /**
     * Moves the segment to SEGMENT_FENCED and clears the in-progress upload if it refers to the
     * same object.
     *
     * @throws IOException if updating the state file fails
     */
    private void fenceSegment(TierObjectMetadata metadata) throws IOException {
        updateState(metadata.objectId(), TierObjectMetadata.State.SEGMENT_FENCED);
        if (uploadInProgress == null) {
            return;
        }
        if (uploadInProgress.objectId().equals(metadata.objectId())) {
            uploadInProgress = null;
        }
    }

    /**
     * Handles an upload-initiate entry. The entry is fenced unless its end offset lies beyond the
     * current end offset. Otherwise any upload still in progress is fenced and the new entry is
     * appended to the file and tracked.
     *
     * @throws IOException if writing to the state file fails
     */
    private TierPartitionState.AppendResult handleUploadInitiate(TierSegmentUploadInitiate uploadInitiate) throws IOException {
        TierObjectMetadata metadata = new TierObjectMetadata(uploadInitiate);
        if (metadata.endOffset() <= state.endOffset) {
            log.info("Fencing uploadInitiate for {}. currentEndOffset={} currentEpoch={}. ({})", metadata, state.endOffset, state.currentEpoch, topicIdPartition);
            return TierPartitionState.AppendResult.FENCED;
        }
        if (uploadInProgress != null) {
            fenceSegment(uploadInProgress);
        }
        ByteBuffer payload = metadata.payloadBuffer();
        long entryPosition = FileTierPartitionState.appendWithSizePrefix(state.channel, payload);
        addSegmentMetadata(metadata, entryPosition);
        dirty = true;
        return TierPartitionState.AppendResult.ACCEPTED;
    }

    /**
     * Handles an upload-complete entry: checks that it refers to the upload currently in
     * progress, moves the segment to SEGMENT_UPLOAD_COMPLETE and gives any registered
     * materialization listener a chance to complete.
     *
     * @param uploadComplete the upload-complete entry to apply
     * @return ACCEPTED once the transition has been written
     * @throws IllegalStateException if no upload is in progress or a different object is in progress
     * @throws IOException if updating the state file or flushing fails
     */
    private TierPartitionState.AppendResult handleUploadComplete(TierSegmentUploadComplete uploadComplete) throws IOException {
        TierObjectMetadata inProgress = this.uploadInProgress;
        // Fail with a descriptive error instead of a bare NullPointerException when nothing is in progress.
        if (inProgress == null) {
            throw new IllegalStateException("Expected an upload to be in-progress but found none when completing " + uploadComplete.objectId() + " for partition " + this.topicIdPartition);
        }
        if (!inProgress.objectId().equals(uploadComplete.objectId())) {
            throw new IllegalStateException("Expected " + inProgress.objectId() + " to be in-progress but got " + uploadComplete.objectId() + " for partition " + this.topicIdPartition);
        }
        TierObjectMetadata metadata = this.updateState(uploadComplete.objectId(), TierObjectMetadata.State.SEGMENT_UPLOAD_COMPLETE);
        if (this.materializationTracker != null) {
            this.maybeCompleteMaterializationTracker(metadata);
        }
        return TierPartitionState.AppendResult.ACCEPTED;
    }

    /**
     * Moves the referenced segment to SEGMENT_DELETE_INITIATE.
     *
     * @throws IOException if updating the state file fails
     */
    private TierPartitionState.AppendResult handleDeleteInitiate(TierSegmentDeleteInitiate deleteInitiate) throws IOException {
        UUID segmentId = deleteInitiate.objectId();
        updateState(segmentId, TierObjectMetadata.State.SEGMENT_DELETE_INITIATE);
        return TierPartitionState.AppendResult.ACCEPTED;
    }

    /**
     * Moves the referenced segment to SEGMENT_DELETE_COMPLETE.
     *
     * @throws IOException if updating the state file fails
     */
    private TierPartitionState.AppendResult handleDeleteComplete(TierSegmentDeleteComplete deleteComplete) throws IOException {
        UUID segmentId = deleteComplete.objectId();
        updateState(segmentId, TierObjectMetadata.State.SEGMENT_DELETE_COMPLETE);
        return TierPartitionState.AppendResult.ACCEPTED;
    }

    /**
     * Writes {@code metadataBuffer} at the channel's current position, preceded by a 2-byte
     * little-endian size prefix.
     *
     * @return the channel position at which the size prefix was written
     * @throws IOException if writing to the channel fails
     */
    private static long appendWithSizePrefix(FileChannel channel, ByteBuffer metadataBuffer) throws IOException {
        long entryStart = channel.position();
        ByteBuffer prefix = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
        prefix.putShort((short) metadataBuffer.remaining());
        // Rewind so the prefix just written is what gets sent to the channel.
        prefix.flip();
        Utils.writeFully(channel, prefix);
        Utils.writeFully(channel, metadataBuffer);
        return entryStart;
    }

    /**
     * Writes the header at the start of the file: a 2-byte little-endian size prefix at offset 0
     * followed by the header payload at offset 2.
     *
     * @throws IOException if writing to the channel fails
     */
    private static void writeHeader(FileChannel channel, Header header) throws IOException {
        ByteBuffer prefix = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
        // Absolute put leaves position at 0 and limit at 2, so the buffer is ready to write.
        prefix.putShort(0, (short) header.payloadBuffer().remaining());
        Utils.writeFully(channel, 0L, prefix);
        Utils.writeFully(channel, 2L, header.payloadBuffer());
    }

    /**
     * Reads the 2-byte little-endian header size prefix from offset 0.
     *
     * @return the size, or empty if fewer than 2 bytes could be read
     * @throws IOException if reading from the channel fails
     */
    private static Optional<Short> readHeaderSize(FileChannel channel) throws IOException {
        ByteBuffer prefix = ByteBuffer.allocate(2).order(ByteOrder.LITTLE_ENDIAN);
        Utils.readFully(channel, prefix, 0L);
        prefix.flip();
        if (prefix.limit() != 2) {
            return Optional.empty();
        }
        return Optional.of(prefix.getShort());
    }

    /**
     * Transfers everything from {@code src}'s current position to its end into {@code dest}.
     *
     * @throws IOException if the transfer fails
     */
    private static void copy(FileChannel src, FileChannel dest) throws IOException {
        final long end = src.size();
        long cursor = src.position();
        // transferTo may move fewer bytes than requested, so loop until the end is reached.
        while (cursor < end) {
            cursor += src.transferTo(cursor, end - cursor, dest);
        }
    }

    /**
     * Opens the mutable state file and makes sure it starts with a header of the requested version.
     * <ul>
     *   <li>No readable header and a known topic id: the file is truncated and a fresh INIT header is written.</li>
     *   <li>No readable header and no topic id: the channel is closed and {@code null} is returned.</li>
     *   <li>Header with the requested version: the channel is returned as is.</li>
     *   <li>Header with a different version: the file is rewritten through a temporary file with a
     *       header of the requested version, moved over the mutable file, and reopened.</li>
     * </ul>
     * The channel is closed on any failure, whether the exception is checked or unchecked.
     *
     * @param topicPartition partition the file belongs to
     * @param topicIdPartition topic id partition, or {@code null} if the topic id is not yet known
     * @param basePath base path from which the state file paths are derived
     * @param version format version the returned file must carry
     * @return an open channel on the mutable file, or {@code null} if it has no header and no topic id is known
     * @throws IOException if any file operation fails
     */
    private static FileChannel getChannelMaybeReinitialize(TopicPartition topicPartition, TopicIdPartition topicIdPartition, String basePath, byte version) throws IOException {
        Path mutableFilePath = FileTierPartitionState.mutableFilePath(basePath);
        FileChannel channel = FileChannel.open(mutableFilePath, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
        try {
            Optional<Header> initialHeaderOpt = FileTierPartitionState.readHeader(channel);
            if (!initialHeaderOpt.isPresent()) {
                if (topicIdPartition == null) {
                    // Nothing to initialize the file with yet.
                    channel.close();
                    return null;
                }
                log.info("Writing new header to tier partition state for {}", topicIdPartition);
                channel.truncate(0L);
                FileTierPartitionState.writeHeader(channel, new Header(topicIdPartition.topicId(), version, -1, TierPartitionStatus.INIT, -1L, -1L));
                return channel;
            }
            Header initialHeader = initialHeaderOpt.get();
            if (initialHeader.version() == version) {
                return channel;
            }
            // Version mismatch: rewrite the file with a new header, keeping the entries after the old one.
            Path tmpFilePath = FileTierPartitionState.tmpFilePath(basePath);
            TopicIdPartition headerTopicIdPartition = new TopicIdPartition(topicPartition.topic(), initialHeader.topicId(), topicPartition.partition());
            try (FileChannel tmpChannel = FileChannel.open(tmpFilePath, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
                log.info("Rewriting tier partition state with version {} to {} for {}", initialHeader.version(), version, headerTopicIdPartition);
                Header newHeader = new Header(headerTopicIdPartition.topicId(), version, initialHeader.tierEpoch(), initialHeader.status(), initialHeader.endOffset(), initialHeader.localMaterializedOffset());
                FileTierPartitionState.writeHeader(tmpChannel, newHeader);
                tmpChannel.position(newHeader.size());
                channel.position(initialHeader.size());
                FileTierPartitionState.copy(channel, tmpChannel);
            }
            channel.close();
            Utils.atomicMoveWithFallback(tmpFilePath, mutableFilePath);
            channel = FileChannel.open(mutableFilePath, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
            return channel;
        } catch (IOException | RuntimeException e) {
            // Release the handle on unchecked failures too (e.g. a corrupt header), and keep the
            // original failure as the primary exception if close() fails as well.
            try {
                channel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Rebuilds the in-memory state of this partition from the given state file channel.
     *
     * <p>A fresh {@link State} is installed for {@code channel}, the file header is read, and
     * {@code topicIdPartition} is derived from the header's topic id. Every metadata entry that
     * follows the header is then replayed through {@code addSegmentMetadata}, keyed by the byte
     * position at which the entry starts. Finally the channel is positioned at its end and the
     * committed end offset, tier epoch, local materialized offset and status are seeded from the
     * replayed entries and the header.
     *
     * @param channel channel over the state file; its header is obtained with
     *                {@code Optional.get()}, so a readable header must already be present
     * @throws IOException if reading from, sizing or positioning the channel fails
     * @throws StateCorruptedException if the replayed entries end before the end of the file
     */
    private void scanAndInitialize(FileChannel channel) throws IOException, StateCorruptedException {
        log.debug("scan and truncate TierPartitionState {}", topicPartition);
        state = new State(channel);
        Header fileHeader = FileTierPartitionState.readHeader(channel).get();
        topicIdPartition = new TopicIdPartition(topicPartition.topic(), fileHeader.topicId(), topicPartition.partition());

        // Replay entries in file order; the position is advanced only after an entry was applied.
        long scanPosition = fileHeader.size();
        FileTierPartitionIterator entries = FileTierPartitionState.iterator(topicIdPartition, channel, scanPosition);
        for (; entries.hasNext(); scanPosition = entries.position()) {
            TierObjectMetadata entry = (TierObjectMetadata)entries.next();
            log.debug("{}: scan reloaded metadata {}", topicPartition, entry);
            addSegmentMetadata(entry, scanPosition);
        }

        // Bytes left over after the last readable entry mean the file could not be fully parsed.
        if (scanPosition < channel.size()) {
            throw new StateCorruptedException("Could not read all bytes in file. position: " + scanPosition + " size: " + channel.size() + " for partition " + topicIdPartition);
        }

        // A header end offset other than -1 overrides the end offset derived from the entries.
        long headerEndOffset = fileHeader.endOffset();
        if (headerEndOffset != -1L && state.endOffset != headerEndOffset) {
            log.info("File header endOffset does not match the materialized endOffset. Setting state endOffset to be equal to header endOffset. Header endOffset: " + headerEndOffset + " materialized state endOffset: " + state.endOffset + " for partition " + topicIdPartition);
            state.endOffset = headerEndOffset;
        }

        channel.position(channel.size());
        state.committedEndOffset = state.endOffset;
        state.currentEpoch = fileHeader.tierEpoch();
        state.consumptionInfo.localMaterializedOffset = fileHeader.localMaterializedOffset();
        status = fileHeader.status();
        log.info("Opened tier partition state for {} in status {}. topicIdPartition: {} tierEpoch: {} endOffset: {}", topicPartition, status, topicIdPartition(), tierEpoch(), endOffset());
    }

    /**
     * Applies a single metadata entry to the in-memory state.
     *
     * <p>The entry is first recorded through {@code state.updateAndGetState}, then handled by its
     * lifecycle state:
     * <ul>
     *   <li>upload initiate: remembered as the upload in progress (a duplicate of the entry);</li>
     *   <li>upload complete: added to the valid segments and the upload in progress is cleared;</li>
     *   <li>delete initiate: removed from the valid segments;</li>
     *   <li>delete complete and fenced: no further action.</li>
     * </ul>
     *
     * @param metadata entry to apply
     * @param byteOffset byte position handed to {@code state.updateAndGetState} for this entry
     * @throws IllegalStateException if an upload-initiate entry arrives while another upload is
     *                               still recorded as in progress
     * @throws IllegalArgumentException if the entry's state is none of the handled values
     */
    private void addSegmentMetadata(TierObjectMetadata metadata, long byteOffset) {
        // Registration happens before dispatch, so it takes effect even when a branch below throws.
        SegmentState tracked = state.updateAndGetState(byteOffset, metadata);
        switch (metadata.state()) {
            case SEGMENT_UPLOAD_INITIATE:
                if (uploadInProgress == null) {
                    uploadInProgress = metadata.duplicate();
                    return;
                }
                throw new IllegalStateException("Unexpected upload in progress " + uploadInProgress + " when appending " + metadata + " to " + topicIdPartition);
            case SEGMENT_UPLOAD_COMPLETE:
                state.putValid(tracked, metadata);
                uploadInProgress = null;
                return;
            case SEGMENT_DELETE_INITIATE:
                state.removeValid(tracked, metadata);
                return;
            case SEGMENT_DELETE_COMPLETE:
            case SEGMENT_FENCED:
                return;
            default:
                throw new IllegalArgumentException("Unknown state " + metadata + " for " + topicIdPartition);
        }
    }

    /**
     * Resolves the location of the flushed state file.
     *
     * @param basePath base path of the state files
     * @return {@code basePath} with the {@code FLUSHED} suffix (the empty string) appended
     */
    private static Path flushedFilePath(String basePath) {
        StateFileType fileType = StateFileType.FLUSHED;
        return fileType.filePath(basePath);
    }

    /**
     * Resolves the location of the mutable state file.
     *
     * @param basePath base path of the state files
     * @return {@code basePath} with the {@code MUTABLE} suffix ({@code ".mutable"}) appended
     */
    private static Path mutableFilePath(String basePath) {
        StateFileType fileType = StateFileType.MUTABLE;
        return fileType.filePath(basePath);
    }

    /**
     * Resolves the location of the temporary state file.
     *
     * @param basePath base path of the state files
     * @return {@code basePath} with the {@code TEMPORARY} suffix ({@code ".tmp"}) appended
     */
    private static Path tmpFilePath(String basePath) {
        StateFileType fileType = StateFileType.TEMPORARY;
        return fileType.filePath(basePath);
    }

    /**
     * Raised when the state file cannot be read back completely, e.g. by
     * {@code scanAndInitialize} when the readable entries end before the end of the file.
     * Extends {@link RetriableException}.
     */
    private static class StateCorruptedException extends RetriableException {

        /**
         * @param message description of the corruption that was detected
         */
        StateCorruptedException(String message) {
            super(message);
        }
    }

    /**
     * Per-object bookkeeping record: the last lifecycle state seen for an object, the start
     * offset assigned to it, and the byte position it was registered with.
     *
     * <p>{@code startOffset} and {@code position} are fixed at construction; {@code state} is
     * reassigned by {@code State.updateAndGetState} each time an entry for the object is applied.
     * Because {@code state} takes part in {@link #equals(Object)} and {@link #hashCode()}, the
     * hash of an instance changes when its state changes.
     */
    private static class SegmentState {
        private TierObjectMetadata.State state;
        private final long startOffset;
        private final long position;

        /**
         * @param startOffset start offset assigned to the object
         * @param position byte position the object was registered with
         */
        SegmentState(long startOffset, long position) {
            this.startOffset = startOffset;
            this.position = position;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            SegmentState that = (SegmentState) o;
            // Primitive fields are compared directly instead of being boxed through Objects.equals.
            return startOffset == that.startOffset
                    && position == that.position
                    && Objects.equals(state, that.state);
        }

        @Override
        public int hashCode() {
            // Same element order as before, so hash values are unchanged.
            return Objects.hash(state, startOffset, position);
        }

        @Override
        public String toString() {
            return "SegmentState(state: " + state + ", startOffset: " + startOffset + ", position: " + position + ")";
        }
    }

    /**
     * In-memory view of a tier partition state file.
     *
     * <p>Tracks every object seen ({@code allSegments}), the subset currently considered valid
     * keyed by start offset ({@code validSegments}), and a handful of volatile counters.
     *
     * <p>NOTE(review): {@code validSegmentsSize} is updated with {@code +=} / {@code -=}, which is
     * a read-modify-write and not atomic on a volatile field; presumably all mutations are
     * serialized by the enclosing class - confirm.
     */
    private static class State {
        /** Placeholder instance that is not backed by any channel. */
        private static final State UNINITIALIZED_STATE = new State(null);
        private final FileChannel channel;
        /** Start offset -> object id; filled by {@link #putValid}, pruned by {@link #removeValid}. */
        private final ConcurrentNavigableMap<Long, UUID> validSegments = new ConcurrentSkipListMap<>();
        /** Object id -> bookkeeping record; filled by {@link #updateAndGetState}. */
        private final ConcurrentNavigableMap<UUID, SegmentState> allSegments = new ConcurrentSkipListMap<>();
        private final ConsumptionInfo consumptionInfo = new ConsumptionInfo();
        private volatile long endOffset = -1L;
        private volatile long committedEndOffset = -1L;
        private volatile int currentEpoch = -1;
        private volatile long validSegmentsSize = 0L;

        /**
         * @param channel channel backing this state; {@code null} for {@link #UNINITIALIZED_STATE}
         */
        State(FileChannel channel) {
            this.channel = channel;
        }

        /**
         * Registers the object on first sight and records its latest lifecycle state.
         *
         * @param byteOffset byte position stored for the object if it was not registered yet
         * @param metadata entry whose object id and state are recorded
         * @return the record registered under the entry's object id
         */
        SegmentState updateAndGetState(long byteOffset, TierObjectMetadata metadata) {
            SegmentState candidate = new SegmentState(startOffsetOfSegment(metadata), byteOffset);
            // An existing record wins; the candidate is only stored for a previously unseen object.
            allSegments.putIfAbsent(metadata.objectId(), candidate);
            SegmentState tracked = (SegmentState)allSegments.get(metadata.objectId());
            tracked.state = metadata.state();
            return tracked;
        }

        /**
         * @param objectId id of the object to look up
         * @return the record registered for {@code objectId}, or {@code null} if there is none
         */
        SegmentState getState(UUID objectId) {
            return (SegmentState)allSegments.get(objectId);
        }

        /**
         * Marks the object as valid under its start offset, adds its size to the valid total and
         * raises {@code endOffset} if the object ends beyond it.
         *
         * @param state record of the object, supplying the start offset used as key
         * @param metadata entry supplying object id, size and end offset
         */
        void putValid(SegmentState state, TierObjectMetadata metadata) {
            validSegments.put(state.startOffset, metadata.objectId());
            validSegmentsSize += (long)metadata.size();
            // Math.max keeps the end offset from ever moving backwards.
            endOffset = Math.max(endOffset, metadata.endOffset());
        }

        /**
         * Drops the object from the valid segments, but only if the entry stored under its start
         * offset is this very object; otherwise nothing changes.
         *
         * @param segmentState record of the object, supplying the start offset used as key
         * @param metadata entry supplying object id and size
         */
        void removeValid(SegmentState segmentState, TierObjectMetadata metadata) {
            UUID registered = (UUID)validSegments.get(segmentState.startOffset);
            if (registered == null || !registered.equals(metadata.objectId())) {
                return;
            }
            validSegments.remove(segmentState.startOffset);
            validSegmentsSize -= (long)metadata.size();
        }

        /**
         * @return the current committed end offset
         */
        long committedEndOffset() {
            return committedEndOffset;
        }

        /**
         * @param objectId id of the object to look up
         * @return the byte position stored for the object
         * @throws IllegalStateException if no record exists for {@code objectId}
         */
        long position(UUID objectId) {
            SegmentState tracked = getState(objectId);
            if (tracked == null) {
                throw new IllegalStateException("Could not find object " + objectId);
            }
            return tracked.position;
        }

        /**
         * @param metadata entry supplying the base offset
         * @return the larger of the entry's base offset and one past the current end offset
         */
        private long startOffsetOfSegment(TierObjectMetadata metadata) {
            long baseOffset = metadata.baseOffset();
            long firstUncoveredOffset = endOffset + 1L;
            return Math.max(baseOffset, firstUncoveredOffset);
        }
    }

    /**
     * Holder for the local materialized offset kept alongside a {@code State}.
     */
    private static class ConsumptionInfo {
        /** Starts at {@code -1L}; {@code scanAndInitialize} overwrites it with the header value. */
        private volatile long localMaterializedOffset = -1L;

        /** Private: instances are only created from within the enclosing class. */
        private ConsumptionInfo() {
        }
    }

    /**
     * The kinds of state file kept for a partition, each identified by the suffix appended to a
     * common base path.
     */
    private static enum StateFileType {
        FLUSHED(""),
        MUTABLE(".mutable"),
        TEMPORARY(".tmp");

        /** Suffix appended to the base path; fixed per constant, hence final. */
        private final String suffix;

        private StateFileType(String suffix) {
            this.suffix = suffix;
        }

        /**
         * Builds the path of this kind of state file.
         *
         * @param basePath base path shared by all state files of a partition
         * @return the path formed from {@code basePath} followed by this type's suffix
         */
        public Path filePath(String basePath) {
            return Paths.get(basePath + suffix);
        }
    }
}

