package org.apache.flink.runtime.checkpoint.hooks;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.LambdaUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/hooks/MasterHooks.class */
public class MasterHooks {

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/hooks/MasterHooks$WrappedMasterHook.class */
    private static class WrappedMasterHook<T> implements MasterTriggerRestoreHook<T> {
        private final MasterTriggerRestoreHook<T> hook;
        private final ClassLoader userClassLoader;

        /* loaded from: input_file:org/apache/flink/runtime/checkpoint/hooks/MasterHooks$WrappedMasterHook$WrappedCommand.class */
        private static class WrappedCommand implements Runnable {
            private final ClassLoader userClassLoader;
            private final Runnable command;

            WrappedCommand(ClassLoader classLoader, Runnable runnable) {
                this.userClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
                this.command = (Runnable) Preconditions.checkNotNull(runnable);
            }

            @Override // java.lang.Runnable
            public void run() {
                ClassLoader classLoader = this.userClassLoader;
                Runnable runnable = this.command;
                Objects.requireNonNull(runnable);
                LambdaUtil.withContextClassLoader(classLoader, runnable::run);
            }
        }

        WrappedMasterHook(MasterTriggerRestoreHook<T> masterTriggerRestoreHook, ClassLoader classLoader) {
            this.hook = (MasterTriggerRestoreHook) Preconditions.checkNotNull(masterTriggerRestoreHook);
            this.userClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        }

        @Override // org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook
        public void reset() throws Exception {
            ClassLoader classLoader = this.userClassLoader;
            MasterTriggerRestoreHook<T> masterTriggerRestoreHook = this.hook;
            Objects.requireNonNull(masterTriggerRestoreHook);
            LambdaUtil.withContextClassLoader(classLoader, masterTriggerRestoreHook::reset);
        }

        @Override // org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook
        public void close() throws Exception {
            ClassLoader classLoader = this.userClassLoader;
            MasterTriggerRestoreHook<T> masterTriggerRestoreHook = this.hook;
            Objects.requireNonNull(masterTriggerRestoreHook);
            LambdaUtil.withContextClassLoader(classLoader, masterTriggerRestoreHook::close);
        }

        @Override // org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook
        public String getIdentifier() {
            ClassLoader classLoader = this.userClassLoader;
            MasterTriggerRestoreHook<T> masterTriggerRestoreHook = this.hook;
            Objects.requireNonNull(masterTriggerRestoreHook);
            return (String) LambdaUtil.withContextClassLoader(classLoader, masterTriggerRestoreHook::getIdentifier);
        }

        @Override // org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook
        @Nullable
        public CompletableFuture<T> triggerCheckpoint(long j, long j2, Executor executor) throws Exception {
            Executor executor2 = runnable -> {
                executor.execute(new WrappedCommand(this.userClassLoader, runnable));
            };
            return (CompletableFuture) LambdaUtil.withContextClassLoader(this.userClassLoader, () -> {
                return this.hook.triggerCheckpoint(j, j2, executor2);
            });
        }

        @Override // org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook
        public void restoreCheckpoint(long j, @Nullable T t) throws Exception {
            LambdaUtil.withContextClassLoader(this.userClassLoader, () -> {
                this.hook.restoreCheckpoint(j, t);
            });
        }

        @Override // org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook
        @Nullable
        public SimpleVersionedSerializer<T> createCheckpointDataSerializer() {
            ClassLoader classLoader = this.userClassLoader;
            MasterTriggerRestoreHook<T> masterTriggerRestoreHook = this.hook;
            Objects.requireNonNull(masterTriggerRestoreHook);
            return (SimpleVersionedSerializer) LambdaUtil.withContextClassLoader(classLoader, masterTriggerRestoreHook::createCheckpointDataSerializer);
        }
    }

