package io.trino.plugin.exchange.filesystem;

import com.google.common.base.Preconditions;
import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.AsyncSemaphore;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeSourceHandle;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSourceHandle;
import io.trino.spi.exchange.ExchangeSourceHandleSource;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.runtime.ObjectMethods;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/* loaded from: input_file:io/trino/plugin/exchange/filesystem/FileSystemExchange.class */
public class FileSystemExchange implements Exchange {
    private static final Pattern PARTITION_FILE_NAME_PATTERN = Pattern.compile("(\\d+)_(\\d+)\\.data");
    private static final char[] RANDOMIZED_HEX_PREFIX_ALPHABET = "abcdef0123456789".toCharArray();
    private static final int RANDOMIZED_HEX_PREFIX_LENGTH = 6;
    private final List<URI> baseDirectories;
    private final FileSystemExchangeStorage exchangeStorage;
    private final FileSystemExchangeStats stats;
    private final ExchangeContext exchangeContext;
    private final int outputPartitionCount;
    private final boolean preserveOrderWithinPartition;
    private final int fileListingParallelism;
    private final long exchangeSourceHandleTargetDataSizeInBytes;
    private final ExecutorService executor;

    @GuardedBy("this")
    private boolean noMoreSinks;

    @GuardedBy("this")
    private boolean exchangeSourceHandlesCreationStarted;
    private final Map<Integer, URI> outputDirectories = new ConcurrentHashMap();

    @GuardedBy("this")
    private final Set<Integer> allSinks = new HashSet();

    @GuardedBy("this")
    private final Map<Integer, Integer> finishedSinks = new HashMap();
    private final CompletableFuture<List<ExchangeSourceHandle>> exchangeSourceHandlesFuture = new CompletableFuture<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/trino/plugin/exchange/filesystem/FileSystemExchange$CommittedTaskAttempt.class */
    public static final class CommittedTaskAttempt extends Record {
        private final int partitionId;
        private final int attemptId;

        public CommittedTaskAttempt(int i, int i2) {
            Preconditions.checkArgument(i >= 0, "partitionId is expected to be greater than or equal to zero: %s", i);
            Preconditions.checkArgument(i2 >= 0, "attemptId is expected to be greater than or equal to zero: %s", i2);
            this.partitionId = i;
            this.attemptId = i2;
        }

        @Override // java.lang.Record
        public final String toString() {
            return (String) ObjectMethods.bootstrap(MethodHandles.lookup(), "toString", MethodType.methodType(String.class, CommittedTaskAttempt.class), CommittedTaskAttempt.class, "partitionId;attemptId", "FIELD:Lio/trino/plugin/exchange/filesystem/FileSystemExchange$CommittedTaskAttempt;->partitionId:I", "FIELD:Lio/trino/plugin/exchange/filesystem/FileSystemExchange$CommittedTaskAttempt;->attemptId:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final int hashCode() {
            return (int) ObjectMethods.bootstrap(MethodHandles.lookup(), "hashCode", MethodType.methodType(Integer.TYPE, CommittedTaskAttempt.class), CommittedTaskAttempt.class, "partitionId;attemptId", "FIELD:Lio/trino/plugin/exchange/filesystem/FileSystemExchange$CommittedTaskAttempt;->partitionId:I", "FIELD:Lio/trino/plugin/exchange/filesystem/FileSystemExchange$CommittedTaskAttempt;->attemptId:I").dynamicInvoker().invoke(this) /* invoke-custom */;
        }

        @Override // java.lang.Record
        public final boolean equals(Object obj) {
            return (boolean) ObjectMethods.bootstrap(MethodHandles.lookup(), "equals", MethodType.methodType(Boolean.TYPE, CommittedTaskAttempt.class, Object.class), CommittedTaskAttempt.class, "partitionId;attemptId", "FIELD:Lio/trino/plugin/exchange/filesystem/FileSystemExchange$CommittedTaskAttempt;->partitionId:I", "FIELD:Lio/trino/plugin/exchange/filesystem/FileSystemExchange$CommittedTaskAttempt;->attemptId:I").dynamicInvoker().invoke(this, obj) /* invoke-custom */;
        }

        public int partitionId() {
            return this.partitionId;
        }

        public int attemptId() {
            return this.attemptId;
        }
    }

    public FileSystemExchange(List<URI> list, FileSystemExchangeStorage fileSystemExchangeStorage, FileSystemExchangeStats fileSystemExchangeStats, ExchangeContext exchangeContext, int i, boolean z, int i2, long j, ExecutorService executorService) {
        ArrayList arrayList = new ArrayList((Collection) Objects.requireNonNull(list, "baseDirectories is null"));
        Collections.shuffle(arrayList);
        this.baseDirectories = ImmutableList.copyOf(arrayList);
        this.exchangeStorage = (FileSystemExchangeStorage) Objects.requireNonNull(fileSystemExchangeStorage, "exchangeStorage is null");
        this.stats = (FileSystemExchangeStats) Objects.requireNonNull(fileSystemExchangeStats, "stats is null");
        this.exchangeContext = (ExchangeContext) Objects.requireNonNull(exchangeContext, "exchangeContext is null");
        this.outputPartitionCount = i;
        this.preserveOrderWithinPartition = z;
        this.fileListingParallelism = i2;
        this.exchangeSourceHandleTargetDataSizeInBytes = j;
        this.executor = (ExecutorService) Objects.requireNonNull(executorService, "executor is null");
    }

