محمد الميداوي

كيفية إنشاء خيط std::thread

تُنشَأ الخيوط في C++‎ باستخدام الصنف std::thread، والخيط (thread) هو مسار تنفيذ منفصل أشبه بمساعد يساعدك على أداء مهمة فرعية أثناء إنجازك لمهمة أخرى، ثم يتوقف عند اكتمال تنفيذ الشيفرة في الخيط.

يجب أن تمرر شيفرة ما للخيط عند إنشائه لينفذها، مثل:

  • الدوالّ الحرّة (Free functions).
  • الدوال التابعة.
  • الكائنات الدالية (Functor).
  • تعبيرات لامدا.

تكون المعاملات المُمرَّرة:

المعامل التفاصيل
other تأخذ ملكية ‎other‎، بحيث أنّ ‎other‎ تفقد ملكيّة الخيط (thread)
func دالّة لأجل استدعائها في خيط منفصل
args وسائط لـ ‎func‎

هذا مثال على تمرير دالّة حرّة لتُنفّذ في خيط منفصل:

#include <iostream>
#include <thread>

void foo(int a)
{
    std::cout << a << '\n';
}

int main()
{
    // إنشاء وتنفيذ الخيط

تكون foo هنا هي الدالة محل التنفيذ، و10 هو الوسيط الممرر إليها، وسينفَذ الخيط الآن بشكل منفصل ويُنتَظر هنا حتى تمام تنفيذه، انظر:

    std::thread thread(foo, 10);    

    thread.join();

    return 0;
}

هذا مثال على استخدام تابع ليُنفّذ في خيط منفصل:

#include <iostream>
#include <thread>

class Bar
{
    public:
        void foo(int a)
        {
            std::cout << a << '\n';
        }
};

int main()
{
    Bar bar;

    // إنشاء وتنفيذ الخيط
    std::thread thread(&Bar::foo, &bar, 10);    // Pass 10 to member function

التابع سيُنفَّذ الآن بشكل منفصل، سننتظر هنا حتى تمام تنفيذ الخيط، فهذه عملية معطِّلة (Blocking)، نتابع:

    thread.join();

    return 0;
}

مثال على استخدام كائن دالّي:

#include <iostream>
#include <thread>

class Bar
{
    public:
        void operator()(int a)
        {
            std::cout << a << '\n';
        }
};

int main()
{
    Bar bar;

    // انشاء الخيط وتنفيذه
    std::thread thread(bar, 10);    // مرر 10 إلى الكائن الدالي.

سيُنفَّذ الكائن الدالي الآن بشكل منفصل، سننتظر هنا حتى تمام تنفيذ الخيط، فهذه عملية معطِّلة (Blocking)، نتابع:

   thread.join();

    return 0;
}

مثال على تمرير تعبير لامدا:

#include <iostream>
#include <thread>

int main()
{
    auto lambda =[](int a)
    {
        std::cout << a << '\n';
    };
    // انشاء الخيط وتنفيذه
    std::thread thread(lambda, 10);    // تمرير 10 إلى تعبير لامدا

سيُنفَّذ تعبير لامدا الآن بشكل منفصل، وسننتظر اكتمال تنفيذ الخيط، فهي عملية معطِّلة، نتابع:

    thread.join();

    return 0;
}

تمرير مرجع إلى خيط

لا يمكنك تمرير مرجع -أو مرجع ثابت ‎const‎- مباشرةً إلى خيط، لأنّ الخيط سينسخها/ينقلها، بل استخدم ‎std::reference_wrapper‎:

void foo(int & b)
{
    b = 10;
}

int a = 1;
std::thread thread
{
    foo, std::ref(a)
};    

الآن، a ممرَّرة على شكل مرجع، تابع المثال:

thread.join();
std::cout << a << '\n';    //  10
void bar(const ComplexObject &co)
{
    co.doCalculations();
}

ComplexObject object;
std::thread thread
{
    bar, std::cref(object)
};

أيضًا، object ممرَّر الآن على شكل &const، تابع:

thread.join();
std::cout << object.getResult() << '\n';   

استخدام std::async بدلاً من std::thread

تستطيع std‎::async أن تنشئ خيوطًا رغم أنها أضعف من ‎std‎::thread، لكنّها تتميّر بأنّها أسهل في حال كنت تريد تنفيذ دالة بشكل غير متزامن (asynchronously).

استدعاء دالّة بشكل غير متزامن

#include <future>
#include <iostream>

unsigned int square(unsigned int i)
{
    return i * i;
}

int main()
{
    auto f = std::async (std::launch::async, square, 8);
    std::cout << "square currently running\n";    // square افعل شيئا ما أثناء تنفيذ
    std::cout << "result is " << f.get() << '\n';    // square الحصول على النتيجة من
}

أخطاء شائعة

  • تعيد std::async كائن std::future يحتوي القيمة المُعادة التي ستحسُبها الدالّة، وعند تدمير ‎future‎ فإنّها تنتظر حتى يكتمل الخيط ممّا يجعل الشيفرة أحادية الخيوط (single threaded). يُمكن تجاهَل هذا السلوك إذا لم تكن بحاجة إلى القيمة المُعادة:
std::async (std::launch::async, square, 5);

في الشيفرة السابقة، انتهى تنفيذ الخيط لأن قيمة future قد دُمِّرت.

  • تعمل std::async بدون سياسة إطلاق (launch policy)، لذا فإنّ التعبير ‎std::async(square, 5);‎ سيُصرَّف. عندئذ يقرر النظام إن كان سينشئ خيطًا أم لا. والفكرة أنّ النظام سيختار إنشاء خيط إن لم يكن عدد الخيوط قيد التنفيذ أكبر ممّا يمكنه التعامل معه. لكن عادة ما تختار التنفيذات (implementations) عدم إنشاء خيط في مثل هذه المواقف، لذا ستحتاج إلى إعادة تعريف هذا السلوك باستخدام ‎std::launch::async‎، التي تجبر النظام على إنشاء الخيط.

