Thursday, August 27, 2015

Property-based testing with Spark Streaming

In the previous post we saw how to use ScalaCheck for performing property-based testing on batch Spark programs. This post is about using ScalaCheck with Spark Streaming. But first let's recapitulate a bit. Just a little reminder that in property-based testing the assertions about single input-output pairs often used in unit testing, are replaced by properties that relate randomly synthesized inputs with the outputs obtained by applying the test subject.

As seen in the previous post, the main difficulty to integrate ScalaCheck with Spark lies in sharing a Spark context, which provides a handler to the Spark cluster, among all the ScalaCheck test cases. This is needed because creating a Spark context is an expensive operation, and because only a single Spark context should be running per JVM. Sharing a connection to an expensive shared resource is common in unit testing, so we were able to easily solve that problem with Specs2, by creating a trait SharedSparkContextBeforeAfterAll extending org.specs2.specification.BeforeAfterAll, that handles the Spark context lifecycle. When using ScalaCheck with Spark Streaming we can reuse the same solution, but we also have to face other less common problems:
  • Besides a Spark context, Spark Streaming programs use a streaming context to define the computations, which are transformations or periodic actions executed over series of RDDs that contain small batches of records. Those series of RDD are called DStreams which stands for discretized streams. Each streaming context is associated to a Spark context, and it is a lightweight object that can be created quickly, but it has to be started and stopped explicitly, not only created. Besides, all the transformations and actions on DStreams have to be defined before the streaming context has started. Finally, only a single streaming context can be active per JVM, and streaming contexts cannot be restarted. This is a complex lifecycle that needs to be handled with care. 
  • Spark Streaming programs are designed to run forever, hence we need to a way to determine when all the assertions relevant to a test case have been executed completely, so we can then finish the test case by stopping the streaming context. 
  • DStream batches are generated with a fixed frequency that we call the "batch interval". All the batches are expected to be completed at the same speed but, as we are generating random test cases, then we'll often have batches significantly bigger than others, so in practice some batches will be computed faster than others. And anyway, in general some input values are faster to compute than others (as an extreme example consider a transformation that computes the i-th prime number for each input number i).  As the chosen batch interval must leave enough time to compute the slowest batches, this might lead to wasting time when computing the fastest batches, and tests not running as fast as they should.
  • As DStreams are meant to run nonstop, the Spark Streaming runtime captures any exception generated when computing a batch, to prevent stopping the computation. That includes exceptions generated by Specs2 matchers, or simply unexpected exceptions that would normally lead to a failing test case, so some care must be taken to ensure those exceptions are not hidden by Spark's well-intentioned runtime. 

