package org.apache.flink.runtime.state.changelog.inmemory;

import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:org/apache/flink/runtime/state/changelog/inmemory/InMemoryStateChangelogWriter.class */
class InMemoryStateChangelogWriter implements StateChangelogWriter<InMemoryChangelogStateHandle> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);
    private static final SequenceNumber INITIAL_SQN = SequenceNumber.of(0);
    private final KeyGroupRange keyGroupRange;
    private boolean closed;
    private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup = new HashMap();
    private SequenceNumber sqn = INITIAL_SQN;

    public InMemoryStateChangelogWriter(KeyGroupRange keyGroupRange) {
        this.keyGroupRange = keyGroupRange;
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void appendMeta(byte[] bArr) throws IOException {
        LOG.trace("append metadata: {} bytes", Integer.valueOf(bArr.length));
        if (this.closed) {
            LOG.warn("LogWriter is closed.");
        } else {
            this.changesByKeyGroup.computeIfAbsent(-1, num -> {
                return new TreeMap();
            }).put(this.sqn, bArr);
            this.sqn = this.sqn.next();
        }
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void append(int i, byte[] bArr) {
        LOG.trace("append, keyGroup={}, {} bytes", Integer.valueOf(i), Integer.valueOf(bArr.length));
        if (this.closed) {
            LOG.warn("LogWriter is closed.");
        } else {
            this.changesByKeyGroup.computeIfAbsent(Integer.valueOf(i), num -> {
                return new TreeMap();
            }).put(this.sqn, bArr);
            this.sqn = this.sqn.next();
        }
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public SequenceNumber initialSequenceNumber() {
        return INITIAL_SQN;
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public SequenceNumber nextSequenceNumber() {
        return this.sqn;
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public CompletableFuture<SnapshotResult<InMemoryChangelogStateHandle>> persist(SequenceNumber sequenceNumber, long j) {
        LOG.debug("Persist after {}", sequenceNumber);
        Preconditions.checkNotNull(sequenceNumber);
        return CompletableFuture.completedFuture(SnapshotResult.withLocalState(new InMemoryChangelogStateHandle(collectChanges(sequenceNumber), sequenceNumber, this.sqn, this.keyGroupRange), new InMemoryChangelogStateHandle(collectChanges(sequenceNumber), sequenceNumber, this.sqn, this.keyGroupRange)));
    }

    private List<StateChange> collectChanges(SequenceNumber sequenceNumber) {
        return (List) this.changesByKeyGroup.entrySet().stream().flatMap(entry -> {
            return toChangeStream((NavigableMap) entry.getValue(), sequenceNumber, ((Integer) entry.getKey()).intValue());
        }).sorted(Comparator.comparing(tuple2 -> {
            return (SequenceNumber) tuple2.f0;
        })).map(tuple22 -> {
            return (StateChange) tuple22.f1;
        }).collect(Collectors.toList());
    }

    private Stream<Tuple2<SequenceNumber, StateChange>> toChangeStream(NavigableMap<SequenceNumber, byte[]> navigableMap, SequenceNumber sequenceNumber, int i) {
        return i == -1 ? navigableMap.tailMap(sequenceNumber, true).entrySet().stream().map(entry -> {
            return Tuple2.of(entry.getKey(), StateChange.ofMetadataChange((byte[]) entry.getValue()));
        }) : navigableMap.tailMap(sequenceNumber, true).entrySet().stream().map(entry2 -> {
            return Tuple2.of(entry2.getKey(), StateChange.ofDataChange(i, (byte[]) entry2.getValue()));
        });
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter, java.lang.AutoCloseable
    public void close() {
        Preconditions.checkState(!this.closed);
        this.closed = true;
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void truncate(SequenceNumber sequenceNumber) {
        this.changesByKeyGroup.forEach((num, navigableMap) -> {
            navigableMap.headMap(sequenceNumber, false).clear();
        });
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void truncateAndClose(SequenceNumber sequenceNumber) {
        close();
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void confirm(SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j) {
    }

    @Override // org.apache.flink.runtime.state.changelog.StateChangelogWriter
    public void reset(SequenceNumber sequenceNumber, SequenceNumber sequenceNumber2, long j) {
    }
}
