Spark QuickStart Guide

Pivotal HD


2015-03-26


List of Tables

2.1. Spark Prerequisites

Chapter 1. Introduction

Pivotal HD supports Apache Spark, a popular choice for fast, large-scale data processing.

Deep integration of Spark with YARN allows Spark to operate as a highly efficient tenant alongside other engines such as Hive, Storm, and HBase, running simultaneously on a single data platform.

YARN allows flexibility: you can choose the right processing tool for the job. Instead of creating and managing a set of dedicated clusters for Spark applications, you can store data in a single location, access and analyze it with multiple processing engines, and make more efficient use of your cluster resources.

Spark Features

Spark on PHD supports the following features:

  • Spark on YARN

  • Full ORC file support

  • Spark-on-YARN on Kerberos-enabled clusters

  • Spark History Server

Spark Thrift Server and SparkSQL are currently available as technical previews.

Spark on PHD supports two deployment modes*:

  • Standalone mode, for developing Spark applications against a local Spark instance. Standalone mode is similar to developing and deploying jobs from an IDE. Spark handles resource management (job scheduling and execution) for a standalone node or pseudo-cluster.

  • Spark on YARN, which uses YARN services for resource allocation, running Spark Executors in YARN containers. Spark on YARN supports sophisticated workload management and Kerberos security features. It has two sub-modes:

    • YARN-Cluster mode, optimized for long-running production jobs.

    • YARN-Client mode, best for interactive use such as prototyping, testing, and debugging. Spark Shell runs in YARN-Client mode only.

In a modern data architecture with multiple processing engines using YARN and accessing data in HDFS, Spark on YARN is the leading Spark deployment mode.

* A third deployment mode, Spark on Mesos, is not supported in PHD distributions.

Chapter 2. Prerequisites

Before installing Spark, make sure your cluster meets the following prerequisites.

Table 2.1. Spark Prerequisites

Prerequisite             Description
Cluster Stack Version    PHD 3.0 or later stack
Components               Spark requires HDFS and YARN.

Note

If you used the Spark tech preview, save any configuration changes you made in the tech preview environment. Install Spark, and then reapply those changes to the new installation.

Chapter 3. Installing Spark

To install Spark manually, see Installing and Configuring Apache Spark in the Manual Installation Guide.

To install Spark on a Kerberized cluster, first read Installing Spark with Kerberos (the next topic in this Quick Start Guide).

Chapter 4. Installing Spark with Kerberos

Note the following considerations when installing Spark with Kerberos enabled:

  • You will need a Kerberos keytab when submitting a job.

  • Before running a Spark job, run kinit as a valid Kerberos user. The following example uses smokeuser.headless.keytab as a sample keytab file; <user-id> refers to a valid Kerberos account with HDFS access privileges.

    kinit -kt /etc/security/keytabs/smokeuser.headless.keytab <user-id>

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10

  • To start the Thrift server (tech preview) in a Kerberos cluster:

    1. Run the Spark Thrift server on the same host as HiveServer2, so that it can access the hiveserver2 keytab.

    2. Modify the permissions on /var/run/spark and /var/log/spark to allow read/write access for the hive account.

    3. Use the hive account to start the Thrift server.

Chapter 5. Validating Spark

To validate the Spark installation, run the following Spark jobs:

Run the Spark Pi example

The Pi program exercises compute-intensive tasks by estimating pi with an approximation method. The program "throws darts" at a circle -- it generates random points in the unit square ((0,0) to (1,1)) and counts how many fall within the unit circle. The fraction of points that land inside approximates pi/4, so multiplying that fraction by 4 approximates pi.
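
For illustration only, the same dart-throwing idea can be sketched directly in the Spark shell. This is a minimal sketch, not the packaged SparkPi example used in the steps below; the sample size n is arbitrary:

    // Generate n random points in the unit square and count how many fall
    // inside the unit circle (distance from the origin <= 1).
    val n = 100000
    val inside = sc.parallelize(1 to n).map { _ =>
      val x = math.random
      val y = math.random
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    // The fraction of points inside approximates pi/4.
    println("Pi is roughly " + 4.0 * inside / n)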

To run Spark Pi:

  1. Log on as a user with HDFS access--for example, your spark user (if you defined one) or hdfs. Navigate to a node with a Spark client and access the spark-client directory:

    su hdfs

    cd /usr/phd/current/spark-client

  2. Submit the Spark Pi job:

    ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10

    The job should complete without errors. It should produce output similar to the following:

    14/12/19 19:46:38 INFO impl.YarnClientImpl: Submitted application
    application_1419016680263_0002
    14/12/19 19:46:39 INFO yarn.Client: Application report for 
    application_1419016680263_0002 (state: ACCEPTED)
    14/12/19 19:46:39 INFO yarn.Client: 
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: default 
         start time: 1419018398442 
         final status: UNDEFINED 
         tracking URL: http://blue1:8088/cluster/app/application_1424284032717_0066 
         user: root

  3. To view the job's status in a browser, copy the tracking URL from the job output and open it in a browser, replacing the host name in the example with your own node name.

  4. Check for errors. The value of pi should be listed near the end of the output messages, with the format "Pi is roughly <value>."

    You can also view results in a browser, using the Web UI and the application's tracking URL; for example:

    http://hdm:8088/proxy/application_1418792282603_1186852/

  5. Click the "logs" link in the bottom right.

    The browser shows the YARN container output after a redirect. You should see output similar to the following. (Additional output omitted for brevity.)

    14/12/22 17:13:30 INFO yarn.ApplicationMaster: Unregistering 
    ApplicationMaster with SUCCEEDED
    14/12/22 17:13:30 INFO impl.AMRMClientImpl: Waiting for 
    application to be successfully unregistered.
    14/12/22 17:13:30 INFO remote.RemoteActorRefProvider$RemotingTerminator: 
    Remoting shut down.
    14/12/22 17:13:30 INFO yarn.ApplicationMaster: Deleting staging 
    directory .sparkStaging/application_1419016680263_0005
    
    Log Type: stdout
    Log Upload Time: 22-Dec-2014 17:13:33
    Log Length: 23
    Pi is roughly 3.143824

Run the WordCount Example

WordCount is a simple program that counts how often each word occurs in a text file.

  1. Select an input file for the Spark WordCount example. You can use any text file as input.

  2. Upload the input file to HDFS. The following example copies log4j.properties to HDFS as /tmp/data:

    cd /usr/phd/current/spark-client/

    hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties /tmp/data

  3. Run the Spark shell:

    ./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m

    You should see output similar to the following:

    14/12/22 17:27:38 INFO util.Utils: Successfully started service 'HTTP class server' on port 41936.
    Welcome to
      ____               __ 
     / __/ __  ___  ____/ /_ 
     _\ \/ _ \/ _ `/ __/ '_/
    /___/ .__/\_,_/_/ /_/\_\ version 1.2.0
       /_/
    
    Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_71)
    Type in expressions to have them evaluated.
    …
    14/12/22 17:28:27 INFO yarn.Client: Application report for application_1419016680263_0006 (state: ACCEPTED)
    14/12/22 17:28:28 INFO yarn.Client: 
      client token: N/A 
      diagnostics: N/A 
      ApplicationMaster host: N/A 
      ApplicationMaster RPC port: -1 
      queue: default 
      start time: 1419269306798 
      final status: UNDEFINED 
      tracking URL: http://sandbox.pivotal.io:8088/proxy/application_1419016680263_0006/ 
      user: root
    …
    14/12/22 17:29:23 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for 
    scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
    14/12/22 17:29:23 INFO repl.SparkILoop: Created spark context..Spark context available as sc.
    scala>

  4. Submit the job. At the scala> prompt, type the following commands, replacing the node name, file name, and file location with your own values:

    val file = sc.textFile("hdfs://blue1:8020/tmp/data")

    val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    counts.saveAsTextFile("hdfs://blue1:8020/tmp/wordcount")

    val file = sc.textFile("hdfs://red1:8020/tmp/data")

    val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

    counts.saveAsTextFile("hdfs://red1:8020/tmp/wordcount")

  5. To view the number of distinct words counted, from within the scala shell:

    counts.count()

    To print each word and its count:

    counts.toArray().foreach(println)

    To view the output using HDFS:

    1. Exit the scala shell:

      scala> exit

    2. View WordCount job results:

      hadoop fs -ls /tmp/wordcount

      You should see output similar to the following:

      /tmp/wordcount/_SUCCESS
      /tmp/wordcount/part-00000
      /tmp/wordcount/part-00001
    3. Use the HDFS cat command to view the WordCount output. For example:

      hadoop fs -cat /tmp/wordcount/part-00000
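
As an optional follow-on (not part of the validation steps), before exiting the scala shell you can continue working with the counts RDD from step 4. For example, this minimal sketch prints the ten most frequent words:

    // Swap each (word, count) pair to (count, word), sort by count in
    // descending order, and print the ten most frequent words.
    counts.map(_.swap).sortByKey(ascending = false).take(10).foreach(println)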

Chapter 6. Accessing ORC Files

Apache Spark on PHD provides full support for ORC files.

The following example creates a Hive table stored in ORC format, loads data into the table, reads the table from HDFS as an RDD, and then applies a schema to an RDD and saves it as an ORC file.

  1. Using a HiveContext in the Spark shell, create a new Hive table stored in ORC format:

    hiveContext.sql("create table orc_table(key INT, value STRING) stored as orc")

  2. Load data into the ORC table:

    hiveContext.hql("INSERT INTO table orc_table select * from testtable")

  3. Verify that data was loaded into the ORC table:

    hiveContext.hql("FROM orc_table SELECT *").collect().foreach(println)

  4. Read ORC Table from HDFS as HadoopRDD:

    val inputRead = sc.hadoopFile("hdfs://sandbox.pivotal.io:8020/apps/hive/warehouse/orc_table", classOf[org.apache.hadoop.hive.ql.io.orc.OrcInputFormat], classOf[org.apache.hadoop.io.NullWritable], classOf[org.apache.hadoop.hive.ql.io.orc.OrcStruct])

  5. Verify that you can manipulate the ORC records through the RDD:

    val k = inputRead.map(pair => pair._2.toString)

    val c = k.collect

    You should see output similar to the following:

    ...  
    14/12/22 18:41:37 INFO scheduler.DAGScheduler: Stage 7 (collect at <console>:16) finished in 0.418 s  
    14/12/22 18:41:37 INFO scheduler.DAGScheduler: Job 4 finished: collect at <console>:16, took 0.437672 s  
    c: Array[String] = Array({238, val_238}, {86, val_86}, {311, val_311}, {27, val_27}, 
    {165, val_165}, {409, val_409}, {255, val_255}, {278, val_278}, {98, val_98}, {484, val_484}, 
    {265, val_265}, {193, val_193}, {401, val_401}, {150, val_150}, {273, val_273}, {224, val_224}, 
    {369, val_369}, {66, val_66}, {128, val_128}, {213, val_213}, {146, val_146}, {406, val_406}, 
    {429, val_429}, {374, val_374}, {152, val_152}, {469, val_469}, {145, val_145}, {495, val_495}, 
    {37, val_37}, {327, val_327}, {281, val_281}, {277, val_277}, {209, val_209}, {15, val_15}, 
    {82, val_82}, {403, val_403}, {166, val_166}, {417, val_417}, {430, val_430}, {252, val_252}, 
    {292, val_292}, {219, val_219}, {287, val_287}, {153, val_153}, {193, val_193}, {338, val_338}, 
    {446, val_446}, {459, val_459}, {394, val_394}, {2…
  6. Copy the example data file into HDFS:

    cd $SPARK_HOME

    hadoop dfs -put examples/src/main/resources/people.txt people.txt

  7. Run the Spark shell:

    ./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client

    At the Scala prompt, type the following (the comments are optional):

    import org.apache.spark.sql.hive.orc._
    import org.apache.spark.sql._

    // Load and register the Spark table
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
    val people = sc.textFile("people.txt")
    val schemaString = "name age"
    val schema = StructType(schemaString.split(" ").map(fieldName => {if (fieldName == "name") StructField(fieldName, StringType, true) else StructField(fieldName, IntegerType, true)}))
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), new Integer(p(1).trim)))

    // Infer the table schema from the RDD
    val peopleSchemaRDD = hiveContext.applySchema(rowRDD, schema)

    // Create a table from the schema
    peopleSchemaRDD.registerTempTable("people")
    val results = hiveContext.sql("SELECT * FROM people")
    results.map(t => "Name: " + t.toString).collect().foreach(println)

    // Save the table to an ORC file
    peopleSchemaRDD.saveAsOrcFile("people.orc")

    // Create a table from the ORC file
    val morePeople = hiveContext.orcFile("people.orc")
    morePeople.registerTempTable("morePeople")
    hiveContext.sql("SELECT * from morePeople").collect.foreach(println)
    

Chapter 7. Tuning Spark

When tuning Spark applications it is important to understand how Spark works and what types of resources your application requires. For example, machine learning tasks are usually CPU intensive, whereas extract-transform-load (ETL) operations are I/O intensive.

General performance guidelines:

  • Minimize shuffle operations where possible.

  • Match the join strategy (ShuffledHashJoin vs. BroadcastHashJoin) to the size of the tables being joined; broadcasting is usually faster when one table is small enough to fit in memory on each executor. This requires manual configuration.

  • Consider switching from the default serializer to the Kryo serializer to improve performance. This requires manual configuration and class registration, as shown in the sketch below.
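
The following minimal sketch (not specific to PHD; MyRecord is a placeholder class used only for illustration) shows the configuration a standalone Spark application could use to enable Kryo and register classes through SparkConf:

    import org.apache.spark.{SparkConf, SparkContext}

    // Placeholder application class, used here only for illustration.
    case class MyRecord(id: Int, value: String)

    // Switch from the default Java serializer to Kryo and register the
    // classes the application serializes most often.
    val conf = new SparkConf()
      .setAppName("KryoExample")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[MyRecord]))

    // In a standalone application, pass the configuration to the context:
    val sc = new SparkContext(conf)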

If jobs take longer than expected or are not completing successfully, check the following resources to understand more about what a job is doing and where time is being spent.

  • Using Ambari: In the Ambari Services tab, select Spark (in the left column). Click on Quick Links and choose the Spark History Server UI. Ambari will display a list of jobs. Click App ID for job details.

  • Using the CLI: view job history and time spent in various stages of the job. Optimize based on where time is spent:

    http://<host>:8088/proxy/<app_id>/stages/

  • Use toDebugString() on an RDD to see the chain of RDDs (its lineage) that will be executed. This is useful for understanding how jobs will be executed; see the example below.
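
For example, the following minimal sketch reuses the WordCount lineage from Chapter 5 (assuming the sample input was uploaded to /tmp/data) and prints it in the Spark shell:

    // Build the WordCount lineage and print it; no action is triggered,
    // so the job itself does not run.
    val file = sc.textFile("/tmp/data")
    val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    println(counts.toDebugString)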

Chapter 8. Troubleshooting Spark

When you run a Spark job, you will see a standard set of console messages. In addition, the following information is available:

  • A list of running applications, where you can retrieve the application ID and check the application log:

    yarn application -list

    yarn logs -applicationId <app_id>

  • Check the Spark environment for a specific job:

    http://<host>:8088/proxy/<job_id>/environment/

Specific Issues

The following paragraphs describe specific issues and possible solutions:

Issue: The job stays in the ACCEPTED state and does not run. This can happen when a job requests more memory or cores than are available.

Solution: Assess the cluster workload to see whether any resources can be released. You might need to stop unresponsive jobs to make room for the new job.

Issue: Insufficient HDFS access. This can lead to errors such as the following:

Loading data to table default.testtable
Failed with exception
Unable to move source hdfs://red1:8020/tmp/hive-spark/hive_2015-03-04_
12-45-42_404_3643812080461575333-1/-ext-10000/kv1.txt to destination
hdfs://red1:8020/apps/hive/warehouse/testtable/kv1.txt

Solution: Make sure the user or group running the job has sufficient HDFS privileges to the location.

Issue: Beeline points to the wrong host, resulting in an invalid URL error:

Error: Invalid URL: jdbc:hive2://localhost:10001 (state=08S01,code=0)

Solution: Specify the correct host and port in the Beeline connection URL.

Issue: Error: closed SQLContext.

Solution: Restart the Thrift server.