헤더파일 : MultiThreadWithSignal.h
/******************************************************************************* * MultiThreadWithSignal.h * * * * IDENTIFICATION & REVISION * $Id$ * * NOTES * * ******************************************************************************/ /** * @file MultiThreadWithSignal.h * @brief 다중 연결 리스트( Doubly Linked List ) 큐 * Enqueue와 Dequeue를 스레드를 통해 진행 * 헤더파일 */ #include <stdio.h> #include <stdlib.h> #include <pthread.h> #define THREAD_COUNT 100 #define DATA_MAX_COUNT 2000 /* 큐를 이루는 노드 구조체 */ typedef struct qNode { int mData; /* Data */ int mEnqueueThreadId; /* 데이터를 준 스레드 ID */ struct qNode * mPrevNode; /* 이전 Node */ struct qNode * mNextNode; /* 다음 Node */ } qNode; /* 큐 구조체 */ typedef struct qQueue { qNode * mFrontNode; /* Dequeue */ qNode * mRearNode; /* Enqueue */ int mCount; /* 노드 갯수 */ } qQueue; /* 스레드 생성 시 인자로 넘겨주는 구조체 */ typedef struct qArgPthread { qQueue * mQueue; /* Queue */ int mThreadId; /* 스레드 ID */ } qArgPthread; /* Initialize Queue */ void qInitQueue( qQueue * aQueue ); /* Enqueue */ void * qEnqueue( void * aArgPthread ); /* Dequeue */ void * qDequeue( void * aArgPthread );
소스파일 : MultiThreadWithSignal.c
/******************************************************************************* * MultiThreadWithSignal.c * * Copyright (c) 2011, SUNJESOFT Inc. * * * IDENTIFICATION & REVISION * $Id$ * * NOTES * * ******************************************************************************/ /** * @file MultiThreadWithSignal.c * @brief 다중 연결 리스트( Doubly Linked List ) 큐 * Enqueue와 Dequeue를 스레드를 통해 진행 * Enqueue 스레드가 enqueue 이후 signal을 전송, * Dequeue 스레드가 signal을 받고 dequeue를 수행 * 다시 Enqueue 스레드에게 작업을 마친 flag를 넘긴다. */ #include "MultiThreadWithSignal.h" /* Global Mutex & Condition Variable */ pthread_mutex_t gMutexForThread; pthread_mutex_t gMutexForEnqDeq[THREAD_COUNT]; pthread_cond_t gCondition[THREAD_COUNT]; /* Flag Variables */ int gMainFlag; int gFlagForThread[THREAD_COUNT]; int gFlagForEnqDeq[THREAD_COUNT]; int main() { pthread_t sEnqueueThread[THREAD_COUNT]; /* THREAD_COUNT 만큼의 Enqueue 스레드 선언 */ pthread_t sDequeueThread[THREAD_COUNT]; /* THREAD_COUNT 만큼의 Dequeue 스레드 선언 */ qArgPthread * sArgEnqueueThread[THREAD_COUNT]; /* THREAD_COUNT 만큼의 인자로 넘겨줄 구조체 */ qArgPthread * sArgDequeueThread[THREAD_COUNT]; /* THREAD_COUNT 만큼의 인자로 넘겨줄 구조체 */ qQueue * sQueue = NULL; int sIsCreatedThread = 0; /* 스레드가 Create되었는지 여부를 알기 위한 변수 */ int sIsJoinedThread = 0; /* 스레드가 Join되었는지 여부를 알기 위한 변수 */ int sReturnValue = 0; /* 스레드가 Join 수행 시 반환하는 값 */ int sCountThread = 0; /* 스레드 개수, 각종 for문의 정수형 변수 */ int sDoubleThreadCount = 0; sQueue = (qQueue *)malloc( sizeof( qQueue ) ); sDoubleThreadCount = THREAD_COUNT * 2; /* THREAD_COUNT의 두배 */ /* * Initialize */ pthread_mutex_init( &gMutexForThread, NULL ); for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ ) { /* Mutex */ pthread_mutex_init( &gMutexForEnqDeq[sCountThread], NULL ); /* Condition Value */ pthread_cond_init( &gCondition[sCountThread], NULL ); /* Flags */ gFlagForThread[sCountThread] = 0; gFlagForEnqDeq[sCountThread] = 0; } gMainFlag = 0; /* Flag */ qInitQueue( sQueue ); /* 큐 sQueue 초기화 */ /* 스레드 Mutex Lock */ pthread_mutex_lock( &gMutexForThread ); /* * 스레드 Create */ /* Enqueue 스레드 */ for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ ) { sArgEnqueueThread[sCountThread] = (qArgPthread *)malloc( sizeof( qArgPthread ) ); /* 인자의 스레드 ID는 0, 1, ..., n */ sArgEnqueueThread[sCountThread]->mQueue = sQueue; sArgEnqueueThread[sCountThread]->mThreadId = sCountThread; sIsCreatedThread = pthread_create( &sEnqueueThread[sCountThread], NULL, &qEnqueue, (void *)sArgEnqueueThread[sCountThread] ); if( sIsCreatedThread != 0 ) { printf("[ERROR-001] :: cannot create thread.\n"); } } /* Dequeue 스레드 */ for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ ) { sArgDequeueThread[sCountThread] = (qArgPthread *)malloc( sizeof( qArgPthread ) ); /* 인자의 스레드 ID는 0, 1, ..., n */ sArgDequeueThread[sCountThread]->mQueue = sQueue; sArgDequeueThread[sCountThread]->mThreadId = sCountThread; sIsCreatedThread = pthread_create( &sDequeueThread[sCountThread], NULL, &qDequeue, (void *)sArgDequeueThread[sCountThread] ); if( sIsCreatedThread != 0 ) { printf("[ERROR-001] :: cannot create thread.\n"); } } /* 모든 스레드 생성 후 플래그 값 확인 */ while( gMainFlag < sDoubleThreadCount ) { gMainFlag = 0; for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ ) { gMainFlag += gFlagForThread[sCountThread]; } sleep(0); } /* 스레드 Mutex Unlock */ pthread_mutex_unlock( &gMutexForThread ); /* * 스레드 Join */ for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ ) { /* Enqueue 스레드 */ sIsJoinedThread = pthread_join( sEnqueueThread[sCountThread], (void **)&sReturnValue ); if( sIsJoinedThread != 0 ) { printf("[ERROR-002] :: cannot join enqueue thread.\n"); } /* Dequeue 스레드 */ sIsJoinedThread = pthread_join( sDequeueThread[sCountThread], (void **)&sReturnValue ); if( sIsJoinedThread != 0 ) { printf("[ERROR-002] :: cannot join dequeue thread.\n"); } } printf("\n+-----------------------------------+"); printf("\n| [SYSTEM] :: program is Exited.... |\n"); printf("+-----------------------------------+\n\n"); return 0; } /* Initialize Queue */ void qInitQueue( qQueue * aQueue ) { aQueue->mFrontNode = NULL; aQueue->mRearNode = NULL; aQueue->mCount = 0; printf("\n+-----------------------------------+"); printf("\n| [SYSTEM] :: queue is Initialized. |\n"); printf("+-----------------------------------+\n\n"); } /* Enqueue */ void * qEnqueue( void * aArgPthread ) { qArgPthread * sArgPthread = NULL; qQueue * sQueue = NULL; qNode * sNewNode = NULL; int sInputData = 0; int sRepeatCount = 0; int sThreadId = 0; /* int sMaxBuff = 0; */ sArgPthread = (qArgPthread *)aArgPthread; sQueue = sArgPthread->mQueue; sThreadId = sArgPthread->mThreadId; /* sMaxBuff = DATA_MAX_COUNT * 2; */ printf("[#%3d][ENQUEUE_THREAD] is ready.\n", sThreadId ); gFlagForThread[sThreadId] += 1; while( sRepeatCount < DATA_MAX_COUNT ) { /* Dequeue 스레드의 준비를 기다린다. */ while( gFlagForEnqDeq[sThreadId] == 0 ) { sleep(0); } /* 스레드 Mutex Lock */ pthread_mutex_lock( &gMutexForThread ); sNewNode = (qNode *)malloc( sizeof( qNode ) ); if( sQueue->mCount >= THREAD_COUNT /* sMaxBuff */ ) { /* printf("[THREAD_%d]\n[ERROR-003] :: queue is full. [ %d/%d ]\n\n", */ /* sThreadId, */ /* DATA_MAX_COUNT, */ /* DATA_MAX_COUNT ); */ } else { sInputData = rand() % 90 + 10; /* 임의의 데이터 값을 두자리 수로 고정 */ sNewNode->mData = sInputData; sNewNode->mNextNode = NULL; sNewNode->mEnqueueThreadId = sThreadId; printf("[#%3d][ENQUEUE_THREAD] Enqueue DATA : %d( %d/%d ) QUEUE STATUS : [ %d/%d ]\n", sThreadId, sInputData, sRepeatCount+1, DATA_MAX_COUNT, sQueue->mCount+1, THREAD_COUNT /* sMaxBuff */ ); if( sQueue->mCount == 0 ) { sQueue->mFrontNode = sNewNode; sNewNode->mPrevNode = NULL; } else { sQueue->mRearNode->mNextNode = sNewNode; sNewNode->mPrevNode = sQueue->mRearNode; } sQueue->mRearNode = sNewNode; sQueue->mCount++; sRepeatCount++; } /* 스레드 Mutex Unlock */ pthread_mutex_unlock( &gMutexForThread ); /* Enqueue Mutex Lock */ pthread_mutex_lock( &gMutexForEnqDeq[sThreadId] ); gFlagForEnqDeq[sThreadId] = 0; /* 0으로 다시 초기화 */ /* Dequeue 스레드에게 시그널 */ pthread_cond_signal( &gCondition[sThreadId] ); /* Enqueue Mutex Unlock */ pthread_mutex_unlock( &gMutexForEnqDeq[sThreadId] ); /* if( sRepeatCount%5 == 0 ) */ /* { */ usleep(50); /* } */ } } /* Dequeue */ void * qDequeue( void * aArgPthread ) { qArgPthread * sArgPthread = NULL; qQueue * sQueue = NULL; int sOutputData = 0; int sRepeatCount = 0; int sThreadId = 0; int sEnqueueThreadId = 0; /* int sMaxBuff = 0; */ sArgPthread = (qArgPthread *)aArgPthread; sQueue = sArgPthread->mQueue; sThreadId = sArgPthread->mThreadId; /* sMaxBuff = DATA_MAX_COUNT * 2; */ printf("[#%3d][DEQUEUE_THREAD] is ready.\n", sThreadId ); gFlagForThread[sThreadId] += 1; while( sRepeatCount < DATA_MAX_COUNT ) { /* Mutex Lock */ pthread_mutex_lock( &gMutexForEnqDeq[sThreadId] ); /* Enqueue 스레드에게 준비가 되었음을 알림 */ gFlagForEnqDeq[sThreadId] = 1; /* Enqueue 까지 기다린다. */ pthread_cond_wait( &gCondition[sThreadId], &gMutexForEnqDeq[sThreadId] ); /* Mutex Unlock */ pthread_mutex_unlock( &gMutexForEnqDeq[sThreadId] ); /* 스레드 Mutex Lock */ pthread_mutex_lock( &gMutexForThread ); if( sQueue->mCount == 0 ) { /* printf("[THREAD_%d]\n[ERROR-004] :: queue is empty. [ 0/%d ]\n\n", */ /* sThreadId, */ /* DATA_MAX_COUNT ); */ } else { sOutputData = sQueue->mFrontNode->mData; sEnqueueThreadId = sQueue->mFrontNode->mEnqueueThreadId; printf("[#%3d][DEQUEUE_THREAD] Dequeue DATA : %d( %d/%d ) QUEUE STATUS : [ %d/%d ] data by [#%3d][ENQUEUE_THREAD]\n", sThreadId, sOutputData, sRepeatCount+1, DATA_MAX_COUNT, sQueue->mCount-1, THREAD_COUNT /* sMaxBuff */, sEnqueueThreadId ); if( sQueue->mCount == 1 ) { free( sQueue->mFrontNode ); sQueue->mFrontNode = NULL; sQueue->mRearNode = NULL; sQueue->mCount = 0; } else { sQueue->mFrontNode = sQueue->mFrontNode->mNextNode; free( sQueue->mFrontNode->mPrevNode ); sQueue->mFrontNode->mPrevNode = NULL; sQueue->mCount--; } sRepeatCount++; } /* 스레드 Mutex Unlock */ pthread_mutex_unlock( &gMutexForThread ); /* if( sRepeatCount%5 == 0 ) */ /* { */ usleep(50); /* } */ } }
공부/코딩연습 등의 이유로 얼마든지 퍼가셔도 좋습니다.
하지만 라인마다 의미를 파악하지 않고 무작정 복사 붙여넣기는
아무것도 남지 않습니다.
또한 댓글로 궁금하신 라인 등 얼마든지 물어보시면
바로바로 대답해드리겠습니다 :)
공부를 게을리하지 맙시다!
'IT > C - Programming' 카테고리의 다른 글
[C코드] :: MULTIPROCESS QUEUE code (멀티 프로세스를 이용한 큐 코드) (3) | 2019.07.05 |
---|---|
[C개념] :: 자료형(Data type) 별 크기 및 범위 (0) | 2018.09.28 |
[C코드] :: BINARY SEARCH code (이진 탐색 코드) (0) | 2018.03.28 |
[C코드] :: INSERTION SORT code (삽입 정렬 코드) (0) | 2018.03.27 |
[C코드] :: Array based QUEUE code (배열 기반 큐 코드) (0) | 2018.03.26 |