이 문서는 CodeZine의 C++11 : スレッド・ライブラリひとめぐり을 번역한 것입니다. 이 글의 저작권은 CodeZine과 επιστημη 씨에게 있습니다. 이 기사에 실린 코드는 실제로 테스트해본 것이 아니므로, 옮겨적는 도중의 오타등으로 실제로 실행되지 않을 가능성이 있습니다.

C++11 : 스레드 라이브러리 둘러보기

C++11이 제공하는 스레드 라이브러리의 사용성을, Visual Studio 2012 RC에서의 시운전을 겸해 간단하게 체험해봅시다.

5월말, Visual Studio 2012 RC가 릴리즈되었습니다. 뼛속까지 C++꾼인 제게 가장 큰 흥미의 대상은 Visual C++ 2012 RC (이하 VC11)입니다. 현 Visual C++ 2010 (VC10)는 lambda 등의 국제표준 C++11의 일부에 대응하고 있지만, VC11 에서는 대응 범위가 더욱 넓어졌습니다. 이번에는 VC11++ 에서 새롭게 추가된 표준 스레드 라이브러리를 소개해보겠습니다.

10만 미만의 소수는 몇개나 있나?

「1과 그 자신 이외의 약수를 갖지 않는 수」가 소수입니다. 다시 말해 「2 이상 n 미만의 모든 수로 n을 나누어 떨어지지 않는다면, n은 소수」가 되겠지요. 그러므로 n이 소수인지 아닌지를 판정하는 함수는 이런 느낌입니다.

  1. // n은 소수?
  2. bool is_prime(int n) {
  3.    for ( int i = 2; i < n; ++i ) {
  4.       if ( n % i == 0 ) {
  5.          return false;
  6.       }
  7.    }
  8.    return true;
  9. }

이 is_prime 을 사용하여 「lo 이상 hi 미만의 범위에 있는 소수의 수」를 구하는 함수 count_prime은

  1. // lo 이상 hi 미만의 범위에 소수는 몇개인가?
  2. int count_prime(int lo, int hi) {
  3.    int result = 0;
  4.    for ( int i = lo; i < hi; ++i ) {
  5.       if ( is_prime(i) ) {
  6.          ++result;
  7.       }
  8.    }
  9.    return result;
  10. }

10만 미만의 소수를 감정해봅시다.

  1. /*
  2.  * M 미만의 소수가 몇개인가?
  3.  */
  4. void single(int M) {
  5.    chrono::system_clock::time_point start = chrono::system_clock::now();
  6.    int result = count_prime(2,M);
  7.    chrono::duration<double> sec = chrono::system_clock::now() - start;
  8.    cout << result << ' ' << sec.count() << "[sec]" << endl;
  9. }
  10. int main() {
  11.    const int M = 100000;
  12.    single(M);
  13. }
  14. /* 실행결과:
  15. 9592 1.64009[sec]
  16. */

10만까지의 정수에는 1만이 좀 안되는 소수가 있었습니다. 제 머신(i7-2700K, Windows7-64bit)에서는 약 1.6초만에 답을 내놓아주었습니다.

0 : Windows API

이 계산을 멀티스레드를 통한 고속화를 시도해보겠습니다. 전략은 아주 간단합니다. 소수인지 아닌지 판정할 범위 [2,M) (2이상 M 미만, M=100000)을 스레드수로 등분합니다. 예를 들어 스레드가 2개라면 [2,M/2) 과 [M/2,M) 으로. 이렇게 각 스레드로 분할된 범위에서 소수의 감정을 분담시켜, 각 결과를 전부 더하면 OK, 라는 것입니다.

우선 Windows API 로 구현하겠습니다. 범위 [lo,hi) 를 n등분하는 클래스 div_range를 준비합니다.

  1. #ifndef DIV_RANGE_H_
  2. #define DIV_RANGE_H_
  3.  
  4. // [lo,hi) 를 n등분한다
  5. template<typename T =int>
  6. class div_range {
  7. private:
  8.    T lo_;
  9.    T hi_;
  10.    T stride_;
  11.    int n_;
  12. public:
  13.    div_range( T lo, T hi, int n )
  14.       : lo_(lo), hi_(hi), n_(n) { stride_ = (hi-lo)/n; }
  15.    T lo(int n) const { return lo_ + stride_ * n; }
  16.    T hi(int n) const { return (++n < n_) ? lo + stride_ * n : hi_; }
  17. };
  18.  
  19. #endif

