Techniques for Collecting and Processing Data in Real Time
Kim Byoung-gon (김병곤) fharenheit@gmail.com
Talking About Platforms
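Requirements for Real-Time Big Data
- Real-time personalization from the user clickstream of a shopping site
- Spam detection and filtering on high-volume e-mail servers
- Location-based advertising services
- Real-time security monitoring using user and system events
- Predicting equipment failure by collecting system information
- Understanding road traffic conditions through real-time vehicle tracking and location collection
- Detecting anomalous behavior by collecting user actions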
Scale Up vs. Scale Out
- Scale Up
  Concurrent Programming
  Making good use of a multi-core architecture within a single application
  Languages: Erlang, Scala, Clojure
- Scale Out
  Distributed Programming
  Distributing work across many machines connected by a network
  Technique: MapReduce
- Criteria for choosing between Scale Up and Scale Out
  Scalability & Performance
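As a rough illustration of the Scale Up side, the sketch below spreads one computation across every core of a single JVM. The class and method names are hypothetical and chosen only for this example; a Scale Out version would instead split the range across machines.

```java
import java.util.stream.LongStream;

final class ScaleUpSketch {
    /** Sums the squares of 1..n, letting the runtime use all available cores. */
    static long sumOfSquares(long n) {
        return LongStream.rangeClosed(1, n)
                .parallel()          // work is split across the common fork/join pool
                .map(x -> x * x)
                .sum();
    }

    public static void main(String[] args) {
        System.out.println("cores=" + Runtime.getRuntime().availableProcessors());
        System.out.println("sum=" + sumOfSquares(1_000));
    }
}
```

The result is the same as a sequential loop; only the amount of hardware inside the one machine limits how far this approach goes.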
Scale Out Is the Mainstream
- The main reasons to choose Scale Out
  Continuous Availability
  Continuous Redundancy
  Cost/Performance Flexibility
  Continuous Upgrade
  Geographical Distribution
- But Scale Out is hard
  Availability, Scalability
  Indexing, Partitioning, Replication
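Technical Characteristics by Real-Time Processing Style
Category | Technical characteristics
Event-centric processing | Processes a continuous flow of events over time; continuously processes a specific time window or number of events; Scale Up architecture; declarative, rule-based processing; simple system configuration
Stream-centric processing | Processes a continuous flow of events over time; Scale Out architecture; computation-centric processing; complex system configuration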
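Open Source for Real-Time Processing
Open source | License | Language | Scaling | Add/change rules on the fly | Required infrastructure | Implementation style
Esper CEP | GPL2, Commercial | Java | Scale Up | Yes | None | Declarative, SQL-like queries
Drools Fusion CEP | ASL 2.0 | Java | Scale Up | Yes | None | Declarative; supports general rule-based queries
Storm | EPL 1.0 | Clojure | Scale Out | Can be implemented on top of ZooKeeper | ZeroMQ, ZooKeeper | Programming
Apache S4 | ASL 2.0 | Java | Scale Out | Can be implemented on top of ZooKeeper | ZooKeeper (optional), YARN (optional) | Programming
Apache Kafka | ASL 2.0 | Java | Scale Out | - | ZooKeeper | Programming (message queue)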
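Open Source for Real-Time Processing
Open source | Documentation | Maturity | Community | URL | Notes
Esper CEP | Very good | High, stable | Medium | esper.codehaus.org |
Drools Fusion CEP | Good | More than 3 years, stable | Small | www.jboss.org |
Storm | Available | Used in production | Growing fast | github.com/nathanmarz/storm | Good deployment features
Apache S4 | Average | Low, used in production | Medium | incubator.apache.org/s4 |
Apache Kafka | Good | Used in production | Small | incubator.apache.org/kafka |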
Tweetping
Splunk
Sumo Logic
Realtime Big Data Architecture
[Architecture diagram. Components shown: Web Server and Application Servers with Log Aggregators; Complex Event Processing Engine (Ingress Adapter, Event Query Engine, Rule Engine, Outgress Adapter, Client API); Streaming Processing Engine (Adapter, Nodes); Distributed Messaging (Producer, Brokers, Consumers); In-Memory Data Grid (Nodes, Client API); NoSQL Cluster (MongoDB: Mongos, Config, mongod, Grid File System); MySQL Cluster (JDBC, Sqoop); Apache Hadoop Cluster (Namenode, Datanodes, HDFS API, MapReduce, Apache Pig with DAG Engine and PigLatin, Apache Hive with Metastore and HiveQL); ZooKeeper and Ganglia alongside the clusters.]
CEP & Flume & Hadoop
[Architecture diagram. Components shown: Web Server, Application Server, and Proxy Server, each with a Log Aggregator and Decorator; Log Collectors (Thrift); Complex Event Processing Engine with HTTP and Thrift Input Adapters, an Event Query Engine running [EPL] select user, sys,... from cpu, and Mail, RDBMS, and HDFS Output Adapters; MySQL Cluster (JDBC); HDFS paths /LOG/MON/YYYYMMDDD and /CEP/MON/YYYYMMDDD; Sqoop; Apache Hadoop Cluster (Namenode, Datanodes, Apache Pig with DAG Engine and PigLatin, Apache Hive with Metastore and HiveQL); ZooKeeper and Ganglia.]
Esper CEP
- Event detection on a Scale Up architecture
[Diagram: Complex Event Processing Esper Engine. HTTP and Thrift Input Adapters feed query statements (select user, sys,... from cpu) in the Core Container; each statement delivers POJOs to a Mail, RDBMS, or HDFS Output Adapter. Labeled: Event Query Pattern Language.]
CEP Length Window
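A length window keeps only the most recent N events and evaluates an aggregate over them. The plain-Java sketch below shows the idea; it is not Esper code, and the class name and the choice of an average as the aggregate are assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class LengthWindow {
    private final int capacity;
    private final Deque<Double> events = new ArrayDeque<>();
    private double sum;

    LengthWindow(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Adds an event and returns the average over the events still in the window. */
    double add(double value) {
        events.addLast(value);
        sum += value;
        if (events.size() > capacity) {
            sum -= events.removeFirst();   // the oldest event leaves the window
        }
        return sum / events.size();
    }

    public static void main(String[] args) {
        LengthWindow window = new LengthWindow(3);
        for (double price : new double[] {10, 20, 30, 40}) {
            System.out.println("avg=" + window.add(price));
        }
    }
}
```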
CEP Time Window
Registering a time-window statement and a listener with Esper:

    import com.espertech.esper.client.*;

    // Register a statement that averages price over a 30-second time window
    EPServiceProvider epService = EPServiceProviderManager.getDefaultProvider();
    String expression = "select avg(price) from org.myapp.event.OrderEvent.win:time(30 sec)";
    EPStatement statement = epService.getEPAdministrator().createEPL(expression);
    MyListener listener = new MyListener();
    statement.addListener(listener);

    // Called by the engine whenever the statement produces new results
    public class MyListener implements UpdateListener {
        public void update(EventBean[] newEvents, EventBean[] oldEvents) {
            EventBean event = newEvents[0];
            System.out.println("avg=" + event.get("avg(price)"));
        }
    }

Another time-window statement:

    select account, avg(amount) from Withdrawal.win:time(4 sec) group by account having amount > 1000
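To make the time-window semantics concrete, here is a plain-Java sketch of what a 30-second window average does. It is not Esper's implementation: the class name is hypothetical, timestamps are passed in explicitly and assumed to be non-decreasing, and an event is treated as expired once it is a full window length old.

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class TimeWindow {
    /** One event together with its arrival time. */
    private static final class Entry {
        final long timestampMillis;
        final double value;

        Entry(long timestampMillis, double value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Deque<Entry> events = new ArrayDeque<>();
    private double sum;

    TimeWindow(long windowMillis) {
        if (windowMillis <= 0) {
            throw new IllegalArgumentException("windowMillis must be positive");
        }
        this.windowMillis = windowMillis;
    }

    /** Adds an event at nowMillis and returns the average of the unexpired events. */
    double add(long nowMillis, double value) {
        events.addLast(new Entry(nowMillis, value));
        sum += value;
        // Drop events that are a full window length old; the new event never qualifies.
        while (events.peekFirst().timestampMillis <= nowMillis - windowMillis) {
            sum -= events.removeFirst().value;
        }
        return sum / events.size();
    }

    public static void main(String[] args) {
        TimeWindow window = new TimeWindow(30_000);
        System.out.println("avg=" + window.add(0, 100));
        System.out.println("avg=" + window.add(10_000, 200));
        System.out.println("avg=" + window.add(30_000, 400));
    }
}
```

Unlike a length window, the number of events held here varies with the arrival rate, so memory use depends on traffic rather than on a fixed count.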
How MapReduce Processes Data
Source: Manning, Hadoop in Practice
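Apache S4
- Distributed streaming on a Scale Out architecture
S4: distributed stream computing platform
  Scalable: performance improves linearly as nodes are added
  Cluster Management: the cluster is managed with ZooKeeper
  Extensible: applications are developed and deployed with a simple API
  Decentralized: No Single Point of Failure
  Partial Fault-Tolerance: a Stand-By Server is activated automatically when a machine fails
  Proven: applied to Yahoo!'s production search system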
Near Real-Time Search Index
Hadoop MapReduce?
  Hadoop handles only chunked data
  It is not suited to unbounded streams
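The difference between chunked and unbounded input can be sketched in a few lines. In the hypothetical example below, the batch method needs the whole chunk before it can produce an answer, while the streaming counter updates its state per event and can be read at any moment. All names are made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BatchVersusStream {
    /** Batch style: the complete chunk must exist before counting starts. */
    static Map<String, Integer> countBatch(List<String> chunk) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : chunk) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    /** Streaming style: state is updated as each event arrives. */
    static final class StreamingCounter {
        private final Map<String, Integer> counts = new HashMap<>();

        /** Processes one event and returns the running count for that word. */
        int onEvent(String word) {
            return counts.merge(word, 1, Integer::sum);
        }

        /** Returns a copy of the counts seen so far; the stream need not have ended. */
        Map<String, Integer> snapshot() {
            return new HashMap<>(counts);
        }
    }
}
```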
Apache S4 Processing Element
[Diagram: Input Event -> Processing Node containing several Processing Elements -> Output Event]
- Processing Element (PE): where the business logic lives; it takes an event stream as input, processes it, and emits output
- Processing Node: a node on which multiple PEs are deployed and chained together for processing
An event that passes through PhoneCallPE comes out as a new type of Event POJO.
Apache S4 PE Pipeline
  Events are received according to partitioning, a feature S4 provides, and load-balanced
  Load is distributed across Processing Nodes
  Individual events are combined to build a Lucene index
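The way keyed events reach Processing Elements can be sketched as follows. This is not the S4 API: the interface, the counter PE, and the node class are hypothetical stand-ins that only show one PE instance being created per key and every later event for that key landing on the same instance.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

final class KeyedPeSketch {
    /** Hypothetical stand-in for a Processing Element; not the S4 interface. */
    interface ProcessingElement {
        String onEvent(String value);
    }

    /** Counts the events seen for one key and emits "key=count" as the output event. */
    static final class CounterPe implements ProcessingElement {
        private final String key;
        private int count;

        CounterPe(String key) {
            this.key = key;
        }

        @Override
        public String onEvent(String value) {
            count++;
            return key + "=" + count;
        }
    }

    /** Routes each event to the PE instance that owns its key, creating it on first use. */
    static final class ProcessingNode {
        private final Map<String, ProcessingElement> instances = new HashMap<>();
        private final Function<String, ProcessingElement> factory;

        ProcessingNode(Function<String, ProcessingElement> factory) {
            this.factory = factory;
        }

        String dispatch(String key, String value) {
            return instances.computeIfAbsent(key, factory).onEvent(value);
        }

        int instanceCount() {
            return instances.size();
        }
    }
}
```

Because state lives inside the per-key instance, the PE itself needs no locking as long as a key is always routed to the same place.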
Apache S4 Client Adapter
[Diagram: an External System connects to Client Adapters over TCP and UDP; the adapters exchange events with S4 nodes S4-0 and S4-1, each running an S4 App (S4 Application 1, S4 Application 2). Ports shown: 2334, 5077, 5078, 6077.]
Apache S4 Twitter Topic Counter
STORM
- Highly Distributed Realtime Computation System
- Any Programming Language
- Scalable
- Fault-Tolerant
- Distributed streaming on a Scale Out architecture
STORM
[Architecture diagram. Components shown: Browser (http://www.website.com/, Search); Web Server and Application Servers with Log Aggregators; Producer; STORM (Spout, Bolts); In-Memory Data Grid (Nodes, Client API); NoSQL Cluster (MongoDB: Mongos, Config, mongod, Grid File System).]
STORM Spout & Bolt
https://github.com/nathanmarz/storm/wiki/tutorial
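STORM Spout
- Collects data from clients

    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Queue;
    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    public class CountrySpout extends BaseRichSpout {
        // Lines waiting to be emitted; exposed through getFeedQueue()
        private Queue<String> feedQueue = new LinkedList<String>();
        private SpoutOutputCollector collector;
        // StormSpoutServer is the presenter's own helper class, not part of Storm
        private StormSpoutServer stormSpoutServer;

        // Called repeatedly by Storm; emits one queued line per call, if any
        public void nextTuple() {
            String next = feedQueue.poll();
            if (next != null) {
                // The line itself doubles as the message id
                collector.emit(new Values(next), next);
            }
        }

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            // Keep the collector before starting the server on port 9191
            this.collector = collector;
            stormSpoutServer = new StormSpoutServer(9191, this);
            stormSpoutServer.run();
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }

        public Queue<String> getFeedQueue() {
            return feedQueue;
        }
    }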
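STORM Bolt
- Processes the event stream passed on by a Spout or another Bolt

    import java.util.HashMap;
    import java.util.Map;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class CountryCounterBolt extends BaseBasicBolt {
        // Running count per distinct input string
        Map<String, Integer> counters;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            this.counters = new HashMap<String, Integer>();
        }

        @Override
        public void execute(Tuple input, BasicOutputCollector collector) {
            String str = input.getString(0);
            if (!counters.containsKey(str)) {
                counters.put(str, 1);
            } else {
                Integer c = counters.get(str) + 1;
                counters.put(str, c);
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Not on the slide; required to compile. This bolt emits nothing.
        }
    }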
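STORM - Topology

    import backtype.storm.generated.StormTopology;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public static StormTopology createTopology() {
        TopologyBuilder builder = new TopologyBuilder();
        // One spout task collects tweets
        builder.setSpout("tweets-collector", new ApiStreamingSpout(), 1);
        // Two splitter tasks receive tweets in shuffled order
        builder.setBolt("hashtags-splitter", new HashTagsSplitter(), 2)
               .shuffleGrouping("tweets-collector");
        // Two counter tasks; tuples are grouped on the "hashtags" field
        builder.setBolt("hashtags-counter", new HashtagsCounterBolt(), 2)
               .fieldsGrouping("hashtags-splitter", new Fields("hashtags"));
        return builder.createTopology();
    }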
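The two groupings used in the topology differ in how a tuple is assigned to a task. The sketch below illustrates the idea only; it is not Storm's implementation, and the hash-modulo rule and helper names are assumptions made for the example.

```java
import java.util.Random;

final class GroupingSketch {
    /** Fields-grouping idea: equal field values always map to the same task index. */
    static int fieldsGroupingTask(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    /** Shuffle-grouping idea: each tuple goes to a randomly chosen task. */
    static int shuffleGroupingTask(Random random, int numTasks) {
        return random.nextInt(numTasks);
    }

    public static void main(String[] args) {
        Random random = new Random();
        for (String tag : new String[] {"#storm", "#s4", "#storm"}) {
            System.out.println(tag
                    + " shuffle->" + shuffleGroupingTask(random, 2)
                    + " fields->" + fieldsGroupingTask(tag, 2));
        }
    }
}
```

A counter bolt depends on the fields-grouping property: if the same hashtag could reach two different tasks, each task would hold only a partial count.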
STORM Message Guarantee
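Storm tracks whether a spout tuple has been fully processed by XOR-ing the ids of the tuples in its tree: each id is folded in once when the tuple is emitted and once when it is acked, so the running value returns to zero when nothing is outstanding. The sketch below shows only that bookkeeping idea. The class and method names are hypothetical, ids are assumed to be random non-zero 64-bit values, and, unlike this simplified version, Storm reports an ack together with the ids of any newly emitted child tuples in a single update, which prevents the value from reaching zero too early.

```java
import java.util.HashMap;
import java.util.Map;

final class AckerSketch {
    // Spout tuple id -> XOR of every tuple id emitted or acked so far in its tree
    private final Map<Long, Long> ackVals = new HashMap<>();

    /** Records a newly emitted tuple that belongs to the tree rooted at rootId. */
    void emitted(long rootId, long tupleId) {
        ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
    }

    /** Records an ack; returns true once every emitted tuple in the tree has been acked. */
    boolean acked(long rootId, long tupleId) {
        long value = ackVals.merge(rootId, tupleId, (a, b) -> a ^ b);
        if (value == 0L) {
            ackVals.remove(rootId);   // tree complete, tracking state can be dropped
            return true;
        }
        return false;
    }
}
```

The appeal of this scheme is that the tracking state per spout tuple is a single 64-bit value, no matter how many tuples the tree contains.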
STORM Transaction
Getting Started with Storm
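Collecting Twitter Messages with CEP
[Diagram: the Twitter Adapter Starter starts the adapter and initializes the CEP engine. The Twitter Adapter (Twitter4J) receives the real-time stream; the Twitter Status Listener passes location information to the Esper CEP Engine in real time; an event listener is registered, and the Esper Update Listener measures the number of locations per unit of time.]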
Collecting Twitter Messages with Apache Flume
[Diagram: the Twitter Adapter Starter starts the adapter. The Twitter Adapter (Twitter4J) receives the real-time stream; the Twitter Status Listener writes it to a log file; Flume Tail Exec Source -> Flume Memory Channel -> Flume HDFS Sink loads it into HDFS.]
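Lessons Learned
- Know-how about building the system, or real effort toward it, is absolutely necessary.
- Other people's experience may matter, or it may not.
- It is hard to implement without a lot of hardware.
- You have to know the business. It would be nice if technology were everything, but in the end you have to know the business.
- Depending on the nature of the business and on non-functional requirements, you may need a different open-source project or, if necessary, a commercial product.
- Relentless testing and profiling are the only way to survive.
- Not everything can be done in real time.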
Hands-On Lab Template
https://github.com/fharenheit/esper-flume-hands-on-lab
JBoss Community (http://www.jboss.org)
Korea JBoss User Group (http://cafe.naver.com/jbossug)