Big Data/Spark

Apache Spark 설치

신씅 2014. 10. 1. 21:22
<Spark 1.0.2 설치>

* Putty로 1번 서버로 접속할 시 SSH Tunnels 설정에 L8081 -> localhost:8081과 L4040 -> localhost:4040을 추가하여 저장 후 연결하도록 한다.


----- 1번, 2번, 3번 서버에서 동일하게 수행한다.------

1. system에서 spark 계정을 만든 후 hdfs 계정으로 HDFS의 /user/spark 폴더를 만들고 $ chmod spark:hdfs로 해당 폴더에 spark 계정 권한 부여한다.
   root 계정에서 $ adduser spark -g hdfs 후 $ passwd spark 해서 passwd 설정
   hdfs 계정에서 $ hadoop fs -mkdir /user/spark 후 $ hadoop fs -chmod spark:hdfs /user/spark
   spark 계정에서 $ hadoop fs -mkdir ./test1 해서 폴더가 생성되는지 확인

2. 서버 간 spark 계정으로 password-less 한 접속 설정 (key 공유)
  - $ ssh-keygen 후 엔터키, 엔터키, 엔터키
  - $ ssh-copy-id -i .ssh/id_rsa.pub <각 3개 host> 후 연결 확인

3. Pre-built된 Spark 다운로드 후 압축 해제 https://spark.apache.org/downloads.html
   $ wget http://mirror.apache-kr.org/spark/spark-1.0.2/spark-1.0.2-bin-hadoop2.tgz

4. spark soft-link 만들기
   $ ln -s spark-1.0.2-bin-hadoop2 spark


----- 1번 서버에서 수행 -----

5. conf 폴더에서 spark-env.sh.template 를 spark-env.sh 파일로 복사
   $ conf/spark-env.sh.template conf/spark-env.sh
6. spark-env.sh 파일에서 다음을 추가

    SPARK_JAVA_OPTS=-Dspark.executor.memory=1g
    SPARK_MASTER_IP=bigdata20-01 (각 서버의 hostname)
    SPARK_MASTER_PORT=7077

7. spark-env.sh 를 2, 3번 서버에 복사
   $ scp spark-env.sh bigdata20-02:~/spark/conf/. && scp spark-env.sh bigdata20-03:~/spark/conf/.

8. conf/slaves 파일을 편집하여 localhost로 되어 있는 것을 2, 3번 서버 hostname을 입력한다.
   
   # A Spark Worker will be started on each of the machines listed below.
   bigdata20-02
   bigdata20-03