أساسيات التزامن بين الخيوط

يمكن تحقيق تزامن الخيوط باستخدام كائنات المزامنة (mutexes)، وتوفّر المكتبة القياسية العديد من أنواع كائنات المزامنة تلك لكن أبسطها هو ‎std::mutex‎ وسنتحدث عن تلك الكائنات بالتفصيل في القسم التالي.

ولقفل كائن مزامنة ستحتاج إلى إنشاء قفل (lock) خاصّ به، وأبسط أنواع الأقفال هو ‎std::lock_guard‎:

std::mutex m;
void worker()
{
    std::lock_guard<std::mutex > guard(m);    // يحصل على قفلٍ على كائن المزامنة
    // الشيفرة المُزامَنة هنا
}    // سيُحرَّر كائن المزامنة عندما يخرج الدرع عن النطاق 

سيُقفل كائن المزامنة باستخدام ‎std::lock_guard‎ طول العمر الافتراضي لكائن القفل، وإن أردت التحكم في المناطق المقفلة يدويًا، فاستخدم ‎std::unique_lock‎:

std::mutex m;
void worker()
{

افتراضيًا، إنشاء unique_lock من كائن مزامنة سيقفل ذلك الكائن، ونستطيع إنشاء درع في حالة مفتوحة عبر تمرير std::defer_lock كوسيط ثاني ثم نقفل يدويًا فيما بعد، تابع المثال:

    std::unique_lock<std::mutex > guard(m, std::defer_lock);
    // لم يُقفَل كائن المزامنة بعد
    guard.lock();
    // شيفرة خاصة
    guard.unlock();
    // تحرير كائن المزامنة مجددا
}

كائنات المزامنة Mutexes

كائنات المزامنة هي بنيات مزامنة بسيطة غير تكرارية (non-recursive) تُستخدَم لحماية البيانات التي يمكن الوصول إليها من خيوط متعددة (multiple threads).

std::atomic_int temp
{
    0
};
std::mutex _mutex;

std::thread t([ &]()
{
    while (temp != -1)
    {
        std::this_thread::sleep_for(std::chrono::seconds(5));
        std::unique_lock<std::mutex > lock(_mutex);

        temp = 0;
    } });

while (true)
{
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    std::unique_lock<std::mutex > lock(_mutex, std::try_to_lock);
    if (temp < INT_MAX)
        temp++;
    cout << temp << endl;

}

أنواع كائنات المزامنة

توفّر الإصدارات C++1x عددًا من أصناف كائنات المزامنة:

  • std::mutex - توفّر وظائف القفل الأساسية.
  • std::timed_mutex - توفّر وظائف try_to_lock
  • std::recursive_mutex - تتيح القفل التكراري من قِبل نفس الخيط
  • std::shared_mutex و std::shared_timed_mutex - توفّران وظائف قفل مشتركة وحصرية

الأقفال std::lock

تستخدِم الأقفال std::lock خوارزميات لتجنب الشلل الوظيفي (deadlock) من أجل قفل كائنات المزامنة. وعند رفع اعتراض أثناء استدعاء لقَفل عدة كائنات فستفتح ‎std::lock‎ الكائنات المُقفلة قبل إعادة رفع الاعتراض.

std::lock(_mutex1, _mutex2);

الأقفال الحصرية (std::unique_lock) والأقفال المشتركة (std::shared_lock) والأقفال المُؤمّنة ( std::lock_guard)

تُستخدم هذه الأقفال مع آلية RAII للحصول على أقفال المحاولة (try locks)، وأقفال المحاولة الموقوتة (timed try locks)، والأقفال التكرارية (recursive locks).

  • std::unique_lock - تسمح بالملكية الحصرية لكائنات المزامنة.
  • std::shared_lock - تسمح بالملكية المشتركة لكائنات المزامنة، إذ يمكن لعدّة خيوط أن تحتفظ بقفل مشتركtd::shared_locks خاصّ بكائن مزامنة مشترك std::shared_mutex. وقد أتيح منذ C++‎ 14
  • std::lock_guard - بديل خفيف للأقفال الحصرية std::unique_lock والأقفال المشتركة std::shared_lock.
#include <unordered_map>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <string>
#include <iostream>

class PhoneBook
{
    public:
        std::string getPhoneNo(const std::string &name)
        {
            std::shared_lock<std::shared_timed_mutex > l(_protect);
            auto it = _phonebook.find(name);
            if (it != _phonebook.end())
                return (*it).second;
            return "";
        }

    void addPhoneNo(const std::string &name, const std::string &phone)
    {
        std::unique_lock<std::shared_timed_mutex > l(_protect);
        _phonebook[name] = phone;
    }

    std::shared_timed_mutex _protect;
    std::unordered_map<std::string, std::string > _phonebook;
};

استراتيجيات قفل الأصناف: std::try_to_lock و std::adopt_lock و std::defer_lock

