Stream processing systems like Storm are an important part of the Lambda Architecture for Big Data. They can be used, for example, to compute approximations of metrics over data that has just entered the system and therefore couldn't have been processed yet by the batch layer, which typically takes the form of MapReduce jobs. So it's pretty clear that Storm is a relevant system to have in our data processing tool belt.
An interesting feature of Storm is that it is designed from the ground up to allow using different programming languages in the topologies. As early as the Storm tutorial we learn that bolts can be defined in any language, that those bolts are executed as subprocesses of the corresponding Storm supervisor daemon, and that communication between the supervisor and the subprocesses is based on JSON messages exchanged over stdin/stdout. That communication convention is formalized in Storm's Multilang Protocol, which is very simple:
- Messages are strings encoding JSON objects, and the end of each message is signaled by a single line containing "end", which is not part of the preceding JSON.
- There is an initial handshake phase in which the parent supervisor process sends the JSON serializations of a Storm configuration object and a Storm TopologyContext object to the stdin of the child process. This somehow mimics the call to IBolt.prepare() or ISpout.open() that would otherwise be executed in Java. Besides, the parent process specifies a directory in the local file system where the child must create an empty file named after its PID. This gives the supervisor the information it needs to kill the process later if necessary. The handshake ends when the child process sends a JSON object with its PID to the parent, through its stdout.
- The rest of the communication is performed by sending JSON through stdin/stdout. For example the parent process will send the message
{"command": "next"}
to ask the spout for the next tuple, which mimics the call to ISpout.nextTuple(); the child answers through its stdout with an "emit" message per tuple it wants to emit, followed by a "sync" message when it is done. Bolts work analogously, receiving their input tuples as JSON objects and answering with "emit", "ack" or "fail" messages. Storm ships with a small Python module, storm.py, that implements this protocol and exposes Spout and Bolt base classes, and the tutorial includes an example Python bolt built on top of it, wrapped by a Java class extending backtype.storm.task.ShellBolt.
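To make the framing concrete, here is a minimal sketch of the child side of that conversation, talking JSON directly over stdin/stdout. This is essentially the plumbing that the storm.py module hides from you; the handshake keys used below (conf, context, pidDir) follow the Multilang Protocol documentation, but the exact layout may vary slightly between Storm versions, so take it as an illustration rather than a reference implementation:

import sys, os, json

def read_message():
    # A message is a JSON object followed by a line containing just "end"
    lines = []
    while True:
        line = sys.stdin.readline().rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))

def send_message(msg):
    sys.stdout.write(json.dumps(msg) + "\nend\n")
    sys.stdout.flush()

# Handshake: read the configuration, the topology context and the directory
# where we must create an empty file named after our PID
setup = read_message()
pid = os.getpid()
open(os.path.join(setup["pidDir"], str(pid)), "w").close()
send_message({"pid": pid})

# Spout loop: answer each "next" request with some emits and a final "sync"
while True:
    msg = read_message()
    if msg.get("command") == "next":
        send_message({"command": "emit", "tuple": ["hello from Python"]})
        send_message({"command": "sync"})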
Even though that example Python bolt is available, I had difficulties locating an example spout in Python based on the corresponding backtype.storm.spout.ShellSpout. So I decided to stop searching and start developing my own example instead. And that is what this post is about.
The example I developed is a simple topology that obtains popular tweets from Twitter and stores them in MySQL for further analysis. As this is just a simple tutorial, I only consider the trending topics for a fixed set of locations, in this case 4 Spanish cities: Madrid, Barcelona, Donostia and Valencia. The starting spout and several bolts are written in Python using the storm module above; if you are interested you can find the full source at my github repo. Originally I wanted to implement the following topology:
- A TrendsSpout obtains the trending topics for each location by calling the Twitter API, and emits a tuple (place, topic name, topic query) for each trending topic, where the topic query is a query string to be used to ask Twitter for tweets about that topic.
- Those tuples are received by a GetTweetsBolt, which performs the query against Twitter, projects the interesting metadata from the results, and emits a tuple for each tweet with the tweet text and its metadata.
- Finally a DBStoreBolt persists the tweets in MySQL.
The topology I ended up wiring starts instead with a PlacesSpout that just emits the places, followed by a TrendsBolt that obtains the trending topics for each place:

// This spout has no parallelism
builder.setSpout("PlacesSpout", new PlacesSpout(), 1);
builder.setBolt("TrendsBolt", new TrendsBolt(), 4).shuffleGrouping("PlacesSpout");
builder.setBolt("GetTweetsBolt", new GetTweetsBolt(), 4*2).shuffleGrouping("TrendsBolt");
builder.setBolt("DBStoreBolt", new DBStoreBolt(), 4*2).shuffleGrouping("GetTweetsBolt");

Note that shuffleGrouping is always used to connect the topology components, as there is no local state to be concerned about. The parallelism hints are somewhat arbitrary, but it is clear that the number of tuples is multiplied as we go down the topology (by 10 for trending topics and by 15 for tweets per topic).
In general the code is pretty simple. For example, this is the Python sample spout I was looking for, composed of a wrapper Java class PlacesSpout
public PlacesSpout(int freq) {
    super("python2.7", "python/twitter_storm/places_spout.py");
    this.tickFrequency = freq;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(TopologyFields.PLACE));
}

@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    conf.put(FREQUENCY_CONF_KEY, this.tickFrequency);
    return conf;
}

and the corresponding Python class PlacesSpout, which does all the work:
class PlacesSpout(storm.Spout):
    _frequency_conf_key = "PlacesSpoutFrequency"

    def initialize(self, conf, context):
        self._places = get_tweets.available_places()
        self._tick_frequency = conf[self.__class__._frequency_conf_key]

    def nextTuple(self):
        for place in self._places:
            storm.emit([place])
        time.sleep(self._tick_frequency)

Here we can also see how easy it is to pass configuration parameters to Python from the Java wrapper. Other things to take into account:
- Note the use of a relative path to the Python script in the constructor of the PlacesSpout Java class. Storm expects a resources folder at the root of the resulting jar, and uses it as the parent directory for that relative path. Don't forget to configure your pom.xml accordingly.
- Also in that constructor, we see the command used to invoke Python and the name of the Python module to use as main. I wrote a dummy Python module for each spout and bolt that just creates the corresponding instance and calls its run() method, which starts waiting for the handshake message from the parent process:
from twitter_components import PlacesSpout

if __name__ == '__main__':
    # Start this spout
    PlacesSpout().run()

But now that I look at it, maybe a single module and an argument to switch the class would have been better...
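For completeness, the bolts written with the storm module follow a very similar pattern. As an illustration, this is roughly the shape a bolt like DBStoreBolt could take; it is just a sketch and not the actual code from the repo, so the connection parameters and the exact columns of the storm_tweets table are assumptions of mine:

import MySQLdb

import storm


class DBStoreBolt(storm.BasicBolt):
    """Sketch of a bolt that persists incoming tweets in MySQL."""

    def initialize(self, conf, context):
        # Hard coded connection parameters, just for the example
        self._conn = MySQLdb.connect(host="localhost", user="storm",
                                     passwd="storm", db="twitter",
                                     charset="utf8")

    def process(self, tup):
        # Assumed tuple layout: (place, topic_name, tweet text, favorite count)
        place, topic_name, text, favorite_count = tup.values
        cursor = self._conn.cursor()
        cursor.execute(
            "INSERT INTO storm_tweets (place, topic_name, text, favorite_count) "
            "VALUES (%s, %s, %s, %s)",
            (place, topic_name, text, favorite_count))
        self._conn.commit()
        cursor.close()


if __name__ == '__main__':
    DBStoreBolt().run()

Extending storm.BasicBolt instead of storm.Bolt means each tuple is acked automatically after process returns, which is enough here because this bolt sits at the end of the topology.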
After leaving the topology running for a while, we can have a look at the tweets stored in MySQL, for example to see which topics accumulated more favorites in each place:

mysql> select place, topic_name, sum(favorite_count) as sum_favs from storm_tweets group by place, topic_name order by place, sum_favs desc;
+-----------+-------------------------------------+----------+
| place     | topic_name                          | sum_favs |
+-----------+-------------------------------------+----------+
| Barcelona | You&I                               |   939324 |
| Barcelona | Spain                               |     4207 |
| Barcelona | Tony                                |     3971 |
| Barcelona | Totalmente                          |      738 |
| Barcelona | Sábado Santo                        |       26 |
| Donostia  | Starbucks                           |    17217 |
| Donostia  | #GraciasGabo                        |     9318 |
| Donostia  | #GabrielGarciaMarquez               |     6645 |
| Donostia  | Madrid                              |     6380 |
| Donostia  | Viernes Santo                       |     2605 |
| Donostia  | Oriana                              |     1469 |
| Donostia  | #SemanaSanta                        |     1384 |
| Donostia  | #ElChiringuitoDeNitro               |      349 |
| Donostia  | Noja                                |       42 |
| Donostia  | #gabon                              |        0 |
| Madrid    | Toronto                             |    35688 |
| Madrid    | #ThanksOneDirection                 |     4820 |
| Madrid    | #100RazonesPorLasQueOdiarElSigloXXI |     1223 |
| Madrid    | Sábado Santo                        |      812 |
| Madrid    | #VRLTyfaKAZANDIRTIYOR               |      755 |
| Madrid    | Valanciunas                         |      587 |
| Madrid    | Castilla                            |      425 |
| Madrid    | Rui Faria                           |      189 |
| Valencia  | Semana Santa                        |     4386 |
| Valencia  | Sunderland                          |     3289 |
| Valencia  | Sevilla                             |     2049 |
| Valencia  | Valencia                            |     1907 |
| Valencia  | #osasunaVCF                         |      385 |
| Valencia  | #cuernoriana                        |      301 |
| Valencia  | Guaita                              |      110 |
| Valencia  | #GabrielGarciaMarquez               |       45 |
| Valencia  | Oriana                              |       39 |
+-----------+-------------------------------------+----------+
32 rows in set (0.00 sec)
There is another Python library for Storm called Petrel, which is more ambitious than the storm Python module, as it allows "writing, submitting, debugging, and monitoring Storm topologies in pure Python". Although it still uses some Java classes to launch the topology in Storm, it gets quite close to "avoiding the JVM completely" when writing topologies in Python. I will give it a try for sure, but I had to learn to walk before I could run in Storm.
See you!