Producer/Consumer - bounded buffer 문제

데이터를 만들어내는 Producer(생산자) 쓰레드와 그런 데이터들을 가져오는 Consumer(소비자) 쓰레드가 여러개 있다고 했을 때, 이러한 데이터들이 담길 버퍼가 필요하다.

이런 프로듀서 / 컨슈머 구조는 실제로도 자주 일어나는데, 리눅스의 파이프라인이 이와 같다.

예를 들어 grep foo bar.txt | wc -l를 실행시키는 경우, grep 의 출력 결과를 표준출력으로 보내는 대신에 파이프(버퍼)로 보내는 프로듀서가 되며, 이러한 파이프(버퍼)에서 wc 가 데이터를 컨슘한다고 볼 수 있다.

이렇게 프로듀서들과 컨슈머 쓰레드들 사이에 오고갈 데이터들에 대한 동기화를 잘 시켜야 할 때 고려해야할 사항들이 몇가지 있다.

  1. 프로듀서가 빈 공간이 없는 버퍼에 데이터를 넣으려고 하는 경우

  2. 컨슈머가 빈 버퍼로부터 데이터를 가져오려는 경우

시스템에서 관리되는 요청 큐(버퍼)는 값을 무한히 넣을 수 없기 때문에 bounded buffer 라고도 불리며, 이 버퍼에 프로듀서와 컨슈머가 데이터를 넣었다 뺐다 하는 과정에서 경쟁조건이 발생할 수 있기 때문에, 이를 잘 해결해야 한다.

무한대가 아닌 유한대의 버퍼에, 끊임없이 집어넣으려는 프로듀서와 가져가려는 컨슈머가 여럿 있을때 버퍼를 사용하여 경쟁조건이 발생할 수 있는데 이게 발생하지 않게 잘 동기화 하면서 구현해보자.


간단하게 버퍼의 크기가 1이고, 프로듀서와 컨슈머가 각각 이 버퍼로부터 값을 쓰고 읽어오는 형식이라고 예를 들어보자.

int buffer;
int count = 0;

void put(int value)
{
    count = 1;
    buffer = value;
}

void get()
{
    count = 0;
    return buffer;
}

프로듀서는 put()을 호출함으로써 원하는 value를 버퍼에 집어넣는데, 이 때 count를 업데이트함으로써 이 버퍼에 값이 들어있음을 표시해준다.

컨슈머는 그래서 get()을 통해 count 를 변경시켜 자기가 값을 가져갔다고 알린 뒤, 버퍼의 값을 가져온다.

그러나 컨슈머가 아직 버퍼에서 값을 꺼내가지 않았음에도 프로듀서가 계속 put을 시도할 수 있기 때문에, put하기 전에 카운트 값을 확인하여 CV에서 대기할 수 있도록 만든다.

마찬가지로 컨슈머가 get을 호출 하기 전에서도 매번 카운트값을 확인하여, 버퍼가 비어있을 경우에는 CV에서 대기함으로써 프로듀서와 컨슈머가 정상적으로 작동할 수 있도록 한다.

int loops;
cond_t cond;
mutex_t mutex;

void* producer(void* arg)
{
    int i;
    for(i = 0 ; i < loops ; i++)
    {
        Pthread_mutex_lock(&mutex);
        // 버퍼에 값이 들어있을 경우에는
        if(count == 1)
            // 버퍼에 빌 때 까지 CV 에서 대기
            Pthread_cond_wait(&cond, &mutex);
        // 누군가가 시그널을 보내 깨워주면 put 호출
        put(i);
        // 버퍼에 값을 넣었다는 시그널을 보내서 CV에 대기중이던 컨슈머에게 알림
        Pthread_cond_signal(&cond);
        Pthread_mutex_unlock(&mutex);
    }
}

void* consumer(void* arg)
{
    int i;
    for(i = 0 ; i < loops ; i++)
    {
        Pthread_mutex_lock(&mutex);
        // 버퍼가 비어있을 경우
        if(count == 0)
            // 프로듀서가 채워줄 때 까지 CV 에서 대기
            Pthread_cond_wait(&cond, &mutex);
        // 시그널을 받았다면 get을 통해 버퍼의 값 가져온 뒤
        int tmp = get();
        // 프로듀서에게 시그널 보냄
        Pthread_cond_signal(&cond);
        Pthread_mutex_unlock(&mutex);
    }
}


