اذهب إلى المحتوى

الخيوط threads والمعالجة على التوازي في جافا


رضوى العربي

استخدمنا تقنية البرمجة على التوازي بمثال القسم الفرعي التعاود داخل الخيوط من المقال السابق البرمجة باستخدام الخيوط threads في جافا لتنفيذ أجزاءٍ صغيرة من مهمةٍ كبيرة، حيث تَسمَح تلك التقنية للحواسيب مُتعدّدة المعالجات بإكمال عملية المعالجة بوتيرةٍ أسرع. ولكننا في الواقع لم نَستخدِم لا الطريقة الأفضل لتقسيم المشكلة ولا الطريقة الأفضل لإدارة الخيوط threads. سنتناول بهذا القسم نسختين من نفس البرنامج؛ حيث سنُحسِّن بالنسخة الأولى طريقة تقسيم المشكلة إلى عدّة مهام؛ بينما سنُحسِّن بالنسخة الثانية طريقة اِستخدام الخيوط. بالإضافة إلى ذلك، سنمرّ عبر مجموعةٍ من الأصناف المبنية مسبقًا والتي تُوفِّرها جافا لدعم المعالجة المتوازية، وسنناقش بنهاية القسم التابعين wait()‎ و notify()‎ المُستخدمين للتحكُّم بالعمليات المتوازية تحكمًا مباشرًا.

تقسيم المشكلات

يُقسِّم البرنامج MultiprocessingDemo1.java مُهمة حساب صورة إلى عدة مهامٍ فرعية، ويُسنِد كُلًا منها إلى خيط. يَعمَل هذا البرنامج بنجاح، ولكن هناك مشكلة متمثّلة في إمكانية استغراق بعض المهام وقتًا أطول من المهام الأخرى. في حين يُقسِّم البرنامج الصورة إلى أجزاءٍ متساوية، تتطلَّب بعض أجزائها معالجةً أكثر من أجزاءٍ أخرى. إذا شغَّلت البرنامج باستخدام ثلاثة خيوط، ستلاحظ أن معالجة الجزء الأوسط تستغرِق وقتًا أطول بعض الشيء من معالجة الجزأين السفلي والعلوي.

في العموم، عندما نُقسِّم مشكلةً معينةً إلى عدة مشكلاتٍ فرعيةٍ أصغر، يكون من الصعب توقُّع الزمن الذي ستحتاجه معالجة كل مشكلةٍ منها. والآن، لنفترض أن لدينا مشكلةً فرعيةً معينة تستغرق زمنًا أطول من غيرها. في تلك الحالة، ستكتمل جميع الخيوط عدا الخيط المسؤول عن معالجة تلك المشكلة؛ حيث سيستمر بالعمل لفترةٍ طويلةٍ نسبيًا، وسيَعمَل معالجٌ واحدٌ فقط خلال تلك الفترة دون بقية المعالجات.

إذا احتوى الحاسوب مثلًا على معالجين، وقسَّمنا مشكلةً ما إلى مشكلتين فرعيتين، ثم أنشأنا خيطًا لكل مشكلةٍ منهما. نطمح باستخدامنا لمعالجيّن إلى الحصول على الإجابة بنصف الزمن الذي سيستغرقه الحصول عليها عند استخدام معالجٍ واحد، ولكن إذا كان حل مشكلةٍ منهما يحتاج إلى أربعة أضعاف الزمن الذي يحتاجه حل المشكلة الأخرى، فسيكون معالجًا واحدًا فقط مُشغَّلًا غالبية الوقت، ولم نُقلِل في تلك الحالة الزمن المطلوب لحل المشكلة بنسبةٍ أكبر من 20%.

حتى لو تمكَّنا من تقسيم المشكلة إلى عدة مشكلاتٍ فرعية تتطلَّب زمن المعالجة نفسه، فإننا لا نستطيع الاعتماد ببساطةٍ على حقيقة كونها ستتطلَّب زمنًا متساويًا؛ حيث من الممكن أن تكون بعض المعالجات منهمكةً بتشغيل برامجٍ أخرى، أو قد يكون بعضها أبطأ عمومًا (هذا ليس محتملًا إذا شغَّلنا البرنامج على حاسوبٍ واحد، ولكن تختلف سرعة المعالجات بالحوسبة المُوزَّعة من خلال مجموعة حواسيب عبر الشبكة -وهو ما سنفعله لاحقًا-، وتُعدّ عندها مسألةً مهمة).

تتمثّل التقنية الأكثر شيوعًا للتعامل مع كل تلك المشكلات في تقسيم المشكلة إلى عددٍ كبيرٍ من المشكلات الفرعية الأصغر؛ أي أكبر بكثير من عدد المعالجات الموجودة، وسيضطّر بالتالي كل معالجٍ لحلّ عدة مشكلاتٍ فرعية. عندما يُكمِل معالج مهمةً فرعيةً معينة، تُسنَد إليه مهمةٌ فرعيةٌ أخرى ليَعمَل عليها إلى أن تُسنَد جميع المهام الفرعية. سيظل بالطبع هناك اختلافٌ بالزمن الذي تتطلَّبه كل مهمة، وبالتالي قد يُكمِل معالجٌ معينٌ عدة مشكلاتٍ فرعية، بينما قد يَعمَل معالجٌ آخر على مهمةٍ واحدةٍ معقدة؛ كما قد يُكمِل معالج بطيءٌ أو مشغولٌ مهمةً واحدةً فقط أو اثنتين، بينما قد يُنهِي معالجٌ آخر خمس أو ست مهام.

في العموم، سيَعمَل كل معالجٍ وفقًا لسرعته الخاصة، وستستمر غالبية المعالجات بالعمل إلى قرب اكتمال المعالجة بالكامل بشرط أن تكون المشكلات الفرعية صغيرةً كفاية. ويُطلَق على ذلك "توزيع الحمل load balancing"؛ أي توزيع حمل المعالجة بين المعالجات المتاحة لإبقائها جميعًا منشغلةً بأقصى ما يُمكِن. ستنهي بعض المعالجات بالطبع عملها قبل بعضها الآخر، ولكن بفترةٍ لا تتعدى الزمن المطلوب لإتمام المهمة الفرعية الأطول.

كما ذكرنا بالأعلى، ينبغي أن يكون حجم المشكلات الفرعية صغيرًا لا غاية في الصغر؛ لأن تقسيم المشكلة وإسنادها إلى المعالجات يتطلَّب حملًا إضافيًا overhead؛ فإذا كانت المشكلات الفرعية غايةً في الصغر، سيتراكم الحمل الإضافي ويُشكِّل فارقًا بمقدار العمل الكلي المطلوب. كانت المهمة بالمثال السابق هي حساب لون كل بكسلٍ بالصورة، فإذا أردنا تقسيم تلك المهمة إلى عدة مهامٍ فرعية، هناك عدة احتمالات؛ فقد تتكوَّن كل مهمة فرعية مثلًا من حساب لون بكسلٍ واحدٍ فقط، ولكنها تُعدّ صغيرة للغاية. بدلًا من ذلك، ستتكوَّن كل مهمةٍ فرعية من حساب لون صفٍ واحد من البكسلات. نظرًا لوجود عدة مئاتٍ من الصفوف بكل صورة، سيكون عدد المهام الفرعية كبيرًا كفاية، كما سيكون حجم كل مهمةٍ فرعيةٍ منها مناسبًا. سينتج عن ذلك توزيعًا جيدًا للحمل مع قدرٍ معقولٍ من الحمل الإضافي.

ملاحظة: تُعدّ المشكلة التي ناقشناها بالمثال السابق سهلةً للغاية بالنسبة للبرمجة على التوازي؛ بمعنى أنه عند تقسيم مشكلة حساب ألوان الصورة إلى عدة مشكلاتٍ فرعية أصغر، ستَظِل جميع المشكلات الفرعية مستقلةً تمامًا، وبالتالي يُمكِن معالجة أي عددٍ منها بنفس الوقت ووفقًا لأي ترتيب. في المقابل، قد تتطلّب بعض المهام الفرعية بمشكلاتٍ أخرى النتائج المحسوبة بواسطة بعض المهام الأخرى؛ أي أن المهام الفرعية غير مستقلة، وبالتالي ستتعقد الأمور في تلك الحالات، وسيُصبح ترتيب تنفيذ المهام الفرعية مُهمًا. علاوةً على ذلك، سيكون من الضروري توفير طريقةٍ ما لتشارُك تلك النتائج بين المهام؛ وفي حالة تنفيذها بخيوطٍ مختلفة، سنضطّر إلى مواجهة كل تلك المشكلات المُتعلّقة بالتحكم بوصول الخيوط إلى الموارد التشاركية. لذلك، يُعدّ تقسيم مشكلةٍ معينةٍ لمعالجتها على التوازي أمرًا أصعب بكثير مما قد يوحي به المثال السابق، ولكن بالنهاية، يُعدّ ذلك موضوعًا لدورةٍ تدريبية عن الحوسبة المتوازية لا دورةٍ عن أساسيات البرمجة.

