اذهب إلى المحتوى

الخيوط Threads والشبكات في جافا


رضوى العربي

ناقشنا بالمقال السابق عدة أمثلة على البرمجة الشبكية، حيث تعلّمنا طريقة إنشاء اتصالٍ شبكيٍ وطريقة تبادل المعلومات عبر تلك الاتصالات، ولكننا لم نتعامل مع واحدةٍ من أهم خاصيات البرمجة عبر الشبكة، وهي حقيقة كون الاتصال الشبكي غير متزامن asynchronous. بالنسبة لبرنامج معين بأحد طرفي اتصال شبكي معين، قد تصِل الرسائل من الطرف الآخر بأي لحظة؛ أي يكون وصول رسالةٍ معينة حدثًا خارج نطاق تحكُّم البرنامج المُستقبِل للرسالة، ولذلك ربما تكون الاستعانة بواجهة برمجة تطبيقات API مبنيةٍ على الأحداث وخاصةٍ بالشبكات فكرةً جيدةً تُمكِّننا من التعامل مع الطبيعة غير المتزامنة للاتصالات عبر الشبكة؛ ولكن لا تعتمد جافا في الواقع على تلك الطريقة، حيث تَستخدِم برمجة الشبكات بلغة جافا الخيوط threads عمومًا.

مشكلة الدخل والخرج المعطل Blocking I/O

كما ناقشنا في مقال تواصل تطبيقات جافا عبر الشبكة، تَستخدِم برمجة الشبكات المقابس sockets، والتي تشير إلى أحد طرفي اتصال شبكي. يَملُك كل مقبس مجرى دْخَل input stream ومجرى خرج output stream، وتُنقَل البيانات المُرسَلة إلى مجرى الخرج بأحد طرفي الإتصال عبر الشبكة إلى مجرى الدْخَل بالطرف الآخر.

إذا أراد برنامجٌ معينٌ قراءة البيانات من مجرى الدْخَل الخاص بالمقبس، فعليه استدعاء أحد توابع قراءة البيانات من مجرى الدْخَل. قد تكون البيانات قد وصلت بالفعل قبل استدعاء التابع، وفي تلك الحالة يَسترجِع التابع البيانات ويعيدها على الفور. ومع ذلك، قد يضطّر التابع لانتظار وصول البيانات من الطرف الآخر من الاتصال؛ أي يُعطَّل block كُلًا من التابع والخيط الذي استدعاه إلى أن تَصِل البيانات.

قد يتعطَّل أيضًا تابع إرسال البيانات إلى مجرى الخرج الخاص بالمقبس؛ حيث يحدث ذلك إذا حاول البرنامج إرسال البيانات إلى المقبس بسرعةٍ أكبر من سرعة نقل البيانات عبر الشبكة. يَستخدِم المقبس مخزنًا مؤقتًا buffer لحمل البيانات المُفترَض نقلها عبر الشبكة. تذكَّر أن المخزن المؤقت هو مساحةٌ من الذاكرة تَعمَل بطريقةٍ مشابهةٍ للرتل queue. يضع تابع الخرج البيانات بالمخزن المؤقت، ثم يقرأها برنامج منخفض المستوى، ويُرسِلها عبر الشبكة. إذا كان المخزن ممتلئًا، يتعطَّل تابع الخرج إلى أن تتوفَّر مساحةٌ بالمخزن.

اقتباس

ملاحظة: عندما يعود تابع الخرج، لا يَعنِي ذلك أن البيانات قد أُرسِلَت، وإنما يَعنِي فقط أن البيانات قد وُضِعَت بالمخزن تمهيدًا لنقلها لاحقًا.

يَستخدِم الاتصال الشبكي دخلًا وخرجًا مُعطِّلًا؛ لأن عمليات الدْخَل والخَرْج عبر الشبكة قد تتسبَّب بحدوث تعطّيلٍ لفتراتٍ غير محددة، ولذلك لا بُدّ أن تكون البرامج المُستخدِمة للشبكات مهيأةً للتعامل مع ذلك التعطيل. في بعض الحالات، قد يكون إغلاق البرنامج لجميع العمليات الأخرى وانتظار البيانات المطلوبة مقبولًا، مثلما يحدث عندما يُحاوِل برنامج سطر أوامر قراءة البيانات التي ينبغي أن يُدْخِلها المُستخدِم.

في الواقع، تُعدّ مدخلات المُستخدِم نوعًا من الدْخل والخَرْج المُعطِّل. ومع ذلك، تَسمَح الخيوط لبعض أجزاء البرنامج بالاستمرار بالعمل بينما بعض الأجزاء الأخرى مُعطَّلة. على سبيل المثال، ربما يكون من المناسب اِستخدَام خيطٍ واحدٍ مع برنامج عميل client شبكي يُرسِل طلباتٍ إلى خادم server، إذا لم يكن هناك أي شيءٍ ينبغي للبرنامج فعله بينما ينتظر رد الخادم. في المقابل، قد يتصل برنامج خادم شبكي مع عدة عملاء بنفس الوقت، وبالتالي بينما ينتظر الخادم وصول البيانات من العميل، سيكون لديه الكثير ليفعله خلال تلك الفترة، مثل الاتصال مع عملاءٍ آخرين. عندما يَستخدِم الخادم خيوطًا مختلفةً لمعالجة الاتصالات مع العملاء المختلفة، لن يؤدي التعطُّل الناتج عن عميلٍ معين إلى إيقاف تعامل الخادم مع العملاء الآخرين.

يختلف استخدام الخيوط للتعامل مع مشكلة الدْخَل والخَرْج المعُطِّل جذريًا عن استخدامها لتسريع عملية المعالجة. عندما نَستخدِم الخيوط لزيادة سرعة المعالجة -كما ناقشنا بالقسم مجمع الخيوط وأرتال المهام من مقال الخيوط threads والمعالجة على التوازي في جافا السابق- كان من المنطقي استخدام خيطٍ واحدٍ لكل معالج. إذا كان الحاسوب يحتوي على معالج واحد، فلن يزيد اِستخدَام أكثر من خيط ن سرعة المعالجة إطلاقًا، بل قد يُبطئها نتيجةً للحمل الزائد overhead الناتج عن إنشاء الخيوط وإداراتها.

من الجهة الأخرى، إذا كان الهدف هو حل مشكلة الدْخَل والخَرْج المُعطِّل، فسييكون من المنطقي اِستخدَام خيوطٍ بعددٍ أكبر مما يحتويه الحاسوب من معالجات؛ لأن كثيرًا من تلك الخيوط قد يتعطَّل بأي لحظة، وستتنافس بالتالي الخيوط غير المُعطَّلة فقط على زمن المعالجة. إذا أردنا تشغيل جميع المعالجات على الدوام، فلا بُدّ من وجود خيطٍ واحدٍ نشطٍ لكل معالج (في الواقع، أقل من ذلك بعض الشيء لنَسمَح بالاختلافات بين عدد الخيوط النشطة من فترةٍ لأخرى). تقضِي الخيوط ببرامج الخوادم مثلًا معظم الوقت مُعطَّلة بانتظار اكتمال عمليات الدخل والخرج؛ فإذا كانت الخيوط مُعطَّلةً بنسبة 90% من الوقت مثلًا، فسنحتاج إلى خيوطٍ بعددٍ يُقارِب عشرة أضعاف عدد المعالجات؛ أي تستطيع الخوادم الاستفادة من اِستخدَام عددٍ كبيرٍ من الخيوط حتى لو كان الحاسوب يحتوي على معالجٍ واحد.

برنامج محادثة عبر الشبكة غير متزامن

