package org.apache.flink.streaming.api.runners.python.beam.state;

import java.util.Base64;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.util.ProtoUtils;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.streaming.api.utils.ByteArrayWrapper;
import org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer;

/**
 * A {@link BeamStateStore} that serves state requests from an {@link OperatorStateBackend}.
 *
 * <p>Only map state is available; it is backed by the broadcast state that the operator state
 * backend returns for a {@link MapStateDescriptor} with {@link ByteArrayWrapper} keys and
 * {@code byte[]} values. List state requests are rejected.
 *
 * <p>Descriptors are cached per state name in a plain {@link HashMap}, so the same descriptor
 * instance is reused for repeated requests against the same state.
 */
public class BeamOperatorStateStore implements BeamStateStore {
    private final OperatorStateBackend operatorStateBackend;
    private final TypeSerializer<byte[]> valueSerializer = PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO.createSerializer(new SerializerConfigImpl());
    /** Descriptors created so far, keyed by the prefixed state name. */
    private final Map<String, StateDescriptor<?, ?>> stateDescriptorCache = new HashMap<>();

    /**
     * Creates a store that resolves broadcast state through the given backend.
     *
     * @param operatorStateBackend backend used to look up broadcast state
     */
    public BeamOperatorStateStore(OperatorStateBackend operatorStateBackend) {
        this.operatorStateBackend = operatorStateBackend;
    }

    /**
     * Always fails: list state is not provided by this store.
     *
     * @param stateRequest the incoming state request (ignored)
     * @return never returns normally
     * @throws RuntimeException always
     */
    @Override
    public ListState<byte[]> getListState(BeamFnApi.StateRequest stateRequest) throws Exception {
        throw new RuntimeException("Operator list state is still not supported");
    }

    /**
     * Resolves the broadcast state addressed by the request and exposes it as a {@link MapState}.
     *
     * <p>The side input id of the request's multimap-keys-side-input key is Base64-decoded and
     * parsed as a {@link FlinkFnApi.StateDescriptor}; its state name, prefixed with
     * {@code "python-state-"}, identifies the broadcast state.
     *
     * @param stateRequest request whose state key must be a multimap-keys-side-input key
     * @return a view that delegates every operation to the underlying broadcast state
     * @throws RuntimeException if the request has no multimap-keys-side-input key, or if the
     *     state name is already cached with a descriptor that is not a map state descriptor
     * @throws Exception if parsing the descriptor or obtaining the broadcast state fails
     */
    @Override
    public MapState<ByteArrayWrapper, byte[]> getMapState(BeamFnApi.StateRequest stateRequest) throws Exception {
        if (!stateRequest.getStateKey().hasMultimapKeysSideInput()) {
            throw new RuntimeException("Unsupported broadcast state request: " + stateRequest);
        }
        final FlinkFnApi.StateDescriptor protoDescriptor = FlinkFnApi.StateDescriptor.parseFrom(Base64.getDecoder().decode(stateRequest.getStateKey().getMultimapKeysSideInput().getSideInputId()));
        final String stateName = "python-state-" + protoDescriptor.getStateName();

        final StateDescriptor<?, ?> cached = this.stateDescriptorCache.get(stateName);
        final MapStateDescriptor<ByteArrayWrapper, byte[]> mapStateDescriptor;
        if (cached instanceof MapStateDescriptor) {
            // Map descriptors are only ever put into the cache by this method, always with
            // ByteArrayWrapper keys and byte[] values, so the narrowing cast is safe.
            @SuppressWarnings("unchecked")
            final MapStateDescriptor<ByteArrayWrapper, byte[]> typed = (MapStateDescriptor<ByteArrayWrapper, byte[]>) cached;
            mapStateDescriptor = typed;
        } else if (cached != null) {
            throw new RuntimeException(String.format("State name corrupt detected: '%s' is used both as MAP state and '%s' state at the same time.", stateName, cached.getType()));
        } else {
            mapStateDescriptor = new MapStateDescriptor<>(stateName, ByteArrayWrapperSerializer.INSTANCE, this.valueSerializer);
            if (protoDescriptor.hasStateTtlConfig()) {
                mapStateDescriptor.enableTimeToLive(ProtoUtils.parseStateTtlConfigFromProto(protoDescriptor.getStateTtlConfig()));
            }
            this.stateDescriptorCache.put(stateName, mapStateDescriptor);
        }

        final BroadcastState<ByteArrayWrapper, byte[]> broadcastState = this.operatorStateBackend.getBroadcastState(mapStateDescriptor);
        return new MapState<ByteArrayWrapper, byte[]>() {
            @Override
            public byte[] get(ByteArrayWrapper key) throws Exception {
                return broadcastState.get(key);
            }

            @Override
            public void put(ByteArrayWrapper key, byte[] value) throws Exception {
                broadcastState.put(key, value);
            }

            @Override
            public void putAll(Map<ByteArrayWrapper, byte[]> map) throws Exception {
                broadcastState.putAll(map);
            }

            @Override
            public void remove(ByteArrayWrapper key) throws Exception {
                broadcastState.remove(key);
            }

            @Override
            public boolean contains(ByteArrayWrapper key) throws Exception {
                return broadcastState.contains(key);
            }

            @Override
            public Iterable<Map.Entry<ByteArrayWrapper, byte[]>> entries() throws Exception {
                return broadcastState.entries();
            }

            @Override
            public Iterable<ByteArrayWrapper> keys() throws Exception {
                // Capture the Iterable, not an Iterator: each traversal of the returned view
                // must start from a fresh iterator instead of sharing one exhausted cursor.
                final Iterable<Map.Entry<ByteArrayWrapper, byte[]>> entries = broadcastState.entries();
                return () -> {
                    final Iterator<Map.Entry<ByteArrayWrapper, byte[]>> it = entries.iterator();
                    return new Iterator<ByteArrayWrapper>() {
                        @Override
                        public boolean hasNext() {
                            return it.hasNext();
                        }

                        @Override
                        public ByteArrayWrapper next() {
                            return it.next().getKey();
                        }
                    };
                };
            }

            @Override
            public Iterable<byte[]> values() throws Exception {
                // Same reasoning as keys(): obtain a new entry iterator per traversal.
                final Iterable<Map.Entry<ByteArrayWrapper, byte[]>> entries = broadcastState.entries();
                return () -> {
                    final Iterator<Map.Entry<ByteArrayWrapper, byte[]>> it = entries.iterator();
                    return new Iterator<byte[]>() {
                        @Override
                        public boolean hasNext() {
                            return it.hasNext();
                        }

                        @Override
                        public byte[] next() {
                            return it.next().getValue();
                        }
                    };
                };
            }

            @Override
            public Iterator<Map.Entry<ByteArrayWrapper, byte[]>> iterator() throws Exception {
                return broadcastState.entries().iterator();
            }

            @Override
            public boolean isEmpty() throws Exception {
                // Empty means there is no first entry; the previous code returned the opposite.
                return !iterator().hasNext();
            }

            @Override
            public void clear() {
                broadcastState.clear();
            }
        };
    }
}