مجمع الخيوط وأرتال المهام

بعد أن نُقرِّر طريقة تقسيم المشكلة إلى عدة مهامٍ فرعية، ينبغي أن نعرف كيفية إسناد تلك المهام الفرعية إلى الخيوط؛ حيث ينبغي وفقًا للأسلوب كائني التوجه object-oriented تمثيل كل مهمةٍ فرعيةٍ بواسطة كائن، ولأن كل مهمة تُمثِّل عملية معالجة معينة، فمن البديهي أن يُعرِّف ذلك الكائن تابع نسخة instance method يُنفِّذ تلك العملية.

من الضروري استدعاء التابع المقابل لمهمةٍ معينة حتى تُنفَّذ، وسيكون تابع المعالجة لهذا البرنامج، هو run()‎، وسيُنفِّذ الكائن المُمثِّل للمهمة الواجهة interface القياسية Runnable التي ناقشناها بقسم إنشاء الخيوط وتشغيلها من مقال مقدمة إلى الخيوط Threads في جافا. تُعدّ تلك الواجهة الطريقة المباشرة لتمثيل مهام المعالجة، ويُمكِننا إنشاء خيطٍ جديدٍ لكل كائن Runnable، ولكن في حالة وجود عددٍ كبيرٍ من المهام، لا يكون لذلك أي معنى؛ نتيجةً لمقدار الحمل الإضافي الناتج عن إنشاء كل خيطٍ جديد. بدلًا من ذلك، يُفضَّل إنشاء عددٍ قليل من الخيوط، بحيث يُسمَح لكُلٍ منها بتنفيذ قدرٍ معيّنٍ من المهام.

لاحِظ أن عدد الخيوط التي ينبغي استخدامها غير معروف، وقد يعتمد على المشكلة التي نحاول حلها. يتمحور الهدف عمومًا في إبقاء جميع معالجات الحاسوب مُشغَّلة؛ فبالنسبة لمثال حساب الصورة، كان إنشاء خيطٍ مقابل كل معالجٍ مناسبًا، ولكنه قد لا يتناسب مع جميع المشكلات. وبالتحديد، إذا كان هناك خيطٌ قد يتسبَّب بحدوث تعطيلٍ block لفترةٍ طويلة بينما ينتظر وقوع حدثٍ event معين، أو بينما ينتظر الوصول إلى موردٍ ما، فلربما يكون من الأفضل إنشاء خيوطٍ إضافية؛ لكي يَعمَل عليها كل معالجٍ أثناء تعطُّل الخيوط الأخرى.

يُطلَق على مجموعة الخيوط المُتاحة لتنفيذ المهام اسم "مجمع الخيوط thread pool"، وتُستخدَم بغرض تجنُّب إنشاء خيطٍ جديدٍ لكل مهمة، حيث تُسنَد المهمة المطلوب تنفيذها إلى أي خيطٍ مُتاحٍ بالمجمع.

عندما تنشغِل جميع الخيوط الموجودة بالمجمع، تضطّر أي مهامٍ أخرى إضافية للانتظار إلى أن تُتاح إحدى الخيوط، ويُعدّ ذلك تطبيقًا مباشرًا على الرتل queue؛ حيث يرتبط بمجمع الخيوط رتلٌ مُكوَّنٌ من المهام قيد الانتظار. بمجرد توفُّر مهمةٍ جديدة، ستُضَاف إلى الرتل؛ وبمجرد انتهاء خيطٍ معينٍ من تنفيذ المهمة المُوكَلة إليه، فسيَحصُل على مهمةٍ أخرى من الرتل ليَعمَل عليها.

هناك رتل مهامٍ task queue وحيدٍ لمجمع الخيوط. يَعنِي ذلك استخدام جميع خيوط المجمع نفس الرتل، فهو يُعدّ موردًا تشاركيًا. كما هو الحال مع أي موردٍ تشاركي، قد تقع حالات التسابق race conditions، ولهذا يكون استخدام المزامنة synchronization ضروريًا؛ فقد يحاول بدونها خيطان قراءة عنصرٍ من الرتل بنفس الوقت مثلًا، وعندها سيَحصُل كلاهما على نفس العنصر. حاول التعرف على الأماكن المُحتمَلة لوقوع حالات التسابق بالتابع dequeue()‎ المُعرَّف في قسم الأرتال Queues من الفصل المكدس Stack والرتل Queue وأنواع البيانات المجردة ADT.

تُوفِّر جافا الصنف ConcurrentLinkedQueue من أجل حل تلك المشكلة؛ وهو صنفٌ مُعرَّفٌ بحزمة package java.util.concurrent إلى جانب أصنافٍ أخرى مفيدة للبرمجة على التوازي. لاحِظ أنه صنف ذو معاملاتٍ غير مُحدَّدة النوع parameterized، ولذلك إذا أردنا إنشاء رتلٍ لحَمْل كائناتٍ من النوع Runnable، يُمكِننا كتابة ما يلي:

ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

يُمثِّل هذا الصنف رتلًا مُنفَّذًا مثل قائمةٍ مترابطة linked list، كما أن عملياته متزامنةً بطريقةٍ مناسبة. ليست العمليات المُعرَّفة بالصنف ConcurrentLinkedQueue نفس العمليات على الأرتال التي عهدناها؛ فعلى سبيل المثال، يُضيف التابع queue.add(x)‎ العنصر الجديد x إلى نهاية queue؛ بينما يَحذِف التابع queue.poll()‎ عنصرًا من مقدمة الرتل queue. إذا كان الرتل فارغًا، يعيد التابع queue.poll()‎ القيمة null، ولهذا يُمكِننا استخدامه لفحص فيما إذا كان الرتل فارغًا، أو لإسترجاع عنصرٍ إذا لم يكن كذلك.

في الحقيقة، يُفضَّل فعل ذلك على هذا النحو؛ فقد يؤدي التأكُّد مما إذا كان الرتل فارغًا قبل قراءة عنصرٍ منه إلى وقوع حالة تسابق؛ حيث يستطيع خيطٌ آخر بدون تحقيق المزامنة حذف آخر عنصرٍ بالرتل باللحظة الواقعة بين لحظتي اختبارٍ للرتل فيما إذا كان فارغًا ومحاولة قراءة العنصر من الرتل، وفي تلك الحالة لن نجد شيئًا عند محاولة قراءة العنصر. في المقابل، يُعدّ التابع queue.poll()‎ بمثابة عمليةٍ ذرية atomic.

يُمكِننا استخدام رتلٍ ينتمي إلى الصنف ConcurrentLinkedQueue مع مجمع خيوطٍ لحساب الصورة من المثال السابق، حيث سنُنشِئ جميع المهام المسؤولة عن حساب الصورة ونضيفها إلى الرتل، ثم سنُنشِئ الخيوط التي ستُنفِّذ تلك المهام، ونُشغِّلها. سيتضمَّن كل خيطٍ منها حلقة تكرار loop بحيث يُستدعى تابع الرتل poll()‎ بكل تكرارٍ لقراءة مهمةٍ منه ثم تُنفَّذ. نظراً لأن المهمة هي كائنٌ من النوع Runnable، فكل ما ينبغي أن يفعله الخيط هو استدعاء تابع المهمة run()‎؛ عندما يُعيد التابع poll()‎ القيمة null، فإن الرتل قد أصبح فارغًا أي أن جميع المهام قد أُسندَت لخيوطٍ أخرى، ويُمكِن عندها للخيط المُستدعِي الانتهاء.

يُنفِّذ البرنامج MultiprocessingDemo2.java تلك الفكرة؛ حيث يَستخدِم رتلًا، اسمه taskQueue من النوع ConcurrentLinkedQueue<Runnable>‎ لحَمْل المهام. كما يَسمَح البرنامج للمُستخدِم بإلغاء العملية قبل انتهائها، حيث يَستخدِم متغيرًا منطقيًا متطايرًا volatile، اسمه running إشارةً للخيط بأن المُستخدِم قد ألغى العملية. عندما تُصبِح قيمة ذلك المتغير مساويةً للقيمة false، ينبغي أن ينتهي الخيط حتى لو لم يَكن الرتل فارغًا. تُعرِّف الشيفرة التالية الصنف المُتداخِل WorkerThread لتمثيل الخيوط:

private class WorkerThread extends Thread {
    public void run() {
        try {
            while (running) {
                Runnable task = taskQueue.poll(); // اقرأ مهمةً من الرتل
                if (task == null)
                    break; // لأن الرتل فارغ
                task.run();  // Execute the task;
            }
        }
        finally {
            threadFinished(); // ‫تذكّر أن الخيط قد انتهى.
   // ‫أضفناها بعبارة `finally` لنتأكَّد من استدعائها
        }
    }
}