سنفحص الآن المثال الأول على استخدام الخيوط ببرامج الاتصال الشبكي، وسيكون برنامج محادثة بواجهة مُستخدِم رسومية GUI.

اِستخدَمنا بروتوكولًا للاتصال في برامج المحادثة عبر سطر الأوامر CLChatClient.java و CLChatServer.java في مقال تواصل تطبيقات جافا عبر الشبكة المشار إليه بالأعلى؛ حيث بعدما يُدْخِل المُستخدِم بأحد طرفي الاتصال رسالةً، لا بُدّ له أن ينتظر ردًا من الطرف الآخر. سيكون من الأفضل لو أنشأنا برنامج محادثة غير متزامن asynchronous، حيث سيتمكَّن المُستخدِم من الاستمرار بكتابة الرسائل وإرسالها دون انتظار أي ردودٍ من الطرف الآخر، كما ستُعرَض الرسائل الآتية من الطرف الآخر بمجرد وصولها. في الواقع، من الصعب تنفيذ ذلك ببرنامج سطر أوامر، ولكنه بديهي ببرنامجٍ ذي واجهة مُستخدِم رسومية.

تتمثل فكرة البرنامج باختصار في إنشاء خيطٍ وظيفته قراءة الرسائل القادمة من الطرف الآخر من الاتصال، وتُعرَض للمُستخدِم بمجرد وصولها، ثم يُعطَّل خيط قراءة الرسائل إلى أن تَصِل رسائلٌ أخرى. تستطيع الخيوط الأخرى الاستمرار بالعمل بينما ذلك الخيط مُعطَّل؛ حيث سيتمكَّن خيط معالجة الأحداث events بالتحديد من الاستجابة لما يفعله المُستخدِم، وسيتمكَّن بالتالي من إرسال الرسائل بعد أن يكتبها المُستخدِم مباشرةً.

يَعمَل برنامج "GUIChat" مثل عميلٍ أو خادم.و يحتوي البرنامج على زر "Listen"، وعندما ينقر عليه المُستخدِم، يُنشِئ البرنامج مقبس خادم يَستمِع إلى طلبات الاتصال القادمة، وبالتالي يَعمَل البرنامج خادمًا. يحتوي البرنامج أيضًا على زر "Connect"، وعندما يَنقُر عليه المُستخدِم، يُرسِل البرنامج طلب اتصال، وبالتالي يَعمَل البرنامج عميلًا. يَستمِع الخادم كما جرت العادة إلى رقم منفذٍ port number معين، ولا بُدّ أن يعرف العميل الحاسوب الذي يَعمَل عليه الخادم ورقم المنفذ الذي يَستمِع إليه. تحتوي نافذة البرنامج "GUIChat" على صناديق إدخال تَسمَح للمُستخدِم بإدخال تلك المعلومات.

بمجرد بدء الاتصال بين برنامجي "GUIChat"، يستطيع المُستخدِم الموجود بأي طرفٍ من طرفي الاتصال إرسال الرسائل إلى المُستخدِم بالطرف الآخر. تحتوي النافذة على صندوق إدخالٍ يَسمَح للمُستخدِم بكتابة الرسائل، ثم النقر على زر العودة الى بداية السطر لإرسالها. يستجيب خيط معالجة الأحداث إلى الحدث الناشئ عن كتابة الرسالة بإرسالها. وفي المقابل، تُستقبَل الرسائل القادمة بخيطٍ منفصلٍ جُلّ ما يفعله هو انتظار الرسائل القادمة. يكون ذلك الخيط مُعطَّلًا أثناء انتظاره؛ وعند وصول رسالة، يَعرِضها للمُستخدِم. تحتوي نافذة البرنامج على مساحةٍ كبيرةٍ لِعرض الرسائل المُرسَلة والمُستقبَلة إلى جانب بعض المعلومات الأخرى عن الاتصال الشبكي.

يُفضَّل أن تُصرِّف compile شيفرة البرنامج GUIChat.java، وتُجرِّبه. إذا أردت تجريبه على حاسوبٍ واحد، يُمكِنك تشغيل نسختين من البرنامج على نفس الحاسوب، وإنشاء الاتصال بين نافذتي البرنامج باستخدام "localhost" أو "127.0.0.1" على أنه اسمٌ للحاسوب. حاوِل أيضًا قراءة الشيفرة المصدرية للبرنامج. سنناقش بعضها فقط فيما يلي.

يَستخدِم البرنامج الصنف المُتداخِل ConnectionHandler المُشتَق من الصنف Thread لمعالجة معظم المهام المُتعلّقة بالشبكة، حيث يتولى الخيط المُمثَّل بواسطة الصنف ConnectionHandler مهمة إنشاء الاتصال الشبكي، وقراءة الرسائل القادمة بعد فتح الاتصال. نضمَن عدم تعطُّل واجهة المُستخدِم الرسومية أثناء إنشاء الاتصال بوضعنا الشيفرة المسؤولة عن إنشاء الاتصال واستقبال الرسائل بخيطٍ منفصل. يُعدّ فتح الاتصال عمليةً مُعطِّلة، مثل عملية قراءة الرسائل، وقد تَستغرِق بعض الوقت.

يُنشِئ الصنف ConnectionHandler الاتصال بكلا الحالتين؛ أي عندما يَعمَل البرنامج خادمًا أو عميلًا. يُنشَأ ذلك الخيط عندما ينقر المُستخدم على زر "Listen" أو زر "Connect"؛ حيث يؤدي زر "Listen" إلى عمل البرنامج على أنه خادم؛ بينما يؤدي زر"Connect" إلى عمله عميلًا. يُميّز الصنف ConnectionHandler بين تلك الحالتين بتعريف بانيين constructors معروضين بالأسفل. يَعرِض التابع postMessage()‎ رسالةً بالمساحة النصية الموجودة بالنافذة لكي يراها المُستخدِم. ألقِ نظرةً على تعريف كلا البانيين:

// 1
ConnectionHandler(int port) {  // For acting as the "server."
   state = ConnectionState.LISTENING;
   this.port = port;
   postMessage("\nLISTENING ON PORT " + port + "\n");
   try { setDaemon(true); }
   catch (Exception e) {}
   start();
}

// 2
ConnectionHandler(String remoteHost, int port) {  // For acting as "client."
   state = ConnectionState.CONNECTING;
   this.remoteHost = remoteHost;
   this.port = port;
   postMessage("\nCONNECTING TO " + remoteHost + " ON PORT " + port + "\n");
   try { setDaemon(true); }
   catch (Exception e) {}
   start();
}

حيث:

  • [1]: تعني اِستمِع إلى طلب اتصالٍ برقم منفَّذٍ مُخصَّص. لا يفعل الباني أي عملياتٍ شبكية، وإنما يضبُط فقط بعض متغيرات النسخ ويُشغِّل الخيط. يَستمِع الخيط إلى طلب اتصالٍ واحدٍ فقط، ثم يَغلِق مقبس الخادم.
  • [2]: تعني  اِفتح اتصال مع الحاسوب برقم المنفَّذ المُخصَّص. لا يفعل الباني أي عمليات شبكية، وإنما يضبُط فقط بعض متغيرات النسخ ويُشغِّل الخيط.

لاحِظ أن state هو متغير نسخة instance variable من نوع التعداد enumerated type التالي:

enum ConnectionState { LISTENING, CONNECTING, CONNECTED, CLOSED };

