/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.dsv2.watermark;

import java.io.Serializable;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.state.StateDeclaration;
import org.apache.flink.api.common.state.StateDeclarations;
import org.apache.flink.api.common.state.ValueStateDeclaration;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.typeinfo.TypeDescriptor;
import org.apache.flink.api.common.typeinfo.TypeDescriptors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.watermark.LongWatermark;
import org.apache.flink.api.common.watermark.LongWatermarkDeclaration;
import org.apache.flink.api.common.watermark.Watermark;
import org.apache.flink.api.common.watermark.WatermarkDeclaration;
import org.apache.flink.api.common.watermark.WatermarkDeclarations;
import org.apache.flink.api.common.watermark.WatermarkHandlingResult;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.connector.dsv2.WrappedSource;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.NonPartitionedContext;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.function.ApplyPartitionFunction;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.util.ParameterTool;

public class CountSales {
    public static final LongWatermarkDeclaration EVENT_TIME_WATERMARK_DECLARATION = WatermarkDeclarations.newBuilder((String)"EVENT_TIME").typeLong().combineFunctionMin().combineWaitForAllChannels(true).defaultHandlingStrategyForward().build();

    public static void main(String[] args) throws Exception {
        ParameterTool params = ParameterTool.fromArgs((String[])args);
        int parallelism = params.getInt("parallelism", 5);
        ExecutionEnvironment env = ExecutionEnvironment.getInstance();
        NonKeyedPartitionStream source = (NonKeyedPartitionStream)env.fromSource((org.apache.flink.api.connector.dsv2.Source)new WrappedSource((Source)new OrderSource()), "order source").withParallelism(parallelism);
        source.keyBy((KeySelector & Serializable)order -> order.productId).process((OneInputStreamProcessFunction)new CountSalesProcessFunction()).toSink((Sink)new WrappedSink((org.apache.flink.api.connector.sink2.Sink)new PrintSink()));
        env.execute("Count Sales");
    }

    private static class CountSalesProcessFunction
    implements OneInputStreamProcessFunction<Order, CumulativeSales> {
        private final ValueStateDeclaration<Double> salesStateDeclaration = StateDeclarations.valueState((String)"sales", (TypeDescriptor)TypeDescriptors.DOUBLE);

        private CountSalesProcessFunction() {
        }

        public Set<StateDeclaration> usesStates() {
            return Set.of(this.salesStateDeclaration);
        }

        public void processRecord(Order record, Collector<CumulativeSales> output, PartitionedContext<CumulativeSales> ctx) throws Exception {
            ValueState salesState = ctx.getStateManager().getState(this.salesStateDeclaration);
            Double previousSales = (Double)salesState.value();
            Double newlySales = previousSales == null ? record.totalPrice : previousSales + record.totalPrice;
            salesState.update((Object)newlySales);
        }

        public WatermarkHandlingResult onWatermark(Watermark watermark, Collector<CumulativeSales> output, NonPartitionedContext<CumulativeSales> ctx) {
            if (watermark.getIdentifier().equals(EVENT_TIME_WATERMARK_DECLARATION.getIdentifier())) {
                final long currentEventTime = ((LongWatermark)watermark).getValue();
                try {
                    ctx.applyToAllPartitions((ApplyPartitionFunction)new ApplyPartitionFunction<CumulativeSales>(){

                        public void apply(Collector<CumulativeSales> collector, PartitionedContext<CumulativeSales> ctx) throws Exception {
                            long productId = (Long)ctx.getStateManager().getCurrentKey();
                            Double sales = (Double)ctx.getStateManager().getState(salesStateDeclaration).value();
                            sales = sales == null ? 0.0 : sales;
                            collector.collect((Object)new CumulativeSales(productId, currentEventTime, sales));
                        }
                    });
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
                return WatermarkHandlingResult.PEEK;
            }
            return WatermarkHandlingResult.PEEK;
        }
    }

    private static class CustomGeneratorFunction
    implements GeneratorFunction<Long, Order> {
        private SourceReaderContext readerContext;
        private long previousTimestamp = 0L;

        private CustomGeneratorFunction() {
        }

        public void open(SourceReaderContext readerContext) throws Exception {
            this.readerContext = readerContext;
        }

        public Order map(Long value) throws Exception {
            long productId = ThreadLocalRandom.current().nextLong(1L, 100L);
            double price = ThreadLocalRandom.current().nextDouble(1.0, 1000.0);
            long timestamp = System.currentTimeMillis();
            if (timestamp - this.previousTimestamp > Duration.ofSeconds(1L).toMillis()) {
                this.readerContext.emitWatermark((Watermark)EVENT_TIME_WATERMARK_DECLARATION.newWatermark(timestamp));
                this.previousTimestamp = timestamp;
            }
            Thread.sleep(100L);
            return new Order(productId, price, timestamp);
        }
    }

    private static class OrderSource
    extends DataGeneratorSource<Order> {
        public OrderSource() {
            super((GeneratorFunction)new CustomGeneratorFunction(), 1000000L, TypeInformation.of(Order.class));
        }

        public Set<? extends WatermarkDeclaration> declareWatermarks() {
            return Set.of(EVENT_TIME_WATERMARK_DECLARATION);
        }
    }

    public static class CumulativeSales {
        public long productId;
        public long timestamp;
        public double sales;

        public CumulativeSales(long productId, long timestamp, double sales) {
            this.productId = productId;
            this.timestamp = timestamp;
            this.sales = sales;
        }

        public String toString() {
            return String.format("%d,%d,%.2f", this.productId, this.timestamp, this.sales);
        }
    }

    public static class Order {
        public long productId;
        public double totalPrice;
        public long timestamp;

        public Order(long productId, double price, long timestamp) {
            this.productId = productId;
            this.totalPrice = price;
            this.timestamp = timestamp;
        }
    }
}