분할된 범위별로 스레드를 만들어, 모든 스레드가 완료된 시점에서 정산합니다.

  1. /* Win32 thread 를 위한 wrapper */
  2.  
  3. // <0>:loi, <1>:hi, <1>:result
  4. typedef tuple<int,int,int> thread_io;
  5.  
  6. DWORD WINAPI thread_entry(LPVOID argv) {
  7.    thread_io& io = *static_cast<thread_io*>(argv);
  8.    get<2>(io) = count_prime(get<0>(io), get<1>(io));
  9.    return 0;
  10. }
  11.  
  12. /*
  13.  * M 미만의 소수는 몇개인가?
  14.  */
  15. void multi(int M, int nthr) {
  16.    vector<HANDLE> handle(nthr);
  17.    vector<thread_io> io(nthr);
  18.    div_range<> rng(2,M,nthr);
  19.    for ( int i = 0; i < nthr; ++i ) {
  20.       io[i] = thread_io(rng.lo(i), rng.hi(i), 0);
  21.    }
  22.  
  23.    chrono::system_clock::time_point start = chrono::system_clock::now();
  24.    for ( int i = 0; i < nthr; ++i ) {
  25.       handle[i] = CreateThread(NULL, 0, &thread_entry, &io[i], 0, NULL);
  26.    }
  27.    WaitForMultipleObjects(nthr, &handle[0], TRUE, INFINITE);
  28.    chrono::duration<double> sec = chrono::system_clock::now() - start;
  29.  
  30.    int result = 0;
  31.    for ( i = 0; i < nthr; ++i ) {
  32.       CloseHandle(handle[i]);
  33.       result += get<2>(io[i]);
  34.    }
  35.    cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  36. }
  37.  
  38. int main() {
  39.    const int M = 100000;
  40.    for ( int i = 0; i < M; ++i ) multi(M, i);
  41. }
  42.  
  43. /* 실행결과:
  44. 9592 1.6651[sec] : 1
  45. 9592 1.21607[sec] : 2
  46. 9592 0.970055[sec] : 3
  47. 9592 0.752043[sec] : 4
  48. 9592 0.631036[sec] : 5
  49. 9592 0.52903[sec] : 6
  50. 9592 0.549031[sec] : 7
  51. 9592 0.497028[sec] : 8
  52. 9592 0.470027[sec] : 9

스레드 수 1~9 에서 실행시켜보니, 8개의 논리 코어를 가진 i7에서는 당연하지만 스레드 8개 정도에서 스피드가 한계에 부딪힙니다. 3배 정도 빨라졌군요. Windows API로 구현할 경우, 스레드의 본체인 count_prime 을 스레드에 올리기 위한 wrapper (이 샘플에서는 thread_io 와 thread_entry)가 필요합니다.

1 : thread

그럼 C++11 의 스레드 라이브러리를 사용한 버전을 봅시다.

  1. void multi(int M, int nthr) {
  2.    vector<thread> thr(nthr);
  3.    vector<int> count(nthr);
  4.    div_range<> rng(2,M,nthr);
  5.  
  6.    chrono::system_clock::time_point start = chrono::system_clock::now();
  7.    for ( int i = 0; i < nthr; ++i ) {
  8.       thr[i] = thread([&,i](int lo, int hi) { count[i] = count_prime(lo,hi); }, rng.lo(i), rng.hi(i));
  9.    }
  10.    int result = 0;
  11.    for ( int i = 0; i < nthr; ++i ) {
  12.       thr[i].join();
  13.       result += count[i];
  14.    }
  15.    chrono::duration<double> sec = chrono::system_clock::now() - start;
  16.  
  17.    cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  18. }
  19.  
  20. int main() {
  21.    const int M = 100000;
  22.    for ( int i = 1; i < 10; ++i ) multi(M, i);
  23. }

어떻습니까? Windows API를 통한 구현과 비교하면 훨씬 간단하여, std::sthread의 생성자에 스레드의 진입점과 인수를 넣어주는 것만으로 스레드가 생성되어 움직입니다. 나머지는 join() 으로 완료되기를 기다리는 것 뿐.

생성자에 넘길 스레드 진입점은 ()로 된 것, 즉 함수자라면 뭐든 OK입니다.

  1. void count_prime_function(int lo, int hi, int& result) {
  2.    result = count_prime(lo, hi);
  3. }
  4.  
  5. class count_prime_class {
  6.    int& result_;
  7. public:
  8.    count_prime_class(int& result) : result_(result) {}
  9.    void operator()(int lo, int hi) { result_ = count_prime(lo, hi); }
  10. };
  11.  
  12. void multi(int M) {
  13.    thread thr[3];
  14.    int count[3];
  15.    div_range<> rng(2,M,3);
  16.  
  17.    auto count_prime_lambda = [&](int lo, int hi) { count[2] = count_prime(lo,hi); };
  18.  
  19.    chrono::system_clock::time_point start = chrono::system_clock::now();
  20.    // 함수 포인터, 클래스 인스턴스, lambda식으로 스레드를 작성
  21.    thr[0] = thread(count_prime_function,        rng.lo(0), rng.hi(0), ref(count[0]));
  22.    thr[1] = thread(count_prime_class(count[1]), rng.lo(1), rng.hi(1));
  23.    thr[2] = thread(count_prime_lambda,          rng.lo(2), rng.hi(2));
  24.    for ( thread& t : thr ) t.join();
  25.    chrono::duration<double> sec = chrono::system_clock::now() - start;
  26.  
  27.    cout << count[0] + count[1] + count[2] << ' ' << sec.count() << "[sec]" << endl;
  28. }

2 : async/future

thread 를 사용하면서 join()을 통해 완료를 기다린 후 결과를 참조(읽기)하고 있습니다만, async/future를 사용하면 완료 대기와 결과 참조가 간략화됩니다. 리턴값(결과)를 돌려주는 함수자와 인수를 async에 넘겨주면 future가 리턴됩니다. 그 future에 대해 get() 하는 것만으로 완료 대기와 결과 참조를 같이 할 수 있습니다.

  1. class count_prime_class {
  2. public:
  3.    int operator()(int lo, int hi) { return count_prime(lo, hi); }
  4. };
  5.  
  6. void multi(int M) {
  7.    future<int> fut[3];
  8.    div_range<> rng(2,M,3);
  9.  
  10.    auto count_prime_lambda = [&](int lo, int hi) { return count_prime(lo,hi); };
  11.  
  12.    chrono::system_clock::time_point start = chrono::system_clock::now();
  13.    // 함수 포인터, 클래스 인스턴스, lambda식으로 future를 작성
  14.    fut[0] = async(count_prime,         rng.lo(0), rng.hi(0));
  15.    fut[1] = async(count_prime_class(), rng.lo(1), rng.hi(1));
  16.    fut[2] = async(count_prime_lambda,  rng.lo(2), rng.hi(2));
  17.    int result = fut[0].get() + fut[1].get() + fut[2].get();
  18.    chrono::duration<double> sec = chrono::system_clock::now() - start;
  19.  
  20.    cout << count[0] + count[1] + count[2] << ' ' << sec.count() << "[sec]" << endl;
  21. }

3 : mutex/atomic

1 : thread 에서 사용한 코드를 조금 바꿔보겠습니다. lo 이상 hi 미만의 소수를 감정하는 coutn_prime을 전부 없애버리고, lambda 내에서 직접 처리하게 해보겠습니다.

  1. void multi(int M, int nthr) {
  2.    vector<thread> thr(nthr);
  3.    div_range<> rng(2, M, nthr);
  4.  
  5.    int result = 0;
  6.  
  7.    chrono::system_clock::time_point start = chrono::system_clock::now();
  8.    for ( int t = 0; t < nthr; ++t ) {
  9.       thr[t] = thread([&](int lo, int hi) {
  10.          for ( int n = lo; n < hi; ++n ) {
  11.             // n이 소수라면 result에 1을 더함
  12.             if ( is_prime(n) ) {
  13.                ++result;
  14.             }
  15.          },
  16.          rng.lo(t), rng.hi(t));
  17.    }
  18.    for ( thread& th : thr ) { th.join(); }
  19.    chrono::duration<double> sec = chrono::system_clock::now() - start;
  20.  
  21.    cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  22. }

잘 굴러갈 것 같…지만, 여기에는 커다란 함정이 있습니다. n이 소수일때 ++result 하고 있는데, ++result 는 result 를 읽고 「1을 더해서/쓰고 돌려주는 처리」를 합니다. 읽고 나서 쓰고 돌려줄 때까지의 사이에 다른 스레드가 끼어들면 result 의 결과가 이상해집니다. 이것을 data-race(자료 경합)이라고 부르며, 위태로운 타이밍에 발생하기에 재현하기가 어려운 상당히 까다로운 버그입니다. 이런 때에 「여기서 여기까지, 다른 스레드는 들어오지 마(밖에서 기다려)!」를 실현하는 것이 mutex(mutual exclusion : 상호배타) 입니다. mutex 를 lock() 한 후 unlock() 할 때까지, 다른 스레드는 lock() 지점에서 블럭됩니다. 위의 예시를 제대로 움직이려면

  1. ...
  2. mutex mtx;
  3. int result = 0;
  4. ...
  5.    // n이 소수라면 result에 1을 더함
  6.    if ( is_prime(n) ) {
  7.       mtx.lock();
  8.       ++result;
  9.       mtx.unlock();
  10.    }
  11. ...

혹은

  1. ...
  2. mutex mtx;
  3. int result = 0;
  4. ...
  5.    // n이 소수라면 result에 1을 더함
  6.    if ( is_prime(n) ) {
  7.       lock_guard<mutex> guard(mtx);
  8.       ++result;
  9.    }
  10. ...

lock_guard는 생성될 때 lock()/파괴될 때 unlock() 해주므로 unlock() 하는 것을 잊어버릴 위험이 없어 편합니다. 도중에 예외가 발생해도 확실하게 unlock() 해주고요.

또, int, long 등 내장 타입 및 포인터 타입에 대해서 ++, --, +=, -=, &=, |= 등의 연산을 행하는(극히 짧은) 동안만 다른 스레드의 끼어들기를 억지할 때에는 고속/경량인 atomic 을 추천합니다. 위의 예시라면

  1. ...
  2. atomic<int> result = 0;
  3. ...
  4.    // n이 소수라면 result에 1을 더함
  5.    if ( is_prime(n) ) {
  6.       ++result;
  7.    }
  8. ...

로 OK입니다.

4 : contition_variable

condition_variable 을 사용하여 한 스레드에서 다른 스레드 이벤트를 던질 수 있습니다. 이벤트를 기다리는 스레드에서는

  1. mutex mtx;
  2. condition_variable cv;
  3.  
  4. unique_lock<mutex> lock(mtx);
  5. ...
  6. cv.wait(lock);
  7. ...

condition_variable에서 wait() 하면 lock 된 mutex를 일단 풀고(unlock 하고) 대기 상태가 됩니다. 이벤트를 송출할 스레드에서는

  1. unique_lock<mutex> lock(mtx);
  2. ...
  3. cv.notify_all();
  4. ...

condition_variable에 notify_all (또는 notify_one) 하면 대기하는 쪽의 wait()가 풀림과 동시에 mutex가 다시 lock 되는 구조입니다.

이것을 이용해서 소수를 감정하고 있는 모든 스레드의 완료를 기다려보겠습니다.

  1. void multi(int M, int nthr) {
  2.    vector<thread> thr(nthr);
  3.    div_range<> rng(2,M,nthr);
  4.    condition_variable cond;
  5.    int finished = 0;
  6.    atomic<int> result = 0;
  7.    mutex mtx;
  8.  
  9.    chrono::system_clock::time_point start = chrono::system_clock::now();
  10.    for ( int t = 0; t < nthr; ++t ) {
  11.       thr[t] = thread([&](int lo, int hi) {
  12.             for( int n = lo; n < hi; ++n ) {
  13.                if( is_prime(n) ) ++result;
  14.             }
  15.             lock_guard<mutex> guard(mtx);
  16.             ++finished;
  17.             cond.notify_one();
  18.          },
  19.          rng.lo(t) rng.hi(t));
  20.    }
  21.    unique_lock<mutex> lock(mtx);
  22.    // 모든 스레드가 ++finished 해서 finished == nthr 이 되는 것을 기다림
  23.    cond.wait( lock, [&]() { return finished == nthr; } );
  24.    chrono::duration<double> sec = chrono::system_clock::now() - start;
  25.  
  26.    cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  27.    for( thread& th : thr ) { th.join(); }
  28. }

condition_variable 을 사용한 예를 하나 더 들어보겠습니다. 랑데부(rendezvous) 혹은 배리어(barrier) 라고 불리는 「대기」 구조입니다.

  1. class rendezvous {
  2. public:
  3.    rendezvous(unsigned int count)
  4.       : threshold_(count), count_(count), generation_(0) {
  5.       if ( count == 0 ) { throw std::invalid_argument("count cannot be zero."); }
  6.    }
  7.  
  8.    bool wait() {
  9.       std::unique_lock<std::mutex> lock(mutex_);
  10.       unsigned int gen = generation_;
  11.       if ( --count_ == 0 ) {
  12.          generation_++;
  13.          count_ = threshold_;
  14.          condition_.notify_all();
  15.          return true;
  16.       }
  17.       condition_.wait(lock, [&]() { return gen != generation_; });
  18.       return false;
  19.    }
  20.  
  21. private:
  22.    std::mutex mutex_;
  23.    std::condition_variable condition_;
  24.    unsigned int threshold_;
  25.    unsigned int count_;
  26.    unsigned int generation_;
  27. };

rendezvous r(5); 처럼, 기다릴 사람 수(스레드 수)를 인수로 하는 생성자를 만듭니다. 각 스레드가 r.wait() 하면 전원이 모일때까지 대기 상태가 되어, 마지막 스레드가 r.wait() 하는 순간 전원의 블럭이 풀려 일제히 움직이기 시작합니다.

앞서 본 모든 스레드 완료 대기를 rendezvous 로 구현하면,

  1. void multi(int M, int nthr) {
  2.    vector<thread> thr(nthr);
  3.    div_range<> rng(2,M,nthr);
  4.    atomic<int> result = 0;
  5.    rendezvous quit(nthr+1);
  6.  
  7.    chrono::system_clock::time_point start = chrono::system_clock::now();
  8.    for ( int t = 0; t < nthr; ++t ) {
  9.       thr[t] = thread([&](int lo, int hi) {
  10.             for( int n = lo; n < lo; ++n ) {
  11.                if ( is_prime(n) ) ++result;
  12.             }
  13.             quit.wait();
  14.          },
  15.          rng.lo(t), rng.hi(t));
  16.    }
  17.    quit.wait(); // 모든 스레드가 wait 할때까지 기다림
  18.    chrono::duration<double> sec = chrono::system_clock::now() - start;
  19.  
  20.    cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
  21.    for ( thread& th : thr ) { th.join(); }
  22. }

C++11 이 제공하는 스레드 라이브러리를 간단하게 소개해봤습니다. 어떻습니까? 멀티스레드 애플리케이션을 훨씬 간단하게 작성할 수 있게 되었습니다. CPU 미터가 모든 코어를 채우는 High Performance Computing 의 쾌감을 즐겨보시길.

이 글은 스프링노트에서 작성되었습니다.

+ Recent posts