    public static void reset(Collection<MasterTriggerRestoreHook<?>> collection, Logger logger) throws FlinkException {
        for (MasterTriggerRestoreHook<?> masterTriggerRestoreHook : collection) {
            String identifier = masterTriggerRestoreHook.getIdentifier();
            try {
                masterTriggerRestoreHook.reset();
            } catch (Throwable th) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
                throw new FlinkException("Error while resetting checkpoint master hook '" + identifier + '\'', th);
            }
        }
    }

    public static void close(Collection<MasterTriggerRestoreHook<?>> collection, Logger logger) {
        for (MasterTriggerRestoreHook<?> masterTriggerRestoreHook : collection) {
            try {
                masterTriggerRestoreHook.close();
            } catch (Throwable th) {
                logger.warn("Failed to cleanly close a checkpoint master hook (" + masterTriggerRestoreHook.getIdentifier() + ")", th);
            }
        }
    }

    public static <T> CompletableFuture<MasterState> triggerHook(MasterTriggerRestoreHook<T> masterTriggerRestoreHook, long j, long j2, Executor executor) {
        String identifier = masterTriggerRestoreHook.getIdentifier();
        SimpleVersionedSerializer<T> createCheckpointDataSerializer = masterTriggerRestoreHook.createCheckpointDataSerializer();
        try {
            CompletableFuture<T> triggerCheckpoint = masterTriggerRestoreHook.triggerCheckpoint(j, j2, executor);
            return triggerCheckpoint == null ? CompletableFuture.completedFuture(null) : triggerCheckpoint.thenApply((Function) obj -> {
                if (obj == null) {
                    return null;
                }
                if (createCheckpointDataSerializer == null) {
                    throw new CompletionException((Throwable) new FlinkException("Checkpoint hook '" + identifier + " is stateful but creates no serializer"));
                }
                try {
                    return new MasterState(identifier, createCheckpointDataSerializer.serialize(obj), createCheckpointDataSerializer.getVersion());
                } catch (Throwable th) {
                    ExceptionUtils.rethrowIfFatalErrorOrOOM(th);
                    throw new CompletionException((Throwable) new FlinkException("Failed to serialize state of master hook '" + identifier + '\'', th));
                }
            }).exceptionally((Function<Throwable, ? extends U>) th -> {
                throw new CompletionException((Throwable) new FlinkException("Checkpoint master hook '" + identifier + "' produced an exception", th.getCause()));
            });
        } catch (Throwable th2) {
            return FutureUtils.completedExceptionally(new FlinkException("Error while triggering checkpoint master hook '" + identifier + '\'', th2));
        }
    }

    public static void restoreMasterHooks(Map<String, MasterTriggerRestoreHook<?>> map, Collection<MasterState> collection, long j, boolean z, Logger logger) throws FlinkException {
        if (collection == null || collection.isEmpty() || map == null || map.isEmpty()) {
            logger.info("No master state to restore");
            return;
        }
        logger.info("Calling master restore hooks");
        LinkedHashMap linkedHashMap = new LinkedHashMap(map);
        ArrayList arrayList = new ArrayList();
        for (MasterState masterState : collection) {
            if (masterState != null) {
                String name = masterState.name();
                MasterTriggerRestoreHook masterTriggerRestoreHook = (MasterTriggerRestoreHook) linkedHashMap.remove(name);
                if (masterTriggerRestoreHook != null) {
                    logger.debug("Found state to restore for hook '{}'", name);
                    arrayList.add(new Tuple2(masterTriggerRestoreHook, deserializeState(masterState, masterTriggerRestoreHook)));
                } else {
                    if (!z) {
                        throw new IllegalStateException("Found state '" + masterState.name() + "' which is not resumed by any hook.");
                    }
                    logger.info("Dropping unmatched state from '{}'", name);
                }
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Tuple2 tuple2 = (Tuple2) it.next();
            restoreHook(tuple2.f1, (MasterTriggerRestoreHook) tuple2.f0, j);
        }
        Iterator it2 = linkedHashMap.values().iterator();
        while (it2.hasNext()) {
            restoreHook(null, (MasterTriggerRestoreHook) it2.next(), j);
        }
    }

    private static <T> T deserializeState(MasterState masterState, MasterTriggerRestoreHook<?> masterTriggerRestoreHook) throws FlinkException {
        String identifier = masterTriggerRestoreHook.getIdentifier();
        try {
            SimpleVersionedSerializer<?> createCheckpointDataSerializer = masterTriggerRestoreHook.createCheckpointDataSerializer();
            if (createCheckpointDataSerializer == null) {
                throw new FlinkException("null serializer for state of hook " + masterTriggerRestoreHook.getIdentifier());
            }
            return (T) createCheckpointDataSerializer.deserialize(masterState.version(), masterState.bytes());
        } catch (Throwable th) {
            throw new FlinkException("Cannot deserialize state for master hook '" + identifier + '\'', th);
        }
    }

    private static <T> void restoreHook(Object obj, MasterTriggerRestoreHook<?> masterTriggerRestoreHook, long j) throws FlinkException {
        try {
            masterTriggerRestoreHook.restoreCheckpoint(j, obj);
        } catch (FlinkException e) {
            throw e;
        } catch (Throwable th) {
            ExceptionUtils.rethrowIfFatalError(th);
            throw new FlinkException("Error while calling restoreCheckpoint on checkpoint hook '" + masterTriggerRestoreHook.getIdentifier() + '\'', th);
        }
    }

    public static <T> MasterTriggerRestoreHook<T> wrapHook(MasterTriggerRestoreHook<T> masterTriggerRestoreHook, ClassLoader classLoader) {
        return new WrappedMasterHook(masterTriggerRestoreHook, classLoader);
    }

    private MasterHooks() {
    }
}
