
Jamie Thomson

This is the blog of Jamie Thomson, a data mangler in London working for Dunnhumby

A story of scale-out on HDInsight

Lately I have been doing a lot of work using Microsoft’s Hadoop offering, HDInsight. I suspect a lot of people who read my blog are unfamiliar with what Hadoop actually is so I thought I’d recount a recent test I did that exhibits the scale-out nature of processing data on Hadoop.

The test used a mapreduce job (written in Java) to process an incoming CSV file and load it into an HBase database on that same cluster (HBase is a NoSQL data store; for the purposes of this blog post that's all you need to know right now). Mapreduce is a technique for doing data processing on Hadoop's scale-out architecture: incoming data is split according to some expression over the incoming rows, then rows with the same result for that expression are combined using some sort of aggregation. The splitting and aggregation can be done on multiple nodes in the Hadoop cluster, hence I like to refer to mapreduce as a distributed GroupBy.
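If you have never seen one, the sketch below shows roughly what such a job looks like in Java. It is a minimal, hypothetical example: it assumes a CSV whose first column is the grouping key (e.g. a customer id) and it just counts rows per key rather than performing our real aggregation:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class GroupByExample {

    // Map: tag each incoming row with the result of the grouping expression
    // (here, simply column 0 of the CSV).
    public static class GroupMapper
            extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] fields = line.toString().split(",");
            context.write(new Text(fields[0]), line); // key = customer id
        }
    }

    // Reduce: all rows sharing a key arrive together; aggregate them here.
    public static class GroupReducer
            extends Reducer<Text, Text, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> rows, Context context)
                throws IOException, InterruptedException {
            long count = 0;
            for (Text row : rows) {
                count++; // stand-in for whatever aggregation is required
            }
            context.write(key, new LongWritable(count));
        }
    }
}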

The actual work that our mapreduce job is doing isn't important, but for your information it's combining many rows of data pertaining to a single customer into a JSON document and loading that document into HBase. Upon successful completion of the job HBase contained 3,125,000 JSON documents.
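For illustration, here is a rough sketch of what that load step could look like as an HBase TableReducer. The column family ("d"), qualifier ("json") and document shape are invented for the example (they are not our actual schema), and real code would need proper JSON escaping:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class JsonLoadReducer
        extends TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    protected void reduce(Text customerId, Iterable<Text> rows, Context context)
            throws IOException, InterruptedException {
        // Combine all of this customer's rows into a single JSON document.
        StringBuilder json = new StringBuilder("{\"rows\":[");
        String sep = "";
        for (Text row : rows) {
            json.append(sep).append('"').append(row.toString()).append('"');
            sep = ",";
        }
        json.append("]}");

        // One Put per customer: row key = customer id, one cell holds the document.
        Put put = new Put(Bytes.toBytes(customerId.toString()));
        put.add(Bytes.toBytes("d"), Bytes.toBytes("json"),
                Bytes.toBytes(json.toString()));
        context.write(new ImmutableBytesWritable(put.getRow()), put);
    }
}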

TL;DR

I loaded 1.6GB of raw source data (62.5m rows) into HBase using HDInsight; the quickest I was able to complete this was 34m2s. The key to speeding up our throughput was to (a) use a bigger HDInsight cluster and (b) split our input data into multiple files, thus forcing the processing to be distributed over more map tasks. With more performance tuning (aka scaling out to more HDInsight nodes) I am confident we could get this much lower.

Note that it is possible to specify the number of tasks that the map phase uses rather than letting Hadoop guess how many it should use; for this test I chose not to specify that. In other words, splitting the incoming data over multiple files is not a necessity, it was just a trick I pulled to influence how many map tasks the job used.
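For the record, both knobs live in the job driver; a hedged sketch (the values here are illustrative, not what I used):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class DriverKnobs {
    public static void configure(Job job) {
        // Suggest a map task count; Hadoop treats this as a hint, not a guarantee.
        job.getConfiguration().setInt("mapreduce.job.maps", 20);

        // Alternatively, cap the split size so one large file yields more splits
        // (and hence more map tasks) without physically splitting it yourself.
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024); // 128MB
    }
}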

The detail

I generated a 1.6GB (1,677,562,500B) file containing 62,500,000 rows. On the first run I used an HDInsight cluster that had 2 worker nodes. The entire mapreduce job took ~1hr13m50s. The map phase took ~58m and the reduce phase took ~1hr6m (so clearly they overlapped; that is because the reduce phase starts as soon as the first map task completes and, as you will see below, the map tasks completed at wildly different times).

Even though the reduce phase took longer, it's actually the map phase that caused the long execution time. To try and pinpoint why it took so long I dove into the logs that Hadoop produces. Unless you tell Hadoop otherwise it determines how many tasks it should spin up in the map phase, and in this case it determined it needed 4 map tasks:

[clip_image002: the four map tasks, with the elapsed time of each highlighted]

I've highlighted the elapsed times for each; note that the 4th is much lower than the rest. This would explain why the reduce phase took so long: it started as soon as the first map task completed but then had to wait ~52 minutes until all the other map tasks were complete.

Each one of those tasks has its own task log and from those task logs I found the following information:

Processing split: wasb://etl@dhnlsjamiet.blob.core.windows.net/nls/20141009T130423423_20140101000000_usuals_1_0/input/usuals_1_0_20140101000000.csv:0+536870912
Processing split: wasb://etl@dhnlsjamiet.blob.core.windows.net/nls/20141009T130423423_20140101000000_usuals_1_0/input/usuals_1_0_20140101000000.csv:536870912+536870912
Processing split: wasb://etl@dhnlsjamiet.blob.core.windows.net/nls/20141009T130423423_20140101000000_usuals_1_0/input/usuals_1_0_20140101000000.csv:1073741824+536870912
Processing split: wasb://etl@dhnlsjamiet.blob.core.windows.net/nls/20141009T130423423_20140101000000_usuals_1_0/input/usuals_1_0_20140101000000.csv:1610612736+66949764

The numbers at the end represent the byte ranges that each task is processing (the first one starts from byte 0, as you would expect). Notice the last one (1610612736+66949764): it is starting from byte 1610612736 and processing the next 66949764 bytes. Given that task is the 4th task of 4, it shouldn't surprise you that adding those two numbers together gives 1677562500, exactly the size of the input file. In other words, the logs tell us exactly what we should expect: the input data has been split over the 4 tasks that Hadoop deemed necessary to process this file.
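You can check that split arithmetic yourself. FileInputFormat chooses a split size as max(minSize, min(maxSize, blockSize)) and then carves the file into chunks of that size plus a remainder; here is a tiny sketch using the numbers from these logs (536,870,912B is 512MB, which I assume is the block size in effect on this cluster):

public class SplitMath {
    public static void main(String[] args) {
        long fileSize  = 1677562500L; // total size of the input CSV in bytes
        long splitSize = 536870912L;  // max(minSize, min(maxSize, blockSize)) = 512MB

        long fullSplits = fileSize / splitSize; // 3 splits of 536,870,912B each
        long remainder  = fileSize % splitSize; // 1 final split of 66,949,764B
        System.out.println(fullSplits + " full splits + 1 split of "
                + remainder + "B = 4 map tasks");
    }
}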

Notice that the first 3 tasks each processed 536,870,912B; the 4th processed only about 12% of that, 66,949,764B. This would explain why the 4th task completed so much quicker than the others. The data has not been split evenly, and that's a problem: the reduce phase has to sit around waiting for the slowest map task, so the uneven split has caused inefficient use of our cluster.

We can infer some things from this:

  • The less data a task has to process, the less time that task takes to complete (pretty obvious)
  • If we can increase the number of tasks, the data will be distributed more uniformly over those tasks and they should complete much quicker (and in roughly the same amount of time) due to having less data to process and less competition for resources.

Thus I ran the same test again changing only one variable: the number of nodes in the HDInsight cluster, which I increased from 2 to 20. I hoped that this would increase the number of map tasks. Unfortunately the job failed (my fault, I left some output files lying around from a previous run and that caused a failure), however it got as far as completing the map phase, which is pretty much all I cared about:

[clip_image004: the map tasks on the 20-node cluster; still only 4, with longer elapsed times]

As you can see there were still only 4 tasks, and they actually took longer. So we didn't achieve more tasks, and thus we didn't speed the job up. That's not good. You would think the same number of tasks (4) distributed over a greater number of nodes (20) would be slightly quicker due to less resource contention; it's a bit of a weird one and I can't explain it right now.

I wondered if splitting the input file into lots of smaller files would make a difference so I split that file into 20 equally sized smaller files and ran the job again on the 2-node cluster. This time we got 20 tasks:

[clip_image006: the map phase now showing 20 tasks]

Which is great, however the map phase failed due to out-of-memory issues:

[clip_image008: map tasks failing with out-of-memory errors]

So, I uploaded those same 20 files to the 20-node cluster and ran again. We got 20 tasks in the map phase and, thankfully, this time they all completed successfully. The entire job (map + reduce) completed in 34m2s, less than half the time taken on the 2-node cluster when loading the single file; the map phase completed in 10m34s, the reduce phase took 24m46s. The overlap there is only 1m18s, and that's because the map task durations were much more uniform, the data having been spread over more tasks. Here are the 20 map tasks with durations:

[clip_image010: the 20 map tasks and their durations]

That has been a bit of a braindump but I figured it might be interesting to anyone starting out on the path of doing data processing on Hadoop. Please post questions in the comments.

@Jamiet

UPDATE: Thanks to Paul Lambert, I've found this very useful primer on attempting to artificially set the number of mappers and reducers: HowManyMapsAndReduces

Published Friday, October 10, 2014 9:53 AM by jamiet


Comments

 

Davide Mauri said:

Hi Jamie

I'm using HDInsight too and so far I'm pretty disappointed by the performance. I have a lot of "small" JSON files on Azure Blob (several thousand) and I needed to extract some data from them. I decided to use Python to do the map job due to its native support for JSON. I didn't need the reduce part. Extracting less than 1GB of data took something like an hour on 4 compute nodes.

From what I see you show pretty much the same problem, and the only solution is to increase the number of nodes, which means increasing costs. My feeling is that for data under the dozens-of-TB scale, a "big" VM (say, the SSD + 56GB RAM VM) is faster and ultimately cheaper.

In addition to that, the lack of native support from SSIS makes everything worse...and the Hive ODBC driver is so slow that it is almost unusable.

This, at least, is my experience so far....

October 10, 2014 4:36 AM
 

jamiet said:

Hi Davide,

Interesting to hear that. I will say this: the nature of the work that we are doing in the map phase is very compute-intensive (lots of transforming of the data, which is dynamic in nature, i.e. it depends on metadata that is fed in at execution time) so don't read too much into the "10m to do the map phase" statistic. The significant point here is that we were able to speed it up by scaling out.

Also, even on the 20-node cluster with 20 input files we still only get 2 reduce tasks (I didn't do any tuning of that stuff) so there is lots of scope for improvement there too.

Nevertheless, it's not good to read that you're disappointed with the perf. We are investing in a process to spin up a cluster when the files are available and then spin it down again once processing is complete, hence we have to find the balance between speed and cost; more nodes will cost more, but to what degree is that offset by having the cluster available for less time? It's an interesting conundrum.

Regards

Jamie

October 10, 2014 4:44 AM
 

Davide Mauri said:

Hi Jamie

we're doing something similar...using HDInsight to process JSON data that is stored on Azure Storage, and we'd like to spin it up and down on request. Basically we'd like to use it as a complex ETL engine.

We don't have any complex transformation so far: we just need to extract some data from the JSON document in order to allow the end user to do some Self-Service BI.

I was also interested in testing HBase for this purpose....but given the results I'm not sure I'll proceed that way. Are you happy with HBase performance? Have you tried to connect to it via Excel / PowerQuery / PowerPivot?

October 10, 2014 5:21 AM
 

jamiet said:

Re HBase perf: yes and no. We get good response times (occasionally in single-digit milliseconds, usually between 20ms-50ms), however we've noticed a problem where if you don't hit the HBase endpoint for a period of time (e.g. 30 seconds) then the next request falls off a cliff (i.e. >3000ms); all subsequent requests go back to double-digit perf until the next pause in querying. I've got a video that easily demos this: http://1drv.ms/1s0d5bB

This is a big problem for us because our querying pattern will be infrequent rather than constant.

We've spoken to Microsoft about it and they think the problem lies in the http gateway that all requests go through - it was built for batch, not interactive requests. They are currently looking into it for us.
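In case it's useful, this is roughly how we time individual requests; a minimal sketch using the HBase Java client directly (the table name and row key are made up), though note that our real requests go via the HTTP gateway rather than the Java client:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class LatencyProbe {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "customers"); // hypothetical table
        try {
            Get get = new Get(Bytes.toBytes("some-customer-id"));
            long start = System.currentTimeMillis();
            Result result = table.get(get);
            long elapsed = System.currentTimeMillis() - start;
            System.out.println("Get took " + elapsed + "ms (empty=" + result.isEmpty() + ")");
        } finally {
            table.close();
        }
    }
}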

How are you spinning up the cluster? We are using Azure Automation, hence I spend my life in PowerShell at the moment.

JT

October 10, 2014 5:43 AM
 

Davide Mauri said:

Hi Jamie, ok I'll give HBase a try :)

Right now we're using PowerShell too....but I'd like to have a script component in SSIS that does it.

October 10, 2014 5:52 AM
 

jamiet said:

Interesting. We're deliberately not using SSIS anywhere in this. Trying to avoid using stuff that ties us to managing actual machines (be they physical or virtual).

October 10, 2014 5:55 AM
 

Davide Mauri said:

One of our requirements is to have a relational Data Warehouse / Data Mart....and I'm glad the customer asked for it :) So we use HDI to squeeze the big data, and we store the juice in the good old relational format (but using SQL 2014 and the clustered columnstore index).

October 10, 2014 6:03 AM
 

jerome said:

Pretty slow for a system that is supposed to be able to handle TBs and PBs of data.

It's far away from the 1TB loaded in 30 minutes with SSIS and SQL.

Will you test the performance of doing transformations?

Scanning data to clean up things like bad birth dates (month/day inverted) or names which are nearly the same (John = Jhon) etc...

As we talk about "big data" and "unstructured data", addressing these data quality issues is important to be able to consume the data effectively...

October 10, 2014 6:52 AM
 

jamiet said:

Hi Jerome,

Thanks for the comment.

I made the point, perhaps not forcefully enough, that the work being done was not important here. Comparing with the SSIS stat that you quote is irrelevant as they're not doing the same thing; it's not an apples-to-apples comparison.

We do have lots more formal performance testing to do. This was a more informal test just to see if scale-out would benefit us.

Regards

Jamie

October 10, 2014 7:07 AM
