헤더파일 : MultiThreadWithSignal.h

/*******************************************************************************
 * MultiThreadWithSignal.h
 *
 *
 *
 * IDENTIFICATION & REVISION
 *        $Id$
 *
 * NOTES
 *
 *
 ******************************************************************************/

/**
 * @file MultiThreadWithSignal.h
 * @brief 다중 연결 리스트( Doubly Linked List ) 큐
 *        Enqueue와 Dequeue를 스레드를 통해 진행
 *        헤더파일
 */


#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>


#define THREAD_COUNT    100
#define DATA_MAX_COUNT  2000


/* 큐를 이루는 노드 구조체 */
typedef struct qNode
{
    int             mData;             /* Data */
    int             mEnqueueThreadId;  /* 데이터를 준 스레드 ID */
    
    struct qNode  * mPrevNode;         /* 이전 Node */
    struct qNode  * mNextNode;         /* 다음 Node */
} qNode;


/* 큐 구조체 */
typedef struct qQueue
{
    qNode  * mFrontNode;      /* Dequeue */
    qNode  * mRearNode;       /* Enqueue */
    
    int      mCount;          /* 노드 갯수 */
} qQueue;


/* 스레드 생성 시 인자로 넘겨주는 구조체 */
typedef struct qArgPthread
{
    qQueue  * mQueue;         /* Queue */
    int       mThreadId;      /* 스레드 ID */
} qArgPthread;


/* Initialize Queue */
void  qInitQueue( qQueue * aQueue );


/* Enqueue */
void  * qEnqueue( void * aArgPthread );


/* Dequeue */
void  * qDequeue( void * aArgPthread );


소스파일 : MultiThreadWithSignal.c

/*******************************************************************************
 * MultiThreadWithSignal.c
 *
 * Copyright (c) 2011, SUNJESOFT Inc.
 *
 *
 * IDENTIFICATION & REVISION
 *        $Id$
 *
 * NOTES
 *
 *
 ******************************************************************************/

/**
 * @file MultiThreadWithSignal.c
 * @brief 다중 연결 리스트( Doubly Linked List ) 큐
 *        Enqueue와 Dequeue를 스레드를 통해 진행
 *        Enqueue 스레드가 enqueue 이후 signal을 전송,
 *        Dequeue 스레드가 signal을 받고 dequeue를 수행
 *        다시 Enqueue 스레드에게 작업을 마친 flag를 넘긴다.
 */


#include "MultiThreadWithSignal.h"


/* Global Mutex & Condition Variable */
pthread_mutex_t  gMutexForThread;
pthread_mutex_t  gMutexForEnqDeq[THREAD_COUNT];

pthread_cond_t   gCondition[THREAD_COUNT];


/* Flag Variables */
int  gMainFlag;
int  gFlagForThread[THREAD_COUNT];
int  gFlagForEnqDeq[THREAD_COUNT];