لدينا ثلاث استراتيجيات لتختار منها عند إنشاء قفل حصري: ‎std::try_to_lock‎ و std::defer_lock و std::adopt_lock:

  1. std::try_to_lock - تسمح بمحاولة القفل (trying a lock) بدون تعطيل:
{
    std::atomic_int temp
    { 0 };
    std::mutex _mutex;

    std::thread t([ &]()
    {
        while (temp != -1)
        {
            std::this_thread::sleep_for(std::chrono::seconds(5));
            std::unique_lock<std::mutex > lock(_mutex, std::try_to_lock);

            if (lock.owns_lock())
            {
                //افعل شيئًا
                temp = 0;
            }
        } });

    while (true)
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        std::unique_lock<std::mutex > lock(_mutex, std::try_to_lock);
        if (lock.owns_lock())
        {
            if (temp < INT_MAX)
            {
                ++temp;
            }

            std::cout << temp << std::endl;
        }
    }
}
  1. std::defer_lock - تسمح بإنشاء بنية قفل دون الحصول على القفل. ذلك أنه عند قفل أكثر من كائن مزامنة، فهناك إمكانية لحدوث شلل وظيفي إذا حاولت دالتان الحصول على الأقفال في نفس الوقت:
{
    std::unique_lock<std::mutex > lock1(_mutex1, std::defer_lock);
    std::unique_lock<std::mutex > lock2(_mutex2, std::defer_lock);
    lock1.lock()
    lock2.lock();    // شلل وظيفي هنا
    std::cout << "Locked! << std::endl;
    //...
}

يمكن الحصول على الأقفال وإصدارها بالترتيب المناسب مع الشيفرة التالية، بغض النظر عما يحدث في الدالة:

{
    std::unique_lock<std::mutex > lock1(_mutex1, std::defer_lock);
    std::unique_lock<std::mutex > lock2(_mutex2, std::defer_lock);
    std::lock(lock1, lock2);    // لن يحدث شلل وظيفي.
    std::cout << "Locked! << std::endl;
    //...

}
  1. std::adopt_lock - لا تحاول القفل مرّة ثانية إذا كان الخيط المُستدعي يملك القفل حاليًا.
{
    std::unique_lock<std::mutex > lock1(_mutex1, std::adopt_lock);
    std::unique_lock<std::mutex > lock2(_mutex2, std::adopt_lock);
    std::cout << "Locked! << std::endl;
    //...
}

تذكّر أنّ std::adopt_lock ليست بديلاً عن استخدام كائنات المزامنة التكرارية، فسيحرَّر كائن المزامنة عند خروج القفل عن النطاق.

الأقفال النطاقية std::scoped_lock ‏(C++ 17)

توفّر الأقفال النطاقية std::scoped_lock دلالات RAII لامتلاك كائن مزامنة أو أكثر، وتُستخدم مع خوارزميات تجنّب الشلل الوظيفي التي تستخدمها الأقفال العادية ‎std::lock‎. وعندما تُدمَّر ‎std::scoped_lock‎، فإن كائنات المزامنة تُحرّر بالترتيب العكسي لترتيب الحصول عليها.

{
    std::scoped_lock lock
    {
        _mutex1, _mutex2
    };
    // افعل شيئا ما
}

كائنات المزامنة التكرارية Recursive Mutex

تسمح كائنات المزامنة التكرارية لخيط ما بقفل أحد الموارد بدون حد معين، ولا توجد مبررات كثيرة لاستخدام هذه التقنية، لكن قد تحتاج بعض التنفيذات (implementations) المعقّدة إلى استدعاء نسخة مُحمّلة تحميلا زائدًا (overloaded) من دالّة دون تحرير القفل. انظر المثال التالي:

std::atomic_int temp
{
    0
};
std::recursive_mutex _mutex;

تطلق launch_deferred مهامًا غير متزامنة على نفس معرِّف الخيط، تابع …

auto future1 = std::async (
    std::launch::deferred,
[ &]()
    {
        std::cout << std::this_thread::get_id() << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds(3));
        std::unique_lock<std::recursive_mutex > lock(_mutex);
        temp = 0;
    });
auto future2 = std::async (
    std::launch::deferred,
[ &]()
    {
        std::cout << std::this_thread::get_id() << std::endl;
        while (true)
        {
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            std::unique_lock<std::recursive_mutex > lock(_mutex, std::try_to_lock);
            if (temp < INT_MAX)
                temp++;
            cout << temp << endl;

        } });
future1.get();
future2.get();

هياكل مزامنة الخيوط

قد يتطلّب استعمال الخيوط أن تستخدم بعض تقنيات المزامنة إذا كانت الخيوط تتفاعل مع بعضها، وسنتحدث في هذا الموضوع عن عدد من الهياكل التي توفّرها المكتبة القياسية لحل هذه المشكلة.

std::condition_variable_any و std::cv_status

‎std::condition_variable_any‎ هي تعميم لـ‎std::condition_variable‎، ويمكن أن تعمل مع أيّ نوع من الهياكل الأساسية القابلة للقفل (BasicLockable structure).

وstd::cv_status كقيمة مُعادة من متغيّر شرطي يكون لها رمزا إعادة (return codes) محتملان:

  • std::cv_status::no_timeout: إن لم تكن هناك مهلة (timeout)، وتمّ إشعارالمتغيّر الشرطي.
  • std::cv_status::timeout: عند انتهاء مهلة المتغيّر الشرطي.

الأقفال المشتركة std::shared_lock

يمكن استخدام الأقفال المشتركة مع قفل حصريّ (unique lock) من أجل السماح بعدّة قارئات (readers)، لكن مع كاتبات (writers) حصرية.

#include <unordered_map>
#include <mutex>
#include <shared_mutex>
#include <thread>
#include <string>
#include <iostream>

class PhoneBook
{
    public:
        string getPhoneNo(const std::string &name)
        {
            shared_lock<shared_timed_mutex> r(_protect);
            auto it = _phonebook.find(name);
            if (it == _phonebook.end())
                return (*it).second;
            return "";
        }

    void addPhoneNo(const std::string &name, const std::string &phone)
    {
        unique_lock<shared_timed_mutex> w(_protect);
        _phonebook[name] = phone;
    }

