Wednesday, August 6, 2014

Simple SVG charts with HBase REST service, Flask and Pygal

A little less conversation and a little more action in this post. I wanted to have a flexible way to define simple charts for small HBase tables. Using HBase for small data might sound crazy, but by doing so we can take advantage of its flexible NoSQL schema. So let’s exploit HBase's REST service (see also here) for this. First of all we have to launch the service with the following command:
[cloudera@localhost ~]$ hbase rest start -ro -p 9998
14/08/06 14:49:33 INFO util.VersionInfo: HBase 0.94.6-cdh4.4.0
....
That starts the HBase REST server in read-only mode, listening on port 9998. This service is started by default in some distributions like HDP2. Once the service is started, the idea is to define a mapping from service responses to charts. For that we might use matplotlib combined with Flask like this:
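import io

from flask import Flask, make_response
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure

app = Flask(__name__)
values = [2, 1, 0, 2, 5, 7]

@app.route('/barchart.png')
def plotOne():
    fig = Figure()
    axis = fig.add_subplot(1, 1, 1)
    axis.bar(range(len(values)), values)

    # Render the figure to an in-memory PNG and return it as the response
    canvas = FigureCanvas(fig)
    output = io.BytesIO()
    canvas.print_png(output)
    response = make_response(output.getvalue())
    response.mimetype = 'image/png'
    return response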
But I wanted something simpler, and I found Pygal: it offers nice SVG charts with a high level interface, some fancy animations, and Flask integration. The first chart is easy as pie(chart):
import pygal
from pygal.style import DarkSolarizedStyle

# app is the Flask application object
values = [2, 1, 0, 2, 5, 7]

@app.route('/barchart.svg')
def graph_something():
    bar_chart = pygal.Bar(style=DarkSolarizedStyle)
    bar_chart.add('Values', values)
    return bar_chart.render_response()




Now, with some creative URL routing in Flask, we can define moderately complex graphs just in the URL, thanks to the suffix globbing of HBase's REST service and a simple HTML table in the Jinja2 template. Auto-refresh is obtained simply with a <meta http-equiv="refresh" content="{{refresh_rate}}"> element in the template. So we get


for the URL http://localhost:9999/hbase/charts/localhost:9998/test_hbase_py_client/width/1500/cols/2/refresh/500/bar/Sites%20Visited/visits/bar/Info/info/keys/* , assuming a table created in hbase shell as
create 'test_hbase_py_client', 'info', 'visits'
put 'test_hbase_py_client', 'john', 'info:age', 42
put 'test_hbase_py_client', 'mary', 'info:age', 26
put 'test_hbase_py_client', 'john', 'visits:amazon.com', 5
put 'test_hbase_py_client', 'john', 'visits:google.es', 2
put 'test_hbase_py_client', 'mary', 'visits:amazon.com', 4
put 'test_hbase_py_client', 'mary', 'visits:facebook.com', 2
list
scan 'test_hbase_py_client'
exit
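
To consume that table from Python, it helps to know what the REST service returns. As far as I know, when asked for JSON (Accept: application/json) it answers with a document whose row keys, column names, and cell values are base64-encoded. Here is a minimal Python 3 sketch of a decoder, checked only against a hand-made sample rather than a live server. The function names are mine and not part of any library:

```python
import base64
import json


def _b64(text):
    """Decode one base64 field of an HBase REST JSON response into text."""
    return base64.b64decode(text).decode("utf-8")


def decode_rows(payload):
    """Turn a REST JSON document into {row_key: {family: {qualifier: value}}}."""
    rows = {}
    for row in json.loads(payload).get("Row", []):
        families = rows.setdefault(_b64(row["key"]), {})
        for cell in row.get("Cell", []):
            # Columns arrive as "family:qualifier"; split on the first colon only.
            family, _, qualifier = _b64(cell["column"]).partition(":")
            families.setdefault(family, {})[qualifier] = _b64(cell["$"])
    return rows


def _enc(text):
    # Helper used only to build the hand-made sample below.
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


# Hand-made sample that imitates the shape of a response for the row 'john'.
sample = json.dumps({"Row": [{"key": _enc("john"), "Cell": [
    {"column": _enc("visits:amazon.com"), "timestamp": 1, "$": _enc("5")},
    {"column": _enc("info:age"), "timestamp": 1, "$": _enc("42")},
]}]})
decoded = decode_rows(sample)
```

Cell values come back as strings here, so a chart service still has to convert them to numbers before plotting.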

The main idea for the mapping into a bar chart is that each HBase row corresponds to a group of bars (a color in the chart), and that given a column family the qualifiers for that family are the values in the x-axis, while the cell values correspond to the values for the y-axis. If several rows are specified then all the bar groups are displayed together, with a different color per row key.
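
As a rough illustration of that mapping (a sketch under my own assumptions about the intermediate data structure, not the code from the repo), suppose the table has already been read into a nested dictionary of the form {row key: {family: {qualifier: value}}}. The hypothetical helper rows_to_bar_series then produces the x-axis labels and one series per row:

```python
def rows_to_bar_series(rows, family):
    """Map {row_key: {family: {qualifier: value}}} to (x_labels, series)."""
    # The x-axis is the sorted union of the qualifiers seen in this column family.
    x_labels = sorted({qual for fams in rows.values() for qual in fams.get(family, {})})
    series = {}
    for row_key, fams in rows.items():
        cells = fams.get(family, {})
        # A row that lacks a qualifier gets None at that position.
        series[row_key] = [float(cells[q]) if q in cells else None for q in x_labels]
    return x_labels, series


# Same contents as the example table, written by hand.
rows = {
    "john": {"info": {"age": "42"}, "visits": {"amazon.com": "5", "google.es": "2"}},
    "mary": {"info": {"age": "26"}, "visits": {"amazon.com": "4", "facebook.com": "2"}},
}
x_labels, series = rows_to_bar_series(rows, "visits")
```

With Pygal, each entry of series would become one call to bar_chart.add(row_key, values), after setting bar_chart.x_labels = x_labels.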
Besides, to allow several charts the number of columns is specified, followed by a sequence of chart specifications, which are triples (chart type, chart title, column family). Hence the URL http://localhost:9999/hbase/charts/localhost:9998/test_hbase_py_client/width/1500/cols/2/refresh/500/bar/Sites%20Visited/visits/bar/Info/info/keys/* means "read from the table test_hbase_py_client at the server localhost:9998; the chart table will be 1500 pixels wide; use two columns and refresh the whole chart every 500 seconds; the first chart is a bar chart with title 'Sites Visited' and takes the values from the column family 'visits', the second chart is a bar chart titled 'Info' that reads from the column family 'info'; use all the keys found in the table". This URL mapping was implemented by combining Flask standard routing primitives with a custom URL converter (extending werkzeug.routing.BaseConverter).
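
To make the URL grammar concrete, here is a minimal Python 3 sketch of a parser for the part of the path that follows the table name. It is not the converter from the repo: parse_chart_path is a name I made up, and it uses only the standard library. In a Flask application the same logic could live in the to_python() method of a BaseConverter subclass.

```python
from urllib.parse import unquote


def parse_chart_path(path):
    """Parse 'width/W/cols/C/refresh/R/<type>/<title>/<family>/.../keys/<glob>'."""
    parts = path.strip("/").split("/")
    options = {}
    # The three leading options come as name/value pairs in a fixed order.
    for name in ("width", "cols", "refresh"):
        if parts[0] != name:
            raise ValueError("expected '%s', found '%s'" % (name, parts[0]))
        options[name] = int(parts[1])
        parts = parts[2:]
    # Everything up to the first 'keys' marker is a sequence of chart triples
    # (so this sketch does not support a chart title or family named 'keys').
    keys_at = parts.index("keys")
    spec, key_parts = parts[:keys_at], parts[keys_at + 1:]
    if len(spec) % 3 != 0:
        raise ValueError("chart specifications must be triples")
    charts = [(spec[i], unquote(spec[i + 1]), spec[i + 2])
              for i in range(0, len(spec), 3)]
    return options, charts, "/".join(key_parts)


options, charts, keys = parse_chart_path(
    "width/1500/cols/2/refresh/500/bar/Sites%20Visited/visits/bar/Info/info/keys/*")
```

For the example URL this yields the three numeric options, the two chart triples, and the key pattern "*", which would be forwarded to the HBase REST service.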

For a more elaborate example, take a look at this simple Spark Streaming program (so simple it would be called script if it was written in Python ...), that populates an HBase table with a sliding window of one minute containing the mention count in Twitter for some musicians. 


As usual, you can find all the code for the post in my github repo, where you can see that the chart service is a single Python script. Now all that is left is extending the Python service to cover all the different types of Pygal charts, and calling a web designer so the chart page stops looking like a web page from the dotcom era.


We are hiring!

If you have enjoyed this post, you are interested in Big Data technologies, and you have solid experience as a Java developer, take a look at this open position at my company.



Saturday, May 31, 2014

OLAP with Apache Phoenix and HBase

Some weeks ago a message was posted in the Apache Phoenix users mailing list asking for examples of business intelligence visualization products that could be used with Phoenix. Apache Phoenix provides an SQL layer on top of HBase, as a JDBC client and a set of HBase coprocessors that enable the efficient execution of SQL queries by exploiting the parallelism offered by that mechanism. I suggested Saiku and an example configuration to connect Saiku to Phoenix, and I ended up being invited to write a guest post in Apache Phoenix's blog. Here you can find the result, a tutorial on using Phoenix and Saiku for implementing an OLAP system over HBase. Take a look if you're interested in using OLAP for performing flexible analysis and visualisation over a horizontally scalable database like HBase.


try to ride on waves of activity in every direction
you're the center and you're always free in every direction


Sunday, April 20, 2014

Using Storm's Multilang Protocol to download popular tweets with Python

Apache Storm is a very popular open source stream processing framework that allows us to construct real-time fault-tolerant distributed data processing systems very easily. The idea is that a stream is an infinite sequence of tuples, which are dictionaries from strings to values, and that streams of tuples are processed and generated by Storm topologies. A Storm topology is a directed graph where each node (called topology component, to avoid confusion with the nodes of the Storm cluster I guess) is either a Spout or a Bolt. A Spout is a node that generates tuples from thin air (for example by connecting to an external service, reading from a log or database, connecting to a scalable message bus like Apache Kafka ...), so spouts are used as the starting points for topologies. The rest of the topology components are Bolts, which accept tuples from spouts or other bolts, and generate more tuples for other bolts, or maybe just connect to some external system (for example a database or Apache Kafka) to act as a sink. The concepts section of the Storm wiki gives a much better introduction to the subject than anything I could say.
Stream processing systems like Storm are an important part of the Lambda Architecture for Big Data, and can be used for example to compute approximations of metrics for the data that has been recently introduced in the system, and therefore couldn't have been processed yet by other batch processes, typically in the form of MapReduce jobs. So it's pretty clear that Storm is a relevant system to be part of our data processing tool belt.

An interesting feature of Storm is that it is designed from the ground up to be able to use different programming languages in the topologies. As early as in the Storm tutorial we see that bolts can be defined in any language, that those bolts will be executed as subprocesses of the corresponding Storm supervisor daemon, and that communication between the supervisor and the subprocesses is based on JSON messages over stdin and stdout. That communication convention is formalized in the Multilang Protocol of Storm, which is very simple:
  • Messages are strings encoding JSON objects, and the end of each message is marked by a single line containing "end", which is not part of the preceding JSON.
  • There is an initial handshake phase in which the parent supervisor process sends the JSON serializations of a Storm configuration object and a Storm TopologyContext object to the stdin of the child process. This somewhat mimics the call to IBolt.prepare() or ISpout.open() that would otherwise be executed in Java. Besides, the parent process specifies a directory in the local file system where the child must create an empty file named after its PID. This gives the supervisor the information it needs to kill the process later if needed. The handshake ends when the child process sends a JSON object with its PID to the parent, through its stdout.
  • The rest of the communication is performed by sending JSON through stdin/stdout. For example the parent process will send the message
    {"command": "next"}
to a child spout to ask for the next tuple. So in order to implement a spout or bolt in any programming language we just have to write a program implementing that protocol from the perspective of the child process. There is an example bolt in Python in the Storm tutorial above, which is based on the Python storm module distributed with Storm, where the boilerplate of the protocol is implemented so we only have to extend a simple class to implement a bolt. The topology is still written using the Java API, and in fact the Python implementation of the bolt is invoked by a wrapper Java class that extends backtype.storm.task.ShellBolt and implements backtype.storm.topology.IRichBolt. So many layers! It's always difficult to escape from the JVM when you're working on Big Data.
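
To get a feeling for how little is needed on the child side, here is a minimal Python 3 sketch of the message framing described above, using in-memory streams instead of the real stdin/stdout. It is not the code of the storm module: read_message and send_message are names I chose, the emitted tuple is made up, and the handshake is left out. The "emit" and "sync" replies follow my reading of the protocol documentation.

```python
import io
import json


def read_message(stream):
    """Read lines until a line containing only 'end' and decode the JSON before it."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before the 'end' marker")
        if line.rstrip("\r\n") == "end":
            break
        lines.append(line)
    return json.loads("".join(lines))


def send_message(stream, message):
    """Write one JSON message followed by the 'end' marker line."""
    stream.write(json.dumps(message) + "\nend\n")
    stream.flush()


# Simulate the parent's side with in-memory streams.
parent_to_child = io.StringIO('{"command": "next"}\nend\n')
child_to_parent = io.StringIO()

request = read_message(parent_to_child)
if request.get("command") == "next":
    # A spout answers "next" with zero or more emits followed by a sync.
    send_message(child_to_parent, {"command": "emit", "tuple": ["Madrid"]})
    send_message(child_to_parent, {"command": "sync"})
```

In a real component the two streams would be sys.stdin and sys.stdout, which is also why a multilang component must never print debugging output to stdout.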

underneath it all, we feel so small
the heavens fall, but still we crawl

Even though that example Python bolt is available, I've had difficulties locating an example spout in Python based on the corresponding backtype.storm.spout.ShellSpout. So I decided to stop searching and start developing my own example instead. And this is what this post is about.

The example I developed is a simple topology that obtains popular tweets from Twitter and stores them in MySQL for further analysis. As this is just a simple tutorial I will just consider the trending topics for a fixed set of locations, in this case four Spanish cities: Madrid, Barcelona, Donostia and Valencia. The starting spout and several bolts are written in Python using the storm module above. If you are interested you can find the full source at my github repo. Originally I wanted to implement the following topology:
  1. A TrensSpout obtains the trending topics for each location by calling the Twitter API, and emits a tuple (place, topic name, topic query) for each trending topic, where the topic query is a query string to be used to ask Twitter for tweets for that topic.
  2. Those tuples are received by a GetTweetsBolt, which performs the query to Twitter, makes some projections of the result for the interesting meta-information, and emits a tuple for each tweet with the tweet text and meta-information.
  3. Finally a DBStoreBolt persists the tweets in MySQL.
Unfortunately, due to what seems to be a bug in Storm's multilang protocol implementation, the task id is not sent to the spouts during the initial handshake. As a consequence the initial spout cannot be parallelized, as the locations Madrid/Barcelona/Donostia/Valencia cannot be assigned to each spout instance based on the task id, which is not available. My workaround consists of assuming that the initial spout won't be parallel, so TrensSpout is replaced by PlacesSpout, which just emits a tuple for each location with a fixed frequency. Then the next bolt, TrendsBolt, can be executed in parallel: it takes a location and then fetches the trending topics for that location. The topology definition in Java looks like this:
// This spout has no parallelism
builder.setSpout("PlacesSpout", new PlacesSpout(), 1);
builder.setBolt("TrendsBolt", new TrendsBolt(), 4).shuffleGrouping("PlacesSpout");
builder.setBolt("GetTweetsBolt", new GetTweetsBolt(), 4*2).shuffleGrouping("TrendsBolt"); 
builder.setBolt("DBStoreBolt", new DBStoreBolt(), 4*2).shuffleGrouping("GetTweetsBolt");
Note that shuffleGrouping is always used to connect the topology components, as there is no local state to be concerned about. The parallelism hints are somewhat arbitrary, but it is clear that the number of tuples multiplies down the topology (by 10 for trending topics and by 15 for tweets).
In general the code is pretty easy. For example this is the Python sample spout I was looking for, composed of a wrapper Java class PlacesSpout
public PlacesSpout(int freq) {
  super("python2.7", "python/twitter_storm/places_spout.py");
  this.tickFrequency = freq;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
  declarer.declare(new Fields(TopologyFields.PLACE));
}

@Override
public Map getComponentConfiguration() {
  Config conf = new Config();
  conf.put(FREQUENCY_CONF_KEY, this.tickFrequency);
  return conf;
}
and the corresponding Python class PlacesSpout, which does all the work.
class PlacesSpout(storm.Spout):
    _frequency_conf_key = "PlacesSpoutFrequency"
    def initialize(self, conf, context):
        self._places = get_tweets.available_places()
        self._tick_frequency = conf[self.__class__._frequency_conf_key]
    
    def nextTuple(self):
        for place in self._places:
            storm.emit([place])
        time.sleep(self._tick_frequency)
Here we can also see how easy it is to pass configuration parameters to Python from the Java wrapper. Other things to take into account:
  • Note the use of a relative path to the python script in the constructor of the PlacesSpout Java class. Storm expects a resources folder to be placed at the root of the resulting jar, for using it as the parent for that relative path. Don't forget to configure your pom.xml for that.
  • Also in that constructor, we see the command to invoke Python and the name of the Python module to use as main. I wrote a dummy Python module for each spout and bolt, which just creates the corresponding instance and calls its run() method, to start waiting for the handshake message from the parent process:
    from twitter_components import PlacesSpout
    if __name__ == '__main__':
        # Start this spout
        PlacesSpout().run()
    
    But now that I look at it, maybe a single module and an argument to switch the class would have been better ...
  • Note how tuples are emitted by calling the function storm.emit(), instead of some method inherited from the storm.Spout class, or of some object passed during the construction of that class. The storm Python module does not follow the design of the Java Storm API, which can be confusing at first, especially combined with the lack of comments or documentation. I have added some comments to the Python storm module and generated the corresponding documentation with Sphinx. You can find it here (with a terrible CSS configuration, I'm afraid; I would rather download it, as it works locally, and I haven't learnt Github Pages yet). Anyway, the point is to use the functions emit(), emitDirect() and reportError() to emulate the corresponding methods of SpoutOutputCollector, and the functions emit(), emitDirect(), ack(), fail() and reportError() to emulate the corresponding methods of OutputCollector.
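
The "single module and an argument to switch the class" idea could look roughly like the following Python sketch. Everything here is hypothetical: the two classes are stand-ins for the real components (which would extend the classes of the storm module and block in run() waiting for the handshake), and launch() is a name I made up.

```python
class PlacesSpout(object):
    """Hypothetical stand-in; the real class would extend storm.Spout."""
    def run(self):
        return "PlacesSpout running"


class TrendsBolt(object):
    """Hypothetical stand-in for one of the Python bolts."""
    def run(self):
        return "TrendsBolt running"


# Registry from the name given on the command line to the component class.
COMPONENTS = {cls.__name__: cls for cls in (PlacesSpout, TrendsBolt)}


def launch(argv):
    """Instantiate the component named in argv[1] and start it."""
    if len(argv) != 2 or argv[1] not in COMPONENTS:
        raise SystemExit("usage: %s {%s}" % (argv[0], "|".join(sorted(COMPONENTS))))
    return COMPONENTS[argv[1]]().run()


# A real launcher script would call launch(sys.argv) under
# `if __name__ == '__main__':`; here the argument list is passed by hand.
result = launch(["components.py", "PlacesSpout"])
```

The Java wrapper would then pass the class name as an extra element of the command, something like super("python2.7", "python/twitter_storm/components.py", "PlacesSpout"), where components.py is again a made-up file name. I have not tested this against a running topology.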
Finally, for those interested, here is a thoughtful analysis of 3 minutes of tweets from yesterday:
mysql> select place, topic_name, sum(favorite_count) as sum_favs from storm_tweets group by place, topic_name order by place, sum_favs desc ;
+-----------+-------------------------------------+----------+
| place     | topic_name                          | sum_favs |
+-----------+-------------------------------------+----------+
| Barcelona | You&I                               |   939324 |
| Barcelona | Spain                               |     4207 |
| Barcelona | Tony                                |     3971 |
| Barcelona | Totalmente                          |      738 |
| Barcelona | Sábado Santo                        |       26 |
| Donostia  | Starbucks                           |    17217 |
| Donostia  | #GraciasGabo                        |     9318 |
| Donostia  | #GabrielGarciaMarquez               |     6645 |
| Donostia  | Madrid                              |     6380 |
| Donostia  | Viernes Santo                       |     2605 |
| Donostia  | Oriana                              |     1469 |
| Donostia  | #SemanaSanta                        |     1384 |
| Donostia  | #ElChiringuitoDeNitro               |      349 |
| Donostia  | Noja                                |       42 |
| Donostia  | #gabon                              |        0 |
| Madrid    | Toronto                             |    35688 |
| Madrid    | #ThanksOneDirection                 |     4820 |
| Madrid    | #100RazonesPorLasQueOdiarElSigloXXI |     1223 |
| Madrid    | Sábado Santo                        |      812 |
| Madrid    | #VRLTyfaKAZANDIRTIYOR               |      755 |
| Madrid    | Valanciunas                         |      587 |
| Madrid    | Castilla                            |      425 |
| Madrid    | Rui Faria                           |      189 |
| Valencia  | Semana Santa                        |     4386 |
| Valencia  | Sunderland                          |     3289 |
| Valencia  | Sevilla                             |     2049 |
| Valencia  | Valencia                            |     1907 |
| Valencia  | #osasunaVCF                         |      385 |
| Valencia  | #cuernoriana                        |      301 |
| Valencia  | Guaita                              |      110 |
| Valencia  | #GabrielGarciaMarquez               |       45 |
| Valencia  | Oriana                              |       39 |
+-----------+-------------------------------------+----------+
32 rows in set (0.00 sec)

There is another Python library for Storm called Petrel, which is more ambitious than the storm Python module as it allows "writing, submitting, debugging, and monitoring Storm topologies in pure Python". Although it still uses some Java classes to launch the topology in Storm, it is quite close to "avoid the JVM completely" when writing topologies in Python. I will give it a try for sure, but I had to learn to walk before I could run in Storm.

See you!

Sunday, March 2, 2014

Talking to HBase from Python with JPype

Long time no see. I really love Python and also the Hadoop ecosystem, but there is this problem that Hadoop is all Java based, so sometimes it is not so easy to use Hadoop with Python. There are some approaches to interoperability between Python and Java, the Jython interpreter being one of the most remarkable, and also the one shipped with Apache Pig by default. Nevertheless, Jython is always lagging behind Python (I think it only supports Python 2.5), and I have also found some problems when importing external libraries, even pure Python libraries, at least in the standalone version shipped with Pig. You also lose access to all the cool C-based libraries available in the reference CPython implementation.
So I was very happy to see that CPython is now supported for UDFs in the new Pig 0.12.0. This opens a whole world of possibilities, and in particular I was thinking it would be very nice to use HBase from a CPython UDF in a Pig job, following the "HBase as a shared resource" pattern for MapReduce and HBase interactions. With this and other possible applications in mind (e.g. calling HBase from a Python Storm bolt), I decided to do some research about accessing HBase from CPython. Finally, I came up with the idea of using JPype as the interface between Python and the Java client classes for HBase.

The approach in JPype differs from Jython's in that, instead of implementing Python in Java, the idea is to instantiate a JVM for calling Java from Python. Hence, to get an HBase driver for CPython I'd only have to call the Java driver from JPype, implementing a light wrapper for ease of use. For now I'm just in the proof of concept phase, but at least I've been able to make a simple connection to HBase from CPython. So let's go for it!

First we have to install JPype, which is available through pip and in any case is very easy to install by hand. Then we can import the jpype module from our Python code, and access the HBase Java driver classes through the jpype.JClass Python class. For this little experiment (all the code is available at github) I first created a simple HBase table with this simple bash script

#!/bin/bash

TABLE_NAME='test_hbase_py_client'

hbase shell <<END
create '${TABLE_NAME}', 'info', 'visits'
put '${TABLE_NAME}', 'john', 'info:age', 42
put '${TABLE_NAME}', 'mary', 'info:age', 26
put '${TABLE_NAME}', 'john', 'visits:amazon.com', 5
put '${TABLE_NAME}', 'john', 'visits:google.es', 2
put '${TABLE_NAME}', 'mary', 'visits:amazon.com', 4
put '${TABLE_NAME}', 'mary', 'visits:facebook.com', 2
list
scan '${TABLE_NAME}'
exit
END


The goal now is to write a CPython program to scan that table. JPype is a very simple library: you only have to start a JVM through a call to jpype.startJVM, and then you can easily access Java objects through simple calls like the following

HTablePoolClass = jpype.JClass("org.apache.hadoop.hbase.client.HTablePool")
connection_pool = HTablePoolClass()


Here we access the Java class HTablePool and store it in a variable, so we can instantiate it in Python by using the usual Python notation for object creation and calling the constructors as defined in Java. JPype is smart enough to perform most of the necessary type conversions between Python and Java automatically, and also to choose the right version of overloaded methods. On the other hand, sadly JPype is not the most active project in the world, and sometimes strange exceptions may arise. In particular, when you instantiate a class A that depends on a class B which is not available in the classpath, JPype raises an exception saying that A is not found, when the real problem is that B is not available. To solve this, I just added to the classpath all the jars related to Hadoop or HBase on the creation of the JVM:

import glob
import jpype

_jvm_lib_path = "/usr/java/jdk1.6.0_32/jre/lib/amd64/server/libjvm.so"
cp_dirs = '/usr/lib/hadoop/client-0.20:/usr/lib/hadoop/lib:/usr/lib/hadoop:/usr/lib/hadoop/client:/usr/lib/hbase/lib/:/usr/lib/hbase/'
# Collect every jar found in those directories into a single classpath string
cp_jars_str = ":".join(set(jar for cp_dir in cp_dirs.split(':') for jar in glob.iglob(cp_dir + "/*.jar")))

jpype.startJVM(_jvm_lib_path, "-ea","-Djava.class.path=" + cp_jars_str)


After that everything worked fine for me with JPype, as you can see in the rest of the program below, in which I just create a connection to HBase, open a table, and perform a full scan. The only remarkable detail is the use of the function iterate_iterable() to traverse Java Iterable objects as Python generators.

def iterate_iterable(iterable):       
    iterator = iterable.iterator()
    while iterator.hasNext():
        yield iterator.next()

test_table_name = 'test_hbase_py_client'


try:
    HTablePoolClass = jpype.JClass("org.apache.hadoop.hbase.client.HTablePool")
    connection_pool = HTablePoolClass()
    test_table = connection_pool.getTable(test_table_name)
    BytesClass = jpype.JClass("org.apache.hadoop.hbase.util.Bytes")
    ScanClass = jpype.JClass("org.apache.hadoop.hbase.client.Scan")
    scan_all = ScanClass()
    # class ResultScanner
    result_scanner = test_table.getScanner(scan_all)
    # for result in result_scanner: TypeError: 'org.apache.hadoop.hbase.client.ClientScanner' object is not iterable
    print '\n'*2, '-'*30
    print 'Scanning table "{table_name}"'.format(table_name=test_table_name)
    for result in iterate_iterable(result_scanner):
        print "row id:", result.getRow()
        for key_val in iterate_iterable(result.list()):
            print "\t", "family : {family}, qual : {qual}, value : {value}".format(family = key_val.getFamily(), qual = key_val.getQualifier(), value = BytesClass.toString(key_val.getValue()).encode('ascii', 'ignore'))
    print '-'*30, '\n'*2
    test_table.close()
except jpype.JavaException as ex:
    print 'exception', ex.javaClass(), ex.message()
    print 'stacktrace:', ex.stacktrace()


I have only tested it in my Cloudera Quickstart CDH4.4.0, so please tell me if you have any problem.
There are other CPython clients for HBase like pyhbase and hbase-thrift. Regarding pyhbase, it looks like an abandoned project, and it doesn't work with CDH4, at least in the tests I performed. On the other hand I haven't tested hbase-thrift, but I don't like the idea of having the Thrift gateway as a bottleneck for connections to the HBase cluster. Anyway I think the technique of wrapping a Java driver with JPype is interesting because it can be applied to other databases, and it would be easy to keep the driver up to date by updating the underlying jars when needed.

I hope you enjoyed the post!