package org.apache.flink.streaming.api.runners.python.beam.state;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.utils.ByteArrayWrapper;

/* loaded from: input_file:org/apache/flink/streaming/api/runners/python/beam/state/BeamMapStateHandler.class */
/**
 * Handles Beam state requests that target a Flink {@link MapState} whose keys are wrapped byte
 * arrays and whose values are raw byte arrays.
 *
 * <p>Request kinds are multiplexed over the Beam get/append/clear requests:
 *
 * <ul>
 *   <li>get: the first byte of the continuation token selects between looking up a single key,
 *       iterating the map in batches, and checking whether the map is empty;
 *   <li>append: the payload is a list of delete / set-to-none / set-value operations;
 *   <li>clear: either clears the map state or, when the transform id equals the
 *       "clear_iterators" marker, only drops the cached iterators.
 * </ul>
 *
 * <p>The handler reuses its stream buffers and a single {@link ByteArrayWrapper} across calls and
 * uses a plain {@link HashMap} for the iterator cache; nothing in this class synchronizes access.
 */
public class BeamMapStateHandler extends AbstractBeamStateHandler<MapState<ByteArrayWrapper, byte[]>> {
    /** Transform id that turns a clear request into "drop all cached iterators". */
    private static final String CLEAR_CACHED_ITERATOR_MARK = "clear_iterators";

    // Request flags: first byte of the continuation token of a get request.
    private static final byte GET_FLAG = 0;
    private static final byte ITERATE_FLAG = 1;
    private static final byte CHECK_EMPTY_FLAG = 2;

    // Response flags: first byte of the data of a get response.
    private static final byte EXIST_FLAG = 0;
    private static final byte IS_NONE_FLAG = 1;
    private static final byte NOT_EXIST_FLAG = 2;
    private static final byte IS_EMPTY_FLAG = 3;
    private static final byte NOT_EMPTY_FLAG = 4;

    // Operation flags: first byte of each entry of an append request.
    private static final byte DELETE = 0;
    private static final byte SET_NONE = 1;
    private static final byte SET_VALUE = 2;

    private static final BeamFnApi.StateGetResponse.Builder NOT_EXIST_RESPONSE = singleFlagResponse(NOT_EXIST_FLAG);
    private static final BeamFnApi.StateGetResponse.Builder IS_NONE_RESPONSE = singleFlagResponse(IS_NONE_FLAG);
    private static final BeamFnApi.StateGetResponse.Builder IS_EMPTY_RESPONSE = singleFlagResponse(IS_EMPTY_FLAG);
    private static final BeamFnApi.StateGetResponse.Builder NOT_EMPTY_RESPONSE = singleFlagResponse(NOT_EMPTY_FLAG);

    /** Maximum number of entries (or keys) serialized into one iterate response. */
    private final int mapStateIterateResponseBatchSize;

    /** Wrapper re-pointed at slices of the incoming request bytes to avoid per-request allocation. */
    private final ByteArrayWrapper reuseByteArrayWrapper = new ByteArrayWrapper(new byte[0]);

    /** Partially consumed iterators, keyed by the token handed back to the requester. */
    private final Map<ByteArrayWrapper, Iterator<?>> mapStateIteratorCache = new HashMap<>();

    private final ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos();
    private final DataInputViewStreamWrapper baisWrapper = new DataInputViewStreamWrapper(this.bais);
    private final ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
    private final DataOutputViewStreamWrapper baosWrapper = new DataOutputViewStreamWrapper(this.baos);

    /**
     * Creates a handler, reading the iterate batch size from the given configuration.
     *
     * @param readableConfig configuration providing
     *     {@link PythonOptions#MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE}
     * @throws RuntimeException if the configured batch size is not greater than 0
     */
    public BeamMapStateHandler(ReadableConfig readableConfig) {
        this.mapStateIterateResponseBatchSize = readableConfig.get(PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE);
        if (this.mapStateIterateResponseBatchSize <= 0) {
            throw new RuntimeException(String.format("The value of '%s' must be greater than 0!", PythonOptions.MAP_STATE_ITERATE_RESPONSE_BATCH_SIZE.key()));
        }
    }

