How Hadoop Works 박영택 컴퓨터학부
HDFS Basic Concepts HDFS 는 Java 로작성된파일시스템 Google 의 GFS 기반 기존파일시스템의상위에서동작 ext3, ext4 or xfs
HDFS 의 file 저장방식 File 은 block 단위로분할 각 block 은기본적으로 64MB 또는 128MB 크기 데이터가로드될때여러 machine 에분산되어저장됨 같은 file 의다른 block 들은서로다른 machine 에저장됨 이를통해효율적인 MapReduce 처리가가능 Block 들은여러 machine 에복제되어 Data node 에저장됨 기본 replication 은 3 개 각 block 은서로다른 3 개의 machine 에저장되어있다는것을의미 Name node 로불리는 master node 는어떤 block 들이 file 을구성하고있고, 어느위치에저장되어있는지에대한정보를 meta data 로관리
HDFS 의 file 저장방식 (1) 데이터 file 들은 block 으로분할되고, 데이터노드에분산되어저장
HDFS 의 file 저장방식 (2) 데이터 file 들은 block 으로분할되고, 데이터노드에분산되어저장
HDFS 의 file 저장방식 (3) 데이터 file 들은 block 으로분할되고, 데이터노드에분산되어저장 각 block 은여러 node 에복제되어저장 (default 3x)
HDFS 의 file 저장방식 (4) 데이터 file 들은 block 으로분할되고, 데이터노드에분산되어저장 각 block 은여러 node 에복제되어저장 (default 3x) Name node 는 metadata 를저장
HDFS File 이 64MB 또는 128MB 의 block 으로분할될때, file 이 block 의크기보다작은경우에는 block 크기전체를사용하지않음 Block 들은 Hadoop configuration 에설정된디렉토리를통해저장됨 NameNode 의 metadata 를사용하지않으면, HDFS 에접근할수있는방법이존재하지않음 클라이언트애플리케이션이 file 에접근하는경우 : NameNode와통신하여 file을구성하고있는 block들의정보와 DataNode의 block의위치정보를제공받음 이후데이터를읽기위해 DataNode와직접통신 NameNode는 bottleneck이되지않음
HDFS 접근방법 Shell 커맨드라인을사용 : haddop fs Java API Ecosystem 프로젝트 Flume Sqoop Hue network sourc 로부터데이터수집 HDFS 와 RDBMS 사이의데이터전송 Web 기반의 interactive UI 로 browse, upload, download, file view 등이가능
Example: Storing and Retrieving Files (1)
Example: Storing and Retrieving Files (2)
Example: Storing and Retrieving Files (3)
Example: Storing and Retrieving Files (4)
HDFS NameNode Availability NameNode daemon 은반드시항상실행되고있어야함 NameNode 가중단되면, 클러스터는접근이불가능 High Availability mode (in CDH4 and later) 2 개의 NameNode : Active 와 Standby Classic mode 1개의 NameNode 또다른 helper node는 SecondaryNameNode backup이목적이아니며, 장애발생시 NameNode를대신하는것이불가능 NameNode를복구할수있는정보를제공
Hadoop Server roles Clients Data Analytics Jobs Map Reduce Data Storage Jobs HDFS Job Tracker Name Node Secondary Name Node Masters Data Node & Task Tracker Data Node & Task Tracker Data Node & Task Tracker Data Node & Task Tracker... Data Node & Task Tracker Data Node & Task Tracker Slaves
Hadoop 의구성요소 Client Name Node 를통해정보를받고이후직접적으로 Data Node 와통신을한다. Master node 물리적으로 Master Node 역할 (Job Tracker, Name Node) 을하는노드로서, slave node 에대한정보와실행을할 Tasks 에대한관리를담당 Slave node 물리적으로 Slave Node 역할 (Data Node, Task Node) 을하는노드로서, 실제로데이터를분산되어가지고있으며 Client 에서요청이오면데이터를전달하는역할및담당 Task 를수행하는역할 두가지관점 Data Analytics 관점 Job Tracker : 노드에 Task 를할당하는역할과모든 Task 를모니터링하고실패할경우 Task 를재실행하는역할 Task Tracker : Task 는 Map Task 와 Reduce Task 로나눠질수있으며, Task 가위치한 HDFS 의데이터를사용하여 MapReduce 수행. Data Storage 관점 Name Node : HDFS의파일및디렉토리에대한메타데이터 (metadata) 를유지, 클라이언트로부터데이터위치요청이오면전달, 장비손상시 Secondary Node 로대체 Data Node : 데이터를 HDFS 의 Block 단위로구성, Fault Recovery 를위해 default 로 3 copy 를유지, Heartbeat 을통하여지속적으로파일위치전달
맵리듀스 (MapReduce) 란? 맵리듀스는여러노드에태스크를분배하는방법 각노드프로세스데이터는해당노드에저장 가능한경우 두단계로구성 맵 (Map) 리듀스 (Reduce)
Hadoop Components : MapReduce MapReduce 는 Hadoop 클러스터의데이터를처리하기위한시스템 2 개의 phase 로구성 : Map, Reduce Map 과 Reduce 사이에는 shuffle 과 sort 라는스테이지가있음 각 Map task 는전체데이터셋에대해서별개의부분에대한작업을수행 기본적으로하나의 HDFS block 을대상으로수행 모든 Map task 가종료되면, MapReduce 시스템은 intermediate 데이터를 Reduce phase 를수행 할노드로분산하여전송
맵리듀스 : 구성도
맵리듀스 : JobTracker 맵리듀스 Job들은 JobTracker라는소프트웨어데몬에의해제어됨. JobTracker는 Master Node 에있음. 클라이언트는맵리듀스 Job을 JobTracker에게보낸다. JobTracker는클러스터의다른노드들에게맵과리듀스 Task를할당한다. 이노드들은 TaskTracker라는소프트웨어데몬에의해각각실행된다. TaskTracker는실제로맵또는리듀스 Task를인스턴스화하고, 진행상황을 JobTracker에게보고할책임이있다.
맵리듀스 : 용어 Job 은 Full program 데이터집합을통해맵퍼 (Mapper) 와리듀서 (Reducer) 를전체실행 Task는데이터조각을통해하나의맵퍼또는리듀서를실행 Task attempt는 Task를실행하기위한특정인스턴스 적어도 Task가있기때문에많은 Task attempt가있을것이다. 만약 Task attempt가실패하면, JobTracker에의해서다른 Task Attempt가시작될것이다. Speculative execution( 나중에참조 ) 는또한완료된 Task들보다더많은 Task를시도할수있다.
맵리듀스 : 맵퍼 (Mapper) 하둡은네트워크트래픽을방지하기위해, 메타데이터의일부분을가지고노드에서처리한다. 동시에실행되는여러맵퍼는각각입력데이터의일부를처리하는단계를포함한다. 맵퍼는 Key/Value 쌍의형태로데이터를읽는다. 맵퍼의 0 개또는그이상의 Key/Value 상을출력한다 (pseudo-code): map(in_key, in_value) -> (inter_key, inter_value) list
맵리듀스 : 맵퍼 (Mapper) (cont d) 맵퍼는입력값의 Key 를사용하기도하지만, 완전히무시하기도한다. 예를들어, 표준패턴은한번에파일의라인을읽는다. Key 는라인이시작되는파일에 Byte Offset 이다. Value 는라인자체의내용이다. 일반적으로 Key 는관련이없는것으로간주한다. 만약맵퍼가무언가를출력하는경우, 출력형태는 Key/Value 쌍이어야한다.
맵퍼예제 : Upper Case Mapper 대문자로바꾸기 (pseudo-code): Let map(k, v) = emit(k.toupper(), v.toupper()) ( foo, bar ) -> ( FOO, BAR ) ( foo, other ) -> ( FOO, OTHER ) ( baz, more data ) -> ( BAZ, MORE DATA )
맵퍼예제 : Explode Mapper 입력된단어를철자로분리하여출력 (pseudo-code): Let map(k, v) = foreach char c in v: emit (k, c) ( foo, bar ) -> ( foo, b ), ( foo, a ), ( foo, r ) ( baz, other ) -> ( baz, o ), ( baz, t ), ( baz, h ), ( baz, e ), ( baz, r )
맵퍼예제 : Filter Mapper 입력값의 Value 가소수인경우에만 Key/Value 쌍출력 (pseudo-code): Let map(k, v) = if (isprime(v)) then emit(k, v) ( foo, 7) -> ( foo, 7) ( baz, 10) -> nothing
맵퍼예제 : Changing Keyspaces 맵퍼의출력 key 값이입력데이터의 key 값과동일할필요는없다. 단어의길이를키로출력 (Pseudo-code): Let map(k, v) = emit(v.length(), v) ( foo, bar ) -> (3, bar ) ( baz, other ) -> (5, other ) ( foo, abracadabra ) -> (11, abracadabra )
맵리듀스 : 리듀서 (The Reducer) 맵단계가끝나면, 중간단계의키값을기반으로중간값 (intermediate values) 를리스트형태로조합 리스트를리듀서로전달 하나의리듀서나여러개의리듀서가존재할것이다. 잡 (job) 설정에서이부분은정의되어있다.( 추후설명 ) 중간키와연관되어있는모든값은같은리듀서로보내진다. 중간키와그값들의리스트들은키순서대로정렬되어리듀서로보내진다. 이단계는 셔플 (shuffle) 과정렬 (sort) 라고알려져있다. 리듀서의 output 은 O 이거나 key/value 의쌍이다. 이결과들은 HDFS 에저장된다. 실제로리듀서는보통 input 키에대해서하나의 key/value 쌍으로배출되어쓰여진다.
리듀서예제 : Sum Reducer 각중간키값과관련있는모든값들을합 (pseudo-code) let reduce(k, vals) = sum = 0 foreach int i in vals: sum += i emit(k, sum) ( bar', [9, 3, -17, 44]) -> ( bar. 39) ( foo', [123, 100, 77]) -> ( foo', 300)
리듀서예제 : Identity Reducer Identity 리듀서는매우흔하다.(pseudo-code) let reduce(k, vals) = foreach v in vals: emit(k, v) ('bar', [123, 100, 77]) -> ('bar', 123), ('bar', 100), ('bar', 77) ('foo', [9, 3, -17, 44]) -> ('foo', 9), ('foo', 3), ('foo', -17), ('foo', 44)
맵리듀스예제 : Word Count 큰 Input 데이터에서나타나는각단어의개수를세는것 Word Count 는맵리듀스프로그래밍에서 Hello world 와같다. map(string input_key, String input_value) foreach word w in input_value: emit(w, 1) reduce(string output_key, Iterator<int> intermediate_vals) set count = 0 foreach v in intermediate_vals: count += v emit(output_key, count)
맵리듀스예제 : Word Count (cont d) 맵퍼의 Input (3414, 'the cat sat on the mat') (3437, 'the aardvark sat on the sofa') 맵퍼로부터의 Output ('the', 1), ('cat', 1), ('sat', 1), ('on', 1), ('the', 1), ('mat', 1), ('the', 1), ('aardvark', 1), ('sat', 1), ('on', 1), ('the', 1), ('sofa', 1)
맵리듀스예제 : Word Count (cont d) 리듀서로보내기위한중간데이터 ('aardvark', [1]) ('cat', [1]) ('mat', [1]) ('on', [1, 1]) ('sat', [1, 1]) ('sofa', [1]) ('the', [1, 1, 1, 1]) 마지막리듀서의 Output ('aardvark', 1) ('cat', 1) ('mat', 1) ('on', 2) ('sat', 2) ('sofa', 1) ('the', 4)
맵리듀스 : 데이터지역성 (Data Locality) 가능하면언제든지하둡은노드의맵태스크 (Map task) 가자신의 HDFS 노드에있는데이터블럭에서동작하는지를확인하려고할것이다. 만약불가능하다면맵태스크는데이터를가공하여네트워크를통하여다른노드로데이터를전달해야만할것이다. 맵태스크가끝나자마자데이터는네트워크를통하여리듀서들로전달된다. 비록리듀서들이맵태스크와물리적으로같은머신에서동작하더라도리듀서들은데이터지역 성을알수있는개념은없다.( 즉, 맵태스크가자신의머신에존재하는리듀서로보낸다는보장 은못함 ) 일반적으로, 모든맵퍼 (Mappers) 들은모든리듀서들과통신을해야한다.
맵리듀스 : 셔플과정렬은병목현상 (Bottleneck)? 셔플과정렬단계는병목현상을보인다. reduce 함수는맵퍼가끝날때까지리듀서가시작할수없다. 실제로, 하둡은맵퍼들중에일을끝내는매퍼들은맵퍼에서리듀서로데이터를옮기기시작할것이다. 이것은마지막맵퍼가끝나자마자대용량의데이터를한번에이동시키는병목현상을완화한다. 위와같은행위는설정가능하다. 개발자는맵퍼의완료퍼센트를조정하여그퍼센트만큼도달하면리듀서로데이터를보내기시작한다. 개발자의 reduce 함수는모든중간데이터들의이동과정렬이끝날때까지시작하지않는다.
맵리듀스 : 느린맵퍼는병목현상인가? 하나의맵태스크가다른맵테스크들보다느릴수있다. 아마도하드웨어의결함때문이거나머신이느리기때문에. 위와같은상황이나타난다면병목현상이될수있다. 리듀서에서 reduce 함수는맵퍼가끝날때까지시작할수없다. 하둡은 speculative execution 사용하여병목현상을완화한다. 만약맵퍼가다른맵퍼보다심각하게느리면새로운맵퍼인스턴스를생성하여같은데이터를가지고다른머신에서실행할것이다. 느려진작업을끝내기위해첫번째맵퍼의결과들을사용할수있다. 하둡은계속실행중인 ( 끝나지않는 ) 매퍼들을 kill 할수있다.
맵리듀스잡의생성과실행 맵퍼와리듀서클래스를작성잡 잡과이를클러스터에등록하기위한설정을하는 Driver 클래스작성 맵퍼, 리듀서, Driver 클래스컴파일 예제 javac -classpath `hadoop classpath` *.java 맵퍼, 리듀서, Driver 클래스파일의 jar 파일생성 예제 : jar cvf foo.jar *.class hadoop jar 명령어실행을통해하둡클러스터에잡을생성 예제 : hadoop jar foo.jar Foo in_dir out_dir