تُمثِّل قيم ذلك التعداد الحالات المختلفة للاتصال الشبكي. يُفضَّل غالبًا التعامل مع الاتصال الشبكي مثل آلة حالة state machine (انظر قسم آلات الحالة من مقال تعرف على أهم الأحداث والتعامل معها في مكتبة جافا إف إكس JavaFX)؛ نظرًا لاعتماد الاستجابة إلى الأحداث المختلفة عادةً على الحالة التي كان عليها الاتصال عند وقوع الحدث. يُحدِّد الخيط ما إذا كان عليه التصرف مثل خادمٍ أو عميل أثناء إنشاء الاتصال من خلال ضبط قيمة المتغير state إلى القيمة LISTENING أو CONNECTING.

بمجرد بدء تشغيل الخيط، يُنفَّذ التابع run()‎ المُعرَّف على النحو التالي:

// 1
public void run() {
   try {
      if (state == ConnectionState.LISTENING) {
            // أنشِئ اتصالًا بصفة خادم
         listener = new ServerSocket(port);
         socket = listener.accept();
         listener.close();
      }
      else if (state == ConnectionState.CONNECTING) {
            // أنشِئ اتصالًا بصفة عميل
         socket = new Socket(remoteHost,port);
      }
      connectionOpened();  // 2
      while (state == ConnectionState.CONNECTED) {
            // 3
         String input = in.readLine();
         if (input == null)
            connectionClosedFromOtherSide(); // أغلق المقبس وبلِّغ المُستخدِم
         else
            received(input);  // بلِّغ الرسالة إلى المُستخدم
      }
   }
   catch (Exception e) {
         // 4
      if (state != ConnectionState.CLOSED)
         postMessage("\n\n ERROR:  " + e);
   }
   finally {  // Clean up before terminating the thread.
      cleanUp();
   }
}

حيث أن:

  • [1]: يُستدعَى التابع run()‎ بواسطة الخيط. يَفتَح التابع اتصالًا مثل خادمٍ أو عميل بالاعتماد على نوع الباني المُستخدَم.
  • [2]: جهِّز الاتصال بما في ذلك إنشاء كائن من الصنف BufferedReader لقراءة الرسائل القادمة.
  • [3]: اقرأ سطرًا نصيًا واحدًا من الطرف الآخر من الاتصال وبلِّغه للمُستخدِم.
  • [4]: حدث خطأ. بلِّغ المُستخدِم إذا لم يكن قد أغلق الاتصال، لأنه قد يكون الخطأ المُتوقَّع حدوثه عند غلق مقبس الاتصال.

يَستدعِي هذا التابع مجموعةً من التوابع الأخرى لإنجاز بعض الأمور، ولكن بإمكانك فهم الفكرة العامة لطريقة عمله. بعد فتح الاتصال مثل خادمٍ أو عميل، يُنفِّذ التابع run()‎ حلقة while يَستقبِل خلالها الرسائل من الطرف الآخر من الاتصال ويعالجها إلى أن يُغلَق الاتصال. من المهم فهم طريقة غلق الاتصال؛ حيث تُوفِّر نافذة البرنامج GUIChat الزر "Disconnect"، وعندما ينقر عليه المُستخدِم، يُغلَق الاتصال. يَستجِيب البرنامج إلى ذلك الحدث بغلق المقبس المُمثِّل للاتصال، وكذلك بضبط حالة الاتصال إلى CLOSED.

في تلك الأثناء، قد يكون خيط معالجة الاتصال مُعطَّلًا بانتظار الرسالة التالية نتيجة لاستدعائه التابع in.readLine()‎. عندما يُغلِق خيط واجهة المُستخدِم الرسومية المقبس، يفشل ذلك التابع ويُبلِّغ عن استثناءٍ exception يتسبَّب بانتهاء الخيط. إذا كان خيط معالجة الاتصال واقعًا بين لحظتي استدعاء التابع in.readLine()‎ عند غلق المقبس، تنتهي حلقة while لأن حالة الاتصال تتبدَّل من CONNECTED إلى CLOSED. يؤدي غلق نافذة البرنامج إلى غلق الاتصال بنفس الطريقة.

علاوةً على ذلك، قد يُغلَق الاتصال بواسطة المُستخدِم على الطرف الآخر، ويُغلَق في تلك الحالة مجرى الرسائل القادمة، ويُعيد التابع in.readLine()‎ بهذا الطرف من الاتصال القيمة null، وهو ما يُشير إلى نهاية المجرى، ويُمثِّل إشارةً إلى أن المُستخدِم الآخر قد أغلق الاتصال.

سنُلقِي نظرةً أخيرة على شيفرة البرنامج GUIChat وخاصةً على التوابع المسؤولة عن إرسال الرسائل واستقبالها. يَستدعِي الخيطان تلك التوابع؛ حيث يَستدعِي خيط معالجة الأحداث التابع send()‎ بغرض إرسال رسالةٍ إلى المُستخدِم على الطرف الآخر من الاتصال استجابةً لفِعِل المُستخدِم. قد تؤدي عملية إرسال البيانات إلى حدوث تعطيل -وإن كان احتمال ذلك ضئيلًا- إذا أصبح المخزن المؤقت buffer الخاص بالمقبس ممتلئًا. ربما يأخذ برنامج أكثر تطورًا ذلك الاحتمال بالحسبان. يَستخدِم التابع كائنًا من النوع PrintWriter، اسمه هو out، لإرسال الخرج إلى مجرى الخرج الخاص بالمقبس. لاحِظ أن التابع متزامنٌ ليَمنَع تعديل حالة الاتصال أثناء إجراء عملية الإرسال. انظر شيفرة التابع:

// 1
synchronized void send(String message) {
   if (state == ConnectionState.CONNECTED) {
      postMessage("SEND:  " + message);
      out.println(message);
      out.flush();
      if (out.checkError()) {
         postMessage("\nERROR OCCURRED WHILE TRYING TO SEND DATA.");
         close();  // أغلِق الاتصال
      }
   }
}

وتعني [1]: يُرسِل رسالةً إلى الطرف الآخر من الاتصال ويعرضها على الشاشة. ينبغي أن يُستدعَى هذا التابع عندما تكون حالة الاتصال ConnectionState.CONNECTED فقط. يتجاهل التابع الاستدعاء بالحالات الأخرى.

يَستدعِي خيط معالجة الأحداث التابع received()‎ بعد قراءة رسالةٍ من المُستخدِم على الطرف الآخر من الاتصال، حيث يعرِض التابع الرسالة للمُستخدِم. لاحِظ أن التابع متزامن لتجنُّب حالة التسابق race condition التي يُمكِنها أن تحدُث إذا بدَّل خيطٌ آخر حالة الاتصال بينما ما يزال التابع قيد التنفيذ. ألقِ نظرةً على شيفرة التابع:

// 1
synchronized private void received(String message) {
   if (state == ConnectionState.CONNECTED)
      postMessage("RECEIVE:  " + message);
}

[1] يُستدعَى هذا التابع بواسطة التابع run()‎ عند استقبال رسالةٍ من الطرف الآخر من الاتصال. يُظهِر التابع الرسالة على الشاشة إذا كانت حالة الاتصال CONNECTED فقط؛ لأنها قد تَصِل بعد أن ينقر المُستخدم على زر "Disconnect"، وبالتالي لا ينبغي أن يراها المُستخدِم.

خادم شبكي متعدد الخيوط

