package io.pravega.client.stream.impl;

import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.pravega.client.control.impl.Controller;
import io.pravega.client.security.auth.DelegationTokenProvider;
import io.pravega.client.security.auth.DelegationTokenProviderFactory;
import io.pravega.client.segment.impl.Segment;
import io.pravega.client.segment.impl.SegmentOutputStream;
import io.pravega.client.segment.impl.SegmentOutputStreamFactory;
import io.pravega.client.segment.impl.SegmentSealedException;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import io.pravega.common.Exceptions;
import io.pravega.common.concurrent.ExecutorServiceHelpers;
import io.pravega.common.util.ByteBufferUtils;
import io.pravega.common.util.RetriesExhaustedException;
import io.pravega.common.util.Retry;
import io.pravega.shared.security.auth.AccessOperation;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/pravega/client/stream/impl/EventStreamWriterImpl.class */
public class EventStreamWriterImpl<Type> implements EventStreamWriter<Type> {

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    private static final Logger log;
    private final Stream stream;
    private final String writerId;
    private final Serializer<Type> serializer;
    private final Controller controller;
    private final EventWriterConfig config;
    private final SegmentSelector selector;
    private final ExecutorService retransmitPool;
    private final Pinger pinger;
    private final DelegationTokenProvider tokenProvider;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Object writeFlushLock = new Object();
    private final Object writeSealLock = new Object();
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final ConcurrentLinkedQueue<Segment> sealedSegmentQueue = new ConcurrentLinkedQueue<>();
    private final Consumer<Segment> segmentSealedCallBack = this::handleLogSealed;

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventStreamWriterImpl(Stream stream, String str, Controller controller, SegmentOutputStreamFactory segmentOutputStreamFactory, Serializer<Type> serializer, EventWriterConfig eventWriterConfig, ExecutorService executorService, ScheduledExecutorService scheduledExecutorService) {
        this.writerId = str;
        this.stream = (Stream) Preconditions.checkNotNull(stream);
        this.controller = (Controller) Preconditions.checkNotNull(controller);
        this.tokenProvider = DelegationTokenProviderFactory.create(this.controller, this.stream.getScope(), this.stream.getStreamName(), AccessOperation.WRITE);
        this.selector = new SegmentSelector(stream, controller, segmentOutputStreamFactory, eventWriterConfig, this.tokenProvider);
        this.serializer = (Serializer) Preconditions.checkNotNull(serializer);
        this.config = eventWriterConfig;
        this.retransmitPool = (ExecutorService) Preconditions.checkNotNull(executorService);
        this.pinger = new Pinger(eventWriterConfig.getTransactionTimeoutTime(), stream, controller, scheduledExecutorService);
        List<PendingEvent> refreshSegmentEventWriters = this.selector.refreshSegmentEventWriters(this.segmentSealedCallBack);
        if (!$assertionsDisabled && !refreshSegmentEventWriters.isEmpty()) {
            throw new AssertionError("There should not be any events to have failed");
        }
        if (eventWriterConfig.isAutomaticallyNoteTime()) {
            scheduledExecutorService.scheduleWithFixedDelay(() -> {
                noteTimeInternal(System.currentTimeMillis());
            }, 5L, 5L, TimeUnit.SECONDS);
        }
    }

    @Override // io.pravega.client.stream.EventStreamWriter
    public CompletableFuture<Void> writeEvent(Type type) {
        return writeEventInternal(null, type);
    }

    @Override // io.pravega.client.stream.EventStreamWriter
    public CompletableFuture<Void> writeEvent(String str, Type type) {
        Preconditions.checkNotNull(str);
        return writeEventInternal(str, type);
    }

    private CompletableFuture<Void> writeEventInternal(String str, Type type) {
        Preconditions.checkNotNull(type);
        Exceptions.checkNotClosed(this.closed.get(), this);
        ByteBuffer serialize = this.serializer.serialize(type);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        synchronized (this.writeFlushLock) {
            synchronized (this.writeSealLock) {
                getSegmentWriter(str).write(PendingEvent.withHeader(str, serialize, completableFuture));
            }
        }
        return completableFuture;
    }

