package iep.io.reactivex.netty.channel;

import iep.io.reactivex.netty.metrics.Clock;
import iep.io.reactivex.netty.metrics.MetricEventsSubject;
import iep.io.reactivex.netty.pipeline.PrimitiveConversionHandler;
import iep.io.reactivex.netty.util.MultipleFutureListener;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.FileRegion;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.functions.Action0;
import rx.functions.Action1;

/* loaded from: input_file:iep/io/reactivex/netty/channel/DefaultChannelWriter.class */
public class DefaultChannelWriter<O> implements ChannelWriter<O> {
    protected static final Observable<Void> CONNECTION_ALREADY_CLOSED = Observable.error(new IllegalStateException("Connection is already closed."));
    protected final AtomicBoolean closeIssued = new AtomicBoolean();
    private final Channel nettyChannel;
    private final AtomicReference<MultipleFutureListener> unflushedWritesListener;
    private final MetricEventsSubject eventsSubject;
    private final ChannelMetricEventProvider metricEventProvider;

    /* JADX INFO: Access modifiers changed from: protected */
    public DefaultChannelWriter(Channel channel, MetricEventsSubject<?> metricEventsSubject, ChannelMetricEventProvider channelMetricEventProvider) {
        this.eventsSubject = metricEventsSubject;
        this.metricEventProvider = channelMetricEventProvider;
        if (null == channel) {
            throw new NullPointerException("Channel can not be null.");
        }
        this.nettyChannel = channel;
        this.unflushedWritesListener = new AtomicReference<>(new MultipleFutureListener(channel.newPromise()));
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public Observable<Void> writeAndFlush(O o) {
        write(o);
        return flush();
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public <R> Observable<Void> writeAndFlush(R r, ContentTransformer<R> contentTransformer) {
        write(r, contentTransformer);
        return flush();
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public Observable<Void> writeBytesAndFlush(ByteBuf byteBuf) {
        writeBytes(byteBuf);
        return flush();
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public void write(O o) {
        writeOnChannel(o);
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public <R> void write(final R r, final ContentTransformer<R> contentTransformer) {
        writeOnChannel(new PrimitiveConversionHandler.DelayedTransformationMessage() { // from class: iep.io.reactivex.netty.channel.DefaultChannelWriter.1
            @Override // iep.io.reactivex.netty.pipeline.PrimitiveConversionHandler.DelayedTransformationMessage
            public Object getTransformed(ByteBufAllocator byteBufAllocator) {
                return contentTransformer.call(r, byteBufAllocator);
            }
        });
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public void writeBytes(ByteBuf byteBuf) {
        writeOnChannel(byteBuf);
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public void writeBytes(byte[] bArr) {
        writeOnChannel(bArr);
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public void writeString(String str) {
        writeOnChannel(str);
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public Observable<Void> writeBytesAndFlush(byte[] bArr) {
        writeBytes(bArr);
        return flush();
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public Observable<Void> writeStringAndFlush(String str) {
        writeString(str);
        return flush();
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public void writeFileRegion(FileRegion fileRegion) {
        writeOnChannel(fileRegion);
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public Observable<Void> flush() {
        final long newStartTimeMillis = Clock.newStartTimeMillis();
        this.eventsSubject.onEvent(this.metricEventProvider.getFlushStartEvent());
        MultipleFutureListener andSet = this.unflushedWritesListener.getAndSet(new MultipleFutureListener(this.nettyChannel.newPromise()));
        this.nettyChannel.flush();
        return andSet.asObservable().doOnCompleted(new Action0() { // from class: iep.io.reactivex.netty.channel.DefaultChannelWriter.3
            public void call() {
                DefaultChannelWriter.this.eventsSubject.onEvent((MetricEventsSubject) DefaultChannelWriter.this.metricEventProvider.getFlushSuccessEvent(), Clock.onEndMillis(newStartTimeMillis));
            }
        }).doOnError(new Action1<Throwable>() { // from class: iep.io.reactivex.netty.channel.DefaultChannelWriter.2
            public void call(Throwable th) {
                DefaultChannelWriter.this.eventsSubject.onEvent((MetricEventsSubject) DefaultChannelWriter.this.metricEventProvider.getFlushFailedEvent(), Clock.onEndMillis(newStartTimeMillis), th);
            }
        });
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public void cancelPendingWrites(boolean z) {
        this.unflushedWritesListener.get().cancelPendingFutures(z);
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public ByteBufAllocator getAllocator() {
        return this.nettyChannel.alloc();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ChannelFuture writeOnChannel(Object obj) {
        ChannelFuture write = getChannel().write(obj);
        this.unflushedWritesListener.get().listen(write);
        return write;
    }

    public Channel getChannel() {
        return this.nettyChannel;
    }

    public boolean isCloseIssued() {
        return this.closeIssued.get();
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public Observable<Void> close() {
        return close(false);
    }

    @Override // iep.io.reactivex.netty.channel.ChannelWriter
    public Observable<Void> close(boolean z) {
        return this.closeIssued.compareAndSet(false, true) ? _close(z) : CONNECTION_ALREADY_CLOSED;
    }

    protected Observable<Void> _close(boolean z) {
        return Observable.empty();
    }
}
