Big Data Technology Overview 2016/8/20 ~ 9/3 윤형기 (hky@openwith.net)
D2 http://www.openwith.net 2
Hadoop MR v1 and v2
Hadoop 1 MR Daemons
Why YARN?
- Multi-tenancy: YARN allows multiple access engines to use Hadoop as the common standard for batch, interactive, and real-time engines that can simultaneously access the same data set. Multi-tenant data processing improves an enterprise's return on its Hadoop investment.
- Cluster utilization: dynamic allocation of cluster resources improves MR job throughput.
- Scalability: improved scheduling scales to thousands of nodes managing petabytes of data.
Hadoop 1 Limitations
- Scalability: max cluster size ~4,000 nodes, max ~40,000 concurrent tasks; the NameNode is a weak point; coarse-grained synchronization in the JobTracker.
- Re-startability: a failure kills all queued and running jobs; restart is very tricky due to complex state.
- Low resource utilization: hard partition of resources into map and reduce slots.
- Limited to MR: doesn't support other programming models; iterative applications implemented on MR are ~10x slower.
- Lack of wire-compatible protocols: client and cluster must be of the same version; applications and workflows cannot migrate to different clusters.
Hadoop 2 Design Concept
- Split the JobTracker's responsibilities into two functions: cluster resource management and application life-cycle management.
- MR becomes a user library, i.e., one of the applications residing in Hadoop.
MRv2 Timeline
MRv1 vs. MRv2
How It Works
Overview: the JobTracker/TaskTracker roles are split into finer-grained components:
- a global ResourceManager
- a per-application ApplicationMaster
- a per-node slave, the NodeManager
- per-application Containers running on the NodeManagers
The ResourceManager and NodeManager are newly introduced.
- ResourceManager: arbitrates resource requests among applications; resource allocation to applications goes through the ResourceManager's scheduler.
- ApplicationMaster: a framework-specific entity that obtains the resource containers it needs from the scheduler; after negotiating with the ResourceManager, it runs its component tasks through the NodeManager(s). It also tracks status and monitors progress.
- NodeManager: the per-machine slave, responsible for launching the applications' containers, monitoring their resource usage (CPU, memory, disk, network), and reporting the same to the ResourceManager.
Hadoop Programming
Hadoop 1.0 → 2.0
Hadoop 1.0: HDFS + MR
Hadoop 2.0 = Hadoop 1.0 +
- HDFS HA: high availability for the HDFS NameNode, using ZooKeeper for failure detection and active-NameNode election
- HDFS Federation
- HDFS snapshots
- Heterogeneous storage hierarchy support
- In-memory data caching
- YARN
Hadoop 2.0 YARN = central resource scheduler = ResourceManager + NodeManager + Container (a unit of resource allocation)
Split out of the JobTracker:
» Cluster management & job scheduling → ResourceManager
» Job coordination → ApplicationMaster (this shifting of allocation-coordination responsibilities reduces the burden on the RM)
» plus a new JobHistoryServer
Hadoop 1.0
Hadoop 2.0 and YARN — Source: http://www.edureka.co/blog/introduction-tohadoop-2-0-and-advantages-of-hadoop-2-0/
YARN
Source: http://www.edureka.co/blog/introduction-tohadoop-2-0-and-advantages-of-hadoop-2-0/
YARN Features
(1) The JobTracker is split into the RM and the ApplicationMaster: in a YARN cluster an AM exists per application, and an NM runs on every server in the cluster.
(2) Efficient resource management: the NMs on each server run tasks and manage the resources they need, so the Hadoop 1.0 notion of fixed Mapper and Reducer slot counts disappears entirely. In Hadoop 2.0, Mappers and Reducers both run inside containers, and container sizing is determined by overall cluster resource availability and each submitted job's resource requests.
(3) Greater scalability: the old limits of ~4,000 nodes and ~40,000 tasks are overcome.
(4) Support for diverse distributed processing frameworks: Spark, Hama, Giraph, etc.; SAP, IBM, EMC and others are also integrating their own solutions.
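As a sketch of how container sizing replaces fixed slots in (2): cluster-level limits come from yarn-site.xml. The property names below are standard YARN settings; the values are arbitrary examples, not recommendations:

```xml
<configuration>
  <!-- total memory on each NodeManager that containers may use -->
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>8192</value>
  </property>
  <!-- smallest and largest single container the scheduler will grant -->
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1024</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>4096</value>
  </property>
</configuration>
```

A job's per-task memory request is then rounded up to a multiple of the minimum allocation, so any mix of map and reduce containers can share the same pool.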
YARN Components
(1) ResourceManager: one per cluster; handles cluster-wide resource management and task scheduling.
  a. Scheduler
  b. Application Manager
  c. Resource Tracker
(2) NodeManager: monitors the resource usage of its containers and reports it to the ResourceManager.
  a. ApplicationMaster: acts as the master for a single application.
  b. Container: every job is subdivided into multiple tasks, and each task runs inside a container.
YARN in Practice:
MR Programming
Data Types
[Lab] Streaming pipes
Download The Hound of the Baskervilles from www.gutenberg.org and save it as input.txt, then build the pipeline up one stage at a time:
$ ./mapper1.py < input.txt
$ ./mapper2.py < input.txt
$ ./mapper2.py < input.txt | sort
$ ./mapper2.py < input.txt | sort | ./reducer2.py
$ ./mapper3.py < input.txt | sort | ./reducer2.py
$ ./mapper3.py < input.txt | sort | ./reducer3.py | sort -r
$ ./mapper3.py < input.txt | sort | ./reducer3.py | sort -r | head -n 3
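The mapper/reducer scripts themselves are not reproduced in the slides. A minimal word-count pair in the Hadoop Streaming style might look like the sketch below (the function names and the in-process chaining are illustrative; in a real streaming job each half is a separate script reading stdin and writing stdout):

```python
# Hadoop Streaming sketch: mapper emits "word<TAB>1" lines,
# reducer sums counts over *sorted* input, as the shell pipeline does with sort.

def mapper(lines):
    """Emit one 'word\t1' record per token."""
    for line in lines:
        for word in line.strip().split():
            yield "%s\t1" % word.lower()

def reducer(lines):
    """Sum counts for runs of identical keys (input must be sorted by key)."""
    current, count = None, 0
    for line in lines:
        word, n = line.rsplit("\t", 1)
        if word == current:
            count += int(n)
        else:
            if current is not None:
                yield "%s\t%d" % (current, count)
            current, count = word, int(n)
    if current is not None:
        yield "%s\t%d" % (current, count)

if __name__ == "__main__":
    sample = ["the quick brown fox", "the lazy dog"]
    for record in reducer(sorted(mapper(sample))):
        print(record)
```

The `sort` between the two stages plays the role of Hadoop's shuffle phase: it guarantees that all records for a given key are adjacent before the reducer sees them.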
[Lab] MapReduce Basics: MR and computational flows
[Lab] MR for WordCount
[Lab] MR for WordCount + adding a Combiner
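To see why a combiner helps, here is a small in-process simulation (pure illustration in Python, not the Hadoop API): it counts how many key-value pairs would cross the network with and without map-side combining. Word count can reuse its reducer as the combiner because summation is commutative and associative.

```python
from collections import Counter

def map_wordcount(line):
    # one (word, 1) pair per token, as TokenizerMapper does
    return [(w, 1) for w in line.split()]

def combine(pairs):
    # map-side partial aggregation: the reducer's logic applied to
    # a single mapper's output before the shuffle
    c = Counter()
    for k, v in pairs:
        c[k] += v
    return list(c.items())

lines = ["to be or not to be", "to see or not to see"]
without = [p for line in lines for p in map_wordcount(line)]
with_combiner = [p for line in lines for p in combine(map_wordcount(line))]
print(len(without), len(with_combiner))  # shuffle volume shrinks
```

The final counts are identical either way; only the number of intermediate records shipped to the reducer changes.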
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    /**** To enable the Combiner, uncomment! ****/
    // job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
[Lab] MR for Analytics
A Hadoop MR program that computes the mean, maximum, and minimum file size served, from a web log.
Data: weblog dataset from ftp://ita.ee.lbl.gov/traces/NASA_access_log_Jul95.gz
import java.io.IOException;
import java.util.Iterator;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* @author Srinath Perera (hemapani@apache.org)
 * @author Thilina Gunarathne (thilina@apache.org) */
public class MsgSizeAggregateMapReduce extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new MsgSizeAggregateMapReduce(), args);
    System.exit(res);
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: <input_path> <output_path>");
      System.exit(-1);
    }
    /* input parameters */
    String inputPath = args[0];
    String outputPath = args[1];

    Job job = Job.getInstance(getConf(), "WebLogMessageSizeAggregator");
    job.setJarByClass(MsgSizeAggregateMapReduce.class);
    job.setMapperClass(AMapper.class);
    job.setReducerClass(AReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, new Path(outputPath));
    return job.waitForCompletion(true) ? 0 : 1;
  }
  public static class AMapper extends Mapper<Object, Text, Text, IntWritable> {
    public static final Pattern httplogPattern = Pattern.compile(
        "([^\\s]+) - - \\[(.+)\\] \"([^\\s]+) (/[^\\s]*) HTTP/[^\\s]+\" [^\\s]+ ([0-9]+)");

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      Matcher matcher = httplogPattern.matcher(value.toString());
      if (matcher.matches()) {
        int size = Integer.parseInt(matcher.group(5));
        context.write(new Text("msgSize"), new IntWritable(size));
      }
    }
  }
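The mapper's regular expression can be checked outside Hadoop. This Python sketch applies the same pattern to a line in the Common Log Format used by the NASA dataset (the sample line and the helper name `parse_size` are illustrative):

```python
import re

# Same groups as the Java mapper: host, timestamp, method, path, response size
HTTP_LOG_PATTERN = re.compile(
    r'([^\s]+) - - \[(.+)\] "([^\s]+) (/[^\s]*) HTTP/[^\s]+" [^\s]+ ([0-9]+)')

def parse_size(line):
    """Return the response size in bytes, or None if the line doesn't match."""
    m = HTTP_LOG_PATTERN.match(line)
    return int(m.group(5)) if m else None

sample = ('199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] '
          '"GET /history/apollo/ HTTP/1.0" 200 6245')
print(parse_size(sample))  # → 6245
```

Group 5 is the byte count at the end of the line, which is exactly what the mapper emits as `msgSize`.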
  public static class AReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      double tot = 0;
      int count = 0;
      int min = Integer.MAX_VALUE;
      int max = 0;
      Iterator<IntWritable> iterator = values.iterator();
      while (iterator.hasNext()) {
        int value = iterator.next().get();
        tot = tot + value;
        count++;
        if (value < min) {
          min = value;
        }
        if (value > max) {
          max = value;
        }
      }
      context.write(new Text("Mean"), new IntWritable((int) tot / count));
      context.write(new Text("Max"), new IntWritable(max));
      context.write(new Text("Min"), new IntWritable(min));
    }
  }
}
YARN Issues
- Complexity: the protocols are very low-level and very verbose
- Not well suited to long-running jobs
- An application doesn't survive a crash of its master
- No built-in communication between containers and their master
- Hard to debug
Hadoop: Strengths, Weaknesses, and Responses
Hadoop's strengths: commodity h/w, scale-out, fault tolerance, flexibility through MR.
Hadoop's weaknesses: MR itself! Missing: schema, optimizer, indexes, views, ...; poor compatibility with existing tools.
Remedy: Hive = SQL-to-MR compiler + execution engine; pluggable storage layer (SerDes).
Unsolved problems: Hive vs. ANSI SQL, UDFs, ...; MR latency overhead. Work continues...!