/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.Buckets;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;

@Internal
public class StreamingFileSinkHelper<IN>
implements ProcessingTimeService.ProcessingTimeCallback {
    private static final ListStateDescriptor<byte[]> BUCKET_STATE_DESC = new ListStateDescriptor("bucket-states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
    private static final ListStateDescriptor<Long> MAX_PART_COUNTER_STATE_DESC = new ListStateDescriptor("max-part-counter", (TypeSerializer)LongSerializer.INSTANCE);
    private final long bucketCheckInterval;
    private final ProcessingTimeService procTimeService;
    private final Buckets<IN, ?> buckets;
    private final ListState<byte[]> bucketStates;
    private final ListState<Long> maxPartCountersState;

    public StreamingFileSinkHelper(Buckets<IN, ?> buckets, boolean isRestored, OperatorStateStore stateStore, ProcessingTimeService procTimeService, long bucketCheckInterval) throws Exception {
        this.bucketCheckInterval = bucketCheckInterval;
        this.buckets = buckets;
        this.bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
        this.maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);
        this.procTimeService = procTimeService;
        if (isRestored) {
            buckets.initializeState(this.bucketStates, this.maxPartCountersState);
        }
        long currentProcessingTime = procTimeService.getCurrentProcessingTime();
        procTimeService.registerTimer(currentProcessingTime + bucketCheckInterval, (ProcessingTimeService.ProcessingTimeCallback)this);
    }

    public void commitUpToCheckpoint(long checkpointId) throws Exception {
        this.buckets.commitUpToCheckpoint(checkpointId);
    }

    public void snapshotState(long checkpointId) throws Exception {
        this.buckets.snapshotState(checkpointId, this.bucketStates, this.maxPartCountersState);
    }

    public void onProcessingTime(long timestamp) throws Exception {
        long currentTime = this.procTimeService.getCurrentProcessingTime();
        this.buckets.onProcessingTime(currentTime);
        this.procTimeService.registerTimer(currentTime + this.bucketCheckInterval, (ProcessingTimeService.ProcessingTimeCallback)this);
    }

    public void onElement(IN value, long currentProcessingTime, @Nullable Long elementTimestamp, long currentWatermark) throws Exception {
        this.buckets.onElement(value, currentProcessingTime, elementTimestamp, currentWatermark);
    }

    public void close() {
        this.buckets.close();
    }
}

