헤더파일 : MultiThreadWithSignal.h
/*******************************************************************************
* MultiThreadWithSignal.h
*
*
*
* IDENTIFICATION & REVISION
* $Id$
*
* NOTES
*
*
******************************************************************************/
/**
* @file MultiThreadWithSignal.h
* @brief 다중 연결 리스트( Doubly Linked List ) 큐
* Enqueue와 Dequeue를 스레드를 통해 진행
* 헤더파일
*/
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define THREAD_COUNT 100
#define DATA_MAX_COUNT 2000
/* 큐를 이루는 노드 구조체 */
typedef struct qNode
{
int mData; /* Data */
int mEnqueueThreadId; /* 데이터를 준 스레드 ID */
struct qNode * mPrevNode; /* 이전 Node */
struct qNode * mNextNode; /* 다음 Node */
} qNode;
/* 큐 구조체 */
typedef struct qQueue
{
qNode * mFrontNode; /* Dequeue */
qNode * mRearNode; /* Enqueue */
int mCount; /* 노드 갯수 */
} qQueue;
/* 스레드 생성 시 인자로 넘겨주는 구조체 */
typedef struct qArgPthread
{
qQueue * mQueue; /* Queue */
int mThreadId; /* 스레드 ID */
} qArgPthread;
/* Initialize Queue */
void qInitQueue( qQueue * aQueue );
/* Enqueue */
void * qEnqueue( void * aArgPthread );
/* Dequeue */
void * qDequeue( void * aArgPthread );소스파일 : MultiThreadWithSignal.c
/*******************************************************************************
* MultiThreadWithSignal.c
*
* Copyright (c) 2011, SUNJESOFT Inc.
*
*
* IDENTIFICATION & REVISION
* $Id$
*
* NOTES
*
*
******************************************************************************/
/**
* @file MultiThreadWithSignal.c
* @brief 다중 연결 리스트( Doubly Linked List ) 큐
* Enqueue와 Dequeue를 스레드를 통해 진행
* Enqueue 스레드가 enqueue 이후 signal을 전송,
* Dequeue 스레드가 signal을 받고 dequeue를 수행
* 다시 Enqueue 스레드에게 작업을 마친 flag를 넘긴다.
*/
#include "MultiThreadWithSignal.h"
/* Global Mutex & Condition Variable */
pthread_mutex_t gMutexForThread;
pthread_mutex_t gMutexForEnqDeq[THREAD_COUNT];
pthread_cond_t gCondition[THREAD_COUNT];
/* Flag Variables */
int gMainFlag;
int gFlagForThread[THREAD_COUNT];
int gFlagForEnqDeq[THREAD_COUNT];
int main()
{
pthread_t sEnqueueThread[THREAD_COUNT]; /* THREAD_COUNT 만큼의 Enqueue 스레드 선언 */
pthread_t sDequeueThread[THREAD_COUNT]; /* THREAD_COUNT 만큼의 Dequeue 스레드 선언 */
qArgPthread * sArgEnqueueThread[THREAD_COUNT]; /* THREAD_COUNT 만큼의 인자로 넘겨줄 구조체 */
qArgPthread * sArgDequeueThread[THREAD_COUNT]; /* THREAD_COUNT 만큼의 인자로 넘겨줄 구조체 */
qQueue * sQueue = NULL;
int sIsCreatedThread = 0; /* 스레드가 Create되었는지 여부를 알기 위한 변수 */
int sIsJoinedThread = 0; /* 스레드가 Join되었는지 여부를 알기 위한 변수 */
int sReturnValue = 0; /* 스레드가 Join 수행 시 반환하는 값 */
int sCountThread = 0; /* 스레드 개수, 각종 for문의 정수형 변수 */
int sDoubleThreadCount = 0;
sQueue = (qQueue *)malloc( sizeof( qQueue ) );
sDoubleThreadCount = THREAD_COUNT * 2; /* THREAD_COUNT의 두배 */
/*
* Initialize
*/
pthread_mutex_init( &gMutexForThread, NULL );
for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
{
/* Mutex */
pthread_mutex_init( &gMutexForEnqDeq[sCountThread], NULL );
/* Condition Value */
pthread_cond_init( &gCondition[sCountThread], NULL );
/* Flags */
gFlagForThread[sCountThread] = 0;
gFlagForEnqDeq[sCountThread] = 0;
}
gMainFlag = 0; /* Flag */
qInitQueue( sQueue ); /* 큐 sQueue 초기화 */
/* 스레드 Mutex Lock */
pthread_mutex_lock( &gMutexForThread );
/*
* 스레드 Create
*/
/* Enqueue 스레드 */
for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
{
sArgEnqueueThread[sCountThread] = (qArgPthread *)malloc( sizeof( qArgPthread ) );
/* 인자의 스레드 ID는 0, 1, ..., n */
sArgEnqueueThread[sCountThread]->mQueue = sQueue;
sArgEnqueueThread[sCountThread]->mThreadId = sCountThread;
sIsCreatedThread = pthread_create( &sEnqueueThread[sCountThread],
NULL,
&qEnqueue,
(void *)sArgEnqueueThread[sCountThread] );
if( sIsCreatedThread != 0 )
{
printf("[ERROR-001] :: cannot create thread.\n");
}
}
/* Dequeue 스레드 */
for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
{
sArgDequeueThread[sCountThread] = (qArgPthread *)malloc( sizeof( qArgPthread ) );
/* 인자의 스레드 ID는 0, 1, ..., n */
sArgDequeueThread[sCountThread]->mQueue = sQueue;
sArgDequeueThread[sCountThread]->mThreadId = sCountThread;
sIsCreatedThread = pthread_create( &sDequeueThread[sCountThread],
NULL,
&qDequeue,
(void *)sArgDequeueThread[sCountThread] );
if( sIsCreatedThread != 0 )
{
printf("[ERROR-001] :: cannot create thread.\n");
}
}
/* 모든 스레드 생성 후 플래그 값 확인 */
while( gMainFlag < sDoubleThreadCount )
{
gMainFlag = 0;
for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
{
gMainFlag += gFlagForThread[sCountThread];
}
sleep(0);
}
/* 스레드 Mutex Unlock */
pthread_mutex_unlock( &gMutexForThread );
/*
* 스레드 Join
*/
for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
{
/* Enqueue 스레드 */
sIsJoinedThread = pthread_join( sEnqueueThread[sCountThread],
(void **)&sReturnValue );
if( sIsJoinedThread != 0 )
{
printf("[ERROR-002] :: cannot join enqueue thread.\n");
}
/* Dequeue 스레드 */
sIsJoinedThread = pthread_join( sDequeueThread[sCountThread],
(void **)&sReturnValue );
if( sIsJoinedThread != 0 )
{
printf("[ERROR-002] :: cannot join dequeue thread.\n");
}
}
printf("\n+-----------------------------------+");
printf("\n| [SYSTEM] :: program is Exited.... |\n");
printf("+-----------------------------------+\n\n");
return 0;
}
/* Initialize Queue */
void qInitQueue( qQueue * aQueue )
{
aQueue->mFrontNode = NULL;
aQueue->mRearNode = NULL;
aQueue->mCount = 0;
printf("\n+-----------------------------------+");
printf("\n| [SYSTEM] :: queue is Initialized. |\n");
printf("+-----------------------------------+\n\n");
}
/* Enqueue */
void * qEnqueue( void * aArgPthread )
{
qArgPthread * sArgPthread = NULL;
qQueue * sQueue = NULL;
qNode * sNewNode = NULL;
int sInputData = 0;
int sRepeatCount = 0;
int sThreadId = 0;
/* int sMaxBuff = 0; */
sArgPthread = (qArgPthread *)aArgPthread;
sQueue = sArgPthread->mQueue;
sThreadId = sArgPthread->mThreadId;
/* sMaxBuff = DATA_MAX_COUNT * 2; */
printf("[#%3d][ENQUEUE_THREAD] is ready.\n", sThreadId );
gFlagForThread[sThreadId] += 1;
while( sRepeatCount < DATA_MAX_COUNT )
{
/* Dequeue 스레드의 준비를 기다린다. */
while( gFlagForEnqDeq[sThreadId] == 0 )
{
sleep(0);
}
/* 스레드 Mutex Lock */
pthread_mutex_lock( &gMutexForThread );
sNewNode = (qNode *)malloc( sizeof( qNode ) );
if( sQueue->mCount >= THREAD_COUNT /* sMaxBuff */ )
{
/* printf("[THREAD_%d]\n[ERROR-003] :: queue is full. [ %d/%d ]\n\n", */
/* sThreadId, */
/* DATA_MAX_COUNT, */
/* DATA_MAX_COUNT ); */
}
else
{
sInputData = rand() % 90 + 10; /* 임의의 데이터 값을 두자리 수로 고정 */
sNewNode->mData = sInputData;
sNewNode->mNextNode = NULL;
sNewNode->mEnqueueThreadId = sThreadId;
printf("[#%3d][ENQUEUE_THREAD] Enqueue DATA : %d( %d/%d ) QUEUE STATUS : [ %d/%d ]\n",
sThreadId,
sInputData,
sRepeatCount+1,
DATA_MAX_COUNT,
sQueue->mCount+1,
THREAD_COUNT /* sMaxBuff */ );
if( sQueue->mCount == 0 )
{
sQueue->mFrontNode = sNewNode;
sNewNode->mPrevNode = NULL;
}
else
{
sQueue->mRearNode->mNextNode = sNewNode;
sNewNode->mPrevNode = sQueue->mRearNode;
}
sQueue->mRearNode = sNewNode;
sQueue->mCount++;
sRepeatCount++;
}
/* 스레드 Mutex Unlock */
pthread_mutex_unlock( &gMutexForThread );
/* Enqueue Mutex Lock */
pthread_mutex_lock( &gMutexForEnqDeq[sThreadId] );
gFlagForEnqDeq[sThreadId] = 0; /* 0으로 다시 초기화 */
/* Dequeue 스레드에게 시그널 */
pthread_cond_signal( &gCondition[sThreadId] );
/* Enqueue Mutex Unlock */
pthread_mutex_unlock( &gMutexForEnqDeq[sThreadId] );
/* if( sRepeatCount%5 == 0 ) */
/* { */
usleep(50);
/* } */
}
}
/* Dequeue */
void * qDequeue( void * aArgPthread )
{
qArgPthread * sArgPthread = NULL;
qQueue * sQueue = NULL;
int sOutputData = 0;
int sRepeatCount = 0;
int sThreadId = 0;
int sEnqueueThreadId = 0;
/* int sMaxBuff = 0; */
sArgPthread = (qArgPthread *)aArgPthread;
sQueue = sArgPthread->mQueue;
sThreadId = sArgPthread->mThreadId;
/* sMaxBuff = DATA_MAX_COUNT * 2; */
printf("[#%3d][DEQUEUE_THREAD] is ready.\n", sThreadId );
gFlagForThread[sThreadId] += 1;
while( sRepeatCount < DATA_MAX_COUNT )
{
/* Mutex Lock */
pthread_mutex_lock( &gMutexForEnqDeq[sThreadId] );
/* Enqueue 스레드에게 준비가 되었음을 알림 */
gFlagForEnqDeq[sThreadId] = 1;
/* Enqueue 까지 기다린다. */
pthread_cond_wait( &gCondition[sThreadId], &gMutexForEnqDeq[sThreadId] );
/* Mutex Unlock */
pthread_mutex_unlock( &gMutexForEnqDeq[sThreadId] );
/* 스레드 Mutex Lock */
pthread_mutex_lock( &gMutexForThread );
if( sQueue->mCount == 0 )
{
/* printf("[THREAD_%d]\n[ERROR-004] :: queue is empty. [ 0/%d ]\n\n", */
/* sThreadId, */
/* DATA_MAX_COUNT ); */
}
else
{
sOutputData = sQueue->mFrontNode->mData;
sEnqueueThreadId = sQueue->mFrontNode->mEnqueueThreadId;
printf("[#%3d][DEQUEUE_THREAD] Dequeue DATA : %d( %d/%d ) QUEUE STATUS : [ %d/%d ] data by [#%3d][ENQUEUE_THREAD]\n",
sThreadId,
sOutputData,
sRepeatCount+1,
DATA_MAX_COUNT,
sQueue->mCount-1,
THREAD_COUNT /* sMaxBuff */,
sEnqueueThreadId );
if( sQueue->mCount == 1 )
{
free( sQueue->mFrontNode );
sQueue->mFrontNode = NULL;
sQueue->mRearNode = NULL;
sQueue->mCount = 0;
}
else
{
sQueue->mFrontNode = sQueue->mFrontNode->mNextNode;
free( sQueue->mFrontNode->mPrevNode );
sQueue->mFrontNode->mPrevNode = NULL;
sQueue->mCount--;
}
sRepeatCount++;
}
/* 스레드 Mutex Unlock */
pthread_mutex_unlock( &gMutexForThread );
/* if( sRepeatCount%5 == 0 ) */
/* { */
usleep(50);
/* } */
}
}공부/코딩연습 등의 이유로 얼마든지 퍼가셔도 좋습니다.
하지만 라인마다 의미를 파악하지 않고 무작정 복사 붙여넣기는
아무것도 남지 않습니다.
또한 댓글로 궁금하신 라인 등 얼마든지 물어보시면
바로바로 대답해드리겠습니다 :)
공부를 게을리하지 맙시다!
'IT > C - Programming' 카테고리의 다른 글
| [C코드] :: MULTIPROCESS QUEUE code (멀티 프로세스를 이용한 큐 코드) (3) | 2019.07.05 |
|---|---|
| [C개념] :: 자료형(Data type) 별 크기 및 범위 (0) | 2018.09.28 |
| [C코드] :: BINARY SEARCH code (이진 탐색 코드) (0) | 2018.03.28 |
| [C코드] :: INSERTION SORT code (삽입 정렬 코드) (0) | 2018.03.27 |
| [C코드] :: Array based QUEUE code (배열 기반 큐 코드) (0) | 2018.03.26 |