Monday, July 6, 2015

Property-based testing with Spark

Property-based testing is a program testing paradigm made popular by John Hughes with the QuickCheck tool, and available for Scala as the library ScalaCheck. Classical unit testing with xUnit-like frameworks is usually based on specifying pairs of an input and its expected output, and then comparing the expected output with the output observed when the test subject is applied to that input. In property-based testing, on the other hand, we specify a property that relates inputs and outputs, and the property is then checked against many inputs that are randomly synthesized by the testing framework. This is easier to understand with a concrete property, like the following classic example, in which we check that the reverse of the reverse of a list is the same as the original list.
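package blogspot

import org.scalacheck.{Gen, Prop}
import org.scalacheck.Arbitrary.arbitrary
import org.specs2.{ScalaCheck, Specification}

class ListReverseTest extends Specification
                      with ScalaCheck {
  // Acceptance-style specification: a title followed by a single example
  def is =
    "Testing operations on lists".title ^
      "the reverse of the reverse of a list is the same list" ! listRevRevProp

  def listRevRev[A](xs : List[A]) : List[A] = xs.reverse.reverse

  // The property must hold for at least 200 randomly generated lists
  def listRevRevProp =
    Prop.forAll ("xs" |: Gen.listOf(arbitrary[Int])) {
      xs : List[Int] =>
        xs === listRevRev(xs)
    }.set(minTestsOk = 200).verbose
}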
ScalaCheck can be used independently, but in the code above we use the integration of ScalaCheck with Specs2 to make the property part of a Specs2 specification. We can execute this property with sbt as follows.


> test-only blogspot.ListReverseTest
[info] Testing operations on lists
[info] 
[info] + the reverse of the reverse of list, is also a list
[info] OK, passed 200 tests.
[info] 
[info] Total for specification Testing operations on lists
[info] Finished in 700 ms
[info] 1 example, 200 expectations, 0 failure, 0 error
[info] 
[info] ScalaTest
[info] Run completed in 1 second, 541 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] No tests were executed.
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 2 s, completed Jul 6, 2015 7:57:55 PM
As seen in the output above, ScalaCheck generates 200 random lists of arbitrary integers, and tests the property for each of those lists. The random generator tries to be clever and traverse a representative part of the search space corresponding to all possible integer lists, first covering basic corner cases like the empty list or the list of one element, and then proceeding with lists of increasing sizes. While in unit testing we would only consider a small number of explicitly stated input values, the idea is that with property-based testing we gain more confidence in the test because the test subject is exercised with a much larger number of input values, and hopefully the search space is explored more thoroughly. In practice, unit testing and property-based testing can be easily combined, reserving unit testing for those carefully handcrafted input values that are required to ensure that some critical corner cases are covered.
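As a rough sketch of how ScalaCheck can be run on its own, and of how a property can sit next to a few handcrafted cases, the following assumes only that ScalaCheck is on the classpath. The object name ListReverseStandalone and the chosen corner cases are illustrative and are not part of the original code.

```scala
import org.scalacheck.{Gen, Prop, Test}
import org.scalacheck.Arbitrary.arbitrary

// Illustrative only: the same list property, run without Specs2
object ListReverseStandalone {
  // Property: reversing a list twice gives back the original list
  val revRevProp: Prop =
    Prop.forAll(Gen.listOf(arbitrary[Int])) { (xs: List[Int]) =>
      xs.reverse.reverse == xs
    }

  // Handcrafted inputs, checked like classical unit test cases
  val cornerCases: Seq[List[Int]] =
    Seq(Nil, List(0), List(Int.MinValue, Int.MaxValue))

  def cornerCasesHold: Boolean =
    cornerCases.forall(xs => xs.reverse.reverse == xs)

  // Ask ScalaCheck for at least 200 successful random test cases
  def propertyHolds: Boolean = {
    val params = Test.Parameters.default.withMinSuccessfulTests(200)
    Test.check(params, revRevProp).passed
  }

  def main(args: Array[String]): Unit = {
    println(s"corner cases hold: $cornerCasesHold")
    println(s"property holds: $propertyHolds")
  }
}
```

Here the handcrafted cases pin down inputs we always want covered, while the property explores the rest of the input space at random.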

Applying ScalaCheck to test programs that manipulate Spark RDDs is quite easy. The main difficulty is ensuring that the Spark context is shared by all the test cases. If we are going to generate around 100 test cases per ScalaCheck property, creating a new Spark context per test case wouldn't be practical, because test execution would then take far too long. Besides, due to SPARK-2243 we cannot have more than one Spark context running on the same JVM, hence we have to ensure that the shared Spark context is properly closed after all the properties in a Specification have been exercised.

Although the ScalaCheck API allows registering callbacks to be executed after each test case or property is evaluated, callback settings as well as other property execution settings are overridden when properties are run from sbt. For these reasons, using the integration with Specs2 together with Specs2's BeforeAfterAll trait leads to an easier and more robust solution. You can take a look at SharedSparkContextBeforeAfterAll for a solution based on Specs2. This trait provides a method sc that can be used to parallelize lists generated with the built-in ScalaCheck generators. The Spark master and the parallelism level (the default number of Spark partitions used to parallelize sequences) can also be customized by overriding the corresponding methods. The Spark context is also available as an implicit value, which can then be used with the implicit conversions and generators provided by the object RDDGen. These are basically shortcuts to parallelize lists generated by the built-in ScalaCheck generators:
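import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.scalacheck.Gen

// Parallelism and UtilsGen are assumed to be defined elsewhere in the same project
object RDDGen {
  /** Convert a ScalaCheck generator of Seq into a generator of RDD */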
  implicit def seqGen2RDDGen[A](sg : Gen[Seq[A]])
              (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism) : Gen[RDD[A]] =
    sg.map(sc.parallelize(_, numSlices = parallelism.numSlices))
  
  /** Convert a sequence into an RDD */
  implicit def seq2RDD[A](seq : Seq[A])(implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism) : RDD[A] = 
    sc.parallelize(seq, numSlices=parallelism.numSlices)
    
  /** @return a generator of RDD that generates its elements from g */
  def of[A](g : => Gen[A])
           (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism)
           : Gen[RDD[A]] =
    // this is much simpler than implementing it with a
    // ScalaCheck Buildable, because that would imply defining a
    // wrapper to convert RDD into Traversable
    seqGen2RDDGen(Gen.listOf(g))
    
  /** @return a generator of RDD with n elements, each generated from g */
  def ofN[A](n : Int, g : Gen[A])
     (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism)
     : Gen[RDD[A]] = {
    seqGen2RDDGen(Gen.listOfN(n, g))
  }
  
  /** @return a generator of RDD with between n and m elements, each generated from g */
  def ofNtoM[A](n : Int, m : Int, g : => Gen[A]) 
        (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism)
        : Gen[RDD[A]] = 
    seqGen2RDDGen(UtilsGen.containerOfNtoM[List, A](n, m, g))
}
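The shared-context idea described before RDDGen can be sketched roughly as follows. This is not the source of SharedSparkContextBeforeAfterAll: the trait name SharedSparkContextSketch, the method master and its default value are assumptions made for illustration, and the sketch assumes Spark and Specs2 are on the classpath. Depending on the Specs2 version, beforeAll and afterAll may be declared without parentheses, in which case the overrides need adjusting.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.specs2.specification.BeforeAfterAll

// Hypothetical sketch: one Spark context per specification, stopped at the end
trait SharedSparkContextSketch extends BeforeAfterAll {
  /** Spark master used for the tests; override to customize (assumed name) */
  def master: String = "local[4]"

  private var _sc: Option[SparkContext] = None

  /** Spark context created on first use and shared by all the examples */
  implicit def sc: SparkContext = synchronized {
    _sc.getOrElse {
      val conf = new SparkConf().setMaster(master).setAppName(getClass.getName)
      val created = new SparkContext(conf)
      _sc = Some(created)
      created
    }
  }

  // Nothing to do up front: the context is created lazily by sc
  override def beforeAll(): Unit = ()

  // Stop the context once all the examples of the specification have run
  override def afterAll(): Unit = synchronized {
    _sc.foreach(_.stop())
    _sc = None
  }
}
```

Creating the context lazily keeps specifications that never touch Spark cheap, while afterAll ensures that no context is left running when the next specification starts.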
We can use RDDGen to define properties such as the following, which checks that the method DoubleRDDFunctions.mean works as expected:
def forallRDDGenOfNFreqMean = {
  // Weight -> value pairs: generate 0 with weight 1 and 1 with weight 4
  val freqs = Map(1 -> 0, 4 -> 1)
  val rddSize = 200
  val gRDDFreq = RDDGen.ofN(rddSize, Gen.frequency(freqs.mapValues(Gen.const(_)).toSeq:_*))
  // Weighted average of the values: (1*0 + 4*1) / (1 + 4) = 0.8
  val expectedMean = {
    val freqS = freqs.toSeq
    val num = freqS.map({case (f, v) => v * f}).sum
    val den = freqS.map(_._1).sum
    num / den.toDouble
  }
  Prop.forAll("rdd" |: gRDDFreq){ rdd : RDD[Int] =>
    rdd.mean must be ~(expectedMean +/- 0.1)
  }
}.set(minTestsOk = 50).verbose
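To make the arithmetic behind the expected mean concrete, here is a small sketch that uses only the Scala standard library, with no Spark or ScalaCheck involved. The object FrequencyMeanSketch and its sample function are illustrative stand-ins for Gen.frequency and rdd.mean, not the actual implementation of either.

```scala
import scala.util.Random

// Illustrative stand-in for the frequency generator used in the property
object FrequencyMeanSketch {
  // Weight -> value pairs: 0 with weight 1, 1 with weight 4
  val freqs: Seq[(Int, Int)] = Seq(1 -> 0, 4 -> 1)

  // Expected mean: sum(weight * value) / sum(weight) = 4 / 5
  val expectedMean: Double =
    freqs.map { case (w, v) => w * v }.sum / freqs.map(_._1).sum.toDouble

  // Draw one value with probability proportional to its weight
  def sample(rnd: Random): Int = {
    val total = freqs.map(_._1).sum
    val pos = rnd.nextInt(total)
    // Cumulative upper bounds of the weights, paired with their values
    val cumulative = freqs.scanLeft(0)(_ + _._1).tail.zip(freqs.map(_._2))
    cumulative.find { case (upper, _) => pos < upper }.get._2
  }

  // Mean of n samples, playing the role of rdd.mean on a generated RDD
  def empiricalMean(n: Int, rnd: Random): Double =
    Seq.fill(n)(sample(rnd)).sum / n.toDouble

  def main(args: Array[String]): Unit = {
    println(s"expected mean = $expectedMean")
    println(s"empirical mean of 200 samples = ${empiricalMean(200, new Random())}")
  }
}
```

Because each run draws only 200 random values, the empirical mean fluctuates around 0.8, which is why the property compares with a tolerance instead of testing for equality.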
The property forallRDDGenOfNFreqMean and other simple properties are included in the test class SharedSparkContextBeforeAfterAllTest, which, as seen below, exercises all the tests in a reasonable time:
15/07/06 21:00:27 INFO DAGScheduler: Job 807 finished: mean at SharedSparkContextBeforeAfterAllTest.scala:125, took 0.004391 s
stopping test Spark context
15/07/06 21:00:27 INFO SparkUI: Stopped Spark web UI at http://192.168.0.198:4040
15/07/06 21:00:27 INFO DAGScheduler: Stopping DAGScheduler
15/07/06 21:00:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/06 21:00:27 INFO Utils: path = /private/var/folders/38/x3zvqgyn2s33ym6j6qnb0j3r0000gp/T/spark-32fbbd76-b84d-4f11-ad51-0b903db2fcd7/blockmgr-8556e289-4fc7-448f-b96a-8e1f689b3c0e, already present as root for deletion.
15/07/06 21:00:27 INFO MemoryStore: MemoryStore cleared
15/07/06 21:00:27 INFO BlockManager: BlockManager stopped
15/07/06 21:00:27 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/06 21:00:27 INFO SparkContext: Successfully stopped SparkContext
15/07/06 21:00:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
[info] Sharing a Spark Context between several ScalaCheck properties and test cases, and closing it properly
[info] 
[info] + forall that ignores the Spark context
[info] OK, passed 101 tests.+ simple test that uses the Spark context explicitly+ forall that uses the Spark context explicitly, and parallelizes a Seq explicitly
[info] OK, passed 50 tests.+ forall that uses the Spark context from this, and parallelizes a Seq explicitly
[info] OK, passed 50 tests.+ forall that parallelizes a Seq with an implicit
[info] OK, passed 50 tests.+ forall with implicit conversion of Seq generator to RDD generator
[info] OK, passed 50 tests.+ forall that uses RDDGen.of
[info] OK, passed 10 tests.+ forall that uses RDDGen.of with local overload of parallelism
[info] OK, passed 10 tests.+ forall that uses RDDGen.ofNtoM
[info] OK, passed 101 tests.+ forall that uses RDDGen.ofN, testing frequency generator
[info] OK, passed 50 tests.
[info] 
[info] Total for specification Sharing a Spark Context between several ScalaCheck properties and test cases, and closing it properly
[info] Finished in 31 seconds, 348 ms
[info] 12 examples, 475 expectations, 0 failure, 0 error
[info] 
[info] ScalaTest
[info] Run completed in 36 seconds, 326 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] No tests were executed.
[info] Passed: Total 10, Failed 0, Errors 0, Passed 10
[success] Total time: 37 s, completed Jul 6, 2015 9:00:27 PM
An alternative solution based on the integration of ScalaCheck with ScalaTest would also be possible, but that integration doesn't cover all the possible ScalaCheck properties (for example, there is no support for ScalaCheck's Prop.exists), whereas Specs2's ScalaCheckProp is able to wrap arbitrary ScalaCheck Prop values.

Finally, using parallelExecution := false in build.sbt is currently required, because otherwise sbt could run several test suites in different threads, thus having more than one Spark context running at the same time and triggering SPARK-2243. Forking in sbt might be an alternative, as it would imply creating several JVMs to achieve parallel test execution. I plan to investigate it in the future.
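For reference, the relevant part of build.sbt could look like the sketch below. The first setting is the one mentioned above. The commented-out line is only a starting point for the forking alternative, written in the sbt 0.13 syntax, and has not been tried here.

```scala
// build.sbt

// Run test suites sequentially, so that only one Spark context exists at a time
parallelExecution := false

// Possible starting point for the forking alternative (untested assumption):
// run the tests in a forked JVM instead of inside the sbt JVM
// fork in Test := true
```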
