package org.apache.spark.sql.execution.streaming.continuous;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.sql.Timestamp;
import java.util.Calendar;
import java.util.UUID;
import javax.annotation.concurrent.GuardedBy;
import org.apache.spark.internal.Logging;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.streaming.ContinuousPartitionReaderFactory;
import org.apache.spark.sql.connector.read.streaming.ContinuousStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.connector.read.streaming.PartitionOffset;
import org.apache.spark.sql.execution.streaming.ContinuousRecordEndpoint;
import org.apache.spark.sql.execution.streaming.ContinuousRecordPartitionOffset;
import org.apache.spark.sql.execution.streaming.sources.TextSocketReader$;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.json4s.DefaultFormats;
import org.json4s.DefaultFormats$;
import org.json4s.jackson.Serialization$;
import org.slf4j.Logger;
import scala.Function0;
import scala.MatchError;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.ArrayOps$;
import scala.collection.SeqOps;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Growable;
import scala.collection.mutable.ListBuffer;
import scala.math.Ordering$Int$;
import scala.package$;
import scala.reflect.ClassTag$;
import scala.reflect.ManifestFactory$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.runtime.BoxesRunTime;

/* compiled from: ContinuousTextSocketSource.scala */
@ScalaSignature(bytes = "\u0006\u0005\t%c\u0001B\u0014)\u0001]B\u0001b\u0014\u0001\u0003\u0002\u0003\u0006I\u0001\u0015\u0005\t;\u0002\u0011\t\u0011)A\u0005=\"A!\r\u0001B\u0001B\u0003%a\f\u0003\u0005d\u0001\t\u0005\t\u0015!\u0003e\u0011\u0015Q\u0007\u0001\"\u0001l\u0011\u001d\u0011\bA1A\u0005\u0004MDaA\u001f\u0001!\u0002\u0013!\bbB>\u0001\u0005\u0004%I\u0001 \u0005\b\u00037\u0001\u0001\u0015!\u0003~\u0011-\ti\u0002\u0001a\u0001\u0002\u0004%I!a\b\t\u0017\u00055\u0002\u00011AA\u0002\u0013%\u0011q\u0006\u0005\f\u0003w\u0001\u0001\u0019!A!B\u0013\t\t\u0003C\u0006\u0002Z\u0001\u0001\r\u00111A\u0005\n\u0005m\u0003bCA2\u0001\u0001\u0007\t\u0019!C\u0005\u0003KB1\"!\u001b\u0001\u0001\u0004\u0005\t\u0015)\u0003\u0002^!I\u0011Q\u000e\u0001C\u0002\u0013%\u0011q\u000e\u0005\t\u00033\u0003\u0001\u0015!\u0003\u0002r!I\u0011Q\u0014\u0001A\u0002\u0013%\u0011q\u0014\u0005\n\u0003C\u0003\u0001\u0019!C\u0005\u0003GCq!a*\u0001A\u0003&a\f\u0003\u0007\u0002,\u0002\u0001\r\u00111A\u0005\u0002A\ni\u000b\u0003\u0007\u00026\u0002\u0001\r\u00111A\u0005\u0002A\n9\fC\u0006\u0002<\u0002\u0001\r\u0011!Q!\n\u0005=\u0006\"CA_\u0001\t\u0007I\u0011BA`\u0011!\tI\r\u0001Q\u0001\n\u0005\u0005\u0007bCAf\u0001\u0001\u0007\t\u0019!C\u0005\u0003\u001bD1\"a7\u0001\u0001\u0004\u0005\r\u0011\"\u0003\u0002^\"Y\u0011\u0011\u001d\u0001A\u0002\u0003\u0005\u000b\u0015BAh\u0011\u001d\tY\u000f\u0001C!\u0003[DqA!\u0002\u0001\t\u0003\u00129\u0001C\u0004\u0003\u000e\u0001!\tEa\u0004\t\u000f\tE\u0001\u0001\"\u0011\u0003\u0014!9!1\u0005\u0001\u0005B\t\u0015\u0002b\u0002B\u0017\u0001\u0011\u0005#q\u0006\u0005\b\u0005k\u0001A\u0011\tB\u001c\u0011\u001d\u0011I\u0004\u0001C\u0005\u0005oAqAa\u000f\u0001\t\u0003\u0012i\u0004C\u0004\u0003@\u0001!IA!\u0011\u00035Q+\u0007\u0010^*pG.,GoQ8oi&tWo\\;t'R\u0014X-Y7\u000b\u0005%R\u0013AC2p]RLg.^8vg*\u00111\u0006L\u0001\ngR\u0014X-Y7j]\u001eT!!\f\u0018\u0002\u0013\u0015DXmY;uS>t'BA\u00181\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003cI\nQa\u001d9be.T!a\r\u001b\u0002\r\u0005\u0004\u0018m\u001
95f\u0015\u0005)\u0014aA8sO\u000e\u00011\u0003\u0002\u00019\u0001&\u0003\"!\u000f \u000e\u0003iR!a\u000f\u001f\u0002\t1\fgn\u001a\u0006\u0002{\u0005!!.\u0019<b\u0013\ty$H\u0001\u0004PE*,7\r\u001e\t\u0003\u0003\u001ek\u0011A\u0011\u0006\u0003W\rS!\u0001R#\u0002\tI,\u0017\r\u001a\u0006\u0003\r:\n\u0011bY8o]\u0016\u001cGo\u001c:\n\u0005!\u0013%\u0001E\"p]RLg.^8vgN#(/Z1n!\tQU*D\u0001L\u0015\ta\u0005'\u0001\u0005j]R,'O\\1m\u0013\tq5JA\u0004M_\u001e<\u0017N\\4\u0002\t!|7\u000f\u001e\t\u0003#js!A\u0015-\u0011\u0005M3V\"\u0001+\u000b\u0005U3\u0014A\u0002\u001fs_>$hHC\u0001X\u0003\u0015\u00198-\u00197b\u0013\tIf+\u0001\u0004Qe\u0016$WMZ\u0005\u00037r\u0013aa\u0015;sS:<'BA-W\u0003\u0011\u0001xN\u001d;\u0011\u0005}\u0003W\"\u0001,\n\u0005\u00054&aA%oi\u0006ia.^7QCJ$\u0018\u000e^5p]N\fqa\u001c9uS>t7\u000f\u0005\u0002fQ6\taM\u0003\u0002h]\u0005!Q\u000f^5m\u0013\tIgM\u0001\rDCN,\u0017J\\:f]NLG/\u001b<f'R\u0014\u0018N\\4NCB\fa\u0001P5oSRtD#\u00027o_B\f\bCA7\u0001\u001b\u0005A\u0003\"B(\u0006\u0001\u0004\u0001\u0006\"B/\u0006\u0001\u0004q\u0006\"\u00022\u0006\u0001\u0004q\u0006\"B2\u0006\u0001\u0004!\u0017A\u00043fM\u0006,H\u000e\u001e$pe6\fGo]\u000b\u0002iB\u0011Q\u000f_\u0007\u0002m*\u0011q\u000fN\u0001\u0007UN|g\u000eN:\n\u0005e4(A\u0004#fM\u0006,H\u000e\u001e$pe6\fGo]\u0001\u0010I\u00164\u0017-\u001e7u\r>\u0014X.\u0019;tA\u00059QM\\2pI\u0016\u0014X#A?\u0011\u000by\f9!a\u0003\u000e\u0003}TA!!\u0001\u0002\u0004\u0005AQM\\2pI\u0016\u00148OC\u0002\u0002\u00069\n\u0001bY1uC2L8\u000f^\u0005\u0004\u0003\u0013y(!E#yaJ,7o]5p]\u0016s7m\u001c3feB1q,!\u0004Q\u0003#I1!a\u0004W\u0005\u0019!V\u000f\u001d7feA!\u00111CA\f\u001b\t\t)B\u0003\u00020y%!\u0011\u0011DA\u000b\u0005%!\u0016.\\3ti\u0006l\u0007/\u0001\u0005f]\u000e|G-\u001a:!\u0003\u0019\u0019xnY6fiV\u0011\u0011\u0011\u0005\t\u0005\u0003G\tI#\u0004\u0002\u0002&)\u0019\u0011q\u0005\u001f\u0002\u00079,G/\u0003\u0003\u0002,\u0005\u0015\"AB*pG.,G/\u0001\u0006t_\u000e\\W\r^0%KF$B!!\r\u00028A\u0019q,a\r\n\u0007\u0005UbK\u0001\u0003V]&$\b\"CA\u
001d\u0017\u0005\u0005\t\u0019AA\u0011\u0003\rAH%M\u0001\bg>\u001c7.\u001a;!Q\u001da\u0011qHA*\u0003+\u0002B!!\u0011\u0002P5\u0011\u00111\t\u0006\u0005\u0003\u000b\n9%\u0001\u0006d_:\u001cWO\u001d:f]RTA!!\u0013\u0002L\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\u000b\u0005\u00055\u0013!\u00026bm\u0006D\u0018\u0002BA)\u0003\u0007\u0012\u0011bR;be\u0012,GMQ=\u0002\u000bY\fG.^3\"\u0005\u0005]\u0013\u0001\u0002;iSN\f!B]3bIRC'/Z1e+\t\ti\u0006E\u0002:\u0003?J1!!\u0019;\u0005\u0019!\u0006N]3bI\u0006q!/Z1e)\"\u0014X-\u00193`I\u0015\fH\u0003BA\u0019\u0003OB\u0011\"!\u000f\u000f\u0003\u0003\u0005\r!!\u0018\u0002\u0017I,\u0017\r\u001a+ie\u0016\fG\r\t\u0015\b\u001f\u0005}\u00121KA+\u0003\u001d\u0011WoY6fiN,\"!!\u001d\u0011\r\u0005M\u0014QPAA\u001b\t\t)H\u0003\u0003\u0002x\u0005e\u0014!C5n[V$\u0018M\u00197f\u0015\r\tYHV\u0001\u000bG>dG.Z2uS>t\u0017\u0002BA@\u0003k\u00121aU3r!\u0019\t\u0019)!#\u0002\u000e6\u0011\u0011Q\u0011\u0006\u0005\u0003\u000f\u000bI(A\u0004nkR\f'\r\\3\n\t\u0005-\u0015Q\u0011\u0002\u000b\u0019&\u001cHOQ;gM\u0016\u0014\b\u0003BAH\u0003+k!!!%\u000b\t\u0005M\u00151A\u0001\fKb\u0004(/Z:tS>t7/\u0003\u0003\u0002\u0018\u0006E%!C+og\u00064WMU8x\u0003!\u0011WoY6fiN\u0004\u0003fB\t\u0002@\u0005M\u0013QK\u0001\u000eGV\u0014(/\u001a8u\u001f\u001a47/\u001a;\u0016\u0003y\u000b\u0011cY;se\u0016tGo\u00144gg\u0016$x\fJ3r)\u0011\t\t$!*\t\u0011\u0005e2#!AA\u0002y\u000babY;se\u0016tGo\u00144gg\u0016$\b\u0005K\u0004\u0015\u0003\u007f\t\u0019&!\u0016\u0002\u0017M$\u0018M\u001d;PM\u001a\u001cX\r^\u000b\u0003\u0003_\u00032!\\AY\u0013\r\t\u0019\f\u000b\u0002\u0011)\u0016DHoU8dW\u0016$xJ\u001a4tKR\fqb\u001d;beR|eMZ:fi~#S-\u001d\u000b\u0005\u0003c\tI\fC\u0005\u0002:Y\t\t\u00111\u0001\u00020\u0006a1\u000f^1si>3gm]3uA\u0005q!/Z2pe\u0012,e\u000e\u001a9pS:$XCAAa!\u0011\t\u0019-!2\u000e\u0003)J1!a2+\u0005a\u0019uN\u001c;j]V|Wo\u001d*fG>\u0014H-\u00128ea>Lg\u000e^\u0001\u0010e\u0016\u001cwN\u001d3F]\u0012\u0004x.\u001b8uA\u0005YQM\u001c3q_&tGOU3g+\t\ty\r\u0005\u0003\u0002R\u0006]WBAAj\u0015
\r\t)\u000eM\u0001\u0004eB\u001c\u0017\u0002BAm\u0003'\u0014aB\u00159d\u000b:$\u0007o\\5oiJ+g-A\bf]\u0012\u0004x.\u001b8u%\u00164w\fJ3r)\u0011\t\t$a8\t\u0013\u0005e2$!AA\u0002\u0005=\u0017\u0001D3oIB|\u0017N\u001c;SK\u001a\u0004\u0003f\u0001\u000f\u0002fB\u0019q,a:\n\u0007\u0005%hK\u0001\u0005w_2\fG/\u001b7f\u00031iWM]4f\u001f\u001a47/\u001a;t)\u0011\ty/!>\u0011\u0007\u0005\u000b\t0C\u0002\u0002t\n\u0013aa\u00144gg\u0016$\bbBA|;\u0001\u0007\u0011\u0011`\u0001\b_\u001a47/\u001a;t!\u0015y\u00161`A��\u0013\r\tiP\u0016\u0002\u0006\u0003J\u0014\u0018-\u001f\t\u0004\u0003\n\u0005\u0011b\u0001B\u0002\u0005\ny\u0001+\u0019:uSRLwN\\(gMN,G/A\teKN,'/[1mSj,wJ\u001a4tKR$B!a<\u0003\n!1!1\u0002\u0010A\u0002A\u000bAA[:p]\u0006i\u0011N\\5uS\u0006dwJ\u001a4tKR$\"!a<\u0002'Ad\u0017M\\%oaV$\b+\u0019:uSRLwN\\:\u0015\t\tU!q\u0004\t\u0006?\u0006m(q\u0003\t\u0005\u00053\u0011Y\"D\u0001D\u0013\r\u0011ib\u0011\u0002\u000f\u0013:\u0004X\u000f\u001e)beRLG/[8o\u0011\u001d\u0011\t\u0003\ta\u0001\u0003_\fQa\u001d;beR\fQd\u0019:fCR,7i\u001c8uS:,x.^:SK\u0006$WM\u001d$bGR|'/\u001f\u000b\u0003\u0005O\u00012!\u0011B\u0015\u0013\r\u0011YC\u0011\u0002!\u0007>tG/\u001b8v_V\u001c\b+\u0019:uSRLwN\u001c*fC\u0012,'OR1di>\u0014\u00180\u0001\u0004d_6l\u0017\u000e\u001e\u000b\u0005\u0003c\u0011\t\u0004C\u0004\u00034\t\u0002\r!a<\u0002\u0007\u0015tG-\u0001\u0003ti>\u0004HCAA\u0019\u0003)Ig.\u001b;jC2L'0Z\u0001\ti>\u001cFO]5oOR\t\u0001+\u0001\tj]\u000edW\u000fZ3US6,7\u000f^1naV\u0011!1\t\t\u0004?\n\u0015\u0013b\u0001B$-\n9!i\\8mK\u0006t\u0007")
/* loaded from: input_file:org/apache/spark/sql/execution/streaming/continuous/TextSocketContinuousStream.class */
public class TextSocketContinuousStream implements ContinuousStream, Logging {
    // NOTE(review): the long `org$apache$spark$...$$name` identifiers look like Scala-compiler
    // name mangling of members that are also reached from nested classes/lambdas — confirm.