    shared_timed_mutex _protect;
    unordered_map<string, string> _phonebook;
};

std::call_once و std::once_flag

تضمن std::call_once ألّا تُنفّذ دالّة معيّنة إلّا مرّة واحدة فقط من قبل الخيوط المتنافسة (competing threads). وتطلق خطأ نظامي std::system_error في حال حدث ذلك. كذلك فإن std::call_once تُستخدم مع ‎td::once_flag‎.

#include <mutex>
#include <iostream>

std::once_flag flag;
void do_something(){
    std::call_once(flag, [](){std::cout << "Happens once" << std::endl;});

    std::cout << "Happens every time" << std::endl;
}

قفل الكائنات لتحسين كفاءة الوصول

قد ترغب في قفل الكائن بالكامل أثناء إجراء عمليات متعدّدة عليه، كأن تريد فحصه أو تعديله باستخدام المُكرّرات. وإن كنت بحاجة إلى استدعاء عدّة توابع، فمن الأفضل عمومًا قفله بالكامل بدلاً قفل التوابع فرديّا، انظر:

class text_buffer
{
    // لأجل تحسين القراءة والصيانة
    using mutex_type = std::shared_timed_mutex;
    using reading_lock = std::shared_lock<mutex_type> ;
    using updates_lock = std::unique_lock<mutex_type> ;
    public:

يعيد هذا قفلًا نطاقيًا (scoped lock) تستطيع عدة قارئات أن تتشاركه مع استثناء الكاتبات في نفس الوقت، تابع المثال …

    [[nodiscard]]
    reading_lock lock_for_reading() const
    {
        return reading_lock(mtx);
    }

يعيد هذا قفلًا نطاقيًا خاصًا بكاتب واحد، مع منع القارئات، تابع …

    [[nodiscard]]
    updates_lock lock_for_updates() { return updates_lock(mtx); }

    char* data() { return buf; }
    char const* data() const { return buf; }

    char* begin() { return buf; }
    char const* begin() const { return buf; }

    char* end() { return buf + sizeof(buf); }
    char const* end() const { return buf + sizeof(buf); }

    std::size_t size() const { return sizeof(buf); }
private:
    char buf[1024];
    mutable mutex_type mtx;    // للكائنات الثابتة بأن تُقفَل mutable يسمح
};

يسمح mutable في السطر الأخير الشيفرة أعلاه بأن تُقفَل الكائنات الثابتة، ويُقفل الكائن عند حساب المجموع (checksum) من أجل القراءة، وهذا سيفسح المجال أمام الخيوط الأخرى التي ترغب في القراءة من الكائن في نفس الوقت بأن تقرأ منه.

std::size_t checksum(text_buffer const &buf)
{
    std::size_t sum = 0xA44944A4;
    // قفل الكائن لأجل القراءة
    auto lock = buf.lock_for_reading();
    for (auto c: buf)
        sum = (sum << 8) | (((unsigned char)((sum & 0xFF000000) >> 24)) ^ c);
    return sum;
}

ويؤدّي مسح الكائن إلى تحديث بياناته الداخلية، لذا يجب فعل ذلك باستخدام قفل حصري.

void clear(text_buffer & buf)
{
    auto lock = buf.lock_for_updates();    // قفل حصري
    std::fill(std::begin(buf), std::end(buf), '\0');
}

يجب توخي الحذر دائمًا عند الحصول على أكثر من قفل، والحرص على الحصول على الأقفال بنفس الترتيب لجميع الخيوط.

void transfer(text_buffer const &input, text_buffer &output)
{
    auto lock1 = input.lock_for_reading();
    auto lock2 = output.lock_for_updates();
    std::copy(std::begin(input), std::end(input), std::begin(output));
}

ملاحظة: من الأفضل إنجاز ذلك باستخدام std::deferred::lock ثمّ استدعاء std::lock

متغير تقييد الوصول

متغير تقييد الوصول (Semaphore) غير متاح حاليًا في C++‎، ولكن يمكن تنفيذه بسهولة باستخدام كائنات المزامنة والمتغيّرات الشرطية. هذا المثال مأخوذ من:

متغيرات تقييد الوصول في C++‎ 11

انظر المثال التوضيحي التالي:

#include <mutex>
#include <condition_variable>

class Semaphore
{
    public:
        Semaphore(int count_ = 0): count(count_) {}

    inline void notify(int tid)
    {
        std::unique_lock<std::mutex > lock(mtx);
        count++;
        cout << "thread " << tid << " notify" << endl;
        //أشعِر الخيط المنتظِر.
        cv.notify_one();
    }

    inline void wait(int tid)
    {
        std::unique_lock<std::mutex > lock(mtx);
        while (count == 0)
        {
            cout << "thread " << tid << " wait" << endl;
            // notify انتظر كائن المزامنة إلى حين استدعاء
            cv.wait(lock);
            cout << "thread " << tid << " run" << endl;
        }

        count--;
    }

    private:
        std::mutex mtx;
    std::condition_variable cv;
    int count;
};

مثال على استخدام متغير تقييد الوصول

تضيف الدالّة التالية أربعة خيوط، تتنافس ثلاثة منها على متغير تقييد الوصول الذي يُضبط عدّاده عند القيمة 1. وسيستدعي الخيط الأبطأ ‎notify_one()‎، ممّا يسمح لأحد الخيوط المنتظِرة بالمتابعة.

ونتيجة لهذا تبدأ ‎s1‎ على الفور، مما سيُبقي عدّاد متغير تقييد الوصول ‎count‎ دون القيمة 1، وستنتظر الخيوط الأخرى دورها في المتغيّر الشرطي حتى استدعاء notify()‎‎.

