Hadoop is a great technology built with Java.
Today we will use Scala to implement a simple MapReduce job and then run it using HDInsight. First we add the assembly plugin to our project/assembly.sbt file:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Then we add the Hadoop core dependency to our build.sbt file. We also configure the merge strategy, which avoids the "deduplicate: different file contents found" errors that sbt-assembly raises when several dependency jars contain the same file.
assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
libraryDependencies += "org.apache.hadoop" % "hadoop-core" % "1.2.1"

We will use WordCount as an example. The original Java class will be transformed into a Scala class.
package com.gkatzioura.scala

import java.lang.Iterable
import java.util.StringTokenizer

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

import scala.collection.JavaConverters._

/**
  * Created by gkatzioura on 2/14/17.
  */
object WordCount {

  // Emits (word, 1) for every whitespace-separated token of an input line.
  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {
    val one = new IntWritable(1)
    val word = new Text()

    override def map(key: Object, value: Text, context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val itr = new StringTokenizer(value.toString)
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken())
        context.write(word, one)
      }
    }
  }

  // Sums all counts emitted for a word; also used as the combiner.
  class IntSumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
    override def reduce(key: Text, values: Iterable[IntWritable], context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }

  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration, "word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[IntSumReducer])
    job.setReducerClass(classOf[IntSumReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    // args(0) is the input path, args(1) the output path (which must not exist yet)
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}

Then we build our example:
sbt clean compile assembly
Our new jar will be created at target/scala-2.12/ScalaHadoop-assembly-1.0.jar. In the next post we will run our code using Azure's HDInsight.
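Before submitting a jar to a cluster, it can help to know what the job is expected to output. The following is a minimal sketch, not part of the original project: the object `WordCountSimulation` and its methods are hypothetical names, and no Hadoop classes are used. It models the WordCount data flow with plain Scala collections: each line is tokenized into (word, 1) pairs the way `TokenizerMapper` does, each input split is pre-aggregated by a combiner, the pairs are grouped by key as the shuffle would group them, and the counts are summed as the reducer does. It says nothing about Hadoop-specific behaviour such as partitioning, sorting or serialization.

```scala
import java.util.StringTokenizer

// Hypothetical local simulation of the WordCount data flow (no Hadoop involved).
object WordCountSimulation {

  // Map phase: same tokenization as TokenizerMapper (StringTokenizer splits on
  // whitespace), emitting one (word, 1) pair per token.
  def mapPhase(line: String): Seq[(String, Int)] = {
    val itr = new StringTokenizer(line)
    val out = Seq.newBuilder[(String, Int)]
    while (itr.hasMoreTokens) out += (itr.nextToken() -> 1)
    out.result()
  }

  // Summing logic shared by the combiner and the reducer.
  def sumCounts(counts: Iterable[Int]): Int = counts.foldLeft(0)(_ + _)

  // Combiner: pre-aggregates the pairs produced from a single input split.
  def combine(pairs: Seq[(String, Int)]): Seq[(String, Int)] =
    pairs.groupBy(_._1).map { case (word, ps) => word -> sumCounts(ps.map(_._2)) }.toSeq

  // Whole job: map and combine per split, shuffle (group by word), then reduce.
  def run(splits: Seq[Seq[String]]): Map[String, Int] = {
    val combined = splits.flatMap(split => combine(split.flatMap(mapPhase)))
    combined.groupBy(_._1).map { case (word, ps) => word -> sumCounts(ps.map(_._2)) }
  }

  def main(args: Array[String]): Unit = {
    // Two hypothetical input splits.
    val splits = Seq(Seq("hello hadoop", "hello scala"), Seq("hello world"))
    // Print tab-separated "word<TAB>count" lines, sorted by word.
    run(splits).toSeq.sortBy(_._1).foreach { case (w, c) => println(s"$w\t$c") }
  }
}
```

For the two sample splits in `main`, the result is hadoop 1, hello 3, scala 1 and world 1. Reusing the summing logic as a combiner does not change the result because addition is associative and commutative; Hadoop may run a combiner zero or more times, so this property is what makes the `setCombinerClass` call in the job safe.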
You can find the code on GitHub.
Reference: WordCount on Hadoop with Scala from our JCG partner Emmanouil Gkatziouras at the gkatzioura blog.
Emmanouil Gkatziouras, February 20th, 2017 (Last Updated: February 18th, 2017)