II. Hadoop 1. Hadoop 개요 (1) Hadoop 의배경 2002 년미국의프로그래머인 Doug Cutting 과 Mike Cafarella 은 Lucene 이라는텍스트검색엔진개발프로젝트를진행하면서 Nutch 라는하위프로젝트를통해파서 (Parser) 및크롤러 (Crawler) 1 를개발하고있었다. 그런데이때 Nutch 가가져오는방대한데이터를처리할길이없어난감해하던차에 2004 년 Google 이발간한 GFS 논문과 MapReduce 논문 2 을보고이를기초로프로그래밍에들어갔다. 약 3~4 개월의프로그래밍을통해 HDFS 파일시스템을만든후이위에 Nutch 를이식했으며이후실제클러스터까지구축하여운영했다. 3 이후 2004 년부터 2008 년까지추가의개발이진행되었다. 특히 2006 년에는 Yahoo 에입사하고 Hadoop 을오픈소스화했는데이후상당히많은 Yahoo 의프로그래머들이참여하면서개발도활발히진척되었고활용면에서도 Yahoo 의기간업무에서핵심적역할을하게되었다고한다. 이후프로그램의안정화, 성능개선등과함께 Facebook 등에서도적극참여하였고결국 2006 년부터는 Apache 프로젝트로등재되어전세계의수많은개발자가참여하게되었다. 재미있는것은 Google 에서도자신들의 MapReduce 와 GFS 가소스코드를공개하지않는폐쇄형 (closed) 솔루션이었던관계로신입사원의교육에서는 Hadoop 의 MapReduce 를대신이용했다고한다. 1 Parser 는문장을정해진규칙 ( 문법 ) 에의거해서분석하는것을말한다. 이때문법이란언어상의문법일수도있고프로그래밍언어상의문법도포함된다. 한편크롤러 (crawler) 란웹페이지를뒤져가며방문해서정보를수집해오는프로그램을말한다. 2 http://research.google.com/archive/gfs.html 과 http://research.google.com/archive/mapreduce.html 3 다만비용상의문제로초기에는 4 대, 그리고이후에는약 30 대까지만확장되어운영하였다고한다.
한편흥미로운것은 Google 은당초함수형 (functional) 프로그래밍기법으로서의 Map & Reduce 에대해특허를획득했었다는점이다. 물론, 이에대해반론도만만치않아서 Google 이전에도많은학자와프로그래머들사이에상당한논의와구현이있었기때문에특허의대상이될수없다고하는논란이많았었다. 이러한논란속에서 2010 년최종적으로특허를획득한후 3 개월만에 Google 은 Apache 에대해특허침해로문제삼지않겠다고선언함으로써모든논의에종지부를찍었다. 이제완벽한오픈소스로서세상사람에게공개된것이다.
(2) Hadoop 의 version 가. Hadoop 의라이선스 Apache 프로젝트로서의 Hadoop 은 Apache 라이선스조건 4 을따르므로 Linux 에서와마찬가지로다양한판본 ( 배포판 ) 이존재할수있다. (Linux 에 Red Hat, Ubuntu, Suse 등의배포판이존재하는것과마찬가지논리 ) 게다가 Hadoop 과연계되는여러종류의프로젝트가존재하기때문에이들상호간의조합에따른버전별테스트또한만만치않은작업이다. 이에따라많은회사에서 Hadoop 관련배포판을제공하고있는데대표적인것이 Cloudera 와 Hortonworks 이다. 5 Hadoop 은일반적으로 RPM, tar, deb 등다양한형태의설치파일이제공되는데배포판별로 bundle 되는도구는다를수있고이를테스트한후 bundle 하므로자신에맞는배포판을고르는것이편리하다. 배포판 Apache http://hadoop.apache.org Cloudera www.cloudera.com 설명 Hadoop 의원본 Source 역할을하며 tar ball 로만패키징. 별다른부가도구는제공치않음. 선두주자이며 Hadoop 클러스터관리도구가우수 오픈소스 / 상용모델병행 ( 클러스터규모에 따름 ) HortonWorks www.hortonworks.com MapR www.mapr.com WANdisco www.wandisco.com Intel Apache Hadoop 을충실히따름각종관리도구제공. 오픈소스 (HDFS 대신 ) 자체의파일시스템제공관리도구와 HA 기능 ( 미러링, snapshot, 기타 ) hadoop version 2 기반관리도구. Free / Premium 모델병행 Encryption 지원 http://hadoop.intel.com 4 Apache License 는미국의 Apache 재단 (ASF: Apache Software Foundation) 에서자신들의프로젝트를대상으로설정했던공개소프트웨어라이센스조건이다. Apache License 에서는해당라이센스의조건과저작권만명기하면얼마든지자유롭게사용, 배포및수정이가능하도록하고있다. (http://www.apache.org/licenses/ ) 5 Cloudera 는 Hadoop 개발자인 Doug Cutting 이 CTO 로있는회사로서마치 Linux 에서의 Red Hat 이점하는위치를가져가려고노력하고있다. 한편 Yahoo 의 Hadoop 개발팀은 Hortonworks 의핵심요원이되었다.
배포판 설명 나. Hadoop version 의경과 성능개선위해하드웨어가속및 S/W stack 추가 2006 년 Apache 프로젝트에등재된이래로 0.1 부터 0.14 까지매달새로운버전이발표하였고 0.15 부터 0.20 까지는분기별로새버전을발표했다. 0.20 은 2009 년발표하였다. Hadoop-0.20 은현재의안정판의근간이되었는데 2011 년 12 월에 version 1.0.0 이탄생하였다. 실제로대부분의배포판들이이버전에의거하고있다. ( 예 : Apache Hadoop 0.20.x, CDH3.*, HDP1.* 등 ) 현재시점 (2013 년 8 월 ) 을중심으로보면 1.2.1 의안정버전과 2.x 버전의베타버전이진행중이다. 0.21 hadoop 1.0 이되었으며 2013.9 월현재안정버전은 ver. 1.2.1 0.23 Trunk 에서파생되어 Hadoop 2.0 이됨.
(3) 특징 가. 설계상의특징 우리가전혀새롭게빅데이터처리를위한솔루션을개발한다고가정해보자. 아마도다음의몇가지요구사항이제시될것이다. 슈퍼컴퓨터같은비싼서버대신저가서버를이용하자. 이들서버를수십 ~ 수백 ~ 수천대로클러스터구성하면기존의슈퍼컴퓨터를훨씬적은비용으로대체할수있을것이다. 대규모클러스터를구성할경우시스템장애는불가피하다. 따라서장애극복을위해데이터를여러곳에복제 (replicate) 하고사용하자. 이렇게복잡한클러스터환경에서는기존의분산컴퓨팅에서요구되었던시스템상의각종관리기능 (marshalling) 은과감하게생략하자. 그러면서도외부에서는이들모두가하나의파일시스템이미지로보이도록해야한다. 복잡한시스템프로그램을효율적으로구현하기위해서는 functional 프로그래밍기법을동원한다. 즉, 분산처리를함수중심의프로그램기법으로구현하자. 대형파일이많으므로여러파일로나누어처리하되이들각각 (split) 의블럭크기는가급적크게 ( 수십 ~ 수백 MB) 하는것이좋겠다. 속도를개선하기위해서는 write-once, read may, immutable 타입을기준으로하자. 대규모작업을위해서 node단위뿐만아니라 rack단위또는스위치단위로확장할수있도록한다. 바로이러한요구사항이 2000 년대초의 Google 이당면한문제였었다. 그리고이에대한해답이 Google 의 Map Reduce 이고이를구현하여오픈소스화한것이 Hadoop 인것이다. Google 의 MapReduce 그리고오픈소스버전으로서의 Hadoop 은설계사상에있어기존방법론과다른뚜렷한특징을가진다. 프로그램모델의단순화 데이터중심의사고 (Moving Computation to Data) 우선첫째항목을보자면 Hadoop 의설계사상은기존방식에비해매우단순하다. 기존의병렬컴퓨팅에서는대용량데이터처리를분산환경에서진행함에따라분산된컴퓨터내지프로그램 ( 프로세스 ) 상호간의통신과제어 ( 특히 socket 통신또는 marshalling 6 등 ) 이중요했다. 그런데이러한상호제어는또다른오버헤드가되어복잡성이끝없이증대되는요인이되었다. Hadoop 에서는뒤에서보는 6 marshalling 은메모리상의객체를스토리지에저장또는네트워크상에서전송할수있도록변환하는것을말한다. 흔히 여러컴퓨터에서원격의객체 (remote object) ( 또는데이터 ) 를전달할때사용된다. Marshalling 은이처럼프로세스간또는 thread 간에서로데이터를주고받고자하지만제공하는 remote procedure call (RPC) 의방식이다를때주로이용된다.
바와같이가능한모든관리커뮤니케이션을과감하게생략해서각각의컴퓨터 ( 이를 node 라고한다 ) 가독립적으로작업을실시하도록하였다. Hadoop 의둘째특징은컴퓨터로하여금자신과가장가까이있는데이터를직접처리하도록하는점이다. 일견당연해보이지만우리가과거해오던방식과는정반대임을알수있다. 과거에는데이터분석작업시외곽에있는데이터를중앙컴퓨터로가지고와서작업하였다. 즉, 데이터를컴퓨터가있는곳으로가져와서작업했던것이다. 반면 Hadoop 에서는데이터를중앙의서버에전달하여처리하는대신데이터의소스로부터입력을받는즉시다수 node 컴퓨터에게일괄적으로 ( 무작위적으로 ) 배분시켜주고각각의컴퓨터는자신이할당받은데이터에대해 ( 앞서말한것처럼중앙의관리시스템의간섭이나다른컴퓨터와의통신이없이 ) 독자적으로지정된임무를수행하게된다. 다음그림은이러한과정을잘보여주고있다. 7 7 위그림및 HDFS, MapReduce 설명에서의일부그림은 Yahoo 에서의 Hadoop Tutorial 의그림을일부인용하였다. Yahoo 의 Hadoop tutorial (http://developer.yahoo.com/hadoop/tutorial/) 은 Hadoop 의원리를상세하고친절하게소개하고있다, 당초 Yahoo 는 Doug Cutting 을영입한뒤별도의전담팀을두고 Hadoop Core 를개발, 이용하였으며이를 Apache Foundation 에오픈소스로공개한바있다.
이처럼단순성을중시하면서도데이터를중심에놓는사고방식의전환의결과 Hadoop 은거의선형 (linear) 형태의확장성을가지게되었다. 즉, 10 대로하던작업을동일한성능의 20 대컴퓨터를적용할때 2 배의성능향상을, 그리고 100 대의컴퓨터를적용하면거의 10 배의성능향상이가능해진것이다. 무엇보다이때의각각의컴퓨터는고가의장비일필요없이일반적인중저가서버로도충분해져서전체적인총소유비용 (TCO: Total Cost of Ownership) 이획기적으로절감되었다. 나. 사용상의특징 Hadoop 은대규모데이터처리용응용프로그램을분산환경에서수행시킬수있도록해주는오픈소스프레임워크로서다음과같은특징을가진다. 손쉽게적용이가능하다 독립적으로환경구축할경우아주저가의컴퓨터도고성능컴퓨팅환경을충분히이용가능하며클라우드컴퓨팅환경을이용할경우한시적이용또는점진적증설도가능하다. 안정적운용이가능하다 설계자체가저가컴퓨터활용을염두에두었고장애를극복하기위한장치를가능한소프트웨어로구현하였다. 즉, 기존고가슈퍼컴퓨터의대체를주된목적의하나로삼으면서미래유연성확보에주력하였다. 확장성이좋다 성능확장이필요할경우수평적으로기존 ( 저가 ) 컴퓨터를추가시키면그대수만큼성능확장이되도록하였다. 현재 Hadoop기반으로약 4만대의서버를연결한클러스터가일상적으로운영될정도로규모가확장되었다.
2. Hadoop 의아키텍처 (1) 개요 Hadoop 에서실제로직의처리는 Map Reduce 방식으로한다. 즉, 흐트러뜨려서 ( 분산하여 ) 처리한후각각의결과만다시모으는방식이다. 단, 이라한처리과정은모두프레임워크에서자동화한다. Hadoop 은빅데이터처리를위해여러대의컴퓨터로클러스터를구성하는데클러스터란특정한기능수행을위해여러대의컴퓨터가네트워크로연결한것을말한다. 이때클러스터를구성하는개별컴퓨터를노드 (node) 라고한다. ( 이하에서는 node 라칭한다 ) Hadoop 은이처럼물리적으로는 node 를연결하고그위에서다양한소프트웨어기법을동원하여거대한프레임워크를구축하였다. 이하에서 Hadoop 의아키텍처를다양한측면에서살펴본다. 가. 사용자차원 사용자차원에서분석가가분석모델링을한후프로그래머에게프로그램작성을요청한다. 프로그래머는 Hadoop 프레임워크에맞추어프로그램을작성한후이를실행한다. 한편클러스터환경에서의데이터와시스템의관리는컴퓨터운영관리자가담당한다. 분석가는실제분석작업을진행하여사업상의중요한정보를찾아낸다. 복잡한모델링이필요치않은일상적분석의경우에는 Pig 및 Pig Latin 과같은 Hadoop 내하위프로젝트에서제공하는간편한질의어를이용하는것이그림에표현되어있다. 나. 물리적차원 Hadoop 클러스터는한대의네임노드 (NameNode) 서버와여러대의데이터노드 (DataNode) 서버로 구성된다.
다. 논리적차원 Hadoop 프레임워크는여러개의데몬 (daemon) 프로그램으로구성된다. 데몬프로그램이란서버의메인메모리상에서백그라운드로 8 수행되는여러가지프로그램을말한다. 이때여러개의데몬프로그램이상호동작하는방식으로서 Hadoop 은 master-slave 의형태를취하는데 Master 란작업수행을지시및관리하고 Slave 는 Master 의관리하에실제부여된작업을수행하는것을말한다. 라. 기능적차원 Hadoop 은핵심모듈 (core module) 과관련프로젝트로구성된다. HDFS (Hadoop Distributed Filesystem) 파일시스템으로서데이터를저장하고이를 Hadoop 프레 임워크에입력하는역할을한다. MapReduce Hadoop 의대표적인프로그래밍모델이다. ( 뒤에서자세히설명한다.) 8 백그라운드작업 (background job) 이란이용자의특별한작업없이배후에서시스템이자체적으로수행하는프로세스를 말한다.
기타의데몬프로그램 HDFS와 MapReduce를보조하여전체적인운영효율을높여주는각종의유틸리티소프트웨어이다. 결국 Hadoop 프레임워크는주로 ( 데이터관리 layer 로서의 ) HDFS 와 ( 프로그래밍모델로서의 ) MapReduce 의 2 가지로구성되며이들 (HDFS 와 MapReduce) 모두각각 master 기능을하는데몬프로그램과 slave 역할을하는데몬프로그램으로나뉘어진다. 즉, Hadoop 에서의서버노드에서는각각다음의여러가지데몬프로세스 9 를수행한다. NameNode ( 및 Secondary NameNode) DataNode JobTracker TaskTracker 이중 NameNode 와 DataNode 는 HDFS 에대한기능을하는데몬프로그램인반면 JobTracker 와 TaskTracker 데몬프로그램은 MapReduce 에대한기능을하는데몬이다. 9 프로세스란프로그램이수행되고있는상태를말한다. 데몬프로세스는이중에서사용자가별도의수행명령을내리는지 여부에상관없이 ( 메인메모리에서 ) 항상수행되면서대기하고있는것을말한다. 서비스프로그램이라고도한다.
그리고 NameNode ( 및 Secondary NameNode) 와 JobTracker 는 master node 에서만수행되는반면 JobTracker 와 TaskTracker 데몬프로그램은나머지여러대의 slave node 에서수행되게된다. Master node ( 서버 ) Slave node ( 서버 ) 클라이언트 PC ( 작업자 ) HDFS NameNode (Secondary node 가있을수있다 ) DataNode MapReduce JobTracker TaskTracker Master node 에게지시하고진행상황을출력한다. 달리말하면 Hadoop 은논리적아키텍쳐상에서 저장을위한계층 (HDFS layer) 과 실제작업을위한계층 (Computation layer 즉, MapReduce layer) 의 2 개레이어로구성된다. 이들각각에서는해당되는여러데몬프로그램이수행되면서사용자의명령을기다린다. 명령은명령어줄 (command-line) 을이용할수도있으나주로데이터흐름을염두에두고필요한알고리즘을구현하는프로그래밍 (Data Flow Programming) 방식을이용하는것이일반적이다. (2) Hadoop 의처리흐름도
Hadoop 에서는데이터가그중심역할을한다. 따라서여기서는데이터와그처리흐름을중심으로살펴 본다. MapReduce에서 : 하나의 master node (JobTracker) 가여러개의작업 node (TaskTrackers) 를운용, 관리한다. 클라이언트 PC는 job을 JobTracker에게 submit한다 JobTracker는각각의 job을 task별로분할한다 (map/reduce) 필요에따라 task를 TaskTrackers 에게배분한다. HDFS (Hadoop Distributed File System) 에서 하나의 name node와여러개의 data node를운용한다. 데이터는고정길이 (64 MB) 의블록으로나누어서처리한다 HDFS에서는입력작업을 map 처리하고그출력작업을 reduce 한다.
3. Hadoop 의기능요소 HDFS 와 MapReduce (1) HDFS (Hadoop Distributed File System) 가. HDFS 설계원칙 앞서도언급했지만데이터를많은사용자가네트워크환경에서이용하기위해개발된분산파일시스템으로는 NFS (Network File System) 가대표적이다. 그러나 NFS 는하나의기기에보관된하나의논리볼륨 (logical volume) 만을원격에서액세스한다는제한점을가진다. HDFS 는이러한 NFS 의한계를극복하기위해고안되었고설계원칙상다음특징을가진다. HDFS는대용량데이터 (terabyte 또는 petabyte) 를저장하도록고안되었다. 이를위해서데이터를여러대의컴퓨터에나누어저장시킨다. 즉, NFS보다훨씬큰파일도지원한다. HDFS에서데이터저장의신뢰성이훨씬높아서개별컴퓨터가이상을일으켜도데이터를이용할수있다. HDFS는데이터액세스의성능개선도용이하다. 클러스터에 node만추가하면이용가능한클라이언트숫자는계속늘어날수있다. HDFS는 Hadoop의 MapReduce와잘통합된다. 반면 HDFS 의성능은개선되었지만설계원칙상불가피하게특정응용프로그램에는적합하지않은결과를초래한다. 즉, NFS 만큼범용은아니라는뜻이다. HDFS 를이용시에는다음의상충되는요소들을함께고려해야한다. HDFS는파일을순차적스트리밍방식으로읽는 (long sequential streaming reads) 응용프로그램을전제로하였다. 따라서무작위로 random access하는경우에는탐색시간 (seek time) 이길어지는단점이있다. HDFS는한번기록한후읽기를여러번반복하는것 (write-once, read-many) 을기준으로하였다. ( 파일에데이터를추가하는기능이 Hadoop 0.19부터지원되기는하지만성능상한계가있다.) 작업대상파일의크기가크고읽기작업이순차적으로이루어지다보니 HDFS에서는데이터의캐싱 (local caching) 을하지않는다. 실제로캐싱작업에서는시스템의부담이커지므로차라리데이터의소스로부터읽기작업을다시실행하는것이오히려더나은경우가많다. 개별기기가장애를일으키는일이빈번할뿐아니라여러대의기기가동시에문제를발생시키는경우도있으므로 ( 예 : rack 의장애로인해여러대컴퓨터가동시에동작하지않는경우 ) 클러스터는이러한것들을견딜수있도록해야한다. 이때비록일부기기의성능이저하되는한이있더라도시스템전체가느려지거나데이터를이용할수없는결과가초래되지않도록대책을세워야한다. 당초 HDFS 는 GFS 를기초로설계된블록기반의 (block-structured) 파일시스템이다. 따라서각각의파일을일정한크기의블록으로나눈후클러스터내의여러기기에분산저장하는데이때의개별기기에는 DataNode 라는파일시스템데몬이동작한다. 이때 Hadoop 은이들각각의블록을보관하는컴퓨터를블록별로 (on a block-by-block basis) 무작위로선택한다. 그결과파일을이용하는때에는여러컴퓨터
사이의협력이필요해지는단점이있는대신단일기기용분산파일시스템보다는지원되는파일의크기가훨씬커지는장점이있다. 파일을이용하는데여러대컴퓨터가협력한다는점에서는이들중한대만고장을일으켜도파일작업을할수없게된다는문제가발생하는데 HDFS 에서는각각의블록을여러대의컴퓨터에중복하여저장하는방식으로이문제를해결한다. ( 이때의중복저장하는숫자를중복인자 (replication factor) 라하며 3 을기본으로하되조정이가능하다 ) DataNode 들이여러파일의블록들을저장한다. 여기서는 replication factor 가 2 이고 NameNode 는파일이름과 그파일의 block id 를대응시킨다. 대부분의파일시스템에서블록의크기는 4KB 또는 8 KB 이다. 반면 HDFS 에서의기본 (default) 블록사이즈는 64MB 로서훨씬큰데이는파일당필요한메타데이터는작아지게하는잇점을제공한다 ( 파일당필요한블록의개수자체가줄어든다 ). 또한 HDFS 에서는데이터를순차방식으로고속 stream read 하므로대용량데이터를효율적으로읽어들일수있다. NTFS 또는 EXT 같은일반파일시스템에서는평균크기가작은파일을전제로하지만 HDFS 는크기가큰파일을전제로한다. 그결과예컨대 100MB 크기의파일이라고해도블록 2 개면충분하다. PC 의경우파일크기도작고이곳저곳의내용을 random 하게액세스하는경우가대부분이지만 HDFS 는프로그램이블록을처음부터끝까지읽어들인다고가정하였으며이로인해 MapReduce 스타일의프로그래밍이편리해진다. HDFS 에서데이터파일을처리하는주된기준은레코드단위이다. 즉, 입력파일을레코드를기준으로절단 (split) 하며각프로세스는 HDFS 파일의위치에따라 ( 이를지역성 (locality) 이라한다 ) 할당된레코드만처리한다. 이는얼핏당연해보이지만실제로는기존데이터처리방식과는정반대이다. 즉, 기존에는작업대상파일 ( 즉, 스토리지 ) 을전산시스템의중앙부에놓고 이때관리효율을높이기위해 SAN 내지 NAS 를이용함 모든컴퓨터 node 는이들중앙의데이터중필요한부분을가져와서작업하고결과를다시중앙스토리지에반환 / 저장하는방식을취하였다. 그러나과정을유기적으로조정하기위해 socket 프로그램내지응용프로그램에서의 marshalling 등많은통신오버헤드가과도하게발생하였다.
그러나 Hadoop 에서는데이터를중앙에모으는대신이를처리할컴퓨터에게보내준다 10 는 (Moving computation to the data) 발상의전환을통해기존방식에서의관리부담이원천적으로사라지게되었다. Hadoop 의프로그래밍프레임워크에서데이터는논리적차원에서레코드단위로처리 (recordoriented) 된다. 따라서각각의입력파일은여러개의줄 (line) 또는별도로응용프로그램의로직에의해지정된나름의형식으로분해된다. 클러스터내의각 node 에서수행되는프로세스들역시이들레코드단위로작업을수행한다. 구체적으로는대상데이터세트 (dataset) 11 를블록단위로나누고중복적으로 slave node 에게분배하는한편이들에대한 MapReduce 의작업결과를읽거나파일에기록하는일련의파일작업을포함한다. 다만, HDFS 는일반 Linux 의파일시스템과는독립된것이므로반드시 Hadoop 명령어를통해서만이를관리할수있다. 일반적으로 Hadoop 에서는데이터파일을 Linux 파일로생성한후이를 HDFS 로복사해와서작업하는방식을취한다. 그리고그파일대해서응용프로그램을통해 MapReduce 작업을수행할때도모든작업은 HDFS 파일을직접다루기보다는 MapReduce 를통해 (key/value) 의쌍으로이루어진 record 단위로작업을수행하는방식을취한다. 즉, 대부분의경우 HDFS 파일에대해직접기록하거나읽는대신응용프로그램에서의 MapReduce 를통해파일작업이진행되는것이다. 나. HDFS 에서의데몬프로그램 NameNode HDFS 에서 master 인 NameNode 는분산환경에서저장기능을담당한다. 즉, 실제작업의대상이되는파일을블록단위로나누어서 slave node 들에게분배할뿐만아니라전체적인 ( 분산 ) 파일시스템의이상유무도체크하고 slave 컴퓨터인 DataNode 에서의데이터입출력작업 (low-level I/O tasks) 을지휘한다. 이러한작업은메모리소모도크고입출력도빈번하므로 NameNode 에이상이발생하면 Hadoop 클러스터는전체가동작을멈추게된다. 10 HDFS 는입력데이터가들어오면즉시그파일을쪼개서각각의 node 에분배한다. 이때하나의파일로부터만들어진여러개조각을각각 chunk 라고부른다. 특기할것은이들 chunk 를각각의노드에분배할때여러개로복제하여분배한다는점이다. 즉, A 라는이름의파일을 3 조각내었다면 ( 이들각각을 A 0, A 1, A 2 라면 ) 이들각각의 chunk 는여러개 ( 통상 3 개이상 ) 로복제하여즉, A 0 /A 0 /A 0, A 1 /A 1 /A 1, A 2 /A 2 /A 2 의여러복제본을 node 에분배한다. 한걸음나아가이들각각의복제된조각에대해이를분배받은 node 는아무런차별없이동일하게지시받은작업을수행한다. 따라서특정 node 에장애가발생하였을경우각 chunk 가다른 node 에존재할뿐만아니라이미동시에수행되고있는상태여서얼마든지장애 node 의내용을다른 node 에서의동일내용 chunk 로대체할수있는것이다. 물론이처럼장애에대처하기위해서전체적인상황을감독하는컴퓨터 (monitoring system) 는데이터를 re-replicate 하는등끊임없이관리작업을수행한다. 11 원래 dataset 는특히행과열형식으로체계화된데이터를말하지만여기서는 ( 즉, Hadoop 에관한논의전체에서 ) Hadoop 의응용프로그램의작업대상이되는데이터를뜻한다고보면된다.
파일시스템에서메타데이터관리의신뢰성도중요하다. 특히데이터파일은 write once, read many 형태를취하지만메타데이터 ( 예 : 파일및디렉토리이름등 ) 만큼은여러클라이언트컴퓨터가동시에수정하려시도할수있으므로동기화기능이중요하기때문에 NameNode 가별도의컴퓨터에서관리한다. NameNode 는파일시스템의모든메타데이터를관리하는데파일당메타데이터의크기가작으므로 ( 파일명, 사용권한, 각각의블록의위치정도만관리 ) 이들메타정보는 NameNode 기기의메인메모리에상주시켜이용한다. 이처럼한클러스터에는단한개의 NameNode 가존재하고 master node 로지정된컴퓨터가 NameNode 를전담관리하며다른작업은일체하지않는다. 작업시클라이언트는 NameNode 에대한질의를통해 ( 특정파일의블럭의목록등의정보등에대한 ) 메타데이터를메인메모리로부터가져온다. 하지만이후의작업은 NameNode 의간섭없이 DataNode 로부터병렬로직접 read 작업을통해수행한다. 이처럼 NameNode 의관여없이데이터를통째로가져오기때문에효율이더욱높아진다. 개별 DataNode 가장애를일으키는경우에도전체적인시스템의이용이가능하지만 NameNode 에장애가발생하면시스템전체의이용이불가능해진다. 다만일상적인작업현장에서는 (DataNode 와는달리 ) NameNode 는주도적역할을하지않기때문에장애의위험성은매우낮다. 만전을기하기위해 NameNode 를이중으로가져가려는노력이있을수있는데그방법이 secondary NameNode 의이용이다. 12 DataNode Hadoop 에서는데이터를읽어들임과동시에각노드에데이터가분배된다. HDFS 는큰데이터파일을여러개로분리시키고각각의 node 가이를처리하게하는데이들각각의조각 (chunk) 은여러대컴퓨터에중복적으로복제되어서한컴퓨터에장애가발생해도다른컴퓨터를통해데이터를이용할수있다. 또한모니터링시스템을통해저장된데이터의일부에문제가발견될때에는이를다시복제하도록조처할한다. 이때이들모든파일조각들은하나의 namespace 를공유하므로클러스터내의모든노드들이이를이용할수있다. slave 기기는 DataNode 데몬을통해분산파일의 read/write 작업을수행한다. 그리고이때 DataNode 데몬프로그램은다른 DataNode 와통신하면서자신의데이터블럭을복제하기도하고직접처리작업을수행하기도한다. HDFS 의분산파일시스템에서의읽기및쓰기작업은대상파일을 random 하게블록단위로나누어진행하는데이들 DataNode 작업은수시로 NameNode 에보고되고그내역은 NameNode 에메타데이터의형식으로저장된다. 12 단, secondary NameNode 의작업은실제 HDFS 에대해이루어지는것이아니고 NameNode 와의연락만을유지한다. 즉, 주기적으로 HDFS 의메타데이터를유지관리함으로써유사시 NameNode 가제역할을하지못할경우를대비한다. 그러나 실제로이상시에는관리자가직접 secondary NameNode 를구동시켜주어야한다.
다. HDFS 명령어 HDFS 는일반 Unix/Linux 의파일시스템과는전혀별개이다. DataNode 데몬을수행하는기기에서 ls 명령을수행하면일반 Linux 파일시스템의내용은보이지만 HDFS 의파일은보이지않는다. 마찬가지로 fopen() 또는 fread() 같은표준의읽기 / 쓰기작업도불가능하다. 요컨대파일시스템으로서의각종작업에대해서 HDFS 는 HDFS 나름의독자적인명령어와 shell 구조를가지고있다는말이다. 이러한현상은 HDFS 가별도의독립된 namespace 를가지기때문이다. HDFS ( 정확히는 HDFS 를구성하는블록 ) 내의파일은 DataNode 서비스가관리하는별도의디렉토리에저장된다. 그리고이들파일은 block id 로만표시된다. HDFS 에저장된파일에 Linux 의일반파일수정방식을 ( 예 : ls, cp, mv, etc) 적용할수도없다. 대신 HDFS 는자체의파일관리방식을가지는데그모습은기존의방식과매우유사하다. 한편 HDFS 파일시스템을이용하려면처음한차례에한해다음 HDFS 명령어를통해포맷팅이되어야한다. ( 단, $ 프롬프트이후의것들이명령어와 option 임 ) user@namenode:hadoop$ bin/hadoop namenode -format 이하에서는이러한선행작업이되었다고전제한다. 한편 Hadoop 의작업에서는일반적으로데이터파일이 Hadoop 이외의곳에서생성되는것을전제로하는경우가많다. 즉, 별도의텍스트파일이있다거나, 또는각종의 log 파일이외부에서이미생성되었다고보고이를 HDFS 로복사하는방식으로읽어들이는것이다. 그리고파일을 HDFS 로읽어들인후에도 MapReduce 프로그램이이를처리할때는 MapReduce 응용프로그램에서직접읽는대신 Hadoop 의 MapReduce 프레임워크를통해 HDFS 파일을 (key/value pair 형태의 ) 개별레코드형식으로파싱 (parsing) 하여사용한다. 기본적인파일 shell 명령어 Hadoop 의파일명령어의표준형은다음과같다.
hadoop fs cmd <args> 여기서 cmd 는구체적인명령어를그리고 <args> 는매개변수 (argument) 를나타낸다. 다만명령어 cmd 는일반 Unix/Linux 의명령어와그형태가매우유사하다. 예를들어서특정디렉토리 내에서의파일의목록을보려면 : hadoop fs ls 라고하면된다. HDFS 의디폴트작업위치는 /usr/$user 이지만 ( 단, $USER 는 login 한사용자이름을뜻함 ) 그렇다고그디렉토리가자동생성되지는않으므로다음의명령어를통해이를직접만들어주어야한다. hadoop fs mkdir /user/hkyoon Linux/Unix 에서로컬파일을 HDFS 로복사해오는명령어는다음과같다. hadoop fs put example.txt 앞서 ls 명령을재귀적으로사용하여하위디렉토리내의목록까지함께보려면 lsr 명령어를이용한다. $ hadoop fs -lsr / drwxr-xr-x - hkyoon supergroup 0 2013-01-14 10:23 /user drwxr-xr-x hkyoon supergroup 0 2013-01-14 11:02 /user/hkyoon -rw-r--r-- 1 hkyoon supergroup 264 2013-01-14 11:02 /user/hkyoon/example.txt 위목록에서마지막줄의소유자이름 (hkyoon) 앞의 1 이란숫자가복제본개수 (replication factor) 를나타낸다. 앞서의 put 과반대로 HDFS 로부터로컬시스템의파일로가져오는명령으로 get 이있다. 한편파일의내용을보려면 cat 이라는명령어를사용한다. Unix/Linux 의 catalog 에해당한다. 다음에서는여기에 head 라는 Linux 명령을 pipeline 으로연결하여적용했다. hadoop fs -cat example.txt head 이외에도다음과같은다양한 HDFS 의파일명령어가제공된다. cat chgrp chmod chown copyfromlocal copytolocal cp du dus expunge get getmerge ls lsr mkdir movefromlocal mv put rm rmr setrep stat tail test text touchz
이들명령어에대한구체적용법에대해서는 http://hadoop.apache.org/docs/r0.18.3/hdfs_shell.html 참조하면된다. 프로그램을이용한 HDFS 파일의이용 Hadoop 에서는각종의작업을위한 HDFS API 가제공된다. 이를위해우선 org.apache.hadoop.fs 의 package 와함께필요한 API 를프로그램에서이용하는데각 API 의세부내용은 Hadoop api 는 hadoop.apache.org/docs/current/api/ 를참조하면된다. 아래에서는 hadoop.txt 라는이름의파일을만든후 "My First Hadoop API call!\n" 라는문자열을기록하고다시이를읽어서화면에출력하는프로그램이다. 13 13 단, 프로그램은전체흐름의이해를위해주석및 package import 부분은생략하였다.
public class HDFSExample { public static final String FileName = "hadoop.txt"; public static final String message = "My First Hadoop API call!\n"; public static void main (String [] args) throws IOException { Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path filenamepath = new Path(theFilename); try { if (fs.exists(filenamepath)) { fs.delete(filenamepath); } FSDataOutputStream out = fs.create(filenamepath); out.writeutf(message); out.close(); //Open Config file to read FSDataInputStream in = fs.open(filenamepath); String messagein = in.readutf(); System.out.print(messageIn); in.close(); } catch (IOException ioe) { System.err.println("IOException ~~~: " + ioe.tostring()); System.exit(1); } } } (2) MapReduce 가. MapReduce 기본개념 MapReduce 는 Hadoop 의프로그래밍패러다임으로서입력데이터리스트를출력리스트로변형시킴에있어과정전체를한번에처리하지않고크게두덩어리로나눈후한단계의작업이완료되면그다음의단계로이어지도록하는것을말한다. 마치 Unix/Linux 에서의 pipeline 이여러개의명령어를연결시켜서차례로수행하도록하는것처럼 MapReduce 프로그램은데이터처리작업을 Map 과 reduce 라는두번에걸쳐실시한다. 그리고이과정에서 partitioning 과 shuffling 작업이보조적으로수행된다. 입력 list 의각항목에 Map 함수를적용해서새출력 list 가만들어짐.
프로그래머가 mapper 와 reducer 에필요한작업내용을지정한후 Hadoop 에서수행시키면이들 mapper 와 reducer 가 partitioning 및 shuffling 의지원을받아가면서분산처리되는것이다. 예를들면 mapper 단계에서먼저입력파일을분할하여여러 node 에 ( 중복 ) 배분한후각각할당된데이터를처리한다. Reducer 는 mapper 의처리결과를넘겨받은후이들을종합하여최종결과물을만들어내다. 결국같은 key 를가진 value 는하나로통합된다. ( 다음그림에서는색깔별로각각의 key 를나타낸다고가정 ) 다음의예에서는몇가지단어가수록된입력파일을분석해서단어별빈도수를조사하는모습을그림으로 표현하였다. Haddop의데이터타입 MapReduce 에서는사용자가임의로타입을지정하거나또는 Java 의기본데이터타입을그대로이용할수없며반드시 MapReduce 프레임워크가지원하는데이터타입을이용해야한다. Hadoop 에서 key 와 value 에대해적용할수있는데이터타입의규칙은다음과같다. Writable interface를실행하는 class는 value가될수있다. WritableComparable<T> interface를실행하는 class는 key 또는 value 모두가될수있다.
( 참고로 WritableComparable<T> interface 는 Writable 과 java.lang.comparable<t> interface 를합한것이다.) 이처럼 key 를정의할때비교가능성 (comparability) 이필요한것은 value 와는달리이들이 reduce 단계에서정렬되기때문이다. 다음은 Hadoop 이기본으로제공하는데이터타입인데이들은 Java 표준데이터타입에대한 wrapper 로서 WritableComparable interface 를구현하며 key/value pair 로많이이용된다. BooleanWritable ByteWritable DoubleWritable FloatWritable IntWritable LongWritable Text NullWritable 필요시위 interface 를구현하여자신만의데이터타입을만들수있다. Mapper 함수와 Reducer 함수 Mapper 로사용하기위해서해당 class 를작성할때는 Mapper interface 를구현하면서 MapReduce base class 를확장하는방식으로구현한다. MapReduce 는 mapper 및 reducer 의 base class 로 2 개 method 를가진다. void configure(jobconf job) void close( ) Mapper interface 는 map() 이라는단하나의 method 를가지는데그 signature 는다음과같다. void map(k1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter ) throws IOException 다음은 Hadoop 에서미리제공하는 Mapper 의실행 class 이다. ( 각각의의미는 http://hadoop.apache.org/docs 참조할것.) IdentityMapper<K,V> RegexMapper<K> InverseMapper<K,V> TokenCounterMapper<K> Reducer 역시 MapReduce base classs 를확장하면서 Reducer interface 를구현해야한다. Reducer interface 역시 reduce() method 를가지며그 signature 는다음과같다. void reduce(k2 key, Iterator<V2> values, OutputCollector<K3,V3> output, Reporter reporter ) throws IOException
다음은 Hadoop 에서미리제공하는 Reducer 의실행 class 이다. (http://hadoop.apache.org/docs 참조.) IdentityReducer<K,V> LongSumReducer<K> Word Count의예이제구체적인예제프로그램을살펴본다. 텍스트파일의내용을분석하여출현하는단어별빈도를출력하는것이다. 이프로그램은 Hadoop 을설치하면함께따라오는것으로서그위치는 src/examples/org/apache/hadoop/examples/wordcount.java 이다. 다만, 여기서는한글데이터파일을대상으로하는것으로가정하였고프로그램은대폭단순화하였다. 우선아래의시 ( 詩 ) 를내용으로하는파일이있다고가정한다. 때로 1 때로마주친책과사람들에게서.. 주눅이들게하는글을만납니다.. 주눅이들게하는사람을만납니다.. 이시에나오는단어별 빈도는오른쪽과같다. 14 마주친 1 책과 1 사람들에게서 1 주눅이 2 들게 2 글을 1 만납니다 2 사람을 1 이러한단어의빈도분석프로그램의기본로직은다음과같다. define WordFrequency as FrequencyTable; for each document in documentset { T = tokenize(document); for each token in T { WordFrequency [Word]++; } } display(wordfrequency); 위에서는빈도결과를저장할 WordFrequency 라는테이블을지정한후각각의문서 ( 여기는하나의 문서 ) 에대해분석을실시하였다. Tokenize 는문장을단어별로분절해주는함수로서각각의단어가발견될 때마다 WordFrequency[Word] 를 1 씩증가시켰다. 14 문장에서단어를추출하는것을 tokenize 한다고하는데이는검색을위한색인추출 (indexing) 의핵심과정이다. 다만 실제로는어근과어미의분리, 단수형과복수형처리및현재형 / 과거형 / 미래형의처리등형태소분석문제가발생할수있지만 여기서는이를생략하였다. 논의의주제를해칠수있기때문이다.
이제이러한작업을각각 500 페이지인책 1,000 권에대해수행한다고하자. 이런단순한작업만도웬만한컴퓨터로며칠씩걸릴것이다. 심지어데이터가수시로추가 / 변경되거나또는더복잡하고세련된분석을하고자한다면뭔가다른방법을강구하지않으면안된다. 이경우위로직을프로그램을여러대의컴퓨터에서동시에수행시킨후결과를취합하면수행시간이수십분의 1 로줄게된다. 이를위해다음과같이수정한다. ( 분배처리단계 ) define WordFrequency as FrequencyTable; for each document in documentsubset { T = tokenize(document); for each Word in T { WordFrequency[Word]++; } } partition_and_sort(wordfrequency); sendtosecondphase(wordfrequency); ( 결과합산단계 ) define totalwordfrequency as FrequencyTable; for each WordFrequency received from firstphase { FrequencyTableAdd (totalwordfrequency, WordFrequency); } 여기서는 Partition_and_sort() 을통해앞의처리결과 WordFrequency[Word] 를가 / 나 / 다 / 순서로 나눈후정렬하고 ( 중간합산하는등의추가작업은생략하였다.) 그결과를다음단계에전달하였다. 그리고 합산단계에서는이들을전체목록 (totalwordfrequency) 에추가하였다. 위 분배처리단계 가 MapReduce 에서의 mapping 단계에해당하고, 결과합산단계 는 reducing 단계에해당한다. 결국 mapping 단계는변환및필터 (filter) 의단계이고 reducing 단계는종합 ( 합산 ) 의 단계이다. 한편 mapper 와 reducer 그리고 partitioning 과 shuffling 의제반작업이매끄럽게이루어지려면이들 각각을처리한후다음단계로넘겨지는데이터가일정한구조를가지는것이필요하다. 실제로이들 mapping 단계와 reducing 단계에각각은물론이고이들여러단계의연결과정에서도모든데이터는 (key/value) 의 list 로처리된다. 대상데이터내용이어떠하든, 그리고 mapper, reducer 에지정된 업무로직이어떠하든그작업이효율적으로수행되기위해서는 MapReduce 에서주된데이터포맷 (data primitive) 으로 list 와 (key/value) pair 를이용하는것이다. 다시말하면 MapReduce 에서는그어떤 값도독립해서존재하지않는다. 모든값에는자신에적용되는 key 가존재하며또한모든 Key 는관련된 값을식별해준다. 다음그림에단계별로전달되는데이터의 (key/value) pair 의형태와그설명과함께 나타나있다. 얼핏보면매우복잡해보이지만우리는여기서기본규칙을발견할수있다. 즉, 데이터처리작업을 map 과 reduce 의단계로나눈후각각의처리로직을 Hadoop 이지정한 API 를이용하여프로그램
작성한다. 이때데이터에대해서는포맷을 (key/value) pair 로하되이들이단계별로이동할때미리정한규칙을따르게하면이후의전반적인처리는 Hadoop 의 MapReduce 에서관리하고조정한다. 즉, 개발자가이들규칙에충실히프로그램을작성하면하나의작업을수백, 수천대컴퓨터에동시병렬적으로처리할수도있고결과적으로전체적인처리성능은무한히커질수있는것이다. 다음은 WordCount 프로그램의작업로직을좀더 MapReduce 규칙에부합하는용어로 ( 여전히 pseudo 코드로 ) 작성한것이다.
map(string filename, String document) { for each document in documentsets{ line = readline(document) List<String> T = tokenize(document); for each token in T { emit ((String)token, (Integer) 1); } } } reduce(string token, List<Integer> values) { Integer sum = 0; for each value in values { sum = sum + value; } emit ((String)token, (Integer) sum); } map() 과 reduce() 함수모두출력물이리스트형태를가진다는점만유념한다면이코드는앞서의것과 매우유사함을알수있다. 이제실제 Hadoop 에서의 Java 코드를본다. ( 단, 가독성을위해 package import, exception 및 컴파일러 annotation 을생략했다.) public class WordCount extends Configured implements Tool { public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(longwritable key,text value,context context){ String line = value.tostring(); StringTokenizer tokenizer=new StringTokenizer(line); while(tokenizer.hasmoretokens()){ word.set(tokenizer.nexttoken()); context.write(word, one); } } }
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ public void reduce(text key,iterable<intwritable> values, Context context){ int sum = 0; for(intwritable value: values){ sum += value.get(); } context.write(key, new IntWritable(sum)); } } public int run(string[] args) { Job job = new Job(getConf()); job.setjarbyclass(wordcount.class); job.setjobname("wordcount"); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); job.setmapperclass(wordcountmap.class); job.setcombinerclass(wordcountreducer.class); job.setreducerclass(wordcountreducer.class); job.setinputformatclass(textinputformat.class); job.setoutputformatclass(textoutputformat.class); } FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); boolean success = job.waitforcompletion(true); return success? 0: 1; } public static void main(string[] args) { int result = ToolRunner.run(new WordCount(), args); System.exit(result); } Hadoop 의모든 MapReduce 프로그래밍은실제로위와같은모습을가지므로실제작업에있어서는 프로그램별로일일이새로작성하지않고앞에서와같은프로그램 template 을변형하여이용하는것이 보통이다. 다른예로서여러대자동차의시간별속도정보가 log 파일에담겼다고하자. 아마도번호판을 key 로하고관련된정보가기록되어있을것이다.. 나-1263 노-7489 허-4123 라-4656... 65kph, 12:00pm 50kph, 12:02pm 70kph, 12:05pm 65kph, 12:15pm Hadoop 에서의 mapping 및 reducing 함수는값 (values) 만을받을수는없고언제나 (key, value) 의 pair 를받는다. 이들각각의함수의출력역시동일한원칙이적용되며이들모두 key 와 value 가다음 단계의데이터흐름에서의 list 형태로산출 (emitted) 된다.
다만 MapReduce 에서는 Mapper 과 Reducer 의동작방식이다른언어보다는덜엄격하다. 함수형언어에서는하나의 mapper 가반드시각각의입력항목에대해하나의출력항목을산출하고 reducer 역시하나의입력리스트에대해하나의출력항목만을만들어야한다. 그러나 MapReduce 에서는각단계에서임의의수의 (0,1, 또는심지어수백개의 ) 값들이산출될수있다. reducer 도마찬가지로임의의수의값을출력할수있다. 단계별처리과정에서 Key 를기준으로 reduce space 가분할되며 reducer 함수는많은값들의 list 를하나의값으로변환시킨다. 나. MapReduce 의주요데몬 JobTracker MapReduce 데몬들도 HDFS 와마찬가지로 master-slave 의구조를가진다. 이때 JobTracker 는 master 의역할을, 그리고 TaskTracker 는 slave node 의역할을담당한다. JobTracker 는응용프로그램과 Hadoop 사이의중간연락을하는데몬으로서파일처리를위한실행계획, node 할당, task monitoring 등을담당한다. 즉, 프로그램을수행시키면 JobTracker 가대상파일을찾아낸후각각의 node 에게작업할내용을할당하고이것이제대로수행되고있는지감독한다. 그리고이과정에서작업이실패하면다른노드에게재작업을지시한다. JobTracker 는 master node 가이를수행하는데 Hadoop 클러스터내에는단 1 개의 JobTracker 데몬이존재한다. TaskTracker JobTracker 가 MapReduce 의마스터기능을수행하는것에대응하여 TaskTracker 는 JobTracker 가지시한사항을성실하게집행한다. 그리고 (HDFS 에서의 NameNode 와 DataNode 와마찬가지로 ) 이들 TaskTracker 는 JobTracker 와지속적으로통신하면서정상적인동작유무를확인하는데이상이발생하면 JobTracker 는해당작업을클러스터내의다른 node 에게중복하여수행하도록지시한다. 특기할것은각각의 slave node 에는한개의 TaskTracker 가존재하지만이하나의 TaskTracker 는여러개의 JVM 을생성시켜서각각의 slave node 내에서여러개의 map task 와 reduce task 를병렬적으로수행시키게된다는점이다. 다음그림은 1 대의 master (NameNode 와 JobTracker 가수행됨 ) 와 4 대의 slave node (DataNode 와 TaskTracker 가수행됨 ) 가이용되는모습으로서백업용의 secondary NameNode 가존재함을알수있다. 이들 master 와 slave node 들상호간통신에서는 SSH 채널이이용된다.
그동안논의된 Map/Reduce 전체를정리한그림은다음과같다. 앞의그림을간단히살펴본다. HDFS 클러스터에파일이적재됨으로써 MapReduce 입력이시작된다. 이들파일은전체 node에균등하게배분되는데이에대해 MapReduce 프로그램이수행되면서 node에서는 mapping task가시작된다. Mapping작업에서각각의 task는동등한것으로서서로구별되지않으며각 mapper는그어떤입력파일도처리가능하다. 각각의 mapper는자신컴퓨터근처에존재하는파일을적재한후곧바로처리에들어간다. mapping 이끝나면중간산출물로서의 intermediate (key, value) pair가각컴퓨터사이에서교환되고같은 key를가지는모든 value들은하나의 reducer에게보내진다. Reducer에서여러개 mapper로부터의출력이하나로병합된다.
끝으로 2004 년 Google 에서발표한 MapReduce 논문 (http://research.google.com/archive/mapreduce-osdi04.pdf ) 의그림을소개한다. 번호별설명을 통해내용을이해할수있을것이다. 다. MapReduce 의데이터흐름 Hadoop 에서의 MapReduce 의중요성을감안하여작업프로세스가아닌데이터의흐름을중심으로 정리하여본다. 아래그림에서는각단계에서의데이터흐름을 pipeline 으로보여주고있다. Hadoop MapReduce 의데이터흐름에대한상세한모습
입력파일 : MapReduce task 용데이터는보통 HDFS 상에존재한다. 파일포맷은그때그때다른데 line 기반의 log 파일, binary 포맷, multiline 의입력레코드등그어떤것도가능하다. 다만, 용량은매우클것이다. InputFormat: 입력파일의분할방식이나읽어들이는방식을정의하는클래스이다. 입력으로사용될파일또는기타의 object를선택한다. 파일을 task로분해할 InputSplits 을정의한다. 파일을읽기위한 RecordReader 를생성하는 factory를제공. Hadoop 은여러개의 InputFormat 관련클래스를제공한다. FileInputFormat 라는이름의 abstract type 이존재해서파일을대상으로하는모든 InputFormat 은이클래스로부터파생된다. Hadoop 의 job 을수행하기시작할때 FileInputFormat 에읽으려는파일의경로를제시하면 FileInputFormat 이이디렉토리에있는모든파일을읽어들인다. 그런후각각의파일을여러개의 InputSplit 으로분해한다. 개발자는입력파일에어떤 InputFormat 을적용할지를 JobConf object 의 setinputformat() method 를호출하는방식으로정의한다. 표준 InputFormat 이다음표에나와있다. InputFormat 설명 Key Value TextInputFormat 디폴트포맷이며 각 line 의 각 텍스트파일의각 byte offset line 의 줄을읽어들임. 내용 KeyValueInputFormat 각 line 을 key, val 첫째 tab line 의 pair 로 문자까지의 나머지 parse 한다. 모든내용 내용 SequenceFileInputFormat Hadoop 고유의 사용자정의 사용자 고성능바이너리 정의 포맷 TextInputFormat은입력파일의각 line을별개의레코드로취급하지만별도의 parsing작업은하지않는다. 이것은특히 unformatted 데이터또는로그파일과같은 line기반의레코드등에유용하다. KeyValueInputFormat 역시입력파일의각 line을별개의레코드로취급한다. 다만 TextInputFormat이 line 전체를하나의값 (value) 으로여기는반면 KeyValueInputFormat은각 line을 tab 문자를기준으로 key와 value로분해한다는점이다르다. SequenceFileInputFormat 은 Hadoop에특수한바이너리파일을읽어들이는데 Hadoop mapper 에고속으로읽어들일수있는몇가지부가기능이제공된다.
InputSplits: InputSplit 은 MapReduce 프로그램에서 map task 의작업단위가된다. Data set 에적용되는 MapReduce 프로그램은이를총체적으로 Job 이라고부르는데이 job 은수백개의 task 로구성된다. Map task 는파일전체또는일부분만도읽을수있다. 디폴트상태에서 FileInputFormat 과하위 class 는파일을 64 MB 단위의 chunk (HDFS 에서의블록크기와동일 ) 로분할되는데이값은 hadoopsite.xml 에서 mapred.min.split.size 파라미터를이용하거나특정 JobConf object 에서파라미터를 override 하는방식으로변경할수있다. 파일을 chunk 단위로처리하면하나의파일에대해여러개의 map task 를병렬수행할수있어서성능이획기적으로높아진다. 또한각각의블록을한 node 에서다른 node 로옮길필요없이파일을구성하는다양한블록을클러스터내의여러 node 에분배하여해당 node 에서직접 (locally) 처리할수있다. 파일포맷이 chunk 단위로쪼개기어려울경우에는 custom 의 InputFormat 을작성해서파일분할의조건과방법을지정할수도있다. RecordReader: InputSplit 을통해일의단위가지정되지만그액세스방법은정의되지않는다. 데이터를적재한후 Mapper 가읽기쉬운 (key, value) pair 로변환하는일은 RecordReader 클래스가담당한다. RecordReader instance 는 InputFormat 에의해정의된다. 디폴트의 InputFormat 인 TextInputFormat 은 LineRecordReader 를제공하는데여기서는입력파일의각각의 line 을새로운값으로취급한다. 각 line 에대한 key 는파일에서의 byte offset 이다. 입력시 InputSplit 전체가완료될때까지 RecordReader 는계속호출 (invoke) 된다. RecordReader 가호출되면 Mapper 의 map() method 역시호출된다. Mapper Key 와 value 가주어지면 map() method 는 (key, value) pair(s) 를 Reducer 에게전달한다. 새로운 Mapper instance 는각각의 map task (InputSplit) 에대한별도의 Java 프로세스속에서만들어진다. 이러한 map task 는전체적으로하나의 job input 을구성한다. 각각의 mapper 는다른 mapper 와어떤통신도하지않는데따라서각 map task 의신뢰성은로컬기기의신뢰성에전적으로좌우되게된다. map() method 는 key 와 value 이외에 2 개의파라미터를전달받는다. OutputCollector object는 collect() 를통해 (key, value) pair를 job의 reduce 단계로전달해준다. Reporter object 는현재 task에대한정보를제공한다. Reporter의 getinputsplit() 는현재의 InputSplit을설명하는 object를반환한다. 또한 map task로하여금진행상태에대한정보를시스템내다른요소에게제공할수도있다. setstatus() method 를통해사용자에게상태메시지를제공할수도있다. incrcounter() 를통해서는 shared performance counter를증가시킬수도있다. Partition & Shuffle: 첫번째 map task 가종료되면각 node 들이다른 map task 들을수행하고있는도중에도 map task 로부터의중간산출물을이를필요로하는 reducer 에게로전달하기시작한다. 이처럼 map 의산출물을 reducer 에게로옮기는것을 shuffling 한다고하는데중간단계 key space 의일부가각각의
reduce node 에할당된다. 이들 subset ( 이를 "partition" 이라고함 ) 들은 reduce task 에입력된다. 각 map task 는그어떤 partition 에도 (key, value) pair 를전달할수있다. 하나의 key 에대한모든값은항상그원천이어떤 mapper 였든상관없이병합된다. 따라서중간산출데이터의각항목을어디로보낼지에대해 map node 는의견일치를보아야한다. Partitioner 클래스는주어진 (key, value) pair 가어떤 partition 으로갈지를결정하는데디폴트상태에서는 key 에대한 hash 값에따라 partition 을할당한다. Combiner: Combiner 를이용하면 MapReduce job 이사용하는대역폭을절감할수있다. Combiner 는 Mapper 와 Reducer 사이에서진행되는것으로서선택적적용이가능한데이경우 map task 가수행되는모든 node 에대해 Combiner 클래스의 instance 가적용된다. 각각의 node 에서 Mapper instance 가산출한데이터를입력받고 Combiner 가지정된작업을하면그결과물은 Reducer 에게보내진다. Combiner 는일종의 "mini-reduce" 프로세스로서하나의단위컴퓨터에서생성된데이터만을대상으로한다. 이밖에 Fault Tolerant 기능과 Checkpoint 기능도있다. 앞서의 WordCount 프로그램의경우각단어가발견될때마다 (word, 1) pair 를산출했는데예컨대 " 주눅이 " 라는단어가 2 번발견되면 (" 주눅이 ", 1) pair 가 2 번출력되고따라서 Reducer 에게도 2 번전달된다. 그러나, Combiner 를통하면이들이각각의컴퓨터에서합산되어서 (" 주눅이 ", 2) pair 가단한번만 Reducer 에게전달된다. 이처럼모든노드에서여러번반복해서전달되던항목을중간합산하여각단어당한번씩만전달됨으로써 shuffle 프로세스에서요구되는대역폭을획기적으로줄이게되고결과적으로 job 처리속도가개선된다. 이것역시별도의프로그램작업없이구동프로그램에다음의한줄만삽입하면 MapReduce 프레임워크가자동진행해준다. conf.setcombinerclass(reduce.class); Combiner 단계가 MapReduce 의데이터흐름에포함되었다.
Sort: 각각의 reduce task 는여러개의중간 key 에관련된 value 를합산 (reduce) 한다. 개별 node 에서의 일련의중간 key 는 Hadoop 이이를자동으로정렬한후 Reducer 에게보내진다. Reduce: 각각의 reduce task 에대해 Reducer instance 가만들어진다. Reducer instance 는사용자가생성시키는것으로서 job 별로중요한 2 번째단계가된다. Reducer 에게할당된 partition 에서의각각의 key 에대해 Reducer 의 reduce() method 는단한번호출되는데이를통해 key 에연결된모든 value 에대한 iterator 와 key 를받는다. iterator 에의해하나의 key 와이에관련된 value 들이반환될때그순서는무작위이다. Reducer 는또한 OutputCollector 와 Reporter object 를파라미터로받게되는데이들은 map() method 에서와같은방식으로이용된다. OutputFormat: OutputFormat 은 Hadoop 이파일기록에이용하기위해제공되는것으로서일반적인 FileOutputFormat 에서상속된것이다. OutputCollector 에게제공되는 (key, value) pair 는출력파일에기록된다. 이때실제기록되는방식은 OutputFormat 에따라다르지만기본적으로앞서의 InputFormat class 와같은방식으로동작한다. 즉, Hadoop 이제공하는 OutputFormat 의 instance 가로컬디스크또는 HDFS 상의파일에기록된다. 각각의 Reducer 는각파일을일반적인출력디렉토리에기록될때통상 part-nnnnn 라는이름을가진다. ( 여기서 nnnnn 는 reduce task 와관련된 partition id 이다.) 출력디렉토리는 FileOutputFormat.setOutputPath() method 에의해결정된다. 특별한 OutputFormat 을사용하려는경우에는 MapReduce job 을정의하는 JobConf object 의 setoutputformat() method 를통해지정한다. OutputFormat TextOutputFormat SequenceFileOutputFormat NullOutputFormat 설명 Default; line 을 "key \t value" 형태로기록한다뒤에오는 MapReduce job 으로읽어들이기에적당한형태의바이너리파일로기록한다. 입력을무시한다 디폴트상태의 instance 는 TextOutputFormat 으로서텍스트파일의각 line 에 (key, value) pair 를 기록한다. 이를나중에 MapReduce task 를통해 KeyValueInputFormat class 로다시읽어들일수 있는데이는사람도읽을수있다.
MapReduce job 들상호간에이용되는또다른포맷이 SequenceFileOutputFormat 인데이는임의의데이터타입을파일로신속하게 serialize 해주는기능을한다. 이에대응되는 SequenceFileInputFormat 은파일을같은타입으로 deserialize 하고앞서의 Reducer 가산출했던것과같은방식으로다음 Mapper 에게전달한다. NullOutputFormat 은아무런출력파일을만들지않으며 OutputCollector 에의해전달받은 (key, value) pair 를무시한다. 이는 reduce() method 에서독자의출력파일에기록하고있어서 Hadoop 프레임워크에의해추가의빈출력파일이만들지않으려는경우유용하다. RecordWriter: InputFormat 이실제로개별레코드를 RecordReader 실행을통해읽는것과마찬가지로 OutputFormat 클래스도 RecordWriter object 에대한 factory 역할을한다. 이들은 OutputFormat 에지정된대로개별레코드를파일에기록하는데이용된다. Reducer 에의해작성된출력파일은 HDFS 에남아있으므로다른 MapReduce job 또는별도의프로그램또는사용자의직접개입을통해이용할수있다. 라. Task 의투기적실행 NFS 및 AFS 같은분산파일시스템에서간혹로컬파일시스템보다오히려성능이훨씬늦은것처럼보이는경우가있다. 이는여러노드와 CPU 간에 cache 메모리의내용을동기화 ( 이를 cache coherence 라고부른다 ) 하고데이터안전을확보하기위해 I/O 작업을조정 ( 일종의 synchronous I/O) 하기때문이다. 성능을개선하기위해장착된 cache 메모리가오히려여러컴퓨터간의내용동기화필요성에따라부담으로작용하는것이다. Hadoop 에서도문제가된것중의하나가 task 를여러 node 가나누어실행할때처리속도가늦은일부 node 가전체시스템의성능을저하시키는문제였는데하드웨어문제, 소프트웨어설정 (configuration) 의오류등그이유는여러가지가있을수있다. 특히문제가되는것은오히려프로그램이정상적으로종료될때이를찾아내기가더욱어려워진다는점이다. Hadoop 은이처럼 task 가당초예정된시간보다오래걸릴경우아예새로운 task 를다시수행시키는방법을취한다. 일종의백업개념인셈이다. 예컨대만약 Hadoop 클러스터내의한 node 컴퓨터가매우낮은속도의디스크콘트롤러를가져서다른 node 에비해 10% 의속도밖에나지않는다고하자. 이경우다른 99 개의 map task 가이미수행을종료했는데도마지막하나의 map task 가끝나기를기다리게될것이다. 이경우추가의 task 를독립적으로수행시키고개별 task 는자신의입력이어디에서제공되는지를모르게한다. Task 들은단지 Hadoop 플랫폼만을신뢰하고자신의입력과처리된출력결과만을 Hadoop 과의협력하에제공하게된다. 결과적으로동일한입력이동시에여러번처리하게되며 Job 에서의대부분의 task 가종료됨에따라 Hadoop 은잔여의 task 중중복된 (redundant) task 를더이상할일이없는나머지 node 에분배 (schedule) 한다.
이러한작업프로세스를투기적실행 (speculative execution) 이라고하는데만약 task 가완료되면이사항을 JobTracker 에게알린다. 여러개의중복된 task 에서먼저종료된 task 가실제수행자로확정되는데이를 definitive copy 라고부른다. 만약이때에도다른중복 task 가투기적실행을하고있다면 Hadoop 은 TaskTracker 로하여금이들 task 및그결과를포기, 무시하라고지시한다. Reducer 는결국가장먼저완료되어도착하는 Mapper 로부터의입력사항만받아들이게된다. 투기적실행은 default 상태에서활성화되어있지만 JobConf 의 option 중다음 2 개항목을 false 로하여비활성화할수있다. 이전버전의 Hadoop 의경우 : mapred.map.tasks.speculative.execution mapred.reduce.tasks.speculative.execution 새버전에서는 JobConf 의 option 중다음을 false 로설정하면된다. mapreduce.map.speculative mapreduce.reduce.speculative
4. Hadoop 설치운영과프로그래밍 (1) Hadoop 의설치와운용 가. 설치 권장기기사양그동안소위저가하드웨어 (commodity server) 로표현되어왔던하드웨어에대해 Hadoop 이권장 ( 또는용인 ) 하는것은없다. Hadoop 의설계원칙을생각할때 Linux 를적용할수있는정도라면구형서버나심지어 PC 도적용가능하다. 다만, 전체시스템의안정성과효과적운영을생각한다면 Hadoop 의하드웨어시스템역시최소한의성능과안정성을갖출필요가있으므로다음과같은사양을권장한다. mid-level rack 타입의서버 듀얼 socket, ECC RAM 메모리 네트워크카드 : 기가비트급 Ethernet 권장단, DataNode 서버에는 RAID 를적용하지않아야한다. HDFS 에서파일복제와에러체킹기능을이미구현하고있으므로이를중복지정할때발생되는혼란을피하기위함이다. 다만 NameNode 에는 RAID 와같은신뢰성향상장치가얼마든지허용되고권장된다. 일반적으로 Xeon processors 1.8-2.0GHz 이상으로하되 Hadoop job 이 core 당 1 ~ 2 GB RAM 소모. (Python 등 script 사용시메모리추가소요 ) 하므로이에의거하여충분한메모리장착을권고한다. 또한일정한수 ( 복수 ) 의 HDD 를가지는시스템여러대가바람직하다 Hadoop 은기본적으로 I/Obound job 을주로수행하기때문이다. 설치요건사항 Java 환경 : Sun Java 1.6 이상 운영체제 : 가급적 Linux를권고한다. (cygwin을설치하면 MS Windows에서도 Hadoop이용이가능하긴하다 ) 다운로드및설치 Hadoop 을설치하는방법은크게 Apache 프로젝트사이트에서다운로드하여설치하는방법과 Cloudera 또는 Hortonworks 와같은독자적배포판 (CDH) 을이용하는방법이있다. 이는마치 Linux 에서 RedHat 의 Enterprise Linux, Fedora, Ubuntu 등의다양한배포판이있는것과같은데모두동일한오픈소스기반이며단지 Hadoop 관련프로젝트의조합과이에대한테스트절차및패키징이다를뿐이다. 독자배포판을이용하는것이여러가지로편리하지만여기서는예시의목적으로 Apache 프로젝트사이트에서다운로드설치하는방식을개략적으로기술한다.
http://hadoop.apache.org/releases.html#download 에서다운로드하고압축을푼후 (gunzip) 다음과같이설치한다. $ tar -zxf hadoop-1.0.8.tar.gz $ chmod -R g-w hadoop-1.0.8/ root 권한으로 unpack 된내용을적당한위치 ( 예컨대 /usr/local) 로옮긴다 $ sudo mv hadoop-1.0.8 /usr/local/ $ cd /usr/local $ sudo chown -R root:root hadoop-1.0.8/ 이제설치가완료되었으며설정사항의변경이필요한경우 conf/hadoop-env.sh 및 hadoop-site.xml 를 이용한다. HDFS 의환경설정 (configuration) Hadoop 설치디렉토리밑의 conf/hadoop-defaults.xml 파일에는각종데몬프로그램의지정, 복제본 갯수 (replication factor) 또는 Block size 등다양한환경설정값이 key-value pair 의형태로저장되어 있다. <property> <name>property-name</name> <value>property-value</value> </property> 따라서설정값의변경이필요하다면이곳에서바꾸어주면된다. 나. Hadoop Cluster 의운영환경 Hadoop 클러스터는기본적으로다음 3 가지운영모드중하나를선택할수있는데이들은각각 개발, 확산준비 (staging), 운영 (production) 의 3 개단계에해당한다고할수있다. Local (standalone) 모드 디폴트모드로서아래의설정파일은모두빈파일로설치된다. 이때는파일시스템도 HDFS를사용하지않고 Hadoop의각종데몬프로그램도수행되지않을뿐아니라모든프로세스는단하나의 JVM 위에서수행된다. Linux 명령어를이용해서파일작업을하게되는데주로 MapReduce 프로그램을처음개발하거나디버깅하는용도로사용된다. Pseudo-distributed 모드 정식의 Hadoop 클러스터모드이지만단지물리적으로서버한대에모아놓은것을말한다. 따라서 Hadoop의 HDFS 파일시스템도이용되고모든데몬도수행된다. 각각의데몬은별도의 JVM 위에서수행되며한대의서버임에도불구하고데몬상호간에는 SSH 프로토콜로통신을한다. 뿐만아니라각서비스마다별도의로그파일이생성되는등한대의서버내에서도마치이모든것들이분산환경인것처럼동작한다. Fully Distributed 모드 완벽한 Hadoop의분산클러스터환경이다. 즉, master node와 backup 서버그리고 DataNode, TaskTracker 데몬등이독립적으로분산환경에서동작한다. 한편이들은모두다음 3 개설정파일의내용을변경함으로써가능하다.
core-site.xml hdfs-site.xml mapred-site.xml 한편빅데이터의 Hadoop 클러스터는상대적으로매우큰시스템으로운영되는경우가많다. 수십대는기본이고수백대또는수천대에이르는경우도흔하다. 이때수십대 rack 이동원된다면 rack topology 설계역시중요하다. 대규모환경에서는가상화엔진을적용할수도있다. 아래그림은이러한 multi-rack 환경을나타내고있다. 대규모 Clusters: ( 수백대이상 ) 의 Multiple Rack 환경 위와같은경우 Rack awareness 의문제가발생한다. 이러한것을 rack-aware placement 정책이라고하며 multi-rack 에서 block 의 replica 의분산에따른데이터손실이없도록하려는것이다. Hadoop 에서는 DNSToSwitchMapping 이라는 interface 를이용해서 rack topology 에대응하도록 java 프로그램을작성한다. 또는사용자스크립트가각노드에서수행되도록디폴트 mapper 를이용한다.
(2) Hadoop 프로그래밍 가. 개발도구 Eclipse 를이용 Eclipse 용 MapReduce plugin 을추가하여사용할수있다. NetBeans 이용 NetBeans 에도 MapReduce plugin 이있으므로이를사용할수있다. 나. Java 와다른언어의혼용 Unix/Linux 에서 pipeline 개념은여러처리작업 ( 모듈 ) 을서로연결 (chain) 시키는것을말한다. 15 또한이와관련된개념으로 Message queue 라는것도있는데이는하나의작업을위해여러프로세스가상호협력하는경우이들프로세스간의통신 (IPC: inter-process communication) 방법의하나로서이들 (processing primitives) 사이의동기화를담당한다. 이방식을취할경우프로그래머는자신이원하는결과를얻기위해전체적인처리방식을먼저설계한다. 그런후이를몇개의작은처리방식으로나누고 15 Linux 에서의 pipeline 란하나의프로그램에서처리된결과물을다른프로그램을넘겨주고그로하여금처리하게하는 것으로서 pipe 라고도한다. 예를들어다음이 people.txt 라는이름의파일이있다고가정한다. ( 즉, 성 ( 성 ), 이름, 나이, 우편번호, 전화번호의 4 개의 column 으로구성됨 ) 김철수 38417-941555-1212 Nixon Nixon 26138-705916-5763 신민아 19407-051246-3457 김영희 47135-601674-6972 김씨성을가진사람을모두모아서우편번호의순서로정렬하고자하는경우다음과같이처리할수있다. grep 김 people.txt > grep.out sort +3 grep.out rm grep.out
순서대로늘어놓아단계적으로수행하게된다. 이때앞단계 (producer) 의처리결과를뒷단계 (consumer) 에게전달하는방식이된다. 다. Hadoop 클러스터의관리 HDFS 에서의 Node 퇴출 (decommissioning) 다음의절차에따라특정 node 를퇴출하거나재등록할수있다. Step 1: 클러스터 configuration 의 excludes 파일을이용한다. 즉, conf/hadoop-site.xml 파일에 dfs.hosts.exclude 키를추가하고 NameNode 의파일에해당하는경로를지정한다. Step 2: decommission 할호스트를결정한다. 즉, dfs.hosts.exclude 에해당기기를등록함으로써 NameNode 에연결되는것을방지한다. Step 3: 설정정보를 reload 시키기위해다음명령을수행한다. bin/hadoop dfsadmin -refreshnodes. bin/hadoop dfsadmin refreshnodes Step 4: 노드를 Shutdown한다. decommission 후그 H/W는 shutdown가능하다. 이때 bin/hadoop dfsadmin -report 명령을이용하면현재연결된노드의목록을볼수있다. Step 5: excludes 파일을재수정한다. 일단 decommission되면 excludes 파일로부터다시제거한다. (3) 결론 가. Hadoop 플랫폼 Hadoop 은많은관련프로젝트를통해하나의생태계를구성했으나프로젝트중몇가지는보완적인 성격을가지고있으며아울러다음과같이각종도구간에직간접적인연계작업이가능하도록되어있다.
나. HPC vs. 빅데이터 슈퍼컴퓨터가빅데이터에기여한만큼이나차이점또한분명하다. 즉, 빅데이터에서는데이터의입출력을효율화하기위해컴퓨터 I/O ( 입출력 ) 에많은중점을두는반면수학 / 과학연산및공학계산등을중심으로하는 Technical 컴퓨팅은분산시스템만으로는해결이어려운경우가많다. 고성능의 CPU 동원과이들의병렬처리가불가피하며이를위해 Lustre 파일시스템과같은병렬파일시스템이흔히이용된다. 뿐만아니라흔히이용되는가상메모리뿐만아니라계산의효율을위해불가피하게많은 real 메모리가소요되기도한다. 최근병렬프로세싱에도빅데이터의기술을혼용하려는여러가지노력이경주되고있으나기술의원천과개발목적상빅데이터기술과 HPC 상에는여전히큰차이점이존재하는것이사실이다. 다. Hadoop 이적합한영역 다음의경우가 Hadoop 을적용하기에적합한대표적인예이다. 복합형 (Complex) 데이터 데이터의소스가여러곳인경우 데이터의양이많은경우 Batch 처리가가능한경우 병렬처리가가능한경우 클러스터내의여러 node로데이터를분배할수있는경우실제로 log 파일의분석, 고객데이터분석, 검색엔진에서의 index 작업, 텍스트분석 (text analytics 문맥분석, cluster 분석, 문서분류등 ) 유전자배열, 이미지분석등에큰효과를보고있다. 라. Hadoop 이적합하지않은영역 수치계산 ( 예 ; 구조해석, 시뮬레이션, Fibonacci sequence의계산등 ) 과 CPU나메모리의집중적활용이요구되는 ( 좁은의미의슈퍼컴퓨터 ) 영역에서는큰효과를보지못하고있다. 정형화된 (Atomic) 데이터를중심으로하는경우에는일반적인 RDBMS를이용하는것이효율적이다. ( 대신구조화된데이터를대상으로개발된 Hadoop 프로젝트로서 HBase가있다. 이에대해서는뒤에논의 ) 결론적으로볼때 데이터중심의작업특히 dataset의변환을하는경우에는두단계로나누어작업하는것이효과적이다. CPU-bound 작업의경우분산컴퓨팅의이점을크게발견하기힘들다. RDBMS나 DW의역할은여전히남이있다. 그러나데이터중심의작업에서는 Hadoop이탁월하다.
5. Hadoop 과관련된프로젝트 (Hadoop 생태계 ) Hadoop 이외에도관련된프로젝트를다음과같이구별해볼수있다. Hadoop Core: HDFS와 MapReduce로구성된 Hadoop솔루션 Hadoop 프로젝트 : Apache 프로젝트중 Hadoop과직접관련된프로젝트. 여기에는 Hadoop외에 Pig, HBase, ZooKeeper, Chukwa를포함한많은프로젝트가있다. ( 계속늘어나고있다.) 본장의이하에서는각각의프로젝트에대해간략히살펴보되데이터마이닝과기계학습을위한 Mahout 에대해서는장을달리하여보다세부설명을한다. 16 (1) Pig Pig 는 Hadoop 에서의프로그래밍을보다쉽게만들어주는데이터흐름중심의고급 (High-level) 프로그래밍언어이다. 17 내부적으로는 Pig 가병렬처리를통해 Hadoop 의확장성, 신뢰성등고유한특성을그대로유지하며 MapReduce 의최적화도자동으로수행해서성능을유지시켜준다. Pig 는크게 2 가지요소로구성된다. 고급의데이터처리언어인 Pig Latin 16 IV. 빅데이터의적용실례 1. Mahout 를이용한빅데이터분석을볼것. 17 SQL 이최종출력결과를중심으로사고하게되어있는반면 Pig 는알고리즘중심의데이터흐름에주안점을둔다. 즉, Hadoop 이절차와알고리즘중심인반면 SQL 은선언적 (Declarative) 방식으로결과를출력시키는것이다.
Pig Latin 으로만들어진스크립트프로그램을컴파일하고수행해주는제반환경 PIG Latin 언어의예 (WordCount) A = load './input.txt'; B = foreach A generate flatten(tokenize((chararray)$0)) as word; C = group B by word; D = foreach C generate COUNT(B), group; store D into './wordcount'; Pig 에서는데이터변환작업을 Pig Latin 으로작성하게되는데여기에는 filter, union, group, join 등이 모두포함된다. Pig Latin 을수행하는방법에는다음의 3 가지가있다. 대화형명령어방식 (grunt shell) Pig Latin 을이용하여스크립트프로그램의작성 Java 프로그램에 Pig Latin 을내장하여사용 Pig Latin 은상당히직관적명령어이고 SQL 질의어와도상당히유사하다. 간단한예를살펴본다. log = LOAD excite-small.log AS (user, time, query); grp = GROUP log BY user; counter = FOREACH grp GENERATE group, COUNT(log); DUMP counter; 위예에서는 excite-small.log 파일을적재한후사용자별로분류하고각사용자별출현빈도를출력하였다. Pig 는이처럼데이터흐름, 즉, 데이터처리알고리즘구현을중심으로한독자적인명령어체계를가지고있을뿐아니라자체의데이터타입 18 도가지며필요시사용자가별도로함수를작성하여 (UDF: user-defined functions) 사용할수도있다. (2) Hive Hive 는 hadoop 에기반한데이터웨어하우스이다. Hive 는 DW 계층이 Hadoop 의 18 Pig 라는이름은돼지가무엇이든잘먹는다는의미에서라고한다. 어떤데이터이든입력받아서부지런히처리할수있는 효율적인언어라는뜻으로해석할수있겠다.
구성모델 (HDFS 및 MapReduce) 위에탑재된형태이므로빅데이터처리에최적화되어있다. 즉, 실행면에서 MapReduce 를, 데이터저장장치의측면에서는 HDFS 에기반하면서각종의질의어를제공한다. Hive 의가장큰설계원칙은대규모이면서도 rich data type 의구조화된데이터를관리하는것이다. 19 Hadoop 초기부터기존데이터의방대함과이미많은사람이익혔던관계형 DBMS 에서의 SQL 을어떻게할것인가를두고많은논의가있었다. 그리고그결론이일종의 SQL 파생어 (dialect) 로서의 HQL 과그하부 DBMS 로서의 Hive 이다. Hive 는개발자에게편리하며 HQL 은분석가가쉽게이해할수있고또한기존데이터를이식하는것도편리하다. 즉, Hive 의 HiveQL 은 SQL 을사용해본사람이라면누구나쉽게 mapper 와 reducer 를쉽게이용할수있도록되어있다. 기본적인 SQL 문 (Select From, Join, Group By, union all 등 ) 과 Join 명령 (Equi-Join, Multi-Table Insert, Multi-Group-By) 은물론 multitable query 등을포함하는 SQL primitive 를제공한다. 나아가 HiveQL 를확장하여사용자가작성하는 aggregation, column 변환, embedded MapReduce script 등도손쉽게이용할수있다. 외부인터페이스는명령어방식이중심이지만필요하다면브라우저를통한 WebGUI 도가능하며외부 DB 를 JDBC 로액세스할수도있다. 위그림에서보듯이사용자가 ( 명령어줄, Web 화면, JDBC/ODBC 등의외부데이터베이스등을통해 ) Hive 에명령을주면 Hive 는이를해석 (parsing) 한후메타스토어 (metastore) 를참조하여실행계획을수립하고최종적으로이를수행한다. Hive 와함께배포되는것이몇가지있는데 CLI, HWI (Hive Web Interface), JDBC/ODBC, Thrift 서버등이다. 모든명령어는드라이버로전달되고여기서해당명령을 MapReduce job 과함께수행한다. 20 19 원래는 Facebook 에서사용자및 log 데이터를분석하려는목적에서시작되었는데 Ad hoc 분석, 데이터마이닝, 요약분석, Spam 탐지등에성공적으로활용되었고이후이를오픈소스화하였다. 20 Hive 는다수의사용자가이용할수있도록메타데이터를자체의관계형데이터베이스에저장하고관리한다. 사실 Hadoop 은 Apache 프로젝트로서관계형 DBMS 인 Derby 를이용하는데실제 Hive 의 Metastore 는 metastore_db 라는이름의데이터베이스로관리된다.
Hive 는수행전과정을통해 JobTracker 와통신하고데이터는 HDFS 에서관리하지만테이블 schema 와기타의시스템메타데이터는 MetaStore 라는별도의 RDB 에저장하고관리한다. Hive 는다음의 3 가지데이터구성요소를가진다. Table: Hive 은 RDBMS 에서의 table 과마찬가지로 row 와 column 으로구성된다. 그러나 Hive 는기본적으로 Hadoop HDFS 를이용하고단지필요시다른파일시스템에존재하는 table 을가져올수있도록되어있다. Partition: Hive 에서의 table 은하나또는여러개의 partition 을지원한다. 이들 partition 은하위파일시스템에서의디렉토리에대응된다. 예를들어 autos 라는이름의 table 이있다면 key 값이 12345 이고그제조업자가 Hyundai 라면해당 partition 에대한경로는 /hivewh/autos/kv=12345/hyundai 가된다. Bucket: 데이터는여러개의 bucket 으로구성된다. Bucket 은 partition 디렉토리내의파일로서저장된다. 이때 bucket 은테이블내의 column 에대한 hash 값에의거하여관리된다. 앞서예에서 Hyundai 라는이름의 bucket 을가지게될것이고여기에관련자동차의속성이간직될것이다 Hive 의 metadata 는 metastore 라는외부에별도저장된다. metastore 는 column type, owner, key 및해당데이터값과같은 Hive 의 schema 세부사항을간직하는관계형데이터베이스로서 catalog 데이터를 Hadoop 전체프레임워크내의다른메타데이터서비스와동기화시켜주는역할을한다. 앞서 MapReduce 설명시예제프로그램으로서 WordCount 예제를살펴볼때 MapReduce 로구현한우리의프로그램은약 50 여줄을차지하였다. 이제이를 Hive 의질의어인 HQL 로수행한다면그명령은다음과같다. 질의어형태로단 7 줄의 HQL 로가능해졌다. CREATE TABLE docs (line STRING); LOAD DATA INPATH 'docs' OVERWRITE INTO TABLE docs; CREATE TABLE word_count AS SELECT word, count(1) AS count FROM (SELECT explode(split(line, '\s')) AS word FROM docs) w GROUP BY word ORDER BY word; 또다른예를통해 Hive 의사용법을보기로한다. Hive> CREATE TABLE customer (cust_no INT, order_no INT) > ROW FORMAT DELIMITED > FIELDS TERMINATED BY, > STORED AS TEXTFILE; Ok Time taken: 0.12 seconds
이처럼 SQL 문과유사한다양한명령어가제공된다. 한편 Hive 의데이터모델은일반데이터베이스와다른특징을가진다. 즉, 각각의테이블은디렉토리로서관리된다. 즉, 위의 customer 라는이름의테이블이있다면이는디폴트디렉토리인 /user/hive/warehouse/customer 라는형식으로저장된다. 뿐만아니라관계형 DB 에서 column 별로 index 를설정하는것과는달리 Hive 에서는 column 을 partition 별로나누어서관리한다. 위의 customer 테이블이 last_name 과 register_date 라는 2 개의 partition column 으로구성되어있다면 Hive 의디렉토리구조는다음과같이된다. /user/hive/warehouse/customer/last_name=kim/register_date=201201 /user/hive/warehouse/customer/last_name=kim/register_date=201202 /user/hive/warehouse/customer/last_name=lee/register_date=201201 /user/hive/warehouse/customer/last_name=lee/register_date=201202 Partitioning 은이외에도매우다양한방법으로 Hive 에서활용하고있다. 또한 bucket 이라는 hashing 기반의데이터구분법도이용함으로써방대한데이터의경우에도 full scan 하지않도록함으로써성능을극대화시킨다. 한편 Hive 는엄밀한의미에서의데이터베이스라고할수없다는견해도있는데이는 Hadoop 과 HDFS 가가진설계사상에따른어쩔수없는특징이다. 우선 Hive는 record 단위로수정, 추가, 삭제를할수없다. 또한 Hadoop이 batch 처리중심이다보니 Hive에대한질의시응답시간 (latency time) 이길어지게되었다. 이들각각의질의에대해 MapReduce 처리를하다보니생기는오버헤드때문이다. Hive에서는 transaction 처리를지원하지않는다. 따라서 OLTP에적용이어렵고 OLAP에의이용이중심이된다. 요컨대 Hive 는데이터웨어하우스업무즉, 변경의필요가크지않고즉각적응답이필요하지않은정적데이터분석시에적합하다하겠다. 다만일반 DW 와달리 batch 처리중심이므로 Hive 는즉각적결과도출보다는수분 ~ 몇시간정도소요되더라도보다복잡한분석을하는경우에유용하다. 대신 Hadoop 기반이어서일반의 DW 보다훨씬더확정성이크고매우견고한시스템이라하겠다. (3) HBase HBase 는 Google 의 BigTable 을참조로개발된 Hadoop 의 HDFS 위에서실행되는비관계형분산데이터베이스이다. HBase 과 Hadoop 은상호보완적이다. 우선 Hadoop 의장점을그대로이용한다. 장애극복 (Fault tolerance) 확장성 MapReduce를이용한 Batch처리
한편 HDFS 가가진 random read/write 가되지않는다는단점을극복한다. 즉, 저장기능으로서의 HDFS 를이용하면서도 log 기록기능을구현하고이를 HDFS 파일에결합 (merge) 하는방식으로 random read/write 기능을보완한다. HBase 에있는테이블은 Hadoop 의 MapReduce 작업에대한입력또는출력항목이될수있다. 또한 Java API 또는 REST, Avro 및 Thrift gateway API 를통해 HBase 를접속할수도있다. HBase 자체의장점을요약하면다음과같다. 데이터크기가 Giga바이트 ~Peta바이트급에이르는데이터저장 매우높은 write throughput cache 용량이대폭증가 노드추가시 cache 크기가커진다. Data layout - key lookup에서뛰어난효과, sparse columns에대해서도특별한성능손실이없다. 이해를명확히하기위해 HBase 와 RDBMS 를비교하면다음과같다.
RDBMS HBase 사용모드대화형및 Batch 방식 Batch 방식중심 데이터형태 (layout) Row 중심 Column 중심 거래처리 (transactions) 처리에적합 단일 row 중심거래처리 ( 현재 ) 질의언어 (Query) SQL get/put/scan 등의명령어 Index 임의의 column 에 적용 Row-key 에만적용 Schema 정적인 Schema 동적인 Schema Update 여부보안이용가능한최대데이터크기 Read/Write 의 throuput 한계치 Read & write many times Authentication/ Authorization Terabyte 급수천개 query 처리 / 초 Write once, read many times 별도의보안기능없음 Petabyte 급 ( 선형 (linear) 확장이가능하다 ) 수백만개 query 처리 / 초 HBase 는현재많은오픈소스프로젝트와 yahoo!, Adobe 를비롯한수많은기관에서사용하고있다. 21 SQL 에서의 Join SQL: INSERT INTO TABLE pv_users SELECT pv.pageid, u.age FROM page_view pv JOIN user u ON (pv.userid = u.userid) 21 마찬가지로 BigTable 도 2004 년 Google 에서개발된이래로 Google Maps, Gmail, YouTube 등에서활발하게사용되고 있다.
MapReduce 에서의 Join (4) ZooKeeper 개요 ZooKeeper 는 Google 의 Chubby 를참조하여개발한분산시스템자원조정 (coordination) 서비스로서설정정보, naming 등에대한분산동기화및그룹서비스를제공한다. 당초 Yahoo! 에서 ZooKeeper 를개발하게된동기는다음과같다. (1) 우선 multithreaded 프로그램에서수많은 thread 를관리하고조정하기위해 lock 이필요하다. (2) 또한시스템의규모가커지면 lock 을분산환경에서관리하도록할필요가있다. 특히 scale-out 환경에서다음기능이중요해진다. 분산 lock 분산 queue Group membership Master 의선출 분산 configuration 기타의기능 : 여기서기타의기능이란다음기능을포함한다. 작업을순서에따라진행하도록관리한다. 분산환경에서일부지체되는 task 가있을수있기때문이다.
Update 사항이 atomic 하도록할것. 즉, 성공또는실패중하나만있을뿐부분수정상태는있을수없다. ( either succeed completely or fail completely ) 또한일단진행된변경사항은지속될것. 즉, 기기장애등의경우에도기발생된변경사항은지속 (persist) 될것 ZooKeeper의데이터모델과 znode 계층적파일시스템구조를가지며계층구조 (tree) 내에서의 Node 를 znode 라고부른다. 이러한 znode 는데이터를가지기도하고다른 znode 를포함하기도한다. 또한모든 Znode 는일정한경로상에위치하고 UNIX 스타일의경로정보를가진다. ( 예 : /rock/punk) 한편 znode 의종류는다음과같으며이는 znode 생성시에지정된다. Persistent 형 - 별도로삭제하지않는한지속된다. Ephemeral 형 - 자신을생성한 client 의 session 에따라존속유무가결정된다. 순차적 Znode: Znode 는선택적으로 sequence 번호를가질수있다. ( 예 : foo- 0000000001) 이러한 sequence 번호를통해순서를매길수있게된다. ZooKeeper의주요 API Node 생성관련 node 존재여부의검사 / Access the node node 삭제관련 Get / set children 및 Get / set data 상태의동기화, 권한관리 (ACL) 등 ZooKeeper의 2가지모드 Standalone 모드 - 하나의 ZooKeeper 데몬이수행됨 Clustered 모드 여러서버가존재하며하나는 leader 가되고나머지서버가 follower 가된다. Follower 들은 read 요청만처리한다. 설치와운용 zookeeper.apache.org 에서설치파일을가져온후 tar 로설치한다. $ tar -zxvf zookeeper-3.4.3.tar.gz $ cd zookeeper-3.4.3 $ export PATH=$PATH:`pwd`/bin 또다른설치방법으로는미리패키지로만들어진것을이용하는방법이다. 즉, CDH (Cloudera s Distribution) 에도포함되어있으므로 yum 또는 apt-get 등을이용할수도있다.
설치후에는다음처럼환경을설정하거나설정사항을변경할수있다. # NOTE: we re in the zookeeper-3.4.3 directory $ cp conf/zoo_sample.cfg conf/zoo.cfg $ vi conf/zoo.cfg 여기서.cfg 파일의중요한설정파라미터는다음과같다. ticktime: ZooKeeper 에서의 time 단위 datadir: 데이터가저장된로컬파일시스템 clientport: 클라이언트가접속하는 TCP port 설치후 ZooKeeper 수행방법은 tarball 로설치한경우다음과같다. $ zkserver.sh start 이와함께프로그램내에서사용하기위해서는다른라이브러리와마찬가지로 ZooKeeper JAR 를 프로젝트에포함시킨후 API 를이용한다. (5) Sqoop Apache Sqoop 는데이터베이스와 Apache 사이에서 bulk data 를 ( 즉, 데이터를통쨰로 ) 전달하는 도구로서일종의교량역할을한다. Sqoop 는명령어방식으로사용하며일부데이터만적재할수도있고 ( 즉, incremental load) 또는 Hive 또는 HBase 에서 SQL 대신사용하여테이블을가져올수도있다. sqoop 의이용하는데에는다음방식중선택하여사용할수있다. sqoop 명령어를명령어줄에서직접실행 sqoop job 에등록하여실행 $ sqoop connect jdbc:mysql://database.example.com/users \ - uername openwith password net123 all-tables \ -warehouse-dir/common/warehouse 다음은 Oracle 에서데이터를가져오는또다른예이다. Sqoop import --connect jdbc:oracle:thin:@//dbserver:1521/masterdb
--username hr --table emp --where start_date > 01-01-2012 Sqoop import jdbc:oracle:thin:@//dbserver:1521/masterdb --username myuser --table shops --split-by shop_id --num-mappers 16 (6) Flume 서버에서서비스를실행할때많은로그파일이생성된다. 과거이러한로그데이터의크기로인해이를 무시하는일이많았으나이제는그렇지않다. Hadoop 과같은빅데이터기술을이용하면이를통해 시스템의건강을확인할수도있고동시에성능개선을위해유용하기때문이다. Flume 은대용량 log 데이터를수집, 운반, 통합하는분산형서비스 (distributed data collection service) 이다. Fault Tolerant 기능을갖추고있으며다양한 Fail-over 및 recovery 기능을제공한다. 원칙적으로중앙집중형운영을하면서도동적관리방식을이용할수있게해주며이와관련된 API 를제공해서손쉽게이를확장할수있게해준다. Flume 의모델은다음과같은요소로구성된다. Agent: 데이터를가져온다 Processor (optional): 필요한중간처리를한다. Collector: 데이터를저장장치에기록한다
Flume source 는웹서버와같은외부소스에서발생시킨이벤트에의한데이터파일 ( 이를 event 라 부른다 ) 을받아들이는역할을한다. sink 는 event 를 channel 에서제거하고이를 HDFS 같은외부 저장소에기록하거나 Flume 소스에전달한다. (7) 기능별주요 Hadoop 관련프로젝트 작업종류솔루션 ( 오픈소스 ) 비고 데이터의수집 Flume, Scribe, Chukwa 소스데이터를가져옴. 데이터의저장 NoSQL (HBase, Cassandra, MongoDB), Data Warehouse 데이터저장을통한 persistency 확보 실시간분석 Storm 일종의 real-time Hadoop 워크플로우 Oozie Job workflow engine 빅데이터분산처리 Hadoop (HDFS, MapReduce) 분산처리프레임워크 분석및통계 Mahout, R 분석모델링및엔진 관리및모니터링 ZooKeeper, Hue 클러스터관리 / 자원관리 Serialization Avro, Thrift 객체데이터및데이터전송처리 끝으로다음그림에 Hadoop 관련프로젝트가일반적작업순서에의거하여나열되어있다. 22 22 원래의그림은 Prashanth Babu 이작성한것으로알려짐 (http://nosql.mypopescu.com/post/17715359030/hadoopecosystem-map 에서재인용 )