يَستخدِم البرنامج الصنف المُتداخِل MandelbrotTask لتمثيل مهمة حساب صفٍ واحدٍ من البكسلات، حيث يُنفِّذ ذلك الصنف الواجهة Runnable، ويَحسِب تابعه run()‎ لون كل بكسلٍ بالصف، ثم يَنسَخ تلك الألوان إلى الصورة. تُوضِح الشيفرة التالية ما يفعله البرنامج عند بدء عملية المعالجة مع حذف قليلٍ من التفاصيل:

taskQueue = new ConcurrentLinkedQueue<Runnable>(); // Create the queue.
for (int row = 0; row < height; row++) {  // عدد الصفوف الموجودة بالصورة
    MandelbrotTask task;
    task = ... ;  // أنشِئ مهمةً لمعالجة صفٍ واحد من الصورة
    taskQueue.add(task); // أضف المهمة إلى الرتل
}

int threadCount = ... ; // عدد الخيوط الموجودة بالمجمع. يضبُطه المُستخدِم
workers = new WorkerThread[threadCount];
running = true;  // اضبط الإشارة قبل بدء تشغيل الخيوط
threadsRemaining = workers;  // عدد الخيوط قيد التشغيل
for (int i = 0; i < threadCount; i++) {
    workers[i] = new WorkerThread();
    try {
        workers[i].setPriority( Thread.currentThread().getPriority() - 1 );
    }
    catch (Exception e) {
    }
    workers[i].start();
}

تجدر الإشارة هنا إلى أنه من الضروري إضافة المهام إلى الرتل قبل بدء تشغيل الخيوط؛ لأننا نَستخدِم الرتل الفارغ بمثابة إشارةٍ إلى ضرورة انتهاء الخيوط؛ أي إذا وجدت الخيوط عند تشغيلها الرتل فارغًا، فستنتهي فورًا دون أن تُنجِز أي مهمة.

جرِّب البرنامج "MultiprocessingDemo2"؛ فهو يَحسِب نفس الصورة التي يَحسبِها البرنامج "MultiprocessingDemo1"، ولكنه يختلف عنه بالترتيب الذي تُحسَب على أساسه الصفوف في حالة استخدام أكثر من خيط. إذا شاهدت البرنامج بحرص، فستلاحظ عدم إضافة صفوف البكسلات بالترتيب من أعلى إلى أسفل؛ لأن خيط الصف i+1 قد يُنهِي عمله قبل أن يُنهِي خيط الصف i عمله أو حتى ما يَسبقه من صفوف. ستلاحِظ هذا التأثير بقدرٍ أكبر إذا استخدمت عدد خيوطٍ أكبر مما يحتويه حاسوبك من معالجات. جرِّب 20 خيطًأ مثلًا.

نمط المنتج والمستهلك والأرتال المعطلة

يُنشِئ البرنامج "MultiprocessingDemo2" مجمع خيوطٍ جديد تمامًا بكل مرةٍ يَرسِم بها صورة، وهو ما يبدو سيئًا. أليس من المفترض إنشاء مجموعةٍ واحدةٍ فقط من الخيوط ببداية البرنامج، واستخدامها لحساب أي صورة؟ حيث أن الهدف من اِستخدَام مجمع الخيوط بالنهاية هو انتظار الخيوط للمهام الجديدة وتنفيذها. ولكننا، لم نُوضِّح حتى الآن أي طريقةٍ لجعل خيطٍ ينتظر قدوم مهمةٍ جديدة، حيث يُوفِّر الرتل المُعطِّل blocking queue ذلك.

يُعدّ الرتل المُعطِّل تنفيذًا لإحدى أنماط المعالجة على التوازي، وهو نمط المُنتِج والمُستهلِك producer/consumer؛ حيث يُستخدَم هذا النمط في حالة وجود "مُنتِجٍ" واحدٍ أو أكثر لشيءٍ معين إلى جانب وجود "مُستهلِكٍ" واحد أو أكثر لذلك الشيء. لا بُدّ أن يَعمَل جميع المُنتِجين والمُستهِلِكين بنفس الوقت (أي معالجة على التوازي). إذا لم يَتوفَّر أي شيء للمعالجة، فسيضطّر المُستهلِك للانتظار إلى أن تتَوفَّر إحداها. قد يضطَّر المُنتِج ببعض التطبيقات للانتظار أحيانًا: إذا كان مُعدّل استهلاك الأشياء واحدًا لكل دقيقةٍ مثلًا، فمن غير المنطقي أن يكون معدّل إنتاجها اثنين لكل دقيقة مثلًا؛ لأنه سيؤدي إلى تراكمها بصورةٍ غير محدودة. ولذلك، لا بُدّ من تخصيص حدٍ أقصى لعدد الأشياء قيد الانتظار، وإذا وصلنا إلى ذلك الحد، لا بُدّ أن يتوقَّف المنتجون عن إنتاج أي أشياءٍ أخرى لبعض الوقت.

والآن، سنحتاج إلى طريقةٍ لنقل الأشياء من المُنتجِين إلى المُستهلِكين، حيث يُعدّ الرتل الحل الأمثل لذلك: سيَضَع المنتجون الأشياء بأحد طرفي الرتل، وسيقرأها المُستهلِكون من الطرف الآخر.

001Producer_Consumer.png

نظرًا لأننا نُجرِي معالجةً على التوازي، سنَستخدِم رتلًا متزامنًا، ولكننا نحتاج إلى ما هو أكثر من ذلك؛ فعندما يُصبِح الرتل فارغًا، نريد طريقةً تضطّر المستهلِكين للانتظار إلى أن يُوضَع شيءٌ جديدٌ بالرتل؛ وإذا أصبح ممتلئًا، نريد طريقةً تضطّر المُنتجِين للانتظار إلى أن يُتاح مكانٌ بالرتل. يُمثَّل كُلٌ من المُستهلِكين والمُنتجِين باستخدام الخيوط. إذا كان الخيط مُتوقِّفًا بانتظار حدوث شيءٍ معين، يُقَال أنه مُعطَّل blocked، ولذلك يُعدُّ الرتل المُعطِّل هو نوع الرتل الذي نحتاجه.

عندما نَستخدِم رتلًا معطِّلًا، وكان ذلك الرتل فارغًا، ستؤدي عملية سحب dequeue عنصرٍ من الرتل إلى تعطيل المُستدعِي؛ أي إذا حاول خيطٌ معينٌ سحَب عنصرٍ من رتلٍ فارغ، فسيتوقَّف إلى أن يُتاح عنصرٌ جديد، وسيستيقظ عندها الخيط ويقرأ العنصر ويُكمِل عمله. بالمثل، إذا وصل الرتل إلى سعته القصوى، وحاول مُنتِجٌ معينٌ إدخال عنصرٍ به، فسيتعطَّل إلى أن يُتَاح مكانٌ بالرتل.

تحتوي حزمة java.util.concurrent على صنفين يُنفِّذان الرتل المُعطِّل: LinkedBlockingQueue و ArrayBlockingQueue، وهما من الأنواع ذات المعاملات غير مُحدَّدة النوع parameterized types؛ أي يَسمحَا بتخصيص نوع العنصر الذي سيَحمله الرتل، ويُنفِّذ كلٌ منهما الواجهة BlockingQueue. إذا كان bqueue رتلًا مُعطِّلًا ينتمي إلى أحد الصنفين السابقين، فإنه يُعرِّف العمليات التالية:

  • bqueue.take()‎: يَحذِف item من الرتل ويعيده؛ فإذا كان الرتل فارغًا عند استدعائه، يتعطَّل الخيط المُستدعِي إلى أن يُتاح عنصرٌ جديدٌ بالرتل. يُبلِّغ التابع عن استثناءٍ exception من النوع InterruptedException إذا قُوطَع الخيط أثناء تعطُّله.
  • bqueue.put(item)‎: يُضيِف item إلى الرتل. إذا كان للرتل سعةً قصوى وقد أصبح ممتلئًا، يتعطَّل الخيط المُستدعِي إلى أن يُتَاح مكانٌ بالرتل. يُبلِّغ التابع عن استثناءٍ من النوع InterruptedException إذا قُوطَع الخيط أثناء تعطُّله.
  • bqueue.add(item)‎: يُضيف item إلى الرتل في حالة وجود مكانٍ متاح. إذا كان للرتل سعةً قصوى وقد أصبح ممتلئًا، يُبلِّغ التابع عن استثناءٍ من النوع IllegalStateException، ولكنه لا يُعطِّل المُستدعِي.
  • bqueue.clear()‎: يحذِف جميع العناصر من الرتل ويُهملها.

