/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.state;

import java.io.File;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import kafka.server.LogDirFailureChannel;
import kafka.tier.TierTestUtils$;
import kafka.tier.TopicIdPartition;
import kafka.tier.domain.AbstractTierMetadata;
import kafka.tier.domain.TierPartitionForceRestore;
import kafka.tier.domain.TierTopicInitLeader;
import kafka.tier.state.FileTierPartitionState;
import kafka.tier.state.OffsetAndEpoch;
import kafka.tier.state.StateScan;
import kafka.tier.state.StateSeek;
import kafka.tier.state.TierPartitionState;
import kafka.tier.state.TierPartitionStatus;
import kafka.utils.MockTime;
import kafka.utils.Scheduler;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import scala.Function1;
import scala.Predef$;
import scala.reflect.ScalaSignature;
import scala.runtime.RichInt$;
import scala.runtime.java8.JFunction1;

@ScalaSignature(bytes="\u0006\u000193A\u0001B\u0003\u0001\u0019!)1\u0003\u0001C\u0001)!)q\u0003\u0001C\u00011!)\u0011\b\u0001C\u0001u\t\tC+[3s!\u0006\u0014H/\u001b;j_:\u001cF/\u0019;f\u0007>t7-\u001e:sK:\u001c\u0017\u0010V3ti*\u0011aaB\u0001\u0006gR\fG/\u001a\u0006\u0003\u0011%\tA\u0001^5fe*\t!\"A\u0003lC\u001a\\\u0017m\u0001\u0001\u0014\u0005\u0001i\u0001C\u0001\b\u0012\u001b\u0005y!\"\u0001\t\u0002\u000bM\u001c\u0017\r\\1\n\u0005Iy!AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002+A\u0011a\u0003A\u0007\u0002\u000b\u0005y\"/Z1e/JLG/\u001a%piN<\u0018\r]\"p]\u000e,(O]3oGf$Vm\u001d;\u0015\u0005ea\u0002C\u0001\b\u001b\u0013\tYrB\u0001\u0003V]&$\b\"B\u000f\u0003\u0001\u0004q\u0012aD2iK\u000e\\7/^7F]\u0006\u0014G.\u001a3\u0011\u00059y\u0012B\u0001\u0011\u0010\u0005\u001d\u0011un\u001c7fC:DCA\u0001\u00121cA\u00111EL\u0007\u0002I)\u0011QEJ\u0001\taJ|g/\u001b3fe*\u0011q\u0005K\u0001\u0007a\u0006\u0014\u0018-\\:\u000b\u0005%R\u0013a\u00026va&$XM\u001d\u0006\u0003W1\nQA[;oSRT\u0011!L\u0001\u0004_J<\u0017BA\u0018%\u0005-1\u0016\r\\;f'>,(oY3\u0002\u0011\t|w\u000e\\3b]Nd#AM\u001a\u001a\u0003\u0005I\u0012\u0001\u0001\u0015\u0003\u0005U\u0002\"AN\u001c\u000e\u0003\u0019J!\u0001\u000f\u0014\u0003#A\u000b'/Y7fi\u0016\u0014\u0018N_3e)\u0016\u001cH/\u0001\rsK\u0006$wK]5uK\u000e{gnY;se\u0016t7-\u001f+fgR$2!G\u001e>\u0011\u0015a4\u00011\u0001\u001f\u0003Y\u0019\u0007.Z2lgVl\u0017I\u001c3D_6\u0004\u0018m\u0019;GY\u0006<\u0007\"\u0002 \u0004\u0001\u0004q\u0012aC2mK\u0006tW\u000f\u001d$mC\u001eDCa\u0001!D\tB\u00111%Q\u0005\u0003\u0005\u0012\u0012\u0011bQ:w'>,(oY3\u0002\u000bY\fG.^3-\t\u0015;\u0015jS\u0011\u0002\r\u0006IAO];fYQ\u0014X/Z\u0011\u0002\u0011\u0006Qa-\u00197tK2\"(/^3\"\u0003)\u000b!\u0002\u001e:vK22\u0017\r\\:fC\u0005a\u0015a\u00034bYN,GFZ1mg\u0016D#aA\u001b")
public class TierPartitionStateConcurrencyTest {
    /**
     * Exercises a {@link FileTierPartitionState} while its contents are repeatedly swapped by
     * restore events and {@link StateSeek} readers run concurrently on other threads.
     *
     * <p>For roughly {@code runLengthMs} of wall-clock time the writer loop:
     * <ol>
     *   <li>snapshots the bytes of the flushed state file,</li>
     *   <li>adds one more object via {@code TierTestUtils.uploadWithMetadata} and flushes,</li>
     *   <li>snapshots the flushed file again,</li>
     *   <li>applies a restore event carrying the first snapshot, then one carrying the second,</li>
     *   <li>publishes a new value through {@code readOffset} for the readers.</li>
     * </ol>
     * Afterwards the readers are asked to stop and the test fails if any of them stored an
     * exception in the shared {@code AtomicReference}.
     *
     * @param checksumEnabled forwarded to the {@code FileTierPartitionState} constructor
     * @throws Exception if file I/O fails or the test thread is interrupted while waiting
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void readWriteHotswapConcurrencyTest(boolean checksumEnabled) throws Exception {
        File baseDir = TestUtils$.MODULE$.tempDir();
        String topic = UUID.randomUUID().toString();
        int partition = 0;
        UUID topicId = UUID.randomUUID();
        TopicIdPartition tpid = new TopicIdPartition(topic, topicId, partition);
        TopicPartition tp = tpid.topicPartition();
        int runLengthMs = 500;
        int nThreads = 8;
        int epoch = 0;
        // Upper bound on how long the test waits for the readers to exit after shutdown.
        long readerJoinTimeoutMs = 5000L;
        MockTime time = new MockTime();
        FileTierPartitionState state = new FileTierPartitionState(baseDir, new LogDirFailureChannel(5), tp, true, (Scheduler)time.scheduler(), checksumEnabled, false, false, (Time)time);
        state.setTopicId(tpid.topicId());
        state.beginCatchup();
        state.onCatchUpComplete();
        long startTime = System.currentTimeMillis();
        AtomicLong readOffset = new AtomicLong(-1L);
        // Left raw on purpose: the element type StateSeek expects is not visible from this file.
        AtomicReference exception = new AtomicReference();
        AtomicBoolean shutdown = new AtomicBoolean(false);

        // The original range was inclusive (0 to nThreads / 2), i.e. nThreads / 2 + 1 readers;
        // that count is preserved here. NOTE(review): confirm whether nThreads / 2 was intended.
        Thread[] seekers = new Thread[nThreads / 2 + 1];
        for (int i = 0; i < seekers.length; i++) {
            seekers[i] = new Thread(new StateSeek(state, shutdown, exception, readOffset));
            seekers[i].start();
        }

        try {
            try {
                state.append((AbstractTierMetadata)new TierTopicInitLeader(tpid, epoch, UUID.randomUUID(), 0), TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch());
                state.flush();
                int i2 = 0;
                while (System.currentTimeMillis() < startTime + (long)runLengthMs) {
                    // State file contents before this iteration's upload.
                    byte[] copyBytes = Files.readAllBytes(Paths.get(state.flushedPath(), new String[0]));
                    TierPartitionForceRestore originalRestore = new TierPartitionForceRestore(tpid, UUID.randomUUID(), Predef$.MODULE$.Long2long(state.startOffset().orElse(Predef$.MODULE$.long2Long(-1L))), state.endOffset(), TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch(), "contenthash", Predef$.MODULE$.boolean2Boolean(false));
                    TierTestUtils$.MODULE$.uploadWithMetadata((TierPartitionState)state, tpid, epoch, UUID.randomUUID(), (long)(i2 * 2), (long)(i2 * 2 + 1), (long)i2, (long)i2, i2, false, true, false, new OffsetAndEpoch(0L, Optional.empty()), TierTestUtils$.MODULE$.uploadWithMetadata$default$14());
                    state.flush();
                    TierPartitionForceRestore revertedRestore = new TierPartitionForceRestore(tpid, UUID.randomUUID(), Predef$.MODULE$.Long2long(state.startOffset().orElse(Predef$.MODULE$.long2Long(-1L))), state.endOffset(), TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch(), "contenthash", Predef$.MODULE$.boolean2Boolean(false));
                    // State file contents including this iteration's upload.
                    byte[] correctBytes = Files.readAllBytes(Paths.get(state.flushedPath(), new String[0]));
                    // Swap in the older snapshot, then swap the newer one back.
                    state.processRestoreEvents((AbstractTierMetadata)originalRestore, Optional.of(ByteBuffer.wrap(copyBytes)), TierPartitionStatus.ONLINE, TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch());
                    state.processRestoreEvents((AbstractTierMetadata)revertedRestore, Optional.of(ByteBuffer.wrap(correctBytes)), TierPartitionStatus.ONLINE, TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch());
                    readOffset.set((i2 - 1) * 2);
                    ++i2;
                }
            }
            finally {
                // Always signal the readers, even when the writer loop failed, and wait for
                // them instead of sleeping a fixed 10 ms: otherwise a reader can still be
                // running when the state is deleted, or record its failure after the check.
                shutdown.set(true);
                long joinDeadline = System.currentTimeMillis() + readerJoinTimeoutMs;
                for (Thread seeker : seekers) {
                    long remainingMs = joinDeadline - System.currentTimeMillis();
                    // join(0) would wait forever, so only join while time remains.
                    if (remainingMs > 0L) {
                        seeker.join(remainingMs);
                    }
                }
            }
            Throwable readerFailure = (Throwable)exception.get();
            if (readerFailure != null) {
                // Attach the reader's exception as the cause so its stack trace is reported.
                Assertions.fail("A concurrent reader thread failed", readerFailure);
            }
        }
        finally {
            state.delete();
        }
    }

    /**
     * Exercises a {@link FileTierPartitionState} that is appended to and flushed by the test
     * thread while {@link StateSeek} and {@link StateScan} readers run on other threads.
     *
     * <p>For roughly {@code runLengthMs} of wall-clock time the writer loop adds one object per
     * iteration via {@code TierTestUtils.uploadWithMetadata}, flushes the state and publishes
     * that object's base offset through {@code latestStartOffset}. Afterwards the readers are
     * asked to stop and the test fails if any of them stored an exception in the shared
     * {@code AtomicReference}.
     *
     * @param checksumAndCompactFlag passed for two consecutive boolean arguments of the
     *        {@code FileTierPartitionState} constructor
     * @param cleanupFlag passed as the following boolean constructor argument
     * @throws Exception if the state operations fail or the test thread is interrupted while
     *         waiting for the readers
     */
    @ParameterizedTest
    @CsvSource(value={"true,true", "false,true", "true,false", "false,false"})
    public void readWriteConcurrencyTest(boolean checksumAndCompactFlag, boolean cleanupFlag) throws Exception {
        File baseDir = TestUtils$.MODULE$.tempDir();
        String topic = UUID.randomUUID().toString();
        int partition = 0;
        UUID topicId = UUID.randomUUID();
        TopicIdPartition tpid = new TopicIdPartition(topic, topicId, partition);
        TopicPartition tp = tpid.topicPartition();
        int runLengthMs = 500;
        int nThreads = 8;
        int epoch = 0;
        // Upper bound on how long the test waits for the readers to exit after shutdown.
        long readerJoinTimeoutMs = 5000L;
        MockTime time = new MockTime();
        FileTierPartitionState state = new FileTierPartitionState(baseDir, new LogDirFailureChannel(5), tp, true, (Scheduler)time.scheduler(), checksumAndCompactFlag, checksumAndCompactFlag, cleanupFlag, (Time)time);
        state.setTopicId(tpid.topicId());
        state.beginCatchup();
        state.onCatchUpComplete();
        long startTime = System.currentTimeMillis();
        AtomicLong latestStartOffset = new AtomicLong(0L);
        // Left raw on purpose: the element type the readers expect is not visible from this file.
        AtomicReference exception = new AtomicReference();
        AtomicBoolean shutdown = new AtomicBoolean(false);

        // The original ranges were inclusive (0 to nThreads / 2), i.e. nThreads / 2 + 1 readers
        // of each kind; that count is preserved. NOTE(review): confirm whether nThreads / 2
        // per kind was intended.
        int readersPerKind = nThreads / 2 + 1;
        Thread[] readers = new Thread[readersPerKind * 2];
        // All seekers are started before any scanner, as in the original.
        for (int i = 0; i < readersPerKind; i++) {
            Thread seeker = new Thread(new StateSeek(state, shutdown, exception, latestStartOffset));
            readers[i] = seeker;
            seeker.start();
        }
        for (int i = 0; i < readersPerKind; i++) {
            Thread scanner = new Thread(new StateScan(state, shutdown, exception, latestStartOffset));
            readers[readersPerKind + i] = scanner;
            scanner.start();
        }

        try {
            try {
                state.append((AbstractTierMetadata)new TierTopicInitLeader(tpid, epoch, UUID.randomUUID(), 0), TierTestUtils$.MODULE$.nextTierTopicOffsetAndEpoch());
                int i2 = 0;
                while (System.currentTimeMillis() < startTime + (long)runLengthMs) {
                    TierTestUtils$.MODULE$.uploadWithMetadata((TierPartitionState)state, tpid, epoch, UUID.randomUUID(), (long)(i2 * 2), (long)(i2 * 2 + 1), (long)i2, (long)i2, i2, false, true, TierTestUtils$.MODULE$.uploadWithMetadata$default$12(), TierTestUtils$.MODULE$.uploadWithMetadata$default$13(), TierTestUtils$.MODULE$.uploadWithMetadata$default$14());
                    state.flush();
                    latestStartOffset.set(i2 * 2);
                    ++i2;
                }
            }
            finally {
                // Always signal the readers, even when the writer loop failed, and wait for
                // them instead of sleeping a fixed 10 ms: otherwise a reader can still be
                // running when the state is deleted, or record its failure after the check.
                shutdown.set(true);
                long joinDeadline = System.currentTimeMillis() + readerJoinTimeoutMs;
                for (Thread reader : readers) {
                    long remainingMs = joinDeadline - System.currentTimeMillis();
                    // join(0) would wait forever, so only join while time remains.
                    if (remainingMs > 0L) {
                        reader.join(remainingMs);
                    }
                }
            }
            Throwable readerFailure = (Throwable)exception.get();
            if (readerFailure != null) {
                // Attach the reader's exception as the cause so its stack trace is reported.
                Assertions.fail("A concurrent reader thread failed", readerFailure);
            }
        }
        finally {
            state.delete();
        }
    }
}