    /**
     * Dispatches a get request according to the first byte of its continuation token.
     *
     * <ul>
     *   <li>{@code GET_FLAG}: the remaining token bytes are the map key to look up;
     *   <li>{@code ITERATE_FLAG}: followed by one byte of iterate type, an int token length and,
     *       when that length is positive, the bytes of a previously returned iterator token;
     *   <li>{@code CHECK_EMPTY_FLAG}: reports whether the map state is empty.
     * </ul>
     *
     * @param stateRequest the incoming request
     * @param mapState the map state to read
     * @return a response builder carrying the request id and the get response
     * @throws RuntimeException if the request flag is not one of the three supported values
     */
    @Override
    public BeamFnApi.StateResponse.Builder handleGet(BeamFnApi.StateRequest stateRequest, MapState<ByteArrayWrapper, byte[]> mapState) throws Exception {
        final byte[] token = stateRequest.getGet().getContinuationToken().toByteArray();
        final byte requestFlag = token[0];
        final BeamFnApi.StateGetResponse.Builder getResponse;
        switch (requestFlag) {
            case GET_FLAG:
                // Everything after the flag byte is the serialized map key.
                this.reuseByteArrayWrapper.setData(token);
                this.reuseByteArrayWrapper.setOffset(1);
                this.reuseByteArrayWrapper.setLimit(token.length);
                getResponse = handleGetValue(this.reuseByteArrayWrapper, mapState);
                break;
            case ITERATE_FLAG:
                this.bais.setBuffer(token, 1, token.length - 1);
                final IterateType iterateType = IterateType.fromOrd(this.baisWrapper.readByte());
                final int iteratorTokenLength = this.baisWrapper.readInt();
                final ByteArrayWrapper iteratorToken;
                if (iteratorTokenLength > 0) {
                    // A positive length means the requester is continuing a cached iterator.
                    this.reuseByteArrayWrapper.setData(token);
                    this.reuseByteArrayWrapper.setOffset(this.bais.getPosition());
                    this.reuseByteArrayWrapper.setLimit(this.bais.getPosition() + iteratorTokenLength);
                    iteratorToken = this.reuseByteArrayWrapper;
                } else {
                    iteratorToken = null;
                }
                getResponse = handleIterate(mapState, iterateType, iteratorToken);
                break;
            case CHECK_EMPTY_FLAG:
                getResponse = handleCheckEmpty(mapState);
                break;
            default:
                throw new RuntimeException(String.format("Unsupported get request type: '%d' for map state.", requestFlag));
        }
        return BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setGet(getResponse);
    }

    /**
     * Applies a batch of mutations encoded in the append payload.
     *
     * <p>The payload starts with an int operation count. Each operation is one flag byte, an int
     * key length and the key bytes; a {@code SET_VALUE} operation is additionally followed by an
     * int value length and the value bytes.
     *
     * @param stateRequest the incoming request
     * @param mapState the map state to mutate
     * @return a response builder carrying the request id and a default append response
     * @throws RuntimeException if an operation flag is not one of the three supported values
     */
    @Override
    public BeamFnApi.StateResponse.Builder handleAppend(BeamFnApi.StateRequest stateRequest, MapState<ByteArrayWrapper, byte[]> mapState) throws Exception {
        final byte[] payload = stateRequest.getAppend().getData().toByteArray();
        this.bais.setBuffer(payload, 0, payload.length);
        final int operationCount = this.baisWrapper.readInt();
        for (int i = 0; i < operationCount; i++) {
            final byte operation = this.baisWrapper.readByte();
            final int keyLength = this.baisWrapper.readInt();
            // Point the reusable wrapper at the key slice, then step the stream past it.
            this.reuseByteArrayWrapper.setData(payload);
            this.reuseByteArrayWrapper.setOffset(this.bais.getPosition());
            this.reuseByteArrayWrapper.setLimit(this.bais.getPosition() + keyLength);
            this.baisWrapper.skipBytesToRead(keyLength);
            switch (operation) {
                case DELETE:
                    mapState.remove(this.reuseByteArrayWrapper);
                    break;
                case SET_NONE:
                    // The reusable wrapper is re-pointed on every iteration, so store a copy.
                    mapState.put(this.reuseByteArrayWrapper.copy(), null);
                    break;
                case SET_VALUE:
                    final int valueLength = this.baisWrapper.readInt();
                    final byte[] value = new byte[valueLength];
                    final int bytesRead = this.baisWrapper.read(value);
                    assert valueLength == bytesRead;
                    mapState.put(this.reuseByteArrayWrapper.copy(), value);
                    break;
                default:
                    throw new RuntimeException(String.format("Unsupported append request type: '%d' for map state.", operation));
            }
        }
        return BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setAppend(BeamFnApi.StateAppendResponse.getDefaultInstance());
    }

    /**
     * Clears the map state, or only the iterator cache when the request's multimap side input
     * transform id equals the "clear_iterators" marker.
     *
     * @param stateRequest the incoming request
     * @param mapState the map state to clear
     * @return a response builder carrying the request id and a default clear response
     */
    @Override
    public BeamFnApi.StateResponse.Builder handleClear(BeamFnApi.StateRequest stateRequest, MapState<ByteArrayWrapper, byte[]> mapState) throws Exception {
        final String transformId = stateRequest.getStateKey().getMultimapSideInput().getTransformId();
        if (transformId.equals(CLEAR_CACHED_ITERATOR_MARK)) {
            this.mapStateIteratorCache.clear();
        } else {
            mapState.clear();
        }
        return BeamFnApi.StateResponse.newBuilder().setId(stateRequest.getId()).setClear(BeamFnApi.StateClearResponse.getDefaultInstance());
    }

