package io.trino.operator;

import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.base.Verify;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Multimaps;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.slice.DynamicSliceOutput;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceOutput;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.annotation.NotThreadSafe;
import io.trino.exchange.ExchangeManagerRegistry;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.execution.scheduler.Exchanges;
import io.trino.spi.QueryId;
import io.trino.spi.StandardErrorCode;
import io.trino.spi.TrinoException;
import io.trino.spi.exchange.Exchange;
import io.trino.spi.exchange.ExchangeContext;
import io.trino.spi.exchange.ExchangeId;
import io.trino.spi.exchange.ExchangeManager;
import io.trino.spi.exchange.ExchangeSink;
import io.trino.spi.exchange.ExchangeSinkHandle;
import io.trino.spi.exchange.ExchangeSinkInstanceHandle;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/* loaded from: input_file:io/trino/operator/DeduplicatingDirectExchangeBuffer.class */
public class DeduplicatingDirectExchangeBuffer implements DirectExchangeBuffer {
    private static final Logger log = Logger.get(DeduplicatingDirectExchangeBuffer.class);
    // Upper bound on how long PageBuffer waits for a sink instance handle to be created or refreshed.
    private static final Duration SINK_INSTANCE_HANDLE_GET_TIMEOUT = Duration.succinctDuration(60.0d, TimeUnit.SECONDS);
    // Executor used to complete outputReady (see unblock) and handed to PageBuffer for its async callbacks.
    private final Executor executor;

    // Set by noMoreTasks(); once true, addTask() rejects new registrations.
    @GuardedBy("this")
    private boolean noMoreTasks;

    // Highest attempt id among the tasks registered so far; pages from lower attempts are discarded.
    @GuardedBy("this")
    private int maxAttemptId;

    // Holds incoming pages (in memory, or spilled to an exchange) until the output source is created.
    @GuardedBy("this")
    private final PageBuffer pageBuffer;

    // Created by checkInputFinished() once every task of the latest attempt has succeeded; null until then.
    @GuardedBy("this")
    private OutputSource outputSource;

    // High-water mark of getRetainedSizeInBytes(), refreshed by updateMaxRetainedSize().
    @GuardedBy("this")
    private long maxRetainedSizeInBytes;

    // Set by fail(); rethrown from pollPage().
    @GuardedBy("this")
    private Throwable failure;

    @GuardedBy("this")
    private boolean closed;

    @GuardedBy("this")
    private final Set<TaskId> allTasks = new HashSet<>();

    @GuardedBy("this")
    private final Set<TaskId> successfulTasks = new HashSet<>();

    @GuardedBy("this")
    private final Map<TaskId, Throwable> failedTasks = new HashMap<>();
    // Completed (asynchronously, on executor) when the output source is created or the buffer is closed or failed.
    private final SettableFuture<Void> outputReady = SettableFuture.create();

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * {@link OutputSource} that reads spilled records back from an {@link ExchangeSource} and
     * returns only the pages produced by the selected tasks.
     *
     * <p>Each record is expected to start with a three-int task header (stage id, partition id,
     * attempt id) followed by the page bytes.
     */
    @NotThreadSafe
    public static class ExchangeOutputSource implements OutputSource {
        // Size of the task header (three ints) that precedes the page bytes in every record.
        private static final int TASK_ID_HEADER_SIZE = 3 * Integer.BYTES;

        private final Set<TaskId> selectedTasks;
        private final QueryId queryId;
        private ListenableFuture<ExchangeSource> exchangeSourceFuture;
        // Resolved lazily from exchangeSourceFuture on the first getNext() call after the future completes.
        private ExchangeSource exchangeSource;
        private boolean finished;

        private ExchangeOutputSource(Set<TaskId> set, QueryId queryId, ListenableFuture<ExchangeSource> listenableFuture) {
            this.selectedTasks = ImmutableSet.copyOf(Objects.requireNonNull(set, "selectedTasks is null"));
            this.queryId = Objects.requireNonNull(queryId, "queryId is null");
            this.exchangeSourceFuture = Objects.requireNonNull(listenableFuture, "exchangeSourceFuture is null");
        }

