Sunday, August 26, 2012

Traversing or walking directories with Scala

Traversing or walking through directories to find all files is a topic that has probably been written about often already. Nevertheless, I will add my own thoughts about it.
I have seen quite a few implementations for this, even in Scala, but I thought I could make it even simpler, and also support parallelism. Although I don't really know whether that actually speeds things up, since you're probably traversing a single disk and the disk head can only be in one place at a time.

Even though, here's my solution:
class GenFileList(val files: GenSeq[File]) {
  import FileList.file2FileList

  private def fileOrSubDir: (File) => GenSeq[File] =
    f => if (f.isFile) GenSeq(f) else f.walk

  def walk: GenSeq[File] =
    files.flatMap(fileOrSubDir)

  def walkWithFilter(fileFilter: File => Boolean): GenSeq[File] =
    walk.filter(fileFilter)
}

class ParFileList(override val files: ParSeq[File]) extends GenFileList(files)

class FileList(override val files: List[File]) extends GenFileList(files) {
  def par = new ParFileList(files.par)
}

object FileList {
  def apply(dir: File) = new FileList(dir.listFiles().toList)

  implicit def file2FileList(dir: File): FileList = FileList(dir)
}

Using it is very simple: it provides an implicit conversion of a java.io.File into a FileList, so you can just do this:
import FileList.file2FileList
val dir = new File(".")

println("Traversing without a filter:")
dir.walk.foreach(println)

println("Traverse using a filter:")
dir.walkWithFilter(_.getName.endsWith(".scala")).foreach(println)
The 'walk' just returns a GenSeq[File] so you can do any collection operation on it to do something with the found files.
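The same idea can also be sketched without the implicit-conversion machinery. Here is a minimal, standalone sequential version (a hypothetical simplification, not the FileList code above):

```scala
import java.io.File

// Minimal recursive walk: descend into sub-directories, collect plain files.
// Option(...) guards against listFiles() returning null for unreadable dirs.
def walk(dir: File): Seq[File] =
  Option(dir.listFiles()).toSeq.flatten.flatMap { f =>
    if (f.isFile) Seq(f) else walk(f)
  }
```

From here, `walk(new File(".")).filter(_.getName.endsWith(".scala"))` gives you the filtered variant.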

To traverse in parallel, just add 'par':
import FileList.file2FileList
val dir = new File(".")

println("Traversing parallel without a filter:")
dir.par.walk.foreach(println)

println("Traversing parallel with a filter:")
dir.par.walkWithFilter(_.getName.endsWith(".scala")).foreach(println)
Pretty neat if I may say so.

Friday, June 22, 2012

Deploying Flex mobile apps on an iOS device: the easy way

At the last Devoxx conference I heard about using Flex for mobile apps. This week I finally made some time to look into it and build my first app with Flex. It has been 5 years since I last developed with Flex, so it took a bit of time to get into it again, but I'm still amazed by the speed with which you can develop an app. Last year I built 2 iPhone apps in iOS. With Flex, developing is just as fast, but at the same time you also get an Android and BlackBerry version of your app.

I'm amazed that all Adobe and Flex Mobile on iOS documentation talks about deploying the Flex app using iTunes, while there is a way which is so much easier. Instead of iTunes, which requires syncing, use Apple's iPhone Configuration Utility (available for Mac and Windows). It's weird that you have to install a music library to deploy applications, right?

To build the app, follow the Adobe guidelines to prepare to build, debug and deploy an iOS application. When you have your IPA file in the bin-debug folder...

1. Connect your Apple iOS device to your development computer.

2. Launch iPhone Configuration Utility.

Note: You can also use iPhone Configuration Utility to find the Unique Device Identifier (UDID) of your iOS device. No need to use or install iTunes for anything.

3. If you haven't deployed the mobile provisioning profile to your phone yet:
  1. Either drag and drop your provisioning profile into the utility or onto the utility icon in the dock, or click Provisioning Profiles and then Add (top left) to select your profile.
  2. Then, select your iOS device > Provisioning Profiles tab.
  3. Click on 'Install' for the provisioning profile for your app.
4. To install your application on the iOS device:
  1. Drag and drop your IPA file into the utility or click Add (top left) to select the IPA to install. When asked to replace the app, select 'Yes'.
  2. Select your iOS device > Applications tab.
  3. If you have installed the app before, first uninstall the app.
  4. Then install the app (again).
And you're done. The app is installed and you can run or debug the app on the iOS device.
Basically, when you're developing, you do the setup once and from then on only build the app via the run configuration and perform step 4 to deploy it.
To me, this is much easier than using iTunes. At least no problems because the iTunes at work is not the iTunes you normally sync with, so all your photos and music get removed, etc.

Wednesday, June 20, 2012

MapReduce XML processing (new api)

For a while now I'm fascinated by stories about Hadoop, HBase, Hive, etc. I attended many presentations at several conferences. So it's about time I started to learn more about it so I can play with it myself. Because I'm also into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun. 
So, as to create a logbook for myself, I'm writing a series of posts about my Hadoop with Scala adventure. Previous posts were about setting up a Hadoop cluster using Puppet, setting up a Hadoop cluster using Cloudera's Free Manager, how to develop (and debug) MapReduce jobs in the IDE and Hadoop Streaming with Scala scripts. This post is about processing XML with MapReduce. In the next posts I will write more about my Hadoop and Scala adventure, hopefully eventually ending up with using Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.

To be able to play with Hadoop, I downloaded a Wikipedia dump so I had some data to run map/reduce jobs on. Hadoop has a StreamXmlRecordReader, but this uses the old API and I wanted to use and learn the new API. While Googling about this subject, most people seem to use the Mahout xml import. I, however, chose to write my own.

In Hadoop, to write your own reader/parser, you have to create an InputFormat implementation that returns a RecordReader. The RecordReader is responsible for reading data from the InputSplit and providing the next key/value for the Mapper to process. The simplest approach is to just extend FileInputFormat and override the createRecordReader method to return your own RecordReader implementation.
public class XmlPartInputFormat extends FileInputFormat<Text, Text> {

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        RecordReader<Text, Text> recordReader = new XmlPartRecorderReader(new XmlPartReaderFactoryImpl());
//        recordReader.initialize(inputSplit, taskAttemptContext);
        return recordReader;
    }
}
In the new API, calling initialize yourself is not needed anymore, because Hadoop calls the initialize method itself before the reader is used for the first time.

The XmlPartRecordReader implementation is pretty straightforward. It returns keys and values as long as they are available, and it is able to report the progress using the start, end and current positions.
The hard work is done by an XmlPartReader implementation. XmlPartReader is just a small interface which allowed me to easily create and test different xml stream parsing implementations.

XmlPartReader using StAX
My first XmlPartReader implementation used StAX to read the xml. The reader doesn't read the whole xml in one go; during hasNext it tries to find the next start-tag. When found, the stream is positioned just past this start-tag.
Method getNextXmlPart reads the stream until the end-tag is found and returns the whole xml part from start- to end-tag. This is implemented via an XMLEventReader which uses a custom EventFilter that denies everything until the start-tag is found and accepts everything until the end-tag is found.
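The filter idea can be sketched roughly like this. This is a hypothetical simplification (not the actual XmlPartReader code), matching on local tag names only, shown in Scala since the repository also contains a Scala reader:

```scala
import javax.xml.stream.{EventFilter, XMLInputFactory}
import javax.xml.stream.events.XMLEvent
import java.io.StringReader

// Deny all events until the wanted start-tag appears, accept everything
// until the matching end-tag has been delivered, then deny again.
def partFilter(tag: String): EventFilter = new EventFilter {
  private var inside = false
  def accept(event: XMLEvent): Boolean = {
    if (!inside && event.isStartElement &&
        event.asStartElement.getName.getLocalPart == tag) inside = true
    val keep = inside
    if (inside && event.isEndElement &&
        event.asEndElement.getName.getLocalPart == tag) inside = false
    keep
  }
}

val factory = XMLInputFactory.newInstance()
val reader = factory.createFilteredReader(
  factory.createXMLEventReader(new StringReader("<root><page>x</page><skip/></root>")),
  partFilter("page"))
```

Iterating over `reader` now only yields the events between `<page>` and `</page>`.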

The reader performed pretty well. The problem however was that an InputSplit might start somewhere in the middle of the file, and the reader must be able to read from that position on. With StAX it is not possible to just position the stream somewhere and start reading: StAX treats the first tag it reads as the root tag and throws an exception when reading past its end tag. For example, if in the Wikipedia xml it reads <title> as its first tag, StAX will throw an exception when reading past the </title> tag. At first I was not able to find a solution for this, so I started on my second XmlPartReader implementation. Later I thought of a solution: just read through the xml until the event location is past the start position, and let the XmlPartReader read from the stream from there on.

XmlPartReader 2
The second XmlPartReader implementation simply reads the xml from an InputStream character by character. During hasNext it tries to find a sequence of characters which match the wanted start-tag. Method getNextXmlPart reads the stream until a sequence is found which matches the end-tag. Since it uses a basic InputStream, the stream can easily be positioned by skipping bytes to the wanted start position.
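The character-scanning approach can be sketched as follows. Again this is a hypothetical simplification of the real reader (no buffering, no skipping to a split offset, and naive tag matching):

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Scan an InputStream char by char for text between <tag ...> and </tag>.
class TagScanner(in: InputStream, tag: String) {
  private val startTag = "<" + tag
  private val endTag   = "</" + tag + ">"

  // Read until the accumulated text ends with `pattern`; false at end of stream.
  // (Inefficient but clear; a real reader would match incrementally.)
  private def readUntil(pattern: String, acc: StringBuilder): Boolean = {
    var c = in.read()
    while (c != -1) {
      acc.append(c.toChar)
      if (acc.toString.endsWith(pattern)) return true
      c = in.read()
    }
    false
  }

  // hasNext and getNextXmlPart rolled into one: the next <tag>...</tag> part, if any.
  def nextPart(): Option[String] = {
    if (!readUntil(startTag, new StringBuilder)) return None
    val part = new StringBuilder(startTag)
    if (readUntil(endTag, part)) Some(part.toString) else None
  }
}
```

Because this works on a plain InputStream, positioning at a split boundary is just a matter of skipping bytes before scanning.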

I tried to compare the performance of both XmlPartReaders. Although I had expected the second implementation to be the fastest, the implementation using StAX was a bit faster. I think my xml parsing implementation is pretty lean and mean, so I suspect the StAX implementation is faster because it uses a better buffer size. I tried a couple of buffer sizes, but have not been able to beat the StAX reader yet. Although the first XmlPartReader now also supports a start position > 0, I do expect the second implementation to be faster with large xml files and many splits, since it can just skip to the start position, while the StAX implementation has to read through the xml and compare the tag position with the wanted start position.

Although I created this xml processing to read the Wikipedia XML, it can be used for any xml. The readers can read any xml part from a stream: they can read whole <page> sections, but can also just iterate over the <title> sections (see the test cases). Just specify the tag of the xml part you wish to process. It defaults to 'page', but can be overridden using the property 'xml.reader.tag'.
In XmlPartInputFormat, other XmlPartReader implementations can easily be used by providing another XmlPartReaderFactory implementation.

Using it
To use this xml processing, set the InputFormatClass to XmlPartInputFormat.class in your Map/Reduce driver (see WordCounterDriver). The Mapper receives a Text key, which is the name of the searched tag, and a Text value, which is the xml part including the start- and end-tag of the searched tag.
In the Mapper you could use XPath to get the part of the xml you're interested in. To get the title from a Wikipedia page:
    private static XPath xpath = XPathFactory.newInstance().newXPath();

    String title = (String) xpath.evaluate(".//title", inputSourceForValue(value), XPathConstants.STRING);

    private InputSource inputSourceForValue(Text value) {
        return new InputSource(new StringReader(value.toString()));
    }
(see also WordCounterMapper)

The sources are available in a Git repository at:
It also contains all test classes and a Scala implementation of XmlPartReader.

Friday, May 11, 2012

Functional Collections in Java

The one disadvantage of Scala is that, once you get used to Scala, going back to Java is so annoying. Java lacks so many features which feel so natural in Scala.
Yesterday, while studying MapReduce, I was implementing a simple Reduce class which just returned the maximum of the given values. Simple enough, you'd think, but being used to Scala now, what I actually wanted to do was just values.max, and not implement a for-loop.
The closest thing we have in Java that looks a bit like functional programming is the Google Guava library. So I browsed through its API, but couldn't find a max method or a method I could pass a function to to calculate the max.
So I then thought: well, if I just sort the collection, then it will be easy to get the highest value. And Guava's Ordering class also has a max method. So I used this to get the max value from an Iterable:
IntWritable max = Ordering.natural().max(values);
As a good TDD practitioner I wrote unit tests for the Reducer and the MapperReducer, using MRUnit, to verify this method works. And it did.
Then I wrote a Driver so I could launch the job from the command-line. How big was my surprise when I saw that the result from running the job on the command-line was different from the test!? Wtf? I quickly added a println statement in the Reducer to see the maximum value. The max value calculated when running the tests was ok, but when running the Driver, which uses the exact same Mapper and Reducer classes as the unit tests, the maximum value was wrong. It seemed that the natural ordering was different.

Instead of diving deep into the Google Guava code to find the problem, I just threw away the code, but I still didn't want to write a for-loop (call me stubborn :-). How would I do this in Scala? The simplest solution is of course the max function, but it could also be done using the foldLeft function. Does Google Guava provide a foldLeft function? No! (why not?) Well, then I'll just create one myself. I couldn't sleep last night, so let's have some fun :-)
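For comparison, a quick sketch of both Scala variants on a plain list; they really are one-liners:

```scala
val values = List(1, 3, 5, 2, 10, 4)

val viaMax  = values.max                                              // built-in max
val viaFold = values.foldLeft(Int.MinValue)((m, v) => math.max(m, v)) // max via foldLeft
```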
Because this implementation is obviously better than Java's Collections class and Guava's Collections2 class, I called it Collections3. I started with the foldLeft function and created a sum test and a concat test to test it.
public class Collections3Test {

    List<Integer> input = newArrayList(1, 3, 5, 2, 10, 4);

    @Test
    public void foldLeftSum() {
        Integer result = foldLeft(input, 0, new Function2<Integer, Integer>() {
            public Integer apply(Integer total, Integer value) {
                return total + value;
            }
        });
        assertThat(result, is(25));
    }

    @Test
    public void foldLeftConcat() {
        String result = foldLeft(input, "", new Function2<Integer, String>() {
            public String apply(String s, Integer integer) {
                return s + integer.toString();
            }
        });
        assertThat(result, is("1352104"));
    }
}
Then I implemented the function itself.
public class Collections3 {

    public static <A, B> B foldLeft(final Iterable<A> list, final B b, final Function2<A, B> f) {
        B result = b;
        for (A a : list) {
            result = f.apply(result, a);
        }
        return result;
    }
}

public interface Function2<A, B> {
    B apply(B b, A a);
}

public interface Function1<A, B> {
    B apply(A a);
}
I remembered from the Scala course that the foldLeft function can basically be used to build almost any other function, so I also added map and filter functions which use foldLeft.
These are the test cases:
    @Test
    public void applyMap() {
        List<String> result = map(input, new Function1<Integer, String>() {
            public String apply(Integer integer) {
                return integer.toString();
            }
        });
        assertThat(result, equalTo((List<String>) newArrayList("1", "3", "5", "2", "10", "4")));
    }

    @Test
    public void applyFilter() {
        List<Integer> result = filter(input, new Function1<Integer, Boolean>() {
            public Boolean apply(Integer integer) {
                return integer > 5;
            }
        });
        assertThat(result, equalTo((List<Integer>) newArrayList(10)));
    }

Here you see another disadvantage of Java: its limited generics system. The newArrayList has to be cast to a List of Integers to satisfy Java's compiler. Scala's generics system is much more advanced and there this would not have been necessary.
Anyway, here is the implementation of map and filter:

    public static <A, B> List<B> map(final Iterable<A> list, final Function1<A, B> f) {
        List<B> result = newArrayList();
        result = foldLeft(list, result, new Function2<A, List<B>>() {
            public List<B> apply(List<B> bs, A a) {
                bs.add(f.apply(a));
                return bs;
            }
        });
        return result;
    }

    public static <A> List<A> filter(final Iterable<A> list, final Function1<A, Boolean> f) {
        List<A> result = newArrayList();
        result = foldLeft(list, result, new Function2<A, List<A>>() {
            public List<A> apply(List<A> as, A a) {
                if (f.apply(a)) {
                    as.add(a);
                }
                return as;
            }
        });
        return result;
    }
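For comparison, a sketch of the Scala equivalents, including map and filter derived from the built-in foldLeft in the same way Collections3 does:

```scala
val input = List(1, 3, 5, 2, 10, 4)

// Built-in map and filter.
val mapped   = input.map(_.toString)
val filtered = input.filter(_ > 5)

// The same, expressed via foldLeft (appending to the accumulator list).
val mapped2   = input.foldLeft(List.empty[String])((acc, i) => acc :+ i.toString)
val filtered2 = input.foldLeft(List.empty[Int])((acc, i) => if (i > 5) acc :+ i else acc)
```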

So, now with all these functional methods available, I implemented a max function in my Reducer using the foldLeft method. To keep the code clean, I created a separate max method and a constant MAX which holds the max function.
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int max = max(values);
        context.write(key, new IntWritable(max));
    }

    private int max(Iterable<IntWritable> values) {
        return Collections3.foldLeft(values, 0, MAX);
    }

    public static final Function2<IntWritable, Integer> MAX = new Function2<IntWritable, Integer>() {
        public Integer apply(Integer max, IntWritable value) {
            return Math.max(max, value.get());
        }
    };
Ok, it's more lines of code than writing a simple for-loop to calculate the max value, but you have to admit, this is way more fun. :-D

The full source of Collections3 and Collections3Test is available in my hadoop-jobs git repository:

Tuesday, May 8, 2012

Hadoop streaming with Scala scripts

For a while now I'm fascinated by stories about Hadoop, HBase, Hive, etc. I attended many presentations at several conferences. So it's about time I started to learn more about it so I can play with it myself. Because I'm also into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun. 
So, as to create a logbook for myself, I'm going to write a series of posts about my Hadoop with Scala adventure. Previous posts were about setting up a Hadoop cluster using Puppet, setting up a Hadoop cluster using Cloudera's Free Manager and how to develop (and debug) MapReduce jobs in the IDE. In the next posts I will write more about my Hadoop and Scala adventure, hopefully eventually ending up with using Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.

Note: all sources in this blogpost are available in my hadoop-jobs git repository on:

Streaming with Scala scripts
I was reading the Hadoop Streaming chapter in Tom White's Hadoop: The Definitive Guide (3rd edition preview) and saw the samples in Ruby and Python. Since I'm into Scala nowadays I thought: hey, this should also be possible with a Scala script.

And it was. There were just 2 things I had to figure out: 1) how to create a Scala script file and 2) how to read data from stdin with Scala.

Scala script
A Scala script is a normal script file, with executable rights, but it has to start with:
#!/bin/sh
exec scala "$0" "$@"
!#
The rest of the file can contain any Scala code. You could add an executable object but this is not necessary. Lines are just evaluated sequentially.

Reading from stdin
There are multiple ways to read from stdin. I preferred this one:
for (ln <- io.Source.stdin.getLines) {
  // process the line 'ln'
}

Max Temperature example in Scala scripts
In the Hadoop Streaming chapter Tom White uses a Max Temperature example using NCDC data. Here is my implementation of that same example as Scala scripts:

sample.txt (input for job)

Input data: NCDC samples of temperature measurements which Tom uses for his Max Temperature examples. More sample data is available via the book's code and data github.
The mapper reads the year, temperature and quality of the measurement from standard input and outputs the year and temperature when the quality is any of 0, 1, 4, 5 or 9. With streaming, the key and value in the output must be separated by a tab.
#!/bin/sh
exec scala "$0" "$@"
!#

val validQualities = """[01459]""".r

for (ln <- io.Source.stdin.getLines) {
  val year = ln.substring(15, 19)
  val temp = ln.substring(87, 92)
  val quality = "" + ln(92)

  validQualities findFirstIn quality match {
    case Some(_) => println("%s\t%s" format (year, temp))
    case None =>
  }
}

The reducer gets all temperatures and determines the maximum temperature per year. The shuffling of the mapper output makes sure all equal keys (= years) are grouped together. The input is the same as the mapper output: "<year>\t<temperature>".
The temperature can be +0010 or -0011. When the temperature is above 0, the leading '+' has to be removed, because it otherwise gives parse exceptions when turning it into an Int value.
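For example, a parse helper like this handles both forms (a sketch mirroring the reducer's parse; stripping the '+' matters because Integer.parseInt, which backs Scala's toInt, only accepts a leading plus sign since Java 7):

```scala
// Strip a leading '+' before converting, so "+0010" parses on older JDKs too.
def parse(s: String): Int =
  if (s.startsWith("+")) s.substring(1).toInt else s.toInt

parse("+0010") // 10
parse("-0011") // -11
```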
#!/bin/sh
exec scala "$0" "$@"
!#

val Input = """(\d+)\t([+-]\d+)""".r
var max = Int.MinValue
var lastKey: String = null

def out(key: String, temp: Int) = println("%s\t%s" format (key, temp))

for (ln <- io.Source.stdin.getLines) {

  def parse(s: String) = if (s.startsWith("+")) s.substring(1).toInt else s.toInt

  ln match {
    case Input(k, v) => {
      val temp = parse(v)

      max = if (lastKey != null && lastKey != k) {
        out(lastKey, max) // key changed: emit the max for the previous key
        temp              // and start over with the current temperature
      } else {
        if (temp > max) temp else max
      }
      lastKey = k
    }
    case _ =>
  }
}
out(lastKey, max)
Running the job
By default Hadoop does not support streaming, so when starting the streaming sample you need to include the hadoop-streaming jar.
HADOOP_HOME should point to your Hadoop installation directory and HADOOP_HOME/bin should be in your PATH.
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar -input sample.txt -output out -mapper Mapper.scala -reducer Reducer.scala

Here's a skeleton Maven pom for using both (Cloudera) Hadoop and Scala in a single project (project coordinates and repository urls omitted here):
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- your project's groupId, artifactId and version -->
    <name>Hadoop Job</name>

    <repositories>
        <repository>
            <name>Scala Tools Maven2 @ Sonatype</name>
            <!-- repository url -->
        </repository>
        <!-- plus the Cloudera repository for the CDH artifacts -->
    </repositories>

    <dependencies>
        <!--
             Add project dependencies here. The archetype plugin will
             include all dependencies from the "compile" scope (which is
             the default) in the job jar's "lib" directory.
        -->
    </dependencies>
</project>


How to develop (and debug) MapReduce jobs in the IDE

For a while now I'm fascinated by stories about Hadoop, HBase, Hive, etc. I attended many presentations at several conferences. So it's about time I started to learn more about it so I can play with it myself. Because I'm also into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun. 
So, as to create a logbook for myself, I'm going to write a series of posts about my Hadoop with Scala adventure. My first posts were about setting up a Hadoop cluster using Puppet and setting up a Hadoop cluster using Cloudera's Free Manager. In this post I'll write about how to develop (and debug) MapReduce jobs in the IDE. The next posts will be about Hadoop Streaming with Scala scripts, and, hopefully, eventually ending up with using Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.

It struck me that in all tutorials I've read, after setting up a Hadoop/MapReduce environment, the writer immediately shows some code examples. None of the tutorials talk about creating a MapReduce project or setting up the IDE. Tom White mentions it briefly in his Hadoop: The Definitive Guide book in Configuring the Development Environment.
Ideally, I would just click on a button and voilà, there's a whole project ready to start development.

For Eclipse there is an Eclipse plugin. The problem with this plugin seems to be that it's contributed code and is not maintained at the same pace as, or by the same people who develop, Hadoop. The consequence is that the available binaries are old and obsolete and don't support the latest Hadoop or Eclipse versions. The plugin is also not compatible with Cloudera's Hadoop distributions. This has led to several blogposts which describe how to compile the plugin yourself for your Hadoop or Eclipse version or Cloudera distribution.

There doesn't seem to be any plugin or Hadoop support for IntelliJ at this moment.

I found a post by Matthias Friedrich, who created an archetype for easily creating a Hadoop project. This is an excellent base to start your project from.
mvn archetype:generate 
To use this with a Cloudera distribution, you need to add the Cloudera Maven repository and change the hadoop version to the Cloudera version you're using.

Running a MapReduce job
So, all in all, no really usable IDE support is available. Fortunately you don't really need it. Just create a Maven project and import the pom as a new project. This sets up all dependencies and you're then ready to start developing your own mappers and reducers.
To run your job in the IDE, create an executable class and run it. Here's an example:

public class MaxTemperature {

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJobName("Max Temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
//        job.setCombinerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

PS: when using the suggested Maven project, you need to remove the 'provided' scope from the hadoop dependency, otherwise hadoop is not on the runtime classpath.
In your IDE, create a run configuration for the class and add the necessary arguments. The output of a job is written to the output path. A job will not overwrite existing files, so to be able to run the job multiple times, you need to delete the previous output or change the output path.

There are multiple ways to do this. I didn't want to do it in Java code, so I chose to create a simple Ant script for this and adjusted the run configuration to run this Ant task before launch.
(IntelliJ has the option to run a Maven goal before launch, but this is run after 'make'. The 'clean' goal clears the 'target' directory, so then your compiled classes are gone and there is nothing left to launch.)
<?xml version="1.0"?>

<project name="Hadoop-jobs" default="clear-job-output">

    <target name="clear-job-output">
        <echo message="Delete: ${basedir}/target/output"/>
        <delete dir="${basedir}/target/output" verbose="true"/>
    </target>
</project>

Sunday, May 6, 2012

Setting up a Hadoop cluster using Cloudera Manager

For a while now I'm fascinated by stories about Hadoop, HBase, Hive, etc. I attended many presentations at several conferences. So it's about time I started to learn more about it so I can play with it myself. Because I'm also into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun. 
So, as to create a logbook for myself, I'm going to write a series of posts about my Hadoop with Scala adventure. My first post was about setting up a Hadoop cluster using Puppet. In this post I'll describe setting up a Hadoop cluster using Cloudera's Free Manager. The next posts will be about how to develop (and debug) MapReduce jobs in the IDE and Hadoop Streaming with Scala scripts, and, hopefully, eventually ending up with using Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.

Cloudera is a small company in Silicon Valley which employs many former employees of Google, Yahoo, Facebook, Twitter, etc., so they have an incredible amount of knowledge about the whole Hadoop eco-system. They contribute to Hadoop, give courses, and have lots of public resources like tutorials and training videos for those, like me, who want to learn about Hadoop.
Cloudera also has a product called Cloudera Manager with which you can easily set up a Hadoop environment. The free edition can manage up to 50 nodes, more than enough for me to play with.

Installing Cloudera Manager
Installation of Cloudera Manager is very easy. After downloading the binary, the excellent installation guide helps you through the installation process. This time I chose to use CentOS 5.8 for the cluster hosts, since Debian, which I used for my previous Hadoop cluster, is not one of the supported operating systems. To create a new basic VM, I downloaded the CentOS 5.8 Network Install ISO and chose a basic server install, no additional applications or options. After installation I booted the VM, ran yum update to get the latest packages, and disabled SELinux (a requirement). Then I created a template of this VM so I could use it for all other cluster nodes.
To disable SELinux:
- edit /etc/selinux/config
- change 'SELINUX=enforcing' to 'SELINUX=disabled'
- reboot
Download the Cloudera Manager on, or copy it to, the VM, and run it. It has an installation wizard which helps you through the installation process and installs all required software: Oracle JDK, PostgreSQL, etc.

After installation, you can connect to the manager webgui via http://<ip>:7180 (uid:pwd = admin:admin).
For me this didn't work straight away because the system's iptables was blocking access to the port, so I had to open it.
To open port 7180, you can either disable iptables altogether:
/etc/init.d/iptables stop
or open up the port:
iptables -I INPUT -m state --state NEW -p tcp --dport 7180 -j ACCEPT

(And while you're at it, also open ports 7182 and 9001, both used by agents to connect to the manager. You'll need that later on.)

To save your new iptable settings:
/sbin/service iptables save
You can view all your iptables settings in /etc/sysconfig/iptables.

Setting up nodes
When you connect to the manager's webgui, you'll be taken through the process of adding node hosts to your cluster. The manager will take care of installing all required software.
Using the CentOS VM template I created before, I created 5 new node VMs: cdhnode1 to cdhnode5.
Cloudera Manager requires proper name resolution for all hosts, so for the VMs' network connectivity I chose a bridged network this time instead of a shared network, and
- added the MAC addresses of the VM nodes to my DHCP server so the nodes would have a static ip
- added those ip's to my local DNS server for proper ip <--> name resolution.

Before starting the node installation, set up the correct hostname and /etc/hosts file for each node.
Hostname: on CentOS, change it in /etc/sysconfig/network.
/etc/hosts must contain at least 2 lines, containing localhost and your hostname:        localhost
192.168.1.225    cdhnode5.diversit.local cdhnode5
To add the hosts to the cluster, in the bottom textbox, add all node ip's or an ip range pattern like 192.168.1.[221-225]. Then click 'Find Hosts'.
Check all the hosts you want to add to the cluster and click 'Continue'.
The Cloudera Manager needs a way to connect to all hosts to perform the installation. Easiest is to provide the root password, which must be the same for all hosts! Providing an account with password-less sudo rights is also possible, or use a public/private key pair.
Click on 'Start Installation' to begin installing CDH on the cluster hosts.

You can follow the installation process on a host by looking at /tmp/scm_prepare_node.*/scm_prepare_node.log.

For more details about the manager and host installation, see the installation guide.

Starting services
After the software has been installed on all hosts and the hosts have been added to the cluster you can start your required services.
I just started HDFS, which is always required, and the MapReduce service. Adding a service is pretty straightforward using the manager.

Connecting to services
Via the Cloudera Manager webgui you can generate the client configuration needed to connect to your CDH cluster. On your client, you must run the same software version as on your hosts. Cloudera uses a specific Cloudera build. These packages can also be downloaded from the Cloudera Download page.

- Put the right package on your client.
- Add the client configuration and you're done.
Happy Hadooping ;-)

Problems encountered
Error during installation of hosts software
At first, some nodes failed during installation. In /tmp/scm_prepare_node.*/scm_prepare_node.log you can then see the manager rolling back the installation of all software (look for 'rollback started'). It was unclear why this happened. Opening up ports 7182 and 9001 on the Cloudera Manager instance solved this.

Error starting service
I also had problems starting services on some hosts. This was caused by not having the localhost line in my /etc/hosts file. I had read somewhere that that line had to be removed, but apparently that was wrong. Thanks to Cloudera support for the quick reply and solution.

Friday, May 4, 2012

Setting up a Hadoop cluster using Puppet

For a while now I'm fascinated by stories about Hadoop, HBase, Hive, etc. I attended many presentations at several conferences. So it's about time I started to learn more about it so I can play with it myself. Because I'm also into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun.
So, to create a logbook for myself, I'm going to write a series of posts about my Hadoop with Scala adventure. This first post is about setting up a Hadoop cluster using Puppet. Next posts will be about setting up a cluster using Cloudera's Free Manager, how to develop (and debug) MapReduce jobs in the IDE, Hadoop Streaming with Scala scripts, and, hopefully, eventually ending up with using Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.

My first step was to learn about Hadoop and set up a Hadoop cluster. On my dual quad core MacPro with 22GB memory I should have plenty of resources for that, using Parallels Desktop as virtualization platform. I could have just created a VM with all the software and then cloned it multiple times, but I chose to use (and learn) Puppet, so that after the initial Hadoop cluster setup I could also easily roll out other software like HBase, Hive, Pig, etc. And during the setup, Puppet also proved to be a handy tool to push other configuration settings to the VMs, like the /etc/hosts file and ntpd.

The puppet master and puppet agent configuration are available in a Git repository as well as the Hadoop and Java modules. See Resources down below.

Creating the VM
Before I started with the cluster, I went through the Single node and Pseudo Distribution setup to get a bit familiar with Hadoop. After that, I created a basic VM using a Debian 6 network install. (At several steps during the whole cluster setup, I created snapshots of the VM, a feature of Parallels Desktop, so I could easily go back if I messed up something.)

Installing puppet
Next step was to install Puppet. First the puppet master. I cloned the Debian VM to create a separate puppet master instance. Since Puppet was new to me, I had to read about how to install Puppet and read several other resources about using and configuring Puppet (with Hadoop).

- Add the squeeze-backports Debian repository to /etc/apt/sources.list to be able to install the latest Puppet version.
  (deb squeeze-backports main)
- Run apt-get update
- Install Puppet:
  apt-get -t squeeze-backports install puppetmaster facter
  (To search for and look at packages use:
  aptitude [-t squeeze-backports] search|show puppetmaster|facter)

The puppetmaster will be started after the install.

Puppet agents
For the Hadoop nodes, I again cloned the basic Debian VM. These will be the Puppet agents. Installation of Puppet is as described above, but instead of 'puppetmaster', install 'puppet'.

apt-get -t squeeze-backports install puppet facter
At this point, I created a template of this VM which I could then use to create new Hadoop nodes. See Adding more Hadoop Nodes.

- Add the ip of the puppet master in /etc/hosts. By default, the agent will look for a host named 'puppet'.
- Set the hostname of the node to a FQDN : hostname <host>.<domain>.
  I used hostname 'hadoop-01.diversit.local'
  On Debian, also set the hostname in /etc/hostname so it survives a reboot. On CentOS, it's in /etc/sysconfig/network.
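For example, the relevant /etc/hosts entry on each agent could look like this (the IP address is hypothetical; use your own puppet master's address):

```
# /etc/hosts on a puppet agent (hypothetical IP)    puppet puppetmaster.diversit.local
```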

The default agent configuration is sufficient. You might want to add pluginsync=true to the [main] section. On both the puppet master and the puppet agent, the configuration is in the /etc/puppet directory.
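As a sketch, an agent's /etc/puppet/puppet.conf could then look like this (the server line is only needed if the default host name 'puppet' does not resolve; pluginsync is the optional addition just mentioned):

```ini
# /etc/puppet/puppet.conf on an agent (sketch)
[main]
server = puppet
pluginsync = true
```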

Now run : puppetd -vt
The agent will now create a certificate using the hostname and will try to contact the puppetmaster.

The agent certificate has to be signed by the puppetmaster before the agent will receive any updates. While the agent is running, check on the puppetmaster if the sign request has already been received.
To view the certificate requests : puppetca -la
To sign the certificate : puppetca --sign <agent-name>

Puppet modules
Now that both the puppet master and puppet agents are configured, it's time to add some useful modules on the puppet master to sync to the agents. To get familiar with Puppet, I first created a simple module to synchronize the /etc/hosts file.

mkdir /etc/puppet/modules/hosts
mkdir /etc/puppet/modules/hosts/files
mkdir /etc/puppet/modules/hosts/manifests

Put the hosts file you want to push to the agents, which will be Hadoop nodes, in the files directory.
Create an init.pp file in the manifests directory. The name of the class must be equal to the module name, in this case hosts.
class hosts {
  file { "/etc/hosts":
    owner  => root,
    group  => root,
    mode   => 775,
    source => "puppet:///hosts/hosts",
  }
}
This simple configuration will copy the file hosts in module hosts on the puppet master to /etc/hosts on the puppet agent.
File serving must be enabled on the puppet master to be able to copy files to agents. Edit the /etc/puppet/fileserver.conf and add the ip's or network of agents which are allowed to get files.
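One way the fileserver.conf can look, with a mount name matching the 'puppet:///hosts/...' source used above (the mount layout and network range here are a hypothetical sketch; adjust to your own setup):

```
# /etc/puppet/fileserver.conf (sketch)
[hosts]
  path /etc/puppet/modules/hosts/files
  allow 10.211.55.0/24
```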

Last thing to do, is to tell puppet master which modules should be synced with which agents.
Create the file /etc/puppet/manifests/site.pp
import "nodes"
Then create /etc/puppet/manifests/nodes.pp
node basenode {
  include hosts
}

node 'hadoop-01' inherits basenode {
}
This file defines which modules are synced to which agents. As you can see, a node can inherit from other nodes. Here I created a basenode which can then be inherited by all agent nodes.

Now run puppetd -vt on the client to see if the hosts file is synchronized.
These are the basics of puppet modules. All modules are a variation on this. Of course puppet has many more features. For more, check out the puppet documentation or Puppet Forge, which is a repository of public puppet modules.

Java Module
We need Java to be able to run Hadoop. Oracle changed the license for Java and therefore it is not possible anymore to install Java as a package. So let's create a puppet module to distribute Java to the nodes.
I based my Java module on Brian Carpio's module in Puppet Forge. His module didn't work on Debian, so I had to tweak it a little bit.

- I downloaded his java module, put it in /etc/puppet/modules.
- The JDK package in /etc/puppet/modules/java/files didn't run on Debian, so I replaced it with a Linux x64 package downloaded from Oracle. I renamed the Oracle package to jdk1.7.0_03.tar.gz.
- In /etc/puppet/modules/java/manifests/init.pp I had to add path => "/usr/bin:/usr/sbin:/bin", to the exec section otherwise Debian was not able to find the untar command.
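For illustration, the tweaked exec resource could look roughly like this (the resource name and command are made up for the sketch; Brian's module differs in its details):

```puppet
# Sketch: giving an exec a path so Debian can find the commands it runs
exec { "untar-jdk":
  command => "tar -xzf jdk1.7.0_03.tar.gz",
  cwd     => "/opt",
  path    => "/usr/bin:/usr/sbin:/bin",
}
```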

Finally I added the java module to the basenode in /etc/puppet/manifests/nodes.pp and Java was synced to the agents.

Hadoop Module
Because Brian's Java module worked pretty well, I also used his Hadoop module.
One thing you have to do is create ssh keys for the hduser. This user is going to be created on the nodes and this user will run Hadoop.
- Add an hduser on the puppet master and generate ssh keys for it:
 useradd -m hduser
 su - hduser
 ssh-keygen (empty passphrase)
- Copy the id_rsa and to /etc/puppet/modules/hadoop/files/ssh

I updated Hadoop to version 1.0.2.
- Download tar.gz from Hadoop site.
- Updated the version in /etc/puppet/modules/hadoop/manifests/params.pp

In this module I also had to add path => "/usr/bin:/usr/sbin:/bin", to the exec section.

Also in /etc/puppet/modules/hadoop/manifests/params.pp you need to put the fqdn of the master and all slave nodes. These values are used to generate the Hadoop configuration.
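As a sketch, the relevant part of params.pp could look like this. The variable names are illustrative (check the actual module for the real ones); the host names follow my hadoop-01 naming with one master and three slaves:

```puppet
# Sketch of params.pp values; variable names are hypothetical
$hadoop_master = "hadoop-01.diversit.local"
$hadoop_slaves = ["hadoop-02.diversit.local",
                  "hadoop-03.diversit.local",
                  "hadoop-04.diversit.local"]
```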

Add the hadoop module to the basenode in /etc/puppet/manifests/nodes.pp and Hadoop will be synced to the agents (if agent daemon is running). Run puppetd -vt to run the agent directly.

In my latest Hadoop module version I disabled the formatting of the namenode. This should only be done once and not every time the Hadoop module is updated.

Starting Hadoop
I did all above steps for 4 nodes: 1 master and 3 slaves.
To start Hadoop:
- Login on the master node
- Go to the Hadoop install directory (/opt/hadoop/hadoop)
- Run bin/ to start HDFS
- Run bin/ to start MapReduce
These scripts will connect to all slave nodes, using the hduser, and start the processes on the slaves.

You can now try to connect to the cluster from your client, typically your development machine. Make sure you have the same version of Hadoop locally as on the cluster.
Copy the conf/core-site.xml and conf/mapred-site.xml from your Hadoop master or one of your nodes to your client.
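A minimal client core-site.xml points at the namenode on the master. As a sketch (the property and port are the common Hadoop 1.x defaults; use whatever your generated configuration actually contains):

```xml
<!-- conf/core-site.xml on the client (sketch) -->
<configuration>
  <property>
    <name></name>
    <value>hdfs://hadoop-01.diversit.local:9000</value>
  </property>
</configuration>
```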

Test your setup. From your Hadoop install directory run : bin/hadoop fs -ls /
This should show you the root of the HDFS in the cluster and not your local filesystem.

Adding more Hadoop nodes
Now that Hadoop is running and Puppet is all set up, adding more nodes is easy.
- Clone a VM using the Puppet Agent template I created earlier.
- Add the puppet master ip in /etc/hosts
- Set the correct hostname
- Run the puppet agent : puppetd -vt
- Sign the puppet agent certificate on the puppet master : puppetca --sign <agent-name>
- Add the new node to /etc/puppet/manifests/nodes.pp
- Add the new node to the slaves in /etc/puppet/modules/hadoop/manifests/params.pp
- Add the new node's ip in /etc/puppet/modules/hosts/files/hosts
- Run the puppet agent on the new node to install all software and configuration.

Your new Hadoop node should now be ready.
First format the HDFS on the new node. On the node run : bin/hadoop namenode -format
Then start the datanode and tasktracker on the node. The Hadoop master will automatically find the new node.
bin/ start datanode
bin/ start tasktracker

Verify on your Hadoop master that the new node(s) are available in the cluster,
either by running bin/hadoop dfsadmin -report on the Hadoop master,
or by opening http://hadoopmaster:50070 in your browser.

Problems I encountered
Time out of sync
Even though the VMs are configured to get the time from my Mac, the time on the VMs sometimes differed. This causes problems with Puppet, because Puppet wants all clocks to be within 30 minutes of the puppet master.
To fix this I decided to install ntpd on all nodes so all times would be equal.
Since puppet was already in place, I created a simple ntp module to install and configure ntp on the nodes.
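Such an ntp module can be as small as this init.pp sketch (Debian package and service names assumed; other distributions may differ):

```puppet
# Minimal ntp module sketch (manifests/init.pp)
class ntp {
  package { "ntp":
    ensure => installed,
  }
  service { "ntp":
    ensure  => running,
    require => Package["ntp"],
  }
}
```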

Automatically start puppet agents
The Puppet agent is not started automatically when the VM boots. After setup is complete, and you want to have the puppet agent running so changes are pushed automatically, edit /etc/default/puppet and change START=no into START=yes. Then use /etc/init.d/puppet start to start the agent.

Push changes to agents
By default, agents poll the puppet master every 30 minutes. If you have a change you want to push to the agents immediately, you have to connect to each agent and run 'puppetd -vt'.
Another option is to push changes to the agents. The agents must be configured to listen for 'push' events which they don't do by default.
In the /etc/puppet/puppet.conf, add 'listen=true' in the [main] section.
Create a /etc/puppet/auth.conf file to allow certain hosts to push changes.
# Allow puppet kick access
path /run
method save
auth any
allow *.hadoop.local
The agents should reload the new configuration automatically.
This has to be done on all puppet agents, so what the heck, let's put it all in a puppet module and let puppet do the work.
Since this is also just file copying like the /etc/hosts file, I renamed the hosts module to config and added the puppet agent configuration to it.

Resources
Getting Started with Puppet on OSX :
How to create a Puppet Module :
My Puppet Bookmarks :
Brian Carpio Puppet Modules :

Git repositories:
Puppet master configuration (contains all modules except Hadoop and Java modules) :
Puppet agent configuration :
Puppet Java Module :
Puppet Hadoop Module :

Next post will be about setting up a Hadoop cluster using Cloudera's Free Manager.

Thursday, April 19, 2012

Improved testing of messages sent by an Actor with Akka

Earlier this week I wrote a post about how to test the message sent to an actor which is looked up by the actor under test. This week I attended the Scala Days in London and had a chat about this with Roland Kuhn of Typesafe, who did a presentation about testing Akka. He suggested a better way to test this, so that TestKit's testActor is used and you can make use of the expected-message assertions and 'within' timing options.
The solution he proposed is quite simple actually (why didn't I think of that?): just create a ForwardingActor which forwards all messages to the testActor, and register it with the name used by the actor under test.

So here's what the test looks like:

package testing

import org.scalatest.matchers.ShouldMatchers
import akka.testkit.TestKit
import org.scalatest.{FeatureSpec, BeforeAndAfterAll}
import{ActorRef, Props, ActorSystem, Actor}
import testing.ProperTestASendingMsgToB.{A, ForwardingActor}

object ProperTestASendingMsgToB {

  // The Actor to test
  class A extends Actor {
    val next = context.actorFor("/user/B")

    def receive = {
      case s: String => {
        println("Received by A: %s" format s)
        next ! s
      }
    }
  }

  // A utility Actor which forwards everything to a target actor
  class ForwardingActor(targetActor: ActorRef) extends Actor {
    def receive = {
      case msg => targetActor ! msg
    }
  }
}

class ProperTestASendingMsgToB(system: ActorSystem) extends TestKit(system)
    with FeatureSpec with ShouldMatchers with BeforeAndAfterAll {

  def this() = this(ActorSystem("Test"))

  feature("'A' must send message to 'B'") {
    scenario("Test using TestKit's testActor") {

      import system._
      // register Forwarding Actor with name 'B'. Forward to testActor.
      actorOf(Props(new ForwardingActor(testActor)), "B")
      // create actor to test
      val actorA: ActorRef = actorOf(Props[A])

      import akka.util.duration._
      // validate message received within 100 millis
      within(100 millis) {
        // send the message
        actorA ! "some text"
        // verify message received by testActor
        expectMsg("some text")
      }
    }
  }
}

Monday, April 16, 2012

Testing messages sent by an Actor with Akka

Update: I talked to Roland Kuhn at Scala Days in London and he suggested a better way to test this. I created a new blog post with the new solution.

Last week I've been diving into Akka. Before building my first Akka code, I wanted to know how to properly test Akka applications, so I can do proper TDD. The Akka documentation provides an excellent starting place. It provides some examples of how to test actors and the messages they send, but I noticed that all those tests provide the target actor as a constructor argument to the actor under test.

I can't imagine you always want to pass target actors around. When using an actor as a boundary, that would mean you have to carry that actor from front to back to pass it through the whole actor hierarchy. I imagine that's why they made the actor lookup in the first place. But when the actor under test does a lookup of its target actor, how do you test that the target actor receives the correct message? I don't want to create a separate constructor just for testing.

My first tries to use the testActor of TestKit failed. It's impossible to register the testActor under a custom name for testing. Also trying to use TestProbe and TestActorRef was not successful. You can't construct an instance of a custom actor and use that with TestActorRef.
Finally, after reading through the API, I found that you can create a TestActorRef and pass a custom name with it, just like you create a normal actor. Somehow the TestActorRef cannot find an implicit ActorSystem, even though the test extends TestKit, so an additional implicit assignment of ActorSystem was needed.

So, here's an example of how to test an actor which sends a message to a target actor which it requires via lookup:

package testing

import org.scalatest.matchers.ShouldMatchers
import org.scalatest.{BeforeAndAfterAll, FeatureSpec}
import{Props, ActorSystem, Actor}
import akka.testkit.{TestActorRef, TestKit}

class A extends Actor {
  val next = context.actorFor("/user/B")

  def receive = {
    case s: String => {
      println("Received by A: %s" format s)
      next ! s
    }
  }
}

/**
 * An Actor which captures the message so it can be verified in test.
 */
class CaptureMsgActor extends Actor {
  var capturedMessage = ""
  def receive = {
    case msg => {
      capturedMessage = msg.asInstanceOf[String]
      println("Received by B: %s" format msg)
    }
  }
}

class TestASendingMsgToB(system: ActorSystem) extends TestKit(system)
    with FeatureSpec with ShouldMatchers with BeforeAndAfterAll {

  def this() = this(ActorSystem("Test"))

  feature("'A' must send message to 'B'") {
    scenario("A message sent to 'A' must be received by 'B'") {

      // implicit system needed for creating a TestActorRef using Props and a name
      implicit val testSystem = this.system
      // create a TestActorRef of type CaptureMsgActor and register it under the name "B" (the name used by actor under test 'A')
      val actorB: TestActorRef[CaptureMsgActor] = TestActorRef(Props[CaptureMsgActor], name = "B")
      // create actor to test. Use TestActorRef so the message is sent synchronously.
      val actorToTest = TestActorRef[A]
      // send a message to the actor under test
      actorToTest ! "some text"
      // validate the captured message
      actorB.underlyingActor.capturedMessage should equal("some text")
    }
  }
}

Getting started with Scala in Java projects

Tips for an optimal interoperability

Last January I wrote an article about using Scala in Java projects and the interoperability between Scala and Java. Unfortunately the publisher of the Dutch Java Magazine, in which it was going to be published, went bankrupt in February. So, to not let my efforts go to waste, I'm publishing it on my own site now.

Article intro:
It's been two years since Scala featured in the Dutch Java Magazine. Meanwhile Scala has become a mature language and is gaining more and more followers. Logical, because Scala already has the features that are not expected in Java for another 2 or 3 years, if ever. These features not only result in less code, but also in better and more readable code. Many Java projects are large and have been around for many years. It is not realistic to expect that such projects will be reimplemented in Scala. That is not necessary, because Scala fits very well into existing projects.

This article is about how to use Scala in Java projects: using Scala from Java and vice versa, how to use Scala with Maven, SBT, editors, etc. Read the whole article...

(This article was written in Dutch. I'm working on a translation in English.)

Wednesday, January 4, 2012

Hosting static sites using Play! and Heroku

I used to rent a virtual private server to host some blogs and static sites. Last year I hardly had any time to manage it and I also noticed a couple of times my server was not running. So, time to get rid of it and put everything in the cloud. The blogs were moved to Blogger, but what to do with my static sites?
I thought it would be fairly easy to create a Play! application for this and host it on Heroku. And ... it was!

Play! is a web framework with which you can build a web application very quickly and easily. And Play! supports both Java and Scala!
Heroku is a Cloud Application Platform on which you can host your Python or Java application and they also have support for Play!.

Getting started
First, install Play! On a Mac, just install Homebrew and run:

brew install play

To create a new Play! application, run:

play new <appname>

This creates the whole application. Just run 'play run' and get rockin'.

Static sites location
Each static site is put in its own folder in the 'sites' folder in the root folder of the application. To be able to host static sites for multiple domain names, each folder has the fully qualified domain name of the site. E.g. ''
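As an illustration, the sites folder could look like this (the domain names here are hypothetical):

```
sites/
    index.html
    images/
      logo.png
    index.html
```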

The application
The main application is in /app/controllers/ The default index method has been changed to accept both the domain name and the path. A File is constructed using these and then rendered binary, so this also works for images and other binary files. If the file does not exist, a 404 - Not Found is returned.

import;

import org.apache.log4j.Logger;

import play.mvc.Controller;

public class Application extends Controller {

  private static final Logger LOGGER = Logger.getLogger(Application.class);

  public static final String SITES = "sites/";

  public static void index(String domainname, String path) {
    if("".equals(path)) {
      path = "/index.html";
    }
    LOGGER.debug("Request for domainname:"+domainname+" and path:"+path);

    File f = new File(SITES+domainname+"/"+path);
    if(!f.exists()) {
      if(!"favicon.ico".equals(f.getName())) {"File not found: "+ f.getAbsolutePath());
      }
      notFound();
    }
    LOGGER.debug("Serving: "+f.getAbsolutePath());
    renderBinary(f);
  }
}

Routes configuration
In Play! a routes configuration defines how a url is mapped to an application method. For this application all requests have to route to the 'Application.index' method. It's also possible to capture values from the url and map these to method parameters. The index method takes both the domain name and the path, so the route must map these. The names of the variables in the route must match the parameter names in the Java code.

# Routes
# This file defines all application routes (Higher priority routes first)
# ~~~~

# Ignore favicon requests
GET     /favicon.ico                            404

# All request
GET  {site}/{<.*>path}                          Application.index

A special regex is used to match all paths, since {path} would only match '/index.html' but not '/dir/other.html' or '/images/logos/company/logo.gif'. {<.*>path} will match anything into the 'path' parameter.
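To see the difference in isolation, here's a small standalone Java sketch using java.util.regex (this stands in for Play!'s route matcher; the class name is made up, but the principle is the same):

```java
import java.util.regex.Pattern;

public class RouteRegexDemo {
  public static void main(String[] args) {
    String nested = "/images/logos/company/logo.gif";
    // '.*' matches across '/' so nested paths are captured
    System.out.println(Pattern.matches(".*", nested));    // true
    // a single-segment pattern stops at '/', so it fails on nested paths
    System.out.println(Pattern.matches("[^/]+", nested)); // false
  }
}
```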

{site} matches the fully qualified domain name. It would also be possible to use '{site}' if you just wanted to match the subdomain name.

Note: even though this Virtual Hosts feature was introduced in Play! 1.1, it didn't work in Play! 1.2.3. So make sure you're using Play! 1.2.4 in which this feature does work. There is no need to install an additional module for this.

Deploying on Heroku
Deploying a Play! application on Heroku is extremely easy. Details can be found on the Heroku Dev Center site, but it basically comes down to this:

# Setting up Heroku
- Create an account on Heroku
- Install Heroku app locally
- heroku login

# Create a Git repository
- git init
- git add .
- git commit -m 'Init'

# Create a Heroku stack
- heroku create --stack cedar

# Deploy on Heroku
- git push heroku master

... and you're done! Your application now runs in the cloud.

The next step is to add the 'Custom Domains' add-on to your application using the 'My Apps' administration of Heroku and add all the domain names of your static sites so Heroku will serve requests for these domain names.

The final step is to change your DNS settings and create a CNAME for each of your static sites and map these to the url of your application on Heroku.
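In a DNS zone file this boils down to records like these (both names are hypothetical; the CNAME target is the host name Heroku assigns to your app):

```
;; zone file sketch for a static site on Heroku       IN CNAME     IN CNAME
```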

The code
This code for this Play! app is available in this Git repository. If you're using this, you can skip the 'create a Git repository' step described in 'Deploying on Heroku'.