    /**
     * Looks up a single key.
     *
     * @return the shared "not exist" response if the key is absent, the shared "is none" response
     *     if the key maps to {@code null}, otherwise a response whose data is the
     *     {@code EXIST_FLAG} byte followed by the stored value bytes
     */
    private BeamFnApi.StateGetResponse.Builder handleGetValue(ByteArrayWrapper byteArrayWrapper, MapState<ByteArrayWrapper, byte[]> mapState) throws Exception {
        if (!mapState.contains(byteArrayWrapper)) {
            return NOT_EXIST_RESPONSE;
        }
        final byte[] value = mapState.get(byteArrayWrapper);
        if (value == null) {
            return IS_NONE_RESPONSE;
        }
        this.baos.reset();
        this.baosWrapper.writeByte(EXIST_FLAG);
        this.baosWrapper.write(value);
        return BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(this.baos.toByteArray()));
    }

    /** Returns the shared "is empty" or "not empty" response for the given map state. */
    private BeamFnApi.StateGetResponse.Builder handleCheckEmpty(MapState<ByteArrayWrapper, byte[]> mapState) throws Exception {
        return mapState.isEmpty() ? IS_EMPTY_RESPONSE : NOT_EMPTY_RESPONSE;
    }

    /**
     * Serializes up to {@code mapStateIterateResponseBatchSize} elements from a new or cached
     * iterator.
     *
     * <p>For {@code ITEMS} and {@code VALUES} each element is written as the key bytes, a boolean
     * telling whether the value is non-null, and the value bytes when present. For {@code KEYS}
     * only the key bytes are written. If the iterator still has elements afterwards it is kept in
     * the cache and its token is returned as the continuation token; otherwise it is evicted and
     * no continuation token is set.
     *
     * @param mapState the map state to iterate
     * @param iterateType what to iterate over
     * @param byteArrayWrapper token of a cached iterator, or {@code null} to start a new one
     * @throws RuntimeException if the token is unknown or the iterate type is unsupported
     */
    private BeamFnApi.StateGetResponse.Builder handleIterate(MapState<ByteArrayWrapper, byte[]> mapState, IterateType iterateType, ByteArrayWrapper byteArrayWrapper) throws Exception {
        final Iterator<?> iterator;
        if (byteArrayWrapper == null) {
            switch (iterateType) {
                case ITEMS:
                case VALUES:
                    iterator = mapState.iterator();
                    break;
                case KEYS:
                    iterator = mapState.keys().iterator();
                    break;
                default:
                    throw new RuntimeException("Unsupported iterate type: " + iterateType);
            }
        } else {
            iterator = this.mapStateIteratorCache.get(byteArrayWrapper);
            if (iterator == null) {
                throw new RuntimeException("The cached iterator does not exist!");
            }
        }

        this.baos.reset();
        switch (iterateType) {
            case ITEMS:
            case VALUES:
                for (int i = 0; i < this.mapStateIterateResponseBatchSize && iterator.hasNext(); i++) {
                    // Iterators created for ITEMS/VALUES come from mapState.iterator(), i.e. entries.
                    @SuppressWarnings("unchecked")
                    final Map.Entry<ByteArrayWrapper, byte[]> entry = (Map.Entry<ByteArrayWrapper, byte[]>) iterator.next();
                    writeKey(entry.getKey());
                    final byte[] value = entry.getValue();
                    this.baosWrapper.writeBoolean(value != null);
                    if (value != null) {
                        this.baosWrapper.write(value);
                    }
                }
                // Without this break the batch would fall through into the KEYS branch.
                break;
            case KEYS:
                for (int i = 0; i < this.mapStateIterateResponseBatchSize && iterator.hasNext(); i++) {
                    writeKey((ByteArrayWrapper) iterator.next());
                }
                // Without this break every request would end in the "unsupported" exception.
                break;
            default:
                throw new RuntimeException("Unsupported iterate type: " + iterateType);
        }

        ByteArrayWrapper continuation = byteArrayWrapper;
        if (iterator.hasNext()) {
            if (continuation == null) {
                // UUID strings are ASCII; the charset is pinned so the token bytes never depend
                // on the platform default.
                continuation = new ByteArrayWrapper(UUID.randomUUID().toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            this.mapStateIteratorCache.put(continuation, iterator);
        } else {
            if (continuation != null) {
                this.mapStateIteratorCache.remove(continuation);
            }
            continuation = null;
        }

        final BeamFnApi.StateGetResponse.Builder response = BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(this.baos.toByteArray()));
        if (continuation != null) {
            response.setContinuationToken(ByteString.copyFrom(continuation.getData(), continuation.getOffset(), continuation.getLimit() - continuation.getOffset()));
        }
        return response;
    }

    /** Writes the bytes between the wrapper's offset and limit to the reusable output buffer. */
    private void writeKey(ByteArrayWrapper key) throws java.io.IOException {
        this.baosWrapper.write(key.getData(), key.getOffset(), key.getLimit() - key.getOffset());
    }

    /** Builds a get response whose data consists of the single given flag byte. */
    private static BeamFnApi.StateGetResponse.Builder singleFlagResponse(byte flag) {
        return BeamFnApi.StateGetResponse.newBuilder().setData(ByteString.copyFrom(new byte[] {flag}));
    }
}
