package org.apache.flink.iteration.datacache.nonkeyed;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.util.NonClosingInputStreamDecorator;
import org.apache.flink.runtime.util.NonClosingOutputStreamDecorator;
import org.apache.flink.statefun.flink.core.feedback.FeedbackConsumer;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/* loaded from: input_file:org/apache/flink/iteration/datacache/nonkeyed/DataCacheSnapshot.class */
public class DataCacheSnapshot {
    private static final int CURRENT_VERSION = 1;
    private final FileSystem fileSystem;

    @Nullable
    private final Tuple2<Integer, Integer> readerPosition;
    private final List<Segment> segments;

    public DataCacheSnapshot(FileSystem fileSystem, @Nullable Tuple2<Integer, Integer> tuple2, List<Segment> list) {
        this.fileSystem = fileSystem;
        this.readerPosition = tuple2;
        this.segments = list;
        Iterator<Segment> it = list.iterator();
        while (it.hasNext()) {
            Preconditions.checkArgument(it.next().getFsSize() > 0);
        }
    }

    public FileSystem getFileSystem() {
        return this.fileSystem;
    }

    @Nullable
    public Tuple2<Integer, Integer> getReaderPosition() {
        return this.readerPosition;
    }

    public List<Segment> getSegments() {
        return this.segments;
    }