int main()
{
    Semaphore sem(1);

    thread s1([ &]()
    {
        while (true)
        {
            this_thread::sleep_for(std::chrono::seconds(5));
            sem.wait(1);
        } });
    thread s2([ &]()
    {
        while (true)
        {
            sem.wait(2);
        } });
    thread s3([ &]()
    {
        while (true)
        {
            this_thread::sleep_for(std::chrono::milliseconds(600));
            sem.wait(3);
        } });
    thread s4([ &]()
    {
        while (true)
        {
            this_thread::sleep_for(std::chrono::seconds(5));
            sem.notify(4);
        } });

    s1.join();
    s2.join();
    s3.join();
    s4.join();
    ...
}

إنشاء ساحة خيوط بسيطة

خيوط C++‎ 11 الأساسية منخفضة المستوى نسبيًا، لذا يمكن استخدامها لكتابة كائنات عالية المستوى مثل ساحة الخيوط (thread pool):

الإصدار ≥ C++‎ 14

يشكل كائن المزامنة والمتغير الشرطي وكائن deque طابورًا من المهام آمن على الخيوط (thread-safe):

struct tasks
{
    std::mutex m;
    std::condition_variable v;

… لاحظ أن <packaged_task<void تستطيع تخزين <packaged_task<R … :

    std::deque< std::packaged_task < void() >> work;
    //  هذا سيمثل العمل الذي أنجزته الخيوط
    std::vector<std::future < void>> finished;

(queue (lambda سيضيف لامدا إلى قائمة المهام التي سينفذها الخيط …:

     template < class F, class R = std::result_of_t < F &() >>
        std::future<R> queue(F && f)
        {
            // لتقسيم التنفيذ - packaged task - تغليف كائن الدالة في مهمة محزومة
            std::packaged_task < R() > p(std::forward<F> (f));
            auto r = p.get_future();    // الحصول على القيمة من المهمة قيد تنفيذ 
            {
                std::unique_lock<std::mutex > l(m);

سنخزن المهمة <()R> على شكل <()void>، تابع:

                work.emplace_back(std::move(p));
            }

            v.notify_one();    // إيقاظ الخيط ليعمل على المهمة
            return r;    // إعادة النتيجة المستقبلية للمهمة
        }

والآن، نبدأ عدد N من الخيوط في ساحة الخيوط، نتابع المثال:

    void start(std::size_t N = 1)
    {
        for (std::size_t i = 0; i < N; ++i)
        {

كل الخيوط الآن غير متزامنة std::async، وتنفذ ()this->thread_task، تابع:

            finished.push_back(
                std::async (
                    std::launch::async,
                [this]
                    {
                        thread_task();
                    }
            )
        );
        }
    }

تلغي ()abort كل المهام التي لم تنطلق بعد، وإخطار كل الخيوط العاملة أن تتوقف، وتنتظرهم حتى ينتهوا، تابع:

  void abort()
    {
        cancel_pending();
        finish();
    }

تلغي ()cancel_pending المهام التي لم تنطلق بعد:

    void cancel_pending()
    {
        std::unique_lock<std::mutex > l(m);
        work.clear();
    }

هنا نرسل رسالة "stop the thread" إلى جميع الخيوط، ثم نتظرها، تابع:

    void finish()
    {
        {
            std::unique_lock<std::mutex > l(m);
            for (auto && unused: finished)
            {
                work.push_back( {});
            }
        }

        v.notify_all();
        finished.clear();
    }~tasks()
    {
        finish();
    }

    private:
        //: العمل الذي يقوم به الخيط قيد التنفيذ
        void thread_task()
        {
            while (true)
            {
                // سحب مهمة من الطابور
                std::packaged_task < void() > f;
                {
                    std::unique_lock<std::mutex > l(m);
                    if (work.empty())
                    {
                        v.wait(l, [& ]
                        {
                            return !work.empty();
    });
                    }

                    f = std::move(work.front());
                    work.pop_front();
                }

                // إذا كانت المهمة غير صالحة، فسيكون علينا إلغاؤها
                if (!f.valid()) return;
                // خلاف ذلك، ينبغي تنفيذ المهمة
                f();
            }
        }
};

تعيد الدالة التالية:

tasks.queue( []{ return "hello world"s; } )

‎‎قيمة من النوع std::future<std::string>‎، والتي ستساوي عند تنفيذ كائن المهام السلسلة النصية ‎hello world‎. كذلك يمكنك إنشاء الخيوط عن طريق تنفيذ ‎tasks.start(10)‎ (والتي تطلق 10 خيوط).

إن سبب استخدام ‎packaged_task<void()>‎هو أنّه لا يوجد قالب صنف ‎std::function‎‏ مكافئ ومشطوب النوع (type-erased)، ولا يخزّن إلّا أنواع النقل فقط (move-only types). أيضًا، قد تكون كتابة نوع مخصّص أسرع من استخدام ‎packaged_task<void()>‎.

انظر هذا المثال الحيّ على ذلك.

الإصدار = C++‎ 11

في C++‎ 11، استبدل ‎result_of_t<blah>‎ بـ ‎typename result_of<blah>::type‎.

التحقق من أنّ الخيط مضموم دائمًا

عند استدعاء مدمَّر ‎std::thread‎، يجب استدعاء ‎join()‎ أو ‎detach()‎. وإذا لم يُضمّ (joined) الخيط أو يُفصل (detached)، فستُستدعى ‎std::terminate‎ افتراضيًا. يمكن تسهيل هذا عبر استخدام RAII:

class thread_joiner
{
    public:
        thread_joiner(std::thread t): t_(std::move(t)) {}
    ~thread_joiner()
        {
            if (t_.joinable())
            {
                t_.join();
            }
        }

    private:
        std::thread t_;
}

ثم يمكن كتابة ما يلي:

void perform_work()
{
    // إنجاز عمل ما
}

void t()
{
    thread_joiner j
    {
        std::thread(perform_work)
    };
    // تنفيذ بعض الحسابات أثناء تنفيذ الخيط
}    // يُضمّ الخيط هنا تلقائيا

يوفّر هذا أيضًا أمان الاعتراضات (exception safety)؛ ذلك أنّه إذا أنشأنا الخيط بشكل طبيعي ثمّ تسبّب العمل المُنجَز في ‎t()‎ برفع اعتراض، فلن تُستدعى ‎join()‎ على خيطنا، ولن تكتمل العملية.

إجراء عمليات على الخيط الحالي

std::this_thread هي فضاء اسم (namespace) يحتوي بعض الدوالّ التي يمكن استخدامها لإجراء عمليات معيّنة على الخيط الحالي من الدالّة التي استُدعِي منها.

الدالة الوصف
get_id تعيد معرِّف الخيط.
sleep_for تجعل الخيط ينام لفترة محددة.
sleep_until تجعل الخيط ينام "حتى" وقت محدد.
yield إعادة جدولة الخيوط العاملة وإعطاء الأولوية لخيوط أخرى.

يمكنك الحصول على معُرّف الخيط الحالي باستخدام ‎std::this_thread::get_id‎، انظر:

void foo()
{
    // اطبع معرّف الخيط
    std::cout << std::this_thread::get_id() << '\n';
}

std::thread thread
{
    foo
};
thread.join();    // 12556 طُبِع معرّف الخيط الآن، وسيكون شيئا يشبه
foo();    // 2420 طُبِع معرّف الخيط الرئيسي الآن، وسيكون شيئا يشبه

النوم لمدة 3 ثوانٍ باستخدام ‎std::this_thread::sleep_for‎:

void foo()
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
}

std::thread thread
{
    foo
};
foo.join();
std::cout << "Waited for 3 seconds!\n";

النوم إلى أن تنقضي 3 ساعات باستخدام ‎std::this_thread::sleep_until‎:

void foo()
{
    std::this_thread::sleep_until(std::chrono::system_clock::now() + std::chrono::hours(3));
}

std::thread thread
{
    foo
};
thread.join();
std::cout << "We are now located 3 hours after the thread has been called\n";

منح الأولوية لخيوط أخرى باستخدام ‎std::this_thread::yield‎:

void foo(int a)
{
    for (int i = 0; i<al++i)
        std::this_thread::yield();    // ستأخذ خيوط أخرى الأولوية الآن، لأنّ هذا الخيط لا يفعل أي شيء مهم
    std::cout << "Hello World!\n";
}

std::thread thread
{
    foo, 10
};
thread.join();

استخدام المتغيرات الشرطية Using Condition Variables

المتغيّر الشرطي (condition variable) هو كائن أوّلي (primitive) يُستخدم مع كائن مزامنة (mutex) لتنظيم الاتصالات بين الخيوط. ورغم أنّها ليست الطريقة الوحيدة لفعل ذلك ولا الأكثر فعالية إلا أنّها تتميّز بالبساطة والسهولة.

و يمكن انتظار المتغيرات الشرطية ‎std::condition_variable‎ عبر ‎std::unique_lock<std::mutex>‎. فهذا يسمح للشيفرة بفحص الحالة المشتركة (shared state) بأمان قبل تقرير ما إذا كان يجب متابعة عملية الحصول (acquisition) على القفل أم لا.

يستخدم المثال أدناه ‎std::thread‎ و ‎std::condition_variable‎ و ‎std::mutex‎.

#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <thread>

int main()
{
    std::condition_variable cond;
    std::mutex mtx;
    std::queue<int> intq;
    bool stopped = false;
    std::thread producer
    {
    [ &]()
        {

جهّز مولد الأعداد العشوائية، وسيدفع هذا المولد أعدادًا عشوائية إلى intq، تابع المثال:

            std::default_random_engine gen {};
            std::uniform_int_distribution<int> dist {};
            std::size_t count = 4006;
            while (count--)
            {

لابد من القفل قبل تغيير الحالة التي يحميها كائن المزامنة والمتغير الشرطي condition_variable، تابع:

                std::lock_guard<std::mutex > L
                {
                    mtx
                };
                // وضع العدد العشوائي في الطابور
                intq.push(dist(gen));
                cond.notify_one();
            }

الآن تم كل شيء، احصل على القفل وعين راية الإيقاف stopped ثم نبه المستخدم، تابع … :

            std::lock_guard<std::mutex > L
            {
                mtx
            };
            std::cout << "Producer is done!" << std::endl;
            stopped = true;
            cond.notify_one();
        }
    };
    std::thread consumer
    {
    [ &]()
        {
            do {
                std::unique_lock<std::mutex > L
                {
                    mtx
                };
                cond.wait(L, [& ]()
                {
                    // الاستحواذ على القفل في حال الانتهاء أو في حال لم يكن الطابور فارغا
                    return stopped || !intq.empty();
    });
                // نحن نملك كائن المزامنة هنا
                // سحب العناصر من الطابور إلى أن يصبح فارغا
                while (!intq.empty())
                {
                    const auto val = intq.front();
                    intq.pop();
                    std::cout << "Consumer popped: " << val << std::endl;
                }

                if (stopped)
                {
                    std::cout << "Consumer is done!" << std::endl;
                    break;
                }
            } while (true);
        }
    };
    consumer.join();
    producer.join();

    std::cout << "Example Completed!" << std::endl;
    return 0;
}

عمليات الخيوط Thread operations

عندما يبدأ تنفيد خيط معيّن، فسيُنفّذ إلى أن يكتمل، لكن قد تحتاج أحيانًا إلى انتظار اكتمال تنفيذ خيط ما إن كنت تريد استخدام النتيجة التي يعيدها. على سبيل المثال:

int n;
std::thread thread
{
    calculateSomething, std::ref(n)
};
// افعل أشياء أخرى

… نحن نحتاج n الآن، انتظر الخيط إلى أن ينتهي، إن لم يكن قد انتهى فعلًا، وستكون قيمة n بعدها هي النتيجة المحسوبة في خيط آخر، تابع:

thread.join();
std::cout << n << '\n';

يمكنك أيضًا فصل (‎detach‎) الخيط، والسماح بأن يُنفّذ بحرّية:

std::thread thread
{
    doSomething
};
//فصل الخيط، فنحن لا نريده بعد الآن
thread.detach();
// سيتم إنهاء الخيط عند اكتمال تنفيذه، أو عند عودة الخيط الرئيسي

التخزين المحلي للخيوط

يمكن إنشاء خيوط مُخزّنة محليًا باستخدام الكلمة المفتاحية ‎thread_local‎، والمتغيّرات التي يُصرّح عنها بالمحدّد ‎thread_local‎ يُقال أنّ لها مدة تخزين خيطية (thread storage duration).

