package org.apache.flink.runtime.asyncprocessing.declare;

import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;

@Experimental
/* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain.class */
public class DeclarationChain<IN> implements ThrowingConsumer<IN, Exception> {
    private final DeclarationContext context;
    private final Deque<Transformation<?, ?>> transformations = new LinkedList();
    private DeclarationChain<IN>.DeclarationStage<?> currentStage;

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain$AbstractTransformation.class */
    private static abstract class AbstractTransformation<FROM, TO> implements Transformation<FROM, TO> {
        String name = null;

        private AbstractTransformation() {
        }

        @Override // org.apache.flink.runtime.asyncprocessing.declare.DeclarationChain.Transformation
        public void withName(String str) throws DeclarationException {
            if (this.name != null) {
                throw new DeclarationException("Double naming");
            }
            this.name = str;
            declare();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain$AcceptTransformation.class */
    private class AcceptTransformation<FROM> extends AbstractTransformation<FROM, Void> {
        ThrowingConsumer<FROM, Exception> action;
        NamedConsumer<FROM> namedFunction;

        AcceptTransformation(ThrowingConsumer<FROM, Exception> throwingConsumer) {
            this.action = throwingConsumer;
        }

        @Override // org.apache.flink.runtime.asyncprocessing.declare.DeclarationChain.Transformation
        public StateFuture<Void> apply(StateFuture<FROM> stateFuture) throws Exception {
            return stateFuture.thenAccept(this.namedFunction);
        }

        @Override // org.apache.flink.runtime.asyncprocessing.declare.DeclarationChain.Transformation
        public void declare() throws DeclarationException {
            if (this.namedFunction == null) {
                if (this.name == null) {
                    this.namedFunction = DeclarationChain.this.context.declare(this.action);
                } else {
                    this.namedFunction = DeclarationChain.this.context.declare(this.name, this.action);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain$ComposeTransformation.class */
    private class ComposeTransformation<FROM, TO> extends AbstractTransformation<FROM, TO> {
        FunctionWithException<FROM, StateFuture<TO>, ? extends Exception> action;
        NamedFunction<FROM, StateFuture<TO>> namedFunction;

        ComposeTransformation(FunctionWithException<FROM, StateFuture<TO>, Exception> functionWithException) {
            this.action = functionWithException;
        }

        @Override // org.apache.flink.runtime.asyncprocessing.declare.DeclarationChain.Transformation
        public StateFuture<TO> apply(StateFuture<FROM> stateFuture) throws Exception {
            return stateFuture.thenCompose(this.namedFunction);
        }

        @Override // org.apache.flink.runtime.asyncprocessing.declare.DeclarationChain.Transformation
        public void declare() throws DeclarationException {
            if (this.namedFunction == null) {
                if (this.name == null) {
                    this.namedFunction = DeclarationChain.this.context.declare(this.action);
                } else {
                    this.namedFunction = DeclarationChain.this.context.declare(this.name, this.action);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain$DeclarationStage.class */
    public class DeclarationStage<T> {
        private boolean afterThen = false;

        public DeclarationStage() {
        }

        private void preCheck() throws DeclarationException {
            if (this.afterThen) {
                throw new DeclarationException("Double thenCompose called for single declaration block.");
            }
            if (DeclarationChain.this.currentStage != this) {
                throw new DeclarationException("Diverged declaration. Please make sure you are declaring on the last point.");
            }
            this.afterThen = true;
        }

        public <U> DeclarationChain<IN>.DeclarationStage<U> thenCompose(FunctionWithException<T, StateFuture<U>, Exception> functionWithException) throws DeclarationException {
            preCheck();
            DeclarationStage declarationStage = (DeclarationChain<IN>.DeclarationStage<?>) new DeclarationStage();
            DeclarationChain.this.transformations.add(new ComposeTransformation(functionWithException));
            DeclarationChain.this.currentStage = declarationStage;
            DeclarationChain.this.getLastTransformation().declare();
            return declarationStage;
        }

        public DeclarationChain<IN>.DeclarationStage<Void> thenAccept(ThrowingConsumer<T, Exception> throwingConsumer) throws DeclarationException {
            preCheck();
            DeclarationChain<IN>.DeclarationStage<Void> declarationStage = new DeclarationStage<>();
            DeclarationChain.this.transformations.add(new AcceptTransformation(throwingConsumer));
            DeclarationChain.this.currentStage = declarationStage;
            DeclarationChain.this.getLastTransformation().declare();
            return declarationStage;
        }

        /* JADX WARN: Multi-variable type inference failed */
        public DeclarationChain<IN>.DeclarationStage<T> withName(String str) throws DeclarationException {
            DeclarationChain.this.getLastTransformation().withName(str);
            return this;
        }

        public DeclarationChain<IN> finish() throws DeclarationException {
            preCheck();
            DeclarationChain.this.getLastTransformation().declare();
            return DeclarationChain.this;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    /* loaded from: input_file:org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain$Transformation.class */
    public interface Transformation<FROM, TO> {
        StateFuture<TO> apply(StateFuture<FROM> stateFuture) throws Exception;

        void withName(String str) throws DeclarationException;

        void declare() throws DeclarationException;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public DeclarationChain(DeclarationContext declarationContext) {
        this.context = declarationContext;
    }

    public void accept(IN in) throws Exception {
        StateFuture<?> completedFuture = StateFutureUtils.completedFuture(in);
        Iterator<Transformation<?, ?>> it = this.transformations.iterator();
        while (it.hasNext()) {
            completedFuture = it.next().apply(completedFuture);
        }
    }

    public DeclarationChain<IN>.DeclarationStage<IN> firstStage() throws DeclarationException {
        if (this.currentStage != null) {
            throw new DeclarationException("Diverged declaration. Please make sure you call firstStage() once.");
        }
        DeclarationStage declarationStage = (DeclarationChain<IN>.DeclarationStage<?>) new DeclarationStage();
        this.currentStage = declarationStage;
        return declarationStage;
    }

    Transformation<?, ?> getLastTransformation() {
        return this.transformations.getLast();
    }
}