    public void writeTo(OutputStream outputStream) throws IOException {
        DataOutputStream dataOutputStream = new DataOutputStream(new NonClosingOutputStreamDecorator(outputStream));
        Throwable th = null;
        try {
            dataOutputStream.writeInt(CURRENT_VERSION);
            dataOutputStream.writeBoolean(this.readerPosition != null);
            if (this.readerPosition != null) {
                dataOutputStream.writeInt(((Integer) this.readerPosition.f0).intValue());
                dataOutputStream.writeInt(((Integer) this.readerPosition.f1).intValue());
            }
            dataOutputStream.writeBoolean(this.fileSystem.isDistributedFS());
            if (this.fileSystem.isDistributedFS()) {
                serializeSegments(this.segments, dataOutputStream);
            } else {
                dataOutputStream.writeInt(this.segments.size());
                for (Segment segment : this.segments) {
                    dataOutputStream.writeInt(segment.getCount());
                    dataOutputStream.writeLong(segment.getFsSize());
                    FSDataInputStream open = this.fileSystem.open(segment.getPath());
                    Throwable th2 = null;
                    try {
                        try {
                            IOUtils.copyBytes(open, outputStream, false);
                            if (open != null) {
                                if (0 != 0) {
                                    try {
                                        open.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    open.close();
                                }
                            }
                        } catch (Throwable th4) {
                            th2 = th4;
                            throw th4;
                        }
                    } catch (Throwable th5) {
                        if (open != null) {
                            if (th2 != null) {
                                try {
                                    open.close();
                                } catch (Throwable th6) {
                                    th2.addSuppressed(th6);
                                }
                            } else {
                                open.close();
                            }
                        }
                        throw th5;
                    }
                }
            }
            if (dataOutputStream != null) {
                if (0 == 0) {
                    dataOutputStream.close();
                    return;
                }
                try {
                    dataOutputStream.close();
                } catch (Throwable th7) {
                    th.addSuppressed(th7);
                }
            }
        } catch (Throwable th8) {
            if (dataOutputStream != null) {
                if (0 != 0) {
                    try {
                        dataOutputStream.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    dataOutputStream.close();
                }
            }
            throw th8;
        }
    }

    public static <T> void replay(InputStream inputStream, TypeSerializer<T> typeSerializer, FeedbackConsumer<T> feedbackConsumer) throws Exception {
        DataInputStream dataInputStream = new DataInputStream(new NonClosingInputStreamDecorator(inputStream));
        Throwable th = null;
        try {
            Preconditions.checkState(dataInputStream.readInt() == CURRENT_VERSION, "Currently only support version 1");
            parseReaderPosition(dataInputStream);
            if (dataInputStream.readBoolean()) {
                DataCacheReader dataCacheReader = new DataCacheReader(typeSerializer, deserializeSegments(dataInputStream));
                while (dataCacheReader.hasNext()) {
                    feedbackConsumer.processFeedback(dataCacheReader.next());
                }
            } else {
                DataInputViewStreamWrapper dataInputViewStreamWrapper = new DataInputViewStreamWrapper(dataInputStream);
                int readInt = dataInputStream.readInt();
                for (int i = 0; i < readInt; i += CURRENT_VERSION) {
                    int readInt2 = dataInputStream.readInt();
                    dataInputStream.readLong();
                    for (int i2 = 0; i2 < readInt2; i2 += CURRENT_VERSION) {
                        feedbackConsumer.processFeedback(typeSerializer.deserialize(dataInputViewStreamWrapper));
                    }
                }
            }
            if (dataInputStream != null) {
                if (0 == 0) {
                    dataInputStream.close();
                    return;
                }
                try {
                    dataInputStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (dataInputStream != null) {
                if (0 != 0) {
                    try {
                        dataInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    dataInputStream.close();
                }
            }
            throw th3;
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public static DataCacheSnapshot recover(InputStream inputStream, FileSystem fileSystem, SupplierWithException<Path, IOException> supplierWithException) throws IOException {
        List arrayList;
        DataInputStream dataInputStream = new DataInputStream(new NonClosingInputStreamDecorator(inputStream));
        Throwable th = null;
        try {
            Preconditions.checkState(dataInputStream.readInt() == CURRENT_VERSION, "Currently only support version 1");
            Tuple2<Integer, Integer> parseReaderPosition = parseReaderPosition(dataInputStream);
            boolean readBoolean = dataInputStream.readBoolean();
            Preconditions.checkState(readBoolean == fileSystem.isDistributedFS(), "Currently we do not support changing the cache file system. If required, please manually copy the directory from one filesystem to another.");
            if (readBoolean) {
                arrayList = deserializeSegments(dataInputStream);
            } else {
                int readInt = dataInputStream.readInt();
                arrayList = new ArrayList(readInt);
                for (int i = 0; i < readInt; i += CURRENT_VERSION) {
                    int readInt2 = dataInputStream.readInt();
                    long readLong = dataInputStream.readLong();
                    Path path = (Path) supplierWithException.get();
                    FSDataOutputStream create = fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
                    Throwable th2 = null;
                    try {
                        try {
                            BoundedInputStream boundedInputStream = new BoundedInputStream(inputStream, readLong);
                            boundedInputStream.setPropagateClose(false);
                            IOUtils.copyBytes(boundedInputStream, create, false);
                            boundedInputStream.close();
                            if (create != null) {
                                if (0 != 0) {
                                    try {
                                        create.close();
                                    } catch (Throwable th3) {
                                        th2.addSuppressed(th3);
                                    }
                                } else {
                                    create.close();
                                }
                            }
                            arrayList.add(new Segment(path, readInt2, readLong));
                        } finally {
                        }
                    } catch (Throwable th4) {
                        if (create != null) {
                            if (th2 != null) {
                                try {
                                    create.close();
                                } catch (Throwable th5) {
                                    th2.addSuppressed(th5);
                                }
                            } else {
                                create.close();
                            }
                        }
                        throw th4;
                    }
                }
            }
            DataCacheSnapshot dataCacheSnapshot = new DataCacheSnapshot(fileSystem, parseReaderPosition, arrayList);
            if (dataInputStream != null) {
                if (0 != 0) {
                    try {
                        dataInputStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    dataInputStream.close();
                }
            }
            return dataCacheSnapshot;
        } catch (Throwable th7) {
            if (dataInputStream != null) {
                if (0 != 0) {
                    try {
                        dataInputStream.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    dataInputStream.close();
                }
            }
            throw th7;
        }
    }

    public <T> void tryReadSegmentsToMemory(TypeSerializer<T> typeSerializer, MemorySegmentPool memorySegmentPool) throws IOException {
        for (Segment segment : this.segments) {
            if (segment.getCache().isEmpty()) {
                FileSegmentReader fileSegmentReader = new FileSegmentReader(typeSerializer, segment, 0);
                try {
                    MemorySegmentWriter memorySegmentWriter = new MemorySegmentWriter(typeSerializer, segment.getPath(), memorySegmentPool, segment.getFsSize());
                    boolean z = CURRENT_VERSION;
                    while (z && fileSegmentReader.hasNext()) {
                        if (!memorySegmentWriter.addRecord(fileSegmentReader.next())) {
                            memorySegmentWriter.finish().ifPresent(segment2 -> {
                                memorySegmentPool.returnAll(segment2.getCache());
                            });
                            z = false;
                        }
                    }
                    if (z) {
                        segment.setCache(memorySegmentWriter.finish().get().getCache());
                    }
                } catch (MemoryAllocationException e) {
                    return;
                }
            }
        }
    }

    private static Tuple2<Integer, Integer> parseReaderPosition(DataInputStream dataInputStream) throws IOException {
        Tuple2<Integer, Integer> tuple2 = null;
        if (dataInputStream.readBoolean()) {
            tuple2 = new Tuple2<>(Integer.valueOf(dataInputStream.readInt()), Integer.valueOf(dataInputStream.readInt()));
        }
        return tuple2;
    }

    private static void serializeSegments(List<Segment> list, DataOutputStream dataOutputStream) throws IOException {
        dataOutputStream.writeInt(list.size());
        for (Segment segment : list) {
            dataOutputStream.writeUTF(segment.getPath().toString());
            dataOutputStream.writeInt(segment.getCount());
            dataOutputStream.writeLong(segment.getFsSize());
        }
    }

    private static List<Segment> deserializeSegments(DataInputStream dataInputStream) throws IOException {
        ArrayList arrayList = new ArrayList();
        int readInt = dataInputStream.readInt();
        for (int i = 0; i < readInt; i += CURRENT_VERSION) {
            arrayList.add(new Segment(new Path(dataInputStream.readUTF()), dataInputStream.readInt(), dataInputStream.readLong()));
        }
        return arrayList;
    }
}
