/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.examples.dsv2.windowing;

import java.io.Serializable;
import java.time.Duration;
import java.util.Objects;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.connector.dsv2.Source;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.connector.dsv2.WrappedSource;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.builtin.BuiltinFuncs;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.datastream.api.extension.eventtime.strategy.EventTimeExtractor;
import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext;
import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction;
import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.formats.csv.CsvReaderFormat;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.ParameterTool;

public class CountProductSalesWindowing {
    /**
     * Builds and runs the example job.
     *
     * <p>The job reads {@link Order} records from the bundled CSV resource, generates periodic
     * event-time watermarks from {@code Order.timestamp}, keys the stream by
     * {@code Order.productId}, and evaluates {@link CountSalesQuantity} over one-hour tumbling
     * event-time windows. Each resulting {@link ProductSales} is written to a file sink when an
     * {@code output} parameter is supplied, and to the print sink otherwise.
     *
     * @param args command-line arguments parsed by {@code ParameterTool}; an {@code output}
     *     parameter selects the file sink and provides its target path
     * @throws NullPointerException if the bundled CSV resource cannot be found on the classpath
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final boolean fileOutput = params.has("output");
        final ExecutionEnvironment env = ExecutionEnvironment.getInstance();

        // Resolve the bundled input file; name the resource in the failure message so a
        // packaging mistake is immediately identifiable instead of surfacing as a bare NPE.
        final String inputResource = "datas/dsv2/windowing/CountProductSalesWindowingOrders.csv";
        final String inputFilePath =
                Objects.requireNonNull(
                                CountProductSalesWindowing.class.getClassLoader().getResource(inputResource),
                                "Resource not found on classpath: " + inputResource)
                        .getPath();

        final FileSource<Order> fileSource =
                FileSource.forRecordStreamFormat(CsvReaderFormat.forPojo(Order.class), new Path(inputFilePath))
                        .build();
        final NonKeyedPartitionStream<Order> orders =
                env.fromSource(new WrappedSource<>(fileSource), "order source");

        // The intersection casts pin the lambdas' generic types (raw types would make the
        // parameter an Object, so field access would not compile) and keep them serializable.
        final NonKeyedPartitionStream<Order> orderStream =
                orders.process(
                        EventTimeExtension.newWatermarkGeneratorBuilder(
                                        (EventTimeExtractor<Order> & Serializable) order -> order.timestamp)
                                .periodicWatermark(Duration.ofMillis(200L))
                                .buildAsProcessFunction());

        final NonKeyedPartitionStream<ProductSales> productSalesQuantityStream =
                orderStream
                        .keyBy((KeySelector<Order, Long> & Serializable) order -> order.productId)
                        .process(
                                BuiltinFuncs.window(
                                        WindowStrategy.tumbling(Duration.ofHours(1L), WindowStrategy.EVENT_TIME),
                                        new CountSalesQuantity()));

        if (fileOutput) {
            // Row-format file sink; parts roll at 1 MiB or after 10 seconds.
            productSalesQuantityStream
                    .toSink(
                            new WrappedSink<>(
                                    FileSink.<ProductSales>forRowFormat(
                                                    new Path(params.get("output")),
                                                    new SimpleStringEncoder<>())
                                            .withRollingPolicy(
                                                    DefaultRollingPolicy.builder()
                                                            .withMaxPartSize(MemorySize.ofMebiBytes(1L))
                                                            .withRolloverInterval(Duration.ofSeconds(10L))
                                                            .build())
                                            .build()))
                    .withName("file-sink");
        } else {
            productSalesQuantityStream
                    .toSink(new WrappedSink<>(new PrintSink<>()))
                    .withName("print-sink");
        }
        env.execute("Count Product Sales Windowing");
    }

    /**
     * Window function that, each time a window fires, emits a single {@link ProductSales}
     * carrying the current key, the window's start time and the number of {@link Order}
     * records held by the window.
     */
    public static class CountSalesQuantity
    implements OneInputWindowStreamProcessFunction<Order, ProductSales> {
        /**
         * Emits the record count of the fired window for the current key.
         *
         * @param output collector that receives the resulting {@link ProductSales}
         * @param ctx context whose state manager supplies the current key (cast to {@code Long})
         * @param windowContext context providing the window's records and start time
         * @throws Exception declared by the implemented interface method
         */
        public void onTrigger(Collector<ProductSales> output, PartitionedContext<ProductSales> ctx, OneInputWindowContext<Order> windowContext) throws Exception {
            // Read the key first, then count, matching the order in which failures would surface.
            final long currentProductId = (Long) ctx.getStateManager().getCurrentKey();
            final long recordCount = countRecords(windowContext.getAllRecords());
            final long windowStart = windowContext.getStartTime();
            output.collect(new ProductSales(currentProductId, windowStart, recordCount));
        }

        /** Returns how many elements the given iterable yields. */
        private static long countRecords(Iterable<Order> records) {
            long total = 0L;
            for (Order unused : records) {
                total += 1L;
            }
            return total;
        }
    }

    /**
     * Result record: the number of sales counted for one product in one window.
     *
     * <p>{@link #toString()} renders the record as {@code productId,startTime,salesQuantity}.
     */
    public static class ProductSales {
        /** Identifier of the product the count belongs to. */
        public long productId;
        /** Start time of the window the count was taken over. */
        public long startTime;
        /** Number of sales counted in the window. */
        public long salesQuantity;

        /**
         * Creates a result record.
         *
         * @param productId identifier of the product
         * @param startTime start time of the window
         * @param salesQuantity number of sales counted in the window
         */
        public ProductSales(long productId, long startTime, long salesQuantity) {
            this.productId = productId;
            this.startTime = startTime;
            this.salesQuantity = salesQuantity;
        }

        /**
         * Returns the three fields as plain decimal numbers separated by commas.
         *
         * <p>Plain concatenation is used instead of {@code String.format("%d", ...)} because the
         * latter formats with the default locale and can emit non-ASCII digits, which would
         * make the output locale-dependent.
         *
         * @return {@code productId,startTime,salesQuantity}
         */
        @Override
        public String toString() {
            return this.productId + "," + this.startTime + "," + this.salesQuantity;
        }
    }

    /**
     * Input record of the example. {@code main} reads instances of this class from the CSV
     * input via {@code CsvReaderFormat.forPojo(Order.class)}.
     */
    public static class Order {
        /** Product identifier; {@code main} keys the stream by this field. */
        public long productId;
        /**
         * Value {@code main} passes to the event-time extractor for watermark generation.
         * NOTE(review): presumably epoch milliseconds — confirm against the CSV data.
         */
        public long timestamp;
    }
}

