package org.apache.flink.runtime.testutils;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.taskmanager.TaskTest;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.jupiter.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/testutils/CommonTestUtils.class */
public class CommonTestUtils {
    private static final Logger LOG = LoggerFactory.getLogger(CommonTestUtils.class);
    private static final long RETRY_INTERVAL = 100;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.runtime.testutils.CommonTestUtils$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/testutils/CommonTestUtils$1.class */
    /**
     * Decompiler-emitted lookup table that maps {@link ExecutionState} ordinals to small integer
     * case labels for a {@code switch} elsewhere in this class.
     *
     * <p>The array is sized by {@code ExecutionState.values().length}; {@code RUNNING} is assigned
     * label 1 and {@code FINISHED} label 2. Every other slot keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$execution$ExecutionState = new int[ExecutionState.values().length];

        static {
            try {
                // RUNNING -> case label 1
                $SwitchMap$org$apache$flink$runtime$execution$ExecutionState[ExecutionState.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot then stays 0.
            }
            try {
                // FINISHED -> case label 2
                $SwitchMap$org$apache$flink$runtime$execution$ExecutionState[ExecutionState.FINISHED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot then stays 0.
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/testutils/CommonTestUtils$PipeForwarder.class */
    /**
     * Daemon thread named "Pipe Forwarder" that copies an {@link InputStream} into a
     * {@link StringWriter}, one value per {@code read()} call.
     *
     * <p>The thread starts itself from the constructor and ends when the stream reports
     * end-of-input or a read fails with an {@link IOException}.
     */
    public static class PipeForwarder extends Thread {
        private final StringWriter target;
        private final InputStream source;

        /**
         * Creates the forwarder and immediately starts it.
         *
         * @param inputStream stream to drain
         * @param stringWriter writer receiving every value read from the stream
         */
        public PipeForwarder(InputStream inputStream, StringWriter stringWriter) {
            super("Pipe Forwarder");
            setDaemon(true);
            this.source = inputStream;
            this.target = stringWriter;
            start();
        }

        @Override
        public void run() {
            try {
                for (int next = source.read(); next != -1; next = source.read()) {
                    target.write(next);
                }
            } catch (IOException ignored) {
                // A failing source simply ends the forwarding; nothing is reported.
            }
        }
    }

    /**
     * Returns the class path reported by the runtime MX bean of this JVM.
     *
     * @return the class path string of the running JVM
     */
    public static String getCurrentClasspath() {
        final String classPath = ManagementFactory.getRuntimeMXBean().getClassPath();
        return classPath;
    }

    /**
     * Creates a temporary file ending in {@code -log4j.properties}, fills it with the debug
     * logging configuration written by {@code printLog4jDebugConfig}, and registers it for
     * deletion on JVM exit.
     *
     * @return the populated temporary configuration file
     * @throws IOException if the file cannot be created or written
     */
    public static File createTemporaryLog4JProperties() throws IOException {
        final String randomPrefix = FileUtils.getRandomFilename("");
        final File configFile = File.createTempFile(randomPrefix, "-log4j.properties");
        configFile.deleteOnExit();
        printLog4jDebugConfig(configFile);
        return configFile;
    }

    /**
     * Locates a runnable {@code java} command below the {@code java.home} system property.
     *
     * <p>Two candidates are probed in order, {@code <java.home>/java} and then
     * {@code <java.home>/bin/java}, by running each with {@code -version}. The first candidate
     * whose process exits with code 0 wins. Any failure while probing a candidate is ignored.
     *
     * @return the absolute path of the first working candidate, or {@code null} if neither works
     */
    public static String getJavaCommandPath() {
        final File javaHome = new File(System.getProperty("java.home"));
        final String[] candidates = {
            new File(javaHome, "java").getAbsolutePath(),
            new File(new File(javaHome, "bin"), "java").getAbsolutePath()
        };
        for (String candidate : candidates) {
            try {
                final int exitCode = new ProcessBuilder(candidate, "-version").start().waitFor();
                if (exitCode == 0) {
                    return candidate;
                }
            } catch (Throwable ignored) {
                // Best-effort probe: a candidate that cannot be launched is simply skipped.
            }
        }
        return null;
    }