        /**
         * Returns the next page belonging to a selected task, or {@code null} when the source is
         * finished, not yet available, or currently blocked. Closes this source once the
         * underlying exchange source reports it is finished.
         */
        @Override
        public Slice getNext() {
            if (this.finished) {
                return null;
            }
            if (this.exchangeSource == null) {
                if (!this.exchangeSourceFuture.isDone()) {
                    return null;
                }
                // Propagates the failure if the exchange source could not be created.
                this.exchangeSource = MoreFutures.getFutureValue(this.exchangeSourceFuture);
            }
            while (!this.exchangeSource.isFinished()) {
                if (!this.exchangeSource.isBlocked().isDone()) {
                    return null;
                }
                Slice read = this.exchangeSource.read();
                if (read != null) {
                    int stageId = read.getInt(0);
                    int partitionId = read.getInt(Integer.BYTES);
                    int attemptId = read.getInt(2 * Integer.BYTES);
                    TaskId producer = new TaskId(new StageId(this.queryId, stageId), partitionId, attemptId);
                    // Records from tasks that were not selected are skipped.
                    if (this.selectedTasks.contains(producer)) {
                        return read.slice(TASK_ID_HEADER_SIZE, read.length() - TASK_ID_HEADER_SIZE);
                    }
                }
            }
            close();
            return null;
        }

        @Override
        public boolean isFinished() {
            return this.finished;
        }

        /**
         * Returns a future that completes when a read may make progress: immediately if finished,
         * otherwise after the exchange source is created and unblocked. The returned future does
         * not propagate cancellation to the underlying futures.
         */
        @Override
        public ListenableFuture<Void> isBlocked() {
            if (this.finished) {
                return Futures.immediateVoidFuture();
            }
            if (!this.exchangeSourceFuture.isDone()) {
                return Futures.nonCancellationPropagating(MoreFutures.asVoid(Futures.transformAsync(
                        this.exchangeSourceFuture,
                        source -> MoreFutures.toListenableFuture(source.isBlocked()),
                        MoreExecutors.directExecutor())));
            }
            if (this.exchangeSource != null) {
                CompletableFuture<Void> blocked = this.exchangeSource.isBlocked();
                if (!blocked.isDone()) {
                    return Futures.nonCancellationPropagating(MoreFutures.toListenableFuture(blocked));
                }
            }
            return Futures.immediateVoidFuture();
        }

        @Override
        public long getRetainedSizeInBytes() {
            if (this.exchangeSource != null) {
                return this.exchangeSource.getMemoryUsage();
            }
            return 0L;
        }

        /**
         * Marks this source finished and closes the exchange source once its future completes.
         * Calling it again has no effect.
         */
        @Override
        public void close() {
            if (this.finished) {
                return;
            }
            this.finished = true;
            this.exchangeSource = null;
            // Close through the future so a source that is still being created is closed when it arrives.
            Futures.addCallback(this.exchangeSourceFuture, new FutureCallback<ExchangeSource>() {
                @Override
                public void onSuccess(ExchangeSource exchangeSource) {
                    try {
                        exchangeSource.close();
                    } catch (RuntimeException e) {
                        DeduplicatingDirectExchangeBuffer.log.warn(e, "error closing exchange source");
                    }
                }

                @Override
                public void onFailure(Throwable th) {
                    // Nothing to close: the exchange source was never created.
                }
            }, MoreExecutors.directExecutor());
            this.exchangeSourceFuture = null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * {@link OutputSource} backed by an iterator over pages held in memory. It never blocks and
     * reports no retained memory of its own.
     */
    @NotThreadSafe
    public static class InMemoryBufferOutputSource implements OutputSource {
        private final Iterator<Slice> iterator;

        private InMemoryBufferOutputSource(Iterator<Slice> it) {
            this.iterator = Objects.requireNonNull(it, "iterator is null");
        }

        /** Returns the next page, or {@code null} when the iterator is exhausted. */
        @Override
        public Slice getNext() {
            return iterator.hasNext() ? iterator.next() : null;
        }

        @Override
        public boolean isFinished() {
            boolean hasMorePages = iterator.hasNext();
            return !hasMorePages;
        }

        @Override
        public ListenableFuture<Void> isBlocked() {
            // Pages are already available, so there is nothing to wait for.
            return Futures.immediateVoidFuture();
        }

        @Override
        public long getRetainedSizeInBytes() {
            return 0L;
        }

        @Override
        public void close() {
            // No resources to release.
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Pull-style source of output pages, produced once the input is complete.
     * Implementations in this file are {@link InMemoryBufferOutputSource} and {@link ExchangeOutputSource}.
     */
    @NotThreadSafe
    public interface OutputSource extends Closeable {
        /** Returns the next page, or {@code null} if none is available right now or the source is finished. */
        Slice getNext();

        /** Returns {@code true} once no more pages will be returned. */
        boolean isFinished();

        /** Returns a future that is done when {@link #getNext()} may make progress. */
        ListenableFuture<Void> isBlocked();

        /** Returns the number of bytes currently retained by this source. */
        long getRetainedSizeInBytes();
    }

    /* JADX INFO: Access modifiers changed from: private */
    @ThreadSafe
    /* loaded from: input_file:io/trino/operator/DeduplicatingDirectExchangeBuffer$PageBuffer.class */
    public static class PageBuffer implements Closeable {
        private final ExchangeManagerRegistry exchangeManagerRegistry;
        private final QueryId queryId;
        private final ExchangeId exchangeId;
        // Runs the asynchronous callbacks that build the exchange source in createOutputSource().
        private final Executor executor;
        // In-memory capacity; once a batch would exceed it, pages are written to the exchange sink instead.
        private final long pageBufferCapacityInBytes;

        // Pages kept in memory, keyed by producing task. Emptied when spilling starts.
        @GuardedBy("this")
        private final ListMultimap<TaskId, Slice> pageBuffer = ArrayListMultimap.create();

        // Sum of the retained sizes of the pages currently in pageBuffer.
        @GuardedBy("this")
        private long pageBufferRetainedSizeInBytes;

        // The exchange fields below are initialized lazily by addPages() on the first spill.
        @GuardedBy("this")
        private ExchangeManager exchangeManager;

        @GuardedBy("this")
        private Exchange exchange;

        @GuardedBy("this")
        private ExchangeSinkHandle sinkHandle;

        // Non-null while spilling is active; its presence is what routes new pages to the sink.
        @GuardedBy("this")
        private ExchangeSink exchangeSink;

        // Scratch buffer used to frame each spilled page with its task header.
        @GuardedBy("this")
        private SliceOutput writeBuffer;

        // Pages accepted by addPages(), minus in-memory pages dropped by removePagesForPreviousAttempts().
        @GuardedBy("this")
        private int bufferedPageCount;

        // Total page bytes written to the sink (headers not included).
        @GuardedBy("this")
        private long spilledBytes;

        // Number of pages written to the sink.
        @GuardedBy("this")
        private int spilledPageCount;

        // Set by createOutputSource(); afterwards addPages() is a no-op.
        @GuardedBy("this")
        private boolean inputFinished;

        @GuardedBy("this")
        private boolean closed;

        /**
         * Creates an empty buffer.
         *
         * @param exchangeManagerRegistry used to obtain the exchange manager when spilling starts
         * @param queryId query id used for the exchange context
         * @param exchangeId exchange id used for the exchange context and the output selector
         * @param executor executor for the asynchronous callbacks run when the output source is created
         * @param dataSize in-memory capacity; batches that would exceed it are spilled
         * @throws NullPointerException if any argument is null
         */
        private PageBuffer(ExchangeManagerRegistry exchangeManagerRegistry, QueryId queryId, ExchangeId exchangeId, Executor executor, DataSize dataSize) {
            this.exchangeManagerRegistry = Objects.requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
            this.queryId = Objects.requireNonNull(queryId, "queryId is null");
            this.exchangeId = Objects.requireNonNull(exchangeId, "exchangeId is null");
            this.executor = Objects.requireNonNull(executor, "executor is null");
            // Validated like the other arguments so a null capacity fails with a descriptive message.
            this.pageBufferCapacityInBytes = Objects.requireNonNull(dataSize, "pageBufferCapacity is null").toBytes();
        }

        /**
         * Buffers the given pages for a task. Pages stay in memory while they fit in the
         * configured capacity; the first batch that would exceed it creates the exchange sink,
         * moves everything buffered so far to that sink, and all later pages go straight to it.
         * Does nothing once the buffer is closed or the output source has been created.
         *
         * @param taskId task that produced the pages
         * @param list pages to buffer
         * @throws RuntimeException if the sink instance handle cannot be obtained (interrupted,
         *         failed or timed out), or if writing to the sink fails
         */
        public synchronized void addPages(TaskId taskId, List<Slice> list) {
            if (this.closed || this.inputFinished) {
                return;
            }
            long retainedSizeInBytes = getRetainedSizeInBytes(list);
            if (this.exchangeSink == null && this.pageBufferRetainedSizeInBytes + retainedSizeInBytes <= this.pageBufferCapacityInBytes) {
                this.pageBuffer.putAll(taskId, list);
                this.pageBufferRetainedSizeInBytes += retainedSizeInBytes;
                this.bufferedPageCount += list.size();
                return;
            }
            if (this.exchangeSink == null) {
                Verify.verify(this.exchangeManager == null, "exchangeManager is not expected to be initialized", new Object[0]);
                Verify.verify(this.exchange == null, "exchange is not expected to be initialized", new Object[0]);
                Verify.verify(this.sinkHandle == null, "sinkHandle is not expected to be initialized", new Object[0]);
                Verify.verify(this.writeBuffer == null, "writeBuffer is not expected to be initialized", new Object[0]);
                this.exchangeManager = this.exchangeManagerRegistry.getExchangeManager();
                this.exchange = this.exchangeManager.createExchange(new ExchangeContext(this.queryId, this.exchangeId), 1, true);
                this.sinkHandle = this.exchange.addSink(0);
                this.exchange.noMoreSinks();
                ExchangeSinkInstanceHandle sinkInstanceHandle;
                try {
                    sinkInstanceHandle = this.exchange.instantiateSink(this.sinkHandle, 0)
                            .get(DeduplicatingDirectExchangeBuffer.SINK_INSTANCE_HANDLE_GET_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (ExecutionException | TimeoutException e) {
                    // Only checked failures of the wait are wrapped; runtime exceptions keep their type and error code.
                    throw new RuntimeException(e);
                }
                this.exchangeSink = this.exchangeManager.createSink(sinkInstanceHandle);
                // NOTE(review): the constant comes from an unrelated class and is used only as the initial buffer size — confirm intended value.
                this.writeBuffer = new DynamicSliceOutput(IncrementalLoadFactorHashArraySizeSupplier.THRESHOLD_50);
            }
            if (!this.pageBuffer.isEmpty()) {
                // Move the pages buffered so far to the sink; they were already counted in bufferedPageCount.
                for (Map.Entry<TaskId, List<Slice>> entry : Multimaps.asMap(this.pageBuffer).entrySet()) {
                    writeToSink(entry.getKey(), entry.getValue());
                }
                this.pageBuffer.clear();
                this.pageBufferRetainedSizeInBytes = 0L;
            }
            writeToSink(taskId, list);
            this.bufferedPageCount += list.size();
        }

        /** Sums the retained sizes of the given pages. */
        private static long getRetainedSizeInBytes(List<Slice> list) {
            long totalBytes = 0;
            for (Slice page : list) {
                totalBytes += page.getRetainedSize();
            }
            return totalBytes;
        }

        /**
         * Writes each page to the exchange sink, prefixed with the producing task's stage id,
         * partition id and attempt id. Before every write it waits for the sink to unblock,
         * checking once per second whether the sink instance handle needs to be refreshed.
         */
        @GuardedBy("this")
        private void writeToSink(TaskId taskId, List<Slice> list) {
            Verify.verify(exchangeSink != null, "exchangeSink is expected to be initialized");
            Verify.verify(writeBuffer != null, "writeBuffer is expected to be initialized");
            for (Slice page : list) {
                boolean sinkReady = false;
                while (!sinkReady) {
                    try {
                        exchangeSink.isBlocked().get(1L, TimeUnit.SECONDS);
                        sinkReady = true;
                    } catch (TimeoutException stillBlocked) {
                        // Still blocked: the sink may be waiting for a fresh instance handle.
                        updateSinkInstanceHandleIfNecessary();
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(interrupted);
                    } catch (ExecutionException failed) {
                        throw new RuntimeException(failed);
                    }
                }
                // Frame the page with a task header so the reader can tell which task produced it.
                writeBuffer.writeInt(taskId.getStageId().getId());
                writeBuffer.writeInt(taskId.getPartitionId());
                writeBuffer.writeInt(taskId.getAttemptId());
                writeBuffer.writeBytes(page);
                // The scratch buffer is reused, so the sink gets its own copy.
                Slice framedPage = writeBuffer.slice().copy();
                exchangeSink.add(0, framedPage);
                writeBuffer.reset();
                spilledBytes += page.length();
                spilledPageCount++;
            }
        }

        /**
         * Fetches a fresh sink instance handle from the exchange and passes it to the sink, but
         * only when the sink reports that an update is required. Must be called with this
         * object's lock held.
         *
         * @throws RuntimeException if fetching the handle is interrupted, fails or times out
         */
        @GuardedBy("this")
        private void updateSinkInstanceHandleIfNecessary() {
            Verify.verify(Thread.holdsLock(this), "this method is expected to be called under a lock", new Object[0]);
            Verify.verify(this.exchange != null, "exchange is null", new Object[0]);
            Verify.verify(this.exchangeSink != null, "exchangeSink is null", new Object[0]);
            Verify.verify(this.sinkHandle != null, "sinkHandle is null", new Object[0]);
            if (!this.exchangeSink.isHandleUpdateRequired()) {
                return;
            }
            ExchangeSinkInstanceHandle updatedHandle;
            try {
                updatedHandle = this.exchange.updateSinkInstanceHandle(this.sinkHandle, 0)
                        .get(DeduplicatingDirectExchangeBuffer.SINK_INSTANCE_HANDLE_GET_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } catch (ExecutionException | TimeoutException e) {
                // Only checked failures of the wait are wrapped; runtime exceptions keep their type and error code.
                throw new RuntimeException(e);
            }
            this.exchangeSink.updateHandle(updatedHandle);
        }

        /**
         * Drops in-memory pages produced by tasks whose attempt id is lower than {@code i} and
         * adjusts the retained-size and page counters. Pages already written to the sink are not
         * touched.
         *
         * @throws IllegalStateException if the output source has already been created
         */
        public synchronized void removePagesForPreviousAttempts(int i) {
            Preconditions.checkState(!inputFinished, "input is finished");
            if (closed) {
                return;
            }
            long releasedBytes = 0;
            int releasedPages = 0;
            Iterator<Map.Entry<TaskId, List<Slice>>> entries = Multimaps.asMap(pageBuffer).entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<TaskId, List<Slice>> entry = entries.next();
                if (entry.getKey().getAttemptId() >= i) {
                    continue;
                }
                for (Slice page : entry.getValue()) {
                    releasedBytes += page.getRetainedSize();
                    releasedPages++;
                }
                // Removing through the view's iterator drops all pages of that task from the multimap.
                entries.remove();
            }
            pageBufferRetainedSizeInBytes -= releasedBytes;
            bufferedPageCount -= releasedPages;
        }

        /**
         * Marks the input as finished and returns a source over the pages of the given tasks.
         * If nothing was spilled the source iterates the in-memory pages; otherwise the sink is
         * finished and the source reads the spilled records back from the exchange asynchronously.
         *
         * @param set tasks whose pages should be returned
         * @throws IllegalStateException if an output source has already been created
         */
        public synchronized OutputSource createOutputSource(Set<TaskId> set) {
            Preconditions.checkState(!this.inputFinished, "input is already marked as finished and page source has already been created");
            this.inputFinished = true;

            if (this.exchangeSink == null) {
                Iterator<Slice> selectedPages = this.pageBuffer.entries().stream()
                        .filter(entry -> set.contains(entry.getKey()))
                        .map(Map.Entry::getValue)
                        .iterator();
                return new InMemoryBufferOutputSource(selectedPages);
            }

            Verify.verify(this.exchangeManager != null, "exchangeManager is expected to be initialized");
            Verify.verify(this.exchange != null, "exchange is expected to be initialized");
            Verify.verify(this.sinkHandle != null, "sinkHandle is expected to be initialized");
            this.writeBuffer = null;

            ListenableFuture<ExchangeSource> exchangeSourceFuture = FluentFuture.from(MoreFutures.toListenableFuture(this.exchangeSink.finish()))
                    .transformAsync(ignored -> {
                        // The sink is fully written: report it to the exchange, then ask for the source handles.
                        this.exchange.sinkFinished(this.sinkHandle, 0);
                        this.exchange.allRequiredSinksFinished();
                        synchronized (this) {
                            this.exchangeSink = null;
                            this.sinkHandle = null;
                        }
                        return Exchanges.getAllSourceHandles(this.exchange.getSourceHandles());
                    }, this.executor)
                    .transform(sourceHandles -> {
                        ExchangeSource source = this.exchangeManager.createSource();
                        try {
                            source.setOutputSelector(ExchangeSourceOutputSelector.builder(ImmutableSet.of(this.exchangeId))
                                    .include(this.exchangeId, 0, 0)
                                    .setPartitionCount(this.exchangeId, 1)
                                    .setFinal()
                                    .build());
                            source.addSourceHandles(sourceHandles);
                            source.noMoreSourceHandles();
                            return source;
                        } catch (Throwable setupFailure) {
                            // Do not leak the source if configuring it failed; keep the original failure primary.
                            try {
                                source.close();
                            } catch (Throwable closeFailure) {
                                if (closeFailure != setupFailure) {
                                    setupFailure.addSuppressed(closeFailure);
                                }
                            }
                            throw setupFailure;
                        }
                    }, this.executor);
            return new ExchangeOutputSource(set, this.queryId, exchangeSourceFuture);
        }

        /**
         * Returns the bytes retained by the in-memory pages plus, when present, the exchange
         * sink's memory usage and the scratch write buffer.
         */
        public synchronized long getRetainedSizeInBytes() {
            long sinkBytes = exchangeSink == null ? 0L : exchangeSink.getMemoryUsage();
            long writeBufferBytes = writeBuffer == null ? 0L : writeBuffer.getRetainedSize();
            return pageBufferRetainedSizeInBytes + sinkBytes + writeBufferBytes;
        }

        /** Returns the current buffered page count (covers both in-memory and spilled pages; reset to 0 on close). */
        public synchronized int getBufferedPageCount() {
            return this.bufferedPageCount;
        }

        /** Returns the total number of page bytes written to the exchange sink so far. */
        public synchronized long getSpilledBytes() {
            return this.spilledBytes;
        }

        /** Returns the number of pages written to the exchange sink so far. */
        public synchronized int getSpilledPageCount() {
            return this.spilledPageCount;
        }

        /**
         * Releases the buffered pages, aborts the exchange sink if one is active and closes the
         * exchange. Failures while aborting the sink are logged, not thrown. Calling it again has
         * no effect.
         */
        @Override
        public synchronized void close() {
            if (closed) {
                return;
            }
            closed = true;

            pageBuffer.clear();
            pageBufferRetainedSizeInBytes = 0L;
            bufferedPageCount = 0;
            writeBuffer = null;

            if (exchangeSink != null) {
                try {
                    exchangeSink.abort().whenComplete((ignored, abortFailure) -> {
                        if (abortFailure != null) {
                            DeduplicatingDirectExchangeBuffer.log.warn(abortFailure, "Error aborting exchange sink");
                        }
                    });
                } catch (RuntimeException e) {
                    // Best effort: a failed abort must not prevent the exchange from being closed.
                    DeduplicatingDirectExchangeBuffer.log.warn(e, "Error aborting exchange sink");
                }
                exchangeSink = null;
            }

            if (exchange != null) {
                exchange.close();
            }
        }
    }

    public DeduplicatingDirectExchangeBuffer(Executor executor, DataSize dataSize, RetryPolicy retryPolicy, ExchangeManagerRegistry exchangeManagerRegistry, QueryId queryId, ExchangeId exchangeId) {
        this.executor = (Executor) Objects.requireNonNull(executor, "executor is null");
        Preconditions.checkArgument(retryPolicy == RetryPolicy.QUERY, "the class is used for query level retries only, got: %s", retryPolicy);
        this.pageBuffer = new PageBuffer(exchangeManagerRegistry, queryId, exchangeId, executor, dataSize);
    }

    /**
     * Returns a future that is done when polling may make progress: immediately if the buffer
     * has failed or is closed, otherwise once the output is ready and the output source is
     * unblocked.
     */
    @Override
    public synchronized ListenableFuture<Void> isBlocked() {
        if (failure != null || closed) {
            return Futures.immediateVoidFuture();
        }
        if (outputReady.isDone()) {
            Preconditions.checkState(outputSource != null, "outputSource is expected to be set");
            return outputSource.isBlocked();
        }
        // Output is not ready yet: chain on outputReady, then defer to the source if one was created.
        ListenableFuture<Void> blockedOnOutput = Futures.transformAsync(outputReady, ignored -> {
            synchronized (this) {
                return outputSource != null ? outputSource.isBlocked() : Futures.immediateVoidFuture();
            }
        }, MoreExecutors.directExecutor());
        return Futures.nonCancellationPropagating(blockedOnOutput);
    }

    /**
     * Returns the next output page, or {@code null} if the buffer is closed, the output is not
     * ready yet, or the output source has nothing to return right now.
     * Rethrows the recorded failure if the buffer has failed.
     */
    @Override
    public synchronized Slice pollPage() {
        throwIfFailed();
        if (closed) {
            return null;
        }
        if (!outputReady.isDone()) {
            return null;
        }
        Preconditions.checkState(outputSource != null, "outputSource is expected to be set");
        Slice page = outputSource.getNext();
        updateMaxRetainedSize();
        return page;
    }

    /**
     * Registers a task. When the task belongs to a newer attempt than any seen so far, in-memory
     * pages buffered for earlier attempts are discarded. Does nothing once the buffer is closed.
     *
     * @throws IllegalStateException if no more tasks are expected or the task is already registered
     */
    @Override
    public synchronized void addTask(TaskId taskId) {
        if (closed) {
            return;
        }
        Preconditions.checkState(!noMoreTasks, "no more tasks expected");
        Preconditions.checkState(allTasks.add(taskId), "task already registered: %s", taskId);
        int attemptId = taskId.getAttemptId();
        if (attemptId <= maxAttemptId) {
            return;
        }
        maxAttemptId = attemptId;
        pageBuffer.removePagesForPreviousAttempts(maxAttemptId);
        updateMaxRetainedSize();
    }

    /**
     * Buffers pages produced by a registered, still-running task. Pages from attempts older than
     * the latest one are ignored, and nothing is done once the buffer is closed or failed.
     * A runtime failure while buffering fails the whole buffer instead of propagating.
     *
     * @throws IllegalStateException if the task is unknown, finished or failed
     */
    @Override
    public synchronized void addPages(TaskId taskId, List<Slice> list) {
        if (closed || failure != null) {
            return;
        }
        Preconditions.checkState(allTasks.contains(taskId), "task is not registered: %s", taskId);
        Preconditions.checkState(!successfulTasks.contains(taskId), "task is finished: %s", taskId);
        Preconditions.checkState(!failedTasks.containsKey(taskId), "task is failed: %s", taskId);
        boolean fromPreviousAttempt = taskId.getAttemptId() < maxAttemptId;
        if (fromPreviousAttempt) {
            return;
        }
        try {
            pageBuffer.addPages(taskId, list);
            updateMaxRetainedSize();
        } catch (RuntimeException bufferingFailure) {
            fail(bufferingFailure);
        }
    }

    /**
     * Records that a registered task finished successfully and re-evaluates whether the input is
     * complete. Does nothing once the buffer is closed.
     *
     * @throws IllegalStateException if the task is unknown, failed or already finished
     */
    @Override
    public synchronized void taskFinished(TaskId taskId) {
        if (!closed) {
            Preconditions.checkState(allTasks.contains(taskId), "task is not registered: %s", taskId);
            Preconditions.checkState(!failedTasks.containsKey(taskId), "task is failed: %s", taskId);
            boolean newlyFinished = successfulTasks.add(taskId);
            Preconditions.checkState(newlyFinished, "task is finished: %s", taskId);
            checkInputFinished();
        }
    }

    /**
     * Records the failure of a registered task and re-evaluates whether the input is complete.
     * Does nothing once the buffer is closed.
     *
     * @throws IllegalStateException if the task is unknown, finished or already failed
     */
    @Override
    public synchronized void taskFailed(TaskId taskId, Throwable th) {
        if (!closed) {
            Preconditions.checkState(allTasks.contains(taskId), "task is not registered: %s", taskId);
            Preconditions.checkState(!successfulTasks.contains(taskId), "task is finished: %s", taskId);
            Throwable previousFailure = failedTasks.put(taskId, th);
            Preconditions.checkState(previousFailure == null, "task is already failed: %s", taskId);
            checkInputFinished();
        }
    }

    /**
     * Signals that no further tasks will be registered and re-evaluates whether the input is
     * complete. Does nothing once the buffer is closed.
     */
    @Override
    public synchronized void noMoreTasks() {
        if (!closed) {
            noMoreTasks = true;
            checkInputFinished();
        }
    }

    /**
     * Decides whether the input is complete. It acts only when the buffer has not failed, no
     * output source exists yet and no more tasks are expected.
     *
     * <p>If every task of the latest attempt has succeeded, the output source is created and
     * waiters are unblocked. Otherwise the failures of latest-attempt tasks are combined, with
     * the first as primary and the rest suppressed, and the buffer is failed with the result.
     * Failures carrying the {@code REMOTE_TASK_FAILED} error code are only logged.
     */
    @GuardedBy("this")
    private void checkInputFinished() {
        if (this.failure != null || this.outputSource != null || !this.noMoreTasks) {
            return;
        }
        Set<TaskId> latestAttemptTasks = this.allTasks.stream()
                .filter(taskId -> taskId.getAttemptId() == this.maxAttemptId)
                .collect(ImmutableSet.toImmutableSet());
        if (this.successfulTasks.containsAll(latestAttemptTasks)) {
            this.outputSource = this.pageBuffer.createOutputSource(latestAttemptTasks);
            unblock(this.outputReady);
            return;
        }
        Map<TaskId, Throwable> latestAttemptFailures = this.failedTasks.entrySet().stream()
                .filter(entry -> entry.getKey().getAttemptId() == this.maxAttemptId)
                .collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
        // Task failures may be any Throwable, so the combined failure is tracked as Throwable.
        Throwable combinedFailure = null;
        for (Map.Entry<TaskId, Throwable> entry : latestAttemptFailures.entrySet()) {
            TaskId taskId = entry.getKey();
            Throwable taskFailure = entry.getValue();
            if (taskFailure instanceof TrinoException
                    && StandardErrorCode.REMOTE_TASK_FAILED.toErrorCode().equals(((TrinoException) taskFailure).getErrorCode())) {
                // NOTE(review): presumably the failed upstream task is reported through another path — confirm.
                log.debug("Task failure discovered while fetching task results: %s", taskId);
                continue;
            }
            if (combinedFailure == null) {
                combinedFailure = taskFailure;
            } else if (combinedFailure != taskFailure) {
                combinedFailure.addSuppressed(taskFailure);
            }
        }
        if (combinedFailure != null) {
            fail(combinedFailure);
        }
    }

    /**
     * Returns {@code true} when the buffer has not failed and is either closed or has an output
     * source that is finished.
     */
    @Override
    public synchronized boolean isFinished() {
        if (failure != null) {
            return false;
        }
        if (closed) {
            return true;
        }
        return outputSource != null && outputSource.isFinished();
    }

    /** Returns {@code true} once a failure has been recorded for this buffer. */
    @Override // io.trino.operator.DirectExchangeBuffer
    public synchronized boolean isFailed() {
        return this.failure != null;
    }

    /** Always reports {@code Long.MAX_VALUE}; this buffer never advertises a capacity limit. */
    @Override // io.trino.operator.DirectExchangeBuffer
    public synchronized long getRemainingCapacityInBytes() {
        return Long.MAX_VALUE;
    }

    /** Returns the bytes retained by the page buffer plus those retained by the output source, if any. */
    @Override
    public synchronized long getRetainedSizeInBytes() {
        long bufferedBytes = pageBuffer.getRetainedSizeInBytes();
        if (outputSource == null) {
            return bufferedBytes;
        }
        return bufferedBytes + outputSource.getRetainedSizeInBytes();
    }

    /** Returns the largest retained size observed so far by {@code updateMaxRetainedSize()}. */
    @Override // io.trino.operator.DirectExchangeBuffer
    public synchronized long getMaxRetainedSizeInBytes() {
        return this.maxRetainedSizeInBytes;
    }

    // NOTE(review): the three getters below are annotated @GuardedBy("this") but are not synchronized.
    // They only read the final pageBuffer field and call its synchronized getters — confirm the annotation is intended.

    /** Returns the page buffer's current buffered page count. */
    @Override // io.trino.operator.DirectExchangeBuffer
    @GuardedBy("this")
    public int getBufferedPageCount() {
        return this.pageBuffer.getBufferedPageCount();
    }

    /** Returns the number of page bytes the page buffer has written to the exchange sink. */
    @Override // io.trino.operator.DirectExchangeBuffer
    @GuardedBy("this")
    public long getSpilledBytes() {
        return this.pageBuffer.getSpilledBytes();
    }

    /** Returns the number of pages the page buffer has written to the exchange sink. */
    @Override // io.trino.operator.DirectExchangeBuffer
    @GuardedBy("this")
    public int getSpilledPageCount() {
        return this.pageBuffer.getSpilledPageCount();
    }

    /** Closes the buffer, releases its resources and unblocks waiters. Calling it again has no effect. */
    @Override
    public synchronized void close() {
        if (!closed) {
            closed = true;
            closeAndUnblock();
        }
    }

    /**
     * Records the failure, releases the buffer's resources and unblocks waiters.
     * The failure is later rethrown by {@code throwIfFailed()}.
     */
    @GuardedBy("this")
    private void fail(Throwable th) {
        this.failure = th;
        closeAndUnblock();
    }

    /**
     * Rethrows the recorded failure, if any: unchecked throwables as they are, checked ones
     * wrapped in a {@link RuntimeException}.
     */
    @GuardedBy("this")
    private void throwIfFailed() {
        if (failure == null) {
            return;
        }
        Throwables.throwIfUnchecked(failure);
        throw new RuntimeException(failure);
    }

    /**
     * Closes the page buffer and the output source (if one exists), then unblocks waiters
     * whether or not closing succeeded.
     *
     * @throws UncheckedIOException if closing either resource throws an {@link IOException}
     */
    @GuardedBy("this")
    private void closeAndUnblock() {
        try (Closer closer = Closer.create()) {
            closer.register(pageBuffer);
            // outputSource may still be null here; Closer ignores null registrations.
            closer.register(outputSource);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            unblock(outputReady);
        }
    }

    /** Raises the recorded high-water mark if the current retained size exceeds it. */
    @GuardedBy("this")
    private void updateMaxRetainedSize() {
        this.maxRetainedSizeInBytes = Math.max(this.maxRetainedSizeInBytes, getRetainedSizeInBytes());
    }

    /**
     * Completes the given future on the executor rather than on the calling thread, so its
     * listeners do not run inline in the caller.
     */
    private void unblock(SettableFuture<Void> settableFuture) {
        // A Void future can only be completed with a plain null; an Object-typed null does not type-check.
        this.executor.execute(() -> settableFuture.set(null));
    }
}