int main()
{
    pthread_t      sEnqueueThread[THREAD_COUNT];      /* THREAD_COUNT 만큼의 Enqueue 스레드 선언 */
    pthread_t      sDequeueThread[THREAD_COUNT];      /* THREAD_COUNT 만큼의 Dequeue 스레드 선언 */
    
    qArgPthread  * sArgEnqueueThread[THREAD_COUNT];   /* THREAD_COUNT 만큼의 인자로 넘겨줄 구조체 */
    qArgPthread  * sArgDequeueThread[THREAD_COUNT];   /* THREAD_COUNT 만큼의 인자로 넘겨줄 구조체 */

    qQueue       * sQueue              = NULL;

    int            sIsCreatedThread    = 0;           /* 스레드가 Create되었는지 여부를 알기 위한 변수 */
    int            sIsJoinedThread     = 0;           /* 스레드가 Join되었는지 여부를 알기 위한 변수 */
    int            sReturnValue        = 0;           /* 스레드가 Join 수행 시 반환하는 값 */
    int            sCountThread        = 0;           /* 스레드 개수, 각종 for문의 정수형 변수 */
    int            sDoubleThreadCount  = 0;

    sQueue  = (qQueue *)malloc( sizeof( qQueue ) );
    
    sDoubleThreadCount = THREAD_COUNT * 2;            /* THREAD_COUNT의 두배 */

    
    /*
     * Initialize
     */

    pthread_mutex_init( &gMutexForThread, NULL );

    for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
    {
        /* Mutex */        
        pthread_mutex_init( &gMutexForEnqDeq[sCountThread], NULL );

        /* Condition Value */
        pthread_cond_init( &gCondition[sCountThread], NULL );

        /* Flags */
        gFlagForThread[sCountThread] = 0;
        gFlagForEnqDeq[sCountThread] = 0;
    }

    gMainFlag = 0;            /* Flag */
    
    qInitQueue( sQueue );     /* 큐 sQueue 초기화 */

    
    /* 스레드 Mutex Lock */
    pthread_mutex_lock( &gMutexForThread );

    
    /*
     * 스레드 Create
     */


    /* Enqueue 스레드 */
    for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
    {
        sArgEnqueueThread[sCountThread] = (qArgPthread *)malloc( sizeof( qArgPthread ) );

        /* 인자의 스레드 ID는 0, 1, ..., n */
        sArgEnqueueThread[sCountThread]->mQueue     = sQueue;
        sArgEnqueueThread[sCountThread]->mThreadId  = sCountThread;
        
        sIsCreatedThread = pthread_create( &sEnqueueThread[sCountThread],
                                           NULL,
                                           &qEnqueue,
                                           (void *)sArgEnqueueThread[sCountThread] );

        if( sIsCreatedThread != 0 )
        {
            printf("[ERROR-001] :: cannot create thread.\n");
        }
    }
    
    /* Dequeue 스레드 */
    for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
    {
        sArgDequeueThread[sCountThread] = (qArgPthread *)malloc( sizeof( qArgPthread ) );

        /* 인자의 스레드 ID는 0, 1, ..., n */
        sArgDequeueThread[sCountThread]->mQueue     = sQueue;
        sArgDequeueThread[sCountThread]->mThreadId  = sCountThread;
        
        sIsCreatedThread = pthread_create( &sDequeueThread[sCountThread],
                                           NULL,
                                           &qDequeue,
                                           (void *)sArgDequeueThread[sCountThread] );

        if( sIsCreatedThread != 0 )
        {
            printf("[ERROR-001] :: cannot create thread.\n");
        }
    }

    /* 모든 스레드 생성 후 플래그 값 확인 */
    while( gMainFlag < sDoubleThreadCount )
    {
        gMainFlag = 0;
        for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
        {
            gMainFlag += gFlagForThread[sCountThread];
        }
        sleep(0);
    }

    /* 스레드 Mutex Unlock */
    pthread_mutex_unlock( &gMutexForThread );
    
    /*
     * 스레드 Join
     */

    for( sCountThread = 0; sCountThread < THREAD_COUNT; sCountThread++ )
    {
        /* Enqueue 스레드 */
        sIsJoinedThread = pthread_join( sEnqueueThread[sCountThread],
                                        (void **)&sReturnValue );

        if( sIsJoinedThread != 0 )
        {
            printf("[ERROR-002] :: cannot join enqueue thread.\n");
        }

        
        /* Dequeue 스레드 */
        sIsJoinedThread = pthread_join( sDequeueThread[sCountThread],
                                        (void **)&sReturnValue );

        if( sIsJoinedThread != 0 )
        {
            printf("[ERROR-002] :: cannot join dequeue thread.\n");
        }
        
    }

    printf("\n+-----------------------------------+");
    printf("\n| [SYSTEM] :: program is Exited.... |\n");
    printf("+-----------------------------------+\n\n");
    
    return 0;
}



/* Initialize Queue */
void qInitQueue( qQueue * aQueue )
{
    aQueue->mFrontNode  = NULL;
    aQueue->mRearNode   = NULL;
    aQueue->mCount      = 0;

    printf("\n+-----------------------------------+");
    printf("\n| [SYSTEM] :: queue is Initialized. |\n");
    printf("+-----------------------------------+\n\n");

}



/* Enqueue */
void  * qEnqueue( void * aArgPthread )
{
    qArgPthread * sArgPthread   = NULL;
    qQueue      * sQueue        = NULL;
    qNode       * sNewNode      = NULL;

    int           sInputData    = 0;
    int           sRepeatCount  = 0;
    int           sThreadId     = 0;
    /* int           sMaxBuff      = 0; */

    sArgPthread = (qArgPthread *)aArgPthread;

    sQueue      = sArgPthread->mQueue;
    sThreadId   = sArgPthread->mThreadId;

    /* sMaxBuff    = DATA_MAX_COUNT * 2; */

    printf("[#%3d][ENQUEUE_THREAD] is ready.\n", sThreadId );
    gFlagForThread[sThreadId] += 1;
    
    while( sRepeatCount < DATA_MAX_COUNT )
    {
        /* Dequeue 스레드의 준비를 기다린다. */
        while( gFlagForEnqDeq[sThreadId] == 0 )
        {
            sleep(0);
        }
        
        /* 스레드 Mutex Lock */
        pthread_mutex_lock( &gMutexForThread );

        sNewNode = (qNode *)malloc( sizeof( qNode ) );
            
        if( sQueue->mCount >= THREAD_COUNT /* sMaxBuff */ )
        {
            /* printf("[THREAD_%d]\n[ERROR-003] :: queue is full. [ %d/%d ]\n\n", */
            /*        sThreadId, */
            /*        DATA_MAX_COUNT, */
            /*        DATA_MAX_COUNT ); */
        }
        else
        {
            sInputData                  = rand() % 90 + 10; /* 임의의 데이터 값을 두자리 수로 고정 */
            sNewNode->mData             = sInputData;
            sNewNode->mNextNode         = NULL;
            sNewNode->mEnqueueThreadId  = sThreadId;

            printf("[#%3d][ENQUEUE_THREAD] Enqueue DATA : %d( %d/%d ) QUEUE STATUS : [ %d/%d ]\n",
                   sThreadId,
                   sInputData,
                   sRepeatCount+1,
                   DATA_MAX_COUNT,
                   sQueue->mCount+1,
                   THREAD_COUNT /* sMaxBuff */ );

            if( sQueue->mCount == 0 )
            {
                sQueue->mFrontNode   = sNewNode;
                sNewNode->mPrevNode  = NULL;
            }
            else
            {
                sQueue->mRearNode->mNextNode  = sNewNode;
                sNewNode->mPrevNode           = sQueue->mRearNode;
            }

            sQueue->mRearNode  = sNewNode;
            sQueue->mCount++;

            sRepeatCount++;
        }

        /* 스레드 Mutex Unlock */
        pthread_mutex_unlock( &gMutexForThread );

        /* Enqueue Mutex Lock */
        pthread_mutex_lock( &gMutexForEnqDeq[sThreadId] );

        gFlagForEnqDeq[sThreadId] = 0; /* 0으로 다시 초기화 */
        
        /* Dequeue 스레드에게 시그널 */
        pthread_cond_signal( &gCondition[sThreadId] );

        /* Enqueue Mutex Unlock */
        pthread_mutex_unlock( &gMutexForEnqDeq[sThreadId] );

        /* if( sRepeatCount%5 == 0 ) */
        /* { */
        usleep(50);
        /* } */
    }
}





