2015 yuantl Ch 11-2. Multi-Thread 2015 년 6 월 5 일 교수김영탁 영남대학교공과대학정보통신공학과 (Tel : +82-53-810-2497; Fax : +82-53-810-4742 http://antl.yu.ac.kr/; E-mail : ytkim@yu.ac.kr)
Outline 프로세스와스레드 Task 수행이병렬로처리되어야하는경우 왜다중프로세스 / 다중스레드가필요한가? 다중프로세스와다중스레드의차이점 Windows 환경에서의다중스레드생성, 초기화및종료 다중스레드로의파라메터전달 큐를이용한다중스레드간의데이터전달 Critical Section을사용한다중스레드간의동기화 turn semaphore를사용한실행순서관리 간단한다중스레드의예제 ch11-2 - 2
프로세스 (Process) Process 프로세스 (process) 란프로그램이수행중인상태 (program in execution) 각프로세스마다개별적으로메모리가할당됨 (text core, initialized data (BSS), noninitialized data, heap (dynamically allocated memory), stack) 일반적인 PC 나대부분의컴퓨터환경에서하나의물리적인 CPU 상에다수의프로세스가실행되는 Multi-tasking 이지원되며, 운영체제가다수의프로세스를일정시간마다실행기회를가지게하는테스크스케쥴링을지원 하나의프로세스가실행을중단하고, 다른프로세스가실행될수있게하는것을컨텍스트스위칭 (Context switching ) 이라하며, 운영체제의 process scheduling & switching가이를담당함 하나의물리적인 CPU가사용되는시스템에서는임의의순간에는하나의프로세스만실행되나, 일정시간 ( 예 : 100ms) 마다프로세스가교체되며실행되기때문에전체적으로다수의프로그램들이동시에실행되는것과같이보이게됨 ch11-2 - 3
스레드 (Thread) 스레드 (Thread) 스레드는하나의프로세스내부에포함되는함수들이동시에실행될수있게한작은단위프로세서 (lightweight process) 기본적으로 CPU를사용하게하는기본단위 하나의프로세스에포함된다수의스레드들은프로세스의메모리자원들 (code section, data section, Heap 등 ) 과운영체제의자원들 ( 예 : 파일입출력등 ) 을공유함 ch11-2 - 4
프로세스 (Process) 와스레드 (Thread) 의차이점 스레드 (Thread) 란? 어떠한프로그램내에서, 특히프로세스 (process) 내에서실행되는흐름의단위. 일반적으로한프로그램은하나의 thread를가지고있지만, 프로그램환경에따라둘이상의 thread를동시에실행할수있다. 이를멀티스레드 (multithread) 라한다. 프로세스는각각개별적인 code, data, file을가지나, 스레드는자신들이포함된프로세스의 code, data, file들을공유함 ch11-2 - 5
Task 수행이병렬로처리되어야하는경우 양방향동시전송이지원되는멀티미디어정보통신응용프로그램 (application) full-duplex 실시간전화서비스 : 상대편의음성정보를수신하면서, 동시에나의음성정보를전송하여야함 음성정보의입력과출력이동시에처리될수있어야함 영상정보의입력과출력이동시에처리될수있어야함 ch11-2 - 6
실시간화상전화기의기능블럭도 User Input & Output MIC/Camera USER A (Client Mode) Speaker/Monitor Input Output User Input & Output MIC/Camera USER B (Server Mode) Speaker/Monitor Input Output MIC Camera Speaker Monitor MIC Camera Speaker Monitor ReadAudioThread() - ReadStream() - Create RTP Packet -Set RTP Packet ReadVideoThread() -CvCapture() - Create RTP Packet -Set RTP Packet streamapp WriteAudioThread() - WriteStream() - Set Sync Time Shared SyncTime WriteVideoThread() -ShowImage() - Read Sync Time ReadAudioThread() - ReadStream() - Create RTP Packet -Set RTP Packet ReadVideoThread() -CvCapture() - Create RTP Packet -Set RTP Packet streamapp WriteAudioThread() - WriteStream() -Set Sync Time Shared SyncTime WriteVideoThread() -ShowImage() - Read Sync Time Audio Buffer Video Buffer Audio Buffer Video Buffer Audio Buffer Video Buffer Audio Buffer Video Buffer SendThread() - Compare Timestamp video/audio ReceiveThread() - Identify video/audio SendThread() - Compare Timestamp video/audio ReceiveThread() - Identify video/audio - Sendto() - Recvfrom() - Sendto() - Recvfrom() RTP Datagram UDP Socket sendto() recvfrom() Tx Rx RTP Datagram UDP Socket sendto() recvfrom() Tx Rx ch11-2 - 7
양방향동시전송 (full-duplex) Twitter User A (Client mode: Account User or Follower) Input Message using MFC GUI Simple Twitter Client Display received Message using MFC GUI User B (Server mode) Simple Twitter Server Receive Message, Classify Message, and multi-cast or uni-cast message StreamApp StreamApp SendMsg() - create AppPDU - calculate checksum - enqueue() TxQueue TCB wnd, rwnd,cwnd, accmack,elps_t, SeqNo, ThreadSendPDU() - dequeue() -SendTo() - delete Acked PDUs - retransmit time-out PDUs Thread Timer ACK Flow & Error Control ThreadReassemblePDU() -dequeue() -displaymsg() ThreadRecvPDU() -RecvFrom() - verify checksum - enqueue() RxQueue SendMsg() - create AppPDU - calculate checksum - enqueue() TxQueue TCB wnd, rwnd,cwnd, accmack,elps_t, SeqNo, ThreadSendPDU() - dequeue() -SendTo() - delete Acked PDUs - retransmit time-out PDUs Thread Timer ACK Flow & Error Control ThreadReassemblePDU() - dequeue() - displaymsg() ThreadRecvPDU() -RecvFrom() - verify checksum - enqueue() RxQueue Datagram UDP Socket Datagram UDP Socket SendTo() RecvFrom() SendTo() RecvFrom() Tx Rx Tx Rx ch11-2 - 8
스레드함수의구현 스레드의생성및초기화 프로그램에포함되는함수중, 병렬로실행되어야하는함수를스레드로지정 파라메터구조체포인터를통하여, 스레드생성및실행에관련된정보를 main() 함수로부터전달받으며, 파라메터구조체는필요에따라정의 스레드는보통지정된회수만큼실행을하거나, 무한루프로실행함 스레드함수의예 DWORD WINAPI Thread_producer(LPVOID pparam) ThreadParam *pthrparam; /*void* 자료형으로스레드에전달된인자를형변환을통해 pthrparam 구조체로변환 */ pthrparam = (ThreadParam *)pparam;.... while (1) ch11-2 - 9
스레드함수로의파라메터전달 스레드파라메터전달을위한구조체정의 ( 예 ) 필요에따라파라메터항목들을포함하는구조체정의 기본적으로 Critical section에관련된정보, 공유되는큐의정보, 파일입출력에관련된정보를포함 typedef struct ThreadParam Circular_Int_Queue *queue; CRITICAL_SECTION* pcs; int role; ThreadParam; typedef struct ThreadParam CRITICAL_SECTION *pcs; Queue *pq; ROLE role; // unsigned int addr; int max_queue; int duration; FILE *fout; ThreadParam; ch11-2 - 10
스레드간의정보전달 큐를사용한정보전달 스레드간에정보 / 메시지 / 신호를전달하기위하여 FIFO 동작을수행하는 queue를사용 Queue의 end에정보를추가하는 enqueue() Queue의 front에있는정보를추출하는 dequeue() Queue는다수의스레드가공유하는자원 (shared resource) 이며, 임계구역 (critical section) 으로보호되어야함 PacketGen() 1 Queue (shared resource, critical section) PacketGen() 2 linktx() PacketGen() 3 ch11-2 - 11
Critical Section (1) Critical section 다중스레드사용을지원하는운영체제는프로그램실행중에스레드또는프로세스간에교체가일어날수있게하여, 다수의스레드 / 프로세스가병렬로처리될수있도록관리 Context switching이일어나면, 현재실행중이던스레드 / 프로세스의중간상태가임시저장되고, 다른스레드 / 프로세스가실행됨 프로그램실행중에특정구역은실행이종료될때까지스레드 / 프로세서교체가일어나지않도록관리하여야하는경우가있음 아래의스레드예에서 critical section으로보호하여야할구역은? Thread_Deposit (int deposit) // account is shared variable cur_account = account; cur_account = cur_account + deposit; account = cur_account; print(account);.... 은행잔고 account Thread_Withdraw (int withdraw) // account is shared variable cur_account = account; cur_account = cur_account - withdraw; account = cur_account; print(account);.... ch11-2 - 12
Critical Section (2) Critical section 를현재어떤스레드 / 프로세스가실행중에있다는상태를 Critical section 을표시하는변수로표시 semaphore 라고부르기도함 Critical section 을표시하는변수의설정 InitializeCriticalSection(LPCRITICAL_SECTION lpcriticalsection) initialization of critical section must be called before using EnterCriticalSection(), LeaveCriticalSection() Critical section 영역지정 EnterCriticalSection(LPCRITICAL_SECTION lpcriticalsection) LeaveCriticalSection(LPCRITICAL_SECTION lpcriticalsection) the procedures between these two functions are defined as critical section Critical section 을표시하는변수의소멸 DeleteCriticalSection(LPCRITICAL_SECTION lplcriticalsection) delete the critical section flag executed at the process exit ch11-2 - 13
스레드의종료 (1) MS-Windows 환경에서의스레드소멸및관리를위한함수들 생성된스레드가스스로함수실행을종료할때까지기다리거나, 지정된시간이후에강제종료를시킬수있음 WaitForSingleObject() 와 TerminateThread() 함수를사용 HANDLE m_hthread; m_hthread = CreateThread(NULL, 0, Thread_Function_Name, pthrdparam, 0, NULL);.... // 생성된스레드가별도로실행이됨 DWORD nexitcode = NULL; GetExitCodeThread(m_hThread, &nexitcode); WaitForSingleObject(m_hthread, THRD_EXE_TIME_MS); // wait for terminate thread TerminateThread(m_hThread, nexitcode); CloseHandle(m_hThread); ch11-2 - 14
스레드의종료 (2) 스레드가스스로종료할때까지기다리는경우 main() 함수에서는무한대시간 (INFINITIVE) 으로기다림 WaitForSingleObject(m_hthread, INFINITIVE); // wait for terminate thread TerminateThread(m_hThread, nexitcode); 스레드를일정시간동안실행하게한후, 강제종료를시키는경우 #define THRD_EXE_TIME_MS 5000 // mili-second 단위의시간 WaitForSingleObject(m_hthread, THRD_EXE_TIME_MS); // wait for terminate thread TerminateThread(m_hThread, nexitcode); ch11-2 - 15
Two Simple Threads version 1 thread_main() create thread_a() create thread_b() print thread_m :: MMMM thread_a() print thread_a :: AAAA thread_b() print thread_b :: BBBB ch11-2 - 16
/* SimpleThreadsVer1.cpp (1) */ #include<stdio.h> #include<windows.h> #include<time.h> enum ROLE PRODUCER, CONSUMER ; typedef struct ThreadParam char mark; ThreadParam; DWORD WINAPI Thread_A(LPVOID pparam); DWORD WINAPI Thread_B(LPVOID pparam); void main() /* 변수선언 */ ThreadParam *pthrparam; /* 각스레드로전달될파라미터구조체 */ HANDLE hthread_a, hthread_b; /* 스레드정보를관리하게될핸들러변수 */ DWORD nexitcode = NULL; /* thread_a 에전달될파라미터값초기화 */ pthrparam = (ThreadParam*)malloc(sizeof(ThreadParam)); pthrparam->mark = 'A'; /* CreateThread API 를이용하여 Thread 생성과전달할인자를전달한후반환되어지는해당 Thread 에대한정보를 hthread_a 핸들러에저장 */ hthread_a = CreateThread(NULL, 0, Thread_A, pthrparam, 0, NULL); ch11-2 - 17
/* SimpleThreadsVer1.cpp (2) */ /* thread_b 에전달될파라미터값초기화 */ pthrparam = (ThreadParam*)malloc(sizeof(ThreadParam)); pthrparam->mark = B'; hthread_b = CreateThread(NULL, 0, Thread_B, pthrparam, 0, NULL); /* main() thread 실행 */ char mark = M ; for (int i = 0; i < 100; i++) printf("thread_main... "); for (int j = 0; j < 50; j++) printf("%c", mark); printf(" n"); /* main 스레드가생성한 thread_a 가스스로종료할때까지대기 */ WaitForSingleObject(hThread_A, INFINITE); GetExitCodeThread(hThread_A, &nexitcode); TerminateThread(hThread_A, nexitcode); /* thread_a 종료 */ CloseHandle(hThread_A); /* 스레드핸들러종료 */ /* main 스레드가생성한 thread_b 가스스로종료할때까지대기 */ WaitForSingleObject(hThread_B, INFINITE); GetExitCodeThread(hThread_B, &nexitcode); TerminateThread(hThread_B, nexitcode); /* thread _B 종료 */ CloseHandle(hThread_B); /* 스레드핸들러종료 */ ch11-2 - 18
/* SimpleThreadsVer1.cpp (3) */ DWORD WINAPI Thread_A(LPVOID pparam) ThreadParam *pthrparam = (ThreadParam *)pparam; int data = 0; int sleep_time_ms = 0; char mark = pthrparam->mark; for (int i = 0; i < 100; i++) printf("thread_a... "); for (int j = 0; j < 50; j++) printf("%c", mark); printf(" n"); return 0; /* SimpleThreadsVer1.cpp (4) */ DWORD WINAPI Thread_B(LPVOID pparam) ThreadParam *pthrparam = (ThreadParam *)pparam; char mark = pthrparam->mark; for (int i = 0; i < 100; i++) printf("thread_b... "); for (int j = 0; j < 50; j++) printf("%c", mark); printf(" n"); return 0; ch11-2 - 19
실행결과 thread_a, thread_b, thread_main 이일정시간마다번갈아가며실행 한줄이다출력되기전에 thread간의교체가발생되어, 출력이섞임 만약, 한번에 thread 하나가한줄씩만출력하고, 번갈아가며출력하기위해서는어떻게수정하여야하나? critical section 사용 : 한줄을다출력할때까지, 다른스레드가실행되지못하도록보호 turn semaphore 사용 : 한줄을다출력한후에는반드시다른스레드가실행되도록실행순서 (turn) 제어 ch11-2 - 20
Two Simple Threads version 2 thread_main() create thread_a() create thread_b() CRITICAL_SECTION crit EnterCriticalSection() print thread_m :: MMMM LeaveCriticalSection() thread_a() EnterCriticalSection() print thread_a :: AAAA LeaveCriticalSection() thread_b() EnterCriticalSection() print thread_b :: BBBB LeaveCriticalSection() ch11-2 - 21
/* SimpleThreadsVer2.cpp (1) */ #include<stdio.h> #include<windows.h> #include<time.h> enum ROLE PRODUCER, CONSUMER ; typedef struct ThreadParam CRITICAL_SECTION* pcs; char mark; ThreadParam; DWORD WINAPI Thread_A(LPVOID pparam); DWORD WINAPI Thread_B(LPVOID pparam); void main() /* 변수선언 */ CRITICAL_SECTION crit; ThreadParam *pthrparam; HANDLE hthread_a, hthread_b; DWORD nexitcode = NULL; /* 변수초기화 */ InitializeCriticalSection(&crit); /* SimpleThreadsVer2.cpp (2) */ /* Consumer 스레드에전달될파라미터값초기화 */ pthrparam = (ThreadParam*) malloc(sizeof(threadparam)); pthrparam->pcs = &crit; pthrparam->mark = 'A'; hthread_a = CreateThread(NULL, 0, Thread_A, pthrparam, 0, NULL); /* Producer 스레드에전달될파라미터값초기화 */ pthrparam = (ThreadParam*) malloc(sizeof(threadparam)); pthrparam->pcs = &crit; pthrparam->mark = 'B'; hthread_b = CreateThread(NULL, 0, Thread_B, pthrparam, 0, NULL); WaitForSingleObject(hThread_A, INFINITE); GetExitCodeThread(hThread_A, &nexitcode); TerminateThread(hThread_A, nexitcode); CloseHandle(hThread_A); WaitForSingleObject(hThread_B, INFINITE); GetExitCodeThread(hThread_B, &nexitcode); TerminateThread(hThread_B, nexitcode); CloseHandle(hThread_B); DeleteCriticalSection(&crit); ch11-2 - 22
/* SimpleThreadsVer2.cpp (3) */ DWORD WINAPI Thread_A(LPVOID pparam) ThreadParam *pthrparam; pthrparam = (ThreadParam *)pparam; char turn = ' 0'; char mark = pthrparam->mark; for (int i = 0; i < 20; i++) EnterCriticalSection(pThrParam->pCS); printf("thread_a... "); for (int j = 0; j < 50; j++) printf("%c", mark); printf(" n"); LeaveCriticalSection(pThrParam->pCS); Sleep(100); printf("thread_a finished... n"); return 0; /* SimpleThreadsVer2.cpp (4) */ DWORD WINAPI Thread_B(LPVOID pparam) ThreadParam *pthrparam; pthrparam = (ThreadParam *)pparam; char mark = pthrparam->mark; char turn; for (int i = 0; i < 20; i++) EnterCriticalSection(pThrParam->pCS); printf("thread_b... "); for (int j = 0; j < 50; j++) printf("%c", mark); printf(" n"); LeaveCriticalSection(pThrParam->pCS); Sleep(100); printf("thread_b finished... n"); return 0; ch11-2 - 23
실행결과 (ver 2) critical section을사용하여, 하나의스레드가한줄의출력을완전히완료할때까지다른스레드가실행되지못하게보호 thread_a 또는 thread_b가두줄씩출력하는경우가있음 각스레드가한번에한줄씩만출력하게하는방법은? ch11-2 - 24
Two Simple Threads version 3 thread_main() create thread_a(); create thread_b(); CRITICAL_SECTION crit char turn = A ; wait until (turn == M ) EnterCriticalSection() print thread_m :: MMMM turn = A ; LeaveCriticalSection() thread_a() wait until (*pturn == A ) EnterCriticalSection() print thread_a :: AAAA *pturn = B ; LeaveCriticalSection() thread_b() wait until (*pturn == B ) EnterCriticalSection() print thread_b :: BBBB *pturn = M ; LeaveCriticalSection() ch11-2 - 25
/* SimpleThreadsVer3.cpp (1) */ #include<stdio.h> #include<windows.h> #include<time.h> enum ROLE PRODUCER, CONSUMER ; typedef struct ThreadParam CRITICAL_SECTION* pcs; char mark; char *pturn; ThreadParam; DWORD WINAPI Thread_A(LPVOID pparam); DWORD WINAPI Thread_B(LPVOID pparam); void main() /* 변수선언 */ CRITICAL_SECTION crit; ThreadParam *pthrparam; HANDLE hthread_a, hthread_b; DWORD nexitcode = NULL; char turn = 'A'; /* 변수초기화 */ InitializeCriticalSection(&crit); /* SimpleThreadsVer3.cpp (2) */ /* Consumer 스레드에전달될파라미터값초기화 */ pthrparam = (ThreadParam*) malloc(sizeof(threadparam)); pthrparam->pcs = &crit; pthrparam->mark = 'A'; pthrparam->pturn = &turn; hthread_a = CreateThread(NULL, 0, Thread_A, pthrparam, 0, NULL); /* Producer 스레드에전달될파라미터값초기화 */ pthrparam = (ThreadParam*) malloc(sizeof(threadparam)); pthrparam->pcs = &crit; pthrparam->mark = 'B'; pthrparam->pturn = &turn; hthread_b = CreateThread(NULL, 0, Thread_B, pthrparam, 0, NULL); WaitForSingleObject(hThread_A, INFINITE); GetExitCodeThread(hThread_A, &nexitcode); TerminateThread(hThread_A, nexitcode); CloseHandle(hThread_A); WaitForSingleObject(hThread_B, INFINITE); GetExitCodeThread(hThread_B, &nexitcode); TerminateThread(hThread_B, nexitcode); CloseHandle(hThread_B); DeleteCriticalSection(&crit); ch11-2 - 26
/* SimpleThreadsVer3.cpp (3) */ DWORD WINAPI Thread_A(LPVOID pparam) ThreadParam *pthrparam; pthrparam = (ThreadParam *)pparam; char mark = pthrparam->mark; char turn = ' 0'; for (int i = 0; i < 20; i++) do EnterCriticalSection(pThrParam->pCS); turn = *pthrparam->pturn; LeaveCriticalSection(pThrParam->pCS); if (turn == 'A') break; else Sleep(100); while (turn!= 'A'); EnterCriticalSection(pThrParam->pCS); printf("thread_a... "); for (int j = 0; j < 50; j++) printf("%c", mark); printf(" n"); *pthrparam->pturn = 'B'; LeaveCriticalSection(pThrParam->pCS); Sleep(10); printf("thread_a finished... n"); return 0; ch11-2 - 27 /* SimpleThreadsVer3.cpp (4) */ DWORD WINAPI Thread_B(LPVOID pparam) ThreadParam *pthrparam; pthrparam = (ThreadParam *)pparam; char mark = pthrparam->mark; char turn = 0 ; for (int i = 0; i < 20; i++) do EnterCriticalSection(pThrParam->pCS); turn = *pthrparam->pturn; LeaveCriticalSection(pThrParam->pCS); if (turn == 'B') break; else Sleep(10); while (turn!= 'B'); EnterCriticalSection(pThrParam->pCS); printf("thread_b... "); for (int j = 0; j < 50; j++) printf("%c", mark); printf(" n"); *pthrparam->pturn = 'A'; LeaveCriticalSection(pThrParam->pCS); Sleep(100); printf("thread_b finished... n"); return 0;
실행결과 (ver 3) critical section과함께turn semaphore를사용 하나의스레드가한줄의출력을완료한후, 다른스레드가출력을완료할때까지계속기다리게함 thread_a 또는 thread_b가한번에한줄씩만출력하며, 번갈아가며출력 ch11-2 - 28
Two Simple Threads with shared Queue version 4 shared Queue 사용 thread_main() create thread_a() create thread_b() CRITICAL_SECTION crit Queue queue; thread_a() wait until (*pq is not FULL) EnterCriticalSection() Element e = data[i]; enqueue(pq, e); LeaveCriticalSection() Queue thread_b() wait until (fifoq is not EMPTY) EnterCriticalSection() Element e = dequeue(pq); LeaveCriticalSection() ch11-2 - 29
Shared Queue 의구현방법 (1) Circular Queue array 기반의구현 first_index, last_index 가저장된데이터를가르킴 first_index와 last_index의초기값은 0 index 값은 0 ~ N-1 범위의값을가지며, N-1 이후에는 0으로순환됨 enqueue() pq->data[last_index] = element; pq-> last_index = (pq-> last_index + 1) % N; pq->num_data++; dequeue() element = pq->data[first_index]; pq-> first_index = (pq-> first_index + 1) % N; pq->num_data--; ch11-2 - 30
Shared Queue 의구현방법 (2) List Queue Doubly Linked List 기반의구현 LinkNode, LinkedList 구조체필요 enqueue() 에서는현재의 *plast 뒤에새로운 list node를추가 dequeue() 에서는현재의 *pfirst 노드를읽고, remove Priority Queue Heap priority queue 기반의구현 complete binary tree로구성 insertheap() 에서는 upheap bubbling이수행되며, 항상기준이되는 key 값이가장작은 ( 또는가장큰 ) element가 root에위치하도록관리됨 removemin() 에서는 downheap bubbling이수행되며, 남아있는항목들중기준이되는 key 값이가장작은 ( 또는가장큰 ) element가 root에위치하도록관리됨 ch11-2 - 31
간단한 Producer Consumer 스레드구현예 Thread Producer generate and data randomly, and enqueue() into the shared queue if queue is full, it waits sleeps randomly chosen time repeats the data generation for given execution duration Thread Consumer dequeues a data from the shared queue if queue is empty, it waits sleeps randomly chosen time repeats the data processing for given execution duration Functional Block diagram Critical Section Thread_ Producer enqueue() dequeue() Thread_ Consumer Circular_Queue_int ch11-2 - 32
Queue - Header File /* Queue.h */ #ifndef CIRCULAR_INT_Q_H #define CIRCULAR_INT_Q_H typedef struct Circular_Int_Queue int numdata; int max_q_size; int first_index; int last_index; int *data; Circular_Int_Queue(int maxqsize); // 구조체의초기화생성 Circular_Int_Queue; int enqueue(circular_int_queue* pq, int data); int dequeue(circular_int_queue* pq); int isqueueempty(circular_int_queue* pq); int isqueuefull(circular_int_queue* pq); void printqueue(circular_int_queue* pq); void freequeuebuffer(circular_int_queue* pq); #endif ch11-2 - 33
Queue 멤버함수 /* Queue.c (1) */ #include <stdio.h> #include <stdlib.h> #include"queue.h" Circular_Int_Queue::Circular_Int_Queue(int maxqsize) max_q_size = maxqsize; data = (int *)malloc(sizeof(int)* max_q_size); first_index = last_index = 0; numdata = 0; for (int i = 0; i < max_q_size; i++) data[i] = -1; void freequeuebuffer(circular_int_queue* pq) free(pq->data); ch11-2 - 34
Queue 멤버함수 /* Queue.c (2) */ int enqueue(circular_int_queue* pq, int data) int index = -1; if (isqueuefull(pq)) printf("queue is Full!! n"); return -1; else index = pq->last_index; pq->data[index] = data; pq->last_index = (pq->last_index + 1) % pq->max_q_size; pq->numdata++; return 0; /* Queue.c (3) */ int dequeue(circular_int_queue* pq) int data = 0; int index = -1; if (isqueueempty(pq)) printf("queue is Empty!! n"); return -1; else index = pq->first_index; data = pq->data[index]; pq->data[index] = -1; pq->first_index = (pq->first_index + 1) % pq->max_q_size; pq->numdata--; return data; ch11-2 - 35
/* Queue.c (4) */ int isqueueempty(circular_int_queue* pq) if (pq->numdata == 0) return 1; else return 0; int isqueuefull(circular_int_queue* pq) if (pq->numdata == pq->max_q_size) return 1; else return 0; void printqueue(circular_int_queue* pq) int count = 0; printf(" Current queue (numdata: %2d) [", pq->numdata); for (int i = 0; i<pq->numdata; i++) printf(" %2d", pq->data[(pq->first_index + i) % pq->max_q_size]); if (i < (pq->numdata - 1)) printf(", "); printf("] n"); ch11-2 - 36
main(), Threads /* Multi-thread.c (1) */ #include<stdio.h> #include<windows.h> #include<time.h> #include"queue.h" #define MAX_QUEUE_SIZE 10 #define THREAD_EXECUTION_TIME 10000 // 10 sec enum ROLE PRODUCER, CONSUMER ; typedef struct ThreadParam Circular_Int_Queue *queue; CRITICAL_SECTION* pcs; int role; ThreadParam; DWORD WINAPI Thread_producer(LPVOID pparam); DWORD WINAPI Thread_consumer(LPVOID pparam); ch11-2 - 37
/* Multi-thread.c (2) */ void main() Circular_Int_Queue circular_intq(max_queue_size); // Queue 생성및초기화 /* 임계구역을나누어스레드간의공유자원사용을관리하게될 CriticalSection 변수 */ CRITICAL_SECTION crit; ThreadParam *pthrparam; /* 각스레드로전달될파라미터구조체 */ /* 스레드정보를관리하게될핸들러변수 */ HANDLE hthreadproducer, hthreadconsumer; DWORD nexitcode = NULL; Circular_Int_Queue *pq = &circular_intq; /* 변수초기화 */ InitializeCriticalSection(&crit); /* 스레드에서사용될 CriticalSection 변수초기화 */ /*Consumer 스레드에전달될파라미터값초기화 */ pthrparam = (ThreadParam*)malloc(sizeof(ThreadParam)); pthrparam->pcs = &crit; pthrparam->queue = pq; pthrparam->role = CONSUMER; /*CreateThread API 를이용하여 Thread 생성과전달할인자를전달한후반환되어지는해당 Thread 에대한정보를 hthreadconsumer 핸들러에저장 */ hthreadconsumer = CreateThread(NULL, 0, Thread_consumer, pthrparam, 0, NULL); printf("thread consumer has been created and instantitated.. n"); ch11-2 - 38
/* Multi-thread.c (3) */ /*Producer 스레드에전달될파라미터값초기화 */ pthrparam = (ThreadParam*)malloc(sizeof(ThreadParam)); pthrparam->pcs = &crit; pthrparam->queue = pq; pthrparam->role = PRODUCER; /*CreateThread API 를이용하여 Thread 생성과전달할인자를전달한후반환되어지는해당 Thread 에대한정보를 hthreadproducer 핸들러에저장 */ hthreadproducer = CreateThread(NULL, 0, Thread_producer, pthrparam, 0, NULL); printf("thread producer has been created and instantitated.. n"); /*main 스레드가생성한 Producer 스레드가끝날때까지대기 */ printf("waiting for the termination of thread producer... n"); WaitForSingleObject(hThreadProducer, THREAD_EXECUTION_TIME); GetExitCodeThread(hThreadProducer, &nexitcode); TerminateThread(hThreadProducer, nexitcode); /*Producer 스레드종료 */ CloseHandle(hThreadProducer); /* 스레드핸들러종료 */ printf("thread producer is now terminated.. n"); /*main 스레드가생성한 Comsumer 스레드가끝날때까지대기 */ printf("waiting for the termination of thread consumer... n"); WaitForSingleObject(hThreadconsumer, THREAD_EXECUTION_TIME); GetExitCodeThread(hThreadconsumer, &nexitcode); TerminateThread(hThreadconsumer, nexitcode); /*Comsumer 스레드종료 */ CloseHandle(hThreadconsumer); /* 스레드핸들러종료 */ printf("thread consumer is now terminated.. n"); ch11-2 - 39
/* Multi-thread.c (4) */ /* 스레드에서사용된 CriticalSection 변수 Delete*/ DeleteCriticalSection(&crit); freequeuebuffer(pq); DWORD WINAPI Thread_producer(LPVOID pparam) ThreadParam *pthrparam; /*void* 자료형으로스레드에전달된인자를형변환을통해 pthrparam 구조체로변환 */ pthrparam = (ThreadParam *)pparam; Circular_Int_Queue *pq = pthrparam->queue; int data = 0; int sleep_time_ms = 0; int enq_res; srand(time(null)); while (1) data = rand() % 100; /* 공유자원에접근하는임계구역에진입하기전 cs 변수를이용하여, Lock 을잡고임계구역에진입한번에하나의스레드만이공유자원에접근하도록제한함 */ EnterCriticalSection(pThrParam->pCS); printf("thread_producer::enqueue(data = %2d) => ", data); ch11-2 - 40
/* Multi-thread.c (4) */ enq_res = enqueue(pq, data); LeaveCriticalSection(pThrParam->pCS); if (enq_res!= -1) EnterCriticalSection(pThrParam->pCS); printqueue(pq); LeaveCriticalSection(pThrParam->pCS); else // 만약 Queue 가 FULL 상태이면, 여유공간이생길때까지반복하여 enqueue() 시도 do Sleep(200); EnterCriticalSection(pThrParam->pCS); enq_res = enqueue(pq, data); LeaveCriticalSection(pThrParam->pCS); while (enq_res == -1); EnterCriticalSection(pThrParam->pCS); printqueue(pq); LeaveCriticalSection(pThrParam->pCS); sleep_time_ms = rand() % 500; Sleep(sleep_time_ms); return 0; ch11-2 - 41
/* Multi-thread.c (4) */ DWORD WINAPI Thread_consumer(LPVOID pparam) ThreadParam *pthrparam; /*void* 자료형으로스레드에전달된인자를형변환을통해 pthrparam 구조체로변환 */ pthrparam = (ThreadParam *)pparam; Circular_Int_Queue *pq = pthrparam->queue; int sleep_time_ms = 0; int dequeue_data = -1; srand(time(null)); while (1) /* 공유자원에접근하는임계구역에진입하기전 cs 변수를이용하여, Lock 을잡고임계구역에진입한번에하나의스레드만이공유자원에접근하도록제한함 */ EnterCriticalSection(pThrParam->pCS); printf("thread_consumer::dequeue() => "); dequeue_data = dequeue(pq); if (dequeue_data > -1) printf("dequeue data (%2d)", dequeue_data); printqueue(pq); /* 공유자원에대한처리를종료한후 critical section Lock 을놓아주어해당임계구역에대한권리를다른스레드가가질수있도록허락함 */ LeaveCriticalSection(pThrParam->pCS); sleep_time_ms = rand() % 1000; Sleep(sleep_time_ms); return 0; ch11-2 - 42
실행결과 ch11-2 - 43
ch11-2 - 44
Task Generator Task Processor Functional Block diagram 2 개의 threads: Task_Gen, Task_Proc 1 개의 Linked List 기반 Queue Critical Section Thread_ Task_Gen enqueue() dequeue() Thread_ Task_Proc ListQueue (for task) ch11-2 - 45
Thread Task_Gen Thread_Task_Gen() 는차례대로 0 ~ 19 값의 task_id 와임의로생성되는 task_name 를가지는 Task 구조체변수를생성하며, 이생성된 task 를공유되는 queue 에저장 (enqueue) 시킨다. 항상 isfull() 함수를사용하여, queue가 FULL 상태인가를먼저확인하고, 만약 queue가 FULL 상태이면, 0.1초를 sleep한후다시queue 상태를점검한후, 여유공간이생기면저장한다. Queue에데이터를저장한후에는, printqueue() 함수를사용하여현재의 queue 상태를출력시키며, 0.1 ~ 1초사이의값을임의로선정하여그기간동안 sleep한후, 데이터생성및 enqueue 동작을반복한다. Thread Task_Proc Thread_Task_Proc() 는 isempty() 함수를사용하여 queue 의상태를확인하여, 만약 EMPTY 상태가아니면 queue 에저장되어있는데이터를추출 (dequeue) 하여 Task 정보 (task_id, task_name) 을출력하고, printqueue() 함수를사용하여 queue 상태를출력한후, 0.1 ~ 1초사이의값을임의로선정한후, 그기간동안sleep하고, queue로부터의데이터추출을반복한다. 만약 queue가 EMPTY 상태이면, 0.1 초동안sleep한후, queue를상태를다시점검하고, EMPTY 상태가아니면데이터를추출한다. ch11-2 - 46
main() 함수 main() 함수는 MAX_CAPACITY 10인 queue를구조체변수의초기화기능을사용하여설정하며, 두개의스레드 (Thread_Task_Proc(), Thread_Task_Gen()) 를생성하고, 이들스레드들이 queue와 CRITICAL_SECTION criticalsection를공유하도록하며, 각각의 role을 TASK_PROC와 TASK_GEN으로지정한다. 생성된 Thread_Task_Gen() 는지정된개수 ( 예 : 20) 의Task를생성하여, enqueue 시킨후, 종료를한다. Thread_Task_Proc() 는지정된개수의 task를차례로 dequeue한후, 이를처리 ( 화면에출력 ) 한후, 스스로종료한다. main() 함수에서는 WaitForSingleObject() 함수를사용하여, Thread_Task_Gen() 이스스로종료할때까지기다린다. Thread_Task_Gen() 이종료되면, main() 함수에서는 WaitForSingleObject() 함수를사용하여, Thread_Task_Proc() 이스스로종료할때까지기다린다. ch11-2 - 47
Task_Gen, Task_Proc, ListQueue 의구현 /* ListQueue.h (1) */ #ifndef LIST_QUEUE_H #define LIST_QUEUE_H /* Queue based on Linked List */ #include <stdio.h> #include <stdlib.h> typedef struct Task int task_id; char task_name[16]; Task; typedef struct ListNode Task *ptask; ListNode *prev; ListNode *next; /* ListQueue.h (2) */ typedef struct ListQueue ListNode *pfront; ListNode *pend; int size; int max_capacity; ListQueue(int max_cap); ListQueue; bool isempty(listqueue *pq); bool isfull(listqueue *pq); void initqueue(listqueue *pq); int enqueue(listqueue *pq, Task *ptsk); Task *dequeue(listqueue *pq); void printqueue(listqueue *pq); #endif ListNode; ch11-2 - 48
/* ListQueue.c (1) */ #include "ListQueue.h" ListQueue::ListQueue(int max_cap) max_capacity = max_cap; size = 0; pfront = pend = NULL; bool isempty(listqueue *pq) if (pq->size == 0) return true; else return false; bool isfull(listqueue *pq) if (pq->size >= pq->max_capacity) return true; else return false; /* ListQueue.c (2) */ void printqueue(listqueue *pq) Task *ptsk; if (isempty(pq)) printf(" Exception::Queue is Empty!! n"); return; printf(" Queue status (size:%2d) : ", pq->size); ListNode *pln = pq->pfront; for (int i = 0; i < pq->size; i++) ptsk = pln->ptask; printf(" task (%2d, %8s)", ptsk->task_id, ptsk->task_name); pln = pln->next; printf(" n"); ch11-2 - 49
/* ListQueue.c (2) */ int enqueue(listqueue *pq, Task *ptsk) ListNode *pnewln; pnewln = (ListNode *) malloc(sizeof(listnode)); pnewln->ptask = ptsk; pnewln->next = NULL; if (isfull(pq)) return -1; // queue is full else if (isempty(pq)) // queue is empty pnewln->prev = NULL; pq->pfront = pq->pend = pnewln; else pnewln->prev = pq->pend; pq->pend->next = pnewln; pq->pend = pnewln; pq->size++; return pq->size; /* ListQueue.c (3) */ Task *dequeue(listqueue *pq) Task *ptsk; if (isempty(pq)) printf(" Exception::Queue is Empty!! n"); return NULL; ptsk = pq->pfront->ptask; pq->size--; ListNode *pln = pq->pfront; pq->pfront = pq->pfront->next; free(pln); return ptsk; ch11-2 - 50
/* main.c (1) */ #include<stdio.h> #include<windows.h> #include<time.h> #include"listqueue.h" #define MAX_CAPACITY 4 #define TASK_NAME_LEN 8 #define TASK_NAME_LEN_MIN 4 #define NUM_TASK_GEN 20 #define THRD_TASK_GEN_INTERVAL 500 // 500ms #define THRD_TASK_PROC_INTERVAL 1000 // 1000ms #define THREAD_EXECUTION_TIME 5000 // 5 sec enum ROLE TASK_GEN, TASK_PROC ; typedef struct ThreadParam ListQueue *pq; CRITICAL_SECTION* pcs; int role; ThreadParam; DWORD WINAPI Thread_TaskGen(LPVOID pparam); DWORD WINAPI Thread_TaskProc(LPVOID pparam); ch11-2 - 51
/* main.c (2) */ void main() /* 변수선언 */ ListQueue queue(max_capacity); Elm_t data; ListQueue *pq = &queue; /* 임계구역을나누어스레드간의공유자원사용을관리하게될 CriticalSection 변수 */ CRITICAL_SECTION crit; /* 각스레드로전달될파라미터구조체 */ ThreadParam *pthrparam; /* 스레드정보를관리하게될핸들러변수 */ HANDLE hthreadtaskgen, hthreadtaskproc; DWORD nexitcode = NULL; /* 스레드에서사용될 CriticalSection 변수초기화 */ InitializeCriticalSection(&crit); /*Task_Processor 스레드에전달될파라미터값초기화 */ pthrparam = (ThreadParam*)malloc(sizeof(ThreadParam)); pthrparam->pcs = &crit; pthrparam->pq = pq; pthrparam->role = TASK_PROC; /*CreateThread API를이용하여 Thread 생성과전달할인자를전달한후반환되어지는해당 Thread에대한정보를 hthreadtask_processor 핸들러에저장 */ hthreadtaskproc = CreateThread(NULL, 0, Thread_TaskProc, pthrparam, 0, NULL); printf("thread Task_Proc has been created and instantitated.. n"); ch11-2 - 52
/* main.c (3) */ /*Task_Generator 스레드에전달될파라미터값초기화 */ pthrparam = (ThreadParam*)malloc(sizeof(ThreadParam)); pthrparam->pcs = &crit; pthrparam->pq = pq; pthrparam->role = TASK_GEN; /*CreateThread API 를이용하여 Thread 생성과전달할인자를전달한후반환되어지는해당 Thread 에대한정보를 hthreadtask_generator 핸들러에저장 */ hthreadtaskgen = CreateThread(NULL, 0, Thread_TaskGen, pthrparam, 0, NULL); printf("thread Task_Gen has been created and instantitated.. n"); /*main 스레드가생성한 Task_Generator 스레드가끝날때까지대기 */ printf("waiting for the termination of thread Task_Gen... n"); WaitForSingleObject(hThreadTaskGen, INFINITE); GetExitCodeThread(hThreadTaskGen, &nexitcode); TerminateThread(hThreadTaskGen, nexitcode); /*Task_Generator 스레드종료 */ CloseHandle(hThreadTaskGen); /* 스레드핸들러종료 */ printf("thread Task_Gen is now terminated.. n"); /*main 스레드가생성한 Task_Gen 스레드가끝날때까지대기 */ printf("waiting for the termination of thread Task_Proc... n"); WaitForSingleObject(hThreadTaskProc, INFINITE); GetExitCodeThread(hThreadTaskProc, &nexitcode); TerminateThread(hThreadTaskProc, nexitcode); /* Task_Proc 스레드종료 */ CloseHandle(hThreadTaskProc); /* 스레드핸들러종료 */ printf("thread Task_Proc is now terminated.. n"); DeleteCriticalSection(&crit); /* 스레드에서사용된 CriticalSection 변수 Delete*/ ch11-2 - 53
/* main.c (4) */ DWORD WINAPI Thread_TaskGen(LPVOID pparam) ThreadParam *pthrparam; Task *ptask; char t_name[task_name_len]; int t_name_len; int i, j; /* void * 자료형으로스레드에전달된인자를형변환을통해 pthrparam 구조체로변환 */ pthrparam = (ThreadParam *)pparam; ListQueue *pq = pthrparam->pq; int data = 0; int sleep_time_ms = 0; int enq_res; srand(time(null)); for (i = 0; i < NUM_TASK_GEN; i++) ptask = (Task *)malloc(sizeof(task)); ptask->task_id = i; for (int j = 0; j < TASK_NAME_LEN; j++) t_name[j] = ' 0'; t_name_len = rand() % (TASK_NAME_LEN - TASK_NAME_LEN_MIN) + TASK_NAME_LEN_MIN; t_name[0] = rand() % 26 + 'A'; for (j = 1; j < t_name_len; j++) t_name[j] = rand() % 26 + 'a'; t_name[j] = ' 0'; strcpy(ptask->task_name, t_name); ch11-2 - 54
/* main.c (5) */ EnterCriticalSection(pThrParam->pCS); printf("taskgen::enqueue(%2d, %8s) => ", ptask->task_id, ptask->task_name); enq_res = enqueue(pq, ptask); LeaveCriticalSection(pThrParam->pCS); if (enq_res!= -1) EnterCriticalSection(pThrParam->pCS); printqueue(pq); LeaveCriticalSection(pThrParam->pCS); else // queue is full EnterCriticalSection(pThrParam->pCS); printf("queue is Full!! n"); LeaveCriticalSection(pThrParam->pCS); do Sleep(200); EnterCriticalSection(pThrParam->pCS); enq_res = enqueue(pq, ptask); LeaveCriticalSection(pThrParam->pCS); while (enq_res == -1); EnterCriticalSection(pThrParam->pCS); printf(" enqueue() after queue has been Full =>"); printqueue(pq); LeaveCriticalSection(pThrParam->pCS); sleep_time_ms = rand() % THRD_TASK_GEN_INTERVAL; Sleep(sleep_time_ms); // for printf("taskgen:: Total %d tasks have been generated!! n", NUM_TASK_GEN); return 0; ch11-2 - 55
/* main.c (6) */ DWORD WINAPI Thread_TaskProc(LPVOID pparam) ThreadParam *pthrparam; /*void* 자료형으로스레드에전달된인자를형변환을통해 pthrparam 구조체로변환 */ pthrparam = (ThreadParam *)pparam; ListQueue *pq = pthrparam->pq; int sleep_time_ms = 0; int dequeue_data = -1; int count = 0; Task *ptsk; srand(time(null)); while (1) /* 공유자원에접근하는임계구역에진입하기전, cs 변수를이용하여, Lock 을잡고임계구역에진입한번에하나의스레드만이공유자원에접근하도록제한함 */ EnterCriticalSection(pThrParam->pCS); printf("taskproc::dequeue() => "); ptsk = dequeue(pq); if (ptsk!= NULL) printf("task(%2d, %8s)", ptsk->task_id, ptsk->task_name); printqueue(pq); count++; ch11-2 - 56
/* main.c (7) */ /* 공유자원에대한처리를종료한후 Lock 을놓아주어해당임계구역에대한권리를다른스레드가가질수있도록허락함 */ LeaveCriticalSection(pThrParam->pCS); if (count >= NUM_TASK_GEN) printf("taskproc:: Total %d tasks have been processed!! n", count); break; sleep_time_ms = rand() % THRD_TASK_PROC_INTERVAL; Sleep(sleep_time_ms); return 0; ch11-2 - 57
ch11-2 - 58
Homework 11 11.1 Critical section 이필요한이유에대하여설명하라. 11.2 Queuing System and Multi-Threads (1) 다음과같은 typedef 을정의하라 : typedef unsigned int UINT_32; typedef unsigned short UINT_16; typedef unsigned char UINT_8; (2) 구조체 struct Packet 은다음과같은멤버를가진다 : UINT_32 srcaddr; // source address UINT_32 dstaddr; // destination address UINT_8 priority; // priority of protocol data unit ( 사용자 / 프로토콜정보의우선순위 ) UINT_32 seqno; // sequence number UINT_32 payloadlength; // length of payload UINT_8 *ppayload; // payload (3) 다음함수들이구조체 struct Packet 를위하여사용된다 : void initpacket(packet *ppkt, unsigned int saddr, unsigned int sn); FILE * fprintpacket(file *fout, Packet* ppkt); ch11-2 - 59
11.2 Queuing System and Multi-Threads (2) (4) 구조체 struct ListNode 는다음과같은데이터멤버를가진다 : Packet *ppkt; ListNode *pnext; ListNode *pprev; (5) 구조체 struct Queue 는다음과같은데이터멤버를가진다 : int numpkts; ListNode* pfirst; ListNode* plast; (6) 다음함수들은구조체 struct Queue 와관련되어사용된다 : int enqueue(queue *pq, Packet* ppkt); // enqueue a packet into the queue. // The list node is pointing a packet. Packet* dequeue(queue *pq); // remove a list node from the queue, and return a Packet. void printqueue(file *fout, Queue *pq); // print all list node in the queue ch11-2 - 60
11.2 Queuing System and Multi-Threads (3) (7) 다중스레드의초기화를위하여다음과같은구조체 (ThreadParam) 가스레드로파라메터를전달하기위하여사용된다 : typedef struct ThreadParam CRITICAL_SECTION *pcs; // pointer to the shared critical section Queue *pq; // pointer to the shared queue int role; // packetgen or linktx UINT_32 addr; int max_queue; int duration; // duration of the thread execution (1 minute). FILE *fout; // pointer to the output file stream ThreadParam; (8) Thread packetgen() 는주기적으로패킷들을생성하며, 생성된패킷들을 queue에삽입 (enqueue) 한다. 각스레드 packetgen() 은지정된송신주소 (srcaddr) 를사용한다. 패킷생성시간간격은 1 ~ 5 초사이의값이임의로설정된다. (9) Thread linktx() 는 queue를지속적으로점검하며, 만약 queue가 empty 상태이면 1 초를 sleep 한다. Queue가 empty가아니면, queue로부터패킷1개를추출 (dequeue) 하여그패킷의정보를지정된파일로출력한다. ch11-2 - 61
11.2 Queuing System and Multi-Threads (4) (10) main() 함수는다음과같은내용을실행한다 : 3 의 critical section 이생성되어다수의 packetgen() 스레드와 linktx() 스레드간의공유자원 (3 개의 queue) 에대한사용을제어한다. 5 개의패킷생성스레드를생성하고, 초기화한다. 1 개의 link transmission 스레드를생성하고, 초기화한다. main() 함수는 3 의구조체 struct Queue 변수를생성하고, 3 의 critical section 을생성하여, 5 개의 packet generation 스레드와 1 개의 link transmission 스레드가공유하게한다. 각패킷생성스레드는각각 30 개의패킷 ( 목적지주소는 random 하게설정하며, 우선순위 (priority) 는 0 ~ 2 의값을 random 하게설정 ) 을생성하여, 우선순위에따라지정된 queue 에넣는다. linktx() 스레드는항상우선순위가높은 queue 를먼저점검하며, 만약패킷이존재하는경우, 이를우선처리한다. 패킷생성스레드에서생성되어 enqueue 된패킷들은 linktx() 스레드에의하여 dequeue 된후, 송신측주소에따라분류되어개수가점검된다. 각송신주소로부터의우선순위별총패킷개수는프로그램의종료단계에서출력되어, 정확하게전달되었는지를확인한다. 프로그램의수행내용은출력파일 output.txt 에출력된다. PacketGen () 1 PacketGen () 2 PacketGen () 3 PacketGen () 4 PacketGen () 5 High-priority (2) Queue Mid-priority (1) Queue Low-priority (0) Queue linktx() ch11-2 - 62