2017 년정보처리학회단기강좌 빅데이터플랫폼과 Spark - 장형석 - 국민대학교빅데이터경영 MBA 과정교수 - chjang1204@nate.com
Part I 빅데이터플랫폼
1. 빅데이터플랫폼 빅데이터수집 / 분석 / 서비스를위한목표플랫폼 빅데이터플랫폼 데이터 서비스 서비스 / 시각화 시각화 활용 내부DB 외부DB 공공DB 문서파일포털 (WEB) SNS A P I A g e n t SFTP HTTPS REST SOAP Agent Download SFTP Agent Download Crawling 빅데이터통합연계수집 OA, 보안,... 그룹사 A, B,... 분야 A, B,... 외부서비스 A, B,... 통계 / 분석 형태 정형 / 준정형 비정형 통계 / 분석 / 데이터마이닝 데이터마이닝 관리 메타데이터 데이터모델 텍스트마이닝 분석 소셜네트워크분석 저장 / 처리 데이터원본 DB 기초데이터 기초데이터 RDBMS/MPP NoSQL 분산파일시스템 연계 / 수집 추출전송변환저장 인프라 분석결과 DB 결과데이터 결과데이터 실시간분석 데이터시각화 고급분석 머신러닝 서비스시각화 처리 예측분석 배치처리 CEP SQL 경영진부서그룹사외부개인고객공공 통합보안 H/W N/W S/W 인증
1. 빅데이터플랫폼 빅데이터플랫폼과소프트웨어 데이터 DB 서비스 / 시각화 빅데이터분석가및고객 ( 서비스 ) 을위한웹인터페이스 분석 웹 / 모바일 Application D3.js DB A P I 통계 / 분석 / 마이닝고급분석통계 / 분석데이터마이닝 SNA 머신러닝실시간분석 R Pig Spark Mahout DB 형태 정형 / 준정형 구분 내부 DB 데이터원본 DB 기초데이터 저장 / 처리 분석결과 DB 결과데이터 Impala Hive 비정형 외부 /SNS 기초데이터 결과데이터 HDFS MapReduce 파일 로그 A g e n t RDBMS/MPP NoSQL 분산파일시스템 연계 / 수집 추출전송변환저장 인프라 Flume Sqoop 통합보안 H/W N/W S/W 인증 클라우드 (OpenStack) 클러스터매니저 (YARN)
2. 빅데이터프로세스및기술 빅데이터프로세스 데이터소스수집저장처리분석표현 내부데이터 수동 - 정형 - 비정형 - 준정형 배치처리 ( 맵리듀스 ) 전처리 서비스 ( 활용 ) 시각화 외부데이터 자동 - 로그수집기 - 크롤러 - 센서수집기 분산파일시스템 NoSQL SQL 분석 통계데이터마이닝머신러닝 데이터 ( 분석 ) 시각화 사물인터넷 ( 센서 ) 실시간처리
2. 빅데이터프로세스및기술 수집개요및기술 정형 로그수집기 반정형 크롤러 센서데이터수집기 빅데이터 저장소 비정형 Open API < 빅데이터의주요수집기술 > 기술 개발 최초공개 주요기능및특징 Sqoop 아파치 2009년 RDBMS와 HDFS(NoSQL) 간의데이터연동 Flume Cloudera 2010년 방대한양의이벤트로그수집 Kafka Linkedin 2010년 분산시스템에서메시지전송및수집
2. 빅데이터프로세스및기술 저장기술 구분기술최초개발주요기능및특징 HDFS 아파치대표적인오픈소스분산파일시스템 분산파일시스템 Hive 페이스북 HDFS 기반의 DataWarehouse S3 아마존아마존의클라우드기반분산스토리지서비스 HBase 아파치 HDFS 기반의 NoSQL NoSQL Cassandra A. Lakshman ACID 속성을유지한분산데이터베이스 Mongo DB 10gen DB 의수평확장및범위질의지원, 자체맵리듀스 처리기술 배치처리 : 하둡맵리듀스, Pig, Hive 분산병렬데이터처리기술의표준, 일반범용서버로구성된군집화시스템을기반으로 < 키, 값 > 입력데이터분할처리및처리결과통합기술, job 스케줄링기술, 작업분배기술, 태스크재수행기술이통합된분산컴퓨팅기술 SQL on Hadoop : Hive on Tez, Impala, Presto, Shark(SparkSQL) 배치처리중심의맵리듀스의한계를넘기위해만들어진 SQL 기반의자체쿼리실행엔진.
2. 빅데이터프로세스및기술 분석기술 R 오픈소스통계분석소프트웨어. 기본적인통계분석부터최신머신러닝까지다양한패키지를지원. 하둡및스팍과연동이가능 R, RStudio, RHadoop, RHive, SparkR Python 데이터마이닝과머신러닝을지원하는다양한패키지통계학과의기본적인프로그래밍언어로정착스크립트기반의대화형분석환경지원 ipython Notebook Mahout 추천시스템, 분류, 군집등머신러닝기능을지원하둡기반의머신러닝 Java 라이브러리현재 Spark의 MLlib로발전함 Spark 대화형분석, 머신러닝, SQL, 그래프알고리즘등다양한분석가능 Spark Core, Spark MLlib, SparkSQL, GraphX, Spark Streaming
2. 빅데이터프로세스및기술 실시간수집및처리사례 출처 : 엄태욱, 2015/12/18, SK 플래닛기술블로그 http://readme.skplanet.com/?p=12465
3. 빅데이터인프라 < 개발 / 파일럿시스템구성 > 수집서버 x86 2CPU, 64G RAM 36TB SATA HDD Linux 웹크롤러수집애플리케이션 분석서버 ( 하둡 / 스팍클라이언트 ) x86 2CPU, 64G RAM 36TB SATA HDD Linux Hadoop Ecosystem Clients(Hive,Spark 등 ) 분석서버 (RStudio Server) x86 2CPU, 64G RAM 36TB SATA HDD Linux R(Linux Server Version) 마스터노드 데이터노드 마스터노드 1 마스터노드 2 데이터노드 1 데이터노드 2 데이터노드 3 x86 2CPU, 96G RAM 4TB SAS HDD Linux NameNode(Acitve) Metastore(Mysql Server) x86 2CPU, 96G RAM 4TB SAS HDD Linux 2st NameNode(Stanby) YARN/Spark Master x86 2CPU, 64G RAM 36TB SATA HDD Linux DataNode TaskTracker Spark Worker x86 2CPU, 64G RAM 36TB SATA HDD Linux DataNode TaskTracker Spark Worker x86 2CPU, 64G RAM 36TB SATA HDD Linux DataNode TaskTracker Spark Worker
3. 빅데이터인프라 하둡클러스터 수집용 마스터노드 [ 수집서버 1] 수집서버 2 크롤링서버 [ 마스터노드 1] [ 마스터노드 2] 마스터노드 3 x86 2CPU, 96G RAM 4TB SAS HDD Linux NameNode(active) Metastore(Mysql) x86 2CPU, 96G RAM 4TB SAS HDD Linux 2st NameNode(Stanby) YARN/Spark Master x86 2CPU, 96G RAM 4TB SAS HDD Linux Cluster Manager Backup x86 2CPU, 64G RAM 36TB SATA HDD ETL Linux x86 2CPU, 64G RAM 36TB SATA HDD ETL Linux 분석용 x86 2CPU, 64G RAM 36TB SATA HDD Crawler LInux 데이터노드 [ 분석서버 1] [ 분석서버 2] 시각화서버 [ 데이터노드 1] x86 2CPU, 64G RAM 36TB SATA HDD Linux DataNode TaskTracker Spark Worker [ 데이터노드 2] [ 데이터노드 3] 데이터노드 4 데이터노드 5 x86 2CPU, 64G RAM 36TB SATA HDD Hadoop Clients(Hive 등 ) Linux x86 2CPU, 64G RAM 36TB SATA HDD RStudio Server Linux 서비스용 x86 2CPU, 64G RAM 36TB SATA HDD Tableau Server Linux 데이터노드 6 데이터노드 7 데이터노드 8 데이터노드 9 데이터노드 10 웹서버 1 x86 2CPU, 64G RAM 8TB SATA HDD WebServer/WAS 웹서버 2 x86 2CPU, 64G RAM 8TB SATA HDD WebServer/WAS
Part II 하둡에코시스템
1. 분산병렬처리 처리속도의발전 1단계 : CPU의 Clock Frequency 늘리기 - 무어의법칙 : 동일한면적의반도체직접회로의트랜지스터의개수가 18개월에 2배씩증가 - CPU Clock 물리적인한계 : 최대 4Ghz 2 단계 : Single CPU 에서 Multi CPU & Core - 1 CPU -> 2 CPU( 최적 ) -> 4 CPU -> 8 CPU - 1 CPU : 1 Core -> 2 Core -> 4 Core -> 6 Core -> 8 Core 2 배가아닌 10 배씩증가 3 단계 : Single Machine -> Cluster( N 대의 Machine 을 Network 로연결 ) - 수퍼컴퓨터 - 하둡클러스터 -> N 대의범용컴퓨터를묶은클러스터 ( 하둡 HDFS 와맵리듀스 )
1. 분산병렬처리 분산병렬처리를위한맵리듀스프레임워크 구글 - 2003년 GFS : 구글분산파일시스템아키텍처 -> 논문공개 - 2004년 MapReduce : 구글맵리듀스아키텍처 -> 논문공개 하둡 - 2006년 2월더그커팅 : 오픈소스로하둡 (HDFS,MapReduce) 공개 - 2008년야후 : 900노드에서 1테라바이트정렬 : 209초 ( 세계최고기록 ) - Hadoop, Pig, Hive, HBase, Mahout, Zookeeper... Spark : 하둡에코시스템및스팍으로발전 오픈소스 범용컴퓨터 수집 / 저장 / 처리 / 분석 / 시각화기술
1. 분산병렬처리 하둡 MapReduce 출처 : https://developer.yahoo.com/hadoop/tutorial/module4.html https://blog.pivotal.io/pivotal/products/hadoop-101-programming-mapreduce-with-native-libraries-hive-pig-and-cascading
2. Hadoop Ecosystem 빅데이터핵심기술 하둡 하둡에코시스템 < 분산파일시스템 + 분산병렬처리 > [ MapReduce ] CPU CPU CPU CPU RAM RAM RAM RAM... CPU CPU RAM RAM 모니터링및관리 1.Zookeeper 워크플로우 2.Oozie 분산데이터스토어 3.HBase 비정형데이터수집 9.Chukwa 10.Flume 11.Scribe 배치분석 4.Pig 5.Hive 기계학습 6.Mahout 메타데이터관리 7.HCatalog 분산병렬처리 (MapReduce) 분산파일시스템 (HDFS) 정형데이터수집 12.Sqoop 13.Hiho 데이터직렬화 8.Avro Disk Disk Disk Disk [ HDFS ] Disk Disk 1. Zookeeper : 분산환경에서서버들간에상호조정이필요한다양한서비스제공 2. Oozie : 하둡작업을관리하는워크플로우및코디네이터시스템 3. Hbase : HDFS 기반의컬럼 NoSQL 4. Pig : 복잡한 MapReduce 프로그래밍을대체할 Pig Latin 언어제공 5. Hive : 하둡기반의데이터웨어하우스, 테이블단위의데이터저장과 SQL 쿼리를지원 6. Mahout : 하둡기반으로데이터마이닝알고리즘을구현한오픈소스라이브러리 7. Hcatalog : 하둡기반의테이블및스토리지관리 8. Avro : RPC(Remote Procedure Call) 과데이터직렬화를지원하는프레임워크 9. Chukwa : 분산환경에서생성되는데이터를 HDFS 에안정적으로저장시키는플랫폼 10. Flume : 소스서버에에이전트가설치, 에이전트로부터데이터를전달받는콜랙터로구성 11. Scribe : 페이스북에서개발, 데이터수집플랫폼, Chukwa 와달리중앙집중서버로전송 12. Sqoop : 대용량데이터전송솔루션이며, HDFS, RDBMS, DW,NoSQL 등다양한저장소에대용량데이터를신속하게전송할수있는방법제공 13. Hiho : Sqoop 과같은대용량데이터전송솔루션이며, 하둡에서데이터를가져오기위한 SQL 을지정할수있으며, JDBC 인터페이스를지원
2. Hadoop Ecosystem 출처 : 호튼웍스 http://hortonworks.com/
Part III Spark 의이해
Spark 개요 Spark 개요 하둡 MapReduce 보다발전된새로운분산병렬처리 Framework - 저장소는로컬파일시스템, 하둡 HDFS, NoSQL(Hbase, Redis), RDBMS( 오라클,MSSQL) - Spark는분산병렬처리엔진 기존 Hive, Pig, Mahout, R, Storm 등을모두대체가능 - 분산병렬처리엔진인 Spark Core를기반으로 - 다양한내장패키지를지원
Spark 개요 Spark 의특징 하둡에코시스템발전의최고봉 - 구현원리및아키텍처는 Stinger Initiative와유사 - 레코드기반과컬럼기반데이터저장이모두가능 - YARN, Hive, Sqoop, Kafka 등다양한하둡에코시스템과통합
Spark 개요 Spark vs MapReduce : Word Count
Spark 개요 구성요소
Spark 개요 Process & Scheduling
Spark 개요 Data Process Model MapReduce
Spark 개요 Data Process Model Spark
Spark 개요 Spark runs on Hadoop Cluster
Spark 개요 Example : Log Mining
Spark 개요 성능 : Logistic Regression
Part IV Spark 아키텍처
Spark 아키텍처와구현원리 함수형언어 : Scala 분산병렬처리에최적화된함수형언어의필요성 - 구글 MapReduce 프레임워크 : C/C++ 언어 - 하둡 MapReduce 프레임워크 : Java 6 / Java 7 - 하둡 Tez 프레임워크 : Java 7 / Java 8 - Spark Core 프레임워크 : Scala / Java 8 함수형언어 - 함수형언어 vs 절차형언어또는명령형언어 - 대표적인함수형언어로는 SQL # MapReduce의구현목표는분산병렬처리가가능한데이터집계기능 (SQL) # HiveQL, SQL on Hadoop( 임팔라, 타조, 프레스토등 ) - Data Work Flow는 DAG(Directed Acyclic Graph, 방향성비사이클그래프 ) # Start -> 입력 (Input) -> 처리 -> 분기 -> 처리 -> 병합 -> 처리 -> 결과 (Output) -> End - 입력이같으면결과도같아야함 - 단일머신에서병렬처리 & 클러스터에서분산병렬처리가가능 - 명령형언어이면서함수형언어의특징을가진대표적인언어 : R, Java Script, Python
Spark 아키텍처와구현원리 함수형언어 : Scala 프로그래밍아키텍처 - OS : Linux - JVM( 자바가상머신 ) : 자바애플리케이션을실행하기위한가상머신 - Java 8 / Scala 프로그래밍언어 : JVM 기반의애플리케이션개발언어 - 컴파일 vs 스크립트컴파일방식 : 컴파일후실행파일 (JAR) 을각머신에배포해야함 ( Compile -> Job Submit ) 스크립트방식 : 컴파일과정없이스크립트를그때마다해석하여 JVM에서바로실행 ( Console ) Java 컴파일 Java 실행파일 [ Console / Shell ] Console > Scala 스크립트 1 Source Code (Byte Code) Console > Scala 스크립트 2 Java Virtual Machine Linux OS
Spark 아키텍처와구현원리 함수형언어 : Scala 분산병렬처리 : 프로그래밍실행구조 하둡 MapReduce 방식 - 하둡 MapReduce : Java/Pig/Hive 에서컴파일후 JobTracker 에 Submit -> 각머신에서실행 - Spark : 각머신에실행코드를바로전송후실행 [ Master 머신 ] JobTracker [ 클라이언트머신 ] 3.Job Submit 4.Task Assign ( 스케줄링 ) hive> 쿼리 ; 1.MapReduce [ Worker 머신 ] [ Worker 머신 ] [ Worker 머신 ] 코드생성 TaskTracker TaskTracker TaskTracker Java Source Code 5.JAR 배포 JAR 실행파일 JAR 실행파일 JAR 실행파일 2. 컴파일 6.fork() 6.fork() 6.fork() JAR 실행파일 JVM Linux OS JVM Linux OS JVM Linux OS
Spark 아키텍처와구현원리 함수형언어 : Scala 분산병렬처리 : 프로그래밍실행구조 - 하둡 MapReduce : Java/Pig/Hive 에서컴파일후 JobTracker 에 Submit -> 각머신에서실행 - Spark : 각머신에실행코드를전송후바로실행 Spark 방식 [ 클라이언트머신 ] [ Worker 머신 ] [ Worker 머신 ] [ Worker 머신 ] Spark Spark Spark Executor Executor Executor 2.fork() 2.fork() 2.fork() Scala> 스크립트 ; 1. 실행코드전송 JVM Linux OS JVM Linux OS JVM Linux OS [ Spark Driver ] ( 스케줄링 )
Spark 아키텍처와구현원리 함수형언어 : Scala [ 하둡 MapReduce ] vs [ Spark ] [ 클라이언트 ] Pig/Hive Shell >... [ 클라이언트 ] Spark Driver [ Master ] Scala shell >... Job Tracker [ Worker ] [ Worker ] [ Worker ] [ Worker ] [ Worker ] [ Worker ] Task Tracker Task Tracker Task Tracker Spark Executor Spark Executor Spark Executor
Spark 아키텍처와구현원리 Data Work Flow Job DAG 의이해 - [ 하둡 MapReduce] Job : Input -> Map -> Shuffle -> Reduce -> Output - Job DAG : 1 번째 Job -> 2 번째 Job ->... -> N 번째 Job 단일 Job 2 Map Shuffle 3 Reduce 4 1 Input HDFS Output 5
Spark 아키텍처와구현원리 Data Work Flow 단일 [ 하둡 MapReduce] Job 네트워크전송 Local I/O [ DataNode ] [ Worker ] [ Worker ] [ DataNode ] Disk Disk CPU CPU CPU CPU Disk Disk CPU CPU CPU CPU Disk Disk Disk Disk RAM RAM RAM RAM Disk Disk RAM RAM RAM RAM Disk Disk [ DataNode ] [ Worker ] [ Worker ] [ DataNode ] Disk Disk CPU CPU CPU CPU Disk Disk CPU CPU CPU CPU Disk Disk Disk Disk RAM RAM RAM RAM Disk Disk RAM RAM RAM RAM Disk Disk [ DataNode ] [ Worker ] [ Worker ] [ DataNode ] Disk Disk CPU CPU CPU CPU Disk Disk CPU CPU CPU CPU Disk Disk Disk Disk RAM RAM RAM RAM Disk Disk RAM RAM RAM RAM Disk Disk 1 2 3 4 5
Spark 아키텍처와구현원리 Data Work Flow [ 하둡 MapReduce] Job DAG : 1 번째 Job -> 2 번째 Job ->... -> N 번째 Job 1번째 Job 2번째 Job 4번째 Job 5번째 Job 3 번째 Job Input Data 임시 Data Input Data 임시 Data 임시 Data Output Data HDFS
Spark 아키텍처와구현원리 Data Work Flow 하둡 MapReduce의단점 - Disk I/O가크다. - 네트워크부하가크다. - 전체 Work을여러단계의 Job으로분리하는것은굉장히어렵고이해하기도힘들다. 대안 => 스파크 - 입출력사이의중간과정은 Disk대신메모리에서처리. - 작업은단일 Job : 대신여러단계의 Stage로구분해서내부적으로실행됨. - 배치성작업뿐만아니라대화형분석이가능 - Data Work Flow : RDD(Resilient Distributed Dataset, 탄력적인분산데이터셋 ) Graph로유지관리. - Input, Map, Shuffle, Reduce, Output을위한체계적인 Operator 제공. - Operator는 Transformation과 Action로크게구분. - Action 연산시에만실제실행 : 지연실행 (Lazy Evaluation) - Persist( ) 를요청하면특정 RDD를인메모리에상주시킴. - 물리적인파티션을직접관리할수있음.
Spark 아키텍처와구현원리 Spark RDD 와인메모리방식 RDD 개요 - Resilient Distributed Dataset : 탄력적인분산데이터셋 - Resilient : 처리과정에서일부데이터가손상되어서복구가가능 ( 부분손상 -> 부분복구 ) - Distributed : 처리과정에서데이터를여러머신에분산저장 => 파티션 (Partition) - RDD Graph : [Input] -> RDD -> RDD ->... -> RDD -> [Output] RDD RDD RDD RDD RDD RDD RDD Input Input 데이터저장소 (HDFS or RDBMS) Output Output
Spark 아키텍처와구현원리 Spark RDD 와인메모리방식 RDD 논리및물리계획 A Block1 A Block2 A Block3 B B Block1 Block2 C Block1 [ HDFS ] [ Worker ] [ Worker ] [ Worker ] Input - A Input - B Input - C B Part1 B Part1 RDD - B B1 Part1 B1 Part1 C Part1 RDD B1 RDD - C A Part1 A Part2 A Part3 BC Part1 BC Part2 BC Part3 RDD - A RDD - BC [ Worker ] [ Worker ] [ Worker ] A BC A BC A BC RDD - ABC B1 = B.filter( ) BC = B1.union( C ) ABC Part1 ABC Part2 ABC Part3 RDD - D ABC = A.join( BC ) D = ABC.aggregate( ) D Part1 D Part2 D Part3 Output - D 논리계획 물리계획 D Block1 D Block2 D Block3 [ HDFS ]
Spark 아키텍처와구현원리 Spark 연산및대화형분석 Transformation과 Action - Transformation은입력데이터셋을단계별 RDD로변형하는연산자 - Action은결과를콘솔에서보거나 HDFS등에저장하는연산자 - Action 연산자를실행하면입력소스에서데이터를불러와서처리하고그결과를보여주거나저장 => 즉 Transformation 연산은실제실행되지않고 Action 연산을요청할때에만실행되므로이런방식을 Lazy Evaluation( 지연실행 ) 이라고부름 특징 - Pig, Hive 등이제공하는연산자보다직관적이고효율적인다수의연산자를지원함 - 주의할점은 Action을요청할때마다데이터를입력소스에서불러와서처리함 => 원하는 RDD에 persist( ) 메소드로분산캐싱을명시하는방법을제공 - 대화형으로동일한입력데이터셋을처리하고요약 ( 집계 ) 할때효율적임 - Spark 연산자를이용한다양한라이브러리를제공
Spark 아키텍처와구현원리 Spark 연산및대화형분석 효율적인대화형분석절차 [ 첫번째작업 ] Scala> A1.count( ) 12,300 콘솔에서바로확인 RDD-A 3. 캐싱 ( 인메모리 ) 2.RDD 변형 (Transformation) RDD-A1 RDD-AB RDD 변형 [ 두번째작업 ] Scala> B = load( 'B' ) A.filter( ) A1.persist( ) RDD-B Scala> AB = A1.join( B ) Scala> C = AB.groupby( ) RDD-C 1. 입력데이터셋 로딩 집계후 [ N 번째작업 ] 결과저장 Input - A Input - B RDD-D Scala> D = C.aggregate( ) Scala> D.SaveAsText( 'D' ) Output - D
Spark 아키텍처와구현원리 Spark 연산및대화형분석 분산캐싱 ( 인메모리 ) - 매번 HDFS나 RDBMS에서데이터를불러오는것은비효율적이고시간이오래걸림 - 원하는특정 RDD를분산캐싱 ( 다수의머신에있는메모리에데이터를상주시킴 ) 할수있음 - 캐싱은메모리 ( 우선 ) 와 Disk 모두가능. - 직렬화 ( 압축, 컬럼기반 ) 옵션을지원. SSD 권장 옵션 필요공간 CPU시간 메모리저장 디스크저장 비고 MEMORY_ONLY 많음 낮음 YES X MEMORY_ONLY_SER 적음 높음 YES X 직렬화 MEMORY_AND_DISK 많음 중간 일부 ( 우선 ) 일부 MEMORY_AND_DISK_SER 적음 높음 일부 ( 우선 ) 일부 직렬화 DISK_ONLY 적음 낮음 X YES
Spark 아키텍처와구현원리 Spark 연산및대화형분석 성능고려사항 RAM 은빠르지만 저장용량이적음 DISK 보다빠름 RAM 보다용량이큼 [ HDFS 데이터용 ] [ Spark 캐싱용 ] 10 GB/s [ RAM ] 100 MB/s [ DISK ] 500 MB/s [ SSD ]
Spark 을활용한대화형분석 실행방식 로컬모드 $ bin/pyspark --master local - Shell을실행하는로컬머신에서, 단일프로세서로 Spark이실행됨 - Spark/PySpark Shell 모두가능 로컬 [N] 모드 $ bin/pyspark --master local[n] - Shell을실행하는로컬머신에서, [N] 개의프로세서로 Spark이병렬로실행됨 - Spark/PySpark Shell 모두가능 분산모드 $ bin/spark-shell --master spark://master.host:7077 - Spark Master/Worker 데몬이구동되고있는클러스터환경에서, 분산 + 병렬로실행됨 - Spark Shell 만가능 -> 클러스터매니저를 Mesos를사용할경우 $ bin/spark-shell --master mesos://mesos.host:port
Spark 을활용한대화형분석 Spark 기능구조도 [ 패키지 ] SparkSQL GraphX MLlib Streaming SparkR... [ 맵리듀스실행엔진 ] Spark Core [ 자원관리시스템 ] YARN, Mesos, Spark Standalone Manager [ 데이터저장소 ] HDFS, NoSQL, RDBMS
Spark 을활용한대화형분석 Spark 의주요기능 구분모듈기능분야 핵심 Spark Core 맵리듀스와같은병렬처리및반복연산분산병렬처리 Spark SQL 하이브와같은 SQL 분석데이터웨어하우스 주요패키지 MLlib 마훗과같은머신러닝라이브러리데이터마이닝 GraphX 네트워크분석 SNA 등네트워크분석 Spark Streaming 스톰과같은실시간스트리밍분석스트리밍처리및분석 확장프로젝트 BlinkDB 빠른응답속도를가진 SQL 쿼리분석 Ad-Hoc 분석 SparkR 통계패키지인 R 과의통합통계분석
Spark 을활용한대화형분석 SparkSQL SQL 쿼리분석 - 입력 : HDFS/Hive, RDBMS, NoSQL, Text File( Plain Text, JSON, XML 등 ) - 데이터셋을 DB Table로처리, 내부적으로는 DataFrame 형식 SparkSQL Spark Core Hive Presto HBase HDFS Text JSON XML Mysql Oracle 몽고 DB
Part V 머신러닝의이해
머신러닝개요 머신러닝주요기법
머신러닝개요 머신러닝의기본구조 사전학습 ( Training ) INPUT 블랙박스 (Hidden Layer) OUTPUT
머신러닝개요 통계 / 분석프로세스 해석 / 평가 (Assessment) 모델링 (Modeling) 변형 (Transformation) 결과 정제 (Cleansing) 패턴패턴 선정 (Selection) 목표데이터 분석주제선정 대상데이터원본데이터원천데이터 대상데이터대상데이터 전처리
머신러닝개요 데이터마이닝 & 머신러닝프로세스 데이터 데이터 모델 모델 모델적용 획득 전처리 학습 평가 ( 서비스 ) 최적화 (AB 테스트 )
머신러닝개요 통계 데이터마이닝 머신러닝
머신러닝개요 Descriptive statistics via summary( ) > summary(mtcars[vars]) mpg hp wt Min. :10.4 Min. :52.0 Min. :1.51 1st Qu. :15.4 1st Qu. :96.5 1st Qu. :2.58 Median :19.2 Median :123.0 Median :3.33 Mean :20.1 Mean :146.7 Mean :3.22 3rd Qu. :22.8 3rd Qu. :180.0 3rd Qu. :3.61 Max. :33.9 Max. :335.0 Max. :5.42
머신러닝개요 N=32 Bandwidth=2.477 Kernel Density of Miles Per Gallon
머신러닝개요
머신러닝개요
머신러닝개요
머신러닝개요
Part VI Spark MLlib
1. Data Model 1) Data Model ML을위해필요한 Data Type의분류 (1) Spark Basic Type - vector, matrix, array - list, dataframe (2) MLlib Type - LabeledPoint - Rating - 알고리즘별 Model Class
1. Data Model 1) Data Model Spark Basic Type (1) Vector - 동일한 Type의데이터 N개, 1차원, 고정길이 // Create a dense vector (1.0, 0.0, 3.0). val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) [Note] 수학분야의방향과크기를가지고있는데이터를의미하지않음 (2) Matrix - 동일한 Type의데이터, 2차원, M개의행과 N개의열로구성 ( M*N 행열 ) // Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) (3) Array - 동일한 Type의데이터, N차원 (1차원, 2차원,... N차원 ), 고정길이 var z = new Array[String](3); z(0) = "Zara"; z(1) = "Nuha"; z(2) = "Ayan" var z2 = Array("Zara", "Nuha", "Ayan") (4) List - 동일한 Type의데이터, 1차원, 가변길이 (5) DataFrame - 다양한 Type의데이터, 2차원, 엑셀 Sheet or 데이터베이스테이블구조와유사
1. Data Model 1) Data Model Vector Type (1) Vector의분류 - 고밀도 : Dense -> 전체데이터를 double type의고정길이배열에저장 - 저밀도 : Sparse -> 0.0이아닌값과그인덱스를저장 (2) Example // Create the dense vector <1.0, 0.0, 2.0, 0.0>; val densevec1 = Vectors.dense(1.0, 0.0, 2.0, 0.0) val densevec2 = Vectors.dense(Array(1.0, 0.0, 2.0, 0.0)) // Create the sparse vector <1.0, 0.0, 2.0, 0.0>; val sparsevec1 = Vectors.sparse(4, Array(0, 2), Array(1.0, 2.0)) //(int size, int[] indices, double[] values) (3) Vectors 메소드 apply argmax asinstanceof compressed copy foreachactive isinstanceof numactives numnonzeros size toarray todense tojson tosparse tostring
1. Data Model 1) Data Model LabeledPoint Type (1) LabeledPoint = ( Labeled : double, Features : vectors ) - Labeled -> 1.Classification : 명목형값 (classes) -> double Binary( True, False ) -> 1.0 / 0.0 MultiClass -> 0.0 1.0 2.0... N.0 2.Regression : 연속형값 ( 수치형 ), double - Features -> 속성값 Vectors(double) 명목형 Feature의값은 double type으로변환해야함 (2) Example scala> val examples = MLUtils.loadLibSVMFile(sc, "file:///data/spark-1.6.0/data/mllib/sample_binary_classification_data.txt") examples: org.apache.spark.rdd.rdd[org.apache.spark.mllib.regression.labeledpoint] scala> println(examples.take(2).mkstring("\n")) (0.0, (692, [127,128,129,...,656,657], [ 51.0,159.0,253.0,...,141.0, 37.0] ) ) (1.0, (692, [158,159,160,...,682,683], [124.0,253.0,255.0,...,253.0,220.0] ) ) scala> val data = sc.textfile("file:///data/spark-1.6.0/data/mllib/sample_binary_classification_data.txt") scala> println(data.take(2).mkstring("\n")) 0 128:51 129:159 130:253... 657:141 658:37 1 159:124 160:253 161:255... 683:253 684:220
2. Data 처리 2) 데이터처리 데이터처리개요 1. 데이터셋선택 2. 데이터로딩 3. 데이터탐색 4. 데이터클린징 5. 데이터변환
2. Data 처리 2) 데이터처리 머신러닝공개데이터셋 1. UCI 데이터저장소 - 300여개의데이터셋 (classification,regression, clustering, recommender systems) http://archive.ics.uci.edu/ml/ 2. 아마존공개데이터셋 - Human Genome Project, Common Crawl web corpus, Wikipedia data, Google Books Ngrams http://aws.amazon.com/publicdatasets/ 3. 캐글 (Kaggle) 데이터셋 - 머신러닝대회, classification, regression, ranking, recommender systems, image analysis http://www.kaggle.com/competitions 4. KDnuggets 데이터셋 - 공개데이터셋목록 http://www.kdnuggets.com/datasets/index.html
2. Data 처리 2) 데이터처리 Spark Examles 데이터셋 1. 위치 $SPARK_HOME/data/mllib/ 2. 파일목록 ( Line 개수, Word 개수, File Size ) 2000 4000 63973 gmm_data.txt 6 18 72 kmeans_data.txt 1000 11000 197105 lr_data.txt 6 12 24 pagerank_data.txt 19 57 164 pic_data.txt 100 13610 104736 sample_binary_classification_data.txt 6 34 68 sample_fpgrowth.txt 99 100 1598 sample_isotonic_regression_data.txt 12 132 264 sample_lda_data.txt 100 13610 104736 sample_libsvm_data.txt 501 5511 119069 sample_linear_regression_data.txt 1501 1501 14351 sample_movielens_data.txt 150 737 6953 sample_multiclass_classification_data.txt 11 36 95 sample_naive_bayes_data.txt 322 5474 39474 sample_svm_data.txt 569 569 115476 sample_tree_data.csv 1000 2000 42060 lr-data/random.data 67 536 10395 ridge-data/lpsa.data
2. Data 처리 2) 데이터처리 데이터로딩 1. 데이터소스 - 로컬파일시스템 - 분산파일시스템 : HDFS, 아마존 S3 등 - RDBMS / NoSQL : Mysql, Oralce, 카산드라, HBase, ElasticSearch 등 -> JDBC Driver로연결 2. 파일포맷 - Text File - JSON File - CSV File( CSV is ",", TSV is "\t" ) - Key/Value(Hadoop Format) File - Object
2. Data 처리 2) 데이터처리 데이터로딩 3. Text File 불러오기 val data = sc.textfile("file:///data/spark-1.6.0/data/mllib/pic_data.txt") println(data.take(3).mkstring("\n")) data.first() data.count() 4. 컬럼분리 ( BackSpace, CSV, TSV ) val initdata = data.map { line => val values = line.split(' ')map(_.todouble) Vectors.dense(values.init) } println(initdata.take(3).mkstring("\n")) val firstdata = data.map { line => val values = line.split(' ')map(_.todouble) Vectors.dense(values(0)) } println(firstdata.take(3).mkstring("\n")) val lastdata = data.map { line => val values = line.split(' ')map(_.todouble) Vectors.dense(values.last) } println(lastdata.take(3).mkstring("\n")) val taildata = data.map { line => val values = line.split(' ')map(_.todouble) Vectors.dense(values.tail) } println(taildata.take(3).mkstring("\n"))
2. Data 처리 2) 데이터처리 데이터탐색 1. RDD 기초통계 scala> val second = data.map { line => val values = line.split(' ').map(_.todouble) (values(1)) } second: org.apache.spark.rdd.rdd[double] = MapPartitionsRDD[23] at map at <console>:36 scala> println(second.collect().mkstring("\n")) 1.0 2.0... scala> second.stats res42: org.apache.spark.util.statcounter = (count: 19, mean: 7.526316, stdev: 4.649487, max: 15.000000, min: 1.000000)
2. Data 처리 2) 데이터처리 데이터탐색 2. RDD[Vector] 기초통계 scala> val features = data.map { line => val values = line.split(' ').map(_.todouble) Vectors.dense(values) } # MultivariateStatisticalSummary 메소드목록 asinstanceof count isinstanceof max mean min norml1 norml2 numnonzeros tostring variance features: org.apache.spark.rdd.rdd[org.apache.spark.mllib.linalg.vector] = MapPartitionsRDD[30] at map at <console>:41 scala> println(features.take(3).mkstring("\n")) [0.0,1.0,1.0] [0.0,2.0,1.0] [0.0,3.0,1.0] scala> import org.apache.spark.mllib.stat.{multivariatestatisticalsummary, Statistics} scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(features) scala> summary.mean res56: org.apache.spark.mllib.linalg.vector = [5.789473684210526, 7.526315789473685, 0.9526315789473684] scala> summary.variance res57: org.apache.spark.mllib.linalg.vector = [21.953216374269005, 22.8187134502924, 0.04263157894736842] scala> summary.numnonzeros res58: org.apache.spark.mllib.linalg.vector = [16.0, 19.0, 19.0]
2. Data 처리 2) 데이터처리 데이터탐색 3. RDD[Vector] Correlations scala> val correlmatrix: Matrix = Statistics.corr(features, "pearson") correlmatrix: org.apache.spark.mllib.linalg.matrix = 1.0 0.8814343025316858 0.14417085866598092 0.8814343025316858 1.0 0.1787639407161585 0.14417085866598092 0.1787639407161585 1.0
2. Data 처리 2) 데이터처리 데이터변환 1. 텍스트변환 : TF(Term Frequency) 데이터셋 : http://qwone.com/~jason/20newsgroups/ - 20 개의뉴스그룹 scala> import org.apache.spark.mllib.feature.hashingtf scala> val data = sc.wholetextfiles("file:///data/spark-1.6.0/data/train.electronics/*") scala> val text = data.map { case (file, text) => text } scala> text.count res3: Long = 591 scala> text.first() res4: String = "From: keith@radio.nl.nuwc.navy.mil... scala> val tf = new HashingTF(numFeatures = 10000) scala> val termf = text.map(txt => tf.transform(txt.split(" "))) scala> termf.first() res5: org.apache.spark.mllib.linalg.vector = (10000,[0,22,43,45,54,73,145,182,334,434,866,1015,1075,1563,1577,1666,1691,1786,1818,2266,2284,2752,2932,3159,3355,3480,3502,3521,3524,3543,4145,4262,4304,4468,4773,4801,5240,5475,5636,5848,5853,5936,6048,6051,6216,6852,6979,6988,7118,7285,7525,7817,7865,8056,8642,8 661,8786,9011,9131,9467,9493,9792],[7.0,1.0,1.0,7.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,2.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,2.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])
2. Data 처리 2) 데이터처리 데이터변환 2. 속성값정량화 : StandardScaler scala> import org.apache.spark.mllib.feature.standardscaler scala> val example = sc.textfile("file:///data/spark-1.6.0/data/mllib/ridge-data/lpsa.data") scala> val data = example.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.todouble))) }.cache() scala> val scaler1 = new StandardScaler().fit(data.map(x => x.features)) scala> val scaler2 = new StandardScaler(withMean = true, withstd = true).fit(data.map(x => x.features)) scala> val data1 = data.map(x => LabeledPoint(x.label, scaler1.transform(x.features))) scala> val data2 = data.map(x => LabeledPoint(x.label, scaler2.transform(vectors.dense(x.features.toarray)))) scala> val summary: MultivariateStatisticalSummary = Statistics.colStats(data.map(x => x.features)) scala> val summary1: MultivariateStatisticalSummary = Statistics.colStats(data1.map(x => x.features)) scala> val summary2: MultivariateStatisticalSummary = Statistics.colStats(data2.map(x => x.features))
2. Data 처리 2) 데이터처리 데이터변환 2. 속성값정량화 : StandardScaler scala> summary.mean [-0.030983588171116905,-0.00661741108409053,0.11823713020843973,-0.01993077331580595,0.017840200894241987,- 0.024915030703527408,-0.029404560602868696,0.06691288828053416] scala> summary1.mean [-0.029388629245711173,-0.005948318727428181,0.11733735411196436,-0.019755816802758758,0.017585582898541785,- 0.024870818742510403,-0.029955039130781458,0.06440613582227006] scala> summary2.mean [0.0, -2.220446049250313E-16,1.1102230246251565E-16,1.1102230246251565E-16,5.551115123125783E-17,- 1.1102230246251565E-16,1.1102230246251565E-16,2.220446049250313E-16] scala> summary.variance [1.111487960445174,1.2376212760140817,1.0153953695719211,1.0177903265464625,1.0291672192693442,1.0035584882413895,0.9 635840566524871,1.079356884436503] scala> summary1.variance [1.0,1.0,1.0,0.9999999999999998,1.0,1.0000000000000004,0.9999999999999998,1.0000000000000004] scala> summary2.variance [1.0,1.0,1.0,0.9999999999999996,1.0000000000000004,1.0000000000000002,0.9999999999999996,1.0000000000000004]
3. MLlib Example MLlib Process Process 1단계 : 데이터로딩 2단계 : 학습데이터 / 평가데이터로분리 3단계 : 학습 (Training) 4단계 : 평가 5단계 : 모델저장 6단계 : 서비스활용
3. MLlib Example Spark MLlib 예제
3. MLlib Example MLlib Example NaiveBayes 분류기 (1) scala> import org.apache.spark.mllib.classification.{naivebayes, NaiveBayesModel} scala> import org.apache.spark.mllib.linalg.vectors scala> import org.apache.spark.mllib.regression.labeledpoint scala> val data = sc.textfile("file:///data/spark-1.6.0/data/mllib/sample_naive_bayes_data.txt") scala> println(data.collect().mkstring("\n")) scala> val parseddata = data.map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.todouble))) } scala> val splits = parseddata.randomsplit(array(0.6, 0.4), seed = 11L) scala> val training = splits(0) scala> val test = splits(1) scala> training.count()
3. MLlib Example MLlib Example NaiveBayes 분류기 (2) scala> val model = NaiveBayes.train(training, lambda = 1.0, modeltype = "multinomial") scala> val predictionandlabel = test.map(p => (model.predict(p.features), p.label)) scala> val accuracy = 1.0 * predictionandlabel.filter(x => x._1 == x._2).count() scala> test.count() scala> println(predictionandlabel.collect().mkstring("\n")) scala> val trainprediction = training.map(p => (model.predict(p.features), p.label)) scala> val trainaccuracy = 1.0 * trainprediction.filter(x => x._1 == x._2).count() scala> training.count() scala> println(training.collect().mkstring("\n")) scala> println(trainprediction.collect().mkstring("\n")) scala> model.save(sc, "mymodel_naivebayes") scala> val NBModel = NaiveBayesModel.load(sc, "mymodel_naivebayes")
3. MLlib Example MLlib Example DecisionTree 분류기 (1) scala> import org.apache.spark.mllib.tree.decisiontree scala> import org.apache.spark.mllib.tree.model.decisiontreemodel scala> import org.apache.spark.mllib.util.mlutils scala> val data = MLUtils.loadLibSVMFile(sc, "file:///data/spark-1.6.0/data/mllib/sample_libsvm_data.txt") scala> println(data.collect().mkstring("\n")) scala> val numclasses = 2 scala> val categoricalfeaturesinfo = Map[Int, Int]() scala> val impurity = "gini" scala> val maxdepth = 5 scala> val maxbins = 32
3. MLlib Example MLlib Example DecisionTree 분류기 (2) scala> val splits = data.randomsplit(array(0.7, 0.3)) scala> val (trainingdata, testdata) = (splits(0), splits(1)) scala> val model = DecisionTree.trainClassifier(trainingData, numclasses, categoricalfeaturesinfo, impurity, maxdepth, maxbins) scala> val labelandpreds = testdata.map { point => val prediction = model.predict(point.features) (point.label, prediction) } scala> val testerr = labelandpreds.filter(r => r._1!= r._2).count.todouble scala> testdata.count() scala> model.save(sc, "mymodel_dtree") scala> val DTModel = DecisionTreeModel.load(sc, "mymodel_dtree")
4. MLlib 알고리즘 Spark MLlib 알고리즘목록 logistic regression and linear support vector machine (SVM) classification and regression tree random forest and gradient-boosted trees recommendation via alternating least squares (ALS) clustering via k-means, bisecting k-means, Gaussian mixtures (GMM), and power iteration clustering topic modeling via latent Dirichlet allocation (LDA) survival analysis via accelerated failure time model singular value decomposition (SVD) and QR decomposition principal component analysis (PCA) linear regression with L1, L2, and elastic-net regularization isotonic regression multinomial/binomial naive Bayes frequent itemset mining via FP-growth and association rules sequential pattern mining via PrefixSpan summary statistics and hypothesis testing feature transformations model evaluation and hyper-parameter tuning
4. MLlib 알고리즘 Spark MLlib 알고리즘분류 분류및회귀 linear models (SVMs, logistic regression, linear regression) decision trees naive Bayes ensembles of trees (Random Forests and Gradient-Boosted Trees) isotonic regression 추천 (Collaborative filtering) alternating least squares (ALS) 군집 k-means, bisecting k-means, streaming k-means Gaussian mixture power iteration clustering (PIC) latent Dirichlet allocation (LDA) 차원축소 singular value decomposition (SVD) principal component analysis (PCA) 빈발패턴마이닝 (Frequent pattern mining) FP-growth association rules PrefixSpan
4. MLlib 알고리즘 분류및회귀 ( 예측 ) 모델 구분 모델 Binary Classification Multiclass Classification Regression linear SVMs, logistic regression, decision trees, random forests, gradient-boosted trees, naive Bayes logistic regression, decision trees, random forests, naive Bayes linear least squares, Lasso, ridge regression, decision trees, random forests, gradient-boosted trees, isotonic regression
# 참고자료 Spark Papers Spark RDD Spark: Cluster Computing with Working Sets June 2010 http://www.cs.berkeley.edu/~matei/papers/2010/hotcloud_spark.pdf Spark RDD : Fault Tolerant & In-Memory Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In- Memory Cluster Computing April 2012 http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf
# 참고자료 Spark Books
감사합니다.