    @Override // io.pravega.client.stream.EventStreamWriter
    public CompletableFuture<Void> writeEvents(String str, List<Type> list) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(list);
        Exceptions.checkNotClosed(this.closed.get(), this);
        java.util.stream.Stream<Type> stream = list.stream();
        Serializer<Type> serializer = this.serializer;
        Objects.requireNonNull(serializer);
        List list2 = (List) stream.map(serializer::serialize).collect(Collectors.toList());
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        synchronized (this.writeFlushLock) {
            synchronized (this.writeSealLock) {
                getSegmentWriter(str).write(PendingEvent.withHeader(str, (List<ByteBuffer>) list2, completableFuture));
            }
        }
        return completableFuture;
    }

    private SegmentOutputStream getSegmentWriter(String str) {
        SegmentOutputStream segmentOutputStreamForKey = this.selector.getSegmentOutputStreamForKey(str);
        while (true) {
            SegmentOutputStream segmentOutputStream = segmentOutputStreamForKey;
            if (segmentOutputStream != null) {
                return segmentOutputStream;
            }
            log.info("Don't have a writer for segment: {}", this.selector.getSegmentForEvent(str));
            handleMissingLog();
            segmentOutputStreamForKey = this.selector.getSegmentOutputStreamForKey(str);
        }
    }

    @GuardedBy("writeSealLock")
    private void handleMissingLog() {
        resend(this.selector.refreshSegmentEventWriters(this.segmentSealedCallBack));
    }

    private void handleLogSealed(Segment segment) {
        this.sealedSegmentQueue.add(segment);
        this.retransmitPool.execute(() -> {
            Retry.indefinitelyWithExpBackoff(this.config.getInitialBackoffMillis(), this.config.getBackoffMultiple(), this.config.getMaxBackoffMillis(), th -> {
                log.error("Encountered exception when handling a sealed segment: ", th);
            }).run(() -> {
                synchronized (this.writeSealLock) {
                    Segment poll = this.sealedSegmentQueue.poll();
                    log.info("Sealing segment {} ", poll);
                    while (poll != null) {
                        resend(this.selector.refreshSegmentEventWritersUponSealed(poll, this.segmentSealedCallBack));
                        this.selector.removeSegmentWriter(poll);
                        for (SegmentOutputStream segmentOutputStream : this.selector.getWriters().values()) {
                            try {
                                segmentOutputStream.write(PendingEvent.withoutHeader(null, ByteBufferUtils.EMPTY, null));
                                segmentOutputStream.flush();
                            } catch (SegmentSealedException e) {
                                log.info("Flush on segment {} failed due to {}, it will be retried.", segmentOutputStream.getSegmentName(), e.getMessage());
                            } catch (RetriesExhaustedException e2) {
                                log.warn("Flush on segment {} failed after all retries", segmentOutputStream.getSegmentName(), e2);
                            }
                        }
                        poll = this.sealedSegmentQueue.poll();
                        log.info("Sealing another segment {} ", poll);
                    }
                }
                return null;
            });
        });
    }

    @GuardedBy("writeSealLock")
    private void resend(List<PendingEvent> list) {
        while (!list.isEmpty()) {
            ArrayList arrayList = new ArrayList();
            boolean z = false;
            log.info("Resending {} events", Integer.valueOf(list.size()));
            for (PendingEvent pendingEvent : list) {
                if (z) {
                    arrayList.add(pendingEvent);
                } else {
                    SegmentOutputStream segmentOutputStreamForKey = this.selector.getSegmentOutputStreamForKey(pendingEvent.getRoutingKey());
                    if (segmentOutputStreamForKey == null) {
                        log.info("No writer for segment during resend.");
                        arrayList.addAll(this.selector.refreshSegmentEventWriters(this.segmentSealedCallBack));
                        z = true;
                    } else {
                        segmentOutputStreamForKey.write(pendingEvent);
                    }
                }
            }
            list = arrayList;
        }
    }

    @Override // io.pravega.client.stream.EventStreamWriter
    public void flush() {
        Preconditions.checkState(!this.closed.get());
        synchronized (this.writeFlushLock) {
            boolean z = false;
            while (!z) {
                z = true;
                for (SegmentOutputStream segmentOutputStream : this.selector.getWriters().values()) {
                    try {
                        segmentOutputStream.flush();
                    } catch (SegmentSealedException e) {
                        z = false;
                        log.warn("Flush on segment {} failed due to {}, it will be retried.", segmentOutputStream.getSegmentName(), e.getMessage());
                    }
                }
            }
        }
    }

    @Override // io.pravega.client.stream.EventStreamWriter, java.lang.AutoCloseable
    public void close() {
        if (this.closed.getAndSet(true)) {
            return;
        }
        this.pinger.close();
        synchronized (this.writeFlushLock) {
            boolean z = false;
            while (!z) {
                z = true;
                Iterator<SegmentOutputStream> it = this.selector.getWriters().values().iterator();
                while (it.hasNext()) {
                    try {
                        it.next().close();
                    } catch (SegmentSealedException e) {
                        z = false;
                        log.warn("Close failed due to {}, it will be retried.", e.getMessage());
                    }
                }
            }
        }
        ExecutorServiceHelpers.shutdown(new ExecutorService[]{this.retransmitPool});
    }

    @Override // io.pravega.client.stream.EventStreamWriter
    public EventWriterConfig getConfig() {
        return this.config;
    }

    @Override // io.pravega.client.stream.EventStreamWriter
    public void noteTime(long j) {
        Preconditions.checkState(!this.config.isAutomaticallyNoteTime(), "To note time, automatic noting of time should be disabled.");
        noteTimeInternal(j);
    }

    private void noteTimeInternal(long j) {
        this.controller.noteTimestampFromWriter(this.writerId, this.stream, j, new WriterPosition((Map) this.selector.getWriters().entrySet().stream().collect(Collectors.toMap(entry -> {
            return (Segment) entry.getKey();
        }, entry2 -> {
            return Long.valueOf(((SegmentOutputStream) entry2.getValue()).getLastObservedWriteOffset());
        }))));
    }

    @SuppressFBWarnings(justification = "generated code")
    @Generated
    public String toString() {
        return "EventStreamWriterImpl(stream=" + this.stream + ", closed=" + this.closed + ")";
    }

    static {
        $assertionsDisabled = !EventStreamWriterImpl.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(EventStreamWriterImpl.class);
    }
}
