4. Analytics 도구 (1) 상용분석도구 빅데이터기술이전부터다양한분석도구가상용으로제시되어왔다. 주요한것은다음과같다. MS Excel: SAS: SPSS Modeler: 원래는 SPSS Clementine이었으나 IBM 인수. Statistica: StatSoft사의개발제품. Mathematica 그밖에도 Salford Systems, KXEN, Angoss 등과수학분석에많이사용되는 MATLAB, Strata 등도많이사용된다. (2) 오픈소스 가. R 1 R 은원래 1970 년대의 S 라는통계프로그램에기반하여개발되었다. S 는 AT&T 에서개발되어 Insightful Corporation 라는회사에라이선스제공되었는데이를참조하여뉴질랜드의 Ihaka 등이 1990 년대에 R 을개발하였다. GPL 라이선스조건을가지는공개소프트웨어로서다음과같은영역에서장점을가진다. 데이터처리, 문자및수치계산등이효율적이다. 행렬식등처리와 hash 테이블, 정규식표현등기능이풍부하다. 데이터분석, 통계함수등이풍부하다. 개발초기부터그래픽기능이뛰어나다. 프로그래밍언어로동작이가능하다. 단, 올바른사용을위해 R 에대해다음을유의할필요가있다. R은데이터베이스는아니다. 단, DBMS에연결은가능하다. 원래명령어방식이다. 최근일부 GUI 가제시되고있다. (R Studio와 Java, Tcl/Tk 등 ) 1 R 의기본사용법에대해서는본서의부록을참조할것. 한편 R 의공식사이트는 : http://cran.r-project.org/
모든계산이메인메모리에서진행되므로메모리관리에유의할것. 스프레드쉬트로사용할수는없으나 Excel 등연계는가능하다. 무엇보다도 CRAN (Comprehensive R Archive Network) 에서패키지를고를수있고이를통해기능의확장이가능하다. 절차적프로그래밍언어로서일반적으로명령어방식을이용하지만 R Studio 가이용가능하고 Eclipse 에 plugin 방식으로 2 이용할수도있다. 나. WEKA WEKA 는마이닝알고리즘을모아놓은인기있는프로그램이다. 오픈소스로서 java 기반이며다음의곳에서다운로드받을수도있고많은기술문서도제공된다. http://www.cs.waikato.ac.nz/ml/weka/ 다. 기타의프로젝트 Storm & Kafka Storm 과 Kafka 는스트림처리기술로서많은기업에서활용하고있다. 이들은메모리기반의 (inmemory) 실시간의사결정지원시스템으로서 Hadoop 이가지는배치처리의한계를극복하게해준다. Storm 은 Twitter 에서개발한 분산형실시간연산시스템 으로서 Hadoop 에서 batch 처리를했던데반해이를실시간처리하게해준다. Kafka 는 LinkedIn 에서개발한메시징시스템으로서활동스트림 (activity stream) 과그배후의데이터처리파이프라인 (data processing pipeline) 의기반기술로작용한다. Drill & Dremel Drill 과 Dremel 은대규모, ad-hoc query 를하면서도시간지체가거의없어서특히데이터의탐색에유용하다. 초당 petabyte 규모의데이터를스캔할수도있어서 ad hoc query 는물론시각화작업에도유용하고현업분석자도많이사용하고있다. Drill 은 Google 의 Dremel 에해당하는오픈소스솔루션이다. (Google 은이미 BigQuery 라는이름으로 Dremel-as-a-Service 를제공하고있다 ). Drill 과 Dremel 모두 Hadoop 에친화적이어서 MapReduce 와의연계가쉬우며 Sawzall, Pig 그리고 Hive 에이르기까지 Hadoop 위에서동작할수있도록많은인터페이스가개발되어있다. 라. R 과 MapReduce 의통합 R 과 Hadoop 의통합은여러가지형태로시도되고이용되고있는데여기서는특히가장많이이용되는 3 가지를선정하여소개한다. 2 http://www.walware.de/goto/statet 2
R+Streaming Rhipe RHadoop 라이선스 GPL-2 와 GPL-3 의혼합이며이것이 Apache 2.0 라이선스조건인 Hadoop 에이통합됨 Apache.20 Apache 2.0 설치복잡성 용이함. 다소복잡. 비교적용이. 정도 R 패키지를각각의 Rhipe 를 R 과 R 을각각의 DataNode 에 함께 DataNode 에설치. 설치하지만 DataNode 에 단, RHadoop 은 패키지는 Yum 으로 설치. 단, 이를 R 이외일부 설치가능 위해서 Protocol 패키지를먼저 Buffer 를 설치해야하지만 build 해야함. ( 즉, dependency 있음 ) CRAN 에서 이용가능. 클라이언트 클라이언트통합 높은통합. 높은통합. 측에서의 R 과의통합 없음. Hadoop 명령어를 Rhipe 는 R 의 library 로서해당 RHadoop 그자체가 R 의 library 로서 통해 streaming 함수호출시 사용자는자신의 job 을수행하면서 MapReduce job map 과 reduce map 측과 수행을담당함. 함수를 R 로작성 reduce 측의 사용자로서는 script 를지정함. native 형태의 R map 과 reduce 함수를 R 로 작성하기만하면 Rhipe 가이들을 transport 하고 invoke 시켜줌. 사용된기술 Streaming (Streaming 이아니라 ) Java 로구현된자신의 map, reduce 를수행함. 이들 Java 함수는 RHadoop 이 Hadoop 과 streaming 위에서단순하면서도경량의 wrapper 로서동작. 3
R+Streaming Rhipe RHadoop map, reduce 따라서독자적인 입력을 Protocol MapReduce 코드를 Buffers encoded 가지지않으며 형태로 Rhipe C 간단한 wrapper R 실행파일에 스크립트를가짐. streaming 입력하며 Rhipe C 실행파일은 Streaming 에서 wrapper R 스크립트를 자체내장된 R 을 호출하며이것이 통해사용자의 다시사용자의 map, map, reduce 함수를수행 reduce 함수를호출 주로사용되는 MapReduce R 을떠나지 R 을떠나지않고도 경우 함수에대해정교한 않고도 R 과 R-MapReduce 를 조절작업이필요한 MapReduce 를 이용하고자할경우 경우 ( 예 : 이용하고자할 또한기존의 partitioning, 정렬 ) 경우 MapReduce 의 입출력관련 Format class 를이용하고자 하는경우 주의할점 기존 Protocol Buffers Unique key 에대한 R 스크립트에서 encoded reducer 값을모두 직접 invoke 데이터에 간직하기위해서 시키기가어려움 대해서는 많은메모리가필요. 독자적인형식의 값은 입출력포맷이 reducer 함수에 필요함. 스트림전달되지 않음 4
IV. 빅데이터분석의실제예 1. Mahout 3 를이용한군집분석 (1) 개요 가. Mahout 프로젝트의배경 Mahout 는 Apache 프로젝트의한분과로진행되고있는기계학습용 Java 라이브러리이다. 기계학습이란 4 요컨대 ' 대상데이터에대해컴퓨터가알아서분석할수있도록로직을구성하는것 ' 을말하는데그간이런기능이데이터마이닝솔루션들로구현되어이미활발히사용되고있었다. 그러다가최근 Hadoop 의 MapReduce 프레임워크활용을전제로한기계학습프로그램으로 Mahout 가개발되어인기를끌고있다. Mahout 는 Lucene 프로젝트가모태가되어시작되었다. 텍스트검색및텍스트처리프로젝트인 Lucene 에서텍스트마이닝관련된프로그램이꾸준히축적되었고 2008 년이를 Mahout 프로젝트로독립시키게된것이다. 그리고별도프로젝트로독립한초기에 Mahout 는 Taste 라는별도의기계학습용프로그램도흡수하여짧은시간내에도약의계기를맞이했다. 아래그림에서 Mahout 와관련된각종프로젝트가그림으로표현되어있다. 무엇보다중요한것은 Mahout 가데이터마이닝알고리즘몇가지를구현했다는것외에도이들을구현함에있어 Hadoop 의 MapReduce 프레임워크와결합을이루었다는것이다. 수학계산이많은마이닝은고성능컴퓨터의큰메모리와연산기능을요구하지만 Mahout 는최대한 MapReduce 기능을활용하여빅데이터분석이보다용이해졌다. 3 Mahout 는흔히 ' 머하우트 ' 라고발음한다. 4 앞서기계학습을 인공지능의한영역으로서사람이아닌컴퓨터가학습할수있도록알고리즘을연구및분석, 개발하는 기술 로정의한바있다. 5
나. Mahout 의설치 Mahout 도다른 Java 응용프로그램설치와마찬가지로 JVM 위에서동작하므로이에대한환경구축이우선이루어져야한다. 그리고이위에 Maven 의추가설치가필요하다. 이는 Mahout 가현재활발히개발진행중이어서 update 와 upgrade 가빈번하므로이들관리를자동화하기위한것이다. 즉, 라이브러리의존관계및컴파일관리를 Maven 라는 build 및 release 관리프로그램을통해실시한다. 또다른요건으로는 Mahout 는특히 Hadoop 의 MapReduce 분산처리기능의활용을전제로하므로 Mahout 의운영을위해서는 Hadoop 을설치하여야한다. 아울러프로그램의개발및수정을위한개발환경 5 을선택한다. 즉, 선택한 IDE 내에서 Mahout 프로젝트를생성하는방식으로 Mahout 를설치한다. 다. Mahout 가제공하는기능 Mahout 는그어떤프로젝트못지않게활발히개발진행중이며속속새로운기능들이추가되고있으므로해당프로젝트현황을홈페이지를통해확인할필요가있다. 2013 년 7 월현재 6 Mahout 는다음과같은기능을제공한다. Collaborative Filtering 추천서비스 예 : Amazon의구매이력분석에따른책추천. 군집화 (clustering) K-Means, Fuzzy K-Means Mean Shift clustering Dirichlet process clustering LDA (Latent Dirichlet Allocation) 빈발패턴마이닝 분류 (Classification) Naive Bayes 분류기 Random forest 의사결정트리분류기여기서는 Mahout 의군집화에대해서만살펴본다. 이외의기능에대해서는 Mahout 프로젝트사이트 (mahout.apache.org) 를참조. 5 Eclipse, NetBeans 등을개인의취향에따라선택. 6 version 0.7 6
(2) Mahout 에서의군집화기능 7 가. 첫시도 : 단순좌표데이터의군집화사례 군집화구현에서중요한것은무엇보다각항목간의유사성과이질성을어떻게판단하고이를어떻게프로그램에표현할것인가의문제라하겠다. 또한편, 앞서우리는여러가지군집화관련접근법을살펴본바있는데 Mahout 에서이용하는구체적알고리즘중여기서는특히대표적인알고리즘의하나인 k-means 를이용하기로한다. k-means fuzzy k-means canopy 기타단순한예를통해 Mahout 이용방법을살펴본다. 다음과같이 x-y 좌표계상의점으로표현되는 9 개의점 (point) 데이타가주어졌다고하자. 다음페이지의도표에해당 9 개점데이터가제시되었고오른편에서이를 x-y 좌표면에표시하였다. 이제이들 9 개점을 Mahout 를이용하여 2 개그룹으로군집화하려는것이여기서의작업내용이다. 우선이들각점데이터를프로그램에입력하기위해서는이진의벡터포맷을 SequenceFile 로표현한다. (x 좌표, y 좌표 ) (1,1) (2,1) (1,2) (2,2) (3,3) (8,8) (8,9) (9,8) (9,9) 7 Mahout 를이용한군집화사례에대한설명은 Sean Owen ( 외 ), Mahout In Action, 2012 의해당부분을인용및수정하였다. 7
흔히벡터 (vector) 란 ' 크기와방향을동시에나타내는물리량 ' 을뜻하지만 Mahout 에서는각데이터의수치정보를 ordered list 의형태로표현한것을의미한다. 특히단순한예제에서는이를 2 차원좌표상의점에대한데이터를 ordered list 로표현한것으로볼수있겠다. 먼저 k-means 알고리즘에필요한함수에다음파라미터를지정한다. SequenceFile: 입력할벡터정보와, 군집별중심에대한정보를초기화한다. 유사도평가에대한척도. 유사도평가로서의다양한거리개념에대해앞서살펴본바있다. EuclideanDistanceMeasure의경우 Euclide 거리를적용할때사용된다. convergencethreshold : 초기화된정보를출발점으로군집화작업이반복적으로진행되는데이러한반복작업을계속할지여부에대한판단기준을나타낸다. 입력데이터에대한 Vector 정보이들정보를가지고다음의순서로군집화작업을수행한다. Vectorizatoin 입력데이터를 vector 형태로변환 Vector 를해당위치에기록 입력데이터 군집화 (clustering) 작업을반복수행 출력디렉토리서작업결과를획득 중심부초기화 목표군집 (grouping) 의중심위치를지정 이제이작업을수행하는 Java 프로그램이다음과같이제시되어있다. public class MahoutClusterFirst { public static final double[][] points ={{1,1,{2,1,{1,2,{2,2,{3,3,{8,8,{9,8,{8,9,{9,9; public static void writepointstofile(list<vector> points, String filename, FileSystem fs, Configuration conf) throws IOException { Path path = new Path(fileName); SequenceFile.Writer writer = new SequenceFile.Writer(fs, conf, path, LongWritable.class, VectorWritable.class); long recnum = 0; VectorWritable vec = new VectorWritable(); for (Vector point : points) { vec.set(point); writer.append(new LongWritable(recNum++), vec); writer.close(); public static List<Vector> getpoints(double[][] raw) { List<Vector> points = new ArrayList<Vector>(); for (int i = 0; i < raw.length; i++) { double[] fr = raw[i]; 8
Vector vec = new RandomAccessSparseVector(fr.length); vec.assign(fr); points.add(vec); return points; public static void main(string args[]) throws Exception { int k = 2; List<Vector> vectors = getpoints(points); File testdata = new File("testdata"); if (!testdata.exists()) { testdata.mkdir(); testdata = new File("testdata/points"); if (!testdata.exists()) { testdata.mkdir(); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); writepointstofile(vectors,"testdata/points/file1",fs,conf); Path path = new Path("testdata/clusters/part-00000"); SequenceFile.Writer writer = new SequenceFile.Writer( fs, conf, path, Text.class, Cluster.class); for (int i = 0; i < k; i++) { Vector vec = vectors.get(i); Cluster cluster = new Cluster( vec, i, new EuclideanDistanceMeasure()); writer.append(new Text(cluster.getIdentifier()), cluster); writer.close(); KMeansDriver.run(conf, new Path("testdata/points"), new Path("testdata/clusters"), new Path("output"), new EuclideanDistanceMeasure(), 0.001, 10, true, false); SequenceFile.Reader reader = new SequenceFile.Reader(fs, new Path("output/" + Cluster.CLUSTERED_POINTS_DIR + "/part-m-00000"), conf); IntWritable key = new IntWritable(); WeightedVectorWritable value = new WeightedVectorWritable(); while (reader.next(key, value)) { System.out.println( value.tostring() + " belongs to cluster " + key.tostring()); reader.close(); 위프로그램에서는주어진데이터를 2 개의그룹으로나누기위한군집화작업의출발점으로 (1,1) 과 (2,1) 의 2 개점을지정하였는데그수행에따른산출물이다음에표시되어있다. 9
1.0: [1.000, 1.000] belongs to cluster 0 1.0: [2.000, 1.000] belongs to cluster 0 1.0: [1.000, 2.000] belongs to cluster 0 1.0: [2.000, 2.000] belongs to cluster 0 1.0: [3.000, 3.000] belongs to cluster 0 1.0: [8.000, 8.000] belongs to cluster 1 1.0: [9.000, 8.000] belongs to cluster 1 1.0: [8.000, 9.000] belongs to cluster 1 즉, 맨처음입력되었던 9 개의점들이좌표데이터에따라 cluster-0 와 cluster-1 의 2 개그룹으로 군집화되었음을알수있다. 앞서보았듯 8 거리개념에는여러가지가있어서상황에따라선택하게되는데 Mahout 라이브러리에서도 EucliideanDistanceMeasure 외에다음의다양한거리척도가제공되고있다. SquredEucliideanDistanceMeasure ManhattanDistanceMeasure CosineDistanceMeasure TanimotoDistanceMeasure WeightedDistanceMeasure 이들각각의내용은그이름으로쉽게눈치챌수있고각거리척도에대한설명이이책앞부분에있으므로주요거리척도적용을그림으로표현한것을제시하는것으로대신한다. 8 III. 분석기법 3. 주요분석기법 (5) 군집이론 ( 나 ) 구간척도 10
위그림중왼쪽에서 Euclid 거리와 Manhattan 거리계산을비교하고있다. 오른쪽에서는 cosine 각도에의한거리계산방식을보여주고있다. 한편우리가살펴본 9 개의점에대한좌표데이터에위의서로다른거리척도를적용한결과가다음과같다. 거리척도 반복작업 0 번군집에 1 번군집에 회수 속하는 vector 속하는 vector EucliideanDistanceMeasure 3 0,1,2,3,4 5,6,7,8 SquredEucliideanDistanceMeasure 5 0,1,2,3,4 5,6,7,8 ManhattanDistanceMeasure 3 0,1,2,3,4 5,6,7,8 CosineDistanceMeasure 1 1 0,2,3,4, 5,6,7,8 TanimotoDistanceMeasure 3 0,1,2,3,4 5,6,7,8 위결과를보면각각반복회수도다르고결과또한차이가있음을알수있다. 이는각거리척도의절대적 우열때문이라기보다적용분야에따라고유한특성을가지는것으로볼수있을것이다. 나. 둘째시도 : 이미지패턴의군집화사례 실제의군집화작업에서는앞서좌표데이터와같이입력데이터가단순한형태로주어지지않으므로분석대상이되는객체의데이터를올바르게 vector 데이터로변환하는작업이선행되어야한다. Mahout 에서는다음 3 가지 class 를통해 vector 데이터를표현한다. DenseVector: vector 데이터를 double 데이터타입의배열의형태로표현하는 class이다. RandomAccessSparseVector: HashMap으로구현된 random access 용의 sparse vector이다. SequentialAccessSparseVector: 순차작업용 vector이다. 이들중어떤클래스를이용해서분석대상이되는객체데이터를 vector 로표현할것인가는적용알고리즘에따라달라진다. Mahout 에서는 vector 를 Java 의 interface 형태로제공하고있는데특히관련 vector 데이터중빈항목이많은경우 - 즉, sparse 인경우 - 이를 sparse vector 라하고대부분의 11
vector 데이터가실데이터로채워진경우를 dense vector 라고한다. Mahout 에는이밖에도매우다양한 vector 연산을할수있도록 class 지정이되어있다. 이제또하나의예제로서사과를분류하는예를살펴본다. 사과를분류할때는우선크기, 색깔, 무게등의어떤분류기준을적용할것인지정해야한다. 물론측정만가능하다면그어떤것이든분류기준으로삼을수있고이들여러가지를가중치적용하여종합적판단도가능하다. 편의상우리는다음과같이아래표와같이무게 (Kg 단위 ), 색깔 ( 녹색, 빨강, 노랑 ), 크기 (Small, Large, Medium) 가제각각다른 5 개의사과가주어졌다고하자. 사과무게 (Kg) 색깔 (RGB) 크기 Vector 분석대상 0 차원 1 차원 2 차원데이터표현 Small,round, green 0.11 510 1 [0.11, 510,1] Large, oval, red 0.23 650 3 [0.23, 650,3] Small,elongated, red 0.09 630 1 [0.09,630,1] Large, round, yellow 0.25 590 3 [0.25,590,3] Medium, Oval, green 0.18 520 2 [0.18,520,2] Mahout 에서이들데이터의 vector 표현은이들속성데이터를각각하나의차원으로표현하는데위표의 제일마지막열에표시되어있다. 이제사과를분류하기위한 vector 생성프로그램은다음과같다. public class MahoutClsterSecond { public class MahoutClusterFirst { public static final double[][] points = {{1, 1, {2, 1, {1, 2, {2, 2, {3, 3, {8, 8, {9, 8, {8, 9, {9, 9; public static void writepointstofile(list<vector> points, String filename, FileSystem fs, Configuration conf) throws IOException { Path path = new Path(fileName); SequenceFile.Writer writer =new SequenceFile.Writer(fs, conf, path, LongWritable.class, VectorWritable.class); long recnum = 0; VectorWritable vec = new VectorWritable(); for (Vector point : points) { vec.set(point); writer.append(new LongWritable(recNum++), vec); writer.close(); 12
public static List<Vector> getpoints(double[][] raw) { List<Vector> points = new ArrayList<Vector>(); for (int i = 0; i < raw.length; i++) { double[] fr = raw[i]; Vector vec=new RandomAccessSparseVector(fr.length); vec.assign(fr); points.add(vec); return points; public static void main(string args[]) throws Exception { int k = 2; List<Vector> vectors = getpoints(points); File testdata = new File("testdata"); if (!testdata.exists()) { testdata.mkdir(); testdata = new File("testdata/points"); if (!testdata.exists()) { testdata.mkdir(); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); writepointstofile(vectors, "testdata/points/file1", fs, conf); Path path = new Path("testdata/clusters/part-00000"); SequenceFile.Writer writer = new SequenceFile.Writer( fs, conf, path, Text.class, Cluster.class); for (int i = 0; i < k; i++) { Vector vec = vectors.get(i); Cluster cluster = new Cluster( vec, i, new EuclideanDistanceMeasure()); writer.append(new Text(cluster.getIdentifier()), cluster); writer.close(); KMeansDriver.run(conf, new Path("testdata/points"), new Path("testdata/clusters"), new Path("output"), new EuclideanDistanceMeasure(), 0.001, 10, true, false); SequenceFile.Reader reader = new SequenceFile.Reader(fs,new Path("output/" + Cluster.CLUSTERED_POINTS_DIR + "/part-m-00000"), conf); IntWritable key = new IntWritable(); WeightedVectorWritable value = new WeightedVectorWritable(); while (reader.next(key, value)) { System.out.println( value.tostring() + " belongs to cluster " + key.tostring()); reader.close(); 13
그리고이렇게표현된 vector 데이터에대해앞서좌표점을분류에서와같이알고리즘을선택하여 수행하면분류작업이완성되게된다. 14
2. 병렬형빈발패턴알고리즘 Apriori 알고리즘과 FP-Growth 알고리즘은데이터의크기가커질경우가지치기작업을통해 9 계산부담을해결했다. 그러나빅데이터의경우그규모가워낙커서다른방식의개선이필요해졌는데 Apriori 알고리즘의경우알고리즘자체의개선을위한노력이기울여진반면 FP-Growth 알고리즘은그자체가 Apriori 보다훨씬빨랐으므로알고리즘자체의수정못지않게 FP-Growth 를병렬처리하려는데에도촛점이맞추어졌다. (1) FP-Growth 알고리즘 FP-Growth 알고리즘은기본적으로분할정복방식을취한다. 이를위해다음의준비작업을한다. DB scan 으로빈발항목리스트를정렬한결과 (F-List) 를얻고 두번째의 DB scan 을통해 F-List 를 FP-Tree 로압축시킨다. 이제이결과물을가지고 FP-Growth 는지지도가의미있는숫자이상인것들을재귀적으로탐색한다. 다음은알고리즘을간략화한것이다. FP-Growth 용 Algorithm 1 위의 Algorithm 1 에서 F-List 를계산하는데있어서의시간복잡성은해싱기법에의할때 O(DBSize) 이다. 9 III. 분석기법 3. 주요분석기법 (3) 데이터상호관련성의분석 가. 빈발항목분석. 15
FP-Growth 용 Algorithm 2 Algorithm2 에서의 Growth() 의계산은다항식관계로서 FPGrowth() 는 Growth() 라는프로시저를재귀적으로호출한다. 이때, 다단계조건에따라메모리에서처리하므로병목현상의원인이되는등다음과같은문제를가진다. 스토리지문제 - 대형 DB 에서 FP-tree 가커지면한꺼번에메모리에탑재하기가어려워서여러개작은 DB 로나누어야했고결국이들소형 DB 별로 local FP-tree 를생성해야했다. 계산및분배문제 - 모든단계를병렬화할필요가있다. 통신오버헤드 - 분산 FP-tree 는상호의존성이있어서이들 thread 처리도중동기화의부담이컸다. 지지도값 - 분석애플리케이션에따라지지도값을낮게책정해야할경우계산의부담이엄청났다. (2) 병렬형 FP-Growth 알고리즘 FP-Growth 알고리즘을보다고속으로처리하고자하는욕구가커졌다. 그이유는특히웹검색과같은 경우 long-tail 10 의성격이강조되면서지지도가낮은수많은질의를무시할수없는경우가많기때문이다. 즉, 과거와달리아무리작은요청이나질의항목도모두고속검색으로해결을해주지않으면안되게되었다. 10 소수의히트상품 ( 항목 ) 이절대적위력을발휘했던과거의시장패턴에서이제는꼬리부분에있는다수의상품 ( 항목 ) 들의중요성이더욱커지고있는사회현상을말한다. 예컨대전자상거래의경우인터넷검색을통해누구나쉽게원하는물건을찾아낼수있게되어특정인기품목못지않게틈새상품이중요해지는새로운경제패러다임이형성된것이다. ' 롱테일 ' 은판매곡선에서불룩솟아오른머리부분에이어길게늘어지는꼬리부분을가리킨다. 16
나아가 Facted Search 11 와같이다차원의검색이필요한경우관련항목의검색을위해관련질의를신속히처리하는것은미루거나피할수없는기능이되었다. 그동안 FP-Growth 알고리즘의성능개선을위해작업 (task) 의병렬처리에주된관심이있었다. 즉, 모듈간의통신오버헤드를극복하고공유메모리를이용한다중 thread 의처리에집중되어왔다. 그러나소규모환경에서는확실한성능개선을보이던것이대규모환경에서는메모리에서병목현상이일어날뿐만아니라 cache 처리의문제, 메모리와 I/O 간의불균형문제등이발생하게되었다. 특히중요한것은장애극복 (fault-recovery) 의문제였다. 다수노드클러스터환경에서특정노드에서의장애가빈번했기때문이다. 이에 MapReduce 를적용한병렬형 FP-Growth 알고리즘이소개되었다. 특히대규모의 FP-Growth 작업을수평분할 ( 즉, sharding) 하여독립적작업단위로나누고이들을 MapReduce job 으로처리하는데그결과거의선형의성능개선 (near-linear scalability) 은거둘뿐아니라작업실패시자동재계산도가능해졌다. 이하에서병렬형 FP-Growth 알고리즘을소개한다. 12 11 정보를여러측면으로 - 즉, 다차원으로 - 분류하여제공하고사용자는이를필터를통해원하는것결과만얻도록하는것을말한다. 예를들어사용자가 'apple' 을검색하면과일사과에관련된항목 ( 사과에대한생물학 / 영양학 ) 과시장동향 ( 재배기술 / 가격동향 / 수급동향 ) 은물론미국 Apple 사의정보 ( 주가동향 /'ipod' 및 'iphone' 신제품 ) 가검색되어체계적으로분류되고제시되어야한다. 이를위해서는제반항목에대한분류 (faceted classification) 작업을실시간으로수행하는것이중요하다. 12 http://infolab.stanford.edu/~echang/recsys08-69.pdf 의논문. 17
병렬형 FP 의 5 단계 1 단계 : Sharding DB 를나눈후각각을 P 개의컴퓨터에저장한다. 이러한분할및분배를 sharding, 각각의부분을 shard 라고부른다. 2 단계 : Counting 작업을병렬로수행 DB 내의각거래내역에대한지지도를계산한다. 각각의 mapper 에는하나씩의 shard 가전달되며작업 결과는 F-list 에저장된다. 18
3 단계 : 항목의 Grouping 작업 F-List 에서의모든항목을나누어 Q 그룹에전달하는데이때의그룹리스트를 group list (G-list) 라고 부른다. 한편 F-list 와 G-list 는크기가작고복잡도가낮아서한대컴퓨터로도가능한다. 4 단계 : 병렬의 FP-Growth PFP 의가장핵심적인부분. 완전한 MapReduce 작업이이루어진다. Mapper - group 별거래내역을생성한다. 각각의 mapper instance 에는 1 단계에서의 DB shard 가전달되는데먼저 G-list 를읽은후 shard 의거래항목을처리한다. 각 mapper 는작업의수행결과를 key-value pair 의형태로작성한다. (key: group-id, value: 생성된 group 별거래 ). Reducer - group-dependent shard 에대한 FP-Growth 작업 : 모든 mapper instance 가작업을끝내면각각의 group-id 에대해 MapReduce 프레임워크가자동으로모든그룹별거래항목을정렬시켜서 shard 로만든다. 각각의 reducer instance 에는 group-dependent shard 가전달되어이를처리하게되고각각의 shard 에는 reducer instance 가 local FP-tree 를작성한다. 재귀적처리작업동안발견된패턴을 출력한다. 5 단계 : 종합 (Aggregating) 4 단계에서의작업결과를종합한다. 19
20
3. R 을 MapReduce 에적용한시계열데이터분석 R 을 MapReduce 적용하는방법을알아보기위해단순한예를중심으로살펴본다. 13 시계열형태의비행기운항데이터를항공사, 공항, 비행기별로추출, 분석하는프로젝트 14 로서 mapper 및 reducer 함수를 R 언어로작성하여수행하였다. 구체적으로는다음 2 단계로작업이이루어졌다. (1) CSV 타입의데이터를압축하여 R 로읽어들인다. # load list of all files flights.files <-list.files(path=flights.folder.path, pattern="*.csv.gz") # read files in data.table flights <- data.table(read.csv(flights.files[i], stringsasfactors=f)) (2) 그런후비행기별, 항공사별, 공항별로분석을실시하여중간결과를도출한다. 다음은운항일지를 항공사별로뽑아내는프로그램이다. getflightsstatusbyairlines <- function(flights, yr){ # 연도별추출 if(verbose) cat("getting stats for airlines:", '\n') airlines.stats <- flights[, list( dep_airports=length(unique(origin)), flihts=length(origin), flights_cancelled=sum(cancelled, na.rm=t), flights_diverted=sum(diverted, na.rm=t), flights_departed_late=length(which(depdelay > 0)), flights_arrived_late=length(which(arrdelay > 0)), total_dep_delay_in_mins=sum(depdelay[which(depdelay>0)]), avg_dep_delay_in_mins=round( mean(depdelay[which(depdelay > 0)])), median_dep_delay_in_mins=round( median(depdelay[which(depdelay > 0)])), miles_traveled=sum(distance, na.rm=t) ), by=uniquecarrier][, year:=yr] #change col order setcolorder(airlines.stats, c("year",colnames(airlines.stats)[-ncol(airlines.stats)])) #save this data savedata(airlines.stats, paste(flights.folder.path, "stats/5/airlines_stats_", yr, ".csv", sep="")) #clear up space rm(airlines.stats) 13 R 의사용법및프로그래밍에대하여는이책의부록을참조할것. 14 https://github.com/datadolphyn/r 에프로그램소스가있으며원래 1 억 2 천만개의 record, 12 GB 데이터를대상으로하는프로젝트이다. 한편본설명의촛점이되는 map-reduce 와직접관련되지는않지만프로그램에서사용되는 data.table 에대하여는 http://datatable.r-forge.r-project.org/ 를볼것. 21
# continue.. see git full code # map 함수의시작 # mapflightstats <- function(){ for(j in 1:period) { yr<-as.integer(gsub("[^0-9]","",gsub("(.*)(\\.csv)","\\1", flights.files[j]))) flights.data.file <-paste(flights.folder.path,flights.files[j],sep="") if(verbose) cat(yr, ": Reading : ", flights.data.file, "\n") flights <- data.table(read.csv(flights.data.file, stringsasfactors=f)) setkeyv(flights,c("year","uniquecarrier","dest","origin","month")) # call functions getflightstatsforyear(flights, yr) getflightsstatusbyairlines(flights, yr) getflightsstatsbyairport(flights, yr) 이상과같이항공사및공항별로운항일지를뽑아낸후디스크에저장하고총량분석데이터를가지는 132 개의중간데이터를추출하였다. 이제 reduce 함수를통해다음과같이중간데이터를집계하여 ( 비행기, 항공사, 공항별 ) 최종결과를도출하였다. #reduce 함수로서통합집계작업 reduceflightstats <- function(){ n <- 1:6 folder.path <- paste("./raw-data/flights/stats/", n, "/", sep="") print(folder.path) for(i in n){ filenames <- paste(folder.path[i], list.files(path=folder.path[i],pattern="*.csv"), sep="") dt <- do.call("rbind", lapply(filenames, read.csv, stringsasfactors=f)) print(nrow(dt)) savedata(dt, paste("./raw-data/flights/stats/", i, ".csv", sep="")) 22
4. Social CRM 과 Social 네트워크분석 (1) 배경 전통적 ( 분석 ) CRM 전통적 CRM 특히분석 CRM 은전사적차원에일관성있는통일적관점을제공하고전부서가고객의 단계별상황에맞춘대고객행위를할수있게된다. Direct Marketing 을예로하여 CRM 을이용한 BI 구현을살펴본다. (1) 예측변수를이용해서고객을등급메긴다. - 예측변수는각각의고객에대한측정값으로서예컨대 recency 15 ( 최근성 ) 를통해새로마케팅캠페인을했을때호응도를가늠해볼수있다. 가장최근구입한사람이가장호응도가높은것이라는것이다 (2) 여러예측변수를가중치적용하여조합한다 예측모델을만든다. 앞서 recency 에소득을추가하고가중치를반영한공식을만들수있다. 2 x recency + personal income 위모델에서우리는단순선형관계를가정하였으나이외에도조건에따른규칙을정의함으로써 business rule 을정의할수도있다. If 고객이지방거주자이고, and 월지출액이높다면, then 계약관계를지속할확률이높다. 또는 If 고객이도시거주자이고, and 신상품구매비율이높다면, then 이번캠페인에는참여하지않을것이다. (3) 실고객데이터를통해컴퓨터로모델을수립한다 15 RFM 분석에서는 Recency ( 얼마나최근구입했는가 ), Frequency ( 얼마나빈번하게구입했는가 ), Monetary amount ( 총 구입금액이얼마인가 ) 의 3 가지를통해제품구입가능성이높은고객들을추려낸다. RFM 의한계를극복한것이고객평생가치 (customer lifetime value) 평가법이다. 23
가장중요하고도어려운점은예측모델을만드는것이다. (4) 적용결과를다각도로검증한다. 컴퓨터상의모델링이완료되면이를검증한다. 이익곡선과현장에서의캠페인을통한이익증가추이를비교하는데처음에는고객접촉이늘수록이익도증가하지만얼마후부터는이익증가가감소하고약 70% 고객접촉이후부터는이익자체가감소됨을보여준다. 시간과예산을투입해도캠페인비용증가에비해매출증가가이루어지지않기때문이다. 전형적인이익곡선의모습 CRM과데이터관리 (1 단계 ) 필요데이터의판별 고객별로필요한데이터와전략지표 ( 예 : 수익률, 해지율 ) 를식별하고한다. (2 단계 ) 데이터를획득, 통합한다 데이터소스를확인하고획득방안을수립한다. 과거엄두내지못했던데이터도과감하게포함시킨다. (3 단계 ) 분류 큰범주별분류와세부기준에분류한다. 고객서술형 (Descriptive) 데이터 소비자행동 (Behavioral) 관련데이터 경영환경및여건에관한데이터 (Contextual data) (4 단계 ) 고객정보통합 CDI (customer data integration) 기업내부에서발생된정보와외부로부터새로획득된정보 과거실적과미래예측정보및정형정보와비정형정보 24
빅데이터와관련하여서는위의 2~3 단계에빅데이터기술이동원될수있는데이처럼빅데이터분석과 CRM 의통합하면각종정책변수상의상호관련성을찾아내고, 패턴을분류하며숨겨진추세를발견할수 있다. (2) Social CRM 가. 일반론 그동안 CRM 은주로 POS (Point of Sale), 전화 /fax, 메일, 대면접촉일지등의정형데이터중심이었다. 반면 Social CRM 이란고객을보다심도있게이해하기위해 SNS 의정보를이용하는방식으로 CRM 을확장하는것을말한다. 소비자욕구가다양해지고비정형정보가생성교환되면서이를분석할수있는토대가마련되었고모든상품과서비스는개인화되는추세이므로특히중요해졌다. 나. 분석방법 분석대상 SNS 에서의각종대화 / 댓글 / 논평등을지속적으로수집하고관리한다. 과거막연했던브랜드충성도나광고의효과분석도구매정보뿐아니라홈페이지방문자의각종방문기록과댓글을측정하는방식으로분석한다. 정형화된고객관리 DB 의분석 전자메일, SNS 에서의각종기록, 댓글등을수집, 분석 홈페이지및각종의웹사이트에대한방문및클릭동향 ETL 2.0 SNS 의데이터는다양한소셜미디어와전통적 CRM 을통합하는것이중요하다. 특히기존 CRM 의정형데이터와 SNS 의비정형데이터가서로다르게정의되었으므로 ETL 의기능이확장되어야한다 16. 예컨대 SugarCRM 의경우 SNS 고객접촉데이터를연결하고 LinkedIn 의업종및분야별전문가와통합, 그들과의대화를수집, 분석하는방식을취한다. 이들은대개 REST API 17 또는웹서비스방식을통해 Hadoop 을연결하며일부클라우드방식으로상용 aggregation 서비스를제공한다. 16 이처럼연결 / 추출의대상범위를확대하거나스트림데이터와같은대량데이터를위해확장한것을 ETL 2.0 이라고부르기도한다. 17 REST 는 Representational State Transfer 의약자로서 HTTP 의 GET, POST, PUT, DELETE method 만을사용해서작업하는것을말한다. 25
군집화 (clustering) 모델에서는수집된고객정보를통해동일한행태, 태도및관심을가지는부류로그룹화, 군집 화한다. 다. 이용전략 최근의미기반 (semantic) 분석, sentiment 분석등을자연어처리 (NLP) 기법으로해결하려는움직임이활발하다. 블로그나 SNS 의컨텐츠가자연어형태를가지기때문이다. 특히 sentiment 분석은텍스트로부터고객이가진주관적인정보를획득하고분석하는것을말한다. 예를들면 실제로좋아하는지? 어떤점을좋아하고싫어하는지? 또한이러한선호도는각종고객집단별로 어떠한특성과변화행태를보이는지? 자사가실시하는광고가호응을얻고있는지? 호응도와반대의견은어떠한지? 특정사건이나정책에대한여론동향은어떤지? 특정매체나유력인사의의견뿐만아니라실제 논의되는동향들은어떠한지? 아직자연어처리와여러감정처리 ( 예 : 비꼬거나빈정거리는것 ) 에한계가있지만여러시점의분석결과를축적, 비교함으로써맥락 (context) 을분석하는등의방식을통해상당부분대안이제시되고있다. 한편 push 전략도가능하다. SNS/ 포털사이트에서자사의존재를알리고대화하거나자체의커뮤니티사이트를구축하는것등이다. 전략 모든정보를한곳에모아서전사적으로축적, 관리, 공유한다. 이때어떤범위까지수집, 분석할지에대한전략을수립한다. ( 예 : 특정 hash tag, 특정고객집단및주제어등으로한정 ) CRM 과다른시스템을통합한다. e-commerce 의경우추천시스템과연계하고조직내업무규칙과절차를재조정한다. 26
Social Marketing 과연계한다. 여기에는 SNS + Commerce + M-commerce + LBS, NFS 등이 모두포함된다. (3) 소셜네트워크분석 가. 개요 소셜네트워크분석 (SNA: Social network analysis) 은사람, 집단, 기관, 컴퓨터, URL, 지식들간의상호관련성과그정도를보다본격적으로측정, 분석하는것을말한다. 이때네트워크내에서의 node 는사람내지집단을의미하고 link 는 node 들사이의관계또는흐름을뜻하게된다. 간단하게는그림으로표현하고이를직관적으로알아볼수있지만복잡한관계는수학적기법을사용하게된다. 우선네트워크내에서의대상이되는객체의위치를이해할필요가있다. 여기서위치란노드의중앙부 (centrality) 18 를파악하는것을말한다. 이를통해이들이차지하는역할과 ( 예컨대전문가그룹, 연결자, 지도자 (leader), 교량역할, 고립자등으로분류할수도있다.) 어떻게그룹을이루는지를알수있기때문이다. 나. 세부기법 MCL 과 DBSCAN 앞서본여러군집화기법에추가하여그래프분석이론이동원된다. 그래프이론기반의군집화기법에는 Markov Clustering Algorithm (MCL) 를들수있는데이는 random walk 를시뮬레이션하는방식으로꼭지점 (clusters of vertices) 을발견해내는것이그핵심이다. 즉, 만약그래프내에군집이존재한다면군집내의링크는군집간에서보다 link 의갯수가더많을것이라는점에착안한다. 하나의꼭지점 (vertex) 에서그래프를임의로탐사하면하나의군집내에서더자주최종점에도달할것이고이러한 random walk 를통해군집을발견해낸다. 이때 Random walk 는 Markov Chain 기법을동원한다. Markov Chain 은 X 1, X 2, X 3,... 등의일련의상태정보로표현된다. 현재어떤상태가주어졌다면과거및미래의상태는이와독립적일것이다. 즉, 시스템의다음상태는현재상태에따라서만달라질수있다. P r (X n+1 =x X 1 =x 1,X 2 =x 2,X 3 =x 3,...,X n =x n )=Pr(X n+1 =x X n =x n ) Pr 함수를통해 Markov Chain 내에서의확률값을계산하지만 Pr 은 X n 을입력받을경우에만 X n+1 에대해동일한상태값을얻을수있게된다. 또한 MCL 에서상태는확률 matrix 로표현된다. 즉, 그래프에서는 vertex 를벡터로여기고모서리 (Edge) 를벡터의항목으로간주한다. 이들모든노드벡터를그래프에위치시키면그래프의연결상태를보여주는행렬이만들어진다. 18 중심도 (centrality) 에도여러가지가있으나세부설명을생략한다. http://faculty.ucr.edu/~hanneman/nettext/index.html 참조 27
행렬식의생성 이제행렬식을중심으로군집을찾는데있어서소셜네트워크표현이 sparse matrix 의경우특히 Map- Reduce 프레임워크를이용하면성능과확장성에있어더욱큰효과를보게된다. 19 이제 MCL 알고리즘에대한추가의세부설명대신에이것이 Map-Reduce 프레임워크에어떻게적용되는지에대해서만간단히살펴본다. Markov Matrix 군집화알고리즘 ( 목적 ) Markov Matrix 기법을통해 Social Network 의군집화분석을실시 ( 입력사항 ) connectivity (similarity)sparse matrix ( 출력사항 ) markov (probability) matrix class MarkovMapper method map(column) sum=sum(column) for all entry column do entry.value=entry.value/sum collect{column.id, {out, entry.id, entry. value collect{entry.id, {in, column.id, entry.value class MarkovReducer method reduce(key, list {sub-key, id, value) newcolumn=ø 19 http://www.diva-portal.org/smash/get/diva2:556816/fulltext01.pdf 28
newrow=ø for all element list do if sub-key is out then newcolumn.add({key, id, value) else newrow.add({id, key, value) collect newcolumn collect newrow 그밖에 MCL 을 Map-Reduce 적용할때다양한대안이있을수있다. 그밖에 DBSCAN 의 Map-Reduce 방식에대해서는설명을생략한다. 20 20 MCL 과 DBSCAN 기법을 MapReduce 방식으로 Social Network 분석에활용하는세부구현방법은다음문서를참조할 것. http://www.diva-portal.org/smash/get/diva2:556816/fulltext01.pdf 29
5. 얼굴인식 ( 보안 ) 과 Hadoop (1) 사례 : 미국보스톤마라톤폭발사건 ( 사례 ) 2013 년 8 월 15 일미국보스톤마라톤경주중폭발사건으로 3 명이사망하고수십명이부상당하는사건이발생하였다. FBI 는휴대폰중계기의통화기록 (call log), 문자메시지, SNS 데이터, 사진, 민간제보, 단서등자료를확보하고 10 TB 의영상화면을분석해서 4 일만에범인을검거하였다. 우리는화상데이터분석을통한얼굴인식처리문제만을빅데이터와의관점에서살펴본다. 21 (2) 얼굴인식 (face recognition) 얼굴인식은얼굴의각종근육과그움직임, 대칭구도, 전반적형상을분석해서특정얼굴을구별할수있도록특징을찾아내는것이며 (a) 촬영한영상에서특정얼굴을찾아내는검출과 (b) 검출된얼굴이등록된사람의것인지판별하는인증부분으로나뉜다. 다음은패턴인식의기본프로세스를나타낸그림이다. 그림중붉은색의글씨는패턴인식과정중얼굴인식에해당되는프로세스를나타낸다. 21 이외에도스마트폰사용자의위치추적및 SNS 데이터처리가중요했는데 SNS 데이터처리에대해서는 Topsy 라는회사가유명해졌다. 2010 년이래의방대한 tweet 데이터를빅데이터처리하고단어검색과위치추정 (geo-inferencing) 기능을통해큰효과를거두었다는것이다. 또한 JackBe 라는회사의실시간 intelligence 소프트웨어가다양한분석에활용되었다. 30
PCA ( 주요인분석 : Principal Components Analysis) SVM (Support Vector Machines) Gabor Wavelets 22 Hough Transform 23 얼굴인식의특징적절차로다음과같은것을들수있다. 탐지기법에는크게다음과같은방법이있다. 지식기반방법 전형적얼굴에대한지식을규칙으로표현 변치않는특징 (feature invariant) 분석 - 표정, 얼굴가장자리, 피부질감등변화에도변치 않는특징을추출한다. 구조적유사성탐지 (structural matching) 템플리트기반의유사성탐지 (Template matching) 외양 (appearance) 에의한인식과인공신경망의이용 정규화란얼굴특징을이루는점의위치정보를받아개별파라미터를생성하고클래스내에존재하는 차이점을제거하는것이고특징추출은정규화된이미지를기반으로얼굴을표현하는 key 를출력하고이를 비교하는것이다. PCA 와 Gabor Wavelets 등을이용한다. 22 질감특성을나타내기위한기법의하나이다. 영상검색에는이외에도색상특성을표현하는칼라히스토그램, 모양특성을표현하는경계영역기술 (boundary description) 등이있다. 23 Hough (' 허프 ' 라고읽는다 ) 변환은이미지가불완전할때파라미터공간에서일종의투표방식을통해객체를선정하는기법이다. 이후 GHT (generalized Hough transform) 로확장되었다. 31
(3) HIPI 가. 개요 HIPI 란 Hadoop Image Processing Interface for Image-based MapReduce Tasks 의약어로서 Hadoop 기능을이미지처리에접목시키고자하는오픈소스프로젝트로서 24 주된목적은다음과같다. 이미지프로세싱과컴퓨터비전을 MapReduce 프레임워크를통해구현하도록확장가능한 개방형라이브러리의개발 이미지에대한필터링을손쉽게적용할수있도록함. HIPI 는 Hadoop 의 MapReduce 프레임워크를위한라이브러리로서분산환경에서이미지 프로세싱을할수있도록 API 를제공한다. Hadoop MapReduce 가데이터입출력에는효율적이지만이미지표출에그리효율적이지못한점에 주목하고이를해결하는데에 HIPI 라이브러리설계의초점이주어졌다. HIPI 의 Image Bundle 로입력이미지파일을지정하면이후의작업특히병렬처리를위한제반작업은 HIPI 라이브 러리가자동으로처리한다. 또한사용자가 pixel 단위로이미지제어할수있도록 float 이미지형태로배포된다. 즉, 모든이미지는 표준형식 ( 예 : jpeg, png 등 ) 으로저장되지만 HIPI 가 float 이미지로 encode/decode 하므로이미지의 평균픽셀값의계산등이자유롭게수행될수있다. 24 참고 : HIPPI 가보스톤폭발사건해결에직접사용되었는지는알수없으나대규모이미지처리에 Hadoop 이사용된것은 분명한것으로알려진다. 프로젝트홈페이지는 http://hipi.cs.virginia.edu/ 32
나. MapReduce 용이미지프로세싱 API HIPI 에서가장많이사용되는 class 는다음과같다. HIPI Image Bundle (HIB) 이미지파일을저장시 HIPI Image Bundle 이라는타입을이용하는데여기서는실제파일과 Index 파일 ( 메타데이터 ) 를구별하여저장한다. Unix 에서의 tar 파일과흡사하며 HipiImageBundle 클래스를통해구현된다. 이파일은 Hadoop MapReduce 에서직접사용된다. Float Image HIPI 에대한주된입력파일클래스로서이미지파일을 pixel 단위로표현하되각각의 pixel 을단일 floating-point precision 으로표시한다. Cull Mapper 이미지를수집또는버릴수있도록설계된것으로서 CullMapper::cull 라는함수를가진특별한 Mapper 타입을정의하는역할을한다. 다. HIPI 에서의 Job HIPI 는 Hadoop 의표준 job 클래스를확장해서이미지처리를위한작업시손쉽게필요한파라미터설정을할수있도록해준다. HipiJob 을이용할떄의 2 가지주요한작업은다음과같다. 33
투기적실행 25 의활성화 / 비활성화 HipiJob::set{Map,ReduceSpeculativeExecution 를이용하며특정노드에서작업이늦어질때해당 task 를종료 (kill) 하거나다른 task 의결과를이용할수도있다. Map 출력레코드의압축을활성화 / 비활성화 HipiJob::setCompressMapOutput 를이용하며 Map task 가끝난후 redoce task 로전달되기전에출력물을압축할지여부를결정한다. (4) 덧붙이는말 Minority Report? 가. 동향 얼마전영화 "Minority Report" 가큰주목을끌었다. 2054 년미국워싱턴을배경으로범죄발생전에범죄를예측해처벌하는특수기동부서 "PreCrime" 의이야기였다. 영화자체는다소과장되었으나이와유사한현상이현실화되고있다. 이미미국에서가석방심사시대상자의각종자료를분석, 활용하는곳이 50% 를넘어섰고 L.A. 같은곳에서는빅데이터를활용한 " 예방치안 (predictive policing)" 을실시중에있다고한다. 범죄취약지역을선정하여예방조치하는것이다. 이기술의하나가 FAST (Future Attribute Screening Technology) 로서개인행태를분석하면타인에게위해를끼치려는의도를알아차릴수있다. 이미범인프로파일링기법이법원으로부터정당성을인정받았고금융기관의신용평가등에도이미상당부분적용되고있는데 Hadoop 을중심으로한빅데이터처리기술이그핵심이되고있는것으로알려져있다. 나. 범죄프로파일링 이제우리의관심인빅데이터기술의측면에서중범죄자분류엔진 (felony classifier) 을보자. 이것은데이터를분석하여프로파일링하여예컨대경범죄를저지른사람이중범죄로이어질가능성을분석한다. 위의분류모델은 2 억 5 천만명의경범죄피고와 4 천만명의피고데이터를분석하고이에대해다음 기능을가지는분석엔진을적용한다. 데이터획득과교환 25 II. Hadoop 3. Hadoop 의기능요소 (2) MapReduce 라. Task 의투기적실행 을참조할것 34
Noise filter Blocking Linking 군집화분석엔진의핵심인분류작업에는지역별분류는물론 15,000 개의 label ( 특성 ) 을기준으로한예측인자가포함된다고한다. 다음은모델에서의특성정보의예로서몸에문신여부, 성별과머리, 눈, 피부의색을구분하고어떤범죄경험이있는지등을나타내고있다. Personal Profile Person.NumBodyMarks Person.HasTattoo Person.IsMale Person.HairColor Person.EyeColor Person.SkinColor Criminal Profile Offenses.NumOffenses Offenses.OnlyTraffic 다음은이러한특성정보를프로그램화하는일부예이다. class EyeColor(Extractor): normalizer = { 'bro': 'brown,'blu': 'blue', 'blk': 'black', 'hzl': 'hazel, 'haz : 'hazel, 'grn': 'green schema = {'type': 'enum', 'name': 'EyeColors','symbols': ('black','brown','hazel','blue','green','other','unknown') def extract(self, record): recorded = record['profile'].get('eyecolor', None) if recorded is None: return 'unknown'recorded = recorded.lower() if recorded in self.normalizer: recorded = self.normalizer[recorded] for i in self.schema['symbols']: if recorded.startswith(i): recorded = i if recorded in self.schema['symbols']: return recorded else: return 'other' 이러한기본사항에대해훈련데이터를가지고모델의훈련을실시한다. 35
다음은이들분석후출력된예측데이터의포맷이다. key: e926f511b7f8289c64130a266c66411e val: offenses: - {CaseID: MDAOC206059-2, CaseInfo: 'CASE DISPO: TRIAL, CJIS CODE: 3 5010', Disposition: STET, Key: hyg-mdaoc206059, OffenseClass: M, OffenseCount: '2', OffenseDate: '20041205', OffenseDesc: 'THEFT:LESS $500 VALUE' - {CaseID: MDAOC206060-1, CaseInfo: 'CASE DISPO: TRIAL, CJIS CODE: 1 4803', Disposition: GUILTY, Key: hyg-mdaoc206060, OffenseClass: M, OffenseCount: '1', OffenseDate: '20040928', OffenseDesc: FALSE STATEMENT TO OFFICER profile: {BodyMarks: 'TAT L ARM;,TAT L SHLD: N/A;,TAT R ARM: N/A;,TAT R SHLD: N/A;,TAT RF ARM;,TAT UL ARM;,TAT UR AR', DOB: '19711206', DOB.Completeness: '111', EyeColor: HAZEL, Gender: m, HairColor: BROWN, Height: 5'8", SkinColor: FAIR, State: 'DE,MD,MD,MD,MD,MD,MD,MD,MD,MD,MD,MD,MD, Weight: 180 LBS Training Labels: key: e926f511b7f8289c64130a266c66411e val: label: true offenses: - {CaseID: MDAOC206065-4, CaseInfo: 'CASE DISPO: TRIAL, CJIS CODE: 1 6501', Disposition: NOLLE PROSEQUI, Key: hyg-mdaoc206065, OffenseClass: F, OffenseCount: '1', OffenseDesc: ARSON 2ND DEGREE 다음은판별을위한의사결정트리의일부분이다. 36
5. 부정탐지와 MapReduce 부정탐지 (Fraud Detection) 는빅데이터의주요응용분야중하나이며넓은개념으로서의부정행위분석 (fraud analysis) 26 중하나이다. 사기행위에는부정확한대출신청, 사기성거래, 명의도용및부정보험청구. 신용카드부정사용등다양한종류가있는데예측모델을통해잘못된거래를방지하거나회피하고이에대한노출위험자체를낮추고자한다. 예측모델의수립은특별히관리해야할유형을식별하고이들예측변수에대한적절한가중치를부여하는등의순서를거친다. 부정탐지관련빅데이터분석기술에는다음과같은것이포함된다. (1) Deduplication 에 MapReduce 기술을적용한다. 앞서 27 데이터축소에대해언급한바있는데 Deduplication 도중복된내용 (chunk 또는 extent 라고도함 ) 을줄이는기법의하나이다. 특히통신사의 CDR 28 또는동일한데이터패킷이전송될때이를최적화하는데자주사용된다. 보통데이터 Deduplication 을위해서 hashing, binary 비교, delta differencing 등을사용했었지만이제 MapReduce 를통해서보다효율적으로데이터중복을해소하게되었다. 실제로 MapReduce 와 HDFS, 스트리밍등의기술을조합하여다양한방법이제안되어활발히사용되고있는데부정탐지에서 deduplication 을빅데이터처리함으로써저장공간도줄이고통신대역을효율적으로사용하여실시간처리가한결용이해졌다. 다음이대표적이다. Entity matching - 이름, 전화번호, 주소등의일치여부확인. SNS 신원확인 - 소셜네트워크상의프로파일을통해소비자의신원을밝힌다. ( 단, 프라이버시문제가있음을유의할것.) (2) 이상행위 (outlier) 탐지 가장전형적인부정탐지작업이다. 업무별이상특징을반영하여알고리즘작성하는데여기에는군집화, 확률분포, 경로분석, sessionization, tokenization 및 attribution, 회귀분석및상관분석, 그래프분석및시각화기술이포함된다. 26 부정행위분석에는부정의탐지 (detection), 예방 (prevention), 억제 (reduction) 등 3 가지가모두포함된다. 27 III. 분석이론 2. 모델링과데이터전처리 28 III. 분석이론 3. 분석기법 (8) 스트림데이터에서설명한바있음. 37
(3) 업무프로세스 (Workflow) 에반영 - Transaction streaming, 감독, 경고발송 (alert forwarding), 추가진행제한 (transaction blocking) 등을이용해서업무규칙을업무프로세스에일체화시킴으로써프로세스자체를자동화하는것을말한다. 한편빅데이터를이용한부정탐지업무는다음과같은특징을가진다. (1) 업무특성상 DW 를그대로사용하는경우는거의없다. 부정알고리즘을효과적으로수립하고유연한분석을위해데이터분석시고정된스키마대신별도의 (custom) 스크립트를사용한다. (2) 신속한대처를위해서는빠른처리가관건이다. 이런의미에서 index, 백업등의 DB 의부담은일체없애는것이좋으며흔히 Flume 같은도구를이용해서데이터를신속하게적재한다. 여기서는사기탐지에많이이용되는 KNN 알고리즘을 Map-Reduce 방식으로수행하는것을간단히소개한다. KNN 알고리즘의 Map-Reduce 적용 ( 목적 ) KNN 알고리즘을 Map-Reduce 프레임워크를이용하여수행함으로서 처리효율과확장성을높임. Map() ( 입력항목 ) 모든점질의점 (query point) p ( 출력항목 ) k nearest neighbors ( 로컬 ) Emit the k closest points to p -- Reduce() ( 입력항목 ) Key: null; values: local neighbors 질의점 p ( 출력항목 ) k nearest neighbors ( 전역 : global) Emit the k closest points to p among all local neighbors 38
이제신용카드에서의사기탐지를간략화한예를살펴본다 29. 우선신용카드결제시다음의 4 가지속성데이터가제공된다고가정한다. 승인신청번호 ID 신청시간 금액 가맹점분류그리고다음의샘플데이터중제일마지막것이이상치라고하자. YX66AJ9U 1025 20.47 약국 98ZCM6B1 1910 55.50 식당 XXXX7362 0100 1875.40 보석판매점 작업순서 (1) 우선거리계산을한다. 거리의개념에는 Euclidean, Manhattan 거리등다양한선택이가능하다. 수치데이터의경우 ( 예 : 금액 ) 수치차이가거리가된다. 카테고리데이터의경우 ( 예 : 상점분류 ) 동일타입이면 0, 다른타입이면 1 이다. 카테고리데이터가계층구조를가지는경우에는하나의 node 에서다른 node 로옮아가는 edge 의최소숫자가거리가된다. (2) 그리고이들각속성별거리를합산한다. 이때적용분야에따라비중을달리할수도있다. 이들거리를계산함에있어 Map-Reduce 방식을적용한다. 특히거리계산의비용은 O(nxn) 인데즉, 1 백만개의승인신청이있다면 1 백만 x 1 백만 = 1 조개의계산이수행되어야한다는것을의미한다. (3) 이들작업은나뉘어수행된후 reducer 로전달된다. 100 개의 Hadoop node 가있고 node 당 10 개의 reducer slot 이있다면각각의 reducer 에는대략 10 억개의거리계산작업이수행된다. 이때이러한작업을어떻게배분할것인가가문제되는데 partitioned hashing 방식을사용하면편리하다. If h1 = hash(id1) and h2 = hash(id2) then use function of h1 and h2 as the key of the mapper output. 이제 h1 또는 h2 에 hash 된 id 를가지는모든승인신청은하나의 reducer 에배정될것이다. 다음은 해당분야의코드예이다. String partition=partordinal>=0? items[partordinal] : "none"; 29 http://www.slideshare.net/pkghosh99/outlier-and-fraud-detection 의사례보고를참조하였다. 39
hash =(items[idordinal].hashcode() % bucketcount+bucketcount) / 2 ; for (int i = 0; i < bucketcount; ++i) { if (i < hash){ hashpair = hash * 1000 + i; keyholder.set(partition, hashpair,0); valueholder.set("0" + value.tostring()); else { hashpair = i * 1000 + hash; keyholder.set(partition, hashpair,1); valueholder.set("1" + value.tostring()); context.write(keyholder, valueholder); 그리고나서각각의데이터점을중심으로 k 개의최근접점을찾은후거리에따라정렬한다. 앞서의 Map-Reduce 에의하면다음과같이 key, value 가산출될것이다. key -> (6JHQ79UA, 5) value -> (JSXNUV9R, 5) key -> (JSXNUV9R,, 5) value -> (6JHQ79UA, 5) key -> (6JHQ79UA, 89) value -> (Y1AWCM5P, 89) key -> (Y1AWCM5P, 89) value -> (6JHQ79UA, 172) reducer 에서는 reducer 가수행될때 key 로 transaction ID 와 value 로서이웃 transaction ID 및 거리 pair 의리스트가생성된다. reducer 에서는각 value 에대해반복해서거리평균을계산해낸후 이들을출력한다. 1IKVOMZE 5 1JI0A0UE, 173 1KWBJ4W3, 278... XXXX7362, 538 거리개념에기반한계산을통해서당초이상치거래신청이예외사항으로보고됨을알수있다. 물론위설명이지나치게단순한것이사실이다. 실무에서는그룹화를위한 grouping comparator 등다양한항목이추가될것이다. 또한 k 를어떻게정할것인가는 KNN 의여전한과제이다. K 값이너무낮으면 low bias 현상이발생하고너무크면 low variance 현상이발생하므로최적값을정하기위해다소의실험이필요하다. 이제또다른예로서다음은우리나라에서의특히전자금융거래에대한탐지에대한한보고서이다. 30 30 이상금융거래탐지및대응프레임워크, TTA ( 한국정보통신기술협회 ) 정보통신단체표준 (TTAK.KO-12.0178), 2011 년 12 월 21 일 40
41
6. 스트리밍을이용한 R 과 Hadoop 의혼용 앞서리눅스에서의 stream 방식을통해 Hadoop 과다른프로그래밍언어를함께이용할수있다고하였는데 31 여기서는이기법을통해 Hadoop 과 R 을함께이용하기로한다. 특히 Hadoop 을이용한빅데이터처리에있어 R 을이용하되 map 만으로구성하는경우와 Map 과 Reduce 를모두구성하는경우를비교하면서살펴본다. 32 (1) R 을이용한 map 기능만의구현 MapReduce 에서일반적으로그러하듯이 R 을이용하면서도 map 함수만을적용할수있다. 데이터를 join 하거나그룹화할필요도없고별도의 sort 작업도필요없는경우 reduce 함수를생략해도무방하기때문이다. 여기서는주식가격의일별평균을구하는경우를살펴보는데데이터파일이다음 column 으로구성되는 csv 포맷이라고가정하자. 다음데이터파일에서보듯이각줄마다개별주식의주가가나열되어있다. Symbol,Date,Open,High,Low,Close,Volume,Adj Close $ head -6 test-data/stocks.txt AAPL,2009-01-02,85.88,91.04,85.16,90.75,26643400,90.75 AAPL,2008-01-02,199.27,200.26,192.55,194.84,38542100,194.84 AAPL,2007-01-03,86.29,86.58,81.90,83.80,44225700,83.80 AAPL,2006-01-03,72.38,74.75,72.25,74.75,28829800,74.75 AAPL,2005-01-03,64.78,65.11,62.60,63.29,24714000,31.65 AAPL,2004-01-02,21.55,21.75,21.18,21.28,5165800,10.64 이제위의데이터를읽고계산하는작업을다음의 R 코드에서수행한다. 1 #! /usr/bin/env Rscript 2 options(warn=-1) 3 sink("/dev/null") 4 input <- file("stdin", "r") 5 6 while(length(currentline<-readlines(input,n=1,warn=false))>0) { 7 8 fields <- unlist(strsplit(currentline, ",")) 9 10 lowhigh <- c(as.double(fields[3]), as.double(fields[6])) 11 12 stock_mean <- mean(lowhigh) 13 31 II. Hadoop 4. Hadoop 설치운영과프로그래밍 (2) 나 항목 (Java 와다른언어의혼용 ) 을참조할것. 32 Alex Holmes, Hadoop in Practice, Manning Publications, 2012 42
14 sink() 15 16 cat(fields[1], fields[2], stock_mean, "\n", sep="\t") 17 18 sink("/dev/null") 19 20 21 close(input) 맨첫줄에서 Rscript 가수행될환경을지정하였는데이는 Linux/Unix 에서의 shell 지정과마찬가지이다. 둘째줄의옵션을통해 warning 메시지의출력을억제한후결과가출력될곳을 /dev/null 로지정하였다. 다음으로 4 번줄의 input 명령어를통해입력파일을읽기 mode 로지정하였다. 이제 6~20 번줄사이에있는 while 문의블록에서파일의각줄별로반복작업의내용을지정하였다. 8 번줄의 Unlist 를통해읽어들인각각의문자열을콤마 (,) 를기준으로분리 (split) 시킨후그결과의리스트를벡터로옮긴후다음 10 번줄에서새로운 vector 를만들고 3 번째및 6 번째 column 의데이터, 즉, 주식의시가와종가를 lowhigh 라는벡터변수로저장하였다. 12 번줄에서평균계산을한후, 14 번줄에서 sink() 명령에대해아무런 argument 를지정하지않음으로써표준출력 (standard output) 을복원하였다. 16 번줄에서 cat 명령으로 1, 2 번필드와함께연결한후 18 번줄에서스트리밍처리라는환경을반영하여모든출력을다시 /dev/null 로변경하였다. 이제 map-only 의스트리밍작업구조가다음그림에표시되어있다. 실제위의프로그램을수행하면다음과같은결과가출력된다. $ cat test-data/stocks.txt src/main/test/stock_day_avg.r 43
AAPL 2009-01-02 88.315 AAPL 2008-01-02 197.055 AAPL 2007-01-03 85.045 AAPL 2006-01-03 73.565... 출력결과가정상임을확인한후이를 Hadoop 의 job 으로서수행한다. $ export HADOOP_HOME = /usr/lib/hadoop $ ${HADOOP_HOME/bin/hadoop fs -rmr output $ ${HADOOP_HOME/bin/hadoop fs -put test-data/stocks.txt stocks.txt $ ${HADOOP_HOME/bin/hadoop \ jar ${HADOOP_HOME/contrib/streaming/*.jar \ -D mapreduce.job.reduces=0 \ -inputformat org.apache.hadoop.mapred.textinputformat \ - input stocks.txt \ - output output \ - mapper `pwd'/src/main/test/stock_day_avg.r \ - file `pwd`//src/main/test/stock_day_avg.r 여기서는 linux 상의 export 명령을통해 Hadoop 의설치장소를밝히고, 출력장소등에대한정리작업 (housekeeping) 을하였다. 그런후 jar 명령을통해스트리밍프로그램의위치를지정하였다. 유의할것은 -D 옵션에서 mapreduce.job.reduces=0 로지정함으로써 reducer 함수는적용되지않도록한점이다. 이후입출력파일을지정한후 -mapper 에서 map 함수로수행할프로그램을지정하였다. 위와같이 mapper 함수를통한수행을하여도출력결과는앞서 R 프로그램수행과동일하게된다. (2) R 을 Map 과 Reduce 모두에적용하는경우 map 과 reduce 를모두적용하는경우를본다. 여기서는단순평균뿐아니라이동평균 (CMA: cumulative moving average) 을계산한다. #! /usr/bin/env Rscript options(warn=-1) sink("/dev/null") outputmean <- function(stock, means) { stock_mean <- mean(means) sink() cat(stock, stock_mean, "\n", sep="\t") sink("/dev/null") input <- file("stdin", "r") prevkey <- "" 44
means <- numeric(0) while(length(currentline <- readlines(input,n=1,warn=false))>0) { fields <- unlist(strsplit(currentline, "\t")) key <- fields[1] mean <- as.double(fields[3]) if( identical(prevkey, "") identical(prevkey, key)) { prevkey <- key means <- c(means, mean) else { outputmean(prevkey, means) prevkey <- key means <- c(means, mean) if(!identical(prevkey, "")) { outputmean(prevkey, means) close(input) 위 R 스크립트의 output<- function(stock, means) {... 함수에서주식코드와평균에대한 vector 를입력받은후 CMA 를계산하고그결과를표준출력장치로보내도록지정하였다. 이후 while() 문내에서실제작업이이루어지는데특히 while () 안에서의 if..else.. 문에서새로운 key 를 발견하면함수를호출해서 CMA 를계산한후이를표준출력장치에보내도록 outputmean() 함수를 호출하고있다. 위에서보았듯이 MapReduce 는 map 에서의결과물을정렬하고그룹화하는데위에서는주가코드별로 - 즉, 이것을 key 로하여 - 이들작업이진행되었다. 이제위의 R 스크립트를 stock_cma.r 라는이름의파일로저장한후다음과같은명령어로그결과를 확인한다. $ cat test-data/stocks.txt src/main/test/stock_day_avg.r sort --key 1,1 src/main/test/stock_cma.r AAPL 68.997 CSCO 49.94775 GOOG 123.9468 MSFT 101.297 YHOO 94.55789 다음그림에서는스트리밍작업과 R 스크립트가함께 reduce 측에적용되는구조를보여주고있다. 45
이제이작업도 Hadoop 의 job 으로수행할수있다. $ export HADOOP_HOME = /usr/lib/hadoop $ ${HADOOP_HOME/bin/hadoop fs -rmr output $ ${HADOOP_HOME/bin/hadoop fs -put test-data/stocks.txt stocks.txt $ ${HADOOP_HOME/bin/hadoop \ jar ${HADOOP_HOME/contrib/streaming/*.jar \ -inputformat org.apache.hadoop.mapred.textinputformat \ - input stocks.txt \ - output output \ - mapper `pwd'/src/main/test/stock_day_avg.r \ - reducer `pwd'/src/main/test/stock_day_cma.r \ - file `pwd`//src/main/test/stock_day_avg.r 이명령어를보면앞서 map 만으로구성된프로그램의수행과동일하며단지 -reducer... 문장을통해 reduce 함수만이추가되었음을알수있다. 그리고이러한 Hadoop 수행명령의결과역시위의 R 수행결과와동일함은물론이다. 46