    public ExchangeId getId() {
        return this.exchangeContext.getExchangeId();
    }

    public synchronized ExchangeSinkHandle addSink(int i) {
        FileSystemExchangeSinkHandle fileSystemExchangeSinkHandle = new FileSystemExchangeSinkHandle(i);
        this.allSinks.add(Integer.valueOf(i));
        return fileSystemExchangeSinkHandle;
    }

    public void noMoreSinks() {
        synchronized (this) {
            this.noMoreSinks = true;
        }
    }

    public CompletableFuture<ExchangeSinkInstanceHandle> instantiateSink(ExchangeSinkHandle exchangeSinkHandle, int i) {
        FileSystemExchangeSinkHandle fileSystemExchangeSinkHandle = (FileSystemExchangeSinkHandle) exchangeSinkHandle;
        URI resolve = getTaskOutputDirectory(fileSystemExchangeSinkHandle.getPartitionId()).resolve(i + "/");
        try {
            this.exchangeStorage.createDirectories(resolve);
            return CompletableFuture.completedFuture(new FileSystemExchangeSinkInstanceHandle(fileSystemExchangeSinkHandle, resolve, this.outputPartitionCount, this.preserveOrderWithinPartition));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public CompletableFuture<ExchangeSinkInstanceHandle> updateSinkInstanceHandle(ExchangeSinkHandle exchangeSinkHandle, int i) {
        throw new UnsupportedOperationException();
    }

    public void sinkFinished(ExchangeSinkHandle exchangeSinkHandle, int i) {
        synchronized (this) {
            this.finishedSinks.putIfAbsent(Integer.valueOf(((FileSystemExchangeSinkHandle) exchangeSinkHandle).getPartitionId()), Integer.valueOf(i));
        }
    }

    public void allRequiredSinksFinished() {
        Verify.verify(!Thread.holdsLock(this));
        synchronized (this) {
            if (this.exchangeSourceHandlesCreationStarted) {
                return;
            }
            Verify.verify(this.noMoreSinks, "noMoreSinks is expected to be set", new Object[0]);
            Verify.verify(this.finishedSinks.keySet().containsAll(this.allSinks), "all sinks are expected to be finished", new Object[0]);
            this.exchangeSourceHandlesCreationStarted = true;
            Futures.addCallback((ListenableFuture) this.stats.getCreateExchangeSourceHandles().record(this::createExchangeSourceHandles), new FutureCallback<List<ExchangeSourceHandle>>() { // from class: io.trino.plugin.exchange.filesystem.FileSystemExchange.1
                public void onSuccess(List<ExchangeSourceHandle> list) {
                    FileSystemExchange.this.exchangeSourceHandlesFuture.complete(list);
                }

                public void onFailure(Throwable th) {
                    FileSystemExchange.this.exchangeSourceHandlesFuture.completeExceptionally(th);
                }
            }, MoreExecutors.directExecutor());
        }
    }

    private ListenableFuture<List<ExchangeSourceHandle>> createExchangeSourceHandles() {
        List list;
        synchronized (this) {
            list = (List) this.finishedSinks.entrySet().stream().map(entry -> {
                return new CommittedTaskAttempt(((Integer) entry.getKey()).intValue(), ((Integer) entry.getValue()).intValue());
            }).collect(ImmutableList.toImmutableList());
        }
        return Futures.transform(AsyncSemaphore.processAll(list, this::getCommittedPartitions, this.fileListingParallelism, this.executor), list2 -> {
            ArrayListMultimap create = ArrayListMultimap.create();
            list2.forEach(multimap -> {
                Objects.requireNonNull(create);
                multimap.forEach((v1, v2) -> {
                    r1.put(v1, v2);
                });
            });
            ImmutableList.Builder builder = ImmutableList.builder();
            for (Integer num : create.keySet()) {
                Collection<FileSystemExchangeSourceHandle.SourceFile> collection = create.get(num);
                long j = 0;
                ImmutableList.Builder builder2 = ImmutableList.builder();
                for (FileSystemExchangeSourceHandle.SourceFile sourceFile : collection) {
                    if (j > 0 && j + sourceFile.getFileSize() > this.exchangeSourceHandleTargetDataSizeInBytes) {
                        builder.add(new FileSystemExchangeSourceHandle(this.exchangeContext.getExchangeId(), num.intValue(), builder2.build()));
                        j = 0;
                        builder2 = ImmutableList.builder();
                    }
                    j += sourceFile.getFileSize();
                    builder2.add(sourceFile);
                }
                if (j > 0) {
                    builder.add(new FileSystemExchangeSourceHandle(this.exchangeContext.getExchangeId(), num.intValue(), builder2.build()));
                }
            }
            return builder.build();
        }, this.executor);
    }

    private ListenableFuture<Multimap<Integer, FileSystemExchangeSourceHandle.SourceFile>> getCommittedPartitions(CommittedTaskAttempt committedTaskAttempt) {
        URI taskOutputDirectory = getTaskOutputDirectory(committedTaskAttempt.partitionId());
        return this.stats.getGetCommittedPartitions().record(Futures.transform(this.exchangeStorage.listFilesRecursively(taskOutputDirectory), list -> {
            List<String> list = (List) list.stream().map((v0) -> {
                return v0.getFilePath();
            }).filter(str -> {
                return str.endsWith(FileSystemExchangeSink.COMMITTED_MARKER_FILE_NAME);
            }).collect(ImmutableList.toImmutableList());
            if (list.isEmpty()) {
                throw new IllegalStateException(String.format("No committed attempts found under sink output path %s", taskOutputDirectory));
            }
            for (String str2 : list) {
                String[] split = str2.split(FileSystemExchangeManager.PATH_SEPARATOR);
                Preconditions.checkState(split.length >= 3, "committedMarkerFilePath %s is malformed", str2);
                String str3 = split[split.length - 2];
                if (Integer.parseInt(str3) == committedTaskAttempt.attemptId()) {
                    int length = ((str2.length() - str3.length()) - FileSystemExchangeManager.PATH_SEPARATOR.length()) - FileSystemExchangeSink.COMMITTED_MARKER_FILE_NAME.length();
                    List<FileStatus> list2 = (List) list.stream().filter(fileStatus -> {
                        return fileStatus.getFilePath().startsWith(str3 + "/", length) && fileStatus.getFilePath().endsWith(FileSystemExchangeSink.DATA_FILE_SUFFIX);
                    }).collect(ImmutableList.toImmutableList());
                    ImmutableMultimap.Builder builder = ImmutableMultimap.builder();
                    for (FileStatus fileStatus2 : list2) {
                        Matcher matcher = PARTITION_FILE_NAME_PATTERN.matcher(new File(fileStatus2.getFilePath()).getName());
                        Preconditions.checkState(matcher.matches(), "Unexpected partition file: %s", fileStatus2);
                        builder.put(Integer.valueOf(Integer.parseInt(matcher.group(1))), new FileSystemExchangeSourceHandle.SourceFile(fileStatus2.getFilePath(), fileStatus2.getFileSize(), committedTaskAttempt.partitionId(), committedTaskAttempt.attemptId()));
                    }
                    return builder.build();
                }
            }
            throw new IllegalArgumentException("committed attempt %s for task %s not found".formatted(Integer.valueOf(committedTaskAttempt.attemptId()), Integer.valueOf(committedTaskAttempt.partitionId())));
        }, this.executor));
    }

    private URI getTaskOutputDirectory(int i) {
        return this.outputDirectories.computeIfAbsent(Integer.valueOf(i), num -> {
            return this.baseDirectories.get(ThreadLocalRandom.current().nextInt(this.baseDirectories.size())).resolve(generateRandomizedHexPrefix() + "." + this.exchangeContext.getQueryId() + "." + this.exchangeContext.getExchangeId() + "." + i + "/");
        });
    }

    public ExchangeSourceHandleSource getSourceHandles() {
        return new ExchangeSourceHandleSource() { // from class: io.trino.plugin.exchange.filesystem.FileSystemExchange.2
            public CompletableFuture<ExchangeSourceHandleSource.ExchangeSourceHandleBatch> getNextBatch() {
                return FileSystemExchange.this.exchangeSourceHandlesFuture.thenApply(list -> {
                    return new ExchangeSourceHandleSource.ExchangeSourceHandleBatch(list, true);
                });
            }

            public void close() {
            }
        };
    }

    public void close() {
        this.stats.getCloseExchange().record(this.exchangeStorage.deleteRecursively((List) this.allSinks.stream().map((v1) -> {
            return getTaskOutputDirectory(v1);
        }).collect(ImmutableList.toImmutableList())));
    }

    private static String generateRandomizedHexPrefix() {
        char[] cArr = new char[RANDOMIZED_HEX_PREFIX_LENGTH];
        for (int i = 0; i < cArr.length; i++) {
            cArr[i] = RANDOMIZED_HEX_PREFIX_ALPHABET[ThreadLocalRandom.current().nextInt(RANDOMIZED_HEX_PREFIX_ALPHABET.length)];
        }
        return new String(cArr);
    }
}