    // Host of the text socket to read from; set from the first constructor argument.
    public final String org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$host;
    // Port of the text socket to read from; set from the second constructor argument.
    public final int org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$port;
    // Number of partitions (and therefore buckets); set from the third constructor argument.
    public final int org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$numPartitions;
    // Source options; read by includeTimestamp().
    private final CaseInsensitiveStringMap options;
    // json4s formats passed to Serialization.read in deserializeOffset().
    private final DefaultFormats defaultFormats;
    // Encoder for (line, timestamp) tuples; the reader thread creates its row serializer from it.
    private final ExpressionEncoder<Tuple2<String, Timestamp>> org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$encoder;

    // Connection to host:port; opened in initialize(), closed and cleared in stop().
    @GuardedBy("this")
    private Socket socket;

    // Daemon thread created and started in initialize(); it reads lines from the socket.
    @GuardedBy("this")
    private Thread readThread;

    // One row buffer per partition; appended to by the reader thread, trimmed by commit().
    @GuardedBy("this")
    private final Seq<ListBuffer<UnsafeRow>> org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$buckets;

    // Counter of lines read so far; starts at -1 and is incremented before each append, and
    // (currentOffset % numPartitions) selects the target bucket.
    @GuardedBy("this")
    private int org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset;
    // Offset that the first element of each bucket corresponds to; assigned by initialOffset()
    // and commit().
    private TextSocketOffset startOffset;
    // RPC endpoint constructed over the buckets, with this stream passed as its second argument.
    private final ContinuousRecordEndpoint recordEndpoint;
    // Reference to the registered endpoint; assigned in planInputPartitions(), stopped in stop().
    private volatile RpcEndpointRef endpointRef;
    // Backing field for the Logging mixin's logger accessor pair.
    private transient Logger org$apache$spark$internal$Logging$$log_;

    // ---------------------------------------------------------------------------------------
    // Forwarders for the Logging mixin. Unless noted otherwise, each method simply delegates to
    // the static helper of the same name on Logging, passing this instance as the receiver.
    // ---------------------------------------------------------------------------------------

    /** Returns the logger name, as computed by {@code Logging.logName$}. */
    public String logName() {
        return Logging.logName$(this);
    }

    /** Returns the logger for this instance, as provided by {@code Logging.log$}. */
    public Logger log() {
        return Logging.log$(this);
    }

    // Message-only logging variants; the message is supplied as a Function0<String>.

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    // Logging variants that additionally take the Throwable to report.

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    // Logger initialization hooks, forwarded unchanged to Logging.

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    // Supplies the default value of the second parameter of initializeLogIfNecessary(z, z2).
    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public void initializeForcefully(boolean z, boolean z2) {
        Logging.initializeForcefully$(this, z, z2);
    }

    // NOTE(review): unlike its siblings this calls super.needsReconfiguration() instead of a
    // static Logging helper. This class declares no superclass, so this is presumably the
    // decompiler's rendering of an interface-default call — confirm against the bytecode.
    public boolean needsReconfiguration() {
        return super.needsReconfiguration();
    }

    /** Getter for the logger backing field used by the Logging mixin. */
    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    /** Setter for the logger backing field used by the Logging mixin. */
    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    // ---------------------------------------------------------------------------------------
    // Plain field accessors. The `name_$eq` methods are the setters paired with the `name()`
    // getters. None of these methods is synchronized; for the @GuardedBy("this") fields the
    // callers are expected to hold the lock themselves.
    // ---------------------------------------------------------------------------------------

    /** Returns the json4s formats used when deserializing offsets. */
    public DefaultFormats defaultFormats() {
        return this.defaultFormats;
    }

    /** Returns the encoder for (line, timestamp) tuples. */
    public ExpressionEncoder<Tuple2<String, Timestamp>> org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$encoder() {
        return this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$encoder;
    }

    /** Returns the current socket, or null when none is set (stop() clears it). */
    private Socket socket() {
        return this.socket;
    }

    private void socket_$eq(Socket socket) {
        this.socket = socket;
    }

    /** Returns the socket reader thread assigned by initialize(). */
    private Thread readThread() {
        return this.readThread;
    }

    private void readThread_$eq(Thread thread) {
        this.readThread = thread;
    }

    /** Returns the per-partition row buffers. */
    public Seq<ListBuffer<UnsafeRow>> org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$buckets() {
        return this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$buckets;
    }

    /** Returns the counter of lines read so far (-1 before the first line). */
    public int org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset() {
        return this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset;
    }

    public void org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset_$eq(int i) {
        this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset = i;
    }

    /** Returns the offset the buckets currently start at; null until it is first assigned. */
    public TextSocketOffset startOffset() {
        return this.startOffset;
    }

    public void startOffset_$eq(TextSocketOffset textSocketOffset) {
        this.startOffset = textSocketOffset;
    }