/* Dequeue */
void  * qDequeue( void * aArgPthread )
{
    qArgPthread * sArgPthread   = NULL;
    qQueue      * sQueue        = NULL;

    int       sOutputData       = 0;
    int       sRepeatCount      = 0;
    int       sThreadId         = 0;
    int       sEnqueueThreadId  = 0;
    /* int       sMaxBuff          = 0; */

    sArgPthread = (qArgPthread *)aArgPthread;
    
    sQueue     = sArgPthread->mQueue;
    sThreadId  = sArgPthread->mThreadId;

    /* sMaxBuff   = DATA_MAX_COUNT * 2; */

    printf("[#%3d][DEQUEUE_THREAD] is ready.\n", sThreadId );
    gFlagForThread[sThreadId] += 1;

    while( sRepeatCount < DATA_MAX_COUNT )
    {
        /* Mutex Lock */
        pthread_mutex_lock( &gMutexForEnqDeq[sThreadId] );

        /* Enqueue 스레드에게 준비가 되었음을 알림 */
        gFlagForEnqDeq[sThreadId] = 1;

        /* Enqueue 까지 기다린다. */
        pthread_cond_wait( &gCondition[sThreadId], &gMutexForEnqDeq[sThreadId] );

        /* Mutex Unlock */
        pthread_mutex_unlock( &gMutexForEnqDeq[sThreadId] );
        
        /* 스레드 Mutex Lock */
        pthread_mutex_lock( &gMutexForThread );

        if( sQueue->mCount == 0 )
        {
            /* printf("[THREAD_%d]\n[ERROR-004] :: queue is empty. [ 0/%d ]\n\n", */
            /*        sThreadId, */
            /*        DATA_MAX_COUNT ); */
        }
        else
        {
            sOutputData       = sQueue->mFrontNode->mData;
            sEnqueueThreadId  = sQueue->mFrontNode->mEnqueueThreadId;

            printf("[#%3d][DEQUEUE_THREAD] Dequeue DATA : %d( %d/%d ) QUEUE STATUS : [ %d/%d ] data by [#%3d][ENQUEUE_THREAD]\n",
                   sThreadId,
                   sOutputData,
                   sRepeatCount+1,
                   DATA_MAX_COUNT,
                   sQueue->mCount-1,
                   THREAD_COUNT /* sMaxBuff */,
                   sEnqueueThreadId );

            if( sQueue->mCount == 1 )
            {
                free( sQueue->mFrontNode );
                sQueue->mFrontNode  = NULL;
                sQueue->mRearNode   = NULL;
                sQueue->mCount      = 0;
            }
            else
            {
                sQueue->mFrontNode = sQueue->mFrontNode->mNextNode;
                free( sQueue->mFrontNode->mPrevNode );
                sQueue->mFrontNode->mPrevNode = NULL;
                sQueue->mCount--;
            }

            sRepeatCount++;
        }

        /* 스레드 Mutex Unlock */
        pthread_mutex_unlock( &gMutexForThread );

        /* if( sRepeatCount%5 == 0 ) */
        /* { */
            usleep(50);
        /* } */

    }

}



공부/코딩연습 등의 이유로 얼마든지 퍼가셔도 좋습니다.


하지만 라인마다 의미를 파악하지 않고 무작정 복사 붙여넣기는

아무것도 남지 않습니다.


또한 댓글로 궁금하신 라인 등 얼마든지 물어보시면 

바로바로 대답해드리겠습니다 :)


공부를 게을리하지 맙시다! 


블로그 이미지

차트

소소한 일상 C코드 DB 항상 행복하게^-^★

,