تُعرِّف الأرتال المُعطِّلة بجافا الكثير من التوابع الأخرى. يتشابه التابع bqueue.poll(500)‎ مثلًا مع التابع bqueue.take()‎ باستثناء أنه يُعطِّل لمدة 500 ميللي ثانية بحدٍ أقصى، ولكن التوابع المذكورة بالأعلى كافيةٌ للأمثلة التالية. لاحِظ وجود تابعين لإضافة العناصر إلى الرتل؛ حيث يُعطِّل التابع bqueue.put(item)‎ المُستدعِي إذا لم يَكُن هناك أي مكانٍ آخر متاحٍ بالرتل، ولذلك يُستخدَم مع أرتال التعطيل محدودة السعة؛ بينما لا يُعطِّل التابع bqueue.add(item)‎ المُستدعِي، ولذلك يُستخدَم مع أرتال التعطيل التي تملُك سعةً غير محدودة.

تُخصَّص السعة القصوى لرتلٍ من النوع ArrayBlockingQueue عند إنشائه. تُنشِئ الشيفرة التالية على سبيل المثال رتلًا مُعطِّلًا بإمكانه حَمْل ما يَصِل إلى 25 كائنٍ من النوع ItemType:

ArrayBlockingQueue<ItemType> bqueue = new ArrayBlockingQueue<>(25);

إذا اِستخدَمنا الرتل المُعرَّف بالأعلى، فسيُعطِّل التابع bqueue.put(item)‎ المُستدعِي إذا كان bqueue يحتوي على 25 عنصرٍ بالفعل؛ بينما يُبلِّغ التابع bqueue.add(item)‎ عن استثناءٍ في تلك الحالة. يضمَن ذلك عدم إنتاج العناصر بمعدّلٍ أسرع من مُعدّل استهلاكها. في المقابل، يُستخدَم الصنف LinkedBlockingQueue لإنشاء أرتالٍ مُعطِّلة بسعةٍ غير محدودة. ألقِ نظرةً على المثال التالي:

LinkedBlockingQueue<ItemType> bqueue = new LinkedBlockingQueue<>();

تُنشِئ الشيفرة السابقة رتلًا بدون حدٍ أقصى لعدد العناصر التي يُمكِنه حملها. في تلك الحالة، لن يتسبَّب التابع bqueue.put(item)‎ بحدوث تعطيل نهائيًا، ولن يُبلِّغ التابع bqueue.add(item)‎ عن استثناءٍ من النوع IllegalStateException على الإطلاق. يُمكِننا اِستخدَام الصنف LinkedBlockingQueue إذا أردنا تجنُّب تعطيل المُنتجِين، ولكن من الجهة الأخرى، لا بُدّ أن نتأكَّد من بقاء الرتل بحجمٍ معقول. يؤدي التابع bqueue.take()‎ إلى حدوث تعطيلٍ إذا كان الرتل فارغًا لكلا الصنفين.

يَستخدِم البرنامج التوضيحي MultiprocessingDemo3.java رتلًا من الصنف LinkedBlockingQueue بدلًا من الصنف ConcurrentLinkedQueue المُستخدَم في النسخة السابقة من البرنامج MultiprocessingDemo2.java. يحتوي الرتل في هذا المثال على عدة مهام (أي العناصر المنتمية للنوع Runnable)، ويُصرَّح عنه على أنه تابع نسخة instance variable اسمه taskQueue على النحو التالي:

LinkedBlockingQueue<Runnable> taskQueue;

عندما ينقر المُستخدِم على زر "Start" لحساب الصورة، نُضيف جميع مهام حساب الصورة إلى ذلك الرتل باستدعاء التابع taskQueue.add(task)‎ لكل مهمةٍ على حدى. من المهم أن يحدث ذلك دون تعطيل؛ لأننا نُنشِئ تلك المهام بخيط معالجة الأحداث events الذي لا ينبغي تعطيله. لن ينمو الرتل إلى ما لانهاية؛ لأن البرنامج يَعمَل على صورةٍ واحدةٍ فقط بكل مرة، وهناك مئاتٌ قليلةٌ من المهام للصورة الواحدة.

على نحوٍ مشابه للنسخة السابقة من البرنامج، ستحذِف الخيوط العاملة المنتمية إلى مجمع الخيوط thread pool المهام من الرتل وتُنفِّذها، ولكنها -أي الخيوط- تُنشَئ مرةً واحدةً فقط ببداية البرنامج؛ أو بتعبيرٍ أدق عندما ينقر المُستخدِم على زر "Start" لأول مرة. يُعاد استخدام نفس تلك الخيوط لأي عددٍ من الصور، وفي حالة عدم وجود أي مهامٍ أخرى، سيُصبِح الرتل فارغًا، وستتعطَّل الخيوط إلى حين قدوم مهامٍ جديدة. تُنفِّذ تلك الخيوط حلقة تكرارٍ لا نهائية infinite loop، وتُعالِج المهام للأبد، ولكنها تقضِي وقتًا طويلًا معطَّلةً بانتظار إضافة مهمةٍ جديدةٍ إلى الرتل. انظر تعريف الصنف المُمثِل لتلك الخيوط:

// 1
private class WorkerThread extends Thread {
    WorkerThread() {
        try {
            setPriority( Thread.currentThread().getPriority() - 1);
        }
        catch (Exception e) {
        }
        try {
            setDaemon(true);
        }
        catch (Exception e) {
        }
        start(); // يبدأ الخيط العمل بمجرد تشغيله
    }
    public void run() {
        while (true) {
            try {
                Runnable task = taskQueue.take(); // انتظر مهمة إذا كان ذلك ضروريًا
                task.run();
            }
            catch (InterruptedException e) {
            }
        }
    }
}

[1] يُعرِّف هذا الصنف الخيوط العاملة الموجودة بمجمع الخيوط، حيث يَعمَل كائنٌ من هذا الصنف بحلقةٍ يَسترجِع كل تكرارٍ منها مهمةً من الرتل taskQueue ثم يستدعي التابع run()‎ الخاص بتلك المهمة. إذا كان الرتل فارغًا، يتعطَّل الخيط إلى أن تتوفَّر مهمةٌ جديدةٌ بالرتل. يتولى الباني مهمة بدء الخيط، وبالتالي لن يضطر البرنامج main لفعل ذلك. يَعمَل الخيط بأولويةٍ أقل من أولوية الخيط الذي اِستدعَى الباني. صُمِّم الصنف لكي يَعمَل بحلقةٍ لا نهائية تنتهي فقط عند إغلاق آلة جافا الافتراضية Java virtual machine، وهذا على فرض عدم تبليغ المهام المُنفَّذة عن أي استثناءاتٍ وهو افتراضٌ صحيحٌ بهذا البرنامج. يَضبُط الباني الخيط ليعمل مثل خيطٍ خفي، وبالتالي تنتهي آلة جافا الافتراضية تلقائيًا عندما تكون الخيوط الوحيدة الموجودة من النوع الخفي، أي لا يَمنَع وجود تلك الخيوط آلة جافا من الإغلاق.

ينبغي فحص طريقة عمل مجمع الخيوط، حيث تُنشَأ الخيوط وتُشغَّل قبل وجود أي مهمة. يَستدعِي كل خيطٍ منها التابع taskQueue.take()‎ فورًا، ونظرًا لأن رتل المهام فارغ، تتعطَّل جميع الخيوط بمجرد تشغيلها. والآن، لكي نُعالِج صورةً معينة، يُنشِئ خيط معالجة الأحداث المهام الخاصة بتلك الصورة، ويُضيفها إلى الرتل. بمجرد حدوث ذلك، تعود الخيوط للعمل وتبدأ بمعالجة المهام، ويستمر الحال كذلك إلى أن يَفرُغ الرتل مرةً أخرى. في حالة تشغيل البرنامج بحاسوبٍ مُتعدّد المعالجات، تبدأ بعض الخيوط بمعالجة المهام المُضافة إلى الرتل بينما ما يزال خيط معالجة الأحداث مستمرٌ بإضافة المهام. عندما يُصبِح الرتل فارغًا، تتعطَّل الخيوط مجددًا إلى أن نرغب بمعالجة صورةٍ جديدة.

إضافةً إلى ما سبق، قد نرغب بإلغاء معالجة صورةٍ معينةٍ قبل انتهائها، ولكننا لا نريد إنهاء الخيوط العاملة في تلك الحالة. عندما ينقر المُستخدِم على الزر "Abort"، يَستدعِي البرنامج التابع taskQueue.clear()‎، مما يَمنَع إسناد أي مهامٍ أخرى إلى الخيوط، ومع ذلك فمن المحتمل أن تكون بعض المهام قيد التنفيذ بالفعل بينما نُفرِّغ الرتل، وستكتمل بالتالي تلك المهام بعد إلغاء المعالجة المُفترَض كونهم أجزاءٌ منها، ولكننا لا نريد تطبيق خَرْج تلك المهام على الصورة.