    /**
     * Writes a fixed Log4j 2 properties configuration to the given file: root logger at DEBUG
     * with a console appender on {@code SYSTEM_ERR}, and the Jetty and ZooKeeper loggers
     * switched off.
     *
     * @param file destination file; it is opened with a plain {@link FileWriter}
     * @throws IOException if the file cannot be opened for writing
     */
    public static void printLog4jDebugConfig(File file) throws IOException {
        final String[] configLines = {
            "rootLogger.level = DEBUG",
            "rootLogger.appenderRef.console.ref = ConsoleAppender",
            "appender.console.name = ConsoleAppender",
            "appender.console.type = CONSOLE",
            "appender.console.target = SYSTEM_ERR",
            "appender.console.layout.type = PatternLayout",
            "appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-4r [%t] %-5p %c %x - %m%n",
            "logger.jetty.name = org.eclipse.jetty.util.log",
            "logger.jetty.level = OFF",
            "logger.zookeeper.name = org.apache.zookeeper",
            "logger.zookeeper.level = OFF"
        };
        try (PrintWriter out = new PrintWriter(new FileWriter(file))) {
            for (String configLine : configLines) {
                out.println(configLine);
            }
            out.flush();
        }
    }

    /**
     * Blocks until the condition evaluates to {@code true}, polling at the default
     * {@code RETRY_INTERVAL} with no limit on the number of attempts.
     *
     * @param supplierWithException condition to poll
     * @throws Exception anything thrown by the condition, or by the sleep between polls
     */
    public static void waitUntilCondition(SupplierWithException<Boolean, Exception> supplierWithException) throws Exception {
        final long pollIntervalMillis = RETRY_INTERVAL;
        waitUntilCondition(supplierWithException, pollIntervalMillis);
    }

    /**
     * Blocks until the condition evaluates to {@code true}, sleeping {@code j} milliseconds
     * after every unsuccessful evaluation. There is no upper bound on the number of attempts.
     *
     * @param supplierWithException condition to poll
     * @param j pause between two evaluations, in milliseconds
     * @throws Exception anything thrown by the condition, or by the sleep between polls
     */
    public static void waitUntilCondition(SupplierWithException<Boolean, Exception> supplierWithException, long j) throws Exception {
        for (;;) {
            if (supplierWithException.get()) {
                return;
            }
            Thread.sleep(j);
        }
    }

    /**
     * Polls the condition at the default {@code RETRY_INTERVAL}, delegating to the overload
     * that takes both an interval and a retry budget.
     *
     * @param supplierWithException condition to poll
     * @param i retry budget handed to the delegate
     * @throws Exception anything thrown by the delegate
     */
    public static void waitUntilCondition(SupplierWithException<Boolean, Exception> supplierWithException, int i) throws Exception {
        final long pollIntervalMillis = RETRY_INTERVAL;
        waitUntilCondition(supplierWithException, pollIntervalMillis, i);
    }

    /**
     * Polls the condition until it evaluates to {@code true} or the retry budget is used up.
     *
     * <p>The condition is evaluated once up front and once more after every sleep. Success is
     * decided by the result of the last evaluation, so a condition that turns {@code true} on
     * the final attempt (or is already {@code true} with a budget of 0) counts as success
     * instead of being reported as exhaustion.
     *
     * @param supplierWithException condition to poll
     * @param j pause between two evaluations, in milliseconds
     * @param i number of retries after the first evaluation; a negative value never reaches 0,
     *     so polling then continues until the condition holds
     * @throws FlinkException if the condition is still {@code false} when the budget reaches 0
     * @throws Exception anything thrown by the condition, or by the sleep between polls
     */
    public static void waitUntilCondition(SupplierWithException<Boolean, Exception> supplierWithException, long j, int i) throws Exception {
        boolean satisfied = supplierWithException.get();
        while (!satisfied && i != 0) {
            i--;
            LOG.debug("Condition not true. Remaining retry attempts {}", Integer.valueOf(i));
            LOG.debug("Sleeping for {} milliseconds", Long.valueOf(j));
            Thread.sleep(j);
            satisfied = supplierWithException.get();
        }
        // Judge by the last evaluation, not by the counter: the counter is also 0 when the
        // very last attempt succeeded.
        if (!satisfied) {
            throw new FlinkException("Exhausted retry attempts.");
        }
        LOG.debug("Condition true. Remaining retry attempts {}", Integer.valueOf(i));
    }

