맵리듀스 하둡실행
HDFS 맵리듀스
HDFS 작동방식 FileInputFormat subclass 를이용 Hadoop 은자동으로 HDFS 내의파일경로로부터데이터를입력 블록지역성을최대한활용하는방식 작업을클러스터에배분한다.
JAVA 기반 HDFS1 hello.txt 라는이름의파일을생성 메시지를기록한 기록된파일읽어 화면에출력 해당파일이이미존재하는경우삭제한후작업 1: import java.io.file; 2: import java.io.ioexception; 3: 4: import org.apache.hadoop.conf.configuration; 5: import org.apache.hadoop.fs.filesystem; 6: import org.apache.hadoop.fs.fsdatainputstream; 7: import org.apache.hadoop.fs.fsdataoutputstream; 8: import org.apache.hadoop.fs.path; 9: 10: public class HDFSHelloWorld { 11: 12: public static final String thefilename = "hello.txt"; 13: public static final String message = "Hello, world!\n"; 14: 15: public static void main (String [] args) throws IOException { 16: //abstract FileSystem object 에대한 handle 을획득 17: Configuration conf = new Configuration(); 18: FileSystem fs = FileSystem.get(conf); 19://Configuration object 는 default 의파라미터를이용 20: Path filenamepath = new Path(theFilename);
JAVA 기반 HDFS2 fs.create() 호출을통해 FSDataOutputStream 를반환받고이용 이후정보를일반 stream write 함수를이용해서기록 파일을다시열어서읽고이를 UTF-8 encoded 문자열로변환한후화면에출력 21: 22: try { 23: if (fs.exists(filenamepath)) { 24: // remove the file first 25: fs.delete(filenamepath); 26: } 27:// 파일이있는지여부확인 28: FSDataOutputStream out = fs.create(filenamepath); 29: out.writeutf(message); 30: out.close(); 31:// 32: FSDataInputStream in = fs.open(filenamepath); 33: String messagein = in.readutf(); 34: System.out.print(messageIn); 35: in.close(); 46: } catch (IOException ioe) { 47: System.err.println("IOException during operation: " + ioe.tostring()); 48: System.exit(1); 49: } 40: } 41: }
맵리듀스 맵리듀스
개념 대량배치병렬처리 모든데이터항목 (data elements) 은변경불가능 (immutable) 리스트처리 데이터항목의리스트 (list) 를출력데이터항목의리스트 (list) 로변형 (transform) 시키는일
단계 매핑 (List 를상호연결 Mapping Lists) 원시데이터로부터필요한데이터만적용및가공 데이터만필요하기때문에대소문자를구별하지않기등 리듀싱 ( 리스트축약 Reducing Lists) 각값들을총괄 Reducer 함수는키값을중심으로출력값을반환 종합하기 ( 키값중심종합 ) 수많은값의 list 를하나의값으로변환 변환된키를중심으로종합
구조 맵리듀스
프로그램구조 구조 Mapper Reducer Main 처리 주요컴포넌트 InputFormat: 입력포맷정의 Maper: Map 처리정의 Reducer:Reduce 처리정의 OutputFormat: 출력포맷정의 RecordReadear: 레코드 ( 줄 ) 단위로읽어서 Map 으로전달 RecordWriter: 잡의출력을받아출력 OutCommitter:MapReduce 잡의완료처리
InputFormat 특징 입력파일이분할 (split) 되는방식이나읽어들이는방식을정의하는 class 입력으로사용될파일또는기타의 object 를선택라인별레코드화 파일을 task 로분해할 InputSplits 을정의 파일을읽어들이는 RecordReader 를생성하는 factory 를제공 사용 입력파일에어떤 InputFormat 을적용할지에대해서는 job 을정의하는 JobConf object 의 setinputformat() method 를호출하는방식으로정의 InputFormat: 설명 Key: Value: TextInputFormat Default 포맷 ; 텍스트파일의각 line 을읽어들인다. 각 line 의 byte offse t 각 line 의내용 KeyValueInputFor mat 각 line 을 key, val p air 로 parse 한다. 첫째 tab 문자까지의모든내용 line 의나머지내용 SequenceFileInput Format Hadoop 고유의고성능바이너리포맷 사용자정의 사용자정의
InputSplit 특징 Job 을부분으로쪼개는역할 Map task 는한파일의전체를읽거나일부분만을읽을수있음 디폴트상태에서 FileInputFormat 과하위 class 는파일을 64 MB 단위의 chunk (HDFS 에서의블록크기와동일 ) 로분할 이값은 hadoop-site.xml 에서 mapred.min.split.size 파라미터를이용 일을 chunk 단위로처리하면하나의파일에대해여러개의 map task 를병렬적으로수행가능 파일이매우큰경우이러한 parallelism 을통해성능을획기적으로높일수도있음 더욱중요한것은파일을구성하는다양한블록을클러스터내의여러 node 에분배할수있다는것
RecordReader 특징 InputSplit 을통해일의단위가지정이를액세스하는방법 source 에서실제로적재한후이를 Mapper 가읽기에수월한 (key, value) pair 로변환하는일 RecordReader class 가담당 RecordReader instance 는 InputFormat 에의해정의 디폴트의 InputFormat 인 TextInputFormat 은 LineRecordReader 를제공 입력파일의각각의 line 을새로운값 (value) 로취급 line 별 key 는파일에서의 byte offset InputSplit 전체가완료될때까지 RecordReader 는계속호출 (invoke) RecordReader 가호출되면 Mapper 의 map() method 역시호출된다.
Mapper 특징 MapReduce 프로그램의맨처음사용자가정의한작업을수행 Key 와 value 가주어지면 map() method 는 (key, value) pair(s) 를 Reducer 에게전달 새로운 Mapper instance 는각각의 map task (InputSplit) 에생성 Map task 는전체적으로 job input 을구성 각각의 mapper 간은독립적으로작동서로영향을주지않아 map task 의신뢰성이로컬기기의신뢰성에전적으로좌우 map() method 는 key 와 value 이외에 2 개의파라미터를전달받음 구성 OutputCollector object Reporter object
Partition & Shuffle 특징 첫번째 map task 가종료한후에도각 node 들은여러개의다른 map task 들을수행가능 map task 로부터의중간산출물을이를필요로하는 reducer 에게로전달하기시작 map 의산출물을 reducer 에게로옮기는것을 shuffling 이라고함 중간단계 key space 의일부가각각의 reduce node 에할당 이들 subset (partition) 들은 reduce task 에게입력 map task 는그어떤 partition 에도 (key, value) pair 를전달 하나의 key 에대한모든값은항상그 origin 이어떤 mapper 였든상관없이병합됨 중간산출데이터의각항목을어디로보낼지에대해 map node 는의견일치를보아야 Partitioner class 는주어진 (key, value) pair 가어떤 partition 으로갈지를결정 디폴트의 partitioner 는 key 에대한 hash 값을계산한후그결과에따라 partition 을할당
Reducer Reducer instance 는사용자가어플의 instance 로서 job 의 2 번째단계 Reducer 에게할당된 partition 에서각각의 key 에대해 Reducer 의 reduce() method 는단한번호출 이를통해 key 에연결된모든 value 에대한 iterator 와 key 를회수 iterator 에의해하나의 key 와관련된 value 들이반환될때그순서는무작위 Reducer 는또한 OutputCollector 와 Reporter object 를파라미터의형식으로받게되는데이들은 map() method 에서와같은방식으로이용
OutputFormat OutputCollector 에게제공되는 (key, value) pair 는출력파일에기록실제기록되는방식은 OutputFormat 에의해결정 OutputFormat 은앞서의 InputFormat class 와같은방식으로동작 Hadoop 이제공하는 OutputFormat 의 instance 는로컬디스크또는 HDFS 상의파일에기록기록물모두일반적인 FileOutputFormat 에서상속각각의 Reducer 는각각의파일을일반적인출력디렉토리에기록 파일명은통상 part-nnnnn 라는이름을가진다. (nnnnn 는 reduce task 와관련된 partition id 이다.) 출력디렉토리는 FileOutputFormat.setOutputPath() method 에의해결정 OutputFormat: TextOutputFormat SequenceFileOutputFormat NullOutputFormat 설명 Default; line 을 key \t value 형태로기록한다 뒤에오는 MapReduce job 으로읽어들이기에적당한형태의바이너리파일로기록한다. 입력을무시한다
RecordWriter InputFormat 이실제로개별레코드를 RecordReader 실행을통해읽는것과마찬가지로 OutputFormat class 도 RecordWriter object 에대한 factory 역할 OutputFormat 에지정된대로. 개별레코드를파일에기록하는데이용 Reducer 에의해작성된출력파일은 HDFS 에남아있으므로다른 MapReduce job 또는별도의프로그램또는사용자의직접개입을통해이용가능
프로그래밍 맵리듀스
Mapper 알고리즘 워드카운트 매퍼 \bin\yarn jar share/hadoop/mapreduce/hadoop-mapreduceexamples-2.6.0.jar wordcount indata output 파일을돌면서 단어는 1 이라는가중치를준다 리듀서 모아진단어를돌면서 단어별숫자를더한다 mapper (filename, file-contents): for each word in file-contents: emit (word, 1) reducer (word, values): sum = 0 for each value in values: sum = sum + value emit (word, sum)
맵프로그래밍 JAVA /* Mapper 를다중상속받고 MapReduceBase 의상속을받아 MapClass 생성데이터형 LongWritable=long,Text=string,IntWritable=int, Mapper <Keyin valuein keyout valueout> */ Public static class MapClass extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(longwritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { // 문자화 String line = value.tostring(); StringTokenizer itr = new StringTokenizer(line); // 문자를돌면서단어당한개의키로매핑 while (itr.hasmoretokens()) { word.set(itr.nexttoken()); output.collect(word, one); }
리듀스 JAVA //Reducer 다중상속받아서 Reduce 클래스생성 public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { int sum = 0; // 돌면서키값당숫자증가시키기 while (values.hasnext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }
구동 JAVA Driver public void run(string inputpath, String outputpath) throws Exception { //Job 수행설정정보 JobConf conf = new JobConf(WordCount.class); conf.setjobname("wordcount"); // the keys are words (strings) conf.setoutputkeyclass(text.class); // the values are counts (ints) conf.setoutputvalueclass(intwritable.class); // 맵지정 conf.setmapperclass(mapclass.class); // 리듀스지정 conf.setreducerclass(reduce.class); FileInputFormat.addInputPath(conf, new Path(inputPath)); FileOutputFormat.setOutputPath(conf, new Path(outputPath)); JobClient.runJob(conf); }