تُستخدَم الخيوط عادةً ببرامج الخوادم؛ لأنها تَسمَح للخادم بالتعامل مع مجموعةٍ من العملاء بنفس الوقت. عندما يبقى عميلٌ معينٌ متصلًا لفترةٍ طويلةٍ نسبيًا، لا ينبغي أن يضطّر العملاء الآخرون إلى انتظار الخدمة، وحتى إن كان من المتوقَّع أن يكون تعامل الخادم مع كل عميلٍ قصيرًا للغاية، حيث لا يُمكِننا أن نفترض أن ذلك سيكون هو الحال دائمًا؛ فقد يُسيء عميلٌ معينٌ التصرف، ويبقى متصلًا دون أن يُرسِل البيانات التي يتوقَّعها الخادم؛ لذلك ينبغي أن نأخذ تلك الاحتمالية بالحسبان، لأننا إن لم نفعل، فإنه قد يؤدي إلى غلق الخيط تمامًا. في المقابل، إذا اعتمد الخادم على الخيوط، سيكون هناك خيوطٌ أخرى بإمكانها الرد على العملاء الآخرين.

يُعدّ المثال التوضيحي DateServer.java من مقال تواصل تطبيقات جافا عبر الشبكة برنامج خادم بسيطٍ للغاية؛ فهو لا يَستخدِم الخيوط، ولذلك لا بُدّ أن ينتهي الخادم من اتصاله مع عميلٍ معين قبل أن يَقبَل اتصالًا من عميلٍ آخر. سنرى الآن كيفية تحويل ذلك البرنامج إلى خادمٍ مبني على الخيوط. في الواقع، يُعَد هذا الخادم بسيطًا جدًا لدرجة أن اِستخدَام الخيوط معه لا معنى له، ولكن الفكرة أنه من الممكن تطبيق نفس تلك التقنيات على الخوادم الأكثر تعقيدًا. ألقِ نظرةً على تمرين 12.5 على سبيل المثال.

اقتباس

ملاحظة: لا يحتاج برنامج العميل DateClient.java المقابل لبرنامج الخادم إلى استخدام الخيوط؛ لأن العميل يَستخدِم اتصالًا واحدًا. سيَعمَل برنامج العميل الأصلي مع النسخة الجديدة من برنامج الخادم.

لنناقش الآن المحاولة الأولى من البرنامج DateServerWithThreads.java، حيث يُنشِئ هذا البرنامج خيطًا جديدًا بكل مرة يَستقبِل خلالها طلب اتصال بدلًا من مُعالجة الاتصال بنفسه باستدعاء برنامجٍ فرعي subroutine. يُنشِئ البرنامج main الخيط ويُمرِّر إليه الاتصال. يَستغرِق ذلك وقتًا قصيرًا للغاية، والأهم أنه لن يتسبَّب بحدوث تعطيل. في المقابل، يُعالِج تابع الخيط run()‎ الاتصال بنفس الكيفية التي عالجنا بها الاتصال بالبرنامج الأصلي. تُعد برمجة ذلك سهلة، ويمكنك إلقاء نظرةٍ على النسخة الجديدة من البرنامج بعد إجراء التعديلات؛ حيث يمكننا ملاحظة أن الباني constructor الخاص بخيط الاتصال لا يَفعَل شيئًا تقريبًا، ولا يُسبِّب تعطيلًا، وهذا أمرٌ مهم؛ لأنه يُنفَّذ بالخيط الرئيسي:

import java.net.*;
import java.io.*;
import java.util.Date;

// 1
public class DateServerWithThreads {

    public static final int LISTENING_PORT = 32007;

    public static void main(String[] args) {

        ServerSocket listener;  // اِستمِع لطلبات الاتصال القادمة
        Socket connection;      // للتواصل مع البرنامج المتصِل

    // استمر باستقبال طلبات الاتصال ومعالجتها للأبد إلى أن يحدث خطأ ما

        try {
            listener = new ServerSocket(LISTENING_PORT);
            System.out.println("Listening on port " + LISTENING_PORT);
            while (true) {
        // اِقبَل طلب الاتصال التالي وأنشِئ خيطًا لمعالجته
                connection = listener.accept(); 
                ConnectionHandler handler = new ConnectionHandler(connection);
                handler.start();
            }
        }
        catch (Exception e) {
            System.out.println("Sorry, the server has shut down.");
            System.out.println("Error:  " + e);
            return;
        }

    }  // end main()


    // 2
    private static class ConnectionHandler extends Thread {
        Socket client; // يُمثِّل اتصالًا مع عميل
        ConnectionHandler(Socket socket) {
            client = socket;
        }
        public void run() {
                 // (code copied from the original DateServer program)
            String clientAddress = client.getInetAddress().toString();
            try {
                System.out.println("Connection from " + clientAddress );
                Date now = new Date();  // التوقيت والتاريخ الحالي
                PrintWriter outgoing;   // مجرى لإرسال البيانات
                outgoing = new PrintWriter( client.getOutputStream() );
                outgoing.println( now.toString() );
                outgoing.flush();  // تأكّد من إرسال البيانات
                client.close();
            }
            catch (Exception e){
                System.out.println("Error on connection with: " 
                        + clientAddress + ": " + e);
            }
        }
    }


} //end class DateServerWithThreads

إذ أن:

  • [1]: يمثّل هذا البرنامج خادمًا يَستقبِل طلبات الاتصال برقم المنفذ الذي يُخصِّصه الثابث LISTENING_PORT. يرسِل البرنامج عند فتح اتصال التوقيت الحالي إلى المقبس المتصِل، ويستمر باستقبال طلبات الاتصال ومعالجتها إلى أن يُغلَق بالضغط على CONTROL-C على سبيل المثال. تُنشِئ هذه النسخة من البرنامج خيطًا جديدًا لكل طلب اتصال.
  • [2]: يُعرِّف خيطًا لمعالجة الاتصال مع عميلٍ واحد.

انظر إلى نهاية التابع run()‎، حيث أضفنا clientAddress إلى رسالة الخطأ؛ لنتمكَّن من معرفة الاتصال الذي تشير إليه رسالة الخطأ. نظرًا لأن الخيوط تُنفَّذ على التوازي، قد يَختلِط خرج الخيوط المختلفة؛ أي ليس من الضروري أن تَظهَر الرسائل الخاصة بخيطٍ معين معًا، وإنما قد يَفصِل بينها رسائلٌ من خيوطٍ أخرى. يُمثِّل ذلك واحدًا فقط من ضمن التعقيدات الكثيرة التي ينبغي الانتباه لها عند تعاملنا مع الخيوط.

استخدام مجمع الخيوط

لا يُعدّ إنشاء خيطٍ جديدٍ لكل اتصال الحل الأفضل، خاصةً إذا كانت تلك الاتصالات قصيرة. لحسن الحظ، يُمكِننا أن نَستخدِم مُجمّع خيوط الذي ناقشناه بقسم مجمع الخيوط وأرتال المهام من المقال السابق.

يُعدّ البرنامج DateServerWithThreadPool.java نسخةً مُحسنَّةً من الخادم؛ حيث يَستخدِم مجمع خيوط. يُنفِّذ كل خيطٍ ضمن ذلك المُجمّع حلقة تكرار لا نهائية infinite loop يُعالِج كل تكرارٍ منها اتصالًا. علينا أن نَجِد طريقةً تُمكِّن البرنامج main من إرسال الاتصالات إلى الخيوط. ومن البديهي أن نَستخدِم رتلًا مُعطِّلًا blocking queue لذلك الغرض، وسيكون اسمه هو connectionQueue. تقرأ خيوط معالجة الاتصال الاتصالات من الرتل؛ ونظرًا لكونه رتلًا مُعطِّلًا، تتعطَّل الخيوط عندما يكون الرتل فارغًا، وتستيقظ عند إتاحة اتصالٍ جديدٍ بالرتل. لن نحتاج إلى أي تقنيات مزامنة أو اتصال أخرى؛ فكل شيءٍ مبنيٌ مُسبقًا بالرتل المُعطِّل. تعرض الشيفرة التالية التابع run()‎ الخاص بخيوط معالجة الاتصال:

public void run() {
    while (true) {
        Socket client;
        try {
            client = connectionQueue.take();  // تعطَّل إلى أن يُتاح عنصر جديد
        }
        catch (InterruptedException e) {
            continue; // إذا قُوطِعَ، عدّ إلى بداية حلقة التكرار
        }
        String clientAddress = client.getInetAddress().toString();
        try {
            System.out.println("Connection from " + clientAddress );
            System.out.println("Handled by thread " + this);
            Date now = new Date();  // التاريخ والتوقيت الحالي
            PrintWriter outgoing;   // مجرى لإرسال البيانات
            outgoing = new PrintWriter( client.getOutputStream() );
            outgoing.println( now.toString() );
            outgoing.flush();  // تأكّد من إرسال البيانات فعليًا
            client.close();
        }
        catch (Exception e){
            System.out.println("Error on connection with: " 
                    + clientAddress + ": " + e);
        }
    }
}

يُنفِّذ البرنامج main()‎ حلقة تكرارٍ لا نهائية تَستقبِل الاتصالات وتُضيفها إلى الرتل:

while (true) {
    // اقبل طلب الاتصال التالي وأضِفه إلى الرتل
    connection = listener.accept();
    try {
        connectionQueue.put(connection); // تعطَّل إذا كان الرتل ممتلئًا
    }
    catch (InterruptedException e) {
    }
}

لاحِظ أن الرتل بهذا البرنامج من النوع ArrayBlockingQueue<Socket>‎؛ أي أن لديه سعةً قصوى، وبالتالي إذا كان الرتل ممتلئًا، سيؤدي تنفيذ عملية put()‎ إلى حدوث تعطيل. ولكن ألسنا نريد تجنُّب حدوث أي تعطيلٍ بالبرنامج main()‎؟ لأنه إذا تعطُّل، فلن يستقبل الخادم أي اتصالاتٍ أخرى، ويضطّر العملاء الذي يحاولون الاتصال إلى الانتظار. أليس من الأفضل اِستخدام الصنف LinkedBlockingQueue بسعةٍ غير محدودة؟

في الحقيقة، تُعدّ الاتصالات الموجودة بالرتل المُعطِّل قيد الانتظار على أية حال، أي أنها لم تُخدَّم بعد؛ وإذا ازداد حجم الرتل بدرجةٍ غير معقولة، فستضطّر الاتصالات الموجودة بالرتل إلى الانتظار إلى فتراتٍ غير معقولةٍ أيضًا. وبالتالي، إذا ازداد حجم الرتل إلى ما لانهاية، لا يَعنِي ذلك سوى أن الخادم يَستقبِل طلبات اتصال بسرعةٍ أكبر من سرعة قدرته على معالجة تلك الاتصالات.

قد يحدث ذلك لعدة أسباب: ربما يكون خادمك غير قوي كفاية لمعالجة كمية الاتصالات المطلوبة، وينبغي في تلك الحالة أن تشتري خادمًا جديدًا؛ أو ربما بسبب عدم احتواء مجمع الخيوط على عددٍ كافٍ من الخيوط بدرجة تَسمَح بالاستفادة من إمكانيات الخادم بالكامل، وينبغي في تلك الحالة زيادة حجم مجمع الخيوط ليتوافق مع إمكانيات الخادم؛ أو ربما يتعرَّض الخادم لهجوم الحرمان من الخدمات Denial Of Service، أي أن هناك شخصٌ معينٌ يحاول متعمّدًا إرسال طلبات اتصال إلى الخادم بقدرٍ أكبر مما يَستطيع احتماله في محاولة لمنع العملاء من الوصول إلى الخدمة.

في جميع الأحوال، يُعدّ الصنف ArrayBlockingQueue ذو السعة المحدودة هو الخيار الأصح. ينبغي أن يكون الرتل قصيرًا حتى لا تضطّر الإتصالات الموجودة به إلى الانتظار لفتراتٍ طويلة قبل وصولها إلى الخدمة. بالنسبة للخوادم الحقيقية، لا بُدّ من ضبط كُلٍ من حجم الرتل وعدد خيوط المجمع بما يتناسب مع الخادم؛ كما ينبغي أن يُؤخَذ العتاد والشبكة التي يَعمَل عليها الخادم بالحسبان؛ وكذلك طبيعة طلبات العملاء التي يُفترَض معالجتها، وهو ما يُمثِّل تحديًا صعبًا عمومًا.

بالمناسبة، هناك احتماليةٌ أخرى قد تسوء الأمور نتيجة لها: لنفترض أن الخادم يحتاج إلى قراءة بعض البيانات من العميل، ولكن العميل لا يرسل أية بيانات. في تلك الحالة، قد يتعطَّل الخيط الذي يحاول قراءة البيانات إلى الأبد بانتظار المُدْخَلات. إذا كنا نَستخدِم مجمع خيوط، فقد يَحدُث ذلك لجميع الخيوط الموجودة بالمجمع، وعندها، لن تحدث أي معالجةٍ أخرى. يتمثل حل تلك المشكلة بإيقاف الاتصال إذا بقي غير نشطٍ لفترةٍ طويلة من الوقت.

ينتبه كل خيط اتصال عادةً إلى آخر توقيت استقبل به بياناتٍ من العميل. يُشغِّل الخادم خيطًا آخر يُطلَق عليه أحيانًا اسم "خيط reaper" تيمُنًا بفرقة "Grim Reaper"، ويوقظه دوريًا؛ حتى يَفحَص خيوط الاتصال، ويرى فيما إذا كان هناك خيطٌ غير نشط لفترةٍ طويلة. إذا بقي خيطٌ قيد الانتظار لفترة طويلة، فإنه يُنهَى، ويحلّ محله خيطٌ جديد. تُمثِّل الإجابة على سؤال "كم من الوقت ينبغي أن تكون تلك الفترة الطويلة؟" تحديًا آخر.

الحوسبة الموزعة

لقد رأينا طريقة اِستخدام الخيوط لإجراء المعالجة على التوازي؛ حيث تَعمَل عدة معالجات معًا لإنجاز مهمةٍ معينة. كان افتراضنا حتى الآن هو أن جميع تلك المعالجات موجودةٌ ضمن حاسوبٍ واحد مُتعدِّد المعالجات، ولكن يُمكِن إجراء المعالجة على التوازي باستخدام معالجاتٍ موجودة بحواسيبٍ مختلفة شرط أن تكون تلك الحواسيب متصلةً عبر شبكةٍ معينة؛ ويُطلَق على هذا النوع من المعالجة المتوازية -أي أن تَعمَل عدة حواسيب معًا لإنجاز مهمةٍ عبر الشبكة- اسم الحوسبة المُوزَّعة distributed computing.

يُمكِننا أن ننظر لشبكة الانترنت وكأنها نموذجٌ ضخمٌ لحوسبةٍ موزَّعة، ولكننا في الواقع معنيون بالكيفية التي يمكن أن تتعاون من خلالها الحواسيب المتصلة عبر شبكةٍ على حل مشكلة حوسبية. هناك طرائقٌ متعددة للحوسبة الموزّعة، وتدعم جافا بعضًا منها. على سبيل المثال، تُمكِّن كُلًا من تقنية استدعاء التوابع عن بعد Remote Method Invocation - RMI وتقنية كوربا Common Object Request Broker Architecture - CORBA برنامجًا مُشغَّلًا على حاسوبٍ معين من استدعاء توابع كائناتٍ موجودة بحواسيبٍ اخرى. يَسمَح ذلك بتصميم برنامجٍ كائني التوجه object-oriented تُنفَّذ أجزاءه المختلفة بحواسيب مختلفة. تدعم RMI الاتصال بين كائنات جافا فقط؛ بينما تُعدّ CORBA المعيار الأكثر عمومية، حيث تَسمَح للكائنات المكتوبة بأي لغة برمجية، بما في ذلك جافا، بالتواصل مع بعضها بعضًا.