يُمكِننا حلّ تلك المشكلة بإسناد رقم وظيفة لكل وظيفة معالجة؛ حيث سيُخزَّن رقم الوظيفة الحالية بمتغير نسخة instance variable اسمه jobNum. ينبغي أن يحتوي كل كائن مُمثِّل لمهمة على تابع نسخة يُحدِّد الوظيفة التي تُعدّ تلك المهمة جزءًا منها. تزداد قيمة jobNum بمقدار الواحد عند انتهاء وظيفة؛ إما لأنها انتهت على نحوٍ طبيعي؛ أو لأن المُستخدِم قد ألغاها. عند اكتمال مهمةٍ معينة، لا بُدّ أن نوازن بين رقم الوظيفة المُخزَّن بكائن المهمة وبين jobNum؛ فإذا كانا متساويين، تكون المهمة جزءًا من الوظيفة الحالية، ويُطبَق خَرْجها على الصورة؛ أما إذا لم يكونا متساويين، تكون تلك المهمة جزءًا من الوظيفة السابقة، ويُهمَل خَرْجها.

من المهم أن يكون الوصول إلى jobNum متزامنًا synchronized، وإلا قد يَفحَص خيطٌ معينٌ رقم الوظيفة بينما يزيده خيطٌ آخر، وعندها قد نَعرِض خرجًا معنيًّا لوظيفةٍ سابقة مع أننا ألغينا تلك الوظيفة. جميع التوابع التي تقرأ قيمة jobNum أو تُعدِّله في هذا البرنامج متزامنة. يُمكِنك قراءة شيفرة البرنامج لترى طريقة عملها.

هناك ملاحظةٌ إضافية عن البرنامج "MultiprocessingDemo3"، وهي: نحن لم نُوفِّر أي طريقةٍ لإنهاء الخيوط العاملة ضمن ذلك البرنامج أي أنها ستستمر بالعمل إلى أن نُغلِق آلة جافا الافتراضية Java Virtual Machine. يُمكِننا السماح بإنهاء الخيوط قبل ذلك باستخدام متغيرٍ متطايرٍ volatile، اسمه running، وضبط قيمته إلى false عندما نرغب بإنهائها، وسنُعرِّف التابع run()‎ الموجود بالخيوط على النحو التالي:

public void run() {
    while ( running ) {
       try {
          Runnable task = taskQueue.take();
          task.run();
       }
       catch (InterruptedException e) {
       }
    }
}

ومع ذلك، إذا كان هناك خيطٌ مُعطّلٌ نتيجةً لاستدعاء taskQueue.take()‎، فلن يتمكَّن من رؤية القيمة الجديدة للمتغيّر running قبل أن يعود للعمل. لنتأكَّد من إنهائه، يُمكِننا استدعاء التابع worker.interrupt()‎ لكل خيط worker بعد ضبط قيمة running إلى false.

في حالة تنفيذ خيطٍ لمهمةٍ بينما نضبُط قيمة running إلى false، فإنه لن ينتهي حتى يُكمِل تلك المهمة. إذا كانت المهام قصيرةً نسبيًا، لن يُشكِّل ذلك مشكلة، ولكن إذا استغرقت المهام وقتًا أطول مما ترغب بانتظاره، فلا بُدّ أن تَفحَص المهام قيمة running دوريًا، وتنتهي إذا أصبحت قيمته مساويةً القيمة false.

نهج ExecutorService لتنفيذ المهام

يشيع استخدام مجمعات الخيوط thread pools بالبرمجة على التوازي، ولذلك، تُوفِّر جافا أدوات عالية المستوى لإنشاء مجمعات الخيوط وإدارتها. تُعرِّف الواجهة ExecutorService من حزمة java.util.concurrent خدماتٍ يُمكِنها تنفيذ المهام المُرسَلة إليها. يحتوي الصنف Executors على توابعٍ ساكنة static تُنشِئ أنواعًا مختلفةً من النوع ExecutorService. وبالأخص، يُنشِئ التابع Executors.newFixedThreadPool(n)‎ مجمع خيوطٍ مُكوَّن من عدد n من الخيوط، حيث n هي عدد صحيح. تُنشِئ الشيفرة التالية مجمع خيوط مُكوّنٍ من خيطٍ واحدٍ لكل معالج:

int processors = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(processors);

يُستخدَم التابع executor.execute(task)‎ لإرسال كائنٍ من النوع Runnable لتنفيذه، ويعود على الفور بعد وضعه للمهمة داخل رتل المهام المُنتظِرَة. تَحذِف الخيوط الموجودة بمجمع الخيوط المهام من الرتل وتُنفِّذها.

يُخبِر التابع executor.shutdown()‎ مجمع الخيوط بأن عليه الانتهاء بعد تنفيذ جميع المهام المُنتظِرَة، ويعود التابع على الفور دون أن ينتظر انتهاء الخيوط. بعد استدعاء ذلك التابع، لا يُسمَح بإضافة مهامٍ جديدة. يُمكِنك استدعاء shutdown()‎ أكثر من مرة، ولن يُعدّ ذلك خطأً. لا تُعدّ الخيوط الموجودة بمجمع الخيوط خيوطًا خفية daemon threads؛ أي في حالة انتهاء الخيوط الأخرى دون إغلاق الخدمة، يكون وجود تلك الخيوط كافٍ لمنع إغلاق آلة جافا الافتراضية.

يتشابه التابع executor.shutdownNow()‎ مع التابع executor.shutdown()‎، إلا أنه يُهمِل المهام التي ما تزال قيد الانتظار بالرتل، وتُكمِل الخيوط المهام التي كانت قد حُذفت من الرتل بالفعل قبل الإغلاق.

يَختلف البرنامج التوضيحي MultiprocessingDemo4.java عن البرنامج MultiprocessingDemo3؛ حيث يَستخدِم النوع ExecutorService بدلًا من الاستخدام المباشر للخيوط والأرتال المُعطِّلة. نظرًا لعدم وجود طريقةٍ بسيطةٍ تَسمَح للنوع ExecutorService بتجاهُل المهام المُنتظِرَة دون أن يُغلَق، يُنشِئ البرنامج "MultiprocessingDemo4" كائنًا جديدًا من النوع ExecutorService لكل صورة.

يُمكِننا تمثيل المهام المُستخدَمة مع النوع ExecutorService بكائناتٍ من النوع Callable<T>‎، حيث يُمثِّل ذلك النوع واجهة نوع دالة functional interface ذات معاملاتٍ غير مُحدَّدة النوع، ويُعرِّف التابع call()‎، الذي لا يستقبل أي معاملاتٍ ويعيد النوع T. يُمثِل النوع Callable مهمةً تُخرِج قيمة.

يُمكِننا إرسال كائنٍ c من النوع Callable إلى النوع ExecutorService باستدعاء التابع executor.submit(c)‎، حيث تُنفَّذ المهمة من النوع Callable بلحظةٍ ما في المستقبل. في تلك الحالة، كيف سنَحصُل على نتيجة المعالجة عند اكتمالها؟ يُمكِننا حل تلك المشكلة باستخدام واجهةٍ أخرى هي Future<T>‎، التي تُمثِّل قيمةً من النوع T قد تكون غير متاحةٍ حتى وقتٍ ما بالمستقبل. يعيد التابع executor.submit(c)‎ قيمةً من النوع Future تُمثِّل نتيجة المعالجة المؤجَّلة.

يُعرِّف كائنٌ v من النوع Future مجموعةً من التوابع، مثل الدالة المنطقية v.isDone()‎ التي يُمكِننا استدعاؤها لفحص فيما إذا كانت نتيجة المعالجة قد أصبحت متاحة؛ وكذلك التابع v.get()‎ الذي يسترجع نتيجة المعالجة المؤجَّلة، وسيُعطًّل إلى أن تُصبِح القيمة متاحة، كما قد يُبلِّغ عن استثناءات، ولذلك ينبغي استدعاؤه ضمن تعليمة try..catch.

يَستخدِم المثال ThreadTest4.java الأنواع Callable و Future و ExecutorService لعدّ عدد الأعداد الأولية الواقعة ضمن نطاقٍ معينٍ من الأعداد الصحيحة؛ كما يُجرِي نفس المعالجة التي أجراها البرنامج ThreadTest2.java في قسم الإقصاء التشاركي Mutual Exclusion وتعليمة التزامن synchronized من مقال مقدمة إلى الخيوط Threads في جافا. ستَعُدّ كل مهمةٍ فرعية في هذا البرنامج عدد الأعداد الأولية ضمن نطاقٍ أصغر من الأعداد الصحيحة، وستُمثَّل تلك المهام الفرعية من خلال كائناتٍ من النوع Callable<Integer>‎ المُعرَّفة بالصنف المتداخل nested التالي:

