Scala Applications to Access Hadoop Data

Scala is one of the newer and most widely used functional programming languages. It underpins several Hadoop ecosystem components, including Apache Spark and Apache Kafka, so it is genuinely useful to be able to develop applications in Scala that work with Hadoop and its ecosystem projects.

In this post, I am going to show you how to run a Scala application that accesses Hadoop data. The example below references MapR-FS, a fast, distributed file system similar to HDFS that is part of the MapR Distribution. From an application's perspective, MapR-FS looks exactly like HDFS, so this example application will work on any Hadoop distribution with a slight modification to the core-site.xml configuration file. You can access Hadoop data from Scala applications much as you would from Java or C/C++ applications.

I will walk through the code of a simple Scala application, then go through the steps to compile and execute it using the Java APIs. I will assume that you already have MapR v3.0.3 installed (for any other version, change the corresponding version in build.sbt to pick up the appropriate dependencies); if not, you can download (for free) either the Community Edition or the MapR Sandbox. You also need a Scala compiler installed, which you can download from http://www.scala-lang.org.

The program performs the following, all on MapR-FS:

  1. Initial configuration
  2. Create and save a file
  3. Delete a file
  4. Read a file
  5. Create a folder

Below we have snippets of code explaining each step. We also have the complete source code available for download later in the blog post.

1. Initial configuration

//Get an instance of Configuration
private var conf = new Configuration()

//Hadoop and MapR resources to be added to the Configuration instance.
//Here we can hold custom configuration for both MapR and generic Hadoop.
private var maprfsCoreSitePath = new Path("core-site.xml")
private var maprfsSitePath = new Path("maprfs-site.xml")

//Add the resources to the Configuration instance
conf.addResource(maprfsCoreSitePath)
conf.addResource(maprfsSitePath)

//Get a FileSystem handle
private var fileSystem = FileSystem.get(conf)

2. Create and save a file into MapR-FS

  def createAndSave(filepath: String): Unit = {
    var file = new File(filepath)
    var out = fileSystem.create(new Path(file.getName))  //New file will be created in MapR-FS

    //Read from the local file into an InputStream
    var in = new BufferedInputStream(new FileInputStream(file))
    var b = new Array[Byte](1024)
    var numBytes = in.read(b)

    //Write to the OutputStream given by the FileSystem
    while (numBytes > 0) {
      out.write(b, 0, numBytes)
      numBytes = in.read(b)
    }
    in.close()
    out.close()
  }
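The copy loop above can be exercised on its own with plain java.io streams. The hypothetical copyBytes helper below is a sketch that isolates the same 1 KB read/write pattern, with in-memory streams standing in for the local file and the MapR-FS output stream (it is not part of the original application):

```scala
import java.io.{BufferedInputStream, ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}

// Hypothetical helper mirroring the loop in createAndSave:
// pull up to 1 KB at a time from `in` and push it to `out` until EOF.
def copyBytes(in: InputStream, out: OutputStream): Unit = {
  val b = new Array[Byte](1024)
  var numBytes = in.read(b)
  while (numBytes > 0) {
    out.write(b, 0, numBytes)
    numBytes = in.read(b)
  }
}

// In-memory stand-ins for the local file and the MapR-FS output stream
val source = new BufferedInputStream(new ByteArrayInputStream("hello maprfs".getBytes("UTF-8")))
val sink = new ByteArrayOutputStream()
copyBytes(source, sink)
// sink now holds an exact copy of the input bytes
```

The same loop works unchanged against the streams returned by FileSystem.create and FileInputStream, since both sides are just java.io stream types.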

3. Delete a file from MapR-FS

//Get the path from the filename and delete the file from MapR-FS
//(the second argument enables recursive deletion for directories)

  def deleteFile(filename: String): Boolean = {
    var path = new Path(filename)
    fileSystem.delete(path, true)
  }
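To see why deleteFile returns a Boolean, here is a local sketch of the same pattern with java.nio.file standing in for the Hadoop FileSystem: FileSystem.delete, like deleteIfExists below, reports whether anything was actually removed. The temp-file path is hypothetical:

```scala
import java.nio.file.{Files, Paths}

// Local stand-in for deleteFile above: deleteIfExists also returns a
// Boolean indicating whether anything was removed, mirroring the
// return value of FileSystem.delete.
val p = Paths.get(System.getProperty("java.io.tmpdir"), "maprfs-example-delete.txt")
Files.write(p, "temp".getBytes("UTF-8"))
val deleted = Files.deleteIfExists(p)      // the file existed and was removed
val deletedAgain = Files.deleteIfExists(p) // nothing left to delete
```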

4. Read a file from MapR-FS

//Read a file in MapR-FS using the FileSystem handle, which returns an InputStream

  def getFile(filename: String): InputStream = {
    var path = new Path(filename)
    fileSystem.open(path)
  }
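Since getFile returns a plain java.io.InputStream, a caller can hand it straight to scala.io.Source. In this sketch, a ByteArrayInputStream stands in for the stream returned by fileSystem.open:

```scala
import java.io.ByteArrayInputStream
import scala.io.Source

// Stand-in for the InputStream that getFile / fileSystem.open returns
val in = new ByteArrayInputStream("Example text".getBytes("UTF-8"))

// scala.io.Source consumes any java.io.InputStream directly
val contents = Source.fromInputStream(in).mkString
in.close()
```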

5. Create a folder inside MapR-FS

//If the directory does not already exist, create it using the FileSystem instance

  def mkdirs(folderPath: String): Unit = {
    var path = new Path(folderPath)
    if (!fileSystem.exists(path)) {
      fileSystem.mkdirs(path)
    }
  }
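The check-then-create pattern above can be sketched locally with java.nio.file playing the role of the Hadoop FileSystem (the directory name below is hypothetical):

```scala
import java.nio.file.{Files, Paths}

// Local stand-in for mkdirs above: test for the directory first, then
// create it (and any missing parents) only if it is absent.
val dir = Paths.get(System.getProperty("java.io.tmpdir"), "maprfs-example-dir")
if (!Files.exists(dir)) {
  Files.createDirectories(dir)
}
val created = Files.isDirectory(dir)
```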

Compiling and Running the Program

Scala has two utilities, scalac (compile) and scala (run), analogous to Java's javac and java. I'll go through the compile and run steps, which should look familiar to Java programmers.

1. cd to the directory where .scala files are stored.

root@ubuntu1:/home/maprfs-scala-example# ls -l
total 44
-rwxr-xr-x 1 mapr root  552 Aug 25 15:42 build.sbt
drwxr-xr-x 3 root root 4096 Sep 15 10:35 com
-rwxr-xr-x 1 mapr root  855 Aug 25 16:24 core-site.xml
drwxr-xr-x 2 mapr root 4096 Aug 26 11:56 lib
-rwxr-xr-x 1 mapr root  305 Aug 25 11:35 maprfs-site.xml
drwxr-xr-x 3 mapr root 4096 Aug 25 11:36 project
-rwxr-xr-x 1 mapr root  132 Aug 25 11:35 README.md
drwxr-xr-x 4 mapr root 4096 Aug 25 11:35 src
drwxr-xr-x 5 root root 4096 Oct  3 13:20 target
-rwxr-xr-x 1 root root   97 Oct  3 10:21 testfile1.txt
-rwxr-xr-x 1 mapr root  110 Oct  3 13:20 testfile.txt

2. Compile the .scala files with “scalac -cp .:$(hadoop classpath) *.scala”

root@ubuntu1:/home/maprfs-scala-example# scalac -cp .:$(hadoop classpath) src/main/scala/*.scala

3. Run the scala class files with “scala -cp .:$(hadoop classpath) com.mapr.scala.example.Main”

root@ubuntu1:/home/maprfs-scala-example# scala -cp .:$(hadoop classpath) com.mapr.scala.example.Main
Creating an empty file
New file created as empty-file.txt
Creating new file
Writing Example text  into the testfile.txt
Saving the file testfile.txt to MaprFs
Appended text from testfile1.txt to testfile.txt
Deleting the file empty-file.txt

Using SBT (Simple Build Tool)

SBT (Simple Build Tool) is an open-source build tool for Scala and Java applications, similar to Maven and Ant.

1. Install SBT, which you can download from here: http://www.scala-sbt.org/download.html

2. cd to the directory where the Scala project (maprfs-scala-example) resides

3. Create a file called “build.sbt” with the following in it:

name := "example"
organization := "com.mapr.scala"
version := "0.1"
scalaVersion := "2.10.3"

libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-client" % "1.0.3-mapr-3.0.3" excludeAll(
    ExclusionRule(organization = "com.sun.jdmk"),
    ExclusionRule(organization = "com.sun.jmx"),
    ExclusionRule(organization = "javax.jms")),
  "org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)

resolvers += "MapR jars" at "http://repository.mapr.com/nexus/content/groups/mapr-public/"
initialCommands := "import com.mapr.scala.example._"

4. And then run “sbt” inside the directory where build.sbt is saved.

root@ubuntu1:/home/maprfs-scala-example# sbt
[info] Set current project to example (in build file:/home/maprfs-scala-example/)

5. Inside the sbt shell, run "clean" (to remove existing class files under the target directory), "compile" (to compile the Scala files in the project directory), and "run" (to execute the "Main" class, which contains the main function).

> clean
[success] Total time: 1 s, completed Sep 15, 2014 11:03:26 AM
> compile
[info] Updating {file:/home/maprfs-scala-example/}maprfs-scala-example...
[info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Compiling 2 Scala sources to /home/maprfs-scala-example/target/scala-2.10/classes...
[success] Total time: 28 s, completed Sep 15, 2014 11:04:44 AM
> run
[info] Running com.mapr.scala.example.Main
New file created as empty-file.txt
Creating new file
Writing Example text  into the testfile.txt
Saving the file testfile.txt to MaprFs
Appended text from testfile1.txt to testfile.txt
Deleting the file empty-file.txt
[success] Total time: 2 s, completed Oct 6, 2014 10:30:35 AM
>

As you can see, running a Scala application that accesses Hadoop data in MapR is very straightforward. This is just one example of the power of MapR-FS and how it makes your Hadoop data accessible to a wider variety of applications.

MapRfsFileService.scala

package com.mapr.scala.example

import java.io.BufferedInputStream
import java.io.File
import java.io.FileInputStream
import java.io.InputStream
import org.apache.hadoop.conf._
import org.apache.hadoop.fs._

object MapRfsFileService {

//Initial Configuration

  private var conf = new Configuration()
  private var maprfsCoreSitePath = new Path("core-site.xml")
  private var maprfsSitePath = new Path("maprfs-site.xml")

  conf.addResource(maprfsCoreSitePath)
  conf.addResource(maprfsSitePath)

  private var fileSystem = FileSystem.get(conf)

//Create a folder inside MapR-FS

  def mkdirs(folderPath: String): Unit = {
    var path = new Path(folderPath)
    if (!fileSystem.exists(path)) {
      fileSystem.mkdirs(path)
    }
  }

//Create a new Empty file

  def createNewFile(filepath: String): Unit = {
    var file = new File(filepath)
    var out = fileSystem.createNewFile(new Path(file.getName))
    if (out)
      println("New file created as " + file.getName)
    else
      println("File cannot be created : " + file.getName)
  }

//Create and save a file into MapR-FS

  def createAndSave(filepath: String): Unit = {
    var file = new File(filepath)
    var out = fileSystem.create(new Path(file.getName))
    var in = new BufferedInputStream(new FileInputStream(file))
    var b = new Array[Byte](1024)
    var numBytes = in.read(b)
    while (numBytes > 0) {
      out.write(b, 0, numBytes)
      numBytes = in.read(b)
    }
    in.close()
    out.close()
  }

//Append data from a file to another file

  def appendToFile(tofilepath: String, fromfilepath: String): Unit = {
    var file = new File(tofilepath)
    var out = fileSystem.append(new Path(file.getName))
    var in = new BufferedInputStream(new FileInputStream(new File(fromfilepath)))
    var b = new Array[Byte](1024)
    var numBytes = in.read(b)
    while (numBytes > 0) {
      out.write(b, 0, numBytes)
      numBytes = in.read(b)
    }
    in.close()
    out.close()
  }

//Read a file from MapR-FS

  def getFile(filename: String): InputStream = {
    var path = new Path(filename)
    fileSystem.open(path)
  }

//Delete a file from MapR-FS

  def deleteFile(filename: String): Boolean = {
    var path = new Path(filename)
    fileSystem.delete(path, true)
  }

//Close the FileSystem Handle

  def close() = {
    fileSystem.close
  }
}

Main.scala

package com.mapr.scala.example
import java.io._

object Main {
  def main(args: Array[String]) {
    val testfileName = "testfile.txt"
    val testText = "Example text "
    val appendFile = "testfile1.txt"

    val emptyFile = "empty-file.txt"

//Create an Empty file in MapRFS

    println("Creating an empty file")
    MapRfsFileService.createNewFile(emptyFile)

 //Creating a new file and saving it to MapRFS

    println("Creating new file")
    val testfile = new File(testfileName)
    val testfileWriter = new BufferedWriter(new FileWriter(testfile))
    println("Writing " + testText + " into the " + testfileName)
    testfileWriter.write(testText)
    testfileWriter.close
    println("Saving the file " + testfileName + " to MaprFs")
    MapRfsFileService.createAndSave(testfileName)

//Append to file in MapRFS

    MapRfsFileService.appendToFile(testfileName,appendFile)
    println("Appended text from "+ appendFile + " to "+testfileName)
   
//Reading a file from MapRFS

    val outputStream = new FileOutputStream(new File(testfileName))
    val in = MapRfsFileService.getFile(testfileName)
    var b = new Array[Byte](1024)
    var numBytes = in.read(b)
    while (numBytes > 0) {
      outputStream.write(b, 0, numBytes)
      numBytes = in.read(b)
    }
    outputStream.close
    in.close

//Deleting a file from MapRFS

    println("Deleting the file " + emptyFile)
    MapRfsFileService.deleteFile(emptyFile)

//Close the FileSystem Handle

    MapRfsFileService.close
  }
}

maprfs-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>


core-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>maprfs:///</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/tmp/hadoop-${user.name}</value>
    <description>A base for other temporary directories.</description>
  </property>
<property>
  <name>fs.mapr.working.dir</name>
  <value>/user/$USERNAME/</value>
  <description>The default directory to be used with relative paths.
  Note that $USERNAME is NOT an environmental variable, but just a placeholder
  to indicate that it will be expanded to the corresponding username.
  Other example default directories could be "/", "/home/$USERNAME", "/$USERNAME" etc.
  </description>
</property>
</configuration>
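For a generic Hadoop distribution, the "slight modification" to core-site.xml mentioned at the start of the post amounts to pointing fs.default.name at an HDFS NameNode rather than maprfs:///. A sketch of what that property might look like (the hostname and port are placeholders for your cluster's NameNode address):

```xml
<property>
  <name>fs.default.name</name>
  <!-- placeholder host/port; substitute your NameNode address -->
  <value>hdfs://namenode-host:8020</value>
</property>
```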

The github source for this Scala application: https://github.com/vsowrirajan/maprfs-scala-example/