كما هو الحال مع الشبكات، لدينا مشكلة تحديد موقع الخدمات (المقصود بالخدمة هنا هو الكائن المُتاح للاستدعاء عبر الشبكة)، أي كيف يستطيع حاسوبٌ معين معرفة الحاسوب الذي تَعمَل عليه خدمةٌ معينة ورقم المنفذ الذي تَستمِع إليه؟ تحلّ تقنيتي "RMI" و "CORBA" تلك المشكلة باستخدام "وسيط طلب request broker"؛ حيث يمثّل ذلك الوسيط برنامج خادم يَعمَل بعنوانٍ معروف، ويحتوي على قائمة الخدمات المتاحة بالحواسيب الأخرى، وتُبلِّغ الحواسيب المُقدِّمَة لخدمات الوسيط بخدماتها. ينبغي أن تعرف الحواسيب التي تحتاج خدمةً معينةً عنوان الوسيط للتواصل معه وسؤاله عن الخدمات المُتاحة وعناوينها.

تُعدّ "RMI" و "COBRA" أنظمةً معقدة وليس من السهل تمامًا استخدامها، وذُكرت هنا لأنها جزءٌ من واجهة برمجة تطبيقات API جافا للشبكات القياسية، ولكننا لن نناقشها إلى أبعد من ذلك. بدلًا منها، سنفحص مثالًا أبسط على الحوسبة المُوزَّعة يَستخدِم أساسيات الشبكات فقط.

يتشابه هذا المثال مع ذلك الذي اِستخدمناه بالبرنامج MultiprocessingDemo1.java ونُسخه المختلفة بالمقالين البرمجة باستخدام الخيوط threads في جافا والمقال السابق، وهو مثال حساب ألوان صورةٍ معقدة، ولكننا لن نُنشِئ هنا برنامجًا بواجهة مُستخدِم رسومية، ولن نعرض الصورة على الشاشة. تَستخدِم عملية المعالجة أبسط أنواع المعالجة على التوازي بتقسيم المشكلة إلى عدة مهام يُمكِن تنفيذها باستقلالية دون الاتصال مع المهام الأخرى.

لتطبيق الحوسبة المُوزَّعة على هذا النوع من المشكلات، سنَستخدِم برنامجًا رئيسيًا master يتولى مسؤولية تقسيم المشكلة إلى عدة مهام وإرسالها عبر الشبكة إلى البرامج العاملة worker لتُنفِّذها. ينبغي أن تُرسِل تلك البرامج نتائجها إلى البرنامج الرئيسي الذي يَدمِج نتائج جميع المهام معًا ليكوِّن حلًا لكامل المشكلة. يُطلَق على البرامج العاملة ضمن هذا السياق اسم "البرامج التابعة slaves"، ويُقال أن البرنامج يَستخدِم أسلوب "البرنامج الرئيسي والبرنامج التابع master/slave" لإجراء الحوسبة المُوزَّعة.

يتكوَّن هذا البرنامج من ثلاثة ملفات؛ حيث يُعرِّف الملف CLMandelbrotMaster.java البرنامج الرئيسي، بينما يُعرِّف الملف CLMandelbrotWorker.java البرامج العاملة التابعة؛ وأخيرًا يُعرِّف الملف CLMandelbrotTask.java الصنف CLMandelbrotTask الذي يُمثِّل المهمة التي تُنفِّذها البرامج العاملة.

يُقسِّم البرنامج الرئيسي master المشكلة إلى عدة مهام، ثم يُوزِّعها على البرامج العاملة، التي تُنفِّذ تلك المهام، ثم تُعيد النتائج إلى البرنامج الرئيسي. ويُطبِّق البرنامج الرئيسي بالنهاية نتائج جميع المهام المفردة على المشكلة الأساسية.

لتشغيل البرنامج، سنبدأ أولًا بتشغيل البرنامج "CLMandelbrotWorker" على عدّة حواسيب (ربما بتنفيذها بسطر الأوامر). يَستخدِم ذلك البرنامج الصنف CLMandelbrotTask، ولذلك يجب أن يكون كل من الملف CLMandelbrotWorker.class والملف CLMandelbrotTask.class متوفرين بالحواسيب المُمثِّلة للبرامج العاملة. بعد ذلك، سنُشغِّل البرنامج المسمى "CLMandelbrotMaster" على الحاسوب المُمثِل للبرنامج الرئيسي، مع ملاحظة أنه يحتاج إلى الصنف CLMandelbrotTask أيضًا.

يجب أن نُمرِّر اسم أو عنوان بروتوكول الانترنت IP address الخاص بجميع الحواسيب العاملة إلى البرنامج المسمى "CLMandelbrotMaster" مثل وسطاءٍ عبر سطر الأوامر. تستمع البرامج العاملة إلى طلبات الاتصال القادمة من البرنامج الرئيسي، ولهذا يجب أن يعرف البرنامج الرئيسي العناوين التي سيُرسِل إليها تلك الطلبات. على سبيل المثال، إذا كان البرنامج العامل مُشغَّلًا على ثلاثة حواسيب عناوينها هي: 172.21.7.101 و 172.21.7.102 و 172.21.7.103، يُمكِننا أن نُشغِّل البرنامج "CLMandelbrotMaster" بكتابة الأمر التالي:

java  CLMandelbrotMaster  172.21.7.101  172.21.7.102  172.21.7.103

سينُشِئ البرنامج الرئيسي اتصالًا شبكيًا مع البرنامج العامل بكل عنوان بروتوكول إنترنت، وستُستخدَم تلك الاتصالات للتواصل بين كُلٍ من البرنامج الرئيسي والبرامج العاملة.

يُمكِننا تشغيل عدة نسخٍ من البرنامج "CLMandelbrotWorker" على نفس الحاسوب، ولكن ينبغي أن يستمع كُلًا منها إلى الاتصالات الشبكية القادمة عبر أرقام منافذٍ مختلفة. كما يُمكِننا تشغيل البرنامج "CLMandelbrotWorker" على نفس الحاسوب الذي يَعمَل عليه البرنامج "CLMandelbrotMaster"، ولربما ستلاحظ عندها زيادة سرعة البرنامج إذا كان حاسوبك يحتوي على عدة معالجات. ألقِ نظرةً على التعليقات الموجودة بشيفرة البرنامج لمزيدٍ من المعلومات. نستعرض فيما يلي بعض الأوامر المُمكِن استخدامها لتشغيل البرنامج الرئيسي مع نسختين من البرنامج العامل على نفس الحاسوب. اُكتُبها بنوافذ سطر أوامر مختلفة:

java  CLMandelbrotWorker                             (Listens on default port)

java  CLMandelbrotWorker  2501                       (Listens on port 2501)

java  CLMandelbrotMaster  localhost  localhost:2501

يَحلّ البرنامج "CLMandelbrotMaster" نفس المشكلة بالضبط في كل مرةٍ نُشغِّل بها. في الواقع، طبيعة المشكلة ذاتها غير مهم، ولكنها تمثّل هنا حساب البيانات التي يتطلَّبها رسم جزءٍ صغير من صورة مجموعة ماندلبرو Mandelbrot Set الشهيرة. إذا كنت تريد رؤية الصورة الناتجة، ألغِ التعليق الموجود فوق الاستدعاء saveImage()‎ بنهاية البرنامج main()‎ المُعرَّف بالملف CLMandelbrotMaster.java).