    /**
     * Waits until all tasks of the given job are running, fetching a fresh execution graph
     * from the mini cluster for every check.
     *
     * @param miniCluster cluster the job runs on
     * @param jobID job to observe
     * @param z forwarded unchanged to the graph-supplier overload
     * @throws Exception anything thrown while fetching the graph or while waiting
     */
    public static void waitForAllTaskRunning(MiniCluster miniCluster, JobID jobID, boolean z) throws Exception {
        final SupplierWithException<AccessExecutionGraph, Exception> graphSupplier =
                () -> getGraph(miniCluster, jobID);
        waitForAllTaskRunning(graphSupplier, z);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fetches the execution graph of a job from the mini cluster, waiting at most 60 seconds
     * for the answer.
     *
     * @param miniCluster cluster to query
     * @param jobID job whose graph is requested
     * @return the execution graph delivered by the cluster
     * @throws InterruptedException if interrupted while waiting
     * @throws ExecutionException if the request completed exceptionally
     * @throws TimeoutException if no answer arrived within 60 seconds
     */
    public static AccessExecutionGraph getGraph(MiniCluster miniCluster, JobID jobID) throws InterruptedException, ExecutionException, TimeoutException {
        final long timeoutSeconds = 60L;
        final AccessExecutionGraph graph =
                (AccessExecutionGraph) miniCluster.getExecutionGraph(jobID).get(timeoutSeconds, TimeUnit.SECONDS);
        return graph;
    }

    /**
     * Waits until the supplied execution graph is in state {@code RUNNING} and every subtask
     * of every job vertex counts as running.
     *
     * <p>Each poll fetches a fresh graph. If the graph is in a globally terminal state the wait
     * is aborted via {@code Assertions.fail}, attaching the graph's failure exception when one
     * is recorded.
     *
     * @param supplierWithException source of the current execution graph
     * @param z whether a subtask in state {@code FINISHED} also counts as running; when
     *     {@code false}, a finished subtask aborts the wait with a {@link RuntimeException}
     * @throws Exception anything thrown by the supplier or while waiting
     */
    public static void waitForAllTaskRunning(SupplierWithException<AccessExecutionGraph, Exception> supplierWithException, boolean z) throws Exception {
        waitUntilCondition(() -> {
            AccessExecutionGraph graph = supplierWithException.get();
            if (graph.getState().isGloballyTerminalState()) {
                ErrorInfo failureInfo = graph.getFailureInfo();
                Assertions.fail(String.format("Graph is in globally terminal state (%s)", graph.getState()), failureInfo != null ? failureInfo.getException() : null);
            }
            // Lambda parameter types are inferred from the graph API, so no raw Predicate is needed.
            return graph.getState() == JobStatus.RUNNING
                    && graph.getAllVertices().values().stream().allMatch(jobVertex ->
                            Arrays.stream(jobVertex.getTaskVertices()).allMatch(subtask ->
                                    isSubtaskRunning(subtask.getExecutionState(), subtask, z)));
        });
    }

    /** Classifies one subtask state: RUNNING passes, FINISHED passes only if allowed, everything else does not. */
    private static boolean isSubtaskRunning(ExecutionState state, Object subtask, boolean allowFinished) {
        switch (state) {
            case RUNNING:
                return true;
            case FINISHED:
                if (allowFinished) {
                    return true;
                }
                throw new RuntimeException("Sub-Task finished unexpectedly" + subtask);
            default:
                return false;
        }
    }

    /**
     * Waits until the supplied job details list at least one job vertex and, for every listed
     * vertex, the number of tasks in state {@code RUNNING} equals the vertex parallelism.
     *
     * @param supplierWithException source of the current job details
     * @throws Exception anything thrown by the supplier or while waiting
     */
    public static void waitForAllTaskRunning(SupplierWithException<JobDetailsInfo, Exception> supplierWithException) throws Exception {
        waitUntilCondition(() -> {
            Collection<JobDetailsInfo.JobVertexDetailsInfo> vertexInfos = supplierWithException.get().getJobVertexInfos();
            if (vertexInfos.isEmpty()) {
                return false;
            }
            return vertexInfos.stream().allMatch(vertexInfo -> {
                Integer runningTasks = vertexInfo.getTasksPerState().get(ExecutionState.RUNNING);
                return runningTasks != null && runningTasks.intValue() == vertexInfo.getParallelism();
            });
        });
    }

    /**
     * Waits until the supplied job details report no entry, or a count of zero, for state
     * {@code RUNNING} in their vertices-per-state map.
     *
     * @param supplierWithException source of the current job details
     * @throws Exception anything thrown by the supplier or while waiting
     */
    public static void waitForNoTaskRunning(SupplierWithException<JobDetailsInfo, Exception> supplierWithException) throws Exception {
        waitUntilCondition(() -> {
            Integer runningCount = supplierWithException.get().getJobVerticesPerState().get(ExecutionState.RUNNING);
            if (runningCount == null) {
                return true;
            }
            return runningCount.equals(0);
        });
    }

    /**
     * Waits until the supplied job status is anything other than {@code INITIALIZING},
     * polling every 20 milliseconds.
     *
     * @param supplierWithException source of the current job status
     * @throws Exception anything thrown by the supplier or while waiting
     */
    public static void waitUntilJobManagerIsInitialized(SupplierWithException<JobStatus, Exception> supplierWithException) throws Exception {
        final SupplierWithException<Boolean, Exception> leftInitializing =
                () -> supplierWithException.get() != JobStatus.INITIALIZING;
        waitUntilCondition(leftInitializing, 20L);
    }

    /**
     * Waits until the job reaches one of the expected statuses.
     *
     * <p>If the job enters a terminal status that is not expected, the wait is aborted with an
     * {@link IllegalStateException}. When fetching the job execution result fails, that failure
     * is attached as the cause; otherwise a plain exception without a cause is thrown.
     *
     * @param jobClient client of the job to observe
     * @param list statuses that end the wait successfully
     * @throws IllegalStateException if the job reaches an unexpected terminal status
     * @throws Exception anything else thrown while querying the status or waiting
     */
    public static void waitForJobStatus(JobClient jobClient, List<JobStatus> list) throws Exception {
        waitUntilCondition(() -> {
            JobStatus currentStatus = jobClient.getJobStatus().get();
            if (list.contains(currentStatus)) {
                return true;
            }
            if (!currentStatus.isTerminalState()) {
                return false;
            }
            try {
                // Only the result lookup is guarded: a job failure surfaces here as an exception.
                jobClient.getJobExecutionResult().get();
            } catch (Exception e) {
                throw new IllegalStateException(String.format("Job has entered %s state, but expecting %s", currentStatus, list), e);
            }
            // Thrown outside the try so the catch above cannot re-wrap it with the other message.
            throw new IllegalStateException(String.format("Job has entered a terminal state %s, but expecting %s", currentStatus, list));
        });
    }

    /**
     * Requests cancellation of the job and blocks until the cancel call has completed.
     *
     * @param jobClient client of the job to cancel
     * @throws Exception if the cancel request fails or the wait is interrupted
     */
    public static void terminateJob(JobClient jobClient) throws Exception {
        jobClient
                .cancel()
                .get();
    }

    /**
     * Waits until subtasks of one job vertex have reached state {@code FINISHED}.
     *
     * <p>Each poll fetches a fresh execution graph. While the job is not in state
     * {@code RUNNING} the poll reports "not yet". A subtask found in a terminal state other
     * than {@code FINISHED} aborts the wait with a {@link RuntimeException}, as does a vertex
     * id that is not part of the graph.
     *
     * @param miniCluster cluster the job runs on
     * @param jobID job to observe
     * @param jobVertexID vertex whose subtasks are checked
     * @param z {@code true} to require all subtasks to be finished, {@code false} to accept
     *     any single finished subtask
     * @throws Exception anything thrown while fetching the graph or while waiting
     */
    public static void waitForSubtasksToFinish(MiniCluster miniCluster, JobID jobID, JobVertexID jobVertexID, boolean z) throws Exception {
        waitUntilCondition(() -> {
            AccessExecutionGraph graph = getGraph(miniCluster, jobID);
            if (graph.getState() != JobStatus.RUNNING) {
                return false;
            }
            AccessExecutionJobVertex jobVertex = graph.getAllVertices().values().stream()
                    .filter(candidate -> candidate.getJobVertexId().equals(jobVertexID))
                    .findAny()
                    .orElseThrow(() -> new RuntimeException("Vertex not found " + jobVertexID));
            // The subtask type is inferred from getTaskVertices(), so no raw Stream/Predicate is needed.
            if (z) {
                return Arrays.stream(jobVertex.getTaskVertices())
                        .allMatch(subtask -> isSubtaskFinished(subtask.getExecutionState(), subtask));
            }
            return Arrays.stream(jobVertex.getTaskVertices())
                    .anyMatch(subtask -> isSubtaskFinished(subtask.getExecutionState(), subtask));
        });
    }

    /** Tells whether a subtask is FINISHED; any other terminal state is rejected with an exception. */
    private static boolean isSubtaskFinished(ExecutionState state, Object subtask) {
        if (state == ExecutionState.FINISHED) {
            return true;
        }
        if (state.isTerminal()) {
            throw new RuntimeException(String.format("Sub-Task %s is already in a terminal state %s", subtask, state));
        }
        return false;
    }

    /**
     * Waits until the job's checkpoint statistics report at least {@code i} completed
     * checkpoints. A missing statistics snapshot counts as "not yet".
     *
     * @param jobID job to observe
     * @param miniCluster cluster the job runs on
     * @param i minimum number of completed checkpoints to wait for
     * @throws Exception anything thrown while fetching the statistics or while waiting
     */
    public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, int i) throws Exception {
        final long requiredCheckpoints = i;
        waitForCheckpoints(jobID, miniCluster, stats -> {
            if (stats == null) {
                return false;
            }
            return stats.getCounts().getNumberOfCompletedCheckpoints() >= requiredCheckpoints;
        });
    }

