To keep a logbook for myself, I'm writing a series of posts about my Hadoop with Scala adventure. Previous posts covered setting up a Hadoop cluster using Puppet, setting up a Hadoop cluster using Cloudera's Free Manager, and how to develop (and debug) MapReduce jobs in the IDE. In upcoming posts I will write more about my Hadoop and Scala adventure and, hopefully, eventually end up using Scala DSLs like Scoobi and Scalding, and the Scala MapReduce framework Spark.
Note: all source code in this blog post is available in my hadoop-jobs git repository at https://bitbucket.org/diversit/hadoop-jobs.
Streaming with Scala scripts
I was reading the Hadoop Streaming chapter in Tom White's Hadoop: The Definitive Guide (3rd edition preview) and saw the samples in Ruby and Python. Since I'm into Scala nowadays I thought: hey, this should also be possible with a Scala script.
And it was. There were just 2 things I had to figure out: 1) how to create a Scala script file and 2) how to read data from stdin with Scala.
Scala script
A Scala script is a normal script file, with executable rights, but has to start with:
#!/bin/sh
exec scala "$0" "$@"
!#
The rest of the file can contain any Scala code. You could add an executable object, but this is not necessary: the lines are simply evaluated sequentially.
Reading from stdin
There are multiple ways to read from stdin. I preferred to use this one:
for(ln <- io.Source.stdin.getLines) {
  ...
}
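As an illustration of one of those other ways, here is a minimal sketch that reads lines through a plain `java.io.BufferedReader`. The object name `StdinLines` and the method `lines` are hypothetical, made up for this example. Taking the reader as a parameter is just a choice that makes it easy to try out without real stdin.

```scala
import java.io.BufferedReader

object StdinLines {
  // Hypothetical helper: turns any BufferedReader into an iterator of lines.
  // readLine() returns null at end of input, which ends the iteration.
  def lines(reader: BufferedReader): Iterator[String] =
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
}
```

In a streaming script the reader would wrap `System.in`, for example `new BufferedReader(new java.io.InputStreamReader(System.in))`.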
Max Temperature example in Scala scripts
In the Hadoop Streaming chapter Tom White uses a Max Temperature example based on NCDC data. Here is my implementation of that same example as Scala scripts:
sample.txt (input for job)
Input data: NCDC temperature measurement samples, which Tom uses for his Max Temperature examples. More sample data is available via the book's code and data GitHub repository.
0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
Mapper.scala
The mapper reads the year, temperature and quality of each measurement from standard input and outputs the year and temperature when the quality is any of 0, 1, 4, 5 or 9. With streaming, the key and value in the output must be separated by a tab.
#!/bin/sh
exec scala "$0" "$@"
!#
val validQualities = """[01459]""".r
for(ln <- io.Source.stdin.getLines) {
  val year = ln.substring(15, 19)
  val temp = ln.substring(87, 92)
  val quality = ""+ln(92)
  validQualities findFirstIn quality match {
    case Some(_) => println("%s\t%s" format (year, temp))
    case None =>
  }
}
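To see what the mapper does with a single record, the same extraction can be written as a pure function. This is only a sketch: the names `MaxTempMapper` and `mapLine` are hypothetical, and the length check is an extra assumption of mine (the script above does not guard against short lines).

```scala
object MaxTempMapper {
  // Quality codes the mapper accepts
  private val validQualities = Set('0', '1', '4', '5', '9')

  // Same offsets as the script: year at 15..18, temperature at 87..91, quality at 92.
  // Returns Some("<year>\t<temperature>") for a usable record, None otherwise.
  // Assumption: lines too short to hold a quality code are silently skipped.
  def mapLine(ln: String): Option[String] =
    if (ln.length > 92 && validQualities(ln(92)))
      Some("%s\t%s".format(ln.substring(15, 19), ln.substring(87, 92)))
    else None
}
```

For the first line of sample.txt, `MaxTempMapper.mapLine` returns `Some("1950\t+0000")`: year 1950, temperature +0000, quality 1.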
Reducer.scala
The reducer gets all temperatures and determines the maximum temperature for each year. The shuffling of the mapper output makes sure all identical keys (= years) are grouped together. The input has the same format as the mapper output: "<year>\t<temperature>".
The temperature can be, for example, +0010 or -0011. When the temperature has a leading '+', it has to be removed, because otherwise converting the value to an Int gives a parse exception.
#!/bin/sh
exec scala "$0" "$@"
!#
val Input = """(\d+)\t([+-]\d+)""".r
var max = Int.MinValue
var lastKey: String = _
def out(key: String, temp: Int) = println("%s\t%s" format(key, temp))
for (ln <- io.Source.stdin.getLines) {
  def parse(s: String) = if (s.startsWith("+")) s.substring(1).toInt else s.toInt
  ln match {
    case Input(k, v) => {
      val temp = parse(v)
      // A new key means the previous year is complete: emit it and start over
      max = if (lastKey != null && lastKey != k) {
        out(lastKey, max)
        temp
      } else {
        if (temp > max) temp else max
      }
      lastKey = k
    }
    case _ =>
  }
}
// Emit the final year, unless there was no input at all
if (lastKey != null) out(lastKey, max)
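The single pass over key-sorted input can also be sketched without mutable variables, as a fold. This is not the script above but a variation on it: the names `MaxTempReducer` and `reduce` are hypothetical, and unlike the script it collects one entry per year in memory instead of printing as it goes.

```scala
object MaxTempReducer {
  private val Input = """(\d+)\t([+-]?\d+)""".r

  // Drop an optional leading '+' before converting to Int
  private def parse(s: String): Int = s.stripPrefix("+").toInt

  // Expects lines sorted by key, which is how a streaming reducer receives them.
  // Returns one "<year>\t<max>" line per year, in input order.
  def reduce(sorted: Iterator[String]): List[String] = {
    val maxima = sorted.foldLeft(List.empty[(String, Int)]) {
      // Same key as the previous line: keep the larger temperature
      case ((k, m) :: rest, Input(key, v)) if k == key => (k, math.max(m, parse(v))) :: rest
      // First line, or a new key: start a new entry
      case (acc, Input(key, v)) => (key, parse(v)) :: acc
      // Lines that do not match the expected format are skipped
      case (acc, _) => acc
    }
    maxima.reverse.map { case (k, m) => "%s\t%s".format(k, m) }
  }
}
```

Because the result starts out as an empty list, empty input simply yields no output lines.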
Running the job
Hadoop does not support streaming by default, so when starting the streaming sample you need to include the hadoop-streaming jar.
HADOOP_HOME should point to your Hadoop installation directory, and HADOOP_HOME/bin should be on your PATH.
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.2-cdh3u3.jar -input sample.txt -output out -mapper Mapper.scala -reducer Reducer.scala
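To know what to expect in the output, the whole job can be mimicked on the five sample records with plain Scala collections. This sketch does not involve Hadoop at all: it only imitates the map, shuffle/sort and reduce steps, and the names `LocalMaxTemp`, `sample` and `run` are hypothetical.

```scala
object LocalMaxTemp {
  // The five records from sample.txt
  val sample = List(
    "0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999",
    "0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999",
    "0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999",
    "0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999",
    "0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999"
  )

  def run(lines: Seq[String]): Seq[String] =
    lines
      // map: keep records with a valid quality code, extract (year, temperature)
      .filter(ln => ln.length > 92 && "01459".indexOf(ln(92)) >= 0)
      .map(ln => (ln.substring(15, 19), ln.substring(87, 92).stripPrefix("+").toInt))
      // shuffle and sort: group by year, order by year
      .groupBy(_._1)
      .toSeq
      .sortBy(_._1)
      // reduce: maximum temperature per year
      .map { case (year, temps) => "%s\t%s".format(year, temps.map(_._2).max) }
}
```

`LocalMaxTemp.run(LocalMaxTemp.sample)` gives the two lines `1949\t111` and `1950\t22`, which is what the streaming job should produce for sample.txt.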
Here's a Maven pom for using both (Cloudera) Hadoop and Scala in a single project:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hadoop.diversit.eu</groupId>
<artifactId>hadoop-jobs</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Hadoop Job</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<scalaVersion>2.9.1</scalaVersion>
</properties>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
<repository>
<id>Scala-Tools-Sonatype</id>
<name>Scala Tools Maven2 @ Sonatype</name>
<url>https://oss.sonatype.org/content/groups/scala-tools/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scalaVersion}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>0.20.2-cdh3u3</version>
<!--
<scope>provided</scope>
-->
</dependency>
<!--
Add project dependencies here. The archetype plugin will
include all dependencies from the "compile" scope (which is
the default) in the job jar's "lib" directory.
-->
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>${scalaVersion}</version>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2.1</version>
<configuration>
<descriptors>
<descriptor>src/main/assembly/hadoop-job.xml</descriptor>
</descriptors>
<archive>
<manifest>
<mainClass>hadoop.diversit.eu.WordCount</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>