/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.util.testing;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

public class CollectingSink<T>
implements Sink<T> {
    private static final long serialVersionUID = 1L;
    private static final List<BlockingQueue<Object>> queues = Collections.synchronizedList(new ArrayList());
    private static final AtomicInteger numSinks = new AtomicInteger(-1);
    private final int index = numSinks.incrementAndGet();

    public CollectingSink() {
        queues.add(new LinkedBlockingQueue());
    }

    public SinkWriter<T> createWriter(WriterInitContext context) throws IOException {
        return new CollectingElementWriter(this.index);
    }

    public List<T> getRemainingOutput() {
        return new ArrayList<Object>(queues.get(this.index));
    }

    public boolean isEmpty() {
        return queues.get(this.index).isEmpty();
    }

    public T poll() throws TimeoutException {
        return this.poll(Duration.ofSeconds(15L));
    }

    public T poll(Duration duration) throws TimeoutException {
        Object element;
        try {
            element = queues.get(this.index).poll(duration.toMillis(), TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException var4) {
            throw new RuntimeException(var4);
        }
        if (element == null) {
            throw new TimeoutException();
        }
        return (T)element;
    }

    public void close() {
        queues.get(this.index).clear();
    }

    private class CollectingElementWriter
    implements SinkWriter<T> {
        private final int index;

        public CollectingElementWriter(int index) {
            this.index = index;
        }

        public void write(T element, SinkWriter.Context context) {
            queues.get(this.index).add(element);
        }

        public void flush(boolean endOfInput) {
        }

        public void close() {
        }
    }
}