But it's not all bad news. We can reuse the same Spark context for several streaming contexts. That leads naturally to a test life cycle where we create and stop Spark contexts with BeforeAfterAll, and create and stop Spark streaming contexts with BeforeAfterEach, that is encapsulated in the trait SharedStreamingContextBeforeAfterEach. We still need to manually start the streaming context in the body of the test case, but only after declaring the derived DStreams defined by applying the test subject, and the actions that apply the assertions that characterize the test. To avoid losing the results of the matchers, due to Spark captured exceptions, we can use Specs2's Result type, starting from ok and combining the result obtained for each batch with Specs2's and operator. Finally, in order to determine when the test can finish, we can register a StreamingListener in the streaming context, that notifies a SyncVar each time a batch is completed, so we can use the SyncVar to block waiting for completion of a fixed number of batches: this is implemented in the method awaitForNBatchesCompleted of the object StreamingContextUtils. That is all we need to define simple unit test for Spark Streaming like the one below, which checks some obvious properties on the input DStream, and a derived DStream. Note also the use of the custom Specs2 matcher foreachRecord, that checks whether a predicate holds for all the records of an RDD.
  def successfulSimpleQueueStreamTest = simpleQueueStreamTest(expectedCount = 0)
  def failingSimpleQueueStreamTest = simpleQueueStreamTest(expectedCount = 1) must beFailing
  def simpleQueueStreamTest(expectedCount : Int) : Result = {
    val record = "hola"
    val batches = Seq.fill(5)(Seq.fill(10)(record))
    val queue = new Queue[RDD[String]]
    queue ++= => sc.parallelize(batch, numSlices = defaultParallelism))
    val inputDStream = ssc.queueStream(queue, oneAtATime = true)
    val sizesDStream =
    var batchCount = 0
    var result : Result = ok
    inputDStream.foreachRDD { rdd =>
      batchCount += 1
      println(s"completed batch number $batchCount: ${rdd.collect.mkString(",")}")
      result = result and {
        rdd.filter(_!= record).count() === expectedCount
        rdd should existsRecord(_ == "hola")
    sizesDStream.foreachRDD { rdd =>
      result = result and { 
        rdd should foreachRecord(record.length)(len => _ == len)      
    // should only start the dstream after all the transformations and actions have been defined
    // wait for completion of batches.length batches
    StreamingContextUtils.awaitForNBatchesCompleted(batches.length, atMost = 10 seconds)(ssc)
But we still haven't been able to define a ScalaCheck property for a DStream transformation. First of all we need to have a clear notion of what it is a test case for a DStream. This is not completely obvious, because DStreams are meant to run indefinitely, while test cases should be executed in a finite time. We have chosen to define that a DStream test case is a finite prefix of an infinite intended DStream. Hence any ScalaCheck generator for Seq[Seq[T]] can be interpreted as a generator of DStream[T]. Now we need a way to create a DStream per each test case. We could try using StreamingContext.queueStream for that, creating and stopping a new streaming context per each test case, which is not compatible with the lifecycle defined by SharedStreamingContextBeforeAfterEach. We could generate all the test cases before running the property, and then generating the corresponding batches with a queue DStream, but then we would generate test cases that won't be used if the property fails before reaching them, thus wasting memory and CPU. For generating test cases on demand we could use a custom actor Spark receiver, and send the batches as messages to the actor, that would act as a proxy for the corresponding receiver. But that doesn't work because the actor buffers the records, and doesn't respects the integrity of the test cases because batches end up intermingled.
In the end our solution was writing a custom InputDStream that is basically a variation of QueueInputDStream that allows dynamic addition of test cases as Seq[Seq[T]] objects. Combining this with the ideas above for developing unit tests for Spark Streaming, we have developed a first ScalaCheck higher order property for Spark Streaming. The function DStreamProp.forAllAlways uses g1 to generate test cases corresponding to prefixes of an input DStream, then employs gt1 to define a derived DStream, and checks that for each of the test cases the corresponding assertions defined by assertions hold for all the batches.
def forAllAlways[E1:ClassTag,E2:ClassTag,P]
    (g1: Gen[Seq[Seq[E1]]])(
     gt1 : (DStream[E1]) => DStream[E2])(
     assertions: (RDD[E1], RDD[E2]) => P)(
     implicit pv: P => Prop, rv : P => Result, 
     pp1: Seq[Seq[E1]] => Pretty, 
     ssc : StreamingContext, 
     parallelism : Parallelism): Prop
We can use it to define properties like the following simple example property, which checks that a DStream transformation computing the count for each batch is correctly defined:
def countProp(testSubject : DStream[Double] => DStream[Long]) = 
      Gen.listOfN(10,  Gen.listOfN(30, arbitrary[Double])))(
      (inputBatch : RDD[Double], transBatch : RDD[Long]) => {
        transBatch.count === 1 and
        inputBatch.count === transBatch.first
      }).set(minTestsOk = 10).verbose  

This first proposal still leaves some open issues. First of all, currently DStreamProp.forAllAlways only supports a single generator and a single derived DStream: this should be overloaded to support more arities.
Also, we still have to deal with the problem of non-uniform batch execution time. Holden Karau's spark-testing-base Spark package solves this by implementing a hack to accelerate the completion of a batch when all the expected output records have already been generated. Another option would be parallelizing the execution of several test cases at the same time, which should lead to a more uniform computation time at each batch, as a slower batch would be compensated by a faster batch being executed at the same time. We have made some preliminary work on that line.
Besides, Spark launches weird exceptions when the batch interval is too small for the machine running the test, that is unable to keep up processing the batches. This is unavoidable to some extent, and can be easily solved by tuning the batch interval, but some care should be taken to ensure tests always fail in that situation.

Finally, testing the same assertions for all the batches is very rigid. New ScalaCheck higher order properties should be developed to allow for more flexible testing scenarios. This is something we are planning for the upcoming new release of sscheck in a couple of weeks.

Monday, July 6, 2015

Property-based testing with Spark

Property-based testing is a program testing paradigm made popular by John Hughes with the QuickCheck tool, and available for Scala as the library ScalaCheck. Classical unit testing with xUnit-like frameworks is usually based on specifying input - expected output pairs, and then comparing the expected output with the observed output that is obtained by applying the corresponding input to the test subject. On the other hand, in property based testing we specify a property that relates input and outputs, and then the property is checked againts a bunch of inputs that are randomly synthesized by the testing framework. This can be easily understood with a concrete property, like the following paradigmatic example in which we check that the reverse of the reverse of a list is the same as the original list.
class ListReverseTest extends Specification
                      with ScalaCheck {
  "Testing operations on lists".title ^
    "the reverse of the reverse of a list is the same list" 
      ! listRevRevProp
  def listRevRev[A](xs : List[A]) : List[A] = xs.reverse.reverse
  def listRevRevProp =
    Prop.forAll ("xs" |: Gen.listOf(arbitrary[Int])) { 
      xs : List[Int] =>
        xs === listRevRev(xs)
      }. set(minTestsOk = 200).verbose
ScalaCheck can be used independently, but in the code above we use the integration of ScalaCheck with Specs2 to make the property part of a Specs2 specification. We can execute this property with sbt as follows.

> test-only blogspot.ListReverseTest
[info] Testing operations on lists
[info] + the reverse of the reverse of list, is also a list
[info] OK, passed 200 tests.
[info] Total for specification Testing operations on lists
[info] Finished in 700 ms
[info] 1 example, 200 expectations, 0 failure, 0 error
[info] ScalaTest
[info] Run completed in 1 second, 541 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] No tests were executed.
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 2 s, completed Jul 6, 2015 7:57:55 PM
As seen in the output above, ScalaCheck generates 200 random list of arbitrary integers, and tests the property for each of those lists. The random generator tries to be clever and traverse a representative part of the search space corresponding to all possible integer lists, first covering basic corner cases like the empty list or the list of one element, and then proceeding with lists of increasing sizes. While in unit testing we would only consider a small number of explicitly stated input values, the idea is that with property-based testing we gain more confidence about the test because the test subject is exercised for a much higher number input values, and hopefully the search space is more thoroughly explored. In practice, unit testing and property-based testing can be easily combined, reserving unit testing for checking those carefully handcrafted input values that are required to ensure that some critical corner cases are covered.

Applying ScalaCheck for testing programs that manipulate Spark RDDs is quite easy. The main difficulty is ensuring that the Spark context is shared by all the test cases. If we are going to generate around 100 test cases per ScalaCheck property, creating a new Spark context per test case wouldn't be practical, because then test execution would take a lot of time. Besides, due to SPARK-2243 we cannot have more than a Spark context running on the same JVM, hence we have to ensure that the shared Spark context is properly closed after all the properties in a Specification have been exercised. Although the ScalaCheck API allows to register callbacks to be executed after each test case or property is evaluated, callback settings as well as other property execution settings are overridden when properties are run from sbt. For these reasons, using the integration with Specs2, and Specs2's BeforeAfterAll trait leads to an easier and more robust solution. You can take a look to SharedSparkContextBeforeAfterAll for a solution based on Specs2. This trait provides a method sc that can be used to parallelize lists generated with the built-in ScalaCheck generators. The Spark master or the parallelism level (default number of Spark partitions used to parallelize sequences) can also be customized by overriding the corresponding method. That Spark context is also available as an implicit value, that can be then used with the implicit conversions and generator provided by the object RDDGen, that are basically shortcuts to parallelize lists generated by built-in ScalaCheck generators:
object RDDGen { 
  /** Convert a ScalaCheck generator of Seq into a generator of RDD   
   * */
  implicit def seqGen2RDDGen[A](sg : Gen[Seq[A]])
              (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism) : Gen[RDD[A]] =, numSlices = parallelism.numSlices))
  /** Convert a sequence into a RDD    
   * */
  implicit def seq2RDD[A](seq : Seq[A])(implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism) : RDD[A] = 
    sc.parallelize(seq, numSlices=parallelism.numSlices)
  /** @returns a generator of RDD that generates its elements from g
   * */
  def of[A](g : => Gen[A])
           (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism) 
           : Gen[RDD[A]] = 
    // this way is much simpler that implementing this with a 
    // ScalaCheck Buildable, because that implies defining a 
    // wrapper to convert RDD into Traversable
  /** @returns a generator of RDD that generates its elements from g
  * */
  def ofN[A](n : Int, g : Gen[A])
     (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism)
     : Gen[RDD[A]] = {
    seqGen2RDDGen(Gen.listOfN(n, g))
   /** @returns a generator of RDD that generates its elements from g
  * */
  def ofNtoM[A](n : Int, m : Int, g : => Gen[A]) 
        (implicit aCt: ClassTag[A], sc : SparkContext, parallelism : Parallelism)
        : Gen[RDD[A]] = 
    seqGen2RDDGen(UtilsGen.containerOfNtoM[List, A](n, m, g))
We can use RDDGen to define properties like the following, that checks that the method DoubleRDDFunctions.mean works as expected:
def forallRDDGenOfNFreqMean = {
    val freqs = Map(1 -> 0, 4 -> 1)
    val rddSize = 200
    val gRDDFreq = RDDGen.ofN(rddSize, Gen.frequency(freqs.mapValues(Gen.const(_)).toSeq:_*))
    val expectedMean = {
      val freqS = freqs.toSeq
      val num = freqS .map({case (f, v) => v * f}). sum
      val den = freqS .map(_._1). sum
      num / den.toDouble
    Prop.forAll("rdd" |: gRDDFreq){ rdd : RDD[Int] =>
      rdd.mean must be ~(expectedMean +/- 0.1) 
  }. set(minTestsOk = 50).verbose 
This and other simple properties are included in the test class SharedSparkContextBeforeAfterAllTest, which as seen below, exercises all the tests in a reasonable time:
15/07/06 21:00:27 INFO DAGScheduler: Job 807 finished: mean at SharedSparkContextBeforeAfterAllTest.scala:125, took 0.004391 s
stopping test Spark context
15/07/06 21:00:27 INFO SparkUI: Stopped Spark web UI at
15/07/06 21:00:27 INFO DAGScheduler: Stopping DAGScheduler
15/07/06 21:00:27 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/06 21:00:27 INFO Utils: path = /private/var/folders/38/x3zvqgyn2s33ym6j6qnb0j3r0000gp/T/spark-32fbbd76-b84d-4f11-ad51-0b903db2fcd7/blockmgr-8556e289-4fc7-448f-b96a-8e1f689b3c0e, already present as root for deletion.
15/07/06 21:00:27 INFO MemoryStore: MemoryStore cleared
15/07/06 21:00:27 INFO BlockManager: BlockManager stopped
15/07/06 21:00:27 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/06 21:00:27 INFO SparkContext: Successfully stopped SparkContext
15/07/06 21:00:27 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/07/06 21:00:27 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
[info] Sharing a Spark Context between several ScalaCheck properties and test cases, and closing it properly
[info] + forall that ignores the Spark context
[info] OK, passed 101 tests.+ simple test that uses the Spark context explicitly+ forall that uses the Spark context explicitly, and parallelizes a Seq explicitly
[info] OK, passed 50 tests.+ forall that uses the Spark context from this, and parallelizes a Seq explicitly
[info] OK, passed 50 tests.+ forall that parallelizes a Seq with an implicit
[info] OK, passed 50 tests.+ forall with implicit conversion of Seq generator to RDD generator
[info] OK, passed 50 tests.+ forall that uses RDDGen.of
[info] OK, passed 10 tests.+ forall that uses RDDGen.of with local overload of parallelism
[info] OK, passed 10 tests.+ forall that uses RDDGen.ofNtoM
[info] OK, passed 101 tests.+ forall that uses RDDGen.ofN, testing frequency generator
[info] OK, passed 50 tests.
[info] Total for specification Sharing a Spark Context between several ScalaCheck properties and test cases, and closing it properly
[info] Finished in 31 seconds, 348 ms
[info] 12 examples, 475 expectations, 0 failure, 0 error
[info] ScalaTest
[info] Run completed in 36 seconds, 326 milliseconds.
[info] Total number of tests run: 0
[info] Suites: completed 0, aborted 0
[info] Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
[info] No tests were executed.
[info] Passed: Total 10, Failed 0, Errors 0, Passed 10
[success] Total time: 37 s, completed Jul 6, 2015 9:00:27 PM
Another alternative solution based on the integration of ScalaCheck with ScalaTest could be possible, but that integration doesn't cover all the possible ScalaCheck properties (for example there is no support for ScalaCheck's Prop.exists), while on the other hand Specs2's ScalaCheckProp is able to wrap arbitrary ScalaCheck Prop values.

Finally, using parallelExecution := false in build.sbt is currently required, because otherwise sbt could run several test suites in different threads, thus having more than one Spark context running at the same time, kicking SPARK-2243. The use of forking in sbt maybe could be an alternative that would that I also plan to investigate in the future, as it would imply creating several JVMs to achieve parallel test execution.

Wednesday, August 6, 2014

Simple SVG charts with HBase REST service, Flask and Pygal

A little less conversation and a little more action in this post. I wanted to have a flexible way to define simple charts for small HBase tables. Maybe using HBase for small data might sound crazy, but by doing so we can take advantage of its flexible NoSQL schema. So let’s exploit HBase's REST service (see also here) for this, first of all we have to launch the service with the following command:
[cloudera@localhost ~]$ hbase rest start -ro -p 9998
14/08/06 14:49:33 INFO util.VersionInfo: HBase 0.94.6-cdh4.4.0
That starts the HBase REST server in read only mode, and serving at port 9998. This service is started by default in some distributions like HDP2. Once the service is started, the idea is defining a mapping from service responses to some charts. For that we might use matplotlib combined with Flask like this:
def plotOne():
    fig = Figure()
    axis = fig.add_subplot(1, 1, 1), values)

    canvas = FigureCanvas(fig)
    output = StringIO.StringIO()
    response = make_response(output.getvalue())
    response.mimetype = 'image/png'
    return response
But I wanted something simpler, and I found Pygal: it offers nice SVG charts with a high level interface, some fancy animations, and Flask integration. The first chart is easy as pie(chart):
values = [2, 1, 0, 2, 5, 7]
def graph_something():
     bar_chart = pygal.Bar(style=DarkSolarizedStyle)
     bar_chart.add('Values', values)
     return bar_chart.render_response()

Now with some creative URL routing in Flask we can define moderately complex graphs just in the URL, thanks to the suffix globbing of HBase's REST service, and by using a simple html table in the Jinja2 template. Autorefresh is obtained simply with a <meta http-equiv="refresh" content="{{refresh_rate}}"> element in the template. So we get

for the URL http://localhost:9999/hbase/charts/localhost:9998/test_hbase_py_client/width/1500/cols/2/refresh/500/bar/Sites%20Visited/visits/bar/Info/info/keys/* , assuming a table created in hbase shell as
create 'test_hbase_py_client', 'info', 'visits'
put '${TABLE_NAME}', 'john', 'info:age', 42
put '${TABLE_NAME}', 'mary', 'info:age', 26
put '${TABLE_NAME}', 'john', '', 5
put '${TABLE_NAME}', 'john', '', 2
put '${TABLE_NAME}', 'mary', '', 4
put '${TABLE_NAME}', 'mary', '', 2
scan '${TABLE_NAME}'

The main idea for the mapping into a barchart is that each HBase row corresponds to a group of bars (a color in the chart), and that given a column family the quals for that column are the values in the x-axis, while the cell values correspond to the values for the y-axis. If several rows are specified then all the bars groups are displayed together with a different color per row key. 
Besides, to allow several charts the number of columns is specified followed by a sequence of chart specifications, which are triples (chart type, chart title, column family). Hence the URL http://localhost:9999/hbase/charts/localhost:9998/test_hbase_py_client/width/1500/cols/2/refresh/500/bar/Sites%20Visited/visits/bar/Info/info/keys/* means "read from the table test_hbase_py_client at the server localhost:9998; the chart table will be 1500 pixels width; use two columns and refresh the whole chart each 500 seconds; the first chart is a bar chart with tittle 'Sites Visited' and takes the values from the column family 'visit', the second chart is a bar chart titled 'Info' that reads from the column family 'info'; use all the keys found in the table". This URL mapping was implemented by combining Flask standard routing primitives with a custom URL converter (extending werkzeug.routing.BaseConverter)

For a more elaborate example, take a look at this simple Spark Streaming program (so simple it would be called script if it was written in Python ...), that populates an HBase table with a sliding window of one minute containing the mention count in Twitter for some musicians. 

As usual, you can find all the code for the post in my github repo, where you can see that the chart service is a single Python script. Now all that is left is extending the Python service to cover all the different types of Pygal charts, and calling a web designer so the chart page stops looking like a web page from the dotcom era.

We are hiring!

If you have enjoyed this post, you are interested in Big Data technologies, and you have a solid experience as a Java developer, take a look to this open position at my company.

Saturday, May 31, 2014

OLAP with Apache Phoenix and HBase

Some weeks ago a message was posted in the Apache Phoenix users mailing list requesting for examples of business intelligence visualization products that could be used with Phoenix. Apache Phoenix provides an SQL layer on top on HBase, as a JDBC client and a set of HBase coprocessors that enable the efficient execution of SQL queries by exploiting the parallelism offered by that mechanism. I suggested Saiku and an example configuration to connect Saiku to Phoenix,and I ended up being invited to write a guest post in Apache Phoenix's blog. Here you can find the result, a tutorial on using Phoenix and Saiku for implementing an OLAP system over HBase. Take a look if you're interested in using OLAP for performing flexible analysis and visualisation, over an horizontally scalable database like HBase.

try to ride on waves of activity in every direction
you're the center and you're always free in every direction

Sunday, April 20, 2014

Using Storm's Multilang Protocol to download popular tweets with Python

Apache Storm is a very popular open source stream processing framework, that allows us to construct real-time fault-tolerant distributed data processing systems very easily. The idea is that a stream is an infinite sequence of tuples, which are dictionaries from strings to values, and that streams of tuples are processed and generated by Storm topologies. A storm topology is a directed graph where each node (called topology component, to avoid confusion with the nodes of the Storm cluster I guess) is either a Spout or a Bolt. A Spout is a node that generates tuples from thin air (for example by connecting to a external service, reading from a log or database, connecting to scalable message bus like Apache Kafka ...), so spouts are used as the starting points for topologies. The rest of the topology components are Bolts, which accept tuples from spouts or other bolts, and generate more tuples from other bolts, or maybe just connect to some external system (for example a database or Apace Kafka) to act as a sink. The concepts section of the Storm wiki gives a much better introduction to the subject that anything I could say.
Stream processing systems like Storm are an important part of the Lambda Arquitecture for Big Data, and can be used for example to compute aproximation of metrics for the data that has been recently introduced in the system, and therefore couldn't have been processed yet by other batch processes, tipically in the form of MapReduce jobs. So it's pretty clear that Storm is a relevant system to be part of our data processing tool belt.

An interesting feature of Storm is that it is designed from the ground up to be able to use different programming languages in the topologies. As early as in the Storm tutorial we see that bolts can be defined in any language, that those bolts will be executed as subprocesses of the corresponding Storm supervisor daemon, and that comunication between the supervisor and subprocesses is based on JSON messages over stdin. That communication convention is formalized in the Multilang Protocol of Storm, which is very simple:
  • Messages are strings encoding JSON objects, and the end of any message is signed with a single line containing "end", which is not part of the preceding JSON.
  • There is an initial handsharke phase in which the parent supervisor process sends the JSON serializations of a Storm configuration object and a Storm TopologyContext object to the stdin of the child process. This somehow mimicks the call to IBolt.prepare() or that otherwise would be executed in Java. Besides, the parent process specifies a directory in the local file system were the children must create an empty file named after its PID. This gives the supervisor the information it needs to kill the process later if needed. The handshake ends when the child process sends a JSON object with its PID to the parent, through its stdout.
  • The rest of the communication is performed by sending JSON through stdin/stdout. For example the parent process will send the message
    {"command": "next"}
to a child spout to ask for the next tuple. So in order to implement a spout or bolt in any programming language we just have to write a program implementing that protocol from the perspective of the child process. There is an example bolt in Python in the Storm tutorial above, which is based on the Python storm module distributed with Storm, were the boilerplate of the protocol is implemented so we only have to extend a simple class to implement a bolt. The topology is still written using the Java API, and in fact the Python implementation of the bolt is invoked by a wrapper Java class that extends backtype.storm.task.ShellBolt and implements backtype.storm.topology.IRichBolt. So many layers! It's always difficult to escape from the JVM when you're working on Big Data.

underneath it all, we feel so small
the heavens fall, but still we crawl

Even when that example Python bolt is available, I've had difficulties locating an example spout in Python based on the corresponding backtype.storm.spout.ShellSpout. So I decided to stop searching and start developing my own example instead. An this is what this post is about.

The example I developed is a simple topology that obtains popular tweets from Twitter and stores them in MySQL for further analysis. As this is just a simple tutorial I will just consider the trending topics for a fixed set of locations, in this case 4 Spanish cities: Madrid, Barcelona, Donostia and Valencia. The starting spout, and several bolts, are written in Python using the storm module above, if you are interested you can find the full source at my github repo. Originally I wanted to implement the following topology:
  1. A TrensSpout obtains the trending topics for each location calling the Twitter API, and emits a tuple (place, topic name, topic query) for each trending topic, where the topic query is a query string to be used to ask twitter for tweets for that topic.
  2. Those tuples are received by a GetTweetsBolts, that performs the query to Twitter, makes some projections of the result for the interesting meta-information, and emits a tuple for each tweet with the tweet text and metainformation.
  3. Finally a DBStoreBolt persists the tweets in MySQL.
Unfortunately, for what it seems to be a bug in Storm's multilang protocol implementation, the task id is not sent to the spouts during the initial handshake. As a consequence the initial bolt cannot be parallelized as the locations Madrid/Barcelona/Donostia/Valencia  cannot be assigned to each spout instance based on the task id, as it is not available. My workaround consist in assuming that the initial spout won't be parallel, so TrensSpout is replaced by PlacesSpout, which just emits a tuple per each location with a fixed frequency. Then the next bolt TrendsBolt can be executed in parallel: it takes a location and then fetches the trending topics for that location. The topology definition in Java looks like this:
  // This spout has no parallelism 
builder.setSpout("PlacesSpout", new PlacesSpout(), 1);
builder.setBolt("TrendsBolt", new TrendsBolt(), 4).shuffleGrouping("PlacesSpout");
builder.setBolt("GetTweetsBolt", new GetTweetsBolt(), 4*2).shuffleGrouping("TrendsBolt"); 
builder.setBolt("DBStoreBolt", new DBStoreBolt(), 4*2).shuffleGrouping("GetTweetsBolt");
Note that shuffleGrouping is always used to connect the topology components, as there is no local state to be concerned about. Parallelism hints are kind of random, but it is clear that the number of tuples is multiplicated down the topology (per 10 for trending topics and per 15 for tweets).
In general the code is pretty easy. For example this is the Python sample spout I was looking for, composed by a wrapper Java class PlacesSpout
public PlacesSpout(int freq) {
  super("python2.7", "python/twitter_storm/");
  this.tickFrequency = freq;

public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields(TopologyFields.PLACE));

public Map getComponentConfiguration() {
  Config conf = new Config();
  conf.put(FREQUENCY_CONF_KEY, this.tickFrequency);
  return conf;
and the corresponding Python class PlacesSpout which does all the job.
class PlacesSpout(storm.Spout):
    _frequency_conf_key = "PlacesSpoutFrequency"
    def initialize(self, conf, context):
        self._places = get_tweets.available_places()
        self._tick_frequency = conf[self.__class__._frequency_conf_key]
    def nextTuple(self):
        for place in self._places:
Here we can also see how easy it is to pass configuration parameters to Python from the Java wrapper. Other things to take into account:
  • Note the use of a relative path to the python script in the constructor of the PlacesSpout Java class. Storm expects a resources folder to be placed at the root of the resulting jar, for using it as the parent for that relative path. Don't forget to configure your pom.xml for that.
  • Also in that constructor, we see the command to invoke Python and the name of the Python module to use as main. I wrote a dummy Python module for each spout and bolt, that just creates the corresponding instance and calls the run() method for it, to start waiting for the handshake message from the parent process:
  • from twitter_components import PlacesSpout
    if __name__ == '__main__':
        # Start this spout
    But now that I look at it, maybe a single module and an argument to switch the class would have been better ...
  • Note how tuples are emitted by calling the function storm.emit(), instead of some method inherited from the storm.Spout class, or for some object passed during the construction of that class. The storm Python module does not follow the design of the Java Storm API, which can be confusing at first, especially combined with the lack of comments or documentation. I have added some comments to the Python storm module and generated the corresponding documentation with Sphinx, you can find it here (with a terrible CSS configuration, I'm afraid, I would rather download it as it works locally, I haven't learnt Github Pages yet). Anyway the point is using the functions emit(), emitDirect() and reportError() to emulate the corresponding methods for SpoutOutputCollector, and the functions emit(), emitDirect(), ack(), fail() and reportError() to emulate the corresponding methods for OutputCollector.
Finally, for those interested, here is a thoughtful analysis of 3 minutes of tweets from yesterday:
mysql> select place, topic_name, sum(favorite_count) as sum_favs from storm_tweets group by place, topic_name order by place, sum_favs desc ;
| place     | topic_name                          | sum_favs |
| Barcelona | You&I                               |   939324 |
| Barcelona | Spain                               |     4207 |
| Barcelona | Tony                                |     3971 |
| Barcelona | Totalmente                          |      738 |
| Barcelona | S�bado Santo                        |       26 |
| Donostia  | Starbucks                           |    17217 |
| Donostia  | #GraciasGabo                        |     9318 |
| Donostia  | #GabrielGarciaMarquez               |     6645 |
| Donostia  | Madrid                              |     6380 |
| Donostia  | Viernes Santo                       |     2605 |
| Donostia  | Oriana                              |     1469 |
| Donostia  | #SemanaSanta                        |     1384 |
| Donostia  | #ElChiringuitoDeNitro               |      349 |
| Donostia  | Noja                                |       42 |
| Donostia  | #gabon                              |        0 |
| Madrid    | Toronto                             |    35688 |
| Madrid    | #ThanksOneDirection                 |     4820 |
| Madrid    | #100RazonesPorLasQueOdiarElSigloXXI |     1223 |
| Madrid    | S�bado Santo                        |      812 |
| Madrid    | #VRLTyfaKAZANDIRTIYOR               |      755 |
| Madrid    | Valanciunas                         |      587 |
| Madrid    | Castilla                            |      425 |
| Madrid    | Rui Faria                           |      189 |
| Valencia  | Semana Santa                        |     4386 |
| Valencia  | Sunderland                          |     3289 |
| Valencia  | Sevilla                             |     2049 |
| Valencia  | Valencia                            |     1907 |
| Valencia  | #osasunaVCF                         |      385 |
| Valencia  | #cuernoriana                        |      301 |
| Valencia  | Guaita                              |      110 |
| Valencia  | #GabrielGarciaMarquez               |       45 |
| Valencia  | Oriana                              |       39 |
32 rows in set (0.00 sec)

There is another Python library for Storm called Petrel, that is more ambitious that the storm Python module as it allows "writing, submitting, debugging, and monitoring Storm topologies in pure Python". Although it still uses some Java classes to launch the topology in Storm, it is quite close to "avoid the JVM completely" when writing topologies in Python. I will give it a try for sure, but I had to learn to walk before I could run in Storm.

See you!

Sunday, March 2, 2014

Talking to HBase from Python with JPype

Long time no see. I really love Python and also the Hadoop ecosystem, but there is this problem that Hadoop is all Java based, so sometimes is not so easy to use Hadoop with Python. There are some approaches to interoperatibility between Python and Java, being the Jython interpreter one of the most remarkable, and also what it's shipped with Apache Pig by default. Nevertheless, Jython is always lagging behind Python (I think it only supports Python 2.5), and I've also have found some problems when importing external libraries, even pure Python libraries, at least in the standalone version shipped with Pig. You lose access to all the cool C-based libraries available in the reference CPython implementation as well.
So I was very happy to see that CPython is now supported for UDFs in the new Pig 0.12.0. This opens a whole world of possibilities, and in particular I was thinking it would be very nice to use HBase from a CPython UDF in a Pig job, following the "HBase as a shared resource" pattern for MapReduce and HBase interactions. With this and other possible applications in mind (e.g. calling HBase from a Python Storm bolt), I decided to do some research about accesing HBase from CPython. Finally, I came with the idea of using JPype as the interface between Python and the Java client classes for HBase.

The approach in JPype is different to Jython in that, instead of implementing Python in Java, the idea is instantiating a JVM for calling Java from Python. Hence, to get an HBase driver for CPython I'd only have to call the Java driver from JPype, implementing a light wrapper for ease of use. For now I'm just in the proof of concept phase, but at least I've been able to make a simple connection to HBase from CPython. So let's go for it!

First we have to install JPype, which is available at pip and anyway is very easy to install by hand. Then we can import the jpype module from our Python code, and access to the HBase Java driver classes through the jpype.JClass Python class. For this little experiment (all the code is available at github) I first created a simple HBase table with this simple bash script



hbase shell <<END
create '${TABLE_NAME}', 'info', 'visits'
put '${TABLE_NAME}', 'john', 'info:age', 42
put '${TABLE_NAME}', 'mary', 'info:age', 26
put '${TABLE_NAME}', 'john', '', 5
put '${TABLE_NAME}', 'john', '', 2
put '${TABLE_NAME}', 'mary', '', 4
put '${TABLE_NAME}', 'mary', '', 2
scan '${TABLE_NAME}'

The goal now is writing a CPython program to scan that table. JPype is a very simple library, you only have to start a JVM through a call to jpype.startJVM, and then you can easy access to Java objects through simple calls like the following

HTablePoolClass = jpype.JClass("org.apache.hadoop.hbase.client.HTablePool")
connection_pool = HTablePoolClass()

Here we access to the Java class HTablePool and store it in a variable, so we can instantiate it in Python by using the usual Python notation for object creation and calling the constructors as defined in Java. JPype is smart enough to perform most of the necesary type conversions between Python and Java automatically, and also choosing the right version of overloaded methods. On the other hand, sadly JPype is not the most active project in the world, and sometimes strange exceptions may arise. In particular when you instantiate a class A that depends on a class B, which is not available in the classpath, JPype raises an exception saying that A is not found, when the problem is that B is not available. To solve this, I just added to the classpath all the jars related to Hadoop or HBase on the creation of the JVM:

_jvm_lib_path = "/usr/java/jdk1.6.0_32/jre/lib/amd64/server/"
cp_dirs = '/usr/lib/hadoop/client-0.20:/usr/lib/hadoop/lib:/usr/lib/hadoop:/usr/lib/hadoop/client:/usr/lib/hbase/lib/:/usr/lib/hbase/'
cp_jars_str = ":".join(set(jar for cp_dir in cp_dirs.split(':') for jar in glob.iglob(cp_dir + "/*.jar")))

jpype.startJVM(_jvm_lib_path, "-ea","-Djava.class.path=" + cp_jars_str)

After that everything worked fine for me with JPype, as you can see in the rest of the program below, in which I just create a connection to HBase, open a table, and perform a full scan. The only remarkable detail is the use of the function iterate_iterable() to traverse Java Iterable objects as Python generators.

def iterate_iterable(iterable):       
    iterator = iterable.iterator()
    while iterator.hasNext():

test_table_name = 'test_hbase_py_client'

    HTablePoolClass = jpype.JClass("org.apache.hadoop.hbase.client.HTablePool")
    connection_pool = HTablePoolClass()
    test_table = connection_pool.getTable(test_table_name)
    BytesClass = jpype.JClass("org.apache.hadoop.hbase.util.Bytes")
    ScanClass = jpype.JClass("org.apache.hadoop.hbase.client.Scan")
    scan_all = ScanClass()
        # class ResultScanner
    result_scanner = test_table.getScanner(scan_all)
    # for result in result_scanner: TypeError: 'org.apache.hadoop.hbase.client.ClientScanner' object is not iterable
    print '\n'*2, '-'*30
    print 'Scanning table "{table_name}"'.format(table_name=test_table_name)
    for result in iterate_iterable(result_scanner):
        print "row id:", result.getRow()
        for key_val in iterate_iterable(result.list()):
            print "\t", "family : {family}, qual : {qual}, value : {value}".format(family = key_val.getFamily(), qual = key_val.getQualifier(), value = BytesClass.toString(key_val.getValue()).encode('ascii', 'ignore'))
    print '-'*30, '\n'*2
jpype.JavaException as ex:
    print 'exception', ex.javaClass(), ex.message()
    print 'stacktrace:', ex.stacktrace()

I have only tested it in my Cloudera Quickstart CDH4.4.0, so please tell me if you have any problem.
There are other CPython clients for HBase like pyhbase and hbase-thrift. Regarding pyhbase, it looks like an abandoned project, and it doesn't work with CDH4, at least in the tests I performed. On the other hand I haven't tested hbase-thrift, but I don't like the idea of having the thrift gatewat as a bottle neck for connections to the HBase cluster. Anyway I think the technique of wrapping a Java driver with JPype is interesting because it can be applied to other databases, and it would be easy to keep the driver up to date by updating the underlying jars when needed.

I hope you enjoyed the post!

Saturday, November 16, 2013

Let's start with the basics

Hi, this is the first post. What better way to start a blog on Big Data and stuff, than writing about MapReduce, the celebrated programming model for parallel processing proposed by Google in 2004?

What better place than here, what better time than now?

Lately I've been studying how to write MapReduce programs. MapReduce is an algorithmic pattern for writing distributed divide-and-conquer programs. One of the main problems in parallelizing programs is how to split the input data amongst the processing nodes: if the pieces are too big, then the individual nodes won't be able to process the parts of the data that are assigned to them; but if the pieces are too small, then most of the computing and network resources will be spent on communication and coordination, instead of processing the data. This is one of the reasons why the automatic parallelization of programs, as it is performed in systems like e.g. NESL or Data Parallel Haskell, is very difficult.
For me MapReduce is a way to overcome this problem by giving the programmer explicit control of the way the input data is splitted ... or you can also see that as leaving the problem for the programmer. Anyway, in MapReduce this is achieved very elegantly by reducing any problem to the task of defining two functions: a map function and a reduce function:
  • The purpose of the map function is putting together the parts of the input that need to be processed together, i.e., the groups that are used for a distributed divide-and-conquer. This way the programmer specifies the granullarity of the split of the data I mentioned above.
  • Then the reduce functions specifies how to process each of the groups defined by map. These groups are processed independently in differerent worker nodes.
As you can see, I said functions, not methods, because MapReduce has a Functional Programming flavor in the sense that inter-process communication based on a shared state is avoided in favor of expressing the state explicitly in the arguments of the program functions, which then get closer to mathematical functions.
This model is made concrete by constraining the signature of these functions as follows:
  • The function map takes a key and a value corresponding to a piece of the input data, and generates a list of intermediate key-value pairs. The map function is executed in parallel and independently in several worker nodes, and then all the pairs generated in each of the nodes are shuffled by the MapReduce engine so values corresponding to the same intermediate key are put together.
  • Now we have a mapping from intermediate keys to list of intermediate values. The reduce function takes an intermediate key and the corresponding list of values and generates a list of output key-value pairs.
So we already have parallel computing in the execution of map, but the point I was trying to express is that map deals with local data, while in reduce we have global information in the sense that we are sure that each relevant element of the input that is relevant for computing the part of output for that key is available. Anyway that is a very rough explanation of MapReduce, and the implementations of MapReduce like e.g. Hadoop or Apache Spark provide many other fundamental features like code and data distribution, slave coordination, etc, that are needed in practice.

So let's finish setting these ideas with the paradigmatic HelloWorld program for MapReduce: the word count problem. In this problem we have several texts and we want to count the number of times each word appears in each of the texts. As I'm a very pythonic person, I will represent an example input with the following Python list of pairs:

[(None, "hola que tal hola"), (None, "ey hola"), (None, "como estamos")]

What about the map function? We want to count words, so we have to put together all the ocurrences of each word. That sounds easy, like the following Python code for map, that defines a Python generator of key-value pairs, where each key is a word that appears in an input text, and the value is always 1, as a declaration that the word exists:

def map_f(_title, text):
    for word in text.split(' '):
        yield((word, 1))

Now we have all the ocurrences of each together, all we have to do is count the 1s for each word, and we just got our word counter program:

def reduce_f(word, counts):
    yield((word, sum(counts)))

Maybe that was a little too much for a very short introduction to MapReduce, but give me a break, this is my first blog post. And here it comes my real motivation for writing this post. While I was reading about MapReduce, it ocurred to me that writing Yet Another MapReduce Emulator would be i) a good way to interiorize the mechanics of MapReduce and the way the map and reduce functions are invoked; and ii) it could be a nice tool for the first stages of the development of MapReduce programs.
As I said I'm a very pythonic person, so I developed the emulator in Python, also because there are nice debuggers like PyDev that could increase the usefulness of the emulator. Besides Python has several Functional Programming tricks in its bag that I could use both for the emulator and for the MapReduce programs.

This post is already too long, so I won't explain the details of the emulator, but it's pretty simple and you can take a look at it at my GitHub repo. Here is the output of an execution of the implementation of word count above in my emulator.

Executing word_count for input: [(None, 'hola que tal hola'), (None, 'ey hola'), (None, 'como estamos')]

        #mappers: 3 | contents: [[(None, 'ey hola')], [(None, 'hola que tal hola')], [(None, 'como estamos')]]
        [{'ey': [1], 'hola': [1]}, {'tal': [1], 'que': [1], 'hola': [1, 1]}, {'estamos': [1], 'como': [1]}]
        [[('ey', 1), ('hola', 1)], [('tal', 1), ('que', 1), ('hola', 2)], [('estamos', 1), ('como', 1)]]
        {'que': [1], 'como': [1], 'tal': [1], 'ey': [1], 'estamos': [1], 'hola': [1, 2]}

[('que', 1), ('como', 1), ('tal', 1), ('ey', 1), ('estamos', 1), ('hola', 3)]

The input list is randomly splitted into several lists in mappers_inputs to simulate the distribution of the input into several mapper nodes. The list of dictionaries combiners_inputs correspond to the local execution of the combiners at the mapper nodes. The combiner is a secret ingredient for scaling MapReduce I did not mention above, but the basic idea is extending this diagram for MapReduce by performing a kind of local reduce in the mapper nodes before sending the output pair of map to the MapReduce engine for shuffle. The combine function is another function with the same signature as reduce but that only works with the pairs local to the worker node where it is invoked. In some situations the use of a combiner dramatically reduces the network usage during the shuffle phase as the number of pairs emited to the network is increased a lot.
In the example above the combiner works between combiners_inputs and combiners_outputs: you can see how in the second mapper the combiner collapses the two occurrences of the word 'hola' into a single pair ('hola', 2). Without the combiner two pairs ('hola', 1) would have been emitted instead. This doesn't sound very impressive, uh? Now imagine this with TBs of documents, and the repetition frequency of words in real texts, it makes more sense now?

I hope you enjoy (at least some part of) this post, see you!