// 1
private static class CountPrimesTask implements Callable<Integer> {
    int min, max;
    public CountPrimesTask(int min, int max) {
        this.min = min;
        this.max = max;
    }
    public Integer call() {
        int count = countPrimes(min,max);  // يبدأ بالعدّ
        return count;
    }
}

[1] تعدّ الكائنات المنتمية إلى هذا الصنف الأعداد الأولية الموجودة ضمن نطاقٍ معين من الأعداد الصحيحة من min إلى max. تُمرَّر قيمة المتغيرين min و max مثل معاملاتٍ للباني. يحسب التابع call()‎ عدد الأعداد الأولية ثم يعيدها.

ستُرسَل جميع المهام الفرعية إلى مجمع خيوط مُنفَّذ باستخدام النوع ExecutorService، وتُخزَّن النتائج من النوع Future التي يعيدها داخل مصفوفةٍ من النوع ArrayList. ألقِ نظرةً على الشيفرة التالية:

int processors = Runtime.getRuntime().availableProcessors();
ExecutorService executor = Executors.newFixedThreadPool(processors);

ArrayList<Future<Integer>> results = new ArrayList<>();

for (int i = 0; i < numberOfTasks; i++) {

    CountPrimesTask oneTask = . . . ;
    Future<Integer> oneResult = executor.submit( oneTask );
    results.add(oneResult);  // خزِّن الكائن الذي يُمثِّل النتيجة المؤجَلة

}

لا بُدّ أن نُضيف الأعداد الصحيحة الناتجة عن المهام الفرعية إلى المجموع النهائي. سنحصل أولًا على خَرْج تلك المهام باستدعاء التابع get()‎ لعناصر المصفوفة المنتمية إلى النوع Future. لن تكتمل العملية إلا بعد انتهاء جميع المهام الفرعية، لأن التابع يُعطِّل المُستدعِي إلى أن تتوفَّر النتيجة. ألقِ نظرةً على الشيفرة التالية:

int total = 0;
for ( Future<Integer> res : results) {
    try {
        total += res.get();  // انتظر اكتمال المهمة
    } 
    catch (Exception e) { // لا ينبغي أن تحدث بهذا البرنامج
    }
}

تابعا الانتظار Wait والتنبيه Notify

إذا كنا نريد كتابة تنفيذٍ للرتل المعطِّل، فينبغي أن نُعطِّل الخيط إلى حين وقوع حدثٍ معين؛ أي أن ينتظر الخيط وقوع ذلك الحدث، وينبغي أن نُبلِّغه عند وقوعه بطريقةٍ ما. سنَستخدِم لذلك خيطين؛ حيث يقع الفعل المُسبِّب للحدث المنتظَر (مثل إضافة عنصرٍ إلى رتل) بخيطٍ غير الخيط المُعطَّل.

لا يُمثِّل ما يَلي مشكلةً للأرتال المُعطِّلة فقط؛ ففي حالة وجود خيطٍ يُنتِج خرجًا يحتاج إليه خيطٌ آخر، فإن ذلك يَفرِض نوعًا من التقييد على الترتيب الذي ينبغي للخيوط أن تُنفِّذ العمليات على أساسه. إذا وصلنا إلى النقطة التي يحتاج خلالها الخيط الثاني إلى الخرج الناتج عن الخيط الأول، قد يضطّر الخيط الثاني إلى التوقُّف وانتظار إتاحة ذلك الخرج؛ ونظرًا لأنه لا يستطيع الاستمرار، فإنه قد ينام sleep، ولا بُدّ في تلك الحالة من توفير طريقةٍ لتنبيهه عندما يُصبِح الخرج متاحًا، حتى يستيقظ ويُكمِل عملية المعالجة.

تُوفِّر جافا بالطبع طريقةً لتنفيذ هذا النوع من الانتظار والتنبيه؛ حيث يحتوي الصنف Object على تابعي النسخة wait()‎ و notify()‎، ويُمكِن استخدامهما مع أي كائن، كما يمكن للأرتال المعطِّلة اِستخدَام تلك التوابع ضمن تنفيذها الداخلي، ولكنها منخفضة المستوى وعرضةً للأخطاء؛ ولذلك يُفضَّل اِستخدام أساليب التحكُّم عالية المستوى مثل أرتال الأولوية قدر الإمكان. مع ذلك، من الجيد معرفة القليل عن التابعين wait()‎ و notify()‎، فلربما قد تحتاج إلى استخدامهما مباشرةً. من غير المعروف فيما إذا كانت أصناف جافا القياسية للأرتال المُعطِّلة تَستخدِم هذين التابعين فعليًا، خاصةً مع توقُّر طرائقٍ أخرى لحل مشكلة الانتظار والتنبيه.

السبب وراء ضرورة ربط التابعين wait()‎ و notify()‎ بالكائنات واضح، وبالتالي ليس هناك داعٍ للقلق بشأن ذلك، فهو يَسمَح على الأقل بتوجيه تنبيهاتٍ من أنواعٍ مختلفة إلى مستقبلين من أنواعٍ مختلفة اعتمادًا على تابع الكائنnotify()‎ المُستدعى.

عندما يَستدعِي خيطٌ ما التابع wait()‎ الخاص بكائنٍ معين، يتوقَّف ذلك الخيط وينام إلى حين استدعاء التابع notify()‎ الخاص بنفس الكائن، حيث سيكون استدعاؤه ضروريًا من خلال خيطٍ آخر؛ لأن الخيط الذي اِستدعَى wait()‎ سيكون نائمًا. تَعمَل إحدى الأنماط الشائعة على النحو التالي: يستدعِي خيط A التابع wait()‎ عندما يحتاج إلى الخرج الناتج من خيط B، ولكن ذلك الخرج غير متاحٍ بعد. عندما يُحصِّل الخيط B الخرج المطلوب، فإنه يَستدعِي التابع notify()‎ الذي سيوقِظ الخيط A إذا كان منتظرًا ليتمكَّن من اِستخدَام الناتج. في الواقع، ليس من الخطأ استدعاء التابع notify()‎ حتى لو لم يَكُن هناك أي خيوطٍ مُنتظِرَة، فليس لها أي تأثير. لنُنفِّذ ذلك، ينبغي أن يُنفِّذ الخيط A شيفرةً مشابهةً لما يلي، حيث obj هو كائن:

if ( resultIsAvailable() == false )
   obj.wait();  // انتظر تنبيهًا بأن النتيجة مُتاحة
useTheResult();

بينما ينبغي أن يُنفِّذ الخيط B شيفرةً مشابهةً لما يَلي:

generateTheResult();
obj.notify();  // أرسل تنبيهًا بأن النتيجة قد أصبحت متاحة

تعاني تلك الشيفرة من حالة تسابق race condition، فقد يُنفِّذ الخيطان شيفرتهما بالترتيب التالي:

// يفحص الخيط‫ A التابع `resultIsAvailable()‎` ولا يجد النتيجة بعد، لذلك، يُنفِّذ تعليمة `obj.wait()‎`، ولكن قبل أن يفعل،
1.  Thread A checks resultIsAvailable() and finds that the result is not ready,
        so it decides to execute the obj.wait() statement, but before it does,
// ‫ينتهي الخيط B من عمله ويَستدعِي التابع `obj.notify()‎`
2.  Thread B finishes generating the result and calls obj.notify()
// ‫يَستدعِي الخيط A التابع `obj.wait()‎` لينتظر تنبيهًا بتوفُّر النتيجة 
3.  Thread A calls obj.wait() to wait for notification that the result is ready.

ينتظر الخيط A بالخطوة الثالثة تنبيهًا لن يحدث أبدًا؛ لأن notify()‎ قد اُستدعيَت بالفعل بالخطوة الثانية. يُمثِّل ذلك نوعًا من القفل الميت deadlock الذي يُمكِنه أن يترك الخيط A مُنتظِرًا للأبد. نحتاج إذًا إلى نوعٍ من المزامنة synchronization. يكمن حل تلك المشكلة في وضع شيفرة الخيطين A و B داخل تعليمة synchronized، ومن البديهي أن تكون المزامنة بناءً على نفس الكائن obj المُستخدَم عند استدعاء wait()‎ و notify()‎.

نظرًا لأهمية استخدام المزامنة عند كل استدعاءٍ للتابعين wait()‎ و notify()‎ تقريبًا، جعلته جافا أمرًا ضروريًا؛ أي بإمكان خيطٍ معينٍ استدعاء obj.wait()‎ أو obj.notify()‎ فقط إذا كان ذلك الخيط قد حَصَل على قفل المزامنة المُرتبِط بالكائن obj؛ أما إذا لم يَكُن قد حَصَل عليه، يحدث استثناء من النوع IllegalMonitorStateException. لا يتطلَّب هذا الاستثناء معالجةً إجباريةً ولا يُلتقَط على الأرجح. علاوةً على ذلك، قد يُبلِّغ التابع wait()‎ عن اتستثناءٍ من النوع InterruptedException، ولذلك لا بُدّ من استدعائه ضمن تعليمة try لمعالجته.