يُمكِنك تشغيل البرنامج "CLMandelbrotMaster" مع عددٍ مختلف من البرامج العاملة لرؤية مدى اعتمادية الزمن الذي يتطلَّبه حل المشكلة على عدد البرامج العاملة. لاحِظ استمرار البرامج العاملة بالعمل حتى بعد انتهاء البرنامج الرئيسي؛ أي يُمكِنك تشغيل البرنامج الرئيسي عدة مرات دون الاضطرار لإعادة تشغيل البرامج العاملة في كل مرة. علاوةً على ذلك، إذا شغَّلت البرنامج "CLMandelbrotMaster" دون أن تُمرِّر إليه أي وسطاء، فإنه سيحلّ المشكلة بالكامل بمفرده، ويُمكِنك بذلك رؤية الزمن الذي يتطلَّبه حل المشكلة بدون الحوسبة المُوزَّعة.

في الواقع، جرَّبنا ذلك بإحدى الحواسيب القديمة والبطيئة جدًا، ووجدنا أن البرنامج "CLMandelbrotMaster" قد استغرق 40 ثانية لحل المشكلة بمفرده؛ في حين استغرق 43 ثانية عند تشغيل برنامجٍ عاملٍ واحد. تُمثِّل تلك الزيادة العمل الإضافي المطلوب لاستخدام الشبكات، حيث يتطلَّب إنشاء الاتصال الشبكي، وإرسال الرسائل عبر الشبكة المزيد من الوقت. في المقابل، اِستغرَق البرنامج 22 ثانية فقط لحل المشكلة عند تشغيل برنامجين عاملين بحواسيبٍ مختلفة. في تلك الحالة، نفَّذ كل برنامجٍ منهما نصف العمل على التوازي، ولذلك أنُجزَت العملية بنصف الوقت تقريبًا.

يستمر الزمن بالنقصان مع زيادة عدد البرامج العاملة، ولكن إلى حدٍ معين؛ لأن البرنامج الرئيسي لديه أيضًا بعض العمل الواجب إنجازه مهما ازداد عدد البرامج العاملة، وعليه لا يُمكِن للزمن الكلي المطلوب لحل المشكلة أن يكون أقل من الزمن الذي يستغرقه البرنامج الرئيسي لتنفيذ عمله. وفي تلك الحالة، بدا أن أقل زمنٍ ممكنٍ هو 5 ثوانٍ تقريبًا.

والآن، لنرى طريقة برمجة هذا التطبيق المُوزَّع. سيُقسِّم البرنامج الرئيسي المشكلة إلى مجموعة مهام؛ بحيث تُمثَّل كل مهمة منها باستخدام كائنٍ ينتمي إلى الصنف CLMandelbrotTask. تُرسَل تلك المهام إلى البرامج العاملة، التي ينبغي لها أن تعيد نتائجها بعد أن تحسبها. يَعنِي ذلك أننا بحاجة إلى نوعٍ من البروتوكول للتواصل؛ ولذلك قررنا استخدام مجرى محارف character stream لهذا الغرض.

سيُشفِّر البرنامج الرئيسي كل مهمةٍ بهيئة سطرٍ نصي يُرسَل إلى البرنامج العامل، الذي سيَفك التشفير ويُعيد السطر إلى كائنٍ من الصنف CLMandelbrotTask ليتمكَّن من فهم المهمة المفترض تنفيذها. بعد ذلك، يُنجِز البرنامج العامل المهمة، ثم يُشفِّر النتائج بهيئة سطرٍ نصي آخر يُرسَل عائدًا إلى البرنامج الرئيسي. أخيرًا، يَفُك البرنامج الرئيسي الشيفرة، ويَدمِج النتيجة مع نتائج المهام الأخرى. بعد اكتمال جميع المهام ودمج نتائجها معًا، تكون المشكلة قد حُلَّت.

لا يستقبل كل برنامجٍ عاملٍ مهمةً واحدةً فقط، وإنما متتاليةً من المهام؛ فبمجرد أن يُنجز مهمةً معينة، ويُرسِل نتائجها، تُسنَد إليه مهمةٌ جديدة. يَستقبِل البرنامج العامل الأمر "close" بعد اكتمال جميع المهام ليخبره بأن عليه إغلاق الاتصال. يُجرَى كل ما سبق ضمن تابعٍ، اسمه handleConnection()‎ بالملف CLMandelbrotWorker.java، حيث يُستدعَى ذلك التابع لمعالجة اتصالٍ قد اُنشئ بالفعل مع البرنامج الرئيسي. يَستخدِم ذلك التابع بدوره تابعًا آخر، اسمه readTask()‎ لفك تشفير المهمة التي استقبلها من البرنامج الرئيسي؛ كما يستخدِم التابع writeResults()‎ بهدف تشفير نتائج المهمة قبل إرسالها إلى البرنامج الرئيسي. يجب عليه أيضًا معالجة أي أخطاءٍ ممكنة. انظر تعريف التابع:

private static void handleConnection(Socket connection) {
   try {
      BufferedReader in = new BufferedReader( 
                new InputStreamReader( connection.getInputStream()) );
      PrintWriter out = new PrintWriter(connection.getOutputStream());
      while (true) {
         String line = in.readLine();  // رسالة من البرنامج الرئيسي
         if (line == null) {
               // واجهنا نهاية المجرى. لا ينبغي أن يحدث ذلك
            throw new Exception("Connection closed unexpectedly.");
         }
         if (line.startsWith(CLOSE_CONNECTION_COMMAND)) {
               // يُمثِّل الانتهاء الطبيعي للاتصال
            System.out.println("Received close command.");
            break;
         }
         else if (line.startsWith(TASK_COMMAND)) {
               // يُمثِل مهمةً من النوع‫ CLMandelbrotTask ينبغي أن ينفذها الخيط
            CLMandelbrotTask task = readTask(line);  // فك تشفير الرسالة
            task.compute();  // نفِّذ المهمة
            out.println(writeResults(task));  //  أرسِل النتائج
            out.flush();  // تأكّد من إرسال النتائج
         }
         else {
               // ليس هناك أي رسائل أخرى ضمن البروتوكول
            throw new Exception("Illegal command received.");
         }
      }
   }
   catch (Exception e) {
      System.out.println("Client connection closed with error " + e);
   }
   finally {
      try {
         connection.close();  // تأكّد من إغلاق المقبس
      }
      catch (Exception e) {
      }
   }
}

لا يُنفَّذ التابع المُعرَّف بالأعلى بخيطٍ thread مُنفصل، فالبرنامج العامل لديه شيءٌ واحدٌ فقط ليفعله بأي لحظة، ولا يحتاج إلى عدة خيوط.

لنعود الآن إلى البرنامج الرئيسي CLMandelbrotMaster.java، حيث نواجه وضعًا أكثر تعقيدًا. يجب أن يُنشِئ البرنامج اتصالًا مع مجموعةٍ من البرامج العاملة عبر مجموعةٍ من الاتصالات الشبكية، وسيَستخدِم البرنامج لإنجاز ذلك عدة خيوط؛ بحيث يُعالِج كل خيطٍ منها الاتصال مع برنامج عاملٍ واحد. تُوضِّح الشيفرة الوهمية pseudocode التالية الفكرة العامة للبرنامج main()‎:

