VOOZH about

URL: https://dzone.com/articles/spark-streaming-1

⇱ Getting Started With Spark Streaming


Related

  1. DZone
  2. Data Engineering
  3. Data
  4. Getting Started With Spark Streaming

Getting Started With Spark Streaming

An introduction to Spark Streaming and how to use it with an example data set.

By Apr. 10, 16 Β· Opinion
Likes
Comment
Save
84.5K Views

Join the DZone community and get the full member experience.

Join For Free

This post will help you get started using Apache Spark Streaming with HBase. Spark Streaming is an extension of the core Spark API that enables continuous data stream processing.

What is Spark Streaming?

First of all, what is streaming? A data stream is an unbounded sequence of data arriving continuously. Streaming divides continuously flowing input data into discrete units for processing. Stream processing is low latency processing and analyzing of streaming data. Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data. Spark Streaming is for use cases which require a significant amount of data to be quickly processed as soon as it arrives. Example real-time use cases are:

  • Website monitoring, network monitoring
  • Fraud detection
  • Web clicks
  • Advertising
  • Internet of Things sensors

Spark Streaming supports data sources such as HDFS directories, TCP sockets, Kafka, Flume, Twitter, etc. Data Streams can be processed with Spark’s core APIS, DataFrames SQL, or machine learning APIs, and can be persisted to a filesystem, HDFS, databases, or any data source offering a Hadoop OutputFormat.

πŸ‘ Image

How Spark Streaming Works

Spark Streaming divides a data stream into batches of X seconds called Dstreams, which internally is a sequence of RDDs. Your Spark Application processes the RDDs using Spark APIs, and the processed results of the RDD operations are returned in batches.

πŸ‘ Image

Architecture of the example Streaming Application

πŸ‘ Image

The Spark Streaming example code does the following:

  • Reads streaming data.
  • Processes the streaming data.
  • Writes the processed data to an HBase Table.

Other Spark example code does the following:

  • Reads HBase Table data written by the streaming code
  • Calculates daily summary statistics
  • Writes summary statistics to the HBase table Column Family stats

Example Data Set

The oil pump sensor data comes in as comma separated value (csv) files dropped in a directory. Spark Streaming will monitor the directory and process any files created in that directory. (As stated before, Spark Streaming supports different streaming data sources; for simplicity, this example will use files.) Below is an example of the csv file with some sample data:

πŸ‘ Image

We use a Scala case class to define the sensor schema corresponding to the sensor data csv files, and a parseSensor function to parse the comma separated values into the sensor case class.

HBase Table Schema

The HBase Table Schema for the streaming data is as follows:

  • Composite row key of the pump name date and time stamp
  • Column Family data with columns corresponding to the input data fields Column Family alerts with columns corresponding to any filters for alarming values. Note that the data and alert column families could be set to expire values after a certain amount of time.

The Schema for the daily statistics summary rollups is as follows:

  • Composite row key of the pump name and date
  • Column Family stats
  • Columns for min, max, and avg.

πŸ‘ Image

The function below converts a Sensor object into an HBase Put object, which is used to insert a row into HBase.

Configuration for Writing to an HBase Table

You can use the TableOutputFormat class with Spark to write to an HBase table, similar to how you would write to an HBase table from MapReduce. Below we set up the configuration for writing to HBase using the TableOutputFormat class.

The Spark Streaming Example Code

These are the basic steps for Spark Streaming code:

  1. Initialize a Spark StreamingContext object.
  2. Apply transformations and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped using streamingContext.awaitTermination().

We will go through each of these steps with the example application code.

Initializing the StreamingContext

First we create a StreamingContext, the main entry point for streaming functionality, with a 2 second batch interval.

val sparkConf = new SparkConf().setAppName("HBaseStream")

// create a StreamingContext, the main entry point for all streaming functionality
val ssc = new StreamingContext(sparkConf, Seconds(2))

Next, we use the StreamingContext textFileStream(directory) method to create an input stream that monitors a Hadoop-compatible file system for new files and processes any files created in that directory.

// create a DStream that represents streaming data from a directory source
val linesDStream = ssc.textFileStream("/user/user01/stream")

The linesDStream represents the stream of data, each record is a line of text. Internally a DStream is a sequence of RDDs, one RDD per batch interval.

πŸ‘ Image

Apply Transformations and Output Operations to DStreams

Next we parse the lines of data into Sensor objects, with the map operation on the linesDStream.

// parse each line of data in linesDStream into sensor objects

val sensorDStream = linesDStream.map(Sensor.parseSensor) 

The map operation applies the Sensor.parseSensor function on the RDDs in the linesDStream, resulting in RDDs of Sensor objects.

