package io.trino.exchange;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.MoreFutures;
import io.airlift.log.Logger;
import io.airlift.slice.Slice;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.OperatorInfo;
import io.trino.spi.exchange.ExchangeSource;
import io.trino.spi.exchange.ExchangeSourceOutputSelector;
import java.util.Objects;
import java.util.Optional;

/* loaded from: input_file:io/trino/exchange/SpoolingExchangeDataSource.class */
public class SpoolingExchangeDataSource implements ExchangeDataSource {
    private static final Logger log = Logger.get(SpoolingExchangeDataSource.class);
    private ExchangeSource exchangeSource;
    private final LocalMemoryContext systemMemoryContext;
    private volatile boolean closed;

    public SpoolingExchangeDataSource(ExchangeSource exchangeSource, LocalMemoryContext localMemoryContext) {
        this.exchangeSource = (ExchangeSource) Objects.requireNonNull(exchangeSource, "exchangeSource is null");
        this.systemMemoryContext = (LocalMemoryContext) Objects.requireNonNull(localMemoryContext, "systemMemoryContext is null");
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public Slice pollPage() {
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return null;
        }
        Slice read = exchangeSource.read();
        this.systemMemoryContext.setBytes(exchangeSource.getMemoryUsage());
        if (this.closed) {
            this.systemMemoryContext.setBytes(0L);
        }
        return read;
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public boolean isFinished() {
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return true;
        }
        return exchangeSource.isFinished();
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public ListenableFuture<Void> isBlocked() {
        ExchangeSource exchangeSource = this.exchangeSource;
        return exchangeSource == null ? Futures.immediateVoidFuture() : MoreFutures.toListenableFuture(exchangeSource.isBlocked());
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public void addInput(ExchangeInput exchangeInput) {
        SpoolingExchangeInput spoolingExchangeInput = (SpoolingExchangeInput) exchangeInput;
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return;
        }
        Optional<ExchangeSourceOutputSelector> outputSelector = spoolingExchangeInput.getOutputSelector();
        Objects.requireNonNull(exchangeSource);
        outputSelector.ifPresent(exchangeSource::setOutputSelector);
        exchangeSource.addSourceHandles(spoolingExchangeInput.getExchangeSourceHandles());
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public void noMoreInputs() {
        ExchangeSource exchangeSource = this.exchangeSource;
        if (exchangeSource == null) {
            return;
        }
        exchangeSource.noMoreSourceHandles();
    }

    @Override // io.trino.exchange.ExchangeDataSource
    public OperatorInfo getInfo() {
        return null;
    }

    @Override // io.trino.exchange.ExchangeDataSource, java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        try {
            this.exchangeSource.close();
        } catch (RuntimeException e) {
            log.warn(e, "error closing exchange source");
        } finally {
            this.exchangeSource = null;
            this.systemMemoryContext.setBytes(0L);
        }
    }
}