    /** Returns the record endpoint created in the constructor. */
    private ContinuousRecordEndpoint recordEndpoint() {
        return this.recordEndpoint;
    }

    /** Returns the registered endpoint reference; null until planInputPartitions() assigns it. */
    private RpcEndpointRef endpointRef() {
        return this.endpointRef;
    }

    private void endpointRef_$eq(RpcEndpointRef rpcEndpointRef) {
        this.endpointRef = rpcEndpointRef;
    }

    public Offset mergeOffsets(PartitionOffset[] partitionOffsetArr) {
        Predef$.MODULE$.assert(partitionOffsetArr.length == this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$numPartitions);
        return new TextSocketOffset(Predef$.MODULE$.wrapIntArray((int[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps((Object[]) ArrayOps$.MODULE$.sortBy$extension(Predef$.MODULE$.refArrayOps((Object[]) ArrayOps$.MODULE$.map$extension(Predef$.MODULE$.refArrayOps(partitionOffsetArr), partitionOffset -> {
            return (ContinuousRecordPartitionOffset) partitionOffset;
        }, ClassTag$.MODULE$.apply(ContinuousRecordPartitionOffset.class))), continuousRecordPartitionOffset -> {
            return BoxesRunTime.boxToInteger(continuousRecordPartitionOffset.partitionId());
        }, Ordering$Int$.MODULE$)), continuousRecordPartitionOffset2 -> {
            return BoxesRunTime.boxToInteger(continuousRecordPartitionOffset2.offset());
        }, ClassTag$.MODULE$.Int())).toList());
    }

    /**
     * Rebuilds a stream offset from its JSON form.
     *
     * @param str JSON text that is parsed as a list of integers
     * @return a {@code TextSocketOffset} wrapping the parsed list
     */
    public Offset deserializeOffset(String str) {
        DefaultFormats formats = defaultFormats();
        // The manifest describes the target type List[Int] to the json4s reader.
        Object parsed = Serialization$.MODULE$.read(str, formats, ManifestFactory$.MODULE$.classType(List.class, ManifestFactory$.MODULE$.Int(), Nil$.MODULE$));
        List perPartitionOffsets = (List) parsed;
        return new TextSocketOffset(perPartitionOffsets);
    }

    /**
     * Resets the start offset to zero for every partition and returns it.
     *
     * @return the stored start offset, a {@code TextSocketOffset} with one 0 per partition
     */
    public Offset initialOffset() {
        int partitionCount = this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$numPartitions;
        TextSocketOffset allZero = new TextSocketOffset(package$.MODULE$.List().fill(partitionCount, () -> 0));
        startOffset_$eq(allZero);
        return startOffset();
    }

    /**
     * Plans one input partition per configured partition, starting at the given offset.
     *
     * <p>The offset is validated before anything is mutated. Only then are the start offsets
     * pushed to the record endpoint and the endpoint registered under a fresh, randomly named
     * RPC endpoint, so a rejected offset leaves no registered endpoint behind.
     *
     * @param offset the offset to start from; must be a {@code TextSocketOffset} with exactly
     *     one entry per configured partition
     * @return one {@code TextSocketContinuousInputPartition} per partition, in partition order
     * @throws IllegalArgumentException if {@code offset} is null or not a
     *     {@code TextSocketOffset}, or if its partition count differs from the configured one
     */
    public InputPartition[] planInputPartitions(Offset offset) {
        // Check the type first: casting up front would surface as ClassCastException, and the
        // former null check dereferenced the null value while building its own error message.
        if (!(offset instanceof TextSocketOffset)) {
            throw new IllegalArgumentException("invalid offset type " + (offset == null ? null : offset.getClass()) + " for TextSocketContinuousReader");
        }
        TextSocketOffset textSocketOffset = (TextSocketOffset) offset;
        List<Object> offsets = textSocketOffset.offsets();
        if (offsets.size() != this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$numPartitions) {
            throw new IllegalArgumentException("The previous run contained " + offsets.size() + " partitions, but " + this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$numPartitions + " partitions are currently configured. The numPartitions option cannot be changed.");
        }

        // Side effects start here, after validation has passed.
        recordEndpoint().setStartOffsets(offsets);
        String str = "TextSocketContinuousReaderEndpoint-" + UUID.randomUUID();
        endpointRef_$eq(recordEndpoint().rpcEnv().setupEndpoint(str, recordEndpoint()));

        // Each (offset, index) pair becomes the partition with that index starting at that offset.
        return (InputPartition[]) ((List) offsets.zipWithIndex()).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            return new TextSocketContinuousInputPartition(str, tuple2._2$mcI$sp(), tuple2._1$mcI$sp(), this.includeTimestamp());
        }).toArray(ClassTag$.MODULE$.apply(InputPartition.class));
    }

    /**
     * Returns the reader factory for this stream.
     *
     * @return the {@code TextSocketReaderFactory} singleton; no new object is created per call
     */
    public ContinuousPartitionReaderFactory createContinuousReaderFactory() {
        return TextSocketReaderFactory$.MODULE$;
    }

    /**
     * Commits the given offset: discards the buffered rows that precede it in every partition,
     * then records it as the new start offset and forwards it to the record endpoint.
     *
     * @param offset the offset to commit; must be a {@code TextSocketOffset}
     * @throws IllegalArgumentException if {@code offset} is not a {@code TextSocketOffset}
     * @throws IllegalStateException if a partition's offset lies beyond the rows buffered for it
     */
    public synchronized void commit(Offset offset) {
        if (!(offset instanceof TextSocketOffset)) {
            throw new IllegalArgumentException("TextSocketContinuousReader.commit()received an offset (" + offset + ") that did not originate with an instance of this class");
        }
        TextSocketOffset committed = (TextSocketOffset) offset;
        List<Object> committedOffsets = committed.offsets();
        int partitionCount = committedOffsets.size();
        for (int partition = 0; partition < partitionCount; partition++) {
            int target = BoxesRunTime.unboxToInt(committedOffsets.apply(partition));
            // Offset that the first buffered row of this partition corresponds to.
            int base = BoxesRunTime.unboxToInt(startOffset().offsets().apply(partition));
            int maxValid = base + ((SeqOps) org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$buckets().apply(partition)).size();
            if (target > maxValid) {
                throw new IllegalStateException("Invalid offset " + target + " to commit for partition " + partition + ". Max valid offset: " + maxValid);
            }
            // Remove the rows between the old start offset and the committed offset.
            ((Buffer) org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$buckets().apply(partition)).dropInPlace(target - base);
        }
        startOffset_$eq(committed);
        recordEndpoint().setStartOffsets(startOffset().offsets());
    }

    /**
     * Stops the stream: closes the socket (if one is open) and stops the registered RPC
     * endpoint (if one was registered).
     *
     * <p>Closing is best effort. An {@link IOException} from {@code close()} is logged and does
     * not prevent the endpoint from being stopped; the socket reference is cleared regardless.
     */
    public synchronized void stop() {
        Socket openSocket = socket();
        if (openSocket != null) {
            try {
                openSocket.close();
            } catch (IOException e) {
                // Nothing more can be done with a socket that fails to close; record it and go on.
                logWarning(() -> "Failed to close socket to " + this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$host + ":" + this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$port, e);
            } finally {
                socket_$eq(null);
            }
        }
        // Read the volatile field once so the null check and the use see the same reference.
        RpcEndpointRef registered = endpointRef();
        if (registered != null) {
            recordEndpoint().rpcEnv().stop(registered);
        }
    }

    /**
     * Connects to the configured host and port and starts the daemon thread that reads the socket.
     *
     * <p>The reader thread takes one line at a time. For every line it increments the current
     * offset and appends a serialized (line, timestamp) row to the bucket selected by
     * {@code currentOffset % numPartitions}, holding this stream's monitor while it does so. The
     * thread ends when the peer closes the stream or reading fails with an {@link IOException}
     * (which is also what a concurrent close of the socket produces).
     *
     * <p>An {@link IOException} raised while connecting or opening the input stream propagates
     * to the caller.
     */
    private synchronized void initialize() {
        socket_$eq(new Socket(this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$host, this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$port));
        // NOTE(review): InputStreamReader without an explicit charset decodes with the platform
        // default charset; kept as is to preserve existing behavior.
        final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket().getInputStream()));
        final TextSocketContinuousStream stream = this;
        Thread reader = new Thread("TextSocketContinuousReader(" + this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$host + ", " + this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$port + ")") {
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                ExpressionEncoder.Serializer createSerializer = stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$encoder().createSerializer();
                while (true) {
                    try {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            // End of stream: the peer closed the connection.
                            stream.logWarning(() -> "Stream closed by " + stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$host + ":" + stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$port);
                            return;
                        }
                        // Offset and bucket must change together, under the lock commit() uses.
                        synchronized (stream) {
                            stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset_$eq(stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset() + 1);
                            int bucketIndex = stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset() % stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$numPartitions;
                            Timestamp receivedAt = Timestamp.valueOf(TextSocketReader$.MODULE$.DATE_FORMAT().format(Calendar.getInstance().getTime()));
                            // copy() is applied to the serializer's result before it is buffered.
                            ((Growable) stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$buckets().apply(bucketIndex)).$plus$eq(createSerializer.apply(new Tuple2(readLine, receivedAt)).copy());
                        }
                    } catch (IOException e) {
                        // Also raised when the socket is closed underneath the blocked read, so
                        // this is logged at debug level rather than reported as a failure.
                        stream.logDebug(() -> "Stopped reading from " + stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$host + ":" + stream.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$port, e);
                        return;
                    }
                }
            }
        };
        // Daemon thread: it must not keep the JVM alive on its own.
        reader.setDaemon(true);
        readThread_$eq(reader);
        readThread().start();
    }

    /**
     * Describes this stream by the host and port it reads from.
     *
     * @return a string of the form {@code TextSocketContinuousReader[host: H, port: P]}
     */
    public String toString() {
        StringBuilder description = new StringBuilder("TextSocketContinuousReader[host: ");
        description.append(this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$host);
        description.append(", port: ");
        description.append(this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$port);
        description.append("]");
        return description.toString();
    }

    /**
     * Reads the {@code includeTimestamp} option.
     *
     * @return the option's boolean value, or {@code false} when the option is not set
     */
    private boolean includeTimestamp() {
        final boolean whenAbsent = false;
        CaseInsensitiveStringMap sourceOptions = this.options;
        return sourceOptions.getBoolean("includeTimestamp", whenAbsent);
    }

    /**
     * Creates the stream and immediately connects to the socket.
     *
     * <p>Besides storing its arguments, the constructor builds the (String, Timestamp) tuple
     * encoder, allocates one empty row buffer per partition, creates the record endpoint over
     * those buffers, and finally calls {@code initialize()}, which opens the socket and starts
     * the reader thread. Construction therefore performs network I/O.
     *
     * @param str host to connect to
     * @param i port to connect to
     * @param i2 number of partitions (one bucket is created per partition)
     * @param caseInsensitiveStringMap source options (consulted for {@code includeTimestamp})
     */
    public TextSocketContinuousStream(String str, int i, int i2, CaseInsensitiveStringMap caseInsensitiveStringMap) {
        this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$host = str;
        this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$port = i;
        this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$numPartitions = i2;
        this.options = caseInsensitiveStringMap;
        // Initializes the Logging mixin for this instance.
        Logging.$init$(this);
        this.defaultFormats = DefaultFormats$.MODULE$;
        // The locals below all alias the same ExpressionEncoder$ singleton (decompiler artifact).
        ExpressionEncoder$ expressionEncoder$ = ExpressionEncoder$.MODULE$;
        ExpressionEncoder$ expressionEncoder$2 = ExpressionEncoder$.MODULE$;
        TypeTags universe = scala.reflect.runtime.package$.MODULE$.universe();
        final TextSocketContinuousStream textSocketContinuousStream = null;
        // Encoder for the first tuple component; its TypeCreator resolves the type scala.Predef.String.
        ExpressionEncoder apply = expressionEncoder$2.apply(universe.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(TextSocketContinuousStream.class.getClassLoader()), new TypeCreator(textSocketContinuousStream) { // from class: org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousStream$$typecreator1$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                Universe universe2 = mirror.universe();
                return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe2.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
            }
        }));
        ExpressionEncoder$ expressionEncoder$3 = ExpressionEncoder$.MODULE$;
        TypeTags universe2 = scala.reflect.runtime.package$.MODULE$.universe();
        final TextSocketContinuousStream textSocketContinuousStream2 = null;
        // Encoder for the second component (java.sql.Timestamp), combined with the first into
        // the tuple encoder stored on this instance.
        this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$encoder = expressionEncoder$.tuple(apply, expressionEncoder$3.apply(universe2.TypeTag().apply(scala.reflect.runtime.package$.MODULE$.universe().runtimeMirror(TextSocketContinuousStream.class.getClassLoader()), new TypeCreator(textSocketContinuousStream2) { // from class: org.apache.spark.sql.execution.streaming.continuous.TextSocketContinuousStream$$typecreator2$1
            public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                mirror.universe();
                return mirror.staticClass("java.sql.Timestamp").asType().toTypeConstructor();
            }
        })));
        // One empty ListBuffer per partition.
        this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$buckets = (Seq) package$.MODULE$.Seq().fill(i2, () -> {
            return new ListBuffer();
        });
        // Starts at -1 so that the first line read, which increments before use, gets offset 0.
        this.org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$currentOffset = -1;
        // Must come after the buckets are assigned: the endpoint is constructed over them.
        this.recordEndpoint = new ContinuousRecordEndpoint(org$apache$spark$sql$execution$streaming$continuous$TextSocketContinuousStream$$buckets(), this);
        // Last step: opens the socket and starts the reader thread, which uses the fields above.
        initialize();
    }
}