  • كل خيط في البرنامج له نسخته الخاصّة من كل متغيّر محلي في الخيط (thread-local variable).
  • سيُهيّأ متغيّر الخيط المحلي الموجود في نطاق دالة (محلّية) بمجرّد تمرير التحكّم إلى تعريفها. هذا المتغيّر سيكون ساكنًا ضمنيًا ما لم يُصرّح عنه عبر ‎extern‎.
  • ستُهيّأ متغيّرات الخيط المحلي الموجودة في نطاق فضاء اسم أو نطاق صنف -غير محلي- عند بدء تنفيذ الخيط.
  • تُدمّر متغيّرات الخيط المحلي عند اكتمال تنفيذ الخيط.
  • لا يمكن لأعضاء صنف معيّن أن تكون محلية في الخيط (thread-local) إلا إن كانت ساكنة، وعندها ستكون هناك نسخة واحدة من ذلك المتغيّر لكل خيط، بدلاً من نسخة واحدة لكل زوج (نُسخة، خيط) [(thread, instance)]. انظر:
void debug_counter()
{
    thread_local int count = 0;
    Logger::log("This function has been called %d times by this thread", ++count);
}

إعادة إسناد كائنات الخيوط

يمكننا إنشاء كائنات خيوط فارغة (empty thread objects)، وإسناد مهامّ معيّنة إليها لاحقًا. وإذا أسندت كائن خيط إلى خيط آخر نشط وقابل للضمّ ‎joinable‎، فستُستدعى ‎std::terminate‎ تلقائيًا قبل استبدال الخيط. انظر:

#include <thread>

void foo()
{
    std::this_thread::sleep_for(std::chrono::seconds(3));
}

// إنشاء 100 كائن خيطي فارغ
std::thread executors[100];
// شيفرة هنا
// إنشاء بعض الخيوط 
for (int i = 0; i < 100; i++)
{
    // إذا لم يُسنَد خيط إلى هذا الكائن
    if (!executors[i].joinable())
        executors[i] = std::thread(foo);
}

الآجال والوعود (Futures and Promises)

تُستخدم الآجال (Futures) والوعود لنقل كائن من خيط إلى آخر.