    /**
     * Waits until the latest completed checkpoint of the job has a trigger timestamp strictly
     * later than the moment this method was called (per {@code System.currentTimeMillis()}).
     *
     * @param jobID job to observe
     * @param miniCluster cluster the job runs on
     * @throws Exception anything thrown while fetching the statistics or while waiting
     */
    public static void waitForNewCheckpoint(JobID jobID, MiniCluster miniCluster) throws Exception {
        final long startedAtMillis = System.currentTimeMillis();
        waitForCheckpoints(jobID, miniCluster, stats -> {
            if (stats == null) {
                return false;
            }
            CompletedCheckpointStats latest = stats.getHistory().getLatestCompletedCheckpoint();
            return latest != null && latest.getTriggerTimestamp() > startedAtMillis;
        });
    }

    /**
     * Polls the job's execution graph until the predicate accepts its checkpoint statistics.
     *
     * <p>If the predicate is not satisfied and the job is in a globally terminal state, the
     * wait is aborted: with the job's recorded failure exception when there is one, otherwise
     * with the state check failing on the missing failure info.
     *
     * @param jobID job to observe
     * @param miniCluster cluster the job runs on
     * @param predicate test applied to the checkpoint statistics snapshot, which may be null
     * @throws Exception the job's failure, or anything thrown while fetching the graph or waiting
     */
    private static void waitForCheckpoints(JobID jobID, MiniCluster miniCluster, Predicate<CheckpointStatsSnapshot> predicate) throws Exception {
        waitUntilCondition(() -> {
            AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
            boolean satisfied = predicate.test(graph.getCheckpointStatsSnapshot());
            if (!satisfied && graph.getState().isGloballyTerminalState()) {
                // The job is over and can no longer produce the checkpoint(s) being waited for.
                Preconditions.checkState(graph.getFailureInfo() != null, "Job terminated (state=%s) before completing the requested checkpoint(s).", graph.getState());
                throw graph.getFailureInfo().getException();
            }
            return satisfied;
        });
    }

