package net.openhft.chronicle.wire.channel.impl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import net.openhft.affinity.AffinityStrategy;
import net.openhft.affinity.AffinityThreadFactory;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.io.Closeable;
import net.openhft.chronicle.core.io.ClosedIllegalStateException;
import net.openhft.chronicle.threads.NamedThreadFactory;
import net.openhft.chronicle.threads.Pauser;
import net.openhft.chronicle.wire.DocumentContext;
import net.openhft.chronicle.wire.UnrecoverableTimeoutException;
import net.openhft.chronicle.wire.Wire;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.channel.EventPoller;

/* loaded from: input_file:net/openhft/chronicle/wire/channel/impl/BufferedChronicleChannel.class */
public class BufferedChronicleChannel extends DelegateChronicleChannel {
    private static final boolean ALLOW_AFFINITY;
    private final Pauser pauser;
    private final WireExchanger exchanger;
    private final ExecutorService bgWriter;
    private volatile EventPoller eventPoller;
    static final /* synthetic */ boolean $assertionsDisabled;

    public BufferedChronicleChannel(TCPChronicleChannel tCPChronicleChannel, Pauser pauser) {
        super(tCPChronicleChannel);
        this.exchanger = new WireExchanger();
        this.pauser = pauser;
        String str = (tCPChronicleChannel.connectionCfg().initiator() ? "init" : "accp") + "-writer";
        this.bgWriter = Executors.newSingleThreadExecutor((ALLOW_AFFINITY && pauser.isBusy()) ? new AffinityThreadFactory(str, true, new AffinityStrategy[0]) : new NamedThreadFactory(str, true));
        this.bgWriter.submit(this::bgWrite);
    }

    @Override // net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel, net.openhft.chronicle.wire.channel.InternalChronicleChannel
    public EventPoller eventPoller() {
        return this.eventPoller;
    }

    @Override // net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel, net.openhft.chronicle.wire.channel.InternalChronicleChannel
    public BufferedChronicleChannel eventPoller(EventPoller eventPoller) {
        if (isClosed()) {
            throw new ClosedIllegalStateException(getClass().getName() + " closed for " + Thread.currentThread().getName());
        }
        this.eventPoller = eventPoller;
        return this;
    }

    private void bgWrite() {
        try {
            try {
                TCPChronicleChannel tCPChronicleChannel = (TCPChronicleChannel) this.channel;
                while (!isClosing()) {
                    tCPChronicleChannel.checkConnected();
                    Wire acquireConsumer = this.exchanger.acquireConsumer();
                    if (acquireConsumer.bytes().isEmpty()) {
                        EventPoller eventPoller = eventPoller();
                        boolean z = eventPoller == null || !eventPoller.onPoll(this);
                        this.exchanger.releaseConsumer();
                        if (z) {
                            this.pauser.pause();
                        }
                    } else {
                        if (!$assertionsDisabled && !TCPChronicleChannel.validateHeader(acquireConsumer.bytes().peekVolatileInt())) {
                            throw new AssertionError();
                        }
                        this.pauser.reset();
                        tCPChronicleChannel.flushOut(acquireConsumer);
                        this.exchanger.releaseConsumer();
                    }
                }
                this.bgWriter.shutdown();
                Closeable.closeQuietly(eventPoller());
            } catch (Throwable th) {
                if (!isClosing() && !this.channel.isClosing()) {
                    Jvm.warn().on(getClass(), "bgWriter died", th);
                }
                this.bgWriter.shutdown();
                Closeable.closeQuietly(eventPoller());
            }
        } catch (Throwable th2) {
            this.bgWriter.shutdown();
            Closeable.closeQuietly(eventPoller());
            throw th2;
        }
    }

    @Override // net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel, net.openhft.chronicle.wire.MarshallableOut, net.openhft.chronicle.wire.DocumentWritten
    public DocumentContext writingDocument(boolean z) throws UnrecoverableTimeoutException {
        return this.exchanger.writingDocument(z);
    }

    @Override // net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel, net.openhft.chronicle.wire.MarshallableOut, net.openhft.chronicle.wire.DocumentWritten
    public DocumentContext acquireWritingDocument(boolean z) throws UnrecoverableTimeoutException {
        return this.exchanger.acquireWritingDocument(z);
    }

    @Override // net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel, net.openhft.chronicle.wire.channel.InternalChronicleChannel
    public WireOut acquireProducer() {
        return this.exchanger.acquireProducer();
    }

    @Override // net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel, net.openhft.chronicle.wire.channel.InternalChronicleChannel
    public void releaseProducer() {
        this.exchanger.releaseProducer();
    }

    @Override // net.openhft.chronicle.wire.channel.impl.DelegateChronicleChannel
    public void close() {
        super.close();
        Closeable.closeQuietly(new Object[]{this.eventPoller, this.exchanger});
    }

    static {
        $assertionsDisabled = !BufferedChronicleChannel.class.desiredAssertionStatus();
        ALLOW_AFFINITY = Jvm.getBoolean("useAffinity", true);
    }
}