  • يُضبط كائن الوعود ‎std::promise‎ من قِبل الخيط الذي يولّد النتيجة.
  • يُستخدم كائن ‎std::future‎ لاسترداد قيمة، أو التحقّق من إتاحة قيمة ما، أو لإيقاف التنفيذ إلى حين إتاحة القيمة.

أصناف العمليات غير المتزامنة

  • std::async : تنفذ عملية غير متزامنة.
  • std::future : توفّر وصولًا إلى نتيجة عملية غير متزامنة.
  • std::promise : تحزِم نتيجة العملية غير المتزامنة.
  • std::packaged_task : تربط دالّة مع الوعد المرتبط بها في نوع القيمة المُعادة.

ينشئ المثال التالي وعدًا لكي يستخدمه خيط آخر:

{
    auto promise = std::promise<std::string > ();

    auto producer = std::thread([ &]
    {
        promise.set_value("Hello World");
    });

    auto future = promise.get_future();

    auto consumer = std::thread([ &]
    {
        std::cout << future.get();
    });

    producer.join();
    consumer.join();
}

مثال مؤجل غير متزامن

تقدّم الشيفرة التالية نسخة من ‎std::async‎، إلّا أنّها تتصرف كما لو كانت ‎async‎ تُستدعى عبر سياسة الإطلاق المؤجّلة ‎deferred‎. ليس لهذه الدالّة أيضًا سلوك الأجل (‎future‎) الخاص بـ ‎async‎، ويمكن تدمير القيمة المستقبلية (‎future‎) المُعادة قبل الحصول على قيمتها.

template < typename F>
    auto async_deferred(F&& func)->std::future < decltype(func()) >
    {
        using result_type = decltype(func());
        auto promise = std::promise<result_type> ();
        auto future = promise.get_future();
        std::thread(std::bind([=](std::promise<result_type>& promise)
        {
            try
            {
                promise.set_value(func());

لاحظ أن هذا لن يعمل مع <std::promise <void إذ يحتاج برمجة القوالب الوصفية (meta-template programming)، تابع المثال …

                which is out of scope
                for this example.
            }

            catch (...)
            {
                promise.set_exception(std::current_exception());
            }
        }, std::move(promise))).detach();
        return future;
    }

std::packaged_task و std::future

تحزم std::packaged_task دالّة مع الوعد المرتبط بها في نوع القيمة المُعادة:

template < typename F>
    auto async_deferred(F&& func)->std::future < decltype(func()) >
    {
        auto task = std::packaged_task < decltype(func())() > (std::forward<F> (func));
        auto future = task.get_future();
        std::thread(std::move(task)).detach();
        return std::move(future);
    }

يبدأ الخيط في العمل فورًا، ونستطيع فصله أو ضمّه في نهاية النطاق، وتكون النتيجة جاهزة عند انتهاء استدعاء الدالّة لـ std::thread finishes. لاحظ أنّ هذا يختلف قليلاً عن ‎std::async‎، إذ أنّه عند تدمير الأجل ‎std::future‎ المُعاد، فسيُعطّل (block) إلى أن ينتهي الخيط.

std::future_error و std::future_errc

إذا لم تُستوفى قيود الوعود (std::promise) والآجال (std::future)، فسيُطرَح استثناء من النوع std::future_error. وسيكون رمز الخطأ في الاستثناء من النوع std::future_errc، وستكون القيم على النحو التالي:

enum class future_errc
{
broken_promise =             /* لم تعُد المهمّة مُشتركة */,
future_already_retrieved =         /* تم استرداد القيمة المُعادة سلفا */,
promise_already_satisfied =     /* الإجابة خُزِّنت سلفا */,
no_state =                 /* محاولة الدخول إلى وعد في حالة غير مشتركة */
};

انظر الأمثلة التوضيحية التالية:

