KLAB 강창성 (kangcs@kr.ibm.com) Power of data. Simplicity of design. Speed of innovation.
Agenda
1. Alternatives to the existing analytics environment
2. Introduction to the Spark platform
3. Spark Streaming
4. IBM Open Data Platform + BigInsights v4.0
5. How RDDs (Resilient Distributed Datasets) work -- for those who want to stay to the end: "I tried it too?!"
Hands-on 1: Installing Spark on BigInsights v4.0 - running the Spark shell; Spark SQL, Spark MLlib, and Spark Streaming examples
Hands-on 2: Spark on Docker (with an introduction to Docker) - running the Spark shell
The existing data analysis system
Hadoop/MapReduce made large-scale data analysis easy, yet a thirst remains:
- more complex, multi-stage processing (machine learning, graphs)
- poor at interactive, ad-hoc queries
Operator feedback on the existing data analysis system
After moving to Spark..
RAM instead of disk?? Why not??
1. Hadoop File System - a file system whose files cannot be modified: disk storage => RAM??
2. RDD (Resilient Distributed Datasets) => read-only, with its history (lineage) recorded
Spark
1. Spark SQL
2. Spark Streaming
3. MLlib
4. GraphX
5. SparkR (version 1.4)
The Spark ecosystem keeps evolving
Spark
Spark = RDD (Resilient Distributed Datasets)!
RDD operators: transformations & actions
1. Transformations offer a richer command set than MapReduce's map/reduce - filter, join, union, match, cogroup
2. A transformation only designs how the data will be changed; it performs no actual computation.
3. An action is the command that says "now produce the result of all those transformations" (lazy execution).
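The lazy-execution idea above can be sketched in plain Python. This is a toy stand-in, not the Spark API: `ToyRDD`, `map`, `filter`, and `collect` here only mimic how transformations record a plan (the lineage) while the action replays it.

```python
class ToyRDD:
    """Minimal stand-in for an RDD: records transformations, runs them on demand."""
    def __init__(self, data, ops=None):
        self.data = list(data)
        self.ops = ops or []          # lineage: list of ("map"/"filter", fn)

    def map(self, fn):                # transformation: returns a new ToyRDD, computes nothing
        return ToyRDD(self.data, self.ops + [("map", fn)])

    def filter(self, fn):             # transformation: also just extends the plan
        return ToyRDD(self.data, self.ops + [("filter", fn)])

    def collect(self):                # action: replay the recorded lineage now
        out = self.data
        for kind, fn in self.ops:
            out = [fn(x) for x in out] if kind == "map" else [x for x in out if fn(x)]
        return out

rdd = ToyRDD(range(10)).map(lambda x: x * 2).filter(lambda x: x > 10)
# Nothing has been computed yet; only the plan ("lineage") exists.
print(rdd.collect())  # [12, 14, 16, 18]
```

Because only the plan is stored, a real RDD can be rebuilt from its lineage at any time, which is what makes it "resilient".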
Introducing Spark functions..
Spark and Hadoop together? The Lambda Architecture
Spark and Hadoop together? - Let's go together.
Spark SQL
scala> weather.registerTempTable("weather")
scala> val hottest_with_precip = sqlContext.sql("SELECT * FROM weather WHERE precipitation > 0.0 ORDER BY temp DESC")
hottest_with_precip: org.apache.spark.sql.SchemaRDD =
SchemaRDD[6] at RDD at SchemaRDD.scala:108
== Query Plan ==
== Physical Plan ==
Sort [temp#1 DESC], true
 Exchange (RangePartitioning [temp#1 DESC], 200)
  Filter (precipitation#2 > 0.0)
   PhysicalRDD [date#0,temp#1,precipitation#2], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36
scala>
Spark MLlib
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

val taxiFile = sc.textFile("/tmp/labdata/sparkdata/nyctaxisub.csv")
taxiFile.count()

// keep 2013 rows with non-empty latitude (field 3) and longitude (field 4)
val taxiData = taxiFile.filter(_.contains("2013")).filter(_.split(",")(3) != "").filter(_.split(",")(4) != "")
taxiData.count()

// geo-fence the pickups to the Manhattan area
val taxiFence = taxiData.filter(_.split(",")(3).toDouble > 40.70).
  filter(_.split(",")(3).toDouble < 40.86).
  filter(_.split(",")(4).toDouble > (-74.02)).
  filter(_.split(",")(4).toDouble < (-73.93))
taxiFence.count()

val taxi = taxiFence.map { line => Vectors.dense(line.split(',').slice(3, 5).map(_.toDouble)) }
val iterationCount = 10
val clusterCount = 3
val model = KMeans.train(taxi, clusterCount, iterationCount)
val clusterCenters = model.clusterCenters.map(_.toArray)
val cost = model.computeCost(taxi)
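What `KMeans.train` iterates can be sketched in a few lines of plain Python. This is a toy illustration with hypothetical helper names, not MLlib: assign every point to its nearest center, then move each center to the mean of its assigned points, and repeat.

```python
import random

def kmeans(points, k, iterations, seed=42):
    """Toy k-means over a list of coordinate tuples."""
    random.seed(seed)
    centers = random.sample(points, k)          # initialize centers from the data
    for _ in range(iterations):
        # assignment step: group each point with its nearest center
        clusters = [[] for _ in range(k)]
        for p in points:
            i = min(range(k),
                    key=lambda c: sum((a - b) ** 2 for a, b in zip(p, centers[c])))
            clusters[i].append(p)
        # update step: each center becomes the mean of its cluster
        centers = [tuple(sum(dim) / len(c) for dim in zip(*c)) if c else centers[i]
                   for i, c in enumerate(clusters)]
    return centers

points = [(0.0, 0.0), (0.1, 0.2), (10.0, 10.0), (10.2, 9.9)]
centers = kmeans(points, k=2, iterations=10)   # converges to one center per group
```

MLlib does exactly this, but with each assignment step running in parallel over the RDD's partitions, which is why the taxi example above scales to the full CSV.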
Spark Streaming
import org.apache.log4j.Logger
import org.apache.log4j.Level
Logger.getLogger("org").setLevel(Level.OFF)
Logger.getLogger("akka").setLevel(Level.OFF)

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

val ssc = new StreamingContext(sc, Seconds(1))
val lines = ssc.socketTextStream("localhost", 7777)
val pass = lines.map(_.split(",")).map(pass => (pass(15), pass(7).toInt)).reduceByKey(_ + _)
pass.print()

// The next two lines start the stream.
ssc.start()
ssc.awaitTermination()
Spark examples
Scala
[root@3e14f256e3c4 examples]# run-example GroupByTest
Spark assembly has been built with Hive, including Datanucleus jars on classpath
2000
[root@3e14f256e3c4 examples]# run-example SparkPi
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Pi is roughly 3.13814
[root@3e14f256e3c4 examples]# pwd
/usr/local/spark-1.2.1-bin-hadoop2.4/examples/src/main/scala/org/apache/spark/examples
[root@3e14f256e3c4 examples]#
Python
> spark-submit /usr/local/spark-1.2.1-bin-hadoop2.4/examples/src/main/python/pi.py
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Pi is roughly 3.150680
Example collection
[root@3e14f256e3c4 main]# pwd
/usr/local/spark-1.2.1-bin-hadoop2.4/examples/src/main
Spark example: wordcount in Python, single node
47 vi test.txt
48 hadoop fs -put test.txt /user/virtuser/
49 spark-submit /usr/iop/4.0.0.0/spark/examples/src/main/python/wordcount.py test.txt
50 history
[virtuser@rvm ~]$ cat test.txt
a b c d a b
Result:
15/06/25 09:00:54 INFO scheduler.DAGScheduler: Job 0 finished: collect at /usr/iop/4.0.0.0/spark/examples/src/main/python/wordcount.py:33, took 1.355375 s
a: 2
: 1
c: 1
b: 2
d: 1
15/06/25 09:00:54 INFO handler.ContextHandler: stopped o.e.j.s.ServletContextHandler{/metrics/json,null}
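For comparison, the computation the wordcount.py job performs can be written in a few lines of plain Python; Spark's version does the same split-and-count, just distributed across partitions.

```python
from collections import Counter

def wordcount(text):
    """Split on whitespace and count occurrences, like the Spark wordcount example."""
    return Counter(text.split())

print(wordcount("a b c d a b"))  # Counter({'a': 2, 'b': 2, 'c': 1, 'd': 1})
```

The stray `: 1` in the job output above is likely the empty token produced when Spark splits a line that ends in a trailing separator or newline; `str.split()` with no argument, as used here, discards empty tokens instead.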
Spark example: wordcount in Java, multi-node
[root@nimbus hadoop]# jps
2777 Master          -- Spark master
3203 QuorumPeerMain  -- ZooKeeper
3628 SecondaryNameNode
3762 Jps
3470 DataNode
3349 NameNode
[root@supervisor1 ~]# jps
4187 Worker          -- Spark slave
4261 Jps
-- reads the input file from Hadoop
[root@nimbus examples]# spark-submit --class JavaWordCount --verbose jar/spark.jar /data/click_thru_data.txt
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using properties file: null
15/06/28 17:48:07 INFO spark.SparkContext: Job finished: collect at JavaWordCount.java:66, took 0.836364074 s
d: 1
a: 2
b: 1
: 1
r: 1
c: 2
[root@nimbus examples]# hadoop fs -ls /data
15/06/28 17:56:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 1 items
-rw-r--r-- 3 root supergroup 15 2015-06-28 17:47 /data/click_thru_data.txt
[root@nimbus examples]#
Spark Monitor (a few clicks in Ambari)
History server: http://localhost:18080/
Questions
1. What happens if memory runs out during execution? - Unused partitions are evicted, LRU-style.
2. What is the recovery impact of a fault during execution? - If a particular partition is lost, its lineage is fetched and it is recomputed on another node.
3. Performance? - Good.
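Point 2 can be made concrete with a toy sketch in plain Python (illustrative names, not the Spark API): a lost partition is rebuilt by replaying the recorded transformations over its source data, rather than being restored from a replica.

```python
source = {0: [1, 2, 3], 1: [4, 5, 6]}             # partition id -> input records
lineage = [lambda xs: [x * 10 for x in xs],        # recorded transformations
           lambda xs: [x + 1 for x in xs]]

def compute(pid):
    """Recompute one partition from scratch by replaying the lineage."""
    data = source[pid]
    for op in lineage:
        data = op(data)
    return data

cache = {pid: compute(pid) for pid in source}      # both partitions materialized
del cache[1]                                       # simulate losing partition 1
cache[1] = compute(1)                              # recover it from lineage alone
print(cache[1])  # [41, 51, 61]
```

Only the failed partition is recomputed, which is why Spark's recovery cost stays proportional to the amount of lost data.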
Performance?
Talking about streams
Spark Streaming: transform the stream, transform it again, then run an action on it.
Uses of streams
- algorithmic stock trading
- top-10 products ordered over the last 30 minutes
- personalized recommendations from online-mall user clickstreams
- mobile ad-targeting services
- real-time financial fraud prevention
Architecture of stream systems: EDA (Event-Driven Architecture)
1. Event producers
2. Event processing logic
3. Event consumers
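The three roles above can be wired together in a minimal sketch, with a plain in-memory queue standing in for the messaging layer (all names here are illustrative assumptions, not a specific framework's API).

```python
from queue import Queue

events = Queue()

def producer():                                   # 1. event producer
    for i in range(5):
        events.put({"user": f"u{i % 2}", "amount": i})
    events.put(None)                              # sentinel: end of stream

def processing(consume):                          # 2. event processing logic
    totals = {}
    while (evt := events.get()) is not None:
        totals[evt["user"]] = totals.get(evt["user"], 0) + evt["amount"]
        consume(dict(totals))                     # push each update downstream
    return totals

producer()
final = processing(lambda snapshot: None)         # 3. event consumer (a no-op callback here)
print(final)  # {'u0': 6, 'u1': 4}
```

In a real deployment the queue becomes a broker such as Kafka, the processing logic becomes a Spark Streaming job, and the consumer becomes a dashboard, database, or alerting service.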
Streams vs. the conventional approach
Stream solutions?
Spark + Kafka?
A Kafka integration module exists: KafkaInputDStream.scala
https://github.com/apache/spark/tree/master/external/kafka
Implemented with the high-level consumer API
Real-time log collection?
1. A high-performance distributed messaging system using publish-subscribe (CDC, replication server..)
2. Developed by LinkedIn's SNA team and used in their services
- started in 2011; implemented in Scala and Java
- 18,000 topics, 220 billion messages per day
3. Scale-out architecture
Running Spark on Docker
https://registry.hub.docker.com/u/bigdatauniversity/spark2/
> docker pull bigdatauniversity/spark2 (downloads the image to your PC)
Example: running Spark on Docker
docker@boot2docker:~$ docker images
REPOSITORY                 TAG     IMAGE ID      CREATED     VIRTUAL SIZE
bigdatauniversity/spark2   latest  753e7307ea2c  4 days ago  2.419 GB
docker@boot2docker:~$ docker ps -a
CONTAINER ID  IMAGE  COMMAND  CREATED  STATUS  PORTS  NAMES
docker@boot2docker:~$ docker run -it --name bdu_spark2 -P -p 4040:4040 -p 4041:4041 -p 8080:8080 -p 8081:8081 bigdatauniversity/spark2:latest
Starting Spark environment...
starting namenode, logging to /var/log/hadoop-hdfs/hadoop-hdfs-namenode-3e14f256e3c4.out
Started Hadoop namenode: [ OK ]
starting datanode, logging to /var/log/hadoop-hdfs/hadoop-hdfs-datanode-3e14f256e3c4.out
Started Hadoop datanode (hadoop-hdfs-datanode): [ OK ]
starting resourcemanager, logging to /var/log/hadoop-yarn/yarn-yarn-resourcemanager-3e14f256e3c4.out
Started Hadoop resourcemanager: [ OK ]
starting nodemanager, logging to /var/log/hadoop-yarn/yarn-yarn-nodemanager-3e14f256e3c4.out
Started Hadoop nodemanager: [ OK ]
Starting sshd: [ OK ]
starting org.apache.spark.deploy.history.HistoryServer, logging to /usr/local/spark-1.2.1-bin-hadoop2.4/sbin/../logs/spark-student-org.
Zeppelin start [ OK ]
Starting Spark environment... Done!
[root@3e14f256e3c4 /]#
docker@boot2docker:~$ docker ps -a
CONTAINER ID  IMAGE                             COMMAND               CREATED        STATUS        PORTS  NAMES
3e14f256e3c4  bigdatauniversity/spark2:latest   "/etc/bootstrap.sh -  3 minutes ago  Up 3 minutes  0.0.0.0:4040-4041->4040-4041/tcp, 0.0.0.0:8080-8081->8080-8081/tcp, 0.0.0.0:32782->22/tcp, 0.0.0.0:32780->8042/tcp, 0.0.0.0:32781->8088/tcp, 0.0.0.0:32784->18080/tcp, 0.0.0.0:32783->50070/tcp, 0.0.0.0:32785->50075/tcp  bdu_spark2
docker@boot2docker:~$
Docker image list: pull an image down and run a container!
https://registry.hub.docker.com/
Docker image list: https://registry.hub.docker.com/
https://registry.hub.docker.com/u/sequenceiq/spark/
> Pull an image: > sudo docker pull <image-name>:<tag>
> Create a container: > docker run -it --name bdu_spark2 -P -p 4040:4040 -p 4041:4041 -p 8080:8080 -p 8081:8081 bigdatauniversity/spark2:latest
Spark 실습 (docker) docker@boot2docker:~$ docker attach bdu_spark2 [root@3e14f256e3c4 /]# hdfs dfs -ls /user/student/data Found 5 items -rw-r--r-- 1 student supergroup 8196 2015-04-15 16:38 /user/student/data/.ds_store drwxr-xr-x - student supergroup 0 2015-04-15 16:38 /user/student/data/sql drwxr-xr-x - student supergroup 0 2015-04-15 16:38 /user/student/data/stations drwxr-xr-x - student supergroup 0 2015-04-15 16:38 /user/student/data/trips drwxr-xr-x - student supergroup 0 2015-04-15 16:38 /user/student/data/weather [root@3e14f256e3c4 /]#
Running Spark on BigInsights 4.0
Running Spark on BigInsights v4 - if Spark does not appear, add it under the Services menu (restart the services after installation)
Running Spark on Bluemix (beta)
http://www.spark.tc/beta/
References
http://kafka.apache.org
http://esper.codehaus.org
http://storm.apache.org
http://spark.apache.org/docs/latest/programming-guide.html
https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf
http://berlinbuzzwords.de/sites/berlinbuzzwords.de/files/media/documents/andrew_psales_-_sparkstreaming.pdf
http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial
http://www.slideshare.net/ptgoetz/apache-storm-vs-spark-streaming
http://infinispan.org/infinispan-7.0
http://blog.cloudera.com/blog/2014/07/jay-kreps-apache-kafka-architect-visits-cloudera
http://www.slideshare.net/tedwon/realtimecepv26
http://www.slideshare.net/tedwon/real-time-bigdataanalyticspracticewithunstructureddata
http://www.slideshare.net/tedwon/complex-event-processing-with-esper-11106262
http://www.slideshare.net/tedwon/red-hat-forum-2012-jboss-rhq
ASF projects http://www.apache.org
Spark http://spark.apache.org
Spark Download http://spark.apache.org/downloads.html
IBM BigInsights V4.0: Spark 1.2.1
IBM BigInsights V4.1: Spark 1.3.0
QnA
What is IBM announcing? What is the market's point of view on IBM and Spark? Why is IBM investing in Apache Spark? How will IBM make money from Apache Spark? How does Apache Spark fit within the IBM portfolio? How does Spark work with IBM BigInsights? Does Spark replace IBM Streams? Who are our target buyers for Spark? Who are the target users for Spark? How does Spark benefit business users? Does IBM have a proven record of participating in open source communities? etc.