package org.apache.flink.streaming.api.functions;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.util.OptionalLong;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.JobInfoImpl;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.TaskInfoImpl;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/streaming/api/functions/PrintSinkTest.class */
class PrintSinkTest {
    private final PrintStream originalSystemOut = System.out;
    private final PrintStream originalSystemErr = System.err;
    private final ByteArrayOutputStream arrayOutputStream = new ByteArrayOutputStream();
    private final ByteArrayOutputStream arrayErrorStream = new ByteArrayOutputStream();
    private final String line = System.lineSeparator();

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/PrintSinkTest$DummyMailboxExecutor.class */
    private static class DummyMailboxExecutor implements MailboxExecutor {
        private DummyMailboxExecutor() {
        }

        public void execute(MailboxExecutor.MailOptions mailOptions, ThrowingRunnable<? extends Exception> throwingRunnable, String str, Object... objArr) {
        }

        public void yield() throws InterruptedException, FlinkRuntimeException {
        }

        public boolean tryYield() throws FlinkRuntimeException {
            return false;
        }

        public boolean shouldInterrupt() {
            return false;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/PrintSinkTest$MockContext.class */
    private static class MockContext implements SinkWriter.Context {
        private MockContext() {
        }

        public long currentWatermark() {
            return 0L;
        }

        public Long timestamp() {
            return Long.valueOf(System.currentTimeMillis());
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/PrintSinkTest$MockInitContext.class */
    private static class MockInitContext implements WriterInitContext, SerializationSchema.InitializationContext {
        private final JobInfo jobInfo = new JobInfoImpl(new JobID(), "MockJob");
        private final TaskInfo taskInfo;

        private MockInitContext(int i) {
            this.taskInfo = new TaskInfoImpl("MockTask", i + 1, 0, i, 0);
        }

        public UserCodeClassLoader getUserCodeClassLoader() {
            return SimpleUserCodeClassLoader.create(PrintSinkTest.class.getClassLoader());
        }

        public MailboxExecutor getMailboxExecutor() {
            return new DummyMailboxExecutor();
        }

        public ProcessingTimeService getProcessingTimeService() {
            return new TestProcessingTimeService();
        }

        public SinkWriterMetricGroup metricGroup() {
            return MetricsGroupTestUtils.mockWriterMetricGroup();
        }

        public MetricGroup getMetricGroup() {
            return metricGroup();
        }

        public OptionalLong getRestoredCheckpointId() {
            return OptionalLong.empty();
        }

        public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
            return this;
        }

        public boolean isObjectReuseEnabled() {
            return false;
        }

        public <IN> TypeSerializer<IN> createInputSerializer() {
            return null;
        }

        public JobInfo getJobInfo() {
            return this.jobInfo;
        }

        public TaskInfo getTaskInfo() {
            return this.taskInfo;
        }
    }

    PrintSinkTest() {
    }

    @BeforeEach
    void setUp() {
        System.setOut(new PrintStream(this.arrayOutputStream));
        System.setErr(new PrintStream(this.arrayErrorStream));
    }

    @AfterEach
    void tearDown() {
        if (System.out != this.originalSystemOut) {
            System.out.close();
        }
        if (System.err != this.originalSystemErr) {
            System.err.close();
        }
        System.setOut(this.originalSystemOut);
        System.setErr(this.originalSystemErr);
    }

    @Test
    void testPrintSinkStdOut() throws Exception {
        PrintSink printSink = new PrintSink();
        SinkWriter createWriter = printSink.createWriter(new MockInitContext(1));
        try {
            createWriter.write("hello world!", new MockContext());
            Assertions.assertThat(printSink).hasToString("Print to System.out");
            Assertions.assertThat(this.arrayOutputStream).hasToString("hello world!" + this.line);
            if (createWriter != null) {
                createWriter.close();
            }
        } catch (Throwable th) {
            if (createWriter != null) {
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testPrintSinkStdErr() throws Exception {
        PrintSink printSink = new PrintSink(true);
        SinkWriter createWriter = printSink.createWriter(new MockInitContext(1));
        try {
            createWriter.write("hello world!", new MockContext());
            Assertions.assertThat(printSink).hasToString("Print to System.err");
            Assertions.assertThat(this.arrayErrorStream).hasToString("hello world!" + this.line);
            if (createWriter != null) {
                createWriter.close();
            }
        } catch (Throwable th) {
            if (createWriter != null) {
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testPrintSinkStdErrWithIdentifier() throws Exception {
        PrintSink printSink = new PrintSink("mySink", true);
        SinkWriter createWriter = printSink.createWriter(new MockInitContext(1));
        try {
            createWriter.write("hello world!", new MockContext());
            Assertions.assertThat(printSink).hasToString("Print to System.err");
            Assertions.assertThat(this.arrayErrorStream).hasToString("mySink> hello world!" + this.line);
            if (createWriter != null) {
                createWriter.close();
            }
        } catch (Throwable th) {
            if (createWriter != null) {
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testPrintSinkWithPrefix() throws Exception {
        PrintSink printSink = new PrintSink();
        SinkWriter createWriter = printSink.createWriter(new MockInitContext(2));
        try {
            createWriter.write("hello world!", new MockContext());
            Assertions.assertThat(printSink).hasToString("Print to System.out");
            Assertions.assertThat(this.arrayOutputStream).hasToString("1> hello world!" + this.line);
            if (createWriter != null) {
                createWriter.close();
            }
        } catch (Throwable th) {
            if (createWriter != null) {
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testPrintSinkWithIdentifierAndPrefix() throws Exception {
        PrintSink printSink = new PrintSink("mySink");
        SinkWriter createWriter = printSink.createWriter(new MockInitContext(2));
        try {
            createWriter.write("hello world!", new MockContext());
            Assertions.assertThat(printSink).hasToString("Print to System.out");
            Assertions.assertThat(this.arrayOutputStream).hasToString("mySink:1> hello world!" + this.line);
            if (createWriter != null) {
                createWriter.close();
            }
        } catch (Throwable th) {
            if (createWriter != null) {
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void testPrintSinkWithIdentifierButNoPrefix() throws Exception {
        PrintSink printSink = new PrintSink("mySink");
        SinkWriter createWriter = printSink.createWriter(new MockInitContext(1));
        try {
            createWriter.write("hello world!", new MockContext());
            Assertions.assertThat(printSink).hasToString("Print to System.out");
            Assertions.assertThat(this.arrayOutputStream).hasToString("mySink> hello world!" + this.line);
            if (createWriter != null) {
                createWriter.close();
            }
        } catch (Throwable th) {
            if (createWriter != null) {
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