لنفحص الآن طريقة وصول خيطٍ معينٍ إلى نتيجةٍ يحسبها خيطٌ آخر. يُعدّ ذلك مثالًا مبسطًا على مشكلة المُنتِج والمُستهلِك producer/consumer، حيث يُنتَج عنصرٌ واحدٌ فقط ثم يُستهلَك. لنفترض أن لدينا متغيرًا تشاركيًا، اسمه sharedResult مُستخدَمٌ لنقل النتيجة من المُنتِج إلى المُستهلِك. عندما تُصبِح النتيجة جاهزة، يضبُط المُنتِج ذلك المتغير إلى قيمةٍ غير فارغة. يُحدِّد المُستهلِك من الجهة الأخرى فيما إذا كانت النتيجة جاهزةً أم لا بفحص قيمة المتغير sharedResult إذا كانت فارغة. سنَستخدِم مُتغيّرًا اسمه lock للمزامنة. يُمكِننا كتابة شيفرة الخيط المُمثِّل للمُنتِج على النحو التالي:

makeResult = generateTheResult();  // غير متزامن
synchronized(lock) {
   sharedResult = makeResult;
   lock.notify();
}

بينما سيُنفِّذ المُستهلِك الشيفرة التالية:

synchronized(lock) {
   while ( sharedResult == null ) {
      try {
         lock.wait();
      }
      catch (InterruptedException e) {
      }
   }
   useResult = sharedResult;
}
useTheResult(useResult);  // Not synchronized!

لاحِظ أن استدعاء كُلٍ من التابعين generateTheResult()‎ و useTheResult()‎ غير متزامن، لنَسمَح بتنفيذهما على التوازي مع الخيوط الأخرى التي قد تُجرِي تزامنًا بناءً على lock، ولكن نظرًا لأن المتغير sharedResult تشاركي، كان من الضروري أن تكون جميع مراجِعه references متزامنة؛ أي لا بُدّ من كتابتها داخل تعليمة synchronized، مع محاولة تنفيذ أقل ما يُمكِن عمومًا داخل كتل الشيفرة المتزامنة.

ربما لاحظت شيئًا مضحكًا بالشيفرة: لا ينتهي lock.wait()‎ قبل تنفيذ lock.notify()‎، ولكن نظرًا لأن كليهما مكتوبٌ داخل تعليمة synchronized بتزامنٍ مبني على الكائن نفسه، قد تتساءل: أليس من المستحيل تنفيذ هذين التابعين بنفس الوقت؟ في الواقع، يُعدّ التابع lock.wait()‎ حالةً خاصة؛ فعندما يَستدعِي خيطٌ ما التابع lock.wait()‎، فإنه يترك قفله بالضرورة على كائن المزامنة، مما يَسمَح لخيطٍ آخرٍ بتنفيذ كتلة شيفرة داخل تعليمة synchronized(lock)‎ أخرى يوجد بداخلها استدعاءٌ للتابع lock.notify()‎. وبالتالي، بعدما يُنهِي الخيط الثاني تنفيذ تلك الكتلة، يعود القفل إلى الخيط الأول المُستهلِك مما يُمكِّنه من إكمال عمله.

تُنتَج في نمط المُنتِج والمُستهلِك العادي عدة نتائجٍ بواسطة خيط مُنتِجٍ واحدٍ أو أكثر، وتُستهلَك بواسطة خيط مُستهلِكٍ واحدٍ أو أكثر، وبدلًا من وجود كائن sharedResult وحيد، تجد قائمةً بالكائنات المُنتَجَة التي لم تُستهلَك بعد. لنفحص طريقة فعل ذلك باستخدام صنفٍ بسيطٍ للغاية يُنفِّذ العمليات الثلاثة على رتلٍ من النوع LinkedBlockingQueue<Runnable>‎، الذي اِستخدَمناه بالبرنامج MultiprocessingDemo3. ألقِ نظرةً على الشيفرة التالية:

import java.util.LinkedList;

public class MyLinkedBlockingQueue {

    private LinkedList<Runnable> taskList = new LinkedList<Runnable>();

    public void clear() {
        synchronized(taskList) {
            taskList.clear();
        }
    }

    public void add(Runnable task) {
        synchronized(taskList) {
            taskList.addLast(task);
            taskList.notify();
        }
    }

    public Runnable take() throws InterruptedException {
        synchronized(taskList) {
            while (taskList.isEmpty())
                taskList.wait();
            return taskList.removeFirst();
        }
    }

}

سنَستخدِم كائنًا من ذلك الصنف بديلًا عن الكائن taskQueue بالبرنامج "MultiprocessingDemo3".

فضَّلنا إجراء المزامنة بناءً على الكائن taskList، ولكن كان من الممكن إجراؤها بناءً على أي كائنٍ آخر. يُمكِننا في الحقيقة استخدام توابعٍ متزامنة synchronized methods، وهو ما سيُكافِئ المزامنة بناءً على this.

اقتباس

ملاحظة: قد تَجِد استدعاءً للتابع wait()‎ أو notify()‎ داخل تابع نسخة متزامن بدون مرجع للكائن المُستخدَم. تذكَّر أن اِستخدَام wait()‎ أو notify()‎ ضمن هذا السياق يُكافِئ تمامًا اِستخدَام this.wait()‎ أو this.notify()‎.

من الضروري أن يكون استدعاء التابع taskList.clear()‎ مبنيًا على نفس الكائن حتى لو لم نَستدعِي wait()‎ أو notify()‎؛ وإذا لم نَفعَل ذلك، قد تحدث حالة تسابق race condition، ألا وهي: قد تُفرَّغ القائمة بعدما يتأكَّد التابع take()‎ من أن القائمة taskList غير فارغة وقبل أن يحاول حذف عنصرٍ منها. في تلك الحالة، ستُصبِح القائمة فارغة عند لحظة استدعاء taskList.removeFirst()‎ مما سيتسبَّب بحدوث خطأ.

في حالة تواجد عدة خيوطٍ متزامنة بناءً على كائن obj ومُنتظِرةٍ للتنبيه. يُوقِظ التابع obj.notify()‎ عند استدعائه واحدًا فقط من تلك الخيوط المُنتظِرة؛ وإذا أردت أن توقظها جميعًا، ينبغي أن تَستدعِي التابع obj.notifyAll()‎. من المناسب اِستخدَام التابع obj.notify()‎ بالمثال السابق؛ لأن الخيوط المُستهلِكة فقط هي الخيوط المُعطَّلة، ونحن نريد إيقاظ مُستهلِكٍ واحدٍ فقط عند إضافة مهمةٍ إلى الرتل، ولا يُهِم أي مُستهلِكٍ تُسنَد إليه المهمة.

في المقابل، إذا كان لدينا رتلٌ مُعطِّل blocking queue بسعةٍ قصوى، أي أنه قد يُعطِّل المُنتجِين أو المُستهلِكِين؛ فعند إضافة مهمةٍ إلى الرتل، ينبغي التأكُّد من تنبيه خيط مُستهلِكٍ لا خيط مُنتِج، ويُمثِّل استدعاء التابع notifyAll()‎ بدلًا من التابع notify()‎ إحدى حلول تلك المشكلة، لأنه سيُنبِّه جميع الخيوط بما في ذلك أي خيط مُستهلِكٍ مُنتظِر.

قد يعطيك اسم التابع obj.notify()‎ انطباعًا خاطئًا. لا يُنبِّه ذلك التابع الكائن obj بأي شيء، وإنما يُنبِّه الخيط الذي اِستدعَى التابع obj.wait()‎ إذا كان موجودًا. بالمثل، لا ينتظر الكائن obj بالاستدعاء obj.wait()‎ أي شيء، وإنما الخيط المُستدعِي هو من ينتظر.

وفي ملاحظة أخيرة بخصوص wait: هناك نسخةٌ أخرى من التابع wait()‎، وهي تَستقبِل زمنًا بوحدة الميللي ثانية مثل مُعامِل؛ وهنا سينتظر الخيط المُستدعِي للتابع obj.wait(milliseconds)‎ تنبيهًا لفترةٍ تَصِل إلى القيمة الُممرَّرة بحدٍ أقصى؛ وإذا لم يحدث التنبيه خلال تلك الفترة، يستيقظ الخيط ويُكمِل عمله دون تنبيه. تُستخدَم تلك الخاصية عمليًا لتَسمَح لخيطٍ مُنتظِر بالاستيقاظ كل فترة لإنجاز مهمةٍ دوريةٍ معينة، مثل التسبُّب في ظهور رسالة مثل "Waiting for computation to finish".