πŸ‘ Image

Next we use the DStream foreachRDD method to apply processing to each RDD in this DStream. We filter the sensor objects for low psi to create alerts, then we write the sensor and alert data to HBase by converting them to Put objects, and using the PairRDDFunctions saveAsHadoopDataset method, which outputs the RDD to any Hadoop-supported storage system using a Hadoop Configuration object for that storage system (see Hadoop Configuration for HBase above).

// for each RDD. performs function on each RDD in DStream
sensorRDD.foreachRDD { rdd =>
 // filter sensor data for low psi
 val alertRDD = rdd.filter(sensor => sensor.psi < 5.0)

 // convert sensor data to put object and write to HBase Table CF data
 rdd.map(Sensor.convertToPut).saveAsHadoopDataset(jobConfig)

 // convert alert to put object write to HBase Table CF alerts
 rdd.map(Sensor.convertToPutAlert).saveAsHadoopDataset(jobConfig)
}

The sensorRDD objects are converted to put objects then written to HBase.

πŸ‘ Image

Start Receiving Data

To start receiving data, we must explicitly call start() on the StreamingContext, then call awaitTermination to wait for the streaming computation to finish.

 // Start the computation
 ssc.start()
 // Wait for the computation to terminate
 ssc.awaitTermination()

Spark Reading From and Writing to HBase

Now we want to read the HBase sensor table data , calculate daily summary statistics and write these statistics to the stats column family.

πŸ‘ Image

The code below reads the HBase table sensor table psi column data, calculates statistics on this data using StatCounter, then writes the statistics to the sensor stats column family.

 // configure HBase for reading 
 val conf = HBaseConfiguration.create()
 conf.set(TableInputFormat.INPUT_TABLE, HBaseSensorStream.tableName)
 // scan data column family psi column
 conf.set(TableInputFormat.SCAN_COLUMNS, "data:psi") 

// Load an RDD of (row key, row Result) tuples from the table
 val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
 classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
 classOf[org.apache.hadoop.hbase.client.Result])

 // transform (row key, row Result) tuples into an RDD of Results
 val resultRDD = hBaseRDD.map(tuple => tuple._2)

 // transform into an RDD of (RowKey, ColumnValue)s , with Time removed from row key
 val keyValueRDD = resultRDD.
 map(result => (Bytes.toString(result.getRow()).
 split(" ")(0), Bytes.toDouble(result.value)))

 // group by rowkey , get statistics for column value
 val keyStatsRDD = keyValueRDD.
 groupByKey().
 mapValues(list => StatCounter(list))

 // convert rowkey, stats to put and write to hbase table stats column family
 keyStatsRDD.map { case (k, v) => convertToPut(k, v) }.saveAsHadoopDataset(jobConfig)

The diagram below shows that the output from newAPIHadoopRDD is an RDD of row key, result pairs. The PairRDDFunctions saveAsHadoopDataset saves the Put objects to HBase.

πŸ‘ Image

Software

Running the Application

You can run the code as a standalone application as described in the tutorial on Getting Started with Spark on MapR Sandbox.

Here are the steps summarized:

  1. Log into the MapR Sandbox, as explained in Getting Started with Spark on MapR Sandbox, using userid user01, password mapr.
  2. Build the application using maven.
  3. Copy the jar file and data file to your sandbox home directory /user/user01 using scp.
  4. Run the streaming app:
     /opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` 
     --class examples.HBaseSensorStream sparkstreamhbaseapp-1.0.jar

  5. Copy the streaming data file to the stream directory:cp sensordata.csv /user/user01/stream/
  6. Read data and calculate stats for one column
     /opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` 
     --class examples.HBaseReadWrite sparkstreamhbaseapp-1.0.jar

  7. Calculate stats for whole row
     /opt/mapr/spark/spark-<version>/bin/spark-submit --driver-class-path `hbase classpath` 
     --class examples.HBaseReadRowWriteStats sparkstreamhbaseapp-1.0.jar

Summary

This concludes the tutorial on Spark Streaming with HBase. You can find more information in the references section.

References and More Information:

Database Data file Apache Spark sql Stream processing Statistics File system Column family Object (computer science)

Opinions expressed by DZone contributors are their own.

Related

  • How To Recover SQL Server FILESTREAM Enabled Database
  • Monitoring and Managing the Growth of the MSDB System Database in SQL Server
  • Using AUTHID Parameter in Oracle PL/SQL
  • How To Generate Scripts of Database Objects in SQL Server

Partner Resources

Γ—

Comments

The likes didn't load as expected. Please refresh the page and try again.

Let's be friends: