Friday, May 11, 2012

Functional Collections in Java

The one disadvantage of Scala is that, once you get used to it, going back to Java is so annoying. Java lacks so many features which feel so natural in Scala.
Yesterday, while studying MapReduce, I was implementing a simple Reduce class which just returned the maximum of the given values. Simple enough, you'd think, but being used to Scala now, what I actually wanted to do was just values.max, not implement a for-loop.
The closest thing we have in Java that looks a bit like functional programming is the Google Guava library. So I browsed through its API, but couldn't find a max method, or a method to which I could pass a function to calculate the max.
So I then thought: well, if I just sort the collection, it will be easy to get the highest value. And Guava's Ordering class also has a max method. So I used this to get the max value from an Iterable:
IntWritable max = Ordering.natural().max(values);
As a good TDD practitioner I wrote unit tests for the Reducer and for the Mapper/Reducer combination, using MRUnit, to verify this method works. And it did.
Then I wrote a Driver so I could launch the job from the command line. How big was my surprise when I saw that the result from running the job on the command line was different from the result of the test!? Wtf? I quickly added a println statement in the Reducer to see the maximum value. The max value calculated when running the tests was ok, but when running the Driver, which uses the exact same Mapper and Reducer classes as the unit tests, the maximum value was wrong. It seemed that the natural ordering was different.

Instead of diving deep into the Google Guava code to find the problem, I just threw away the code, but I still didn't want to write a for-loop (call me stubborn :-). How would I do this in Scala? The simplest solution is of course the max function, but it could also be done using the foldLeft function. Does Google Guava provide a foldLeft function? No! (Why not?) Well, then I'll just create one myself. I couldn't sleep last night anyway, so let's have some fun :-)
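For reference, this is roughly what I mean in Scala (just a sketch, not code from the hadoop-jobs repository):

val values = List(1, 3, 5, 2, 10, 4)

// the built-in way
val highest = values.max                                                 // 10

// the same result expressed with foldLeft
val highest2 = values.foldLeft(Int.MinValue)((m, v) => math.max(m, v))   // 10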
Because this implementation is obviously better than Java's Collections class and Guava's Collections2 class, I called it Collections3. I started with the foldLeft function and created a sum and a concat test to test it.
public class Collections3Test {

    List<Integer> input = newArrayList(1, 3, 5, 2, 10, 4);

    @Test
    public void foldLeftSum() {
        Integer result = foldLeft(input, 0, new Function2<Integer, Integer>() {
            @Override
            public Integer apply(Integer total, Integer value) {
                return total + value;
            }
        });
        assertThat(result, is(25));
    }

    @Test
    public void foldLeftConcat() {
        String result = foldLeft(input, "", new Function2<Integer, String>() {
            @Override
            public String apply(String s, Integer integer) {
                return s + integer.toString();
            }
        });
        assertThat(result, is("1352104"));
    }
}
Then I implemented the function itself.
public class Collections3 {

    public static <A,B> B foldLeft(final Iterable<A> list, final B b, final Function2<A,B> f) {
        B result = b;
        for(A a : list) {
            result = f.apply(result, a);
        }
        return result;
    }
}

public interface Function2<A, B> {
    B apply(B b, A a);
}
I remembered from the Scala course that foldLeft can basically be used to implement almost any other collection function, so I also added a map and a filter function which use foldLeft.
These are the test cases:
    @Test
    public void applyMap() {
        List<String> result = map(input, new Function1<Integer, String>() {
            @Override
            public String apply(Integer integer) {
                return integer.toString();
            }
        });
        assertThat(result, equalTo((List<String>)newArrayList("1", "3", "5", "2", "10", "4")));
    }

    @Test
    public void applyFilter() {
        List<Integer> result = filter(input, new Function1<Integer, Boolean>() {
            @Override
            public Boolean apply(Integer integer) {
                return integer > 5;
            }
        });
        assertThat(result, equalTo((List<Integer>)newArrayList(10)));
    }

Here you see another disadvantage of Java: its limited generics and type inference. The result of newArrayList has to be cast to a List of Strings or Integers to satisfy Java's compiler. Scala's type system is much more advanced, and this would not have been necessary.
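For comparison, the equivalent map and filter calls in Scala need no casts at all because the element types are inferred (again, just a sketch):

val input = List(1, 3, 5, 2, 10, 4)

val strings = input.map(_.toString)   // List("1", "3", "5", "2", "10", "4")
val bigOnes = input.filter(_ > 5)     // List(10)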
Anyway, here is the implementation of map and filter:

    public static <A,B> List<B> map(final Iterable<A> list, final Function1<A, B> f) {
        List<B> result = newArrayList();
        result = foldLeft(list, result, new Function2<A, List<B>>() {
            @Override
            public List<B> apply(List<B> bs, A a) {
                bs.add(f.apply(a));
                return bs;
            }
        });
        return result;
    }

    public static <A> List<A> filter(final Iterable<A> list, final Function1<A, Boolean> f) {
        List<A> result = newArrayList();
        result = foldLeft(list, result, new Function2<A, List<A>>() {
            @Override
            public List<A> apply(List<A> as, A a) {
                if(f.apply(a)) {
                    as.add(a);
                }
                return as;
            }
        });
        return result;
    }
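
Just to show the parallel, this is roughly how map and filter on top of foldLeft would look in Scala (a sketch, not part of the hadoop-jobs repository):

// map: fold over the input, appending the transformed element to the accumulator
def map[A, B](list: Iterable[A])(f: A => B): List[B] =
  list.foldLeft(List.empty[B])((acc, a) => acc :+ f(a))

// filter: fold over the input, keeping only the elements that match the predicate
def filter[A](list: Iterable[A])(p: A => Boolean): List[A] =
  list.foldLeft(List.empty[A])((acc, a) => if (p(a)) acc :+ a else acc)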

So, now with all these functional methods available, I implemented a max function in my Reducer using the foldLeft method. To keep the code clean, I created a separate max method and a constant MAX which holds the max function.
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

        int max = max(values);
        context.write(key, new IntWritable(max));
    }

    private int max(Iterable<IntWritable> values) {
        return Collections3.foldLeft(values, 0, MAX);
    }

    public static final Function2<IntWritable,Integer> MAX = new Function2<IntWritable, Integer>() {
        @Override
        public Integer apply(Integer max, IntWritable value) {
            return Math.max(max, value.get());
        }
    };
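In Scala, the whole max method would shrink to a one-liner. A sketch, assuming the IntWritable values are available as a Scala collection:

import org.apache.hadoop.io.IntWritable

// same fold: start at 0 like the Java version and keep the largest value seen
def max(values: Iterable[IntWritable]): Int =
  values.foldLeft(0)((m, v) => math.max(m, v.get))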

Ok, it's more lines of code than writing a simple for-loop to calculate the max value, but you have to admit, this is way more fun. :-D

The full source of Collections3 and Collections3Test is available in my hadoop-jobs git repository: https://bitbucket.org/diversit/hadoop-jobs.

Tuesday, May 8, 2012

Hadoop streaming with Scala scripts

For a while now I've been fascinated by stories about Hadoop, HBase, Hive, etc. I've attended many presentations at several conferences, so it's about time I started to learn more about it so I can play with it myself. Because I'm into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun.
So, to create a logbook for myself, I'm going to write a series of posts about my Hadoop with Scala adventure. Previous posts were about setting up a Hadoop cluster using Puppet, setting up a Hadoop cluster using Cloudera's Free Manager and how to develop (and debug) MapReduce jobs in the IDE. In the next posts I will write more about my Hadoop and Scala adventure, hopefully eventually ending up with Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.

Note: all source in this blogpost are available in my hadoop-jobs git repository on: https://bitbucket.org/diversit/hadoop-jobs.

Streaming with Scala scripts
I was reading the Hadoop Streaming chapter in Tom White's Hadoop: The Definitive Guide (3rd edition preview) and saw the samples in Ruby and Python. Since I'm into Scala nowadays, I thought: hey, this should also be possible with a Scala script.

And it was. There were just 2 things I had to figure out: 1) how to create a Scala script file and 2) how to read data from stdin with Scala.

Scala script
A Scala script is a normal script file, with executable rights, but has to start with:
#!/bin/sh
  exec scala "$0" "$@"
!#
The rest of the file can contain any Scala code. You could add an executable object, but this is not necessary: lines are just evaluated sequentially.

Reading from stdin
There are multiple ways to read from stdin. I preferred to use this one:
for(ln <- io.Source.stdin.getLines) {
  ...
}
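
Putting the two together, a minimal script that just echoes every input line back to stdout looks like this:
#!/bin/sh
  exec scala "$0" "$@"
!#

for (ln <- io.Source.stdin.getLines) {
  println(ln)
}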

Max Temperature example in Scala scripts
In the Hadoop Streaming chapter, Tom White uses a Max Temperature example based on NCDC data. Here is my implementation of that same example as Scala scripts:

sample.txt (input for job)

Input data: NCDC samples of temperature measurements which Tom uses for his Max Temperature examples. More sample data is available via the book's code and data on GitHub.
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
Mapper.scala
The mapper reads the year, temperature and quality of the measurement from standard input and outputs the year and temperature when the quality is any of 0, 1, 4, 5, 9. With streaming, the key and value in the output must be separated by a tab.
#!/bin/sh
  exec scala "$0" "$@"
!#

val validQualities = """[01459]""".r

for(ln <- io.Source.stdin.getLines) {
  val year = ln.substring(15, 19)
  val temp = ln.substring(87, 92)
  val quality = ""+ln(92)

  validQualities findFirstIn quality match {
    case Some(_) => println("%s\t%s" format (year, temp))
    case None =>
  }
}

Reducer.scala
The reducer gets all temperatures and determines the maximum temperature per year. The shuffling of the mapper output makes sure all identical keys (= years) are grouped together. The input is the same as the mapper output: "<year>\t<temperature>".
The temperature can be +0010 or -0011. When the temperature is above 0, the leading '+' has to be removed, because otherwise it causes parse exceptions when converting the value into an Int.
#!/bin/sh
  exec scala "$0" "$@"
!#

val Input = """(\d+)\t([+-]\d+)""".r
var max = Int.MinValue
var lastKey: String = _

def out(key: String, temp: Int) = println("%s\t%s" format(key, temp))

for (ln <- io.Source.stdin.getLines) {

  def parse(s: String) = if (s.startsWith("+")) s.substring(1).toInt else s.toInt

  ln match {
    case Input(k, v) => {
      val temp = parse(v)

      max = if (lastKey != null && lastKey != k) {
        out(lastKey, max)
        temp
      } else {
        if (temp > max) temp else max
      }
      lastKey = k
    }
    case _ =>
  }
}
out(lastKey, max)
Running the job
Hadoop does not support streaming by default, so when starting the streaming sample you need to include the hadoop-streaming jar.
HADOOP_HOME should point to your Hadoop installation directory and HADOOP_HOME/bin should be on your PATH.
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar -input sample.txt -output out -mapper Mapper.scala -reducer Reducer.scala
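Because streaming only uses stdin and stdout, you can also dry-run the scripts locally with a plain Unix pipeline before submitting the job; the sort stands in for Hadoop's shuffle phase:
cat sample.txt | ./Mapper.scala | sort | ./Reducer.scala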

Here's a Maven pom for using both (Cloudera) Hadoop and Scala in a single project:
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                        http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>hadoop.diversit.eu</groupId>
    <artifactId>hadoop-jobs</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Hadoop Job</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <scalaVersion>2.9.1</scalaVersion>
    </properties>

    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>Scala-Tools-Sonatype</id>
            <name>Scala Tools Maven2 @ Sonatype</name>
            <url>https://oss.sonatype.org/content/groups/scala-tools/</url>
        </repository>
    </repositories>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scalaVersion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>0.20.2-cdh3u3</version>
            <!--
            <scope>provided</scope>
            -->
        </dependency>
        <!--
             Add project dependencies here. The archetype plugin will
             include all dependencies from the "compile" scope (which is
         the default) in the job jar's "lib" directory.
        -->
        <dependency>
            <groupId>org.mockito</groupId>
            <artifactId>mockito-core</artifactId>
            <version>1.8.5</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.10</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>${scalaVersion}</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.2.1</version>
                <configuration>
                    <descriptors>
                        <descriptor>src/main/assembly/hadoop-job.xml</descriptor>
                    </descriptors>
                    <archive>
                        <manifest>
                            <mainClass>hadoop.diversit.eu.WordCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

How to develop (and debug) MapReduce jobs in the IDE


For a while now I've been fascinated by stories about Hadoop, HBase, Hive, etc. I've attended many presentations at several conferences, so it's about time I started to learn more about it so I can play with it myself. Because I'm into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun.
So, to create a logbook for myself, I'm going to write a series of posts about my Hadoop with Scala adventure. My first posts were about setting up a Hadoop cluster using Puppet and setting up a Hadoop cluster using Cloudera's Free Manager. In this post I'll write about how to develop (and debug) MapReduce jobs in the IDE. Next posts will be about Hadoop Streaming with Scala scripts and, hopefully, eventually ending up with Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.


IDE's
It struck me that in all the tutorials I've read, after setting up a Hadoop/MapReduce environment, the writer immediately shows some code examples. None of the tutorials talk about creating a MapReduce project or setting up the IDE. Tom White mentions it briefly in his Hadoop: The Definitive Guide book, in Configuring the Development Environment.
Ideally, I would like to just click a button and voilà, there's a whole project ready to start development.

Eclipse
For Eclipse there is an Eclipse plugin. The problem with this plugin seems to be that it's contributed code, not maintained at the same pace as, or by the same people who develop, Hadoop. The consequence is that the available binaries are old and obsolete: they don't support the latest Hadoop or Eclipse versions, and the plugin is not compatible with Cloudera's Hadoop distributions. This has led to several blog posts which describe how to compile the plugin yourself for your Hadoop or Eclipse version or Cloudera distribution.

IntelliJ
There doesn't seem to be any plugin or Hadoop support for IntelliJ at the moment.

Maven
I found a post by Matthias Friedrich, who created an archetype for easily creating a Hadoop project. This is an excellent base to start your project from.
mvn archetype:generate 
    -DarchetypeCatalog=http://dev.mafr.de/repos/maven2/
To use this with the Cloudera distribution, you need to add the Cloudera Maven repository and change the Hadoop version to the Cloudera version you're using:
  ...
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  ...
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>0.20.2-cdh3u3</version>
      <scope>provided</scope>
    </dependency>
  ...

Running a MapReduce job
So, all in all, no really usable IDE support is available. Fortunately, you don't really need it. Just create a Maven project and import the pom as a new project. This sets up all dependencies and you're ready to start developing your own mappers and reducers.
To run your job in the IDE, create an executable class and run it. Here's an example:

public class MaxTemperature {

    public static void main(String[] args) throws Exception {
        if(args.length < 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max Temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
//        job.setCombinerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

PS: when using the suggested Maven project, you need to remove the 'provided' scope from the hadoop dependency, otherwise Hadoop is not on the runtime classpath.
In your IDE, create a run configuration for the class and add the necessary arguments. The output of a job is written to the output path. A job will not overwrite existing files, so to be able to run the job multiple times you need to delete the previous output or change the output path.

There are multiple ways to do this. I didn't want to do it in Java code, so I chose to create a simple Ant script and adjusted the run configuration to run this Ant task before launch.
(IntelliJ has the option to run a Maven goal before launch, but this runs after 'make'. The 'clean' goal clears the 'target' directory, so your compiled classes are gone and there is nothing left to launch.)
<?xml version="1.0"?>

<project name="Hadoop-jobs" default="clear-job-output">

    <target name="clear-job-output">
        <echo message="Delete: ${basedir}/target/output"/>
        <delete dir="${basedir}/target/output" verbose="true"/>
    </target>
</project>


Sunday, May 6, 2012

Setting up a Hadoop cluster using Cloudera Manager

For a while now I've been fascinated by stories about Hadoop, HBase, Hive, etc. I've attended many presentations at several conferences, so it's about time I started to learn more about it so I can play with it myself. Because I'm into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun.
So, to create a logbook for myself, I'm going to write a series of posts about my Hadoop with Scala adventure. My first post was about setting up a Hadoop cluster using Puppet. In this post I'll describe setting up a Hadoop cluster using Cloudera's Free Manager. Next posts will be about how to develop (and debug) MapReduce jobs in the IDE, Hadoop Streaming with Scala scripts and, hopefully, eventually ending up with Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.

Cloudera is a small company in Silicon Valley which employs many former employees of Google, Yahoo, Facebook, Twitter, etc., so they have an incredible amount of knowledge about the whole Hadoop ecosystem. They contribute to Hadoop, give courses, and have lots of public resources like tutorials and training videos for those who, like me, want to learn about Hadoop.
Cloudera also has a product called Cloudera Manager with which you can easily set up a Hadoop environment. The free edition can manage up to 50 nodes, so more than enough for me to play with.

Installing Cloudera Manager
Installation of Cloudera Manager is very easy. After downloading the binary, the excellent installation guide helps you through the installation process. This time I chose CentOS 5.8 for the cluster hosts, since Debian, which I used for my previous Hadoop cluster, is not one of the supported operating systems. To create a new basic VM, I downloaded the CentOS 5.8 network install ISO and chose a basic server install, with no additional applications or options. After installation I booted the VM, ran yum update to get all the latest packages and disabled SELinux (which is a requirement). Then I created a template of this VM so I could use it for all other cluster nodes.
To disable SELinux:
- edit /etc/selinux/config
- Change 'SELINUX=enforcing' to 'SELINUX=disabled'
- Reboot
Download the Cloudera Manager on, or copy it to, the VM and run it. It has an installation wizard which helps you through the installation process and installs all required software: Oracle JDK, PostgreSQL, etc.

After installation, you can connect to the manager webgui via http://<ip>:7180 (uid:pwd = admin:admin).
For me this didn't work straight away because the system's iptables was blocking access to the port, so I had to open it.
To open port 7180, you can either disable iptables altogether
/etc/init.d/iptables stop
or open up the port:
iptables -I INPUT -m state --state NEW -p tcp --dport 7180 -j ACCEPT

(And while you're at it, also open ports 7182 and 9001, both used by agents to connect to the manager. You'll need them later on.)

To save your new iptables settings:
/sbin/service iptables save
You can view all your iptables settings in /etc/sysconfig/iptables.

Setting up nodes
When you connect to the manager's webgui, you'll be taken through the process of adding node hosts to your cluster. The manager will take care of installing all required software.
Using the CentOS VM template I created before, I created 5 new node VMs: cdhnode1 to cdhnode5.
Cloudera Manager requires proper name resolution for all hosts, so for the VMs' network connectivity I chose a bridged network this time instead of a shared network, and
- added the MAC addresses of the VM nodes to my DHCP server so the nodes get a static IP
- added those IPs to my local DNS server for proper IP <--> name resolution.

Before starting the node installation, set up the correct hostname and /etc/hosts file on each node.
Hostname: on CentOS, change it in /etc/sysconfig/network.
/etc/hosts must contain at least two lines, one for localhost and one for your hostname:
127.0.0.1        localhost
192.168.1.225    cdhnode5.diversit.local cdhnode5
To add the hosts to the cluster, enter all node IPs, or an IP range pattern like 192.168.1.[221-225], in the bottom textbox. Then click 'Find Hosts'.
Check all the hosts you want to add to the cluster and click 'Continue'.
The Cloudera Manager needs a way to connect to all hosts to perform the installation. Easiest is to provide the root password, which must be the same for all hosts! Providing an account with password-less sudo rights, or a public/private key pair, is also possible.
Click on 'Start Installation' to begin installing CDH on the cluster hosts.

You can follow the installation process on a host by looking at /tmp/scm_prepare_node.*/scm_prepare_node.log.

For more details about the manager and host installation, see the installation guide.

Starting services
After the software has been installed on all hosts and the hosts have been added to the cluster, you can start your required services.
I just started HDFS, which is always required, and the MapReduce service. Adding a service is pretty straightforward using the manager.

Connecting to services
Via the Cloudera Manager webgui you can generate the client configuration needed to connect to your CDH cluster. On your client, you must run the same software version as on your hosts. Cloudera uses a specific Cloudera build; these packages can also be downloaded from the Cloudera Download page.

- Put the right package on your client.
- Add the client configuration and you're done.
Happy Hadooping ;-)

Problems encountered
Error during installation of hosts software
At first, some nodes failed during installation. In /tmp/scm_prepare_node.*/scm_prepare_node.log you can then see that the manager is rolling back the installation of all software (look for 'rollback started'). It was unclear why this happened. Opening up ports 7182 and 9001 on the Cloudera Manager instance solved this.

Error starting service
I also had problems starting services on some hosts. This was caused by not having the 127.0.0.1 localhost line in my /etc/hosts file. I had read somewhere that that line had to be removed, but apparently that was wrong. Thanks to Cloudera support for the quick reply and solution.

Friday, May 4, 2012

Setting up a Hadoop cluster using Puppet

For a while now I've been fascinated by stories about Hadoop, HBase, Hive, etc. I've attended many presentations at several conferences, so it's about time I started to learn more about it so I can play with it myself. Because I'm into Scala nowadays, I also want to look into how Scala can make developing Hadoop/MapReduce applications easier and more fun.
So, to create a logbook for myself, I'm going to write a series of posts about my Hadoop with Scala adventure. This first post is about setting up a Hadoop cluster using Puppet. Next posts will be about setting up a cluster using Cloudera's Free Manager, how to develop (and debug) MapReduce jobs in the IDE, Hadoop Streaming with Scala scripts and, hopefully, eventually ending up with Scala DSLs like Scoobi and Scalding and the Scala MapReduce framework Spark.

My first step was to learn about Hadoop and set up a Hadoop cluster. On my dual quad-core MacPro with 22GB of memory I should have plenty of resources for that, using Parallels Desktop as the virtualization platform. I could have just created a VM with all the software and then cloned it multiple times, but I chose to use (and learn) Puppet, so that after the initial Hadoop cluster setup I could also easily roll out other software like HBase, Hive, Pig, etc. During the setup, Puppet also proved to be a handy tool to push other configuration settings to the VMs, like the /etc/hosts file and ntpd.

The puppet master and puppet agent configurations are available in Git repositories, as well as the Hadoop and Java modules. See Resources down below.

Creating the VM
Before I started with the cluster, I went through the single-node and pseudo-distributed setup to get a bit familiar with Hadoop. After that, I created a basic VM using a Debian 6 network install. (At several steps during the whole cluster setup, I created snapshots of the VM, a feature of Parallels Desktop, so I could easily go back if I messed something up.)

Installing puppet
The next step was to install Puppet. First the puppet master. I cloned the Debian VM to create a separate puppet master instance. Since Puppet was new to me, I had to read about how to install Puppet and read several other resources about using and configuring Puppet (with Hadoop).

- Add the squeeze-backports Debian repository to /etc/apt/sources.list to be able to install the latest Puppet version.
 (deb http://backports.debian.org/debian-backports squeeze-backports main)
- apt-get update
- Install Puppet :
  apt-get -t squeeze-backports install puppetmaster facter
  To search for and look at packages use :
  aptitude [-t squeeze-backports] search|show puppetmaster|facter )

The puppetmaster will be started after the install.

Puppet agents
For the Hadoop nodes, I again cloned the basic Debian VM. These will be the Puppet agents. Installation of Puppet is as described above, but instead of 'puppetmaster', install 'puppet'.

apt-get -t squeeze-backports install puppet facter
At this point, I created a template of this VM which I could then use to create new Hadoop nodes. See Adding more Hadoop Nodes.

Then:
- Add the IP of the puppet master to /etc/hosts. By default, the agent will look for a host named 'puppet'
- Set the hostname of the node with a FQDN : hostname <host>.<domain>.
  I used hostname 'hadoop-01.diversit.local'
  On Debian, also set the hostname in /etc/hostname so it survives a reboot. On CentOS, it's in /etc/sysconfig/network.

The default agent configuration is sufficient. You might want to add pluginsync=true to the [main] section. On both the puppet master and the puppet agent, the configuration is in the /etc/puppet directory.

Now run : puppetd -vt
The agent will now create a certificate using the hostname and will try to contact the puppetmaster.

The agent certificate has to be signed by the puppetmaster before the agent will receive any updates. While the agent is running, check on the puppetmaster if the sign request has already been received.
To view the certificate requests : puppetca -la
To sign the certificate : puppetca --sign <agent-name>

Puppet modules
Now that both the puppet master and the puppet agents are configured, it's time to add some useful modules on the puppet master to sync to the agents. To get familiar with Puppet, I first created a simple module to synchronize the /etc/hosts file.

mkdir /etc/puppet/modules/hosts
mkdir /etc/puppet/modules/hosts/files
mkdir /etc/puppet/modules/hosts/manifests

Put the hosts file you want to push to the agents, which will be Hadoop nodes, in the files directory.
Create an init.pp file in the manifests directory. The name of the class must be equal to the module name, in this case hosts.
class hosts {
  file { "/etc/hosts":
    owner => root,
    group => root,
    mode  => 775,
    source => "puppet:///hosts/hosts"
  }
}
This simple configuration will copy the file hosts in the module hosts on the puppet master to /etc/hosts on the puppet agent.
File serving must be enabled on the puppet master to be able to copy files to agents. Edit /etc/puppet/fileserver.conf and add the IPs or network of the agents which are allowed to get files.

The last thing to do is to tell the puppet master which modules should be synced to which agents.
Create the file /etc/puppet/manifests/site.pp
import "nodes"
Then create /etc/puppet/manifests/nodes.pp
node basenode {
  include hosts
}

node 'hadoop-01' inherits basenode {
}
This file defines which modules are synced to which agents. As you can see, a node can inherit from other nodes. Here I created a basenode which can then be inherited by all agent nodes.

Now run puppetd -vt on the client to see if the hosts file is synchronized.
These are the basics of Puppet modules; all modules are a variation on this. Of course Puppet has many more features. For those, check out the Puppet documentation or Puppet Forge, a repository of public Puppet modules.

Java Module
We need Java to be able to run Hadoop. Oracle changed the license for Java, and therefore it is no longer possible to install Java as a package. So let's create a Puppet module to distribute Java to the nodes.
I based my Java module on Brian Carpio's module on Puppet Forge. His module didn't work on Debian, so I had to tweak it a little bit.

- I downloaded his java module and put it in /etc/puppet/modules.
- The JDK package in /etc/puppet/modules/java/files didn't run on Debian, so I replaced it with a Linux x64 package downloaded from Oracle. I renamed the Oracle package to jdk1.7.0_03.tar.gz.
- In /etc/puppet/modules/java/manifests/init.pp I had to add path => "/usr/bin:/usr/sbin:/bin" to the exec section, otherwise Debian was not able to find the untar command.

Finally I added the java module to the basenode in /etc/puppet/manifests/nodes.pp and Java was synced to the agents.

Hadoop Module
Because Brian's Java module worked pretty well, I also used his Hadoop module.
One thing you have to do is create ssh keys for the hduser. This user is going to be created on the nodes and will run Hadoop.
- Added an hduser on the puppet master and generated ssh keys for it:
 useradd -m hduser
 su - hduser
 ssh-keygen (empty passphrase)
- Copied the id_rsa and id_rsa.pub to /etc/puppet/modules/hadoop/files/ssh

I updated Hadoop to version 1.0.2.
- Downloaded the tar.gz from the Hadoop site.
- Updated the version in /etc/puppet/modules/hadoop/manifests/params.pp

In this module I also had to add path => "/usr/bin:/usr/sbin:/bin" to the exec section.

Also in /etc/puppet/modules/hadoop/manifests/params.pp you need to put the FQDN of the master and all slave nodes. These values are used to generate the Hadoop configuration.

Add the hadoop module to the basenode in /etc/puppet/manifests/nodes.pp and Hadoop will be synced to the agents (if agent daemon is running). Run puppetd -vt to run the agent directly.

In my latest Hadoop module version I disabled the formatting of the namenode. This should only be done once, not every time the Hadoop module is updated.

Starting Hadoop
I did all the above steps for 4 nodes: 1 master and 3 slaves.
To start Hadoop:
- Login on the master node
- Go to the Hadoop install directory (/opt/hadoop/hadoop)
- Run bin/start-dfs.sh to start HDFS
- Run bin/start-mapred.sh to start MapReduce
These scripts will connect to all slave nodes, using the hduser, and start the processes on the slaves.

You can now try to connect to the cluster from your client, typically your development machine. Make sure you have the same version of Hadoop locally as on the cluster.
Copy the conf/core-site.xml and conf/mapred-site.xml from your Hadoop master or one of your nodes to your client.

Test your setup. From your Hadoop install directory run : bin/hadoop fs -ls /
This should show you the root of the HDFS in the cluster and not your local filesystem.

Adding more Hadoop nodes
Now that Hadoop is running and Puppet is all set up, adding more nodes is easy.
- Clone a VM using the Puppet Agent template I created earlier.
- Add the puppet master ip in /etc/hosts
- Set the correct hostname
- Run the puppet agent : puppetd -vt
- Sign the puppet agent certificate on the puppet master : puppetca --sign <agent-name>
- Add the new node to /etc/puppet/manifests/nodes.pp
- Add the new node to the slaves in /etc/puppet/modules/hadoop/manifests/params.pp
- Add the new node's ip in /etc/puppet/modules/hosts/files/hosts
- Run the puppet agent on the new node to install all software and configuration.

Your new Hadoop node should now be ready.
First format the HDFS on the new node. On the node run : bin/hadoop namenode -format
Then start the datanode and tasktracker on the node. The Hadoop master will automatically find the new node.
bin/hadoop-daemon.sh start datanode
bin/hadoop-daemon.sh start tasktracker

Verify on your Hadoop master that the new node(s) are available in the cluster.
Either by running bin/hadoop dfsadmin -report on the Hadoop master
Or open http://hadoopmaster:50070 in your browser.

Problems I encountered
Time out of sync
Even though the VMs are configured to get the time from my Mac, the time on the VMs sometimes differed. This causes problems with Puppet, because Puppet wants all times to be within 30 minutes of the puppet master.
To fix this, I decided to install ntpd on all nodes so all clocks would stay in sync.
Since Puppet was already in place, I created a simple ntp module to install and configure ntp on the nodes.

Automatically start puppet agents
The puppet agent is not started automatically when the VM starts. After setup is complete, and you want the puppet agent running so changes are pushed automatically, edit /etc/default/puppet and change START=no into START=yes. Then use /etc/init.d/puppet start to start the agent.

Push changes to agents
By default, an agent polls the puppet master every 30 minutes. If you have a change you want to push to the agents immediately, you have to connect to each agent and run 'puppetd -vt'.
Another option is to push changes to the agents. The agents must be configured to listen for 'push' events, which they don't do by default.
In /etc/puppet/puppet.conf, add 'listen=true' to the [main] section.
Create a /etc/puppet/auth.conf file to allow certain hosts to push changes.
# Allow puppet kick access
path /run
method save
auth any
allow *.hadoop.local
The agents should reload the new configuration automatically.
This has to be done on all puppet agents, so what the heck, let's put it all in a puppet module and let puppet do the work.
Since this is also just file copying like the /etc/hosts file, I renamed the hosts module to config and added the puppet agent configuration to it.

Resources
Getting Started with Puppet on OSX : http://glarizza.posterous.com/getting-started-with-puppet-on-os-x
How to create a Puppet Module : https://www.ctrip.ufl.edu/puppet-module-debian-lenny
My Puppet Bookmarks : http://www.diigo.com/user/jdboer/puppet
Brian Carpio Puppet Modules : http://forge.puppetlabs.com/users/bcarpio

Git repositories:
Puppet master configuration (contains all modules except Hadoop and Java modules) : https://bitbucket.org/diversit/hadoop-puppet-master-config/
Puppet agent configuration : https://bitbucket.org/diversit/hadoop-puppet-agent-config/
Puppet Java Module : https://bitbucket.org/diversit/puppet-module-java/
Puppet Hadoop Module : https://bitbucket.org/diversit/puppet-module-hadoop/

Next
Next post will be about setting up a Hadoop cluster using Cloudera's Free Manager.