// أنشِئ المهام التي ينبغي تنفيذها وضفها إلى الرتل
create the tasks that must be performed and add them to a queue
// إذا لم تُمرَّر أي وسائط بالأمر
if there are no command line arguments {
   // يُنفِّذ البرنامج الرئيسي كل شيء بنفسه 
   Remove each task from the queue and perform it.
}
else {
      // تُنفِّذ البرامج العاملة المهام
   for each command line argument:
      // احصل على معلومات العامل من وسيط سطر الأوامر
      Get information about a worker from command line argument.
      // أنشِئ خيطًا جديدًا وشغِّله لكي يُرسِل المهام إلى البرامج العاملة
      Create and start a thread to send tasks to workers.
   // انتظر اكتمال جميع الخيوط
   Wait for all threads to terminate.
}
// جميع المهام قد انتهت بفرض عدم حدوث أخطاء

تُوضَع المهام بمتغيرٍ اسمه tasks من نوع الرتل ConcurrentBlockingQueue<CLMandelbrotTask>‎؛ وتَقَرأ خيوط الاتصال المهام من ذلك الرتل، وتُرسِلها إلى البرامج العاملة. يُستخدَم التابع tasks.poll()‎ لقراءة مهمةٍ من الرتل؛ فإذا كان الرتل فارغًا، فسيُعيد القيمة null، والتي تُعدّ إشارةً إلى أن جميع المهام قد أُسنَدت بالفعل وأن بإمكان خيط الاتصال أن ينتهي.

يتولى كل خيط اتصال مهمة إرسال متتاليةٍ من المهام إلى خيطٍ عاملٍ معين؛ واستقبال النتائج التي يعيدها ذلك الخيط العامل؛ كما أنه مسؤولٌ عن إنشاء اتصالٍ مع الخيط العامل بالبداية. تُوضِّح الشيفرة الوهمية pseudocode التالية الفكرة العامة للعملية التي يُنفِّذها خيط الاتصال:

// أنشِئ مقبسًا متصلًا مع البرنامج العامل
Create a socket connected to the worker program.
// أنشِئ مجرى دخل ومجرى خرج للتواصل مع البرنامج العامل
Create input and output streams for communicating with the worker.
while (true) {
   Let task = tasks.poll().
   If task == null
      break;  // جميع المهام قد أسنَدت 
   // شفِّر‫ المهمة إلى رسالة وأرسلها إلى البرنامج العامل
   Encode the task into a message and transmit it to the worker.
   // اقرأ رد العامل
   Read the response from the worker.
   // فك تشفير الرد وعالجه
   Decode and process the response.
}
// أرسِل الأمر‫ "close" إلى البرنامج العامل
Send a "close" command to the worker.
Close the socket.

سيَعمَل ذلك بطريقة مناسبة، ولكن هناك بعض الملاحظات: أولًا، يجب أن يكون الخيط مُهيأً للتعامل مع أخطاء الشبكة. على سبيل المثال، قد تُغلَق إحدى البرامج العاملة بصورةٍ غير متوقَّعة، وسيتمكَّن البرنامج الرئيسي من إكمال عمله في تلك الحالة نظرًا لوجود برامجٍ عاملةٍ أخرى. (عندما تُشغِّل البرنامج، يُمكِنك أن تُجرِّب ما يلي: أوقف إحدى البرامج العاملة بالضغط على "CONTROL-C"، وسترى أنه بإمكان البرنامج الرئيسي أن يكتمل بنجاح). تَنشَأ المشكلة إذا حدث خطأٌ بينما يَعمَل الخيط على مهمةٍ معينة؛ ونظرًا لأننا نهدف إلى حل المشكلة كاملةً، لا بُدّ إذًا من إعادة إسناد تلك المهمة إلى برنامجٍ عاملٍ آخر. يُمكِننا تحقيق ذلك بإعادة المهام غير المكتملة إلى رتل المهام.

لسوء الحظ، لا يُعالج البرنامج الذي نَستعرِضه هنا جميع الأخطاء المحتملة. على سبيل المثال، إذا فَشَل خيط العامل الأخير، فلن يكون هناك عملاء آخرين نُسنِد إليهم المهام غير المكتملة. علاوةً على ذلك، إذا عُلّقَ الاتصال الشبكي دون أن يُولِّد خطأً فعليًا، سيُعلَّق البرنامج أيضًا نتيجة انتظاره لرد العامل. لا بُدّ لأي برنامج متين أن يجد طريقةً يَكشِف من خلالها عن تلك المشكلة ومن ثم يُعيد إسناد المهمة.

ثانيًا، تترك الطريقة الموضحة بالأعلى البرنامج العامل دون أي عملٍ أثناء معالجة البرنامج الرئيسي لرد العامل. سيكون من الأفضل لو أُسندت مهمةٌ جديدةٌ إلى البرنامج العامل قبل معالجة ردّه على المهمة السابقة؛ حيث سيُبقِي ذلك البرنامج العامل مُنشغِلًا؛ كما سيَسمَح بتنفيذ العمليتين على التوازي بدلًا من تنفيذهما على التوالي. بهذا المثال، تستغرق معالجة الرد زمنًا قصيرًا للغاية، ولذلك قد لا يُمثِل ترك العامل منتظرًا أو تشغيله على الفور فارقًا كبيرًا، ولكن ينبغي عمومًا تطبيق التوازي بأقصى قدرٍ ممكن. ألقِ نظرةً على التصور العام للخوارزمية بعد التعديل:

try {
   // أنشِئ مقبسًا متصلًا مع البرنامج العامل
   Create a socket connected to the worker program.
   // أنشِئ مجرى دخل ومجرى خرج للتواصل مع البرنامج العامل
   Create input and output streams for communicating with the worker.
   Let currentTask = tasks.poll().
   // شفِّر‫ `currentTask` إلى رسالة وأرسلها إلى البرنامج العامل
   Encode currentTask into a message and send it to the worker.
   while (true) {
     // اِقرأ الرد من البرنامج العامل
      Read the response from the worker.
      Let nextTask = tasks.poll().
      If nextTask != null {
        // أرسل‫ `nextTask` إلى البرنامج العامل قبل معالجة الرد على المهمة `currentTask`
         // شفِّر‫ `nextTask` إلى رسالة وأرسلها إلى البرنامج العامل
         Encode nextTask into a message and send it to the worker.
      }
      // فك تشفير الرد على المهمة‫ `currentTask` وعالجه
      Decode and process the response to currentTask.
      currentTask = nextTask.
      if (currentTask == null)
         break; // جميع المهام قد أسنَدت
   }
   // أرسِل الأمر‫ "close" إلى البرنامج العامل
   Send a "close" command to the worker.
   // أغلق المقبس
   Close the socket.
}
catch (Exception e) {
   // أعِد المهمة غير المكتملة إن وجدت إلى رتل المهام مرة أخرى
   Put uncompleted task, if any, back into the task queue.
}
finally {
   // أغلق المقبس
   Close the connection.
}

يُمكِنك الإطلاع على الصنف المتداخل WorkerConnection المُعرَّف بالملف CLMandelbrotMaster.java لترى طريقة تحويل الشيفرة الوهمية السابقة إلى شيفرة جافا.

ترجمة -بتصرّف- للقسم Section 4: Threads and Networking من فصل Chapter 12: Threads and Multiprocessing من كتاب Introduction to Programming Using Java.

اقرأ أيضًا


تفاعل الأعضاء

أفضل التعليقات

لا توجد أية تعليقات بعد



انضم إلى النقاش

يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.

زائر
أضف تعليق

×   لقد أضفت محتوى بخط أو تنسيق مختلف.   Restore formatting

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   جرى استعادة المحتوى السابق..   امسح المحرر

×   You cannot paste images directly. Upload or insert images from URL.


×
×
  • أضف...