目录

Background

简单讲解下spark streaming, structed streaming

我们都知道spark中有两种streaming,一种是spark streaming,另一种是structed streaming。spark streaming是微批处理,隔一段时间提交一批job,底层走的还是rdd。

而structed streaming是spark为了满足低时延的需求,重新设计的一套流式处理机制。相关的PR是SPARK-29028 SPIP: Continuous Processing Mode for Structured Streaming.

Spark Streaming

首先讲一下微批的streaming。这种streaming 使用DStream进行操作,其API与RDD编程类似。其对应的Context为StreamingContext.

StreamingContext

构造如下:

    val ssc = new StreamingContext(sparkConf, Seconds(2))

其底层也是会创建一个SparkContext,只不过StreamingContext提供了一些streaming编程的Api。可以看到后面的2s是微批的频率,每2秒钟触发一次批处理。

下面是一个HDFSWordCount的例子.

    StreamingExamples.setStreamingLogLevels()
    val sparkConf = new SparkConf().setAppName("HdfsWordCount")
    // Create the context
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Create the FileInputDStream on the directory and use the
    // stream to count words in new files created
    val lines = ssc.textFileStream(args(0))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()

可以看到是先指定好调度频率为2s,然后指定每个批次要执行的动作,然后调用start方法开始处理。

下面是start方法的核心代码:

ThreadUtils.runInNewThread("streaming-start") {
 sparkContext.setCallSite(startSite.get)
 sparkContext.clearJobGroup()
 sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
 savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
 scheduler.start()
}

可以看到是启动一个新的线程,是为了在设置callsite 以及job group这些 thread local时候不影响当前线程。

然后这里有一个scheduler.start, 这是 streaming任务的核心, JobScheduler.

JobScheduler

下面是JobScheduler的start方法:

  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)

    val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
      case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
      case _ => null
    }

    executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
      executorAllocClient,
      receiverTracker,
      ssc.conf,
      ssc.graph.batchDuration.milliseconds,
      clock)
    executorAllocationManager.foreach(ssc.addStreamingListener)
    receiverTracker.start()
    jobGenerator.start()
    executorAllocationManager.foreach(_.start())
    logInfo("Started JobScheduler")
  }

主要启动了以下组件:

  • receiverTracker 用于接受数据,例如接收从kafka发送的数据
  • inputInfoTracker 统计输入信息,用于监控
  • jobGenerator 用于job生成,每个时间间隔生成一批job
  • executorAllocationManager executor动态分配管理器
ExecutorAllocationManager

是否打开由spark.streaming.dynamicAllocation.enabled控制。可以看出和spark core中的参数很像。也新加了几个参数.

参数 说明 默认值
spark.streaming.dynamicAllocation.scalingInterval 动态分配调整间隔 60s
spark.streaming.dynamicAllocation.scalingUpRatio ratio上限 0.9
spark.streaming.dynamicAllocation.scalingDownRatio ratio下限 0.3

这个动态分配管理器和Core中的有何不同呢?

在core中的管理器是基于空闲时间来控制回收这些executor,而在流处理这些微批中,一个executor空闲是不太可能的,因为每隔很少的时间都会有一批作业被调度,那么在streaming里面如何控制executor的分配和回收呢?

基本策略是基于每批作业处理的时间来确定是否是idle-ness.

  • 使用streamingListener来获得每批jobs 处理的时间。
  • 周期性的(spark.streaming.dynamicAllocation.scalingInterval)拿jobs处理时间和调度周期做对比。
  • 如果 平均处理时间/调度间隔 >= ratio上限,则调大executor数量。
  • 如果 平均处理时间/调度间隔 =< ratio下限,则调小executor数量。

默认上限是0.9,下限为0.3。即如果间隔为2s,如果平均处理时间大于等于1.8s,那么就要调大executor;如果平均处理时间小于等于0.6s,那么就要调小executor数量。

调度

之后的过程就不详细讲了。最近也在做一个跟streamingListener有关的项目,其实了解调度可以从StreamingListener入手,可以看下listener都记录哪些事件。

trait StreamingListener {

  /** Called when the streaming has been started */
  def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted) { }

  /** Called when a receiver has been started */
  def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }

  /** Called when a receiver has reported an error */
  def onReceiverError(receiverError: StreamingListenerReceiverError) { }

  /** Called when a receiver has been stopped */
  def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) { }

  /** Called when a batch of jobs has been submitted for processing. */
  def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }

  /** Called when processing of a batch of jobs has started.  */
  def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { }

  /** Called when processing of a batch of jobs has completed. */
  def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }

  /** Called when processing of a job of a batch has started. */
  def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted) { }

  /** Called when processing of a job of a batch has completed. */
  def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}

前面与receiver相关的我们不管,只看跟处理有关的。

  • BatchSubmitted 是代表提交了一批jobs,对应的是一个JobSet
  • BatchStartted,当对应的jobSet里面的第一个job开始执行时候触发
  • BatchCompleted,当对应的jobSet里面的所有job都完成时触发
  • OutputOperationStarted 这是对应一个job的开始
  • OutputOperationCompleted 对应一个job的完成

下面是一批jobs 信息的数据结构。

case class BatchInfo(
    batchTime: Time,
    streamIdToInputInfo: Map[Int, StreamInputInfo],
    submissionTime: Long,
    processingStartTime: Option[Long],
    processingEndTime: Option[Long],
    outputOperationInfos: Map[Int, OutputOperationInfo]
  ) 

可以看到每个batchInfo对应的key就是一个batchTime,这是独一无二的,最后面有一个outputOperationInfos,这是对应里面每个job的信息,里面包含每个job的failureReason,如果那个job出错的话。

之后就没啥说的了,最终那些streaming的job还是走的底层RDD,这就和普通的批任务没区别了。

Structed Streaming

在spark Streaming中,最小的可能延迟受限于每批的调度间隔以及任务启动时间。因此,这不能满足更低延迟的需求。

如果能够连续的处理,尤其是简单的处理而没有任何的阻塞操作。这种连续处理的架构可以使得端到端延迟最低降低到1ms级别,而不是目前的10-100ms级别.

基本概念

介绍下Epoch, waterMark

EpochTracker是使用一个AtomicLong来计算EpochID,而其incrementCurrentEpoch方法只有在ContinuousCoalesceRDDContinuousWriteRDD中被调用。也就是说只有在进行类似于shuffle 和action的时候才被调用,所以Epoch类似于RDD执行中的StageId。

waterMark是一个标记,代表在这个时间点之前的数据全部都已经完成。

Example

Structed Streaming的Api 和sql比较类似。下面是一个StructuredNetworkWordCount 的例子.

object StructuredNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCount")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .load()

    // Split the lines into words
    val words = lines.as[String].flatMap(_.split(" "))

    // Generate running word count
    val wordCounts = words.groupBy("value").count()

    // Start running the query that prints the running counts to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()
  }
}

可以看到这个写法就像是DataSource一样。

readStream 和 writeStream 对应的是DataStreamReaderDataStreamWriter.

To Be Continued