이 문서는 CodeZine의 C++11 : スレッド・ライブラリひとめぐり을 번역한 것입니다. 이 글의 저작권은 CodeZine과 επιστημη 씨에게 있습니다. 이 기사에 실린 코드는 실제로 테스트해본 것이 아니므로, 옮겨적는 도중의 오타등으로 실제로 실행되지 않을 가능성이 있습니다.
C++11 : 스레드 라이브러리 둘러보기
C++11이 제공하는 스레드 라이브러리의 사용성을, Visual Studio 2012 RC에서의 시운전을 겸해 간단하게 체험해봅시다.
5월말, Visual Studio 2012 RC가 릴리즈되었습니다. 뼛속까지 C++꾼인 제게 가장 큰 흥미의 대상은 Visual C++ 2012 RC (이하 VC11)입니다. 현 Visual C++ 2010 (VC10)는 lambda 등의 국제표준 C++11의 일부에 대응하고 있지만, VC11 에서는 대응 범위가 더욱 넓어졌습니다. 이번에는 VC11++ 에서 새롭게 추가된 표준 스레드 라이브러리를 소개해보겠습니다.
10만 미만의 소수는 몇개나 있나?
「1과 그 자신 이외의 약수를 갖지 않는 수」가 소수입니다. 다시 말해 「2 이상 n 미만의 모든 수로 n을 나누어 떨어지지 않는다면, n은 소수」가 되겠지요. 그러므로 n이 소수인지 아닌지를 판정하는 함수는 이런 느낌입니다.
- // n은 소수?
- bool is_prime(int n) {
- for ( int i = 2; i < n; ++i ) {
- if ( n % i == 0 ) {
- return false;
- }
- }
- return true;
- }
이 is_prime 을 사용하여 「lo 이상 hi 미만의 범위에 있는 소수의 수」를 구하는 함수 count_prime은
- // lo 이상 hi 미만의 범위에 소수는 몇개인가?
- int count_prime(int lo, int hi) {
- int result = 0;
- for ( int i = lo; i < hi; ++i ) {
- if ( is_prime(i) ) {
- ++result;
- }
- }
- return result;
- }
10만 미만의 소수를 감정해봅시다.
- /*
- * M 미만의 소수가 몇개인가?
- */
- void single(int M) {
- chrono::system_clock::time_point start = chrono::system_clock::now();
- int result = count_prime(2,M);
- chrono::duration<double> sec = chrono::system_clock::now() - start;
- cout << result << ' ' << sec.count() << "[sec]" << endl;
- }
- int main() {
- const int M = 100000;
- single(M);
- }
- /* 실행결과:
- 9592 1.64009[sec]
- */
10만까지의 정수에는 1만이 좀 안되는 소수가 있었습니다. 제 머신(i7-2700K, Windows7-64bit)에서는 약 1.6초만에 답을 내놓아주었습니다.
0 : Windows API
이 계산을 멀티스레드를 통한 고속화를 시도해보겠습니다. 전략은 아주 간단합니다. 소수인지 아닌지 판정할 범위 [2,M) (2이상 M 미만, M=100000)을 스레드수로 등분합니다. 예를 들어 스레드가 2개라면 [2,M/2) 과 [M/2,M) 으로. 이렇게 각 스레드로 분할된 범위에서 소수의 감정을 분담시켜, 각 결과를 전부 더하면 OK, 라는 것입니다.
우선 Windows API 로 구현하겠습니다. 범위 [lo,hi) 를 n등분하는 클래스 div_range를 준비합니다.
- #ifndef DIV_RANGE_H_
- #define DIV_RANGE_H_
- // [lo,hi) 를 n등분한다
- template<typename T =int>
- class div_range {
- private:
- T lo_;
- T hi_;
- T stride_;
- int n_;
- public:
- div_range( T lo, T hi, int n )
- : lo_(lo), hi_(hi), n_(n) { stride_ = (hi-lo)/n; }
- T lo(int n) const { return lo_ + stride_ * n; }
- T hi(int n) const { return (++n < n_) ? lo + stride_ * n : hi_; }
- };
- #endif
분할된 범위별로 스레드를 만들어, 모든 스레드가 완료된 시점에서 정산합니다.
- /* Win32 thread 를 위한 wrapper */
- // <0>:loi, <1>:hi, <1>:result
- typedef tuple<int,int,int> thread_io;
- DWORD WINAPI thread_entry(LPVOID argv) {
- thread_io& io = *static_cast<thread_io*>(argv);
- get<2>(io) = count_prime(get<0>(io), get<1>(io));
- return 0;
- }
- /*
- * M 미만의 소수는 몇개인가?
- */
- void multi(int M, int nthr) {
- vector<HANDLE> handle(nthr);
- vector<thread_io> io(nthr);
- div_range<> rng(2,M,nthr);
- for ( int i = 0; i < nthr; ++i ) {
- io[i] = thread_io(rng.lo(i), rng.hi(i), 0);
- }
- chrono::system_clock::time_point start = chrono::system_clock::now();
- for ( int i = 0; i < nthr; ++i ) {
- handle[i] = CreateThread(NULL, 0, &thread_entry, &io[i], 0, NULL);
- }
- WaitForMultipleObjects(nthr, &handle[0], TRUE, INFINITE);
- chrono::duration<double> sec = chrono::system_clock::now() - start;
- int result = 0;
- for ( i = 0; i < nthr; ++i ) {
- CloseHandle(handle[i]);
- result += get<2>(io[i]);
- }
- cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
- }
- int main() {
- const int M = 100000;
- for ( int i = 0; i < M; ++i ) multi(M, i);
- }
- /* 실행결과:
- 9592 1.6651[sec] : 1
- 9592 1.21607[sec] : 2
- 9592 0.970055[sec] : 3
- 9592 0.752043[sec] : 4
- 9592 0.631036[sec] : 5
- 9592 0.52903[sec] : 6
- 9592 0.549031[sec] : 7
- 9592 0.497028[sec] : 8
- 9592 0.470027[sec] : 9
스레드 수 1~9 에서 실행시켜보니, 8개의 논리 코어를 가진 i7에서는 당연하지만 스레드 8개 정도에서 스피드가 한계에 부딪힙니다. 3배 정도 빨라졌군요. Windows API로 구현할 경우, 스레드의 본체인 count_prime 을 스레드에 올리기 위한 wrapper (이 샘플에서는 thread_io 와 thread_entry)가 필요합니다.
1 : thread
그럼 C++11 의 스레드 라이브러리를 사용한 버전을 봅시다.
- void multi(int M, int nthr) {
- vector<thread> thr(nthr);
- vector<int> count(nthr);
- div_range<> rng(2,M,nthr);
- chrono::system_clock::time_point start = chrono::system_clock::now();
- for ( int i = 0; i < nthr; ++i ) {
- thr[i] = thread([&,i](int lo, int hi) { count[i] = count_prime(lo,hi); }, rng.lo(i), rng.hi(i));
- }
- int result = 0;
- for ( int i = 0; i < nthr; ++i ) {
- thr[i].join();
- result += count[i];
- }
- chrono::duration<double> sec = chrono::system_clock::now() - start;
- cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
- }
- int main() {
- const int M = 100000;
- for ( int i = 1; i < 10; ++i ) multi(M, i);
- }
어떻습니까? Windows API를 통한 구현과 비교하면 훨씬 간단하여, std::sthread의 생성자에 스레드의 진입점과 인수를 넣어주는 것만으로 스레드가 생성되어 움직입니다. 나머지는 join() 으로 완료되기를 기다리는 것 뿐.
생성자에 넘길 스레드 진입점은 ()로 된 것, 즉 함수자라면 뭐든 OK입니다.
- void count_prime_function(int lo, int hi, int& result) {
- result = count_prime(lo, hi);
- }
- class count_prime_class {
- int& result_;
- public:
- count_prime_class(int& result) : result_(result) {}
- void operator()(int lo, int hi) { result_ = count_prime(lo, hi); }
- };
- void multi(int M) {
- thread thr[3];
- int count[3];
- div_range<> rng(2,M,3);
- auto count_prime_lambda = [&](int lo, int hi) { count[2] = count_prime(lo,hi); };
- chrono::system_clock::time_point start = chrono::system_clock::now();
- // 함수 포인터, 클래스 인스턴스, lambda식으로 스레드를 작성
- thr[0] = thread(count_prime_function, rng.lo(0), rng.hi(0), ref(count[0]));
- thr[1] = thread(count_prime_class(count[1]), rng.lo(1), rng.hi(1));
- thr[2] = thread(count_prime_lambda, rng.lo(2), rng.hi(2));
- for ( thread& t : thr ) t.join();
- chrono::duration<double> sec = chrono::system_clock::now() - start;
- cout << count[0] + count[1] + count[2] << ' ' << sec.count() << "[sec]" << endl;
- }
2 : async/future
thread 를 사용하면서 join()을 통해 완료를 기다린 후 결과를 참조(읽기)하고 있습니다만, async/future를 사용하면 완료 대기와 결과 참조가 간략화됩니다. 리턴값(결과)를 돌려주는 함수자와 인수를 async에 넘겨주면 future가 리턴됩니다. 그 future에 대해 get() 하는 것만으로 완료 대기와 결과 참조를 같이 할 수 있습니다.
- class count_prime_class {
- public:
- int operator()(int lo, int hi) { return count_prime(lo, hi); }
- };
- void multi(int M) {
- future<int> fut[3];
- div_range<> rng(2,M,3);
- auto count_prime_lambda = [&](int lo, int hi) { return count_prime(lo,hi); };
- chrono::system_clock::time_point start = chrono::system_clock::now();
- // 함수 포인터, 클래스 인스턴스, lambda식으로 future를 작성
- fut[0] = async(count_prime, rng.lo(0), rng.hi(0));
- fut[1] = async(count_prime_class(), rng.lo(1), rng.hi(1));
- fut[2] = async(count_prime_lambda, rng.lo(2), rng.hi(2));
- int result = fut[0].get() + fut[1].get() + fut[2].get();
- chrono::duration<double> sec = chrono::system_clock::now() - start;
- cout << count[0] + count[1] + count[2] << ' ' << sec.count() << "[sec]" << endl;
- }
3 : mutex/atomic
1 : thread 에서 사용한 코드를 조금 바꿔보겠습니다. lo 이상 hi 미만의 소수를 감정하는 coutn_prime을 전부 없애버리고, lambda 내에서 직접 처리하게 해보겠습니다.
- void multi(int M, int nthr) {
- vector<thread> thr(nthr);
- div_range<> rng(2, M, nthr);
- int result = 0;
- chrono::system_clock::time_point start = chrono::system_clock::now();
- for ( int t = 0; t < nthr; ++t ) {
- thr[t] = thread([&](int lo, int hi) {
- for ( int n = lo; n < hi; ++n ) {
- // n이 소수라면 result에 1을 더함
- if ( is_prime(n) ) {
- ++result;
- }
- },
- rng.lo(t), rng.hi(t));
- }
- for ( thread& th : thr ) { th.join(); }
- chrono::duration<double> sec = chrono::system_clock::now() - start;
- cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
- }
잘 굴러갈 것 같…지만, 여기에는 커다란 함정이 있습니다. n이 소수일때 ++result 하고 있는데, ++result 는 result 를 읽고 「1을 더해서/쓰고 돌려주는 처리」를 합니다. 읽고 나서 쓰고 돌려줄 때까지의 사이에 다른 스레드가 끼어들면 result 의 결과가 이상해집니다. 이것을 data-race(자료 경합)이라고 부르며, 위태로운 타이밍에 발생하기에 재현하기가 어려운 상당히 까다로운 버그입니다. 이런 때에 「여기서 여기까지, 다른 스레드는 들어오지 마(밖에서 기다려)!」를 실현하는 것이 mutex(mutual exclusion : 상호배타) 입니다. mutex 를 lock() 한 후 unlock() 할 때까지, 다른 스레드는 lock() 지점에서 블럭됩니다. 위의 예시를 제대로 움직이려면
- ...
- mutex mtx;
- int result = 0;
- ...
- // n이 소수라면 result에 1을 더함
- if ( is_prime(n) ) {
- mtx.lock();
- ++result;
- mtx.unlock();
- }
- ...
혹은
- ...
- mutex mtx;
- int result = 0;
- ...
- // n이 소수라면 result에 1을 더함
- if ( is_prime(n) ) {
- lock_guard<mutex> guard(mtx);
- ++result;
- }
- ...
lock_guard는 생성될 때 lock()/파괴될 때 unlock() 해주므로 unlock() 하는 것을 잊어버릴 위험이 없어 편합니다. 도중에 예외가 발생해도 확실하게 unlock() 해주고요.
또, int, long 등 내장 타입 및 포인터 타입에 대해서 ++, --, +=, -=, &=, |= 등의 연산을 행하는(극히 짧은) 동안만 다른 스레드의 끼어들기를 억지할 때에는 고속/경량인 atomic 을 추천합니다. 위의 예시라면
- ...
- atomic<int> result = 0;
- ...
- // n이 소수라면 result에 1을 더함
- if ( is_prime(n) ) {
- ++result;
- }
- ...
로 OK입니다.
4 : contition_variable
condition_variable 을 사용하여 한 스레드에서 다른 스레드 이벤트를 던질 수 있습니다. 이벤트를 기다리는 스레드에서는
- mutex mtx;
- condition_variable cv;
- unique_lock<mutex> lock(mtx);
- ...
- cv.wait(lock);
- ...
condition_variable에서 wait() 하면 lock 된 mutex를 일단 풀고(unlock 하고) 대기 상태가 됩니다. 이벤트를 송출할 스레드에서는
- unique_lock<mutex> lock(mtx);
- ...
- cv.notify_all();
- ...
condition_variable에 notify_all (또는 notify_one) 하면 대기하는 쪽의 wait()가 풀림과 동시에 mutex가 다시 lock 되는 구조입니다.
이것을 이용해서 소수를 감정하고 있는 모든 스레드의 완료를 기다려보겠습니다.
- void multi(int M, int nthr) {
- vector<thread> thr(nthr);
- div_range<> rng(2,M,nthr);
- condition_variable cond;
- int finished = 0;
- atomic<int> result = 0;
- mutex mtx;
- chrono::system_clock::time_point start = chrono::system_clock::now();
- for ( int t = 0; t < nthr; ++t ) {
- thr[t] = thread([&](int lo, int hi) {
- for( int n = lo; n < hi; ++n ) {
- if( is_prime(n) ) ++result;
- }
- lock_guard<mutex> guard(mtx);
- ++finished;
- cond.notify_one();
- },
- rng.lo(t) rng.hi(t));
- }
- unique_lock<mutex> lock(mtx);
- // 모든 스레드가 ++finished 해서 finished == nthr 이 되는 것을 기다림
- cond.wait( lock, [&]() { return finished == nthr; } );
- chrono::duration<double> sec = chrono::system_clock::now() - start;
- cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
- for( thread& th : thr ) { th.join(); }
- }
condition_variable 을 사용한 예를 하나 더 들어보겠습니다. 랑데부(rendezvous) 혹은 배리어(barrier) 라고 불리는 「대기」 구조입니다.
- class rendezvous {
- public:
- rendezvous(unsigned int count)
- : threshold_(count), count_(count), generation_(0) {
- if ( count == 0 ) { throw std::invalid_argument("count cannot be zero."); }
- }
- bool wait() {
- std::unique_lock<std::mutex> lock(mutex_);
- unsigned int gen = generation_;
- if ( --count_ == 0 ) {
- generation_++;
- count_ = threshold_;
- condition_.notify_all();
- return true;
- }
- condition_.wait(lock, [&]() { return gen != generation_; });
- return false;
- }
- private:
- std::mutex mutex_;
- std::condition_variable condition_;
- unsigned int threshold_;
- unsigned int count_;
- unsigned int generation_;
- };
rendezvous r(5); 처럼, 기다릴 사람 수(스레드 수)를 인수로 하는 생성자를 만듭니다. 각 스레드가 r.wait() 하면 전원이 모일때까지 대기 상태가 되어, 마지막 스레드가 r.wait() 하는 순간 전원의 블럭이 풀려 일제히 움직이기 시작합니다.
앞서 본 모든 스레드 완료 대기를 rendezvous 로 구현하면,
- void multi(int M, int nthr) {
- vector<thread> thr(nthr);
- div_range<> rng(2,M,nthr);
- atomic<int> result = 0;
- rendezvous quit(nthr+1);
- chrono::system_clock::time_point start = chrono::system_clock::now();
- for ( int t = 0; t < nthr; ++t ) {
- thr[t] = thread([&](int lo, int hi) {
- for( int n = lo; n < lo; ++n ) {
- if ( is_prime(n) ) ++result;
- }
- quit.wait();
- },
- rng.lo(t), rng.hi(t));
- }
- quit.wait(); // 모든 스레드가 wait 할때까지 기다림
- chrono::duration<double> sec = chrono::system_clock::now() - start;
- cout << result << ' ' << sec.count() << "[sec] : " << nthr << endl;
- for ( thread& th : thr ) { th.join(); }
- }
C++11 이 제공하는 스레드 라이브러리를 간단하게 소개해봤습니다. 어떻습니까? 멀티스레드 애플리케이션을 훨씬 간단하게 작성할 수 있게 되었습니다. CPU 미터가 모든 코어를 채우는 High Performance Computing 의 쾌감을 즐겨보시길.
이 글은 스프링노트에서 작성되었습니다.