Spark Speculation May Break Data Consistency

Sometimes we may want to enable the spark.speculation option to speed up a job, but doing so can be dangerous. Why? Because speculative execution launches extra copies of slow tasks, so some of our output/update actions may be executed twice, which can break data consistency.
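The failure mode can be seen without Spark at all (the sketch below uses illustrative names, not Spark APIs): if the scheduler launches a speculative copy of a task whose output action has a side effect, the side effect simply happens twice.

```scala
object SpeculationSketch {
  // A non-idempotent sink: every call appends, so a re-run duplicates data.
  val sink = scala.collection.mutable.ArrayBuffer.empty[(String, Int)]

  // The "task body": an output action with a side effect.
  def runTask(record: (String, Int)): Unit = sink += record

  def main(args: Array[String]): Unit = {
    val record = ("hello", 3)
    runTask(record) // original attempt is still running slowly...
    runTask(record) // ...so the scheduler launches a speculative copy
    println(s"sink now holds ${sink.size} copies of $record")
  }
}
```

Both attempts succeed from the scheduler's point of view, but the sink has seen the record twice.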


Let’s demonstrate this with a simple test.

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Counts words in UTF8 encoded, '\n' delimited text received from the network every 30 seconds.
  *
  * Usage: NetworkWordCount <hostname> <port>
  * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
  *
  * To run this on your local machine, you need to first run a Netcat server
  *    `$ nc -lk 9999`
  * and then run the example
  *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
  */
object NetworkWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Create the context with a 30 second batch interval
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(30))

    // Create a socket stream on target ip:port and count the
    // words in the input stream of \n delimited text (e.g. generated by 'nc').
    // Note that skipping replication in the storage level is safe only when
    // running locally; in a distributed setup replication is necessary for
    // fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" ")).repartition(5)
    // Artificially slow down tasks in odd-numbered partitions so that they
    // lag behind and become candidates for speculative execution.
    val wordCounts = words.map(x => {
      if (TaskContext.getPartitionId() % 2 > 0) {
        Thread.sleep(10000)
      }
      (x, 1)
    }).reduceByKey(_ + _)
    // The println below is our side-effecting output action; if a slow task
    // is speculatively re-run, the same items are printed twice.
    wordCounts.repartition(3).foreachRDD(
      rdd => rdd.foreachPartition(
        partition => {
          // Again delay odd-numbered partitions to provoke speculation.
          var sleepFlag = false
          if (partition.nonEmpty && TaskContext.getPartitionId() % 2 > 0) {
            sleepFlag = true
          }
          partition.foreach(x => {
            if (sleepFlag) Thread.sleep(10000)
            System.out.println("TestMark: partition=" + TaskContext.getPartitionId() + ", value=" + x)
          })
        }
    ))
    ssc.start()
    ssc.awaitTermination()
  }
}

Submit the above class to a YARN cluster (speculation is not available in local mode):

spark-submit --master yarn --conf spark.speculation.quantile=0.1 --conf spark.speculation=true --class NetworkWordCount target/tests-1.0-SNAPSHOT.jar hostname 9999
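For reference, these are the speculation-related knobs (defaults as documented in the Spark configuration guide; here the quantile is lowered to 0.1 so speculation kicks in after only 10% of a stage's tasks have finished):

```
spark.speculation             false   # speculative execution is off by default
spark.speculation.interval    100ms   # how often to check for speculatable tasks
spark.speculation.quantile    0.75    # fraction of tasks that must finish before speculation is considered
spark.speculation.multiplier  1.5     # how many times slower than the median a task must be to be re-launched
```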

In the stages and tasks view of the Spark UI, we can see that speculation was triggered.

Then we check the stdout of each task and find duplicate items.

So, be careful when enabling this option!
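If speculation is needed anyway, one common defense is to make the output action idempotent. A minimal sketch of the idea (all names here are illustrative, not a Spark API): commit each partition's result under a deterministic key such as (batch time, partition id), so the second, speculative attempt becomes a no-op.

```scala
object IdempotentCommit {
  // Committed results, keyed by (batchTime, partitionId). TrieMap gives us
  // an atomic putIfAbsent, so concurrent attempts race safely.
  val committed =
    scala.collection.concurrent.TrieMap.empty[(Long, Int), Seq[(String, Int)]]

  // Returns true if this attempt won the commit, false if another attempt
  // (e.g. a speculative copy of the same task) already committed this key.
  def commit(batchTime: Long, partitionId: Int, rows: Seq[(String, Int)]): Boolean =
    committed.putIfAbsent((batchTime, partitionId), rows).isEmpty

  def main(args: Array[String]): Unit = {
    val rows = Seq(("hello", 3), ("world", 1))
    println(commit(1000L, 0, rows)) // first attempt commits
    println(commit(1000L, 0, rows)) // speculative duplicate is ignored
  }
}
```

The same keying idea works against a real sink, e.g. an upsert into a database with (batch time, partition id) in the primary key.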

