1 동시성런타임을이용한 Actor- Based Programming Asynchronous Agents Library 김용현 (drvoss@gmail.com) http://
2 병렬성 vs 동시성 시분할 시스템에서 선점형 멀티태스킹 두 개 프로세서 코어에서의 동시 작업
3 언어적차원에서의병렬성지원 X 0041594E cmp dword ptr [CSingleton::instance_ (429B8Ch)],0 // compare 00415955 jne CSingleton::instance+99h (4159C9h) // jump not equal // Enter Critical Section 0041594E cmp dword ptr [CSingleton::instance_ (429B8Ch)],0 // compare 00415955 jne CSingleton::instance+99h (4159C9h) // jump not equal // Enter Critical Section
4 라이브러리 차원에서 지원 나는 동시성을 라이브러리 차원에서 지원하는 것 이 최선이라고 생각한다. 그리고, 그런 라이브러리 를 대대적인 언어 확장 없이도 구현할 수 있다고 생 각한다.
5 병렬프로그래밍 언어적차원에서지원 ErLang 과같은병열프로그래밍이지원되는언어 컴파일러차원에서지원 OpenMP 라이브러리차원에서지원 ConRT(PPL, AAL ), C++ AMP, Parallel Building Block(TBB, ArBB )
6 동시성런타임 (Concurrency Runtime) Microsoft 의병렬프로그래밍구현체 고도로추상화된개발자의문제해결방법에바로적용할수있도록프로그래밍모델제공 Windows 에최적화, Visual Studio 에최적화 ETW(Event Tracing for Windows) Visual Studio Parallel Debugger Visual Studio Parallel Performance Analyzer 동시성과효율적인병렬성을모두가지고있음 템플릿클래스와상수들, 그리고 CRT 구현체
7 요구사항 Windows XP sp3 이상 vista sp2, 2008 r2, windows 7 Visual Studio 2010 에포함된컴파일러이상 MSVCRXXX.DLL 필요 Visual Studio 2010 부터는 WinSxS 에정보가기록되지않고 system 폴더에기록됨 32 비트보다는 64 비트, 그리고최신운영체제가성능이좋음 Concurrency::unsupported_os 클래스를통해지원하는운영체제인지파악할수있음.
8 응용프로그램및라이브러리 프로그래밍모델 에이전트라이브러리 병렬패턴라이브러리 태스크스케줄러 동시성런타임 리소스관리자 운영체제
9 태스크스케줄러 & 리소스관리자 태스크스케줄러 태스크실행과순서등을관리 기능제한, 방식선택등정책변경을위한접근가능 확장성, 로드밸런싱, 협조적태스크스케줄링, 태스크블로킹등을백그라운드에서처리 리소스관리자 운영체제에종속된자원을추상화시킴 머신의장치를가상화시켜가지고있음 런타임동작에필요한자원을관리함
10 동시성런타임특징 Cooperative Task Scheduling Work-stealing Algorithm 한스레드에서작업이마무리되면다른스레드의남은작업을가지고와수행하여부하분산 Cooperative Blocking 공유리소스가필요한작업으로인한대기시간최소화 UMS 커널동기화매커니즘을피할수있음 할당된퀀텀을최대한이용할수있음 확장성, 로드밸런싱
11 올바른동시성런타임의사용 1. 올바른테스크정의 long term task X, short term task X 2. 에이전트라이브러리와병렬패턴라이브러리의올바른이해단순히 for 문을모두 parallel_for 로변경 X 동시성런타임이해를바탕으로비즈니스로직반영 3. 영향을주는콤포넌트는모두동시성런타임네임스페이스의자원을사용 ::Sleep() 과 Concurrency::wait() 등혼용사용 X
12 Parallel Pattern Library 비즈니스로직의병렬성을위한편리성및확장성, 이식성이높은명령형프로그래밍모델제공 알고리즘 비즈니스로직에바로반영할수있는추상화 for, for_each 병렬화리소스 동시성런타임의콤포넌트들과연동 combinable, critical_section, event 컨테이너 STL 에서사용하던습관그대로유추하여사용가능
13 for(int i = 0; i < size; ++i) { for(int j = 0; j < size; ++j) { double temp = 0; for(int k = 0; k < size; ++k) { temp += m1[i][k] * m2[k][j]; } } } for(int i = 0; i < size; ++i) { parallel_for(0,size,1,[&]int j) { { double temp = 0; for(int k = 0; k < size; ++k) { temp += m1[i][k] * m2[k][j]; } }); } array<int, 5> values = { 1, 2, 3, 4, 5 }; parallel_for(0u, values.size(), [&values] (size_t i) { values[i] *= 2; }); parallel_for_each (values.begin(), values.end(), [] (int& value) { value *= 2; });
14 Parallel Pattern Library parallel_for parallel_for_each parallel_invoke parallel_buffered_sort parallel_radixsort parallel_partitioners parallel_reduce parallel_transform parallel_sort parallel_for_fixed parallel_foreach_fixed parallel_merge parallel_accumulate parallel_accumulate_fixed parallel_partial_sum parallel_partial_sum_fixed parallel_all_of parallel_any_of parallel_none_of concurrent_vector concurrent_queue concurrent_unordered_map concurrent_unordered_multimap concurrent_unordered_set concurrent_unordered_multiset task_group reader_writer enter_critical event 그외의병렬화자원및동기화데이터구조
15 Asynchronous Agent Library 메시지전달방식을통하여에인전트의작업을격리시킴으로써동시성애플리케이션의안정성을향상시키는모델을제공 비동기에이전트 (Asynchronous Agents) 자신의상태를가지고태스크를처리하는객체 메시지전달함수 (Message Passing Function) 서로다른스레드간메시지전송함수 비동기메시지블록 (Asynchronous Message Blocks) 메시지통신에서의출발지와목적지
16 AAL 의동시성네임스페이스 비동기에이전트 Concurrency::agent 메시지전달함수 Concurrency::send() Concurrency::asend() Concurrency::receive() Concurrency::try_receive() 비동기메시지블록 Concurrency::unbounded_buffer<T> Concurrency::overwrite_buffer<T> Concurrency::single_assignment<T> Concurrency::transformer<InputT, OutputT> Concurrency::call<T> Concurrency::timer<T> Concurrency::choice<T1, T2 > Concurrency::join<T> Concurrency::multitype_join<T1, T2 >
17 비동기에이전트 Concurrency::agent 백그라운드로동작하는스케줄러에따라병렬적으로수행될태스크를정의하고상태를가지고관리하는객체 일반적으로 agent 클래스를상속받은객체 #include <agents.h> 순수가상함수 void Concurrency::agent::run() 스레드프로시저와같은역할 스케줄링에따라 run() 메소드가호출됨 인스턴스자신의실행상태값을가지고있음 HANDLE vs WCF object 클래스내부와외부에서상태값조회가능
18 #include <agents.h> class FibAgent : public Concurrency::agent { public: explicit FibAgent(int n) : num_(n) {} virtual ~FibAgent() {} protected: void run() { Fibonacci(num_); this->done(); } private: unsigned int num_; }; int main() { FibAgent fib(35); fib.start(); FibAgent::wait(&fib); } return 0;
19 Demo Concurrent::agent 를상속한에이전트만들기
20 Concurrency::agent 에이전트는자신의상태를유지 메시지블록과메시지전달함수를사용하여통신 동기화메커니즘사용 X 논리적으로일관된순서 PPL, OpenMP 와같은다른동기화매커니즘이나병렬프로그래밍모델과혼용사용가능 Chromium 과같은오픈소스에서흔히볼수있는 ThreadManager 와같은역할을함.
21 Concurrency::agent 상태 enum Concurrency::agent_status { agent_created // agent가만들어지고시작되지않은상태, agent_runnable // agent가시작되었지만실행되지않은상태, agent_started // agent가시작됨, agent_done // agent가완료됨, agent_canceled // agent가취소됨 };
22 상태에따른 agent 초기상황, 스케쥴링상황, 종료상황 사용자는생성, 스케쥴링상태진입허용, 취소, 종료상태로전환시킬수있다. 한번 started 상태로진입되면취소할수없다. agent:done 함수호출은실제의 run 함수가리턴되고, agent::wait 등의대기함수들이리턴되게한다. 취소된 agent 는시작될수없다.
23 Concurrency::agent public Concurrency::agent::cancel() Concurrency::agent::start() Concurrency::agent::status() static Concurrency::agent::wait() static Concurrency::agent::wait_for_all() static Concurrency::agent::wait_for_one() protected Concurrency::agent::done() Concurrency::agent::run()
24 메시지전달함수 함수형프로그래밍에서함수나 SendMessage API 동기화매커니즘을사용하지않고병렬프로그래밍에서자료를전달할수있음 비즈니스로직에만집중하여개발할수있음 데이터전송이간단해지고, 애플리케이션의구조가깔끔해짐 프로젝트의크기가커져도성능및디자인을보장받을수있으며기능이명백함
25 메시지전달함수 Concurrency::send(ITarget<T>*, T const data) 동기적으로 ITarget<T> 에 data를전달 SendMessage(Target, data) ITarget에서메시지를수락시 true, 거절시 false Concurrency::asend(ITarget<T>*, T const data) 비동기적으로 ITarget<T> 에 data 전달 PostMessage(Target, data) asend 함수리턴전까지데이터가들어가고, ITarget에서수락시 true Concurrency::receive(ISource<T>*, timeout) 동기적으로 ISource<T> 에서 data를가져옴. GetMessage() 뭔가들어올때까지 return 안됨 Concurrency::try_receive(ISource<T>*, timeout) 비동기적으로 ISource<T> 에서 data를가져옴. PeekMessage()
26 메시지전달함수인자 Concurrency::ISource<T> 메시지를보내는단말점인터페이스 Concurrency::ITarget<T> 메시지를받는단말점인터페이스 Concurrency::ISource<T> Concurrency::ITarget<T> Concrete Message Block T data Concrete Message Block
27 비동기메시지블럭 ISource<T>, ITarget<T> 혹은두개를모두상속받아맴버를구현한객체를의미 메시지전달함수를통하여전송된메시지를보관및처리함 기본적으로메시지를거절하거나받아들일수있는조건을지정할수있음
28 Concurrency::single_assignment ISource<T> 와 ITarget<T> 를모두상속받음 첫번째입력된값이유지됨 Concurrency::send<T>(singleAssignment, data) 전송실패시 false Concurrency::single_assignment::value() 값이들어오지않았으면블록 Concurrency::receive<T>(singleAssignment) 값이들어오지않았으면블록 함수가성공해도값이제거되지않음 Concurrency::single_assignment::has_value()
29 Concurrency::overwrite_buffer ISource<T> 와 ITarget<T> 를모두상속받음 ITarget<T> 를상속받았기때문에 send 의인자로들어갈수있고, ISource<T> 를상속받았기때문에 receive 의인자로들어갈수있음 첫값이유지되는 single_assignment 와달리마지막입력된값으로대체됨 그외특징은 single_assignment 와유사
30 Concurrency::bounded_buffer ISource<T> 와 ITarget<T> 를모두상속받음 내부적으로큐를유지하고있고입출력값또한큐의순서를따름 사용할수있는메소드는큐에입출력하는두개의메소드 Concurrency:: bounded_buffer:: enqueue() Concurrency:: bounded_buffer:: dequeue() receive 함수로값을얻어올경우 dequeue 됨
31 Demo Message Blocks
32 Concurrency::call ISource<T> 만을상속받음 call 구현클래스로작업을던질수만있고, 받는용도사용 X 일반적인스레드와유사하게사용가능 단작업큐를가지고있어작업순서가선형적 Concurrency::send<T>(callOperation, data) call 에정의된작업이끝날때까지블럭 Concurrency::call<AGENT_PARAM> * operationcall = new Concurrency::call<AGENT_PARAM>([](AGENT_PARAM p) { CompressFile(p.src.c_str(), p.target.c_str()); *p.finish = true; }); for(auto iter = l_.begin(); iter!= l_.end(); ++iter) { AGENT_PARAM param(iter->src, iter->target, finish + (i++)); asend(operationcall, param); }
33 Concurrency::transformer ISource<T> 와 ITarget<T> 를모두상속받음 작업을던지고결과값을받을수있음 ITarget<T> 의경우생성자를통한외부객체사용 O call 과는달리 receive 로받아올수있음 던진작업이대기하는대기큐와작업결과의대기큐가추상화됨 Concurrency::transformer::link_target() 타겟 ITarget<T> 클래스를대체할수있음 생성자의 ITarget<T> 을대체함 ITarget<T> 를상속받은모든클래스대입가능
34 Demo Concurrency::Transformer
35 Concurrency::choice ITarget<T> 를상속했다면이질적메시지블록이라도 2-10 개를동시에감시 먼저사용할수있는메시지블록의인덱스반환 make_choice 로템플릿함수를이용해튜플관리 choice 클래스는 ISource<T> 를상속받았기때문에 receive 함수로대기가능 Concurrency::single_assignment<int> mb1; Concurrency::overwrite_buffer<int> mb2; Concurrency::unbounded_buffer<double> mb3; auto mbtuple = make_choice(&mb1, &mb2, &mb3); // size_t idx = Concurrency::receive(mbTuple);
36 Concurrency::join 2-10 개의이질적인메시지블록을감시하되, 모든메시지블록에값이들어오면 receive 리턴 Concurrency::single_assignment<int> mb1; Concurrency::overwrite_buffer<int> mb2; Concurrency::unbounded_buffer<double> mb3; auto mbtuple = make_join(&mb1, &mb2, &mb3); // auto result = Concurrency::receive(mbTuple); std::tr1::get<0>(result); std::tr1::get<1>(result); std::tr1::get<2>(result);
37 Concurrency::timer ITarget<T> 만상속받음 생성자에지정되는밀리세컨단위마다연결된 ITarget<T> 메시지블록에메시지를전달 call<wchar_t> MessageBlock([](int i) { //... }); Concurrency::timer<wchar_t> tm (1000, i++, &MessageBlock, true); 타이머를제어할수있는 public 메소드제공 Concurrency::timer::start() Concurrency::timer::stop() Concurrency::timer::pause()
38 좋은디자인 메모리를공유사용을줄인다. 메세지블록에서값을얻어올때, 데드락이걸리는상황을주의한다. receive(),.value() agent 의상태를얻어온후상태값을확인할타이밍에는상태가변했을수있음을명심한다. 레이스컨디션을막기위하여상태의감시와대기를할때, 풀링보다는 transformer 나세마포와같은동기화매너니즘을사용
39 피해야할디자인 너무작은메시지 메시지를관리하고처리하는비용은공짜점심이아님 너무큰메시지 함수형언어에서처럼전달되는메시지의크기를적정한수준으로맞추는것이성능향상에도움이됨. 메시지의검증 전달받은메시지데이터가포인터라면접근하기전검증하는방어코딩을한다. 낮은수준에서스레드로운영되는부분의리소스할당주의 COM 의생성 / 소멸 재사용할수있는리소스의생성 / 삭제의반복 TLS 등주의
40 Reference MSDN :: 비동기에에진트라이브러리 - http://msdn.microsoft.com/kokr/library/dd492627.aspx Parallel Patterns Library, Asynchronous agents library, & Concurrency Runtime Bill Messmer MSDN 매거진 :: Actor-Based Programming with the Asynchronous Agents Library Michael Chu and Krishnan Varadarajan