package org.apache.flink.streaming.connectors.kafka.testutils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Collection;
import java.util.Set;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.common.typeutils.ClassRelocator;
import org.apache.flink.api.common.typeutils.ThreadContextClassLoader;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerMatchers;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.AssertionsForInterfaceTypes;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.HamcrestCondition;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/testutils/TypeSerializerUpgradeTestBase.class */
public abstract class TypeSerializerUpgradeTestBase<PreviousElementT, UpgradedElementT> {
    /**
     * Version used by {@link #generateTestSetupFiles} both for the snapshot it writes and for the
     * name of the resource directory it writes into.
     */
    public static final FlinkVersion CURRENT_VERSION = FlinkVersion.v1_17;
    /**
     * Result of {@code FlinkVersion.rangeOf(v1_11, CURRENT_VERSION)}. Not referenced inside this class.
     * NOTE(review): presumably an inclusive range meant for subclasses that build test
     * specifications - confirm against {@code FlinkVersion.rangeOf}.
     */
    public static final Set<FlinkVersion> MIGRATION_VERSIONS = FlinkVersion.rangeOf(FlinkVersion.v1_11, CURRENT_VERSION);
    /** Initial size handed to every {@link DataOutputSerializer} this class creates. */
    private static final int INITIAL_OUTPUT_BUFFER_SIZE = 64;

    /**
     * {@link PreUpgradeSetup} decorator that instantiates its delegate from the class returned by
     * {@link ClassRelocator#relocate} and wraps every delegate call in a
     * {@link ThreadContextClassLoader} built from that class's class loader.
     *
     * <p>The {@link ThreadContextClassLoader} is always closed, including when the delegate throws,
     * so a failing delegate cannot leave the wrapper open for subsequent tests on the same thread.
     * NOTE(review): presumably the wrapper installs the given loader as the thread context class
     * loader until it is closed - confirm against {@code ThreadContextClassLoader}.
     *
     * @param <PreviousElementT> element type produced by the delegate setup
     */
    private static class ClassLoaderSafePreUpgradeSetup<PreviousElementT> implements PreUpgradeSetup<PreviousElementT> {
        private final PreUpgradeSetup<PreviousElementT> delegateSetup;
        private final ClassLoader setupClassloader;

        /**
         * Relocates {@code cls} and instantiates the relocated class through its no-arg constructor.
         *
         * @param cls setup class to relocate and instantiate; must not be null
         * @throws Exception if relocation, instantiation or closing the class loader wrapper fails
         */
        @SuppressWarnings("unchecked") // the relocated class is derived from cls, so the cast mirrors cls's bound
        ClassLoaderSafePreUpgradeSetup(Class<? extends PreUpgradeSetup<PreviousElementT>> cls) throws Exception {
            Preconditions.checkNotNull(cls);
            Class<?> relocated = ClassRelocator.relocate(cls);
            this.setupClassloader = relocated.getClassLoader();
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(this.setupClassloader)) {
                this.delegateSetup = (PreUpgradeSetup<PreviousElementT>) relocated.newInstance();
            }
        }