그러나 이 경우 프로듀서나 컨슈머 쓰레드가 여럿 실행되고 있을때는 기대하던 대로 동작하지 않는다.

컨슈머1이 CV에서 대기하다가 나오기 전에, 다른 컨슈머가 미리 버퍼로부터 값을 가져간 뒤에 컨슈머1이 스케줄링 되는 경우에는 버퍼에 값이 없는 상태이기 때문에 get()에서 에러가 발생할 것이다.

즉, 프로듀서가 버퍼에 값을 넣은 후 CV에서 대기중이던 컨슈머1을 깨워주긴 했지만, 운영체제가 관리하던 레디큐에 이미 들어가있던 컨슈머2가 먼저 스케줄링 되면서 값을 가져간 상황이다.

이런 문제를 해결하기 위해 단순히 if 문으로 count 상태값을 한번만 검사하는 것이 아닌 while 문으로 매번 꺠어날때마다 새로 검사해줄 수 있겠다.

while(count == 0)
    Pthread_cond_wait(&cond, &mutex);
    // 깨어나더라도 다시 while 문으로 돌아감으로써, 버퍼가 비어있으면 또다시 Sleep

만약 다른 컨슈머가 이미 값을 가져갔다고 하더라도, 현재 실행중인 컨슈머 쓰레드는 다시 count 값을 검사하기 때문에, 값이 없더라도 get을 호출하지 않고 CV에 들어가기 때문이다.

그러나 이 역시도, 모든 쓰레드들이 CV에 들어가버리는 최악의 사태가 발생할 수 있기 때문에, CV를 여러개 둠으로써 다시 해결할 수 있다.

현재 코드에서는 컨슈머가 값을 가져간 후 프로듀서에게 시그널을 보내 다시 값을 채우게 했어야 했지만, 다른 컨슈머가 스케줄링 되었기 때문에 문제가 발생하였다.

그래서 애초에 컨슈머들과 프로듀서들이 줄 서는 공간을 구별해서, 프로듀서가 대기할 CV와 컨슈머가 대기할 CV로 나눠 관리하는 방식이다.

cond_t empty, fill;

while(count == 0)
    Pthread_cond_wait(&fill, &mutex);
put(i);
Pthread_cond_signal(&empty);

컨슈머는 프로듀서가 줄서는 CV인 empty에 시그널을 보내고, 자신은 fill이라는 CV에서 대기함으로써 문제를 해결할 수 있다.


지금까지는 버퍼의 크기를 1로 설정하고 단일 변수에 값을 세팅하고 소비하는 실행 흐름으로 확인해보았지만, 버퍼의 크기가 늘어난다고 해서 프로듀서나 컨슈머 코드가 변경되지는 않는다.

대신 put()과 get()이 아래와 같은 형식으로 변경된다.

int buffer[MAX];
int fill = 0;
int use = 0;
int count = 0;

void put(int value)
{
    buffer[fill] = value;
    fill = (fill + 1) % MAX;
    count++;
}

void get()
{
    int tmp = buffer[use];
    use = (use + 1) % MAX;
    count--;
    return tmp;
}

버퍼의 크기를, 미리 설정해둔 최대값 MAX까지 가질 수 있는 배열 형식으로 선언하고, 프로듀서와 컨슈머가 버퍼에서 가져갈 때 참조할 인덱스값도 필요하다.

또한 put과 get 과정에서, 단순히 배열에 값을 추가하고 삭제하는 방식으로 구현하면 인덱스가 금방 초과되기 때문에, 모듈러 연산을 통해 버퍼가 원형인 것 마냥 사용할 수 있다.

이렇게 버퍼를 확장할 경우에는 생산자와 소비자 쓰레드들 사이에 문맥교환이 발생하는 빈도가 훨씬 줄어들기 때문에 병행성이 좋아져 효율적이게 된다.