<Spark 1.0.2 설치>
* Putty로 1번 서버로 접속할 시 SSH Tunnels 설정에 L8081 -> localhost:8081과 L4040 -> localhost:4040을 추가하여 저장 후 연결하도록 한다.
----- 1번, 2번, 3번 서버에서 동일하게 수행한다.------
1. system에서 spark 계정을 만든 후 hdfs 계정으로 HDFS의 /user/spark 폴더를 만들고 $ chmod spark:hdfs로 해당 폴더에 spark 계정 권한 부여한다.
root 계정에서 $ adduser spark -g hdfs 후 $ passwd spark 해서 passwd 설정
hdfs 계정에서 $ hadoop fs -mkdir /user/spark 후 $ hadoop fs -chmod spark:hdfs /user/spark
spark 계정에서 $ hadoop fs -mkdir ./test1 해서 폴더가 생성되는지 확인
2. 서버 간 spark 계정으로 password-less 한 접속 설정 (key 공유)
- $ ssh-keygen 후 엔터키, 엔터키, 엔터키
- $ ssh-copy-id -i .ssh/id_rsa.pub <각 3개 host> 후 연결 확인
3. Pre-built된 Spark 다운로드 후 압축 해제 https://spark.apache.org/downloads.html
$ wget http://mirror.apache-kr.org/spark/spark-1.0.2/spark-1.0.2-bin-hadoop2.tgz
4. spark soft-link 만들기
$ ln -s spark-1.0.2-bin-hadoop2 spark
----- 1번 서버에서 수행 -----
5. conf 폴더에서 spark-env.sh.template 를 spark-env.sh 파일로 복사
$ conf/spark-env.sh.template conf/spark-env.sh
6. spark-env.sh 파일에서 다음을 추가
SPARK_JAVA_OPTS=-Dspark.executor.memory=1g
SPARK_MASTER_IP=bigdata20-01 (각 서버의 hostname)
SPARK_MASTER_PORT=7077
7. spark-env.sh 를 2, 3번 서버에 복사
$ scp spark-env.sh bigdata20-02:~/spark/conf/. && scp spark-env.sh bigdata20-03:~/spark/conf/.
8. conf/slaves 파일을 편집하여 localhost로 되어 있는 것을 2, 3번 서버 hostname을 입력한다.
# A Spark Worker will be started on each of the machines listed below.
bigdata20-02
bigdata20-03
9. sbin/start-all.sh 실행 후 Web UI에서 master 및 slaves 가 떴는지 확인 (http://localhost:8081 에서 확인)
$ spark/sbin/start-all.sh
-> default로 Master Web UI는 8080 port를 사용하지만 Ambari등으로 이미 8080을 사용중에는 8081로 자동으로 변경됨
<Spark Shell 실습>
1. HDFS에 data 넣어주기
$ wget http://192.168.10.2/training/eunsu/spark-input/16154.yaml // Schema 파일
$ wget http://192.168.10.2/training/eunsu/spark-input/ufo_awesome.tsv // UFO data 파일
$ hadoop fs -mkdir ./spark-input1
$ hadoop fs -put ufo_awesome.tsv ./spark-input1/.
-- Schema 와 data sample --
h2. Fields:
|_.Short name|_.Type|_.Description|
|sighted_at|Long| Date the event occurred (yyyymmdd) |
|reported_at|Long| Date the event was reported |
|location|String| City and State where event occurred |
|shape|String|One word string description of the UFO shape|
|duration|String|Event duration (raw text field)|
|description|String|A long, ~20-30 line, raw text description|
h2. Snippet:
|19980710|19980721| Brick, NJ| sphere|5 minutes|On the evening of July 10, 1998, I was walking near my home ... |
|19940815|19980625| Rancho Mirage, CA| rectangle|45 seconds|An extreemly close sighting of a ... |
|19970527|19981207| Arlington, VA| disk|A few seconds|As I was on my way home, ... |
|19980828|19980830| Mt. Vernon, OR| fireball|4-5 sec|Bright Blue (as that of an arc welder) light, that lit up ... |
|19981106|19981108| Bend, OR| formation|15-20 seconds|5 objects in straight line following one another... |
2. Local모드: 1번 서버에서 spark-shell 실행 후 잘 도는지 확인 후 exit
$ bin/spark-shell
-> Local모드이기 때문에 Master Web UI에는 아무것도 나타나지 않는다.
3. Cluster모드: 1번 서버에서 master 파라미터 주고 excutor메모리는 500MB로 주고 spark-shell 실행
$ bin/spark-shell --master spark://bigdata20-01:7077 --executor-memory 500m
-> spark cluster에서 사용가능한 메모리를 1GB로 설정했기 때문에 1GB보다 적은 사이즈를 executor 메모리로 할당해야 한다.
4. 1번에 HDFS에 올린 data를 input으로 RDD만든 후 여러가지 테스트
scala> val ufo = sc.textFile("hdfs://bigdata20-01/user/spark/spark-input1/ufo_awesome.tsv")
scala> ufo.count // 라인 개수 구하기
scala> ufo.first // 첫번째 element 출력
scala> ufo.take(5) // 5개 element 출력
* report 시기를 년도별로 count
- 2번째 column만 존재하도록 RDD 생성
scala> val report = ufo.map(line => line.split("\t")(1))
scala> report.take(10)
scala> report.count
- yyyymmdd -> yyyy 로만 바꾸고 mapreduce를 위해 tuple 형식으로 data를 바꿈
scala> val report2 = report.map(date => (date.substring(0,4), 1))
scala> report2.count
scala> report2.take(10)
- reduce 수행
scala> val report3 = report2.reduceByKey((a,b) => (a + b))
scala> report3.count
scala> report3.collect.foreach(println)
- 결과를 HDFS에 저장
scala> report3.saveAsTextFile("hdfs://bigdata20-01/user/spark/spark-output")
- 종료 후 HDFS 확인
scala> exit;
scala> hadoop fs -cat hdfs://bigdata20-01/user/spark/spark-output/*
<Spark Application 제작>
1. PPT에 있는 데로 sbt 프로젝트 제작
2. 아래와 같이 spark-app/src/main/scala/SparkApp.scala 파일 만듬
------------------------------------------------------
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object SparkApp {
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val distData = sc.parallelize(data)
distData.collect().foreach(println)
}
}
------------------------------------------------------
3. 아래와 같이 spark-app/src/main/scala/UfoStat.scala 파일 만듬 (spark-shell 실습했던 것)
----------------------------------------------------------------------------------------
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
object UfoStat {
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ufo = sc.textFile("hdfs://bigdata20-01/user/spark/spark-input1/ufo_awesome.tsv")
val report = ufo.map(line => line.split("\t")(1))
val report2 = report.map(date => (date.substring(0,4), 1))
val report3 = report2.reduceByKey((a,b) => (a + b))
report3.collect.foreach(println)
report3.saveAsTextFile("hdfs://bigdata20-01/user/spark/spark-ufo-output")
}
}
----------------------------------------------------------------------------------------
4. sbt로 빌드
$ sbt package
5. spark master에 app submit
$ ~/spark/bin/spark-submit --class SparkApp --master spark://bigdata20-01:7077 --deploy-mode client --executor-memory 500m ~/spark-app1/target/scala-2.10/spark-app_2.10-1.0.jar
$ ~/spark/bin/spark-submit --class UfoStat --master spark://bigdata20-01:7077 --deploy-mode client --executor-memory 500m ~/spark-app1/target/scala-2.10/spark-app_2.10-1.0.jar
-> $ hadoop fs -ls ./spark-ufo-output 으로 파일 생성 확인
<Spark streaming 실습>
1. Source 준비
$ mkdir src/main/python
$ vim src/main/python/hdfs-streaming-source.py 해서 아래를 추가해서 저장
------------------------------------------------------------------
#!/usr/bin/python
from random import randint
import time
from subprocess import call
import os
ranType = ["twitter", "facebook", "instagram", "pinterest"]
while True:
epoch_time = int(time.time())
randFile = "/tmp/" + str(epoch_time) + ".randnum"
f = open(randFile, 'w')
for i in range(10000):
rani = "%d\n" % randint(1,999999)
randomType = randint(0,3)
ttype = ranType[randomType]
f.write(ttype + "," + rani)
call(["hadoop","fs","-put",randFile,"./hdfs-streaming"])
if os.path.isfile(randFile):
os.remove(randFile)
else:
print("Error: %s file not found" % randFile)
time.sleep(5)
---------------------------------------------------------------------
$ hadoop fs -mkdir ./hdfs-streaming
-> HDFS내 source input directory 미리 생성
$ python src/main/python/hdfs-streaming-source.py &
-> Background로 HDFS내 source 디렉토리에 5초에 한번씩 파일 생성 (random number)
$ hadoop fs -ls ./hdfs-streaming/ 해서 5초에 한번씩 파일이 생기는지 확인
2. Spark Streaming App 생성
$ vim src/main/scala/Streaming.scala
----------------------------------------------------------------------
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
object Streaming {
def defaultFilter(path: Path) = path.getName().endsWith(".randnum")
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
//val text = ssc.textFileStream("hdfs://bigdata20-01/user/spark/hdfs-streaming")
val text = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://bigdata20-01/user/spark/hdfs-streaming", defaultFilter(_), true).map(_._2.toString)
val textFile = text.filter(line => line.contains(",")).map(line => {
val words = line.split(",")
(words(0), words(1).toLong)
}).reduceByKey((a,b) => (a + b))
textFile.print()
textFile.saveAsTextFiles("hdfs://bigdata20-01/user/spark/hdfs-streaming-output/output")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}
----------------------------------------------------------------------
3. app 빌드
$ vim build.sbt 해서 아래를 추가
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.0.0"
$ sbt package
4. spark app submit
$ ~/spark/bin/spark-submit --class Streaming --master spark://bigdata20-01:7077 --deploy-mode client --executor-memory 500m ~/spa-app1/target/scala-2.10/spark-app_2.10-1.0.jar
5. Streaming Window 사용
$ vim src/main/scala/Streaming.scala
----------------------------------------------------------------------
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
object Streaming {
def defaultFilter(path: Path) = path.getName().endsWith(".randnum")
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.WARN)
Logger.getLogger("akka").setLevel(Level.WARN)
val conf = new SparkConf()
val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(5))
ssc.checkpoint("./hdfs-streaming-checkpoint/")
//val text = ssc.textFileStream("hdfs://bigdata20-01/user/spark/hdfs-streaming")
val text = ssc.fileStream[LongWritable, Text, TextInputFormat]("hdfs://bigdata20-01/user/spark/hdfs-streaming", defaultFilter(_), true).map(_._2.toString)
val textFile = text.filter(line => line.contains(",") && line.split(",").size == 2).map(line => {
val words = line.split(",")
(words(0), words(1).toLong)
}).reduceByKeyAndWindow((a,b) => (a + b), (a,b) => (a-b), Seconds(30), Seconds(10))
textFile.print()
textFile.saveAsTextFiles("hdfs://bigdata20-01/user/spark/hdfs-streaming-output/output")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
}
}
----------------------------------------------------------------------
-> 빌드후 submit