Thursday, August 27, 2015

Property-based testing with Spark Streaming

In the previous post we saw how to use ScalaCheck for performing property-based testing on batch Spark programs. This post is about using ScalaCheck with Spark Streaming. But first, a little reminder: in property-based testing, the assertions about single input-output pairs often used in unit testing are replaced by properties that relate randomly synthesized inputs with the outputs obtained by applying the test subject.

As seen in the previous post, the main difficulty in integrating ScalaCheck with Spark lies in sharing a Spark context, which provides a handle to the Spark cluster, among all the ScalaCheck test cases. This is needed because creating a Spark context is an expensive operation, and because only a single Spark context should be running per JVM. Sharing a connection to an expensive resource is common in unit testing, so we were able to solve that problem easily with Specs2, by creating a trait SharedSparkContextBeforeAfterAll, extending org.specs2.specification.BeforeAfterAll, that handles the Spark context lifecycle. When using ScalaCheck with Spark Streaming we can reuse the same solution, but we also have to face other, less common problems:
  • Besides a Spark context, Spark Streaming programs use a streaming context to define the computations, which are transformations or periodic actions executed over series of RDDs that contain small batches of records. Those series of RDDs are called DStreams, which stands for discretized streams. Each streaming context is associated with a Spark context, and it is a lightweight object that can be created quickly, but it has to be started and stopped explicitly, not just created. Besides, all the transformations and actions on DStreams have to be defined before the streaming context is started. Finally, only a single streaming context can be active per JVM, and streaming contexts cannot be restarted. This is a complex lifecycle that needs to be handled with care. 
  • Spark Streaming programs are designed to run forever, hence we need a way to determine when all the assertions relevant to a test case have been executed completely, so we can then finish the test case by stopping the streaming context. 
  • DStream batches are generated with a fixed frequency that we call the "batch interval". All the batches are expected to complete at the same speed but, since we are generating random test cases, some batches will often be significantly bigger than others, so in practice some batches will be computed faster than others. Besides, in general some input values are faster to compute than others (as an extreme example, consider a transformation that computes the i-th prime number for each input number i). As the chosen batch interval must leave enough time to compute the slowest batches, this might lead to wasting time when computing the fastest batches, and to tests not running as fast as they could.
  • As DStreams are meant to run nonstop, the Spark Streaming runtime captures any exception generated when computing a batch, to prevent stopping the computation. That includes exceptions generated by Specs2 matchers, or simply unexpected exceptions that would normally lead to a failing test case, so some care must be taken to ensure those exceptions are not hidden by Spark's well-intentioned runtime. 

But it's not all bad news. We can reuse the same Spark context for several streaming contexts. That leads naturally to a test lifecycle where we create and stop the Spark context with BeforeAfterAll, and create and stop Spark streaming contexts with BeforeAfterEach; this is encapsulated in the trait SharedStreamingContextBeforeAfterEach. We still need to manually start the streaming context in the body of the test case, but only after declaring the derived DStreams defined by applying the test subject, and the actions that apply the assertions that characterize the test. To avoid losing the results of the matchers to exceptions captured by Spark, we can use Specs2's Result type, starting from ok and combining the result obtained for each batch with Specs2's and operator. Finally, in order to determine when the test can finish, we can register a StreamingListener in the streaming context that notifies a SyncVar each time a batch is completed, so we can use the SyncVar to block waiting for the completion of a fixed number of batches: this is implemented in the method awaitForNBatchesCompleted of the object StreamingContextUtils. That is all we need to define a simple unit test for Spark Streaming like the one below, which checks some obvious properties on the input DStream and a derived DStream. Note also the use of the custom Specs2 matcher foreachRecord, which checks whether a predicate holds for all the records of an RDD.
  def successfulSimpleQueueStreamTest = simpleQueueStreamTest(expectedCount = 0)
  def failingSimpleQueueStreamTest = simpleQueueStreamTest(expectedCount = 1) must beFailing
        
  def simpleQueueStreamTest(expectedCount : Int) : Result = {
    val record = "hola"
    val batches = Seq.fill(5)(Seq.fill(10)(record))
    val queue = new Queue[RDD[String]]
    queue ++= batches.map(batch => sc.parallelize(batch, numSlices = defaultParallelism))
    val inputDStream = ssc.queueStream(queue, oneAtATime = true)
    val sizesDStream = inputDStream.map(_.length)
    
    var batchCount = 0
    var result : Result = ok
    inputDStream.foreachRDD { rdd =>
      batchCount += 1
      println(s"completed batch number $batchCount: ${rdd.collect.mkString(",")}")
      result = result and {
        (rdd.filter(_ != record).count() === expectedCount) and
        (rdd should existsRecord(_ == "hola"))
      }
    }
    sizesDStream.foreachRDD { rdd =>
      result = result and { 
        rdd should foreachRecord(record.length)(len => _ == len)      
      }
    }
    
    // the streaming context should only be started after all the transformations and actions have been defined
    ssc.start()
    
    // wait for completion of batches.length batches
    StreamingContextUtils.awaitForNBatchesCompleted(batches.length, atMost = 10 seconds)(ssc)
    
    result
  }
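The waiting mechanism just described can be pictured with a rough sketch like the following one, where a StreamingListener counts completed batches and signals a SyncVar once the expected number has been reached. The object name mirrors the StreamingContextUtils mentioned above, but this is only a simplified approximation, not the actual sscheck implementation:
import scala.concurrent.SyncVar
import scala.concurrent.duration.FiniteDuration
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

object StreamingContextUtilsSketch {
  /** Blocks until numBatches batches have completed in ssc, or fails if atMost is exceeded */
  def awaitForNBatchesCompleted(numBatches: Int, atMost: FiniteDuration)
                               (ssc: StreamingContext): Unit = {
    val allBatchesDone = new SyncVar[Unit]
    ssc.addStreamingListener(new StreamingListener {
      private var completed = 0
      override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit =
        synchronized {
          completed += 1
          // signal exactly once, when the expected number of batches has been reached
          if (completed == numBatches) allBatchesDone.put(())
        }
    })
    // SyncVar.get(timeout) returns None on timeout, which we turn into a failure
    if (allBatchesDone.get(atMost.toMillis).isEmpty)
      throw new IllegalStateException(s"less than $numBatches batches completed after $atMost")
  }
}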
But we still haven't been able to define a ScalaCheck property for a DStream transformation. First of all we need a clear notion of what a test case for a DStream is. This is not completely obvious, because DStreams are meant to run indefinitely, while test cases should be executed in a finite time. We have chosen to define a DStream test case as a finite prefix of an infinite intended DStream. Hence any ScalaCheck generator for Seq[Seq[T]] can be interpreted as a generator of DStream[T]. Now we need a way to create a DStream for each test case. We could try using StreamingContext.queueStream for that, creating and stopping a new streaming context for each test case, but that is not compatible with the lifecycle defined by SharedStreamingContextBeforeAfterEach. We could instead generate all the test cases before running the property, and then generate the corresponding batches with a queue DStream, but then we would generate test cases that won't be used if the property fails before reaching them, thus wasting memory and CPU. For generating test cases on demand we could use a custom actor Spark receiver, and send the batches as messages to the actor, which would act as a proxy for the corresponding receiver. But that doesn't work because the actor buffers the records, and doesn't respect the integrity of the test cases, because batches end up intermingled.
In the end our solution was writing a custom InputDStream that is basically a variation of QueueInputDStream allowing dynamic addition of test cases as Seq[Seq[T]] objects. Combining this with the ideas above for developing unit tests for Spark Streaming, we have developed a first ScalaCheck higher-order property for Spark Streaming. The function DStreamProp.forAllAlways uses g1 to generate test cases corresponding to prefixes of the input DStream, then employs gt1 to define a derived DStream, and checks that, for each of the test cases, the assertions defined by assertions hold for all the batches.
def forAllAlways[E1:ClassTag,E2:ClassTag,P]
    (g1: Gen[Seq[Seq[E1]]])(
     gt1 : (DStream[E1]) => DStream[E2])(
     assertions: (RDD[E1], RDD[E2]) => P)(
     implicit pv: P => Prop, rv : P => Result, 
     pp1: Seq[Seq[E1]] => Pretty, 
     ssc : StreamingContext, 
     parallelism : Parallelism): Prop
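To give an idea of how the custom input DStream mentioned above might work, here is a rough illustrative sketch; the class name and details are hypothetical simplifications, not the actual sscheck code. Whole test cases are enqueued at once, and each call to compute emits a single batch:
import scala.collection.mutable.Queue
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

class TestCaseInputDStreamSketch[T: ClassTag](_ssc: StreamingContext, numSlices: Int)
  extends InputDStream[T](_ssc) {

  // batches pending to be emitted, one Seq[T] per batch interval
  private val pendingBatches = Queue.empty[Seq[T]]

  /** Enqueues all the batches of a test case at once, so batches from
    * different test cases are never intermingled */
  def addTestCase(testCase: Seq[Seq[T]]): Unit = synchronized {
    pendingBatches ++= testCase
  }

  override def start(): Unit = {}
  override def stop(): Unit = {}

  // emits one batch per batch interval, or an empty batch if nothing is pending
  override def compute(validTime: Time): Option[RDD[T]] = synchronized {
    val batch = if (pendingBatches.nonEmpty) pendingBatches.dequeue() else Seq.empty[T]
    Some(context.sparkContext.parallelize(batch, numSlices))
  }
}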
We can use DStreamProp.forAllAlways to define properties like the following simple example, which checks that a DStream transformation computing the count for each batch is correctly defined:
def countProp(testSubject : DStream[Double] => DStream[Long]) = 
    DStreamProp.forAllAlways(
      Gen.listOfN(10,  Gen.listOfN(30, arbitrary[Double])))(
      testSubject)( 
      (inputBatch : RDD[Double], transBatch : RDD[Long]) => {
        transBatch.count === 1 and
        inputBatch.count === transBatch.first
      }).set(minTestsOk = 10).verbose  

This first proposal still leaves some open issues. First of all, currently DStreamProp.forAllAlways only supports a single generator and a single derived DStream: this should be overloaded to support more arities.
Also, we still have to deal with the problem of non-uniform batch execution times. Holden Karau's spark-testing-base Spark package solves this by implementing a hack that accelerates the completion of a batch when all the expected output records have already been generated. Another option would be parallelizing the execution of several test cases at the same time, which should lead to a more uniform computation time for each batch, as a slower batch would be compensated by a faster batch being executed at the same time. We have done some preliminary work along that line.
Besides, Spark throws confusing exceptions when the batch interval is too small for the machine running the test, which is then unable to keep up with processing the batches. This is unavoidable to some extent, and can be easily solved by tuning the batch interval, but some care should be taken to ensure tests always fail in that situation.
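For reference, the batch interval is fixed when the streaming context is created, so tuning it just means choosing a duration the test machine can sustain (the concrete values below are arbitrary examples):
import org.apache.spark.streaming.{Seconds, StreamingContext}

// e.g. a batch interval of 1 second leaves more slack for the slowest random
// batches than a very aggressive interval of a few hundred milliseconds
val ssc = new StreamingContext(sc, Seconds(1))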

Finally, testing the same assertions for all the batches is very rigid. New ScalaCheck higher order properties should be developed to allow for more flexible testing scenarios. This is something we are planning for the upcoming new release of sscheck in a couple of weeks.


Monday, July 6, 2015

Property-based testing with Spark

Property-based testing is a program testing paradigm made popular by John Hughes with the QuickCheck tool, and available for Scala as the library ScalaCheck. Classical unit testing with xUnit-like frameworks is usually based on specifying input / expected output pairs, and then comparing the expected output with the observed output obtained by applying the corresponding input to the test subject. In property-based testing, on the other hand, we specify a property that relates inputs and outputs, and then the property is checked against a bunch of inputs that are randomly synthesized by the testing framework. This can be easily understood with a concrete property, like the following paradigmatic example in which we check that the reverse of the reverse of a list is the same as the original list.
class ListReverseTest extends Specification
                      with ScalaCheck {
  "Testing operations on lists".title ^
    "the reverse of the reverse of a list is the same list" 
      ! listRevRevProp
    
  def listRevRev[A](xs : List[A]) : List[A] = xs.reverse.reverse
  
  def listRevRevProp =
    Prop.forAll ("xs" |: Gen.listOf(arbitrary[Int])) { 
      xs : List[Int] =>
        xs === listRevRev(xs)
      }. set(minTestsOk = 200).verbose
}
ScalaCheck can be used independently, but in the code above we use the integration of ScalaCheck with Specs2 to make the property part of a Specs2 specification. We can execute this property with sbt as follows.


> test-only blogspot.ListReverseTest
[info] Testing operations on lists
[info] 
[info] + the reverse of the reverse of list, is also a list
[info] OK, passed 200 tests.
[info] 
[info] Total for specification Testing operations on lists
[info] Finished in 700 ms
[info] 1 example, 200 expectations, 0 failure, 0 error
[info] 
[info] ScalaTest
[info] Run completed in 1 second, 541 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] No tests were executed.
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 2 s, completed Jul 6, 2015 7:57:55 PM
As seen in the output above, ScalaCheck generates 200 random lists of arbitrary integers, and tests the property for each of those lists. The random generator tries to be clever and traverse a representative part of the search space corresponding to all possible integer lists, first covering basic corner cases like the empty list or one-element lists, and then proceeding with lists of increasing sizes. While in unit testing we would only consider a small number of explicitly stated input values, the idea is that with property-based testing we gain more confidence in the test because the test subject is exercised for a much higher number of input values, and hopefully the search space is explored more thoroughly. In practice, unit testing and property-based testing can be easily combined, reserving unit testing for checking those carefully handcrafted input values that are required to ensure that some critical corner cases are covered.
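For instance, in the specification above we could keep a plain, example-based expectation for a handcrafted corner case next to the random property (a hypothetical extra example for ListReverseTest):
  // explicit corner case checked as a regular expectation, not as a property
  def listRevRevSingleton = listRevRev(List(42)) === List(42)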

Applying ScalaCheck for testing programs that manipulate Spark RDDs is quite easy. The main difficulty is ensuring that the Spark context is shared by all the test cases. If we are going to generate around 100 test cases per ScalaCheck property, creating a new Spark context per test case wouldn't be practical, because test execution would take far too long. Besides, due to SPARK-2243 we cannot have more than one Spark context running on the same JVM, hence we have to ensure that the shared Spark context is properly closed after all the properties in a Specification have been exercised. Although the ScalaCheck API allows registering callbacks to be executed after each test case or property is evaluated, callback settings, as well as other property execution settings, are overridden when properties are run from sbt. For these reasons, using the integration with Specs2, and Specs2's BeforeAfterAll trait, leads to an easier and more robust solution. You can take a look at SharedSparkContextBeforeAfterAll for a solution based on Specs2. This trait provides a method sc that can be used to parallelize lists generated with the built-in ScalaCheck generators. The Spark master and the parallelism level (the default number of Spark partitions used to parallelize sequences) can also be customized by overriding the corresponding methods. That Spark context is also available as an implicit value, which can then be used with the implicit conversions and generators provided by the object RDDGen, that are basically shortcuts to parallelize lists generated by built-in ScalaCheck generators:
object RDDGen { 
  /** Convert a ScalaCheck generator of Seq into a generator of RDD   
   * */
  implicit def seqGen2RDDGen[A](sg : Gen[Seq[A]])
              (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism) : Gen[RDD[A]] =
    sg.map(sc.parallelize(_, numSlices = parallelism.numSlices))
  
  /** Convert a sequence into a RDD    
   * */
  implicit def seq2RDD[A](seq : Seq[A])(implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism) : RDD[A] = 
    sc.parallelize(seq, numSlices=parallelism.numSlices)
    
  /** @return a generator of RDD whose elements are generated by g
   * */
  def of[A](g : => Gen[A])
           (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism) 
           : Gen[RDD[A]] = 
    // this way is much simpler than implementing this with a 
    // ScalaCheck Buildable, because that implies defining a 
    // wrapper to convert RDD into Traversable
    seqGen2RDDGen(Gen.listOf(g))
    
  /** @return a generator of RDD with n elements generated by g
  * */
  def ofN[A](n : Int, g : Gen[A])
     (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism)
     : Gen[RDD[A]] = {
    seqGen2RDDGen(Gen.listOfN(n, g))
  }
  
   /** @return a generator of RDD with between n and m elements generated by g
  * */
  def ofNtoM[A](n : Int, m : Int, g : => Gen[A]) 
        (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism)
        : Gen[RDD[A]] = 
    seqGen2RDDGen(UtilsGen.containerOfNtoM[List, A](n, m, g))
}
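As a rough illustration of the lifecycle management described above, a trait like SharedSparkContextBeforeAfterAll could be sketched as follows. This is a simplified approximation with hypothetical details, including a minimal Parallelism wrapper for the number of partitions; the actual sscheck trait is more elaborate:
import org.apache.spark.{SparkConf, SparkContext}
import org.specs2.specification.BeforeAfterAll

/** Simple wrapper for the default number of partitions, so it can be passed implicitly */
case class Parallelism(numSlices: Int)

trait SharedSparkContextBeforeAfterAllSketch extends BeforeAfterAll {
  // override these to customize the Spark master and the parallelism level
  def sparkMaster: String = "local[4]"
  def defaultParallelism: Int = 4

  @transient private var _sc: Option[SparkContext] = None

  /** Shared Spark context, also usable as an implicit value by RDDGen */
  implicit def sc: SparkContext =
    _sc.getOrElse(sys.error("the shared Spark context has not been started yet"))

  implicit def parallelism: Parallelism = Parallelism(defaultParallelism)

  def beforeAll(): Unit = {
    val conf = new SparkConf().setMaster(sparkMaster).setAppName("test")
    _sc = Some(new SparkContext(conf))
  }

  def afterAll(): Unit = {
    _sc.foreach(_.stop())
    _sc = None
  }
}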
We can use RDDGen to define properties like the following one, which checks that the method DoubleRDDFunctions.mean works as expected:
def forallRDDGenOfNFreqMean = {
    val freqs = Map(1 -> 0, 4 -> 1)
    val rddSize = 200
    val gRDDFreq = RDDGen.ofN(rddSize, Gen.frequency(freqs.mapValues(Gen.const(_)).toSeq:_*))
    val expectedMean = {
      val freqS = freqs.toSeq
      val num = freqS.map({case (f, v) => v * f}).sum
      val den = freqS.map(_._1).sum
      num / den.toDouble
    }
    Prop.forAll("rdd" |: gRDDFreq){ rdd : RDD[Int] =>
      rdd.mean must be ~(expectedMean +/- 0.1) 
    }
  }.set(minTestsOk = 50).verbose
This and other simple properties are included in the test class SharedSparkContextBeforeAfterAllTest, which, as seen below, exercises all the tests in a reasonable time:
15/07/06 21:00:27 INFO DAGScheduler: Job 807 finished: mean at SharedSparkContextBeforeAfterAllTest.scala:125, took 0.004391 s
stopping test Spark context
15/07/06 21:00:27 INFO SparkUI: Stopped Spark web UI at http://192.168.0.198:4040
15/07/06 21:00:27 INFO DAGScheduler: Stopping DAGScheduler
15/07/06 21:00:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/06 21:00:27 INFO Utils: path = /private/var/folders/38/x3zvqgyn2s33ym6j6qnb0j3r0000gp/T/spark-32fbbd76-b84d-4f11-ad51-0b903db2fcd7/blockmgr-8556e289-4fc7-448f-b96a-8e1f689b3c0e, already present as root for deletion.
15/07/06 21:00:27 INFO MemoryStore: MemoryStore cleared
15/07/06 21:00:27 INFO BlockManager: BlockManager stopped
15/07/06 21:00:27 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/06 21:00:27 INFO SparkContext: Successfully stopped SparkContext
15/07/06 21:00:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
[info] Sharing a Spark Context between several ScalaCheck properties and test cases, and closing it properly
[info] 
[info] + forall that ignores the Spark context
[info] OK, passed 101 tests.
[info] + simple test that uses the Spark context explicitly
[info] + forall that uses the Spark context explicitly, and parallelizes a Seq explicitly
[info] OK, passed 50 tests.
[info] + forall that uses the Spark context from this, and parallelizes a Seq explicitly
[info] OK, passed 50 tests.
[info] + forall that parallelizes a Seq with an implicit
[info] OK, passed 50 tests.
[info] + forall with implicit conversion of Seq generator to RDD generator
[info] OK, passed 50 tests.
[info] + forall that uses RDDGen.of
[info] OK, passed 10 tests.
[info] + forall that uses RDDGen.of with local overload of parallelism
[info] OK, passed 10 tests.
[info] + forall that uses RDDGen.ofNtoM
[info] OK, passed 101 tests.
[info] + forall that uses RDDGen.ofN, testing frequency generator
[info] OK, passed 50 tests.
[info] 
[info] Total for specification Sharing a Spark Context between several ScalaCheck properties and test cases, and closing it properly
[info] Finished in 31 seconds, 348 ms
[info] 12 examples, 475 expectations, 0 failure, 0 error
[info] 
[info] ScalaTest
[info] Run completed in 36 seconds, 326 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] No tests were executed.
[info] Passed: Total 10, Failed 0, Errors 0, Passed 10
[success] Total time: 37 s, completed Jul 6, 2015 9:00:27 PM
An alternative solution based on the integration of ScalaCheck with ScalaTest could also be possible, but that integration doesn't cover all possible ScalaCheck properties (for example there is no support for ScalaCheck's Prop.exists), while Specs2's ScalaCheckProp is able to wrap arbitrary ScalaCheck Prop values.
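For instance, an existential property like the following toy example can be used directly as the body of a Specs2 example thanks to that wrapping:
import org.scalacheck.{Gen, Prop}

// holds if ScalaCheck finds a single witness, here some even number in [0, 100]
def someEvenNumberProp: Prop =
  Prop.exists(Gen.choose(0, 100)) { n: Int => n % 2 == 0 }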

Finally, using parallelExecution := false in build.sbt is currently required, because otherwise sbt could run several test suites in different threads, thus having more than one Spark context running at the same time and hitting SPARK-2243. Forking in sbt might be an alternative that I plan to investigate in the future, as it would create several JVMs and could therefore allow parallel test execution.