Sunday, April 20, 2014

Using Storm's Multilang Protocol to download popular tweets with Python

Apache Storm is a very popular open source stream processing framework that allows us to construct real-time, fault-tolerant, distributed data processing systems very easily. The idea is that a stream is an infinite sequence of tuples, which are dictionaries from strings to values, and that streams of tuples are processed and generated by Storm topologies. A Storm topology is a directed graph where each node (called a topology component, to avoid confusion with the nodes of the Storm cluster, I guess) is either a Spout or a Bolt. A Spout is a node that generates tuples from thin air (for example by connecting to an external service, reading from a log or database, connecting to a scalable message bus like Apache Kafka ...), so spouts are used as the starting points for topologies. The rest of the topology components are Bolts, which accept tuples from spouts or other bolts, and generate more tuples for other bolts, or maybe just connect to some external system (for example a database or Apache Kafka) to act as a sink. The concepts section of the Storm wiki gives a much better introduction to the subject than anything I could say.
Stream processing systems like Storm are an important part of the Lambda Architecture for Big Data. They can be used, for example, to compute approximations of metrics for data that has only recently entered the system, and therefore couldn't have been processed yet by the batch processes, typically MapReduce jobs. So it's pretty clear that Storm is a relevant system to have in our data processing tool belt.
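To make the spout/bolt vocabulary concrete, here is a toy sketch in plain Python. It does not use Storm at all: generators stand in for streams, everything runs in a single process, and the function names and the two example cities are just assumptions for illustration.

```python
import itertools


def places_spout():
    """Spout: generates tuples 'from thin air' (here, an endless cycle of cities)."""
    for place in itertools.cycle(["Madrid", "Barcelona"]):
        yield {"place": place}


def upper_bolt(stream):
    """Bolt: consumes tuples and emits new tuples for downstream components."""
    for tup in stream:
        yield {"place": tup["place"], "shout": tup["place"].upper()}


def sink_bolt(stream, limit):
    """Bolt acting as a sink: collects tuples instead of emitting more."""
    return list(itertools.islice(stream, limit))


# Wire the "topology": spout -> bolt -> sink. The stream is infinite,
# so the sink only takes the first three tuples.
results = sink_bolt(upper_bolt(places_spout()), limit=3)
```

What Storm adds on top of this picture is the distribution of the components across a cluster, the parallelism and the fault tolerance.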

An interesting feature of Storm is that it is designed from the ground up to be able to use different programming languages in the topologies. As early as in the Storm tutorial we see that bolts can be defined in any language, that those bolts will be executed as subprocesses of the corresponding Storm supervisor daemon, and that communication between the supervisor and the subprocesses is based on JSON messages over stdin/stdout. That communication convention is formalized in the Multilang Protocol of Storm, which is very simple:
  • Messages are strings encoding JSON objects, and the end of each message is marked with a single line containing "end", which is not part of the preceding JSON.
  • There is an initial handshake phase in which the parent supervisor process sends the JSON serializations of a Storm configuration object and a Storm TopologyContext object to the stdin of the child process. This somewhat mimics the call to IBolt.prepare() or ISpout.open() that would otherwise be executed in Java. In addition, the parent process specifies a directory in the local file system where the child must create an empty file named after its PID. This gives the supervisor the information it needs to kill the process later if needed. The handshake ends when the child process sends a JSON object with its PID to the parent, through its stdout.
  • The rest of the communication is performed by sending JSON through stdin/stdout. For example the parent process will send the message
    {"command": "next"}
to a child spout to ask for the next tuple. So in order to implement a spout or bolt in any programming language we just have to write a program implementing that protocol from the perspective of the child process. There is an example bolt in Python in the Storm tutorial above, which is based on the Python storm module distributed with Storm, where the boilerplate of the protocol is implemented so we only have to extend a simple class to implement a bolt. The topology is still written using the Java API, and in fact the Python implementation of the bolt is invoked by a wrapper Java class that extends backtype.storm.task.ShellBolt and implements backtype.storm.topology.IRichBolt. So many layers! It's always difficult to escape from the JVM when you're working on Big Data.
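The child side of the protocol described above can be sketched in a few lines of Python. This is only a minimal illustration, not the code of the storm module: the function names are mine, the field names "conf", "context" and "pidDir" follow my reading of the protocol documentation and should be treated as assumptions, and the parent process is simulated with in-memory streams and a made-up configuration value.

```python
import io
import json
import os
import tempfile


def read_message(stream):
    """Read lines up to the 'end' marker and decode them as one JSON object."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before the 'end' marker")
        line = line.rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))


def send_message(stream, message):
    """Write a JSON object followed by a line containing only 'end'."""
    stream.write(json.dumps(message) + "\nend\n")
    stream.flush()


def handshake(stdin, stdout):
    """Child side of the handshake: read setup, leave a PID file, report the PID."""
    setup = read_message(stdin)
    pid = os.getpid()
    # Empty file named after our PID, in the directory chosen by the parent
    open(os.path.join(setup["pidDir"], str(pid)), "w").close()
    send_message(stdout, {"pid": pid})
    return setup["conf"], setup["context"]


# Simulated parent: a setup message followed by a request for the next tuple.
# The configuration key and value are hypothetical.
pid_dir = tempfile.mkdtemp()
setup = {"conf": {"PlacesSpoutFrequency": 5}, "context": {}, "pidDir": pid_dir}
fake_stdin = io.StringIO(json.dumps(setup) + "\nend\n" + '{"command": "next"}\nend\n')
fake_stdout = io.StringIO()

conf, context = handshake(fake_stdin, fake_stdout)
command = read_message(fake_stdin)  # the {"command": "next"} message
```

In a real component the two streams would be sys.stdin and sys.stdout, and the program would keep reading commands in a loop after the handshake.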

underneath it all, we feel so small
the heavens fall, but still we crawl

Even though that example Python bolt is available, I had difficulties locating an example spout in Python based on the corresponding backtype.storm.spout.ShellSpout. So I decided to stop searching and start developing my own example instead. And this is what this post is about.

The example I developed is a simple topology that obtains popular tweets from Twitter and stores them in MySQL for further analysis. As this is just a simple tutorial I will only consider the trending topics for a fixed set of locations, in this case 4 Spanish cities: Madrid, Barcelona, Donostia and Valencia. The starting spout and several bolts are written in Python using the storm module above. If you are interested you can find the full source at my GitHub repo. Originally I wanted to implement the following topology:
  1. A TrensSpout obtains the trending topics for each location by calling the Twitter API, and emits a tuple (place, topic name, topic query) for each trending topic, where the topic query is a query string to be used to ask Twitter for tweets about that topic.
  2. Those tuples are received by a GetTweetsBolt, which performs the query to Twitter, projects the result onto the interesting meta-information, and emits a tuple for each tweet with the tweet text and its meta-information.
  3. Finally a DBStoreBolt persists the tweets in MySQL.
Unfortunately, because of what seems to be a bug in Storm's multilang protocol implementation, the task id is not sent to the spouts during the initial handshake. As a consequence the initial spout cannot be parallelized, because the locations Madrid/Barcelona/Donostia/Valencia cannot be assigned to each spout instance based on the task id, as it is not available. My workaround consists in assuming that the initial spout won't be parallel, so TrensSpout is replaced by PlacesSpout, which just emits a tuple for each location with a fixed frequency. Then the next bolt, TrendsBolt, can be executed in parallel: it takes a location and then fetches the trending topics for that location. The topology definition in Java looks like this:
  // This spout has no parallelism 
builder.setSpout("PlacesSpout", new PlacesSpout(), 1);
builder.setBolt("TrendsBolt", new TrendsBolt(), 4).shuffleGrouping("PlacesSpout");
builder.setBolt("GetTweetsBolt", new GetTweetsBolt(), 4*2).shuffleGrouping("TrendsBolt"); 
builder.setBolt("DBStoreBolt", new DBStoreBolt(), 4*2).shuffleGrouping("GetTweetsBolt");
Note that shuffleGrouping is always used to connect the topology components, as there is no local state to be concerned about. The parallelism hints are kind of arbitrary, but it is clear that the number of tuples is multiplied down the topology (by 10 for trending topics and by 15 for tweets).
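As a quick back-of-the-envelope check of that multiplication, here is the arithmetic for a single round of the spout. It assumes that every place returns exactly 10 trending topics and every topic exactly 15 tweets, which in practice are only upper bounds.

```python
places = 4               # Madrid, Barcelona, Donostia, Valencia
topics_per_place = 10    # fan-out of TrendsBolt quoted above
tweets_per_topic = 15    # fan-out of GetTweetsBolt quoted above

# Tuples emitted by each stage for one round of PlacesSpout
tuples_from_places_spout = places
tuples_from_trends_bolt = tuples_from_places_spout * topics_per_place
tuples_from_get_tweets_bolt = tuples_from_trends_bolt * tweets_per_topic
```

So under those assumptions one round turns 4 tuples into 40 and then into 600, which is why the downstream bolts get a parallelism hint at least as large as the upstream ones.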
In general the code is pretty easy. For example this is the Python sample spout I was looking for, composed of a wrapper Java class PlacesSpout
public PlacesSpout(int freq) {
  super("python2.7", "python/twitter_storm/places_spout.py");
  this.tickFrequency = freq;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields(TopologyFields.PLACE));
}

@Override
public Map getComponentConfiguration() {
  Config conf = new Config();
  conf.put(FREQUENCY_CONF_KEY, this.tickFrequency);
  return conf;
}
and the corresponding Python class PlacesSpout, which does all the work.
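import time
import storm
import get_tweets  # the project's helper module for the Twitter calls

class PlacesSpout(storm.Spout):
    # Key the Java wrapper is expected to set in getComponentConfiguration()
    _frequency_conf_key = "PlacesSpoutFrequency"

    def initialize(self, conf, context):
        self._places = get_tweets.available_places()
        self._tick_frequency = conf[self.__class__._frequency_conf_key]

    def nextTuple(self):
        # Emit one tuple per place, then sleep until the next round
        for place in self._places:
            storm.emit([place])
        time.sleep(self._tick_frequency)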
Here we can also see how easy it is to pass configuration parameters to Python from the Java wrapper. Other things to take into account:
  • Note the use of a relative path to the Python script in the constructor of the PlacesSpout Java class. Storm expects a resources folder at the root of the resulting jar, and uses it as the parent for that relative path. Don't forget to configure your pom.xml for that.
  • Also in that constructor, we see the command to invoke Python and the name of the Python module to use as main. I wrote a dummy Python module for each spout and bolt, which just creates the corresponding instance and calls its run() method, to start waiting for the handshake message from the parent process:
    from twitter_components import PlacesSpout

    if __name__ == '__main__':
        # Start this spout
        PlacesSpout().run()

    But now that I look at it, maybe a single module and an argument to switch the class would have been better ...
  • Note how tuples are emitted by calling the function storm.emit(), instead of some method inherited from the storm.Spout class, or of some object passed during the construction of that class. The storm Python module does not follow the design of the Java Storm API, which can be confusing at first, especially combined with the lack of comments or documentation. I have added some comments to the Python storm module and generated the corresponding documentation with Sphinx. You can find it here (with a terrible CSS configuration, I'm afraid; I would rather download it, as it works fine locally. I haven't learnt GitHub Pages yet). Anyway, the point is to use the functions emit(), emitDirect() and reportError() to emulate the corresponding methods of SpoutOutputCollector, and the functions emit(), emitDirect(), ack(), fail() and reportError() to emulate the corresponding methods of OutputCollector.
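Regarding the idea of a single launcher module with an argument to switch the class, a minimal sketch could look like the following. The two classes here are hypothetical stand-ins for the real components (which would be imported from twitter_components), and it assumes that the Java wrapper passes the class name as an extra command argument after the script path.

```python
class PlacesSpout:
    """Hypothetical stand-in for the real spout class."""

    def run(self):
        return "PlacesSpout running"


class TrendsBolt:
    """Hypothetical stand-in for the real bolt class."""

    def run(self):
        return "TrendsBolt running"


# Map each class name to its class, so the argument selects the component
COMPONENTS = {cls.__name__: cls for cls in (PlacesSpout, TrendsBolt)}


def launch(argv):
    """Instantiate the component named in argv[1] and start it."""
    if len(argv) != 2 or argv[1] not in COMPONENTS:
        raise SystemExit("usage: launcher.py {%s}" % "|".join(sorted(COMPONENTS)))
    return COMPONENTS[argv[1]]().run()


# In the real module this would be launch(sys.argv) under a __main__ guard
result = launch(["launcher.py", "TrendsBolt"])
```

With the real classes, run() would not return: it would block waiting for the handshake message from the parent process.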
Finally, for those interested, here is a thoughtful analysis of 3 minutes of tweets from yesterday:
mysql> select place, topic_name, sum(favorite_count) as sum_favs from storm_tweets group by place, topic_name order by place, sum_favs desc ;
+-----------+-------------------------------------+----------+
| place     | topic_name                          | sum_favs |
+-----------+-------------------------------------+----------+
| Barcelona | You&I                               |   939324 |
| Barcelona | Spain                               |     4207 |
| Barcelona | Tony                                |     3971 |
| Barcelona | Totalmente                          |      738 |
| Barcelona | Sábado Santo                        |       26 |
| Donostia  | Starbucks                           |    17217 |
| Donostia  | #GraciasGabo                        |     9318 |
| Donostia  | #GabrielGarciaMarquez               |     6645 |
| Donostia  | Madrid                              |     6380 |
| Donostia  | Viernes Santo                       |     2605 |
| Donostia  | Oriana                              |     1469 |
| Donostia  | #SemanaSanta                        |     1384 |
| Donostia  | #ElChiringuitoDeNitro               |      349 |
| Donostia  | Noja                                |       42 |
| Donostia  | #gabon                              |        0 |
| Madrid    | Toronto                             |    35688 |
| Madrid    | #ThanksOneDirection                 |     4820 |
| Madrid    | #100RazonesPorLasQueOdiarElSigloXXI |     1223 |
| Madrid    | Sábado Santo                        |      812 |
| Madrid    | #VRLTyfaKAZANDIRTIYOR               |      755 |
| Madrid    | Valanciunas                         |      587 |
| Madrid    | Castilla                            |      425 |
| Madrid    | Rui Faria                           |      189 |
| Valencia  | Semana Santa                        |     4386 |
| Valencia  | Sunderland                          |     3289 |
| Valencia  | Sevilla                             |     2049 |
| Valencia  | Valencia                            |     1907 |
| Valencia  | #osasunaVCF                         |      385 |
| Valencia  | #cuernoriana                        |      301 |
| Valencia  | Guaita                              |      110 |
| Valencia  | #GabrielGarciaMarquez               |       45 |
| Valencia  | Oriana                              |       39 |
+-----------+-------------------------------------+----------+
32 rows in set (0.00 sec)

There is another Python library for Storm called Petrel, which is more ambitious than the storm Python module as it allows "writing, submitting, debugging, and monitoring Storm topologies in pure Python". Although it still uses some Java classes to launch the topology in Storm, it is quite close to "avoid the JVM completely" when writing topologies in Python. I will give it a try for sure, but I had to learn to walk before I could run in Storm.

See you!