/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.formats.avro;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.connector.datagen.source.TestDataGenerators;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.AvroWriterFactory;
import org.apache.flink.formats.avro.AvroWriters;
import org.apache.flink.formats.avro.generated.Address;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.UniqueBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.legacy.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class AvroStreamingFileSinkITCase
extends AbstractTestBaseJUnit4 {
    /** JUnit rule applying a timeout to every test; built with {@code Timeout.seconds(20)}. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(20L);

    /**
     * Streams three {@link Address} records into a bulk-format {@link StreamingFileSink} that uses
     * the writer factory from {@code AvroWriters.forSpecificRecord}, then checks the written
     * files by reading them back with a {@link SpecificDatumReader}.
     *
     * @throws Exception if creating the temporary folder, running the job, or validating the
     *     output fails
     */
    @Test
    public void testWriteAvroSpecific() throws Exception {
        File folder = TEMPORARY_FOLDER.newFolder();
        List<Address> data =
                Arrays.asList(
                        new Address(1, "a", "b", "c", "12345"),
                        new Address(2, "p", "q", "r", "12345"),
                        new Address(3, "x", "y", "z", "12345"));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100L);

        // Parameterized types throughout: the compiler now verifies that source, writer factory,
        // bucket assigner and sink all agree on the element type.
        AvroWriterFactory<Address> avroWriterFactory = AvroWriters.forSpecificRecord(Address.class);
        DataStreamSource<Address> stream =
                env.fromSource(
                        TestDataGenerators.fromDataWithSnapshotsLatch(
                                data, TypeInformation.of(Address.class)),
                        WatermarkStrategy.noWatermarks(),
                        "Test Source");
        stream.addSink(
                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                        .withBucketAssigner(new UniqueBucketAssigner<>("test"))
                        .build());
        env.execute();

        // validateResults requires one bucket with two part files, each equal to `data`.
        validateResults(folder, new SpecificDatumReader<>(Address.class), data);
    }

    /**
     * Streams the records of a {@link GenericTestDataCollection} into a bulk-format
     * {@link StreamingFileSink} that uses the writer factory from
     * {@code AvroWriters.forGenericRecord}, then checks the written files by reading them back
     * with a {@link GenericDatumReader} for the same schema.
     *
     * @throws Exception if creating the temporary folder, running the job, or validating the
     *     output fails
     */
    @Test
    public void testWriteAvroGeneric() throws Exception {
        File folder = TEMPORARY_FOLDER.newFolder();
        Schema schema = Address.getClassSchema();
        Collection<GenericRecord> data = new GenericTestDataCollection();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100L);

        // Parameterized types throughout: the compiler now verifies that source, writer factory,
        // bucket assigner and sink all agree on the element type.
        AvroWriterFactory<GenericRecord> avroWriterFactory = AvroWriters.forGenericRecord(schema);
        DataStreamSource<GenericRecord> stream =
                env.fromSource(
                        TestDataGenerators.fromDataWithSnapshotsLatch(
                                data, new GenericRecordAvroTypeInfo(schema)),
                        WatermarkStrategy.noWatermarks(),
                        "Test Source");
        stream.addSink(
                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                        .withBucketAssigner(new UniqueBucketAssigner<>("test"))
                        .build());
        env.execute();

        // The collection is copied into a List because validateResults compares against a List.
        validateResults(
                folder, new GenericDatumReader<>(schema), new ArrayList<GenericRecord>(data));
    }

    /**
     * Streams three {@link Datum} POJOs into a bulk-format {@link StreamingFileSink} that uses
     * the writer factory from {@code AvroWriters.forReflectRecord}, then checks the written files
     * by reading them back with a {@link ReflectDatumReader}.
     *
     * @throws Exception if creating the temporary folder, running the job, or validating the
     *     output fails
     */
    @Test
    public void testWriteAvroReflect() throws Exception {
        File folder = TEMPORARY_FOLDER.newFolder();
        List<Datum> data = Arrays.asList(new Datum("a", 1), new Datum("b", 2), new Datum("c", 3));

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(100L);

        // Parameterized types throughout: the compiler now verifies that source, writer factory,
        // bucket assigner and sink all agree on the element type.
        AvroWriterFactory<Datum> avroWriterFactory = AvroWriters.forReflectRecord(Datum.class);
        DataStreamSource<Datum> stream =
                env.fromSource(
                        TestDataGenerators.fromDataWithSnapshotsLatch(
                                data, TypeInformation.of(Datum.class)),
                        WatermarkStrategy.noWatermarks(),
                        "Test Source");
        stream.addSink(
                StreamingFileSink.forBulkFormat(Path.fromLocalFile(folder), avroWriterFactory)
                        .withBucketAssigner(new UniqueBucketAssigner<>("test"))
                        .build());
        env.execute();

        // validateResults requires one bucket with two part files, each equal to `data`.
        validateResults(folder, new ReflectDatumReader<>(Datum.class), data);
    }

    /**
     * Asserts the on-disk layout and content produced by a test job.
     *
     * <p>{@code folder} must list exactly one entry (the bucket), and that entry must list
     * exactly two part files. Every part file must be non-empty and, once decoded with
     * {@code datumReader}, equal to {@code expected}.
     *
     * @param folder root output folder of the sink
     * @param datumReader reader used to decode each part file
     * @param expected records every part file is required to contain, in order
     * @param <T> record type
     * @throws Exception if a part file cannot be read
     */
    private static <T> void validateResults(File folder, DatumReader<T> datumReader, List<T> expected) throws Exception {
        File[] bucketEntries = folder.listFiles();
        Assertions.assertThat(bucketEntries).hasSize(1);

        File[] parts = bucketEntries[0].listFiles();
        Assertions.assertThat(parts).hasSize(2);

        for (int i = 0; i < parts.length; i++) {
            File part = parts[i];
            Assertions.assertThat(part).isNotEmpty();
            Assertions.assertThat(readAvroFile(part, datumReader)).isEqualTo(expected);
        }
    }

    /**
     * Reads every record of an Avro container file into a list, in file order.
     *
     * @param file Avro data file to read
     * @param datumReader reader used to decode the records
     * @param <T> record type
     * @return the decoded records; empty if the file holds none
     * @throws IOException if the file cannot be opened or read
     */
    private static <T> List<T> readAvroFile(File file, DatumReader<T> datumReader) throws IOException {
        // Typed as T end to end: the previous ArrayList<Object> was not assignable to the
        // declared List<T> return type, and the raw DataFileReader discarded the element type.
        List<T> results = new ArrayList<>();
        try (DataFileReader<T> dataFileReader = new DataFileReader<>(file, datumReader)) {
            while (dataFileReader.hasNext()) {
                results.add(dataFileReader.next());
            }
        }
        return results;
    }

    /**
     * Simple serializable value holder with one string and one int field, used as the element
     * type of the reflect-based test. Equality and hashing are defined over both fields.
     */
    public static class Datum
    implements Serializable {
        public String a;
        public int b;

        /** Creates an instance with default field values ({@code a == null}, {@code b == 0}). */
        public Datum() {
        }

        /**
         * @param a value for field {@code a}; may be {@code null}
         * @param b value for field {@code b}
         */
        public Datum(String a, int b) {
            this.a = a;
            this.b = b;
        }

        /**
         * Two instances are equal when they have exactly the same runtime class, the same
         * {@code b}, and {@code a} values that are both {@code null} or equal strings.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (o.getClass() != this.getClass()) {
                return false;
            }
            Datum other = (Datum)o;
            if (other.b != this.b) {
                return false;
            }
            return this.a == null ? other.a == null : this.a.equals(other.a);
        }

        /** Returns {@code 31 * hash(a) + b}, where a {@code null} {@code a} hashes to 0. */
        @Override
        public int hashCode() {
            int stringHash = this.a == null ? 0 : this.a.hashCode();
            return 31 * stringHash + this.b;
        }
    }

    /**
     * Serializable collection that always exposes the same two generic records, both built
     * against {@code Address.getClassSchema()}.
     *
     * <p>The records are created anew on every {@link #iterator()} call instead of being held in
     * a field. NOTE(review): presumably because {@code GenericData.Record} is not
     * {@link Serializable} while this collection must be; confirm before caching them.
     */
    private static class GenericTestDataCollection
    extends AbstractCollection<GenericRecord>
    implements Serializable {
        private GenericTestDataCollection() {
        }

        /**
         * Builds the two test records and returns an iterator over them.
         *
         * @return iterator yielding the two records in a fixed order
         */
        @Override
        public Iterator<GenericRecord> iterator() {
            // Typed as GenericRecord so the list's iterator matches the declared return type;
            // a List<GenericData.Record> iterator is not an Iterator<GenericRecord>.
            GenericRecord first = newRecord(1, "a", "b", "c", "12345");
            GenericRecord second = newRecord(2, "x", "y", "z", "98765");
            return Arrays.asList(first, second).iterator();
        }

        /** Returns 2, the number of records produced by {@link #iterator()}. */
        @Override
        public int size() {
            return 2;
        }

        /**
         * Creates a record for {@code Address.getClassSchema()} and stores the arguments at
         * field positions 0 through 4, in that order.
         */
        private static GenericRecord newRecord(int v0, String v1, String v2, String v3, String v4) {
            GenericRecord rec = new GenericData.Record(Address.getClassSchema());
            rec.put(0, v0);
            rec.put(1, v1);
            rec.put(2, v2);
            rec.put(3, v3);
            rec.put(4, v4);
            return rec;
        }
    }
}

