package org.apache.seatunnel.engine.server.task.flow;

import com.hazelcast.cluster.Address;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.task.AbstractTask;
import org.apache.seatunnel.engine.server.task.SeaTunnelSourceCollector;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.context.SourceReaderContext;
import org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
import org.apache.seatunnel.engine.server.task.operation.source.RequestSplitOperation;
import org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOperation;
import org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
import org.apache.seatunnel.engine.server.task.operation.source.SourceReaderEventOperation;
import org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.class */
public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFlowLifeCycle implements InternalCheckpointListener {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) SourceFlowLifeCycle.class);
    private final SourceAction<T, SplitT, ?> sourceAction;
    private final TaskLocation enumeratorTaskLocation;
    private Address enumeratorTaskAddress;
    private SourceReader<T, SplitT> reader;
    private transient Serializer<SplitT> splitSerializer;
    private final int indexID;
    private final TaskLocation currentTaskLocation;
    private SeaTunnelSourceCollector<T> collector;
    private final MetricsContext metricsContext;

    public SourceFlowLifeCycle(SourceAction<T, SplitT, ?> sourceAction, int i, TaskLocation taskLocation, SeaTunnelTask seaTunnelTask, TaskLocation taskLocation2, CompletableFuture<Void> completableFuture, MetricsContext metricsContext) {
        super(sourceAction, seaTunnelTask, completableFuture);
        this.sourceAction = sourceAction;
        this.indexID = i;
        this.enumeratorTaskLocation = taskLocation;
        this.currentTaskLocation = taskLocation2;
        this.metricsContext = metricsContext;
    }

    public void setCollector(SeaTunnelSourceCollector<T> seaTunnelSourceCollector) {
        this.collector = seaTunnelSourceCollector;
    }

    @Override // org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle
    public void init() throws Exception {
        this.splitSerializer = this.sourceAction.getSource().getSplitSerializer();
        this.reader = this.sourceAction.getSource().createReader(new SourceReaderContext(this.indexID, this.sourceAction.getSource().getBoundedness(), this, this.metricsContext));
        this.enumeratorTaskAddress = getEnumeratorTaskAddress();
    }

    @Override // org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle
    public void open() throws Exception {
        this.reader.open();
        register();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private Address getEnumeratorTaskAddress() throws ExecutionException, InterruptedException {
        return (Address) this.runningTask.getExecutionContext().sendToMaster(new GetTaskGroupAddressOperation(this.enumeratorTaskLocation)).get();
    }

    @Override // org.apache.seatunnel.engine.server.task.flow.AbstractFlowLifeCycle, org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle
    public void close() throws IOException {
        this.reader.close();
        super.close();
    }

    public void collect() throws Exception {
        if (this.prepareClose.booleanValue()) {
            return;
        }
        this.reader.pollNext(this.collector);
        if (this.collector.getRowCountThisPollNext() == 0) {
            Thread.sleep(100L);
        } else {
            this.collector.resetRowCountThisPollNext();
        }
    }

    public void signalNoMoreElement() {
        try {
            this.prepareClose = true;
            this.runningTask.getExecutionContext().sendToMember(new SourceNoMoreElementOperation(this.currentTaskLocation, this.enumeratorTaskLocation), this.enumeratorTaskAddress).get();
        } catch (Exception e) {
            log.warn("source close failed {}", (Throwable) e);
            throw new RuntimeException(e);
        }
    }

    private void register() {
        try {
            this.runningTask.getExecutionContext().sendToMember(new SourceRegisterOperation(this.currentTaskLocation, this.enumeratorTaskLocation), this.enumeratorTaskAddress).get();
        } catch (InterruptedException | ExecutionException e) {
            log.warn("source register failed.", e);
            throw new RuntimeException(e);
        }
    }

    public void requestSplit() {
        try {
            this.runningTask.getExecutionContext().sendToMember(new RequestSplitOperation(this.currentTaskLocation, this.enumeratorTaskLocation), this.enumeratorTaskAddress).get();
        } catch (InterruptedException | ExecutionException e) {
            log.warn("source request split failed.", e);
            throw new RuntimeException(e);
        }
    }

    public void sendSourceEventToEnumerator(SourceEvent sourceEvent) {
        try {
            this.runningTask.getExecutionContext().sendToMember(new SourceReaderEventOperation(this.enumeratorTaskLocation, this.currentTaskLocation, sourceEvent), this.enumeratorTaskAddress).get();
        } catch (InterruptedException | ExecutionException e) {
            log.warn("source request split failed.", e);
            throw new RuntimeException(e);
        }
    }

    public void receivedSplits(List<SplitT> list) {
        if (list.isEmpty()) {
            this.reader.handleNoMoreSplits();
        } else {
            this.reader.addSplits(list);
        }
    }

    public void triggerBarrier(Barrier barrier) throws Exception {
        log.debug("source trigger barrier [{}]", barrier);
        synchronized (this.collector.getCheckpointLock()) {
            if (barrier.prepareClose()) {
                this.prepareClose = true;
            }
            if (barrier.snapshot()) {
                this.runningTask.addState(barrier, ActionStateKey.of(this.sourceAction), AbstractTask.serializeStates(this.splitSerializer, this.reader.snapshotState(barrier.getId())));
            }
            this.runningTask.ack(barrier);
            log.debug("source ack barrier finished, taskId: [{}]", this.runningTask.getTaskID());
            this.collector.sendRecordToNext(new Record<>(barrier));
            log.debug("send record to next finished, taskId: [{}]", this.runningTask.getTaskID());
        }
    }

    @Override // org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener, org.apache.seatunnel.api.state.CheckpointListener
    public void notifyCheckpointComplete(long j) throws Exception {
        this.reader.notifyCheckpointComplete(j);
    }

    @Override // org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener, org.apache.seatunnel.api.state.CheckpointListener
    public void notifyCheckpointAborted(long j) throws Exception {
        this.reader.notifyCheckpointAborted(j);
    }

    /* JADX WARN: Type inference failed for: r4v1, types: [java.lang.Object[], java.io.Serializable] */
    @Override // org.apache.seatunnel.engine.server.checkpoint.Stateful
    public void restoreState(List<ActionSubtaskState> list) throws Exception {
        if (list.isEmpty()) {
            return;
        }
        try {
            this.runningTask.getExecutionContext().sendToMember(new RestoredSplitOperation(this.enumeratorTaskLocation, SerializationUtils.serialize(((List) list.stream().map((v0) -> {
                return v0.getState();
            }).flatMap((v0) -> {
                return v0.stream();
            }).filter((v0) -> {
                return Objects.nonNull(v0);
            }).map(bArr -> {
                return (SourceSplit) ExceptionUtil.sneaky(() -> {
                    return this.splitSerializer.deserialize(bArr);
                });
            }).collect(Collectors.toList())).toArray()), this.indexID), this.enumeratorTaskAddress).get();
        } catch (InterruptedException | ExecutionException e) {
            log.warn("source request split failed.", e);
            throw new RuntimeException(e);
        }
    }
}