  • الوعود غير النشطة (Inactive promise)
int test()
{
    std::promise<int> pr;
    return 0;    // ok تعيد 
}
  • الوعود النشطة غير المستخدمة:
int test()
{
    std::promise<int> pr;
    auto fut = pr.get_future();    // تعطيل إلى أجل غير مسمى.
    return 0;
}
  • الاسترجاع المزدوج Double retrieval
int test()
{
    std::promise<int> pr;
    auto fut1 = pr.get_future();
    try
    {
        auto fut2 = pr.get_future();    // future محاولة ثانية للحصول على
        return 0;
    }

    catch (const std::future_error& e)
    {
        cout << e.what() << endl;    //     Error: "The future has already been retrieved from the promise or packaged_task."
        return -1;
    }

    return fut2.get();
}
  • تعيين قيمة الوعد مرتين:
int test()
{
    std::promise<int> pr;
    auto fut = pr.get_future();
    try
    {
        std::promise<int> pr2(std::move(pr));
        pr2.set_value(10);
        pr2.set_value(10);    // محاولة ثانية لتعيين قيمة الوعد، وهذا سيؤدي إلى رفع اعتراض.
    }

    catch (const std::future_error& e)
    {
      cout << e.what() << endl; // Error: "The state of the promise has already been
               //  set."
        return -1;
    }

    return fut.get();
}

std::future و std::async

تُستخدَم ‎std::async‎ في مثال الترتيب المتوازي التالي لإطلاق عدّة مهام merge_sort متوازية، وتُستخدَم std::future لانتظار النتائج ومزامنتها:

#include <iostream>
using namespace std;
void merge(int low, int mid, int high, vector<int> &num)
{
    vector<int> copy(num.size());
    int h, i, j, k;
    h = low;
    i = low;
    j = mid + 1;

    while ((h <= mid) && (j <= high))
    {
        if (num[h] <= num[j])
        {
            copy[i] = num[h];
            h++;
        }
        else
        {
            copy[i] = num[j];
            j++;
        }

        i++;
    }

    if (h > mid)
    {
        for (k = j; k <= high; k++)
        {
            copy[i] = num[k];
            i++;
        }
    }
    else
    {
        for (k = h; k <= mid; k++)
        {
            copy[i] = num[k];
            i++;
        }
    }

    for (k = low; k <= high; k++)
        swap(num[k], copy[k]);
}

void merge_sort(int low, int high, vector<int> &num)
{
    int mid;
    if (low < high)
    {
        mid = low + (high - low) / 2;
        auto future1 = std::async (std::launch::deferred, [& ]()
        {
            merge_sort(low, mid, num);
    });
        auto future2 = std::async (std::launch::deferred, [& ]()
        {
            merge_sort(mid + 1, high, num);
    });

        future1.get();
        future2.get();
        merge(low, mid, high, num);
    }
}

ملاحظة: في المثال أعلاه، تُطلَق ‎std::async‎ وفق سياسة ‎std::launch_deferred‎، وذلك لتجنّب إنشاء خيط جديد في كل استدعاء، فتُجرى في مثالنا السابق استدعاءات ‎std::async‎ دون ترتيب، إذ أنّها تُزامن في استدعاءات ‎std::future::get()‎. بالمقابل، تفرض std::launch_async إنشاء خَيط جديد في كل استدعاء.

السياسة الافتراضية هي ‎std::launch::deferred| std::launch::async‎، ممّا يعني أن التقديم سيكون هو المسؤول عن تحديد سياسة إنشاء الخيوط الجديدة.

هذا الدرس جزء من سلسلة دروس عن C++‎.

ترجمة -بتصرّف- للفصول:

  • Chapter 80: Threading
  • Chapter 85: Mutexes
  • Chapter 86: Recursive Mutex
  • Chapter 87: Semaphore
  • Chapter 88: Futures and Promises

من كتاب C++ Notes for Professionals





تفاعل الأعضاء


لا توجد أيّة تعليقات بعد



يجب أن تكون عضوًا لدينا لتتمكّن من التعليق

انشاء حساب جديد

يستغرق التسجيل بضع ثوان فقط


سجّل حسابًا جديدًا

تسجيل الدخول

تملك حسابا مسجّلا بالفعل؟


سجّل دخولك الآن