    /**
     * Looks up the external path of the job's latest completed checkpoint.
     *
     * @param jobID job to inspect
     * @param miniCluster cluster the job runs on
     * @return the external path, or an empty {@link Optional} when the statistics snapshot, the
     *     latest completed checkpoint, or its external path is {@code null}
     * @throws InterruptedException if interrupted while waiting for the execution graph
     * @throws ExecutionException if fetching the execution graph failed
     * @throws FlinkJobNotFoundException declared by the signature
     */
    public static Optional<String> getLatestCompletedCheckpointPath(JobID jobID, MiniCluster miniCluster) throws InterruptedException, ExecutionException, FlinkJobNotFoundException {
        final AccessExecutionGraph graph = miniCluster.getExecutionGraph(jobID).get();
        return Optional.ofNullable(graph.getCheckpointStatsSnapshot())
                .map(stats -> stats.getHistory().getLatestCompletedCheckpoint())
                .map(CompletedCheckpointStats::getExternalPath);
    }

    /**
     * Compares two streams value by value until one of them ends.
     *
     * <p>A stream that is not already a {@link BufferedInputStream} is wrapped in one before
     * reading. Neither stream is closed by this method.
     *
     * @param inputStream first stream
     * @param inputStream2 second stream
     * @return {@code true} if both streams deliver the same values and end at the same position
     * @throws IOException if reading from either stream fails
     */
    public static boolean isStreamContentEqual(InputStream inputStream, InputStream inputStream2) throws IOException {
        final InputStream left =
                inputStream instanceof BufferedInputStream ? inputStream : new BufferedInputStream(inputStream);
        final InputStream right =
                inputStream2 instanceof BufferedInputStream ? inputStream2 : new BufferedInputStream(inputStream2);

        int expected;
        while ((expected = left.read()) != -1) {
            if (expected != right.read()) {
                return false;
            }
        }
        // The first stream is exhausted; the second one must be exhausted as well.
        return right.read() == -1;
    }
}