لنفحص الآن مثالًا يَستخدِم التابعين wait()‎ و notify()‎ ليَسمَح لخيطٍ بالتحكُّم بخيطٍ آخر. يَحِلّ البرنامج التوضيحي TowersOfHanoiGUI.java مسألة أبراج هانوي التي تعرَّضنا لها بالقسم مشكلة أبراج هانوي Hanoi من مقال التعاود recursion في جافا، ويُوفِّر أزرارًا تَسمَح للمُستخدِم بالتحكُّم بتنفيذ الخوارزمية. يَستطيع المُستخدِم مثلًا النقر على زر "Next Step" ليُنفِّذ خطوةً واحدةً من الحل، والتي تُحرِّك قرصًا واحدًا من كومةٍ لأخرى. عند النقر على زر "Run"، تُنفَّذ الخوارزمية أتوماتيكيًا دون تدخُّل المُستخدِم، ويتبدَّل النص المكتوب على الزر من "Run" إلى "Pause". عند النقر على "Pause"، يتوقَّف التشغيل التلقائي. يُوفِّر البرنامج الزر "Start Over"، الذي يُلغي الحل الحالي، ويعيد المسألة إلى حالتها الابتدائية. تَعرِض الصورة التالية شكل البرنامج بإحدى خطوات الحل، ويُمكِنك رؤية الأزرار المذكورة:

002Towers_Of_Hanoi_Gui.png

يوجد خيطان بهذا البرنامج؛ حيث يُنفِّذ الأول خوارزميةً تعاودية recursive لحلّ المسألة؛ ويُعالِج الآخر الأحداث الناتجة عن أفعال المُستخدِم. عندما ينقر المُستخدِم على أحد الأزرار، تُستدعَى إحدى التوابع بخيط معالجة الأحداث، ولكن من يَستجِيب فعليًا للحدث هو الخيط المُنفِّذ للتعاود؛ فقد يُنفِّذ مثلًا خطوةً واحدةً من الحل أو يبدأه من جديد. لا بُدّ أن يُرسِل خيط معالجة الأحداث نوعًا من الإشارة إلى خيط الحل من خلال ضبط قيمة متغيرٍ يتشاركه الخيطان. اسم هذا المتغير بالبرنامج هو status، وقيمه المُحتمَلة هي الثوابت GO و PAUSE و STEP و RESTART.

عندما يُعدِّل خيط معالجة الأحداث قيمة ذلك المتغيّر، لا بُدّ أن يلاحظ خيط الحل القيمة الجديدة للمتغير، ويستجيب على أساسها؛ فإذا كانت قيمة status هي PAUSE، لا بُدّ أن يتوقَّف الخيط بانتظار نَقْر المُستخدِم على زر "Run" أو "Next Step"، ويُمثِّل ذلك الحالة المبدئية عند بدء البرنامج؛ أما إذا نقر المُستخدِم على زر "Next Step"، يَضبُط خيط معالجة الأحداث قيمة status إلى "STEP"، وبالتتابع، لا بُدّ أن يلاحِظ خيط الحل القيمة الجديدة، ويستجيب بتنفيذ خطوةٍ واحدةٍ من الحل، ثم يعيد ضَبْط قيمة status إلى PAUSE مرةً أخرى.

إذا نقر المُستخدِم على زر "Run"، تُضبَط قيمة status إلى "GO"، وينبغي أن يُنفِّذ خيط الحل الخوارزمية أتوماتيكيًا؛ وإذا نقر المُستخدِم على زر "Pause" بينما الحل مُشغَّل، تُضبَط قيمة status إلى "PAUSE"، وينبغي أن يعود خيط الحل إلى حالة الإيقاف؛ أما إذا نقر المُستخدِم على زر "Start Over"، يَضبُط خيط معالجة الأحداث قيمة المُتغيّر status إلى "RESTART"، ولا بُدّ أن يُنهِي خيط الحل حلّه الحالي.

ما يُهمّنا بهذا المثال هو الحالة التي يتوقَّف خلالها خيط الحل؛ حيث يكون الخيط نائمًا في تلك الحالة، ولا يكون بإمكانه رؤية القيمة الجديدة للمتغير status إلا إذا أيقظناه. سنَستخدِم التابع wait()‎ بخيط الحل لجعله ينام، وسنَستخدِم التابع notify()‎ بخيط معالجة الأحداث عندما نُعدِّل قيمة المتغير status لكي نُوقِظ خيط الحل. تعرض الشيفرة التالية التوابع التي تَستجيب لحدث النقر على الأزرار. عندما ينقر المُستخدِم على زر معين، يُعدِّل التابع المقابل لذلك الزر قيمة المتغير status، ثم يَستدعِي التابع notify()‎ لكي يُوقِظ خيط الحل:

synchronized private void doStopGo() {
    if (status == GO) {  // التحريكة مُشغَّلة. أوقفها
        status = PAUSE;
        nextStepButton.setDisable(false);
        runPauseButton.setText("Run");
    }
    else {  // Animation is paused.  Start it running.
        status = GO;
        nextStepButton.setDisable(true);  // يُعطَّل عند تشغيل التحريكة
        runPauseButton.setText("Pause");
    }
    notify();  // أيقظ الخيط ليتمكَّن من رؤية الحالة الجديدة
}

synchronized private void doNextStep() {
    status = STEP;
    notify();
}

synchronized private void doRestart() {
    status = RESTART;
    notify();
}

لاحِظ أن تلك التوابع متزامنة لتَسمَح باستدعاء notify()‎. تذكَّر أنه لا بُدّ للخيط المُستدعِي للتابع notify()‎ ضمن كائنٍ معين أن يكون قد حَصَل على قفل المزامنة المُرتبِط بذلك الكائن. في هذه الحالة، يكون كائن المزامنة هو this. تُعدّ المزامنة ضروريةً لأنه من الممكن أن تحدث حالات التسابق نظرًا لإمكانية خيط الحل أن يُعدِّل قيمة المتغير status.

يَستدعِي خيط الحل تابعًا اسمه checkStatus()‎ ليَفحَص قيمة status؛ فإذا كانت قيمة status تُساوِي "PAUSE"، يَستدعِي ذلك التابع بدوره التابع wait()‎ مما يؤدي إلى توقُّف خيط الحل إلى حين استدعاء خيط معالجة الأحداث للتابع notify()‎. لاحِظ أن التابع checkStatus()‎ يُبلِّغ عن استثناءٍ من النوع IllegalStateException إذا كانت قيمة status تُساوِي "RESTART":

synchronized private void checkStatus() {
    while (status == PAUSE) {
        try {
            wait();
        }
        catch (InterruptedException e) {
        }
    }
    // بالوصول إلى تلك النقطة، تكون الحالة‫ RUN أو STEP أو RESTART
    if (status == RESTART)
        throw new IllegalStateException("Restart");
    // بالوصول إلى تلك النقطة، تكون الحالة‫ RUN أو STEP وينبغي أن يستمر الحل
}

يَضبُط التابع run()‎ الخاص بخيط الحل الحالة المبدئية للمسألة، ثم يَستدعِي التابع solve()‎ لحلها، كما يُنفِّذ حلقةً لا نهائية ليتمكَّن من حل المسألة عدة مرات. يَستدعِي التابع run()‎ التابع checkStatus()‎ قبل أن يبدأ الحل، ويَستدعِي التابع solve()‎ التابع checkStatus()‎ بعد كل حركة. إذا بلَّغ التابع checkStatus()‎ عن استثناءِ من النوع IllegalStateException، يُنهَى استدعاء solve()‎ مبكرًا. كنا قد استخدمنا نفس طريقة التبليغ عن استثناء لإنهاء خوارزميةٍ تعاوديةٍ من قبل بالقسم التعاود داخل الخيوط من المقال السابق.

يُمكِنك الإطلاع على الشيفرة الكاملة للبرنامج TowersOfHanoiGUI.java لترى الطريقة التي دمجنا بها جميع تلك الأجزاء إلى البرنامج النهائي، وسيُعينك فهمه على تعلُم طريقة استخدام wait()‎ و notify()‎ مباشرةً.

ترجمة -بتصرّف- للقسم Section 3: Threads and Parallel Processing من فصل Chapter 12: Threads and Multiprocessing من كتاب Introduction to Programming Using Java.

اقرأ أيضًا


تفاعل الأعضاء

أفضل التعليقات

لا توجد أية تعليقات بعد



انضم إلى النقاش

يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.

زائر
أضف تعليق

×   لقد أضفت محتوى بخط أو تنسيق مختلف.   Restore formatting

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   جرى استعادة المحتوى السابق..   امسح المحرر

×   You cannot paste images directly. Upload or insert images from URL.


×
×
  • أضف...