        /**
         * Delegates to the wrapped setup while the setup class loader wrapper is open.
         *
         * @return the serializer created by the delegate
         * @throws RuntimeException wrapping any {@link IOException} raised by the class loader wrapper
         */
        @Override
        public TypeSerializer<PreviousElementT> createPriorSerializer() {
            // try-with-resources guarantees close() also runs when the delegate throws.
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(this.setupClassloader)) {
                return this.delegateSetup.createPriorSerializer();
            } catch (IOException e) {
                throw new RuntimeException("Error creating prior serializer via ClassLoaderSafePreUpgradeSetup.", e);
            }
        }

        /**
         * Delegates to the wrapped setup while the setup class loader wrapper is open.
         *
         * @return the test element created by the delegate
         * @throws RuntimeException wrapping any {@link IOException} raised by the class loader wrapper
         */
        @Override
        public PreviousElementT createTestData() {
            // try-with-resources guarantees close() also runs when the delegate throws.
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(this.setupClassloader)) {
                return this.delegateSetup.createTestData();
            } catch (IOException e) {
                throw new RuntimeException("Error creating test data via ThreadContextClassLoader.", e);
            }
        }
    }

    /**
     * {@link UpgradeVerifier} decorator that instantiates its delegate from the class returned by
     * {@link ClassRelocator#relocate} and wraps every delegate call in a
     * {@link ThreadContextClassLoader} built from that class's class loader.
     *
     * <p>The {@link ThreadContextClassLoader} is always closed, including when the delegate throws,
     * so a failing delegate cannot leave the wrapper open for subsequent tests on the same thread.
     * NOTE(review): presumably the wrapper installs the given loader as the thread context class
     * loader until it is closed - confirm against {@code ThreadContextClassLoader}.
     *
     * @param <UpgradedElementT> element type handled by the delegate verifier
     */
    private static class ClassLoaderSafeUpgradeVerifier<UpgradedElementT> implements UpgradeVerifier<UpgradedElementT> {
        private final UpgradeVerifier<UpgradedElementT> delegateVerifier;
        private final ClassLoader verifierClassloader;

        /**
         * Relocates {@code cls} and instantiates the relocated class through its no-arg constructor.
         *
         * @param cls verifier class to relocate and instantiate; must not be null
         * @throws Exception if relocation, instantiation or closing the class loader wrapper fails
         */
        @SuppressWarnings("unchecked") // the relocated class is derived from cls, so the cast mirrors cls's bound
        ClassLoaderSafeUpgradeVerifier(Class<? extends UpgradeVerifier<UpgradedElementT>> cls) throws Exception {
            Preconditions.checkNotNull(cls);
            Class<?> relocated = ClassRelocator.relocate(cls);
            this.verifierClassloader = relocated.getClassLoader();
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(this.verifierClassloader)) {
                this.delegateVerifier = (UpgradeVerifier<UpgradedElementT>) relocated.newInstance();
            }
        }

        /**
         * Delegates to the wrapped verifier while the verifier class loader wrapper is open.
         *
         * @return the serializer created by the delegate
         * @throws RuntimeException wrapping any {@link IOException} raised by the class loader wrapper
         */
        @Override
        public TypeSerializer<UpgradedElementT> createUpgradedSerializer() {
            // try-with-resources guarantees close() also runs when the delegate throws.
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(this.verifierClassloader)) {
                return this.delegateVerifier.createUpgradedSerializer();
            } catch (IOException e) {
                throw new RuntimeException("Error creating upgraded serializer via ClassLoaderSafeUpgradeVerifier.", e);
            }
        }

        /**
         * Delegates to the wrapped verifier while the verifier class loader wrapper is open.
         *
         * @return the matcher for deserialized test data created by the delegate
         * @throws RuntimeException wrapping any {@link IOException} raised by the class loader wrapper
         */
        @Override
        public Matcher<UpgradedElementT> testDataMatcher() {
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(this.verifierClassloader)) {
                return this.delegateVerifier.testDataMatcher();
            } catch (IOException e) {
                throw new RuntimeException("Error creating expected test data via ClassLoaderSafeUpgradeVerifier.", e);
            }
        }

        /**
         * Delegates to the wrapped verifier while the verifier class loader wrapper is open.
         *
         * @param flinkVersion version passed through to the delegate
         * @return the schema compatibility matcher created by the delegate
         * @throws RuntimeException wrapping any {@link IOException} raised by the class loader wrapper
         */
        @Override
        public Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(FlinkVersion flinkVersion) {
            try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(this.verifierClassloader)) {
                return this.delegateVerifier.schemaCompatibilityMatcher(flinkVersion);
            } catch (IOException e) {
                throw new RuntimeException("Error creating schema compatibility matcher via ClassLoaderSafeUpgradeVerifier.", e);
            }
        }
    }

    /**
     * Supplies the pre-upgrade half of a test specification: the serializer and the element that
     * {@code generateTestSetupFiles} uses to produce the stored test data and serializer snapshot.
     *
     * <p>Implementations are instantiated reflectively through a no-arg constructor by
     * {@code ClassLoaderSafePreUpgradeSetup}.
     *
     * @param <PreviousElementT> element type handled by the prior serializer
     */
    public interface PreUpgradeSetup<PreviousElementT> {
        /** Creates the serializer used to write the test data and whose snapshot is stored. */
        TypeSerializer<PreviousElementT> createPriorSerializer();

        /** Creates the element that is serialized with the prior serializer when test files are generated. */
        PreviousElementT createTestData();
    }

    /**
     * One parameterized test case: a name and Flink version (which together select the resource
     * directory) plus the class-loader-safe wrappers around the setup and verifier classes.
     *
     * @param <PreviousElementT> element type of the prior serializer
     * @param <UpgradedElementT> element type of the upgraded serializer
     */
    public static class TestSpecification<PreviousElementT, UpgradedElementT> {
        private final String name;
        private final FlinkVersion flinkVersion;
        private final ClassLoaderSafePreUpgradeSetup<PreviousElementT> setup;
        private final ClassLoaderSafeUpgradeVerifier<UpgradedElementT> verifier;

        /**
         * @param str name of the specification; must not be null
         * @param flinkVersion version the stored test files belong to; must not be null
         * @param cls setup class, instantiated through {@code ClassLoaderSafePreUpgradeSetup}
         * @param cls2 verifier class, instantiated through {@code ClassLoaderSafeUpgradeVerifier}
         * @throws Exception if either wrapper cannot be constructed
         */
        public TestSpecification(String str, FlinkVersion flinkVersion, Class<? extends PreUpgradeSetup<PreviousElementT>> cls, Class<? extends UpgradeVerifier<UpgradedElementT>> cls2) throws Exception {
            // Validate both scalar arguments up front, in the same order as they are declared.
            Preconditions.checkNotNull(str);
            Preconditions.checkNotNull(flinkVersion);
            this.name = str;
            this.flinkVersion = flinkVersion;
            this.setup = new ClassLoaderSafePreUpgradeSetup<>(cls);
            this.verifier = new ClassLoaderSafeUpgradeVerifier<>(cls2);
        }

        /** Returns {@code "<name> / <flinkVersion>"}; used as the display name of the parameterized tests. */
        @Override
        public String toString() {
            StringBuilder label = new StringBuilder(this.name);
            label.append(" / ").append(this.flinkVersion);
            return label.toString();
        }
    }

    /**
     * Supplies the post-upgrade half of a test specification: the upgraded serializer and the
     * matchers the tests apply to deserialized data and to the resolved schema compatibility.
     *
     * <p>Implementations are instantiated reflectively through a no-arg constructor by
     * {@code ClassLoaderSafeUpgradeVerifier}.
     *
     * @param <UpgradedElementT> element type handled by the upgraded serializer
     */
    public interface UpgradeVerifier<UpgradedElementT> {
        /** Creates the serializer that the stored snapshot is checked against. */
        TypeSerializer<UpgradedElementT> createUpgradedSerializer();

        /** Creates the matcher every deserialized test element must satisfy. */
        Matcher<UpgradedElementT> testDataMatcher();

        /**
         * Creates the matcher the resolved schema compatibility must satisfy.
         *
         * @param flinkVersion version of the test specification being verified
         */
        Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> schemaCompatibilityMatcher(FlinkVersion flinkVersion);
    }

    /**
     * Supplies the test specifications. Referenced by name as the {@code @MethodSource} of every
     * parameterized test in this class.
     *
     * @return the specifications each parameterized test is run against
     * @throws Exception declared so implementations may fail while constructing specifications
     */
    public abstract Collection<TestSpecification<?, ?>> createTestSpecifications() throws Exception;

    /**
     * Disabled generator that writes the {@code test-data} and {@code serializer-snapshot} files
     * for a specification into the resource directory named after {@link #CURRENT_VERSION}.
     *
     * <p>The directory that is created is the one the files are written to. Creating the directory
     * of the specification's own version instead leaves the target directory missing whenever the
     * two versions differ, and the subsequent write fails.
     *
     * @param testSpecification specification whose setup provides the serializer and test element
     * @throws Exception if directory creation, serialization or writing fails
     */
    @Disabled
    @MethodSource({"createTestSpecifications"})
    @ParameterizedTest(name = "Test Specification = {0}")
    void generateTestSetupFiles(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) throws Exception {
        Path dataFile = getGenerateDataFilePath(testSpecification);
        Path snapshotFile = getGenerateSerializerSnapshotFilePath(testSpecification);
        // Both files live in the same generate directory; make sure it exists before writing.
        Files.createDirectories(snapshotFile.getParent());

        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(testSpecification.setup.setupClassloader)) {
            TypeSerializer<PreviousElementT> priorSerializer = testSpecification.setup.createPriorSerializer();

            // The test data is written before the snapshot is taken; this order is kept as is.
            // NOTE(review): presumably some serializers change their configuration while
            // serializing, so the snapshot must come second - confirm before reordering.
            DataOutputSerializer testDataOut = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
            priorSerializer.serialize(testSpecification.setup.createTestData(), testDataOut);
            writeContentsTo(dataFile, testDataOut.getCopyOfBuffer());

            DataOutputSerializer snapshotOut = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
            writeSerializerSnapshot(snapshotOut, priorSerializer, CURRENT_VERSION);
            writeContentsTo(snapshotFile, snapshotOut.getCopyOfBuffer());
        }
    }

    /**
     * Checks that the serializer restored from the stored snapshot passes
     * {@code assertSerializerIsValid} against the stored test data.
     *
     * <p>Skipped (via an AssertJ assumption) when {@code TypeSerializerSchemaCompatibility.incompatible()}
     * satisfies the specification's schema compatibility matcher.
     *
     * @param testSpecification specification under test
     * @throws Exception if reading the stored files or any validation step fails
     */
    @MethodSource({"createTestSpecifications"})
    @ParameterizedTest(name = "Test Specification = {0}")
    void restoreSerializerIsValid(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) throws Exception {
        final ClassLoaderSafeUpgradeVerifier<UpgradedElementT> verifier = testSpecification.verifier;
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifier.verifierClassloader)) {
            Matcher<TypeSerializerSchemaCompatibility<UpgradedElementT>> expectedCompatibility =
                    verifier.schemaCompatibilityMatcher(testSpecification.flinkVersion);
            Assumptions.assumeThat(TypeSerializerSchemaCompatibility.incompatible())
                    .as("This test only applies for test specifications that verify an upgraded serializer that is not incompatible.")
                    .is(HamcrestCondition.matching(CoreMatchers.not(expectedCompatibility)));

            TypeSerializer<UpgradedElementT> restoredSerializer = snapshotUnderTest(testSpecification).restoreSerializer();
            DataInputView storedData = dataUnderTest(testSpecification);
            assertSerializerIsValid(restoredSerializer, storedData, verifier.testDataMatcher());
        }
    }

    /**
     * Asserts that resolving the stored snapshot against the upgraded serializer yields a schema
     * compatibility accepted by the specification's schema compatibility matcher.
     *
     * @param testSpecification specification under test
     * @throws Exception if reading the stored snapshot fails
     */
    @MethodSource({"createTestSpecifications"})
    @ParameterizedTest(name = "Test Specification = {0}")
    void upgradedSerializerHasExpectedSchemaCompatibility(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) throws Exception {
        final ClassLoaderSafeUpgradeVerifier<UpgradedElementT> verifier = testSpecification.verifier;
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifier.verifierClassloader)) {
            TypeSerializerSnapshot<UpgradedElementT> storedSnapshot = snapshotUnderTest(testSpecification);
            TypeSerializer<UpgradedElementT> upgradedSerializer = verifier.createUpgradedSerializer();
            TypeSerializerSchemaCompatibility<UpgradedElementT> actualCompatibility =
                    storedSnapshot.resolveSchemaCompatibility(upgradedSerializer);

            AssertionsForClassTypes.assertThat(actualCompatibility)
                    .is(HamcrestCondition.matching(verifier.schemaCompatibilityMatcher(testSpecification.flinkVersion)));
        }
    }

    /**
     * For specifications whose resolved compatibility is "compatible after migration": reads the
     * stored data with the restored serializer, rewrites it with the upgraded serializer, and then
     * runs {@code assertSerializerIsValid} on the upgraded serializer with the rewritten data.
     *
     * <p>Skipped (via an AssertJ assumption) for any other compatibility result.
     *
     * @param testSpecification specification under test
     * @throws Exception if reading the stored files or any validation step fails
     */
    @MethodSource({"createTestSpecifications"})
    @ParameterizedTest(name = "Test Specification = {0}")
    void upgradedSerializerIsValidAfterMigration(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) throws Exception {
        final ClassLoaderSafeUpgradeVerifier<UpgradedElementT> verifier = testSpecification.verifier;
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifier.verifierClassloader)) {
            TypeSerializerSnapshot<UpgradedElementT> storedSnapshot = snapshotUnderTest(testSpecification);
            TypeSerializer<UpgradedElementT> upgradedSerializer = verifier.createUpgradedSerializer();
            TypeSerializerSchemaCompatibility<UpgradedElementT> compatibility =
                    storedSnapshot.resolveSchemaCompatibility(upgradedSerializer);
            Assumptions.assumeThat(compatibility)
                    .as("This test only applies for test specifications that verify an upgraded serializer that requires migration to be compatible.")
                    .is(HamcrestCondition.matching(TypeSerializerMatchers.isCompatibleAfterMigration()));

            // Migration step: read with the restored (old) serializer, write with the upgraded one.
            DataInputView migratedData = readAndThenWriteData(
                    dataUnderTest(testSpecification),
                    storedSnapshot.restoreSerializer(),
                    upgradedSerializer,
                    verifier.testDataMatcher());
            assertSerializerIsValid(upgradedSerializer, migratedData, verifier.testDataMatcher());
        }
    }

    /**
     * For specifications whose resolved compatibility is "compatible with reconfigured serializer":
     * runs {@code assertSerializerIsValid} on the reconfigured serializer with the stored test data.
     *
     * <p>Skipped (via an AssertJ assumption) for any other compatibility result.
     *
     * @param testSpecification specification under test
     * @throws Exception if reading the stored files or any validation step fails
     */
    @MethodSource({"createTestSpecifications"})
    @ParameterizedTest(name = "Test Specification = {0}")
    void upgradedSerializerIsValidAfterReconfiguration(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) throws Exception {
        final ClassLoaderSafeUpgradeVerifier<UpgradedElementT> verifier = testSpecification.verifier;
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifier.verifierClassloader)) {
            TypeSerializerSchemaCompatibility<UpgradedElementT> compatibility =
                    snapshotUnderTest(testSpecification).resolveSchemaCompatibility(verifier.createUpgradedSerializer());
            Assumptions.assumeThat(compatibility)
                    .as("This test only applies for test specifications that verify an upgraded serializer that requires reconfiguration to be compatible.")
                    .is(HamcrestCondition.matching(TypeSerializerMatchers.isCompatibleWithReconfiguredSerializer()));

            TypeSerializer<UpgradedElementT> reconfiguredSerializer = compatibility.getReconfiguredSerializer();
            assertSerializerIsValid(reconfiguredSerializer, dataUnderTest(testSpecification), verifier.testDataMatcher());
        }
    }

    /**
     * For specifications whose resolved compatibility is "compatible as is": runs
     * {@code assertSerializerIsValid} on the upgraded serializer with the stored test data.
     *
     * <p>Skipped (via an AssertJ assumption) for any other compatibility result.
     *
     * @param testSpecification specification under test
     * @throws Exception if reading the stored files or any validation step fails
     */
    @MethodSource({"createTestSpecifications"})
    @ParameterizedTest(name = "Test Specification = {0}")
    void upgradedSerializerIsValidWhenCompatibleAsIs(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) throws Exception {
        final ClassLoaderSafeUpgradeVerifier<UpgradedElementT> verifier = testSpecification.verifier;
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(verifier.verifierClassloader)) {
            TypeSerializerSnapshot<UpgradedElementT> storedSnapshot = snapshotUnderTest(testSpecification);
            TypeSerializer<UpgradedElementT> upgradedSerializer = verifier.createUpgradedSerializer();
            TypeSerializerSchemaCompatibility<UpgradedElementT> compatibility =
                    storedSnapshot.resolveSchemaCompatibility(upgradedSerializer);
            Assumptions.assumeThat(compatibility)
                    .as("This test only applies for test specifications that verify an upgraded serializer that is compatible as is.")
                    .is(HamcrestCondition.matching(TypeSerializerMatchers.isCompatibleAsIs()));

            DataInputView storedData = dataUnderTest(testSpecification);
            assertSerializerIsValid(upgradedSerializer, storedData, verifier.testDataMatcher());
        }
    }

    /**
     * Round-trips the data through three serializers in sequence, checking the deserialized
     * element against {@code matcher} each time: the given serializer, a serializer restored from
     * a written-and-reread snapshot of it, and a duplicate of a second restored serializer.
     *
     * @param typeSerializer serializer to validate
     * @param dataInputView serialized element readable by {@code typeSerializer}
     * @param matcher expectation for every deserialized element
     * @throws Exception if any read, write or snapshot round trip fails
     */
    private static <T> void assertSerializerIsValid(TypeSerializer<T> typeSerializer, DataInputView dataInputView, Matcher<T> matcher) throws Exception {
        // Pass 1: the serializer itself.
        DataInputView afterOriginal = readAndThenWriteData(dataInputView, typeSerializer, typeSerializer, matcher);

        // Pass 2: a serializer restored from the serializer's own snapshot.
        TypeSerializerSnapshot<T> roundTrippedSnapshot = writeAndThenReadSerializerSnapshot(typeSerializer);
        TypeSerializer<T> restored = roundTrippedSnapshot.restoreSerializer();
        DataInputView afterRestored = readAndThenWriteData(afterOriginal, restored, restored, matcher);

        // Pass 3: a duplicate of a freshly restored serializer.
        TypeSerializer<T> duplicated = roundTrippedSnapshot.restoreSerializer().duplicate();
        readAndThenWriteData(afterRestored, duplicated, duplicated, matcher);
    }

    /** Returns the {@code serializer-snapshot} file inside the directory returned by {@code getGenerateResourceDirectory}. */
    private Path getGenerateSerializerSnapshotFilePath(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) {
        final String directory = getGenerateResourceDirectory(testSpecification);
        return Paths.get(directory + "/serializer-snapshot");
    }

    /** Returns the {@code test-data} file inside the directory returned by {@code getGenerateResourceDirectory}. */
    private Path getGenerateDataFilePath(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) {
        final String directory = getGenerateResourceDirectory(testSpecification);
        return Paths.get(directory + "/test-data");
    }

    /**
     * Builds {@code <user.dir>/src/test/resources/<name>-<CURRENT_VERSION>}, the directory that
     * newly generated test files are written to.
     */
    private String getGenerateResourceDirectory(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) {
        final String workingDirectory = System.getProperty("user.dir");
        final String specificationName = testSpecification.name;
        return workingDirectory + "/src/test/resources/" + specificationName + "-" + CURRENT_VERSION;
    }

    /** Returns the {@code serializer-snapshot} file inside the directory returned by {@code getTestResourceDirectory}. */
    private Path getSerializerSnapshotFilePath(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) {
        final String directory = getTestResourceDirectory(testSpecification);
        return Paths.get(directory + "/serializer-snapshot");
    }

    /** Returns the {@code test-data} file inside the directory returned by {@code getTestResourceDirectory}. */
    private Path getTestDataFilePath(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) {
        final String directory = getTestResourceDirectory(testSpecification);
        return Paths.get(directory + "/test-data");
    }

    /**
     * Builds {@code <user.dir>/src/test/resources/<name>-<flinkVersion>}, the directory the stored
     * test files of a specification are read from.
     */
    private String getTestResourceDirectory(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) {
        final String workingDirectory = System.getProperty("user.dir");
        final String specificationName = testSpecification.name;
        final FlinkVersion specificationVersion = testSpecification.flinkVersion;
        return workingDirectory + "/src/test/resources/" + specificationName + "-" + specificationVersion;
    }

    /**
     * Reads the stored serializer snapshot file of the specification.
     *
     * @throws Exception if the snapshot cannot be read
     */
    private TypeSerializerSnapshot<UpgradedElementT> snapshotUnderTest(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) throws Exception {
        Path snapshotFile = getSerializerSnapshotFilePath(testSpecification);
        DataInputView snapshotInput = contentsOf(snapshotFile);
        return readSerializerSnapshot(snapshotInput, testSpecification.flinkVersion);
    }

    /** Loads the stored test data file of the specification into a fresh input view. */
    private DataInputView dataUnderTest(TestSpecification<PreviousElementT, UpgradedElementT> testSpecification) {
        Path dataFile = getTestDataFilePath(testSpecification);
        return contentsOf(dataFile);
    }

    /**
     * Writes {@code bArr} to {@code path} using the default options of {@link Files#write}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the write fails
     */
    private static void writeContentsTo(Path path, byte[] bArr) {
        try {
            Files.write(path, bArr);
        } catch (IOException writeFailure) {
            String message = "Failed to write to " + path;
            throw new RuntimeException(message, writeFailure);
        }
    }

    /**
     * Reads the whole file at {@code path} and wraps its bytes in a {@link DataInputDeserializer}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be read
     */
    private static DataInputView contentsOf(Path path) {
        try {
            byte[] fileBytes = Files.readAllBytes(path);
            return new DataInputDeserializer(fileBytes);
        } catch (IOException readFailure) {
            String message = "Failed to read contents of " + path;
            throw new RuntimeException(message, readFailure);
        }
    }

    /**
     * Writes the serializer's snapshot in the current format, provided {@code flinkVersion} is
     * newer than {@code FlinkVersion.v1_6}.
     *
     * @throws UnsupportedOperationException if {@code flinkVersion} is not newer than 1.6
     * @throws IOException if writing the snapshot fails
     */
    private static <T> void writeSerializerSnapshot(DataOutputView dataOutputView, TypeSerializer<T> typeSerializer, FlinkVersion flinkVersion) throws IOException {
        final boolean usesCurrentFormat = flinkVersion.isNewerVersionThan(FlinkVersion.v1_6);
        if (usesCurrentFormat) {
            writeSerializerSnapshotCurrentFormat(dataOutputView, typeSerializer);
            return;
        }
        throw new UnsupportedOperationException("There should be no longer a need to support/use this path since Flink 1.17");
    }

    /**
     * Takes a snapshot of the serializer's configuration and writes it through
     * {@link TypeSerializerSnapshotSerializationUtil#writeSerializerSnapshot}.
     *
     * @throws IOException if writing the snapshot fails
     */
    private static <T> void writeSerializerSnapshotCurrentFormat(DataOutputView dataOutputView, TypeSerializer<T> typeSerializer) throws IOException {
        TypeSerializerSnapshot<T> configurationSnapshot = typeSerializer.snapshotConfiguration();
        TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(dataOutputView, configurationSnapshot);
    }

    /**
     * Reads a snapshot in the current format using the calling thread's context class loader.
     *
     * @throws IllegalStateException (from {@code Preconditions.checkState}) if {@code flinkVersion}
     *     is not newer than {@code FlinkVersion.v1_6}
     * @throws IOException if reading the snapshot fails
     */
    private static <T> TypeSerializerSnapshot<T> readSerializerSnapshot(DataInputView dataInputView, FlinkVersion flinkVersion) throws IOException {
        Preconditions.checkState(flinkVersion.isNewerVersionThan(FlinkVersion.v1_6));
        final ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
        return readSerializerSnapshotCurrentFormat(dataInputView, userClassLoader);
    }

    /**
     * Reads a snapshot through {@link TypeSerializerSnapshotSerializationUtil#readSerializerSnapshot}
     * with the given class loader.
     *
     * @throws IOException if reading the snapshot fails
     */
    private static <T> TypeSerializerSnapshot<T> readSerializerSnapshotCurrentFormat(DataInputView dataInputView, ClassLoader classLoader) throws IOException {
        TypeSerializerSnapshot<T> restoredSnapshot =
                TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(dataInputView, classLoader);
        return restoredSnapshot;
    }

    /**
     * Deserializes one element with {@code typeSerializer}, asserts that it satisfies
     * {@code matcher}, serializes it again with {@code typeSerializer2}, and returns an input view
     * over the newly written bytes.
     *
     * <p>The element is held as {@code T} rather than {@code Object} so that it can be passed to
     * {@code TypeSerializer<T>.serialize} without a cast.
     *
     * @param dataInputView input positioned at a serialized element
     * @param typeSerializer serializer used for reading
     * @param typeSerializer2 serializer used for writing
     * @param matcher expectation for the deserialized element
     * @return an input view over the re-serialized element
     * @throws IOException if deserialization or serialization fails
     */
    private static <T> DataInputView readAndThenWriteData(DataInputView dataInputView, TypeSerializer<T> typeSerializer, TypeSerializer<T> typeSerializer2, Matcher<T> matcher) throws IOException {
        T data = typeSerializer.deserialize(dataInputView);
        AssertionsForInterfaceTypes.assertThat(data).is(HamcrestCondition.matching(matcher));

        DataOutputSerializer rewritten = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
        typeSerializer2.serialize(data, rewritten);
        return new DataInputDeserializer(rewritten.wrapAsByteBuffer());
    }

    /**
     * Writes the serializer's snapshot to an in-memory buffer and reads it straight back using the
     * calling thread's context class loader.
     *
     * @throws IOException if writing or reading the snapshot fails
     */
    private static <T> TypeSerializerSnapshot<T> writeAndThenReadSerializerSnapshot(TypeSerializer<T> typeSerializer) throws IOException {
        DataOutputSerializer snapshotBuffer = new DataOutputSerializer(INITIAL_OUTPUT_BUFFER_SIZE);
        writeSerializerSnapshotCurrentFormat(snapshotBuffer, typeSerializer);

        DataInputDeserializer snapshotInput = new DataInputDeserializer(snapshotBuffer.wrapAsByteBuffer());
        ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
        return readSerializerSnapshotCurrentFormat(snapshotInput, userClassLoader);
    }
}
