Thursday, August 27, 2015

Property-based testing with Spark Streaming

In the previous post we saw how to use ScalaCheck for property-based testing of batch Spark programs. This post is about using ScalaCheck with Spark Streaming. But first, a quick reminder: in property-based testing, the assertions about single input-output pairs often used in unit testing are replaced by properties that relate randomly synthesized inputs to the outputs obtained by applying the test subject.

As seen in the previous post, the main difficulty in integrating ScalaCheck with Spark lies in sharing a Spark context, which provides a handle to the Spark cluster, among all the ScalaCheck test cases. This is needed because creating a Spark context is an expensive operation, and because only a single Spark context should be running per JVM. Sharing a connection to an expensive resource is common in unit testing, so we were able to solve that problem easily with Specs2, by creating a trait SharedSparkContextBeforeAfterAll that extends org.specs2.specification.BeforeAfterAll and handles the Spark context lifecycle. When using ScalaCheck with Spark Streaming we can reuse the same solution, but we also have to face other, less common problems:
  • Besides a Spark context, Spark Streaming programs use a streaming context to define the computations, which are transformations or periodic actions executed over series of RDDs that contain small batches of records. Those series of RDDs are called DStreams, short for discretized streams. Each streaming context is associated with a Spark context, and it is a lightweight object that can be created quickly, but it has to be started and stopped explicitly, not just created. Besides, all the transformations and actions on DStreams have to be defined before the streaming context is started. Finally, only a single streaming context can be active per JVM, and streaming contexts cannot be restarted. This is a complex lifecycle that needs to be handled with care (see the sketch after this list). 
  • Spark Streaming programs are designed to run forever, hence we need a way to determine when all the assertions relevant to a test case have been executed completely, so we can then finish the test case by stopping the streaming context. 
  • DStream batches are generated with a fixed frequency that we call the "batch interval". All the batches are expected to complete at roughly the same speed but, as we are generating random test cases, some batches will often be significantly bigger than others, so in practice some batches will be computed faster than others. Besides, in general some input values are faster to process than others (as an extreme example, consider a transformation that computes the i-th prime number for each input number i). As the chosen batch interval must leave enough time to compute the slowest batches, this might lead to wasting time when computing the fastest batches, and to tests not running as fast as they could.
  • As DStreams are meant to run nonstop, the Spark Streaming runtime captures any exception generated when computing a batch, to prevent the computation from stopping. That includes exceptions generated by Specs2 matchers, as well as unexpected exceptions that would normally lead to a failing test case, so some care must be taken to ensure those exceptions are not hidden by Spark's well-intentioned runtime. 
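As a reference for the lifecycle described in the first point above, here is a minimal sketch of how a streaming context is normally driven. It assumes an already created Spark context sc; the socket source and the 1 second batch interval are arbitrary choices just for illustration.

import org.apache.spark.streaming.{Seconds, StreamingContext}

// minimal lifecycle sketch: create, define, start, wait, stop
val ssc = new StreamingContext(sc, Seconds(1))       // a lightweight object tied to sc
val lines = ssc.socketTextStream("localhost", 9999)  // define the input DStream ...
lines.count().print()                                // ... and all transformations and actions
ssc.start()                                          // start only after everything is defined
ssc.awaitTerminationOrTimeout(10000)                 // wait for the computation (or a timeout)
ssc.stop(stopSparkContext = false)                   // a stopped streaming context cannot be restarted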



But it's not all bad news. We can reuse the same Spark context for several streaming contexts. That leads naturally to a test lifecycle where we create and stop the Spark context with BeforeAfterAll, and create and stop the streaming contexts with BeforeAfterEach, which is encapsulated in the trait SharedStreamingContextBeforeAfterEach. We still need to start the streaming context manually in the body of the test case, but only after declaring the derived DStreams defined by applying the test subject, and the actions that apply the assertions that characterize the test. To avoid losing the results of the matchers due to the exceptions captured by Spark, we can use Specs2's Result type, starting from ok and combining the result obtained for each batch with Specs2's and operator. Finally, in order to determine when the test can finish, we can register a StreamingListener in the streaming context that notifies a SyncVar each time a batch is completed, so we can block on the SyncVar waiting for the completion of a fixed number of batches: this is implemented in the method awaitForNBatchesCompleted of the object StreamingContextUtils (a simplified sketch of that method is shown after the example). That is all we need to define a simple unit test for Spark Streaming like the one below, which checks some obvious properties on the input DStream and a derived DStream. Note also the use of the custom Specs2 matchers existsRecord and foreachRecord, which check whether a predicate holds for some record or for all the records of an RDD, respectively.
  def successfulSimpleQueueStreamTest = simpleQueueStreamTest(expectedCount = 0)
  def failingSimpleQueueStreamTest = simpleQueueStreamTest(expectedCount = 1) must beFailing
        
  def simpleQueueStreamTest(expectedCount : Int) : Result = {
    val record = "hola"
    val batches = Seq.fill(5)(Seq.fill(10)(record))
    val queue = new Queue[RDD[String]]
    queue ++= batches.map(batch => sc.parallelize(batch, numSlices = defaultParallelism))
    val inputDStream = ssc.queueStream(queue, oneAtATime = true)
    val sizesDStream = inputDStream.map(_.length)
    
    var batchCount = 0
    var result : Result = ok
    inputDStream.foreachRDD { rdd =>
      batchCount += 1
      println(s"completed batch number $batchCount: ${rdd.collect.mkString(",")}")
      result = result and {
        // combine both checks with `and` so that neither result is discarded
        (rdd.filter(_ != record).count() === expectedCount) and
        (rdd should existsRecord(_ == "hola"))
      }
    }
    sizesDStream.foreachRDD { rdd =>
      result = result and { 
        rdd should foreachRecord(record.length)(len => _ == len)      
      }
    }
    
    // should only start the dstream after all the transformations and actions have been defined
    ssc.start()
    
    // wait for completion of batches.length batches
    StreamingContextUtils.awaitForNBatchesCompleted(batches.length, atMost = 10 seconds)(ssc)
    
    result
  }
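For reference, the following is a simplified sketch of how a method like StreamingContextUtils.awaitForNBatchesCompleted can be built from a StreamingListener and a SyncVar; the actual implementation differs in the details.

import scala.concurrent.SyncVar
import scala.concurrent.duration.Duration
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// simplified sketch: block until numBatches batches have completed, or fail
// with a NoSuchElementException if the atMost timeout expires first
def awaitForNBatchesCompleted(numBatches: Int, atMost: Duration)(ssc: StreamingContext): Unit = {
  val allBatchesDone = new SyncVar[Unit]()
  var completedBatches = 0
  ssc.addStreamingListener(new StreamingListener {
    override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
      synchronized {
        completedBatches += 1
        if (completedBatches == numBatches) allBatchesDone.put(())
      }
  })
  allBatchesDone.take(atMost.toMillis)
}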
But we still haven't been able to define a ScalaCheck property for a DStream transformation. First of all, we need a clear notion of what a test case for a DStream is. This is not completely obvious, because DStreams are meant to run indefinitely, while test cases should be executed in finite time. We have chosen to define a DStream test case as a finite prefix of the intended infinite DStream. Hence any ScalaCheck generator for Seq[Seq[T]] can be interpreted as a generator of prefixes of DStream[T], where the outer sequence corresponds to the batches and each inner sequence to the records of a batch (see the small example below). Now we need a way to create a DStream for each test case. We could try using StreamingContext.queueStream for that, creating and stopping a new streaming context for each test case, but that is not compatible with the lifecycle defined by SharedStreamingContextBeforeAfterEach. We could instead generate all the test cases before running the property, and then generate the corresponding batches with a queue DStream, but then we would generate test cases that won't be used if the property fails before reaching them, thus wasting memory and CPU. For generating test cases on demand, we could use a custom actor-based Spark receiver and send the batches as messages to an actor that acts as a proxy for the corresponding receiver. But that doesn't work, because the actor buffers the records and doesn't respect the integrity of the test cases: batches end up intermingled.
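For example, a generator of test cases with a fixed number of batches and a fixed number of records per batch can be defined just by nesting Gen.listOfN; the helper and value names below are made up for illustration.

import org.scalacheck.Gen
import org.scalacheck.Arbitrary.arbitrary

// a test case is a finite DStream prefix: the outer sequence corresponds to
// the batches, and each inner sequence to the records of one batch
def dstreamPrefixGen[T](numBatches: Int, batchSize: Int, recordGen: Gen[T]): Gen[Seq[Seq[T]]] =
  Gen.listOfN(numBatches, Gen.listOfN(batchSize, recordGen))

// e.g. prefixes of 10 batches with 30 random doubles per batch
val doublePrefixGen = dstreamPrefixGen(10, 30, arbitrary[Double])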
In the end our solution was to write a custom InputDStream, basically a variation of QueueInputDStream that allows the dynamic addition of test cases as Seq[Seq[T]] objects. Combining this with the ideas above for developing unit tests for Spark Streaming, we have developed a first ScalaCheck higher-order property for Spark Streaming. The function DStreamProp.forAllAlways uses g1 to generate test cases corresponding to prefixes of the input DStream, uses gt1 to define a derived DStream, and checks that, for each generated test case, the assertions defined by assertions hold for all the batches.
def forAllAlways[E1:ClassTag,E2:ClassTag,P]
    (g1: Gen[Seq[Seq[E1]]])(
     gt1 : (DStream[E1]) => DStream[E2])(
     assertions: (RDD[E1], RDD[E2]) => P)(
     implicit pv: P => Prop, rv : P => Result, 
     pp1: Seq[Seq[E1]] => Pretty, 
     ssc : StreamingContext, 
     parallelism : Parallelism): Prop
We can use it to define properties like the following simple example, which checks that a DStream transformation computing the count of records for each batch is correctly defined:
def countProp(testSubject : DStream[Double] => DStream[Long]) = 
    DStreamProp.forAllAlways(
      Gen.listOfN(10,  Gen.listOfN(30, arbitrary[Double])))(
      testSubject)( 
      (inputBatch : RDD[Double], transBatch : RDD[Long]) => {
        transBatch.count === 1 and
        inputBatch.count === transBatch.first
      }).set(minTestsOk = 10).verbose  
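The property can then be instantiated with a concrete test subject. For example, assuming a Specs2 specification that mixes in the traits providing the implicit streaming context and parallelism, an example along these lines (the example name is made up) would exercise the real DStream.count transformation:

// hypothetical Specs2 example: DStream.count should compute the size of each batch
def countComputesBatchSizes = countProp(_.count())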

This first proposal still leaves some open issues. First of all, DStreamProp.forAllAlways currently supports only a single generator and a single derived DStream: it should be overloaded to support more arities.
Also, we still have to deal with the problem of non-uniform batch execution times. Holden Karau's spark-testing-base Spark package solves this by implementing a hack that accelerates the completion of a batch when all the expected output records have already been generated. Another option would be parallelizing the execution of several test cases at the same time, which should lead to a more uniform computation time for each batch, as a slower batch would be compensated by a faster batch executed at the same time. We have done some preliminary work along that line.
Besides, Spark throws confusing exceptions when the batch interval is too small for the machine running the test, which is then unable to keep up with processing the batches. This is unavoidable to some extent, and can be easily solved by tuning the batch interval, but some care should be taken to ensure that tests always fail in that situation.

Finally, testing the same assertions for all the batches is very rigid. New ScalaCheck higher-order properties should be developed to allow for more flexible testing scenarios. This is something we are planning for the upcoming release of sscheck in a couple of weeks.