9. sbin/start-all.sh 실행 후 Web UI에서 master 및 slaves 가 떴는지 확인  (http://localhost:8081 에서 확인)
   $ spark/sbin/start-all.sh
   -> default로 Master Web UI는 8080 port를 사용하지만 Ambari등으로 이미 8080을 사용중에는 8081로 자동으로 변경됨




<Spark Shell 실습>

1. HDFS에 data 넣어주기
   $ wget http://192.168.10.2/training/eunsu/spark-input/16154.yaml  // Schema 파일
   $ wget http://192.168.10.2/training/eunsu/spark-input/ufo_awesome.tsv  // UFO data 파일
   $ hadoop fs -mkdir ./spark-input1
   $ hadoop fs -put ufo_awesome.tsv ./spark-input1/.

 -- Schema 와 data sample --

    h2. Fields:
    
    |_.Short name|_.Type|_.Description|
    |sighted_at|Long| Date the event occurred (yyyymmdd) |
    |reported_at|Long| Date the event was reported |
    |location|String| City and State where event occurred |
    |shape|String|One word string description of the UFO shape|
    |duration|String|Event duration (raw text field)|
    |description|String|A long, ~20-30 line, raw text description|
    
    h2. Snippet:
    
    |19980710|19980721| Brick, NJ| sphere|5 minutes|On the evening of July 10, 1998, I was walking near my home ... |
    |19940815|19980625| Rancho Mirage, CA| rectangle|45 seconds|An extreemly close sighting of a ... |
    |19970527|19981207| Arlington, VA| disk|A few seconds|As I was on my way home, ... |
    |19980828|19980830| Mt. Vernon, OR| fireball|4-5 sec|Bright Blue (as that of an arc welder) light, that lit up ... |
    |19981106|19981108| Bend, OR| formation|15-20 seconds|5 objects in straight line following one another... |


2. Local모드: 1번 서버에서 spark-shell 실행 후 잘 도는지 확인 후 exit
   $ bin/spark-shell
   -> Local모드이기 때문에 Master Web UI에는 아무것도 나타나지 않는다.

3. Cluster모드: 1번 서버에서 master 파라미터 주고 excutor메모리는 500MB로 주고 spark-shell 실행
   $ bin/spark-shell --master spark://bigdata20-01:7077 --executor-memory 500m
   -> spark cluster에서 사용가능한 메모리를 1GB로 설정했기 때문에 1GB보다 적은 사이즈를 executor 메모리로 할당해야 한다.

4. 1번에 HDFS에 올린 data를 input으로 RDD만든 후 여러가지 테스트

   scala> val ufo = sc.textFile("hdfs://bigdata20-01/user/spark/spark-input1/ufo_awesome.tsv")

   scala> ufo.count // 라인 개수 구하기
   
   scala> ufo.first // 첫번째 element 출력

   scala> ufo.take(5) // 5개 element 출력


   * report 시기를 년도별로 count
   
   - 2번째 column만 존재하도록 RDD 생성
   scala> val report = ufo.map(line => line.split("\t")(1))

   scala> report.take(10)

   scala> report.count

   - yyyymmdd -> yyyy 로만 바꾸고 mapreduce를 위해 tuple 형식으로 data를 바꿈
   scala> val report2 = report.map(date => (date.substring(0,4), 1))

   scala> report2.count

   scala> report2.take(10)


   - reduce 수행
   scala> val report3 = report2.reduceByKey((a,b) => (a + b))

   scala> report3.count

   scala> report3.collect.foreach(println)


   - 결과를 HDFS에 저장
   scala> report3.saveAsTextFile("hdfs://bigdata20-01/user/spark/spark-output")


   - 종료 후 HDFS 확인
   scala>  exit;
   scala> hadoop fs -cat hdfs://bigdata20-01/user/spark/spark-output/*





<Spark Application 제작>


1. PPT에 있는 데로 sbt 프로젝트 제작


2. 아래와 같이 spark-app/src/main/scala/SparkApp.scala 파일 만듬

------------------------------------------------------
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkApp {

  def main(args: Array[String]) {

    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    val distData = sc.parallelize(data)

    distData.collect().foreach(println)

  }
}
------------------------------------------------------


3. 아래와 같이 spark-app/src/main/scala/UfoStat.scala 파일 만듬 (spark-shell 실습했던 것)

----------------------------------------------------------------------------------------
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object UfoStat {

  def main(args: Array[String]) {

    val conf = new SparkConf()
    val sc = new SparkContext(conf)

    val ufo = sc.textFile("hdfs://bigdata20-01/user/spark/spark-input1/ufo_awesome.tsv")

    val report = ufo.map(line => line.split("\t")(1))

    val report2 = report.map(date => (date.substring(0,4), 1))

    val report3 = report2.reduceByKey((a,b) => (a + b))

    report3.collect.foreach(println)

    report3.saveAsTextFile("hdfs://bigdata20-01/user/spark/spark-ufo-output")

  }
}
----------------------------------------------------------------------------------------



4. sbt로 빌드
   $ sbt package

5. spark master에 app submit 

   $ ~/spark/bin/spark-submit --class SparkApp --master spark://bigdata20-01:7077 --deploy-mode client --executor-memory 500m ~/spark-app1/target/scala-2.10/spark-app_2.10-1.0.jar

   $ ~/spark/bin/spark-submit --class UfoStat --master spark://bigdata20-01:7077 --deploy-mode client --executor-memory 500m ~/spark-app1/target/scala-2.10/spark-app_2.10-1.0.jar

   -> $ hadoop fs -ls ./spark-ufo-output 으로 파일 생성 확인




<Spark streaming 실습>

1. Source 준비

   $ mkdir src/main/python
   $ vim src/main/python/hdfs-streaming-source.py 해서 아래를 추가해서 저장

------------------------------------------------------------------
#!/usr/bin/python

from random import randint
import time
from subprocess import call
import os


ranType = ["twitter", "facebook", "instagram", "pinterest"]
while True:

        epoch_time = int(time.time())
        randFile = "/tmp/" + str(epoch_time) + ".randnum"
        f = open(randFile, 'w')

        for i in range(10000):
                rani = "%d\n" % randint(1,999999)
                randomType = randint(0,3)
                ttype = ranType[randomType]
                f.write(ttype + "," + rani)

        call(["hadoop","fs","-put",randFile,"./hdfs-streaming"])
        if os.path.isfile(randFile):
                os.remove(randFile)
        else:
                print("Error: %s file not found" % randFile)

        time.sleep(5)


---------------------------------------------------------------------


   $ hadoop fs -mkdir ./hdfs-streaming
    -> HDFS내 source input directory 미리 생성
   $ python src/main/python/hdfs-streaming-source.py & 
    -> Background로 HDFS내 source 디렉토리에 5초에 한번씩 파일 생성 (random number)
   $ hadoop fs -ls ./hdfs-streaming/ 해서 5초에 한번씩 파일이 생기는지 확인
  

2. Spark Streaming App 생성
   $ vim src/main/scala/Streaming.scala


----------------------------------------------------------------------

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext._

import org.apache.log4j.Logger
import org.apache.log4j.Level

object Streaming {

  def defaultFilter(path: Path) = path.getName().endsWith(".randnum")

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    //val text = ssc.textFileStream("hdfs://bigdata20-01/user/spark/hdfs-streaming")
    val text = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://bigdata20-01/user/spark/hdfs-streaming", defaultFilter(_), true).map(_._2.toString)

    val textFile = text.filter(line => line.contains(",")).map(line => {
      val words = line.split(",")
      (words(0), words(1).toLong)
    }).reduceByKey((a,b) => (a + b))

    textFile.print()
    textFile.saveAsTextFiles("hdfs://bigdata20-01/user/spark/hdfs-streaming-output/output")

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }

}

----------------------------------------------------------------------

3. app 빌드 

   $ vim build.sbt 해서 아래를 추가 
     libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"

   $ sbt package

4. spark app submit

   $ ~/spark/bin/spark-submit --class Streaming --master spark://bigdata20-01:7077 --deploy-mode client --executor-memory 500m ~/spa-app1/target/scala-2.10/spark-app_2.10-1.0.jar


5. Streaming Window 사용

   $ vim src/main/scala/Streaming.scala

----------------------------------------------------------------------

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext._

import org.apache.log4j.Logger
import org.apache.log4j.Level

object Streaming {

  def defaultFilter(path: Path) = path.getName().endsWith(".randnum")

  def main(args: Array[String]) {

    Logger.getLogger("org").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.WARN)

    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    ssc.checkpoint("./hdfs-streaming-checkpoint/")

    //val text = ssc.textFileStream("hdfs://bigdata20-01/user/spark/hdfs-streaming")
    val text = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://bigdata20-01/user/spark/hdfs-streaming", defaultFilter(_), true).map(_._2.toString)

    val textFile = text.filter(line => line.contains(",") && line.split(",").size == 2).map(line => {
      val words = line.split(",")
      (words(0), words(1).toLong)
    }).reduceByKeyAndWindow((a,b) => (a + b), (a,b) => (a-b), Seconds(30), Seconds(10))


    textFile.print()
    textFile.saveAsTextFiles("hdfs://bigdata20-01/user/spark/hdfs-streaming-output/output")

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }

}

----------------------------------------------------------------------

 -> 빌드후 submit