package io.trino.execution.buffer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.airlift.slice.Slice;
import io.airlift.units.DataSize;
import io.trino.execution.StateMachine;
import io.trino.execution.buffer.ClientBuffer;
import io.trino.execution.buffer.PipelinedOutputBuffers;
import io.trino.execution.buffer.SerializedPageReference;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.plugin.base.metrics.TDigestHistogram;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/* loaded from: input_file:io/trino/execution/buffer/ArbitraryOutputBuffer.class */
/**
 * {@link OutputBuffer} implementation that collects enqueued pages in one shared
 * {@link MasterBuffer} and lets the per-client {@link ClientBuffer}s pull pages from it.
 *
 * <p>Locking: several public methods assert that the caller does <em>not</em> hold the monitor
 * of this object (see the {@code Thread.holdsLock(this)} checks). Code that runs while holding
 * the monitor must therefore never reach {@link #destroy()} or the other lock-asserting methods.
 */
public class ArbitraryOutputBuffer implements OutputBuffer {
    private final OutputBufferMemoryManager memoryManager;
    private final SerializedPageReference.PagesReleasedListener onPagesReleased;
    private final MasterBuffer masterBuffer;
    private final OutputBufferStateMachine stateMachine;
    private final String taskInstanceId;

    // Written only under the monitor; volatile so that getStatus() can read the version lock-free.
    @GuardedBy("this")
    private volatile PipelinedOutputBuffers outputBuffers = PipelinedOutputBuffers.createInitial(PipelinedOutputBuffers.BufferType.ARBITRARY);

    // Mutated only under the monitor; a concurrent map so that getInfo() can iterate it lock-free.
    @GuardedBy("this")
    private final ConcurrentMap<PipelinedOutputBuffers.OutputBufferId, ClientBuffer> buffers = new ConcurrentHashMap<>();

    // Position in the client-buffer snapshot at which enqueue() starts offering pages.
    private final AtomicInteger nextClientBufferIndex = new AtomicInteger(0);
    private final AtomicLong totalPagesAdded = new AtomicLong();
    private final AtomicLong totalRowsAdded = new AtomicLong();

    /**
     * FIFO queue of pages that have been enqueued but not yet taken by any client buffer.
     * All access to the queue goes through {@code synchronized} sections; the page count is
     * mirrored into an atomic so it can be read without taking the lock.
     */
    @ThreadSafe
    public static class MasterBuffer implements ClientBuffer.PagesSupplier {
        private final SerializedPageReference.PagesReleasedListener onPagesReleased;

        @GuardedBy("this")
        private boolean noMorePages;

        @GuardedBy("this")
        private final LinkedList<SerializedPageReference> masterBuffer = new LinkedList<>();

        // Lock-free mirror of masterBuffer.size(), refreshed after every mutation of the queue.
        private final AtomicInteger bufferedPages = new AtomicInteger();

        private MasterBuffer(SerializedPageReference.PagesReleasedListener pagesReleasedListener) {
            this.onPagesReleased = Objects.requireNonNull(pagesReleasedListener, "onPagesReleased is null");
        }

        /**
         * Appends the given pages to the tail of the queue.
         *
         * @param list pages to append, in order
         */
        public synchronized void addPages(List<SerializedPageReference> list) {
            masterBuffer.addAll(list);
            bufferedPages.set(masterBuffer.size());
        }

        /** Returns {@code true} when no pages are currently queued. */
        public synchronized boolean isEmpty() {
            return masterBuffer.isEmpty();
        }

        /**
         * Returns {@code false} only once {@link #setNoMorePages()} has been called and the queue
         * has been drained; {@code true} otherwise.
         */
        @Override // io.trino.execution.buffer.ClientBuffer.PagesSupplier
        public synchronized boolean mayHaveMorePages() {
            return !noMorePages || !masterBuffer.isEmpty();
        }

        /** Marks that no further pages will be added. */
        public synchronized void setNoMorePages() {
            noMorePages = true;
        }

        /**
         * Removes and returns pages from the head of the queue until the next page would push the
         * accumulated retained size above {@code dataSize}. The first available page is always
         * returned, even if it alone exceeds the limit.
         *
         * @param dataSize upper bound on the total retained size of the returned pages
         * @return immutable list of the removed pages, possibly empty
         */
        @Override // io.trino.execution.buffer.ClientBuffer.PagesSupplier
        public synchronized List<SerializedPageReference> getPages(DataSize dataSize) {
            long maxBytes = dataSize.toBytes();
            List<SerializedPageReference> pages = new ArrayList<>();
            long bytesTaken = 0;

            SerializedPageReference head;
            while ((head = masterBuffer.peek()) != null) {
                bytesTaken += head.getRetainedSizeInBytes();
                // Stop (without removing the page) once the limit would be exceeded,
                // but never return an empty result while a page is available.
                if (!pages.isEmpty() && bytesTaken > maxBytes) {
                    break;
                }
                // The monitor is held, so the head cannot have changed since peek().
                Preconditions.checkState(masterBuffer.poll() == head, "Master buffer corrupted");
                pages.add(head);
            }

            bufferedPages.set(masterBuffer.size());
            return ImmutableList.copyOf(pages);
        }

        /**
         * Empties the queue and dereferences every page that was still queued.
         *
         * @throws IllegalStateException if the calling thread holds this object's monitor
         */
        public void destroy() {
            Preconditions.checkState(!Thread.holdsLock(this), "Cannot destroy master buffer while holding a lock on this");
            List<SerializedPageReference> pagesToRelease;
            synchronized (this) {
                pagesToRelease = ImmutableList.copyOf(masterBuffer);
                masterBuffer.clear();
                bufferedPages.set(0);
            }
            // Dereference outside the monitor so the release listener never runs under our lock.
            SerializedPageReference.dereferencePages(pagesToRelease, onPagesReleased);
        }

        /** Returns the number of queued pages as of the last queue mutation, without locking. */
        public int getBufferedPages() {
            return bufferedPages.get();
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this).add("bufferedPages", bufferedPages.get()).toString();
        }
    }

    /**
     * Creates an empty buffer.
     *
     * @param str task instance id handed to every {@link ClientBuffer} this buffer creates
     * @param outputBufferStateMachine state machine that tracks the lifecycle of this buffer
     * @param dataSize maximum buffer size given to the memory manager; must be greater than zero bytes
     * @param supplier supplier of the memory context used by the memory manager
     * @param executor executor handed to the memory manager for its notifications
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if {@code dataSize} is not positive
     */
    public ArbitraryOutputBuffer(String str, OutputBufferStateMachine outputBufferStateMachine, DataSize dataSize, Supplier<LocalMemoryContext> supplier, Executor executor) {
        this.taskInstanceId = Objects.requireNonNull(str, "taskInstanceId is null");
        this.stateMachine = Objects.requireNonNull(outputBufferStateMachine, "stateMachine is null");
        Objects.requireNonNull(dataSize, "maxBufferSize is null");
        Preconditions.checkArgument(dataSize.toBytes() > 0, "maxBufferSize must be at least 1");
        this.memoryManager = new OutputBufferMemoryManager(
                dataSize.toBytes(),
                Objects.requireNonNull(supplier, "memoryContextSupplier is null"),
                Objects.requireNonNull(executor, "notificationExecutor is null"));
        this.onPagesReleased = SerializedPageReference.PagesReleasedListener.forOutputBufferMemoryManager(memoryManager);
        this.masterBuffer = new MasterBuffer(onPagesReleased);
    }

    /** Registers a listener on the underlying state machine. */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void addStateChangeListener(StateMachine.StateChangeListener<BufferState> stateChangeListener) {
        stateMachine.addStateChangeListener(stateChangeListener);
    }

    /** Returns the utilization reported by the memory manager. */
    @Override // io.trino.execution.buffer.OutputBuffer
    public double getUtilization() {
        return memoryManager.getUtilization();
    }

    /**
     * Builds a status for the current output-buffers version. The buffer is flagged as
     * overutilized when utilization is at least 0.5 and pages can still be added.
     * Does not take the monitor; {@code outputBuffers} is read through its volatile field.
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public OutputBufferStatus getStatus() {
        boolean overutilized = memoryManager.getUtilization() >= 0.5d && stateMachine.getState().canAddPages();
        return OutputBufferStatus.builder(outputBuffers.getVersion())
                .setOverutilized(overutilized)
                .build();
    }

    /**
     * Returns a snapshot of this buffer's statistics together with the info of every client
     * buffer. Does not take the monitor.
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public OutputBufferInfo getInfo() {
        // Read the state before any other statistic.
        BufferState state = stateMachine.getState();

        // 'buffers' is a concurrent map, so iterating its values without the monitor is safe.
        Collection<ClientBuffer> clientBuffers = buffers.values();

        int totalBufferedPages = masterBuffer.getBufferedPages();
        ImmutableList.Builder<PipelinedBufferInfo> infos = ImmutableList.builderWithExpectedSize(clientBuffers.size());
        for (ClientBuffer clientBuffer : clientBuffers) {
            PipelinedBufferInfo info = clientBuffer.getInfo();
            infos.add(info);
            totalBufferedPages += info.getBufferedPages();
        }

        return new OutputBufferInfo(
                "ARBITRARY",
                state,
                state.canAddBuffers(),
                state.canAddPages(),
                memoryManager.getBufferedBytes(),
                totalBufferedPages,
                totalRowsAdded.get(),
                totalPagesAdded.get(),
                Optional.of(infos.build()),
                Optional.of(new TDigestHistogram(memoryManager.getUtilizationHistogram())),
                Optional.empty());
    }

    /** Returns the current state of the underlying state machine. */
    @Override // io.trino.execution.buffer.OutputBuffer
    public BufferState getState() {
        return stateMachine.getState();
    }

    /**
     * Installs a newer version of the output-buffer set and creates a client buffer for each
     * declared id. The call is ignored when the state is terminal or when the given version is
     * not newer than the current one.
     *
     * @param outputBuffers the new buffer set; must be a {@link PipelinedOutputBuffers}
     * @throws IllegalStateException if the calling thread holds this object's monitor
     * @throws NullPointerException if {@code outputBuffers} is null
     * @throws IllegalArgumentException if {@code outputBuffers} is not a {@link PipelinedOutputBuffers}
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void setOutputBuffers(OutputBuffers outputBuffers) {
        Preconditions.checkState(!Thread.holdsLock(this), "Cannot set output buffers while holding a lock on this");
        Objects.requireNonNull(outputBuffers, "newOutputBuffers is null");
        Preconditions.checkArgument(outputBuffers instanceof PipelinedOutputBuffers, "newOutputBuffers is expected to be an instance of PipelinedOutputBuffers");

        synchronized (this) {
            if (stateMachine.getState().isTerminal() || this.outputBuffers.getVersion() >= outputBuffers.getVersion()) {
                return;
            }

            this.outputBuffers.checkValidTransition(outputBuffers);
            this.outputBuffers = (PipelinedOutputBuffers) outputBuffers;

            // Make sure a client buffer exists for every declared id.
            for (PipelinedOutputBuffers.OutputBufferId bufferId : this.outputBuffers.getBuffers().keySet()) {
                getBuffer(bufferId);
            }

            // The set of buffers changed, so restart the distribution position.
            nextClientBufferIndex.set(0);

            if (this.outputBuffers.isNoMoreBufferIds()) {
                stateMachine.noMoreBuffers();
            }
            if (!stateMachine.getState().canAddBuffers()) {
                noMoreBuffers();
            }
        }

        // Must run after the monitor is released: checkFlushComplete() may call destroy(),
        // which rejects callers that hold the lock on this object.
        checkFlushComplete();
    }

    /** Returns the memory manager's buffer-blocked future. */
    @Override // io.trino.execution.buffer.OutputBuffer
    public ListenableFuture<Void> isFull() {
        return memoryManager.getBufferBlockedFuture();
    }

    /**
     * Adds serialized pages to the master buffer and then offers them to the client buffers,
     * starting at the saved position. Pages are silently dropped when the current state does
     * not allow adding pages.
     *
     * @param list serialized pages to add
     * @throws IllegalStateException if the calling thread holds this object's monitor
     * @throws NullPointerException if {@code list} is null
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void enqueue(List<Slice> list) {
        Preconditions.checkState(!Thread.holdsLock(this), "Cannot enqueue pages while holding a lock on this");
        Objects.requireNonNull(list, "pages is null");

        if (!stateMachine.getState().canAddPages()) {
            return;
        }

        ImmutableList.Builder<SerializedPageReference> references = ImmutableList.builderWithExpectedSize(list.size());
        long bytesAdded = 0;
        long rowsAdded = 0;
        for (Slice page : list) {
            bytesAdded += page.getRetainedSize();
            int positionCount = PagesSerdeUtil.getSerializedPagePositionCount(page);
            rowsAdded += positionCount;
            references.add(new SerializedPageReference(page, positionCount, 1));
        }
        List<SerializedPageReference> pageReferences = references.build();

        // Update statistics and account for the memory before the pages become visible.
        totalRowsAdded.addAndGet(rowsAdded);
        totalPagesAdded.addAndGet(pageReferences.size());
        memoryManager.updateMemoryUsage(bytesAdded);

        masterBuffer.addPages(pageReferences);

        List<ClientBuffer> clientBuffers = safeGetBuffersSnapshot();
        if (clientBuffers.isEmpty()) {
            return;
        }

        // The modulo guards against the saved index being stale relative to this snapshot.
        int bufferCount = clientBuffers.size();
        int index = nextClientBufferIndex.get() % bufferCount;
        for (int visited = 0; visited < bufferCount; visited++) {
            clientBuffers.get(index).loadPagesIfNecessary(masterBuffer);
            index = (index + 1) % bufferCount;
            if (masterBuffer.isEmpty()) {
                // Everything was handed out; the next enqueue resumes with the following buffer.
                nextClientBufferIndex.set(index);
                return;
            }
        }
    }

    /**
     * Partition-aware variant of {@link #enqueue(List)}; only partition {@code 0} is accepted.
     *
     * @param i partition number; must be zero
     * @param list serialized pages to add
     * @throws IllegalStateException if {@code i} is not zero
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void enqueue(int i, List<Slice> list) {
        Preconditions.checkState(i == 0, "Expected partition number to be zero");
        enqueue(list);
    }

    /**
     * Requests pages for the given client buffer, creating the buffer if it does not exist yet.
     *
     * @param outputBufferId id of the client buffer
     * @param j forwarded unchanged as the first argument of {@code ClientBuffer.getPages}
     *          (presumably the starting sequence id - confirm against ClientBuffer)
     * @param dataSize maximum size of the result; must be greater than zero bytes
     * @return the future returned by the client buffer
     * @throws IllegalStateException if the calling thread holds this object's monitor
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public ListenableFuture<BufferResult> get(PipelinedOutputBuffers.OutputBufferId outputBufferId, long j, DataSize dataSize) {
        Preconditions.checkState(!Thread.holdsLock(this), "Cannot get pages while holding a lock on this");
        Objects.requireNonNull(outputBufferId, "bufferId is null");
        Preconditions.checkArgument(dataSize.toBytes() > 0, "maxSize must be at least 1 byte");
        return getBuffer(outputBufferId).getPages(j, dataSize, Optional.of(masterBuffer));
    }

    /**
     * Forwards an acknowledgement to the given client buffer, creating the buffer if needed.
     *
     * @param outputBufferId id of the client buffer
     * @param j forwarded unchanged to {@code ClientBuffer.acknowledgePages}
     * @throws IllegalStateException if the calling thread holds this object's monitor
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void acknowledge(PipelinedOutputBuffers.OutputBufferId outputBufferId, long j) {
        Preconditions.checkState(!Thread.holdsLock(this), "Cannot acknowledge pages while holding a lock on this");
        Objects.requireNonNull(outputBufferId, "bufferId is null");
        getBuffer(outputBufferId).acknowledgePages(j);
    }

    /**
     * Destroys a single client buffer and then finishes the whole buffer if flushing is complete.
     *
     * @param outputBufferId id of the client buffer to destroy
     * @throws IllegalStateException if the calling thread holds this object's monitor
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void destroy(PipelinedOutputBuffers.OutputBufferId outputBufferId) {
        Preconditions.checkState(!Thread.holdsLock(this), "Cannot destroy while holding a lock on this");
        Objects.requireNonNull(outputBufferId, "bufferId is null");
        getBuffer(outputBufferId).destroy();
        checkFlushComplete();
    }

    /**
     * Signals that no more pages will be enqueued, offers the remaining pages to every client
     * buffer and finishes the buffer if flushing is complete.
     *
     * @throws IllegalStateException if the calling thread holds this object's monitor
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void setNoMorePages() {
        Preconditions.checkState(!Thread.holdsLock(this), "Cannot set no more pages while holding a lock on this");
        stateMachine.noMorePages();
        memoryManager.setNoBlockOnFull();
        masterBuffer.setNoMorePages();
        for (ClientBuffer clientBuffer : safeGetBuffersSnapshot()) {
            clientBuffer.loadPagesIfNecessary(masterBuffer);
        }
        checkFlushComplete();
    }

    /**
     * Moves the state machine to finished and, only if that transition happened, destroys the
     * master buffer and all client buffers and releases the memory manager.
     *
     * @throws IllegalStateException if the calling thread holds this object's monitor
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void destroy() {
        Preconditions.checkState(!Thread.holdsLock(this), "Cannot destroy while holding a lock on this");
        if (!stateMachine.finish()) {
            return;
        }
        noMoreBuffers();
        masterBuffer.destroy();
        safeGetBuffersSnapshot().forEach(ClientBuffer::destroy);
        memoryManager.setNoBlockOnFull();
        forceFreeMemory();
    }

    /**
     * Moves the state machine to aborted and, only if that transition happened, unblocks writers
     * and releases the memory manager. Client buffers and the master buffer are left untouched.
     */
    @Override // io.trino.execution.buffer.OutputBuffer
    public void abort() {
        if (stateMachine.abort()) {
            memoryManager.setNoBlockOnFull();
            forceFreeMemory();
        }
    }

    /** Returns the peak memory usage reported by the memory manager. */
    @Override // io.trino.execution.buffer.OutputBuffer
    public long getPeakMemoryUsage() {
        return memoryManager.getPeakMemoryUsage();
    }

    /** Returns the failure cause recorded by the state machine, if any. */
    @Override // io.trino.execution.buffer.OutputBuffer
    public Optional<Throwable> getFailureCause() {
        return stateMachine.getFailureCause();
    }

    /** Closes the memory manager. */
    @VisibleForTesting
    void forceFreeMemory() {
        memoryManager.close();
    }

    /**
     * Returns the client buffer registered under the given id, creating and registering it when
     * absent. A buffer created while the state is FINISHED is destroyed before it is registered.
     *
     * @throws IllegalStateException if a new buffer is required but the state is not ABORTED,
     *         buffers can no longer be added and the final buffer set has already been declared
     */
    private synchronized ClientBuffer getBuffer(PipelinedOutputBuffers.OutputBufferId outputBufferId) {
        ClientBuffer existing = buffers.get(outputBufferId);
        if (existing != null) {
            return existing;
        }

        BufferState state = stateMachine.getState();
        Preconditions.checkState(
                state == BufferState.ABORTED || state.canAddBuffers() || !outputBuffers.isNoMoreBufferIds(),
                "No more buffers already set");

        ClientBuffer created = new ClientBuffer(taskInstanceId, outputBufferId, onPagesReleased);
        // The state is read again here because it may have advanced since the check above.
        if (stateMachine.getState() == BufferState.FINISHED) {
            created.destroy();
        }
        buffers.put(outputBufferId, created);
        return created;
    }

    /** Returns an immutable copy of the current client buffers, taken under the monitor. */
    private synchronized List<ClientBuffer> safeGetBuffersSnapshot() {
        return ImmutableList.copyOf(buffers.values());
    }

    /**
     * When the final buffer set has been declared, verifies that every client buffer created so
     * far is part of it.
     *
     * @throws IllegalStateException if a created buffer id is missing from the final set
     */
    private synchronized void noMoreBuffers() {
        if (!outputBuffers.isNoMoreBufferIds()) {
            return;
        }
        Sets.SetView<PipelinedOutputBuffers.OutputBufferId> undeclared = Sets.difference(buffers.keySet(), outputBuffers.getBuffers().keySet());
        Preconditions.checkState(undeclared.isEmpty(), "Final output buffers does not contain all created buffer ids: %s", undeclared);
    }

    /**
     * Calls {@link #destroy()} when the state is FLUSHING, or NO_MORE_PAGES with an empty master
     * buffer, and every client buffer is already destroyed. Because it may reach
     * {@link #destroy()}, it must not be called while holding the monitor of this object.
     */
    private void checkFlushComplete() {
        BufferState state = stateMachine.getState();
        boolean nothingLeftToDistribute = state == BufferState.FLUSHING
                || (state == BufferState.NO_MORE_PAGES && masterBuffer.isEmpty());
        if (nothingLeftToDistribute && safeGetBuffersSnapshot().stream().allMatch(ClientBuffer::isDestroyed)) {
            destroy();
        }
    }

    @VisibleForTesting
    OutputBufferMemoryManager getMemoryManager() {
        return memoryManager;
    }
}
