package org.apache.flink.streaming.examples.statemachine;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.examples.statemachine.event.Event;
import org.apache.flink.streaming.examples.statemachine.generator.EventsGeneratorFunction;
import org.apache.flink.streaming.examples.statemachine.kafka.EventDeSerializationSchema;
import org.apache.flink.streaming.examples.statemachine.kafka.KafkaStandaloneGenerator;

/* loaded from: input_file:org/apache/flink/streaming/examples/statemachine/KafkaEventsGeneratorJob.class */
/**
 * Job that generates {@link Event}s with a rate-limited data generator source and writes them
 * to a Kafka topic.
 *
 * <p>Command-line parameters (parsed with {@link ParameterTool}):
 *
 * <ul>
 *   <li>{@code --kafka-topic <topic>}: topic the events are written to (required).
 *   <li>{@code --brokers <servers>}: Kafka bootstrap servers; defaults to
 *       {@link KafkaStandaloneGenerator#BROKER_ADDRESS}.
 *   <li>{@code --error-rate <rate>}: value handed to {@link EventsGeneratorFunction}; defaults
 *       to {@code 0.0}.
 *   <li>{@code --rps <rate>}: records per second for the rate limiter. When absent it is
 *       derived from {@code --sleep} and the environment's parallelism.
 *   <li>{@code --sleep <millis>}: only used to derive the default of {@code --rps}; defaults
 *       to {@code 1}.
 * </ul>
 */
public class KafkaEventsGeneratorJob {

    /**
     * Builds the generator-to-Kafka pipeline from the command-line arguments and executes it.
     *
     * @param strArr command-line arguments, see the class documentation for the supported keys
     * @throws IllegalArgumentException if the effective records-per-second rate is not a
     *     positive number
     * @throws RuntimeException if the required {@code kafka-topic} argument is missing
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] strArr) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(strArr);
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        final double errorRate = params.getDouble("error-rate", 0.0d);
        final double recordsPerSecond =
                params.getDouble(
                        "rps", rpsFromSleep(params.getInt("sleep", 1), env.getParallelism()));

        // A zero, negative or NaN rate is meaningless for a rate limiter. Reject it here, before
        // anything is submitted, instead of letting it surface later inside the running job.
        // The negated comparison is deliberate: it is also true for NaN.
        if (!(recordsPerSecond > 0.0d)) {
            throw new IllegalArgumentException(
                    "The records-per-second rate must be positive but was "
                            + recordsPerSecond
                            + "; check the 'rps' and 'sleep' arguments.");
        }

        // getRequired reports the missing key by name; a plain get() would hand a null topic
        // to the Kafka serializer builder.
        final String kafkaTopic = params.getRequired("kafka-topic");
        final String brokers = params.get("brokers", KafkaStandaloneGenerator.BROKER_ADDRESS);

        System.out.printf(
                "Generating events to Kafka with standalone source with error rate %f and %.1f records per second\n",
                errorRate, recordsPerSecond);
        System.out.println();

        // Long.MAX_VALUE as the record count keeps the source producing for as long as the job
        // runs.
        final DataGeneratorSource<Event> eventSource =
                new DataGeneratorSource<>(
                        new EventsGeneratorFunction(errorRate),
                        Long.MAX_VALUE,
                        RateLimiterStrategy.perSecond(recordsPerSecond),
                        TypeInformation.of(Event.class));

        final KafkaSink<Event> kafkaSink =
                KafkaSink.<Event>builder()
                        .setBootstrapServers(brokers)
                        .setRecordSerializer(
                                KafkaRecordSerializationSchema.builder()
                                        .setValueSerializationSchema(
                                                new EventDeSerializationSchema())
                                        .setTopic(kafkaTopic)
                                        .build())
                        .build();

        env.fromSource(eventSource, WatermarkStrategy.noWatermarks(), "Events Generator Source")
                .sinkTo(kafkaSink);

        env.execute("State machine example Kafka events generator job");
    }

    /**
     * Converts a per-record sleep interval into a total records-per-second rate.
     *
     * <p>The division is done in floating point, so a {@code sleep} of {@code 0} yields an
     * infinite rate rather than an arithmetic exception.
     *
     * @param sleep sleep interval in milliseconds between two records
     * @param parallelism factor the per-interval rate is multiplied by
     * @return {@code (1000 / sleep) * parallelism}
     */
    private static double rpsFromSleep(int sleep, int parallelism) {
        return (1000.0d / sleep) * parallelism;
    }
}
