اذهب إلى المحتوى

سنكمل في هذا المقال ما تحدثنا عنه في المقال السابق الجزء الأول عملية بناء خادم ويب متعدد مهام المعالجة، فإذا لم تكن قد قرأت المقال السابق، فاقرأه قبل قراءة هذا المقال.

تحويل خادم ويب ذو خيط وحيد إلى خادم متعدد المهام

يعالج الآن خادم الويب كل طلب بدوره، يعني أنه لن يعالج اتصال ثاني حتى ينتهي من معالجة الطلب الأول. سيصبح التنفيذ التسلسلي أقل كفاءةً كلما زادت الطلبات على الخادم؛ فإذا استقبل الخادم طلبًا يتطلب وقتًا طويلًا لمعالجته ستنتظر الطلبات التالية وقتًا أطول حتى ينتهي الطلب الطويل حتى لو كانت الطلبات التالية تُنفذ بسرعة. يجب حل هذه المشكلة ولكن أولًا لنلاحظها أثناء العمل.

محاكاة طلب بطيء في تنفيذ الخادم الحالي

لنلاحظ كيف يؤثر طلب بطيء المعالجة على الطلبات الأخرى المقدمة إلى تنفيذ الخادم الحالي. تنفذ الشيفرة 10 طلب معالجة إلى ‎/sleep بمحاكاة استجابة بطيئة التي تسبب سكون الخادم لخمس ثوانٍ قبل الاستجابة.

  • اسم الملف: src/main.rs
use std::{
    fs,
    io::{prelude::*, BufReader},
    net::{TcpListener, TcpStream},
    thread,
    time::Duration,
};
// --snip--

fn handle_connection(mut stream: TcpStream) {
    // --snip--

    let (status_line, filename) = match &request_line[..] {
        "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"),
        "GET /sleep HTTP/1.1" => {
            thread::sleep(Duration::from_secs(5));
            ("HTTP/1.1 200 OK", "hello.html")
        }
        _ => ("HTTP/1.1 404 NOT FOUND", "404.html"),
    };

    // --snip--
}

[الشيفرة 10: محاكاة استجابة بطيئة عن طريق سكون الخادم لخمس ثوان]

بدلنا من if إلى match إذ لدينا ثلاث حالات. يجب أن نطابق صراحةً مع قطعة من request_line لمطابقة النمط مع قيم السلسلة النصية المجردة. لا تُسند match ولا تُحصل تلقائيًا كما تفعل توابع المساواة.

تكون الذراع الأولى هي نفس كتلة if من الشيفرة 9، وتطابق الذراع الثانية الطلب إلى ‎/sleep ويسكن الخادم لخمس ثوان عندما يُستقبل الطلب قبل تصيير صفحة HTML الناجحة، والذراع الثالثة هي نفس كتلة else من الشيفرة 9.

يمكن ملاحظة أن الخادم بدائي، لكن تعالج المكاتب الحقيقية طلبات متعددة بطريقة مختصرة أكثر.

شغل الخادم باستخدام cargo run، ثم افتح نافذتي متصفح واحدة من أجل "/http://127.0.0.1:7878" وأُخرى من أجل "http://127.0.0.1:7878/sleep". إذا أدخلت ‎/ URI عدة مرات كما سابقًا سترى أنه يستجيب بسرعة، لكن إذا أدخلت ‎/sleep ومن ثم حمّلت "/" سترى أن "/" ينتظر حتى يسكن sleep خمس ثوان كاملة قبل أن يُحمّل.

هناك تقنيات متعددة لتفادي التراكم خلف طلب بطيء، والطريقة التي سنتبعها هي مجمع خيط thread pool.

تحسن الإنتاجية باستخدام مجمع خيط

مجمع خيط thread pool هو مجموعة من الخيوط المُنشأة التي تنتظر معالجة مهمة. عندما يستقبل البرنامج مهمةً، يُعيّن واحد من الخيوط في المجمع لأداء المهمة ومعالجتها وتبقى باقي الخيوط في المجمع متاحةً لمعالجة أي مهمة تأتي أثناء معالجة الخيط الأول للمهمة، وعندما ينتهي الخيط من معالجة المهمة يعود إلى مجمع الخيوط الخاملة جاهزًا لمعالجة أي مهمة جديدة. يسمح مجمع خيط معالجة الاتصالات بصورةٍ متزامنة ويزيد إنتاجية الخادم.

سنحدد عدد الخيوط في المجمع برقم صغير لحمايتنا من هجوم حجب الخدمة Denial of Service ‎ -أو اختصارًا DoS. إذا طلبنا من البرنامج إنشاء خيط لكل طلب قادم، يمكن لشخص إنشاء 10 مليون طلب مستهلكًا كل الموارد المتاحة وموقفًا معالجة الطلبات نهائيًا.

بدلًا من إنشاء عدد لا نهائي من الخيوط، سننشئ عددًا محددًا من الخيوط في المجمع، وستذهب الطلبات المرسلة إلى المجمع للمعالجة. يحافظ المجمع على ترتيب الطلبات القادمة في رتل، كل خيط يأخذ طلبًا من الرتل يعالجه ثم يطلب من الرتل طلبًا آخر. يمكن معالجة N طلب بهذه الطريقة، إذ تمثّل N عدد الخيوط. إذا كان كل خيط يعالج طلبًا طويل التنفيذ يمكن أن تتراكم الطلبات في الرتل ولكننا بذلك نكون قد زدنا عدد الطلبات طويلة التنفيذ التي يمكن معالجتها قبل أن نصل إلى تلك المرحلة.

هذه إحدى طرق زيادة إنتاجية خادم ويب، ويمكن استكشاف طرق أُخرى مثل نموذج اشتقاق/جمع fork/join أو نموذج الدخل والخرج للخيط الواحد غير المتزامن single-threaded async I/O أو نموذج الدخل والخرج للخيوط المتعددة غير المتزامن multi-threaded async I/O model. يمكنك قراءة وتنفيذ هذه الحلول إذا كنت مهتمًا بهذا الموضوع وكل هذه الخيارات ممكنة مع لغة برمجية ذات مستوى منخفض مثل رست.

قبل البدء بتنفيذ مجمع خيط، لنتحدث كيف يجب أن يكون استخدام المجمع. عند بداية تصميم الشيفرة، تساعدك كتابة واجهة المستخدم في التصميم. اكتب واجهة برمجة التطبيق API للشيفرة بهيكلية تشبه طريقة استدعائها، ثم نفذ الوظيفة داخل الهيكل بدلًا من تنفيذ الوظيفة ثم بناء واجهة برمجة التطبيق العامة.

سنستخدم طريقة تطوير مُقادة بالمصرف compiler-driven، هذه طريقة مشابهة لاستخدامنا التطوير المُقاد بالاختبار test-driven كما فعلنا في مشروع سابق في الفصل 12. سنكتب الشيفرة التي تستدعي الدالة المُرادة، ومن ثم ننظر إلى الأخطاء من المصرّف لتحديد ماذا يجب أن نغير حتى تعمل الشيفرة. يجب الحديث بدايةً عن التقنيات التي لن نستعملها.

إنشاء خيط لكل طلب

لنلاحظ بدايةً كيف ستكون الشيفرة في حال أنشأنا خيطًا لكل طلب. كما ذكرنا سابقًا، لن تكون هذه خطتنا النهائية وذلك لمشكلة إنشاء عدد لا نهائي من الخيوط ولكنها نقطة بداية لإنشاء خادم متعدد الخيوط قادر على العمل، ثم سنضيف مجمع الخيط مثل تحسين وستكون مقارنة الحلين أسهل. تظهر الشيفرة 11 التغييرات لجعل main تُنشئ خيط جديد لمعالجة كل مجرى داخل حلقة for.

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        thread::spawn(|| {
            handle_connection(stream);
        });
    }
}

[الشيفرة 11: إنشاء خيط جديد لكل مجرى]

كما تعلمنا سابقًا في مقال سابق استخدام الخيوط Threads لتنفيذ شيفرات رست بصورة متزامنة آنيًا، يُنشئ thread::spawn خيطًا جديدًا وينفذ الشيفرة في المُغلف في الخيط الجديد. إذا نفذت الشيفرة وحملت "‎/sleep" في المتصفح ومن ثم "/" في نافذتي متصفح أُخريين ستلاحظ أن الطلبات إلى "/" لا تنتظر "‎/sleep‎" لينتهي ولكن كما ذكرنا سابقًا سيطغى هذا على النظام لأننا ننشئ خيوطًا دون حد.

إنشاء عدد محدد من الخيوط

نريد من مجمع الخيط أن يعمل بطريقة مشابهة ومألوفة حتى لا يحتاج التبديل من الخيوط لمجمع خيط أي تعديلات كبيرة للشيفرة التي تستخدمها واجهة برمجة التطبيق. تظهر الشيفرة 12 واجهة افتراضية لهيكل ThreadPool الذي نريد استخدامه بدلًا عن thread::spawn.

  • اسم الملف: src/main.rs
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming() {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }
}

[الشيفرة 12: واجهة ThreadPool المثالية]

استخدمنا ThreadPool::new لإنشاء مجمع خيط جديد بعدد خيوط يمكن تعديله وفي حالتنا أربعة. لدى pool.execute واجهة مماثلة للدالة thread:spawn في حلقة for إذ تأخذ مغلفًا يجب أن ينفذه المجمع لكل مجرى. نحتاج لتنفيذ pool.execute أن تأخذ مغلفًا وتعطيه لخيط في المجمع لينفذه. لن تُصرّف هذه الشيفرة ولكن سنجربها كي يدلنا المصرف عن كيفية إصلاحها.

إنشاء مجمع خيط باستخدام التطوير المقاد بالمصرف

أجرِ التغييرات في الشيفرة 12 على الملف src/main.rs واستخدم أخطاء المصرّف من cargo check لقيادة التطوير. هذه أول خطأ نحصل عليه:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0433]: failed to resolve: use of undeclared type `ThreadPool`
  --> src/main.rs:11:16
   |
11 |     let pool = ThreadPool::new(4);
   |                ^^^^^^^^^^ use of undeclared type `ThreadPool`

For more information about this error, try `rustc --explain E0433`.
error: could not compile `hello` due to previous error

عظيم، يبين هذ الخطأ أننا بحاجة إلى نوع أو وحدة ThreadPool. سيكون تنفيذ Threadpool الخاص بنا مستقل عن عمل خادم الويب، لذا لنبدّل الوحدة المصرفة hello من وحدة ثنائية مصرفة إلى وحدة مكتبة مصرفة لاحتواء تنفيذ Threadpool. يمكننا بعد ذلك استخدام مكتبة مجمع الخيط المنفصلة لفعل أي عمل نريده باستخدام مجمع خيط وليس فقط لطلبات خادم الويب.

أنشئ src/lib.rs الذي يحتوي التالي، وهو أبسط تعريف لهيكل ThreadPool يمكن الحصول عليه.

  • اسم الملف: src/lib.rs
pub struct ThreadPool;

ثم عدل ملف main.rs لجلب ThreadPool من المكتبة إلى النطاق بإضافة الشيفرة التالية في مقدمة الملف src/main.rs.

  • اسم الملف: src/main.rs
use hello::ThreadPool;

لن تعمل هذه الشيفرة ولكن لننظر مجددًا إلى الخطأ التالي الذي نريد معالجته:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope
  --> src/main.rs:12:28
   |
12 |     let pool = ThreadPool::new(4);
   |                            ^^^ function or associated item not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

يشير هذا الخطأ أننا نحتاج إلى انتاج دالة مرتبطة اسمها new من أجل ThreadPool. يمكننا معرفة أن new تحتاج معامل يقبل 4 مثل وسيط ويجب أن يعيد نسخة ThreadPool. لننفذ أبسط دالة new التي تحتوي هذه الصفات characteristics.

  • اسم الملف: src/lib.rs
pub struct ThreadPool;

impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        ThreadPool
    }
}

اخترنا نوع usize للمعامل size لأننا نعرف أن العدد السالب للطلبات غير منطقي ونعرف أيضًا أننا سنستخدم 4 ليمثّل عدد العناصر في مجموعة الخيوط وهذا هو عمل نوع usize كما تحدثنا سابقًا في قسم "أنواع الأعداد الصحيحة" في المقال أنواع البيانات Data Types في لغة رست.

لنتحقق من الشيفرة مجددًا:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope
  --> src/main.rs:17:14
   |
17 |         pool.execute(|| {
   |              ^^^^^^^ method not found in `ThreadPool`

For more information about this error, try `rustc --explain E0599`.
error: could not compile `hello` due to previous error

يحدث الخطأ الآن لأنه ليس لدينا تابع execute على ThreadPool. تذكر من قسم "إنشاء عدد محدد من الخيوط" أننا قررنا أن مجمع الخيط يجب أن يكون له واجهة تشبه thread::spawn، وقررنا أيضّأ أننا سننفذ التابع execute ليأخذ المغلف المعُطى له ويعطيه لخيط خامل في المجمع لينفذه.

سنعرّف تابع execute على ThreadPool ليأخذ المغلف مثل معامل. تذكر من القسم "نقل القيم خارج المغلف وسمات Fn" في المقال المغلفات closures في لغة رست أننا بإمكاننا أخذ المغلفات مثل معاملات باستخدام ثلاث سمات هي Fn أو FnMut أو FnOnce. يجب أن نحدد أي نوع مغلف نريد استخدامه هنا، نحن نعرف أننا سنفعل شيئًا يشابه تنفيذ المكتبة القياسية لدالةthread::spawn، لذلك دعنا نرى ما هي القيود الموجودة لبصمة thread::spawn على معاملاتها. تظهر التوثيقات التالي:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

المعامل F هو الذي يهمنا، والمعامل T متعلق بالقيمة المُعادة ولسنا مهتمين بها. يمكننا أن نرى أن spawn تستخدم FnOnce مثل قيد سمة على F، وهذا ما نريده أيضًأ لأننا نريد تمرير الوسيط الذي نأخذه في execute إلى spawn. يمكننا التأكد أيضًا أن FnOnce هي السمة المُراد استخدامها لأن خيط تنفيذ الطلب سينفِّذ فقط طلب المغلف مرةً واحدة، والذي يطابق Once في FnOnce.

لدى معامل نوع F أيضًا قيد سمة Send وقيد دورة حياة static' المفيدان في حالتنا؛ فنحن بحاجة Send لنقل المغلف من خيط لآخر، و static' لأننا لا نعرف الوقت اللازم ليُنفذ الخيط. لننشئ تابع execute على ThreadPool التي تأخذ معامل معمم للنوع F مع هذه القيود.

  • اسم الملف: src/lib.rs
impl ThreadPool {
    // --snip--
    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
    }
}

استخدمنا () بعد FnOnce لأن FnOnce تمثل مغلفًا لا يأخذ معاملات ويعيد نوع الوحدة (). يمكن إهمال النوع المُعاد من البصمة كما في تعريفات الدالة، ولكن حتى لو لم يوجد أي معاملات نحن بحاجة الأقواس.

هذا هو أبسط تنفيذ لدالة execute، فهي لا تعمل شيئًا، لكننا فقط بحاجة أن تُصرّف شيفرتنا، لنتحقق منها مجددًا.

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s

إنها تُصرّف، لكن لاحظ إذا جربت cargo run وأجريت طلبًا في المتصفح، سترى الأخطاء في المتصفح نفسها التي رأيناها في بداية الفصل. لم تستدع المكتبة المغلف المُمرر إلى execute حتى الآن.

ملاحظة: هناك مقولة عن لغات البرمجة ذات المُصرّفات الحازمة مثل هاسكل Haskell ورست وهي "إذا صُرفت الشيفرة فإنها تعمل" ولكن هذه المقولة ليست صحيحة إجمالًا، إذ يُصرّف مشروعنا، لكنه لا يعمل شيئًا إطلاقًا. إذا كنا نريد إنشاء مشروع حقيقي ومكتمل، الآن هو الوقت المثالي لكتابة وحدات اختبار للتحقق من أن الشيفرة تُصرّف ولها السلوك المرغوب.

التحقق من صحة عدد الخيوط في new

لن نغيّر شيئًا للمعاملين new و parameter. لننفذ متن الدوال بالسلوك الذي نريده، ولنبدأ بالدالة new. اخترنا سابقًا نوع غير مؤشر للمعامل size لأن مجمع بعدد خيوط سلبي هو غير منطقي، ولكن مجمع بعدد خيوط صفر ليس منطقيًا أيضًا ولكن unsize صالح. سنضيف الشيفرة التي تتحقق من أن size أكبر من الصفر قبل إعادة نسخة من ThreadPool وجعل البرنامج يهلع إذا حصل على قيمة صفر باستخدام ماكرو assert!‎ كما في الشيفرة 13.

  • اسم الملف: src/lib.rs
impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        ThreadPool
    }

    // --snip--
}

[الشيفرة 13: تنفيذ Threadpool:new ليهلع إذا كان size صفر]

أضفنا بعض التوثيق إلى ThreadPool باستخدام تعليقات doc. لاحظ أننا اتبعنا خطوات التوثيق الجيدة بإضافة قسم يستدعي الحالات التي يمكن للدالة أن تهلع فيها كما تحدثنا في الفصل 14. جرب تنفيذ cargo run --open واضغط على هيكل ThreadPool لرؤية كيف تبدو المستندات المُنشأة للدالة new.

يمكننا تغيير new إلى build بدلًا من إضافة ماكرو assert!‎، ونعيد Result كما فعلنا في Config::build في مشروع الدخل والخرج في الشيفرة 9 في المقال كتابة برنامج سطر أوامر بلغة رست: إعادة بناء التعليمات البرمجية لتحسين النمطية Modularity والتعامل مع الأخطاء، لكننا قررنا في حالتنا أن إنشاء مجمع خيط بدون أي خيوط هو خطأ لا يمكن استرداده. إذا كنت طموحًا جرب كتابة دالة اسمها build مع البصمة التالية لمقارنته مع الدالة new.

pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {

إنشاء مساحة لتخزين الخيوط

لدينا الآن طريقة لمعرفة أنه لدينا عدد صالح من الخيوط لتخزينها في المجمع، إذ يمكننا إنشاء هذه الخيوط وتخزينها في هيكل ThreadPool قبل إرجاعها إلى الهيكل، ولكن كيف نخزن الخيوط؟ لنلاحظ بصمة thread::spawn.

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
    where
        F: FnOnce() -> T,
        F: Send + 'static,
        T: Send + 'static,

يُعاد JoinHandle<T>‎ من الدالة spawn، إذ تمثّل T النوع الذي يعيده المغلف. لنستعمل JoinHandle أيضًا لنرى ما سيحدث، إذ سيعالج المغلف الذي نمرره إلى مجمع الخيط الاتصال ولا يعيد أي شيء لذا T ستكون نوع وحدة ().

ستُصرّف الشيفرة في الشيفرة 14 ولكن لا تُنشئ أي خيوط. غيّرنا تعريف ThreadPool لتحتوي شعاعًا من نسخة thread::JoinHandle<()>‎ وهيأنا الشعاع بسعة size وضبطنا حلقة for التي تعيد بعض الشيفرة لإنشاء الخيوط وتعيد نسخة ThreadPool تحتويهم.

  • اسم الملف: src/main.rs
use std::thread;

pub struct ThreadPool {
    threads: Vec<thread::JoinHandle<()>>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut threads = Vec::with_capacity(size);

        for _ in 0..size {
            // create some threads and store them in the vector
        }

        ThreadPool { threads }
    }
    // --snip--
}

[الشيفرة 14: إنشاء شعاع للهيكل ThreadPool الذي يحتوي الخيوط]

جلبنا std::thread إلى النطاق في وحدة المكتبة المصرفة لأننا نستخدم Thread::JoinHandle بمثابة نوع العنصر في الشعاع في ThreadPool. تُنشئ ThreadPool شعاعًا جديدًا يحتوي عناصر size عندما يُستقبل حجم صالح.

تعمل الدالة with_capacity نفس مهام Vec::new ولكن بفرق مهم هو أنها تحجز مسبقًا المساحة في الشعاع لأننا نريد تخزين عناصر size في الشعاع. إجراء هذا الحجز مسبقًا هو أكثر كفاءة من استخدام Vec::new الذي يغير حجمه كلما اُضيفت عناصر.

عندما تنفذ cargo check مجدّدًا ينبغي أن تنجح.

هيكل عامل Worker Struct مسؤول عن ارسال شيفرة من مجمع الخيط إلى خيط

تركنا تعليقًا في حلقة for متعلق بإنشاء الخيوط في الشيفرة 14. سننظر هنا إلى كيفية إنشاء الخيوط حقيقةً، إذ تؤمن المكتبة القياسية thread::spawn بمثابة طريقة لإنشاء الخيوط ويتوقع thread::spawn الحصول على بعض الشيفرة لكي ينفذها الخيط بعد إنشائه فورًا، ولكن نريد في حالتنا إنشاء خيوط وجعلهم ينتظرون شيفرةً سنرسلها لاحقًا. لا يقدم تنفيذ المكتبة القياسية للخيوط أي طريقة لعمل ذلك، إذ يجب تنفيذها يدويًا.

سننفذ هذا السلوك عن طريق إضافة هيكلية بيانات جديدة بين ThreadPool والخيوط التي ستدير هذه السلوك الجديد، وسندعو هيكل البيانات هذا "العامل Worker" وهو مصطلح عام في تنفيذات مجمّع الخيوط. يأخذ العامل الشيفرة التي بحاجة لتنفيذ وينفّذها في خيط العامل. فكر كيف يعمل الناس في مطبخ المطعم، إذ ينتظر العاملون طلبات الزبائن ويكونوا مسؤولين عن أخذ هذه الطلبات وتنفيذها.

بدلًا من تخزين شعاع نسخة JoinHandle<()>‎ في مجمع الخيط، نخزن نسخًا من هيكل Worker. يخزن كل Worker نسخة JoinHandle<()>‎ واحدة، ثم ننفذ تابع على Worker الذي يأخذ مغلف شيفرة لينفذه ويرسله إلى خيط يعمل حاليًا لينفذه. سنعطي كل عامل رقمًا معرّفًا id للتمييز بين العمال المختلفين في المجمع عند التسجيل أو تنقيح الأخطاء.

هكذا ستكون العملية الجديدة عند إنشاء ThreadPool. سننفذ الشيفرة التي ترسل المغلف إلى الخيط بعد إعداد Worker بهذه الطريقة:

  1. عرّف هيكل Worker الذي يحتوي id و JoinHandle<()>‎.
  2. عدّل ThreadPool لتحتوي شعاع من نسخ Worker.
  3. عرّف دالة Worker::new التي تأخذ رقم id وتعيد نسخة Worker التي تحتوي id وخيط مُنشأ بمغلف فارغ.
  4. استخدم عداد حلقة for لإنشاء id وإنشاء Worker جديد مع ذلك الرقم id وخرن العامل في الشعاع.

إذا كنت جاهزًا للتحدي، جرّب تنفيذ هذه التغييرات بنفسك قبل النظر إلى الشيفرة في الشيفرة 15.

جاهز؟ يوجد في الشيفرة 15 إحدى طرق عمل التعديلات السابقة.

  • اسم الملف:src/lib.rs
use std::thread;

pub struct ThreadPool {
    workers: Vec<Worker>,
}

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers }
    }
    // --snip--
}

struct Worker {
    id: usize,
    thread: thread::JoinHandle<()>,
}

impl Worker {
    fn new(id: usize) -> Worker {
        let thread = thread::spawn(|| {});

        Worker { id, thread }
    }
}

[الشيفرة 15: تعديل ThreadPool بحيث تحتوي نسخة Worker بدلًا من احتواء الخيط مباشرةً]

عدّلنا اسم حقل ThreadPool من threads إلى workers لأنه يحتوي نسخ Worker بدلًا من نسخ JoinHandle<()>‎. استخدمنا العداد في حلقة for مثل وسيط لدالة Worker::new وخزّنا كل Worker جديد في شعاع اسمه workers.

لا تحتاج الشيفرة الخارجية (كما في الخادم في src/main.rs) أن تعرف تفاصيل التنفيذ بما يتعلق باستخدام هيكل Worker داخل ThreadPool، لذا نجعل كل من هيكل Worker ودالة new خاصين private. تستخدم الدالة Worker::new المعرّف id المُعطى وتخزن نسخة JoinHandle<()>‎ المُنشأة عن طريق إنشاء خيط جديد باستخدام مغلف فارغ.

ملاحظة: سيهلع thread::spawn إذا كان نظام التشغيل لا يستطيع إنشاء خيط بسبب عدم توفر موارد كافية، وسيؤدي هذا إلى هلع كامل الخادم حتى لو كان إنشاء بعض الخيوط ممكنًا. للتبسيط يمكن قبول هذا السلوك ولكن في تنفيذ مجمع خيط مُنتج ينبغي استخدام std::thread::Builder ودالة spawn الخاصة به التي تعيد Result.

تُصرّف هذه الشيفرة وتخزن عددًا من نسخ Worker الذي حددناه مثل وسيط إلى ThreadPool::new. لكننا لم نعالج المغلف الذي نحصل عليه في execute. لنتعرف على كيفية عمل ذلك تاليًا.

إرسال طلبات إلى الخيوط عن طريق القنوات

المشكلة التالية التي سنتعامل معها هي أن المغلفات المُعطاة إلى thread::spawn لا تفعل شيئًا إطلاقًا، وسنحصل حاليًا على المغلف الذي نريد تنفيذه في تابع execute، لكن نحن بحاجة لإعطاء thread::spawn مغلفًا لينفذه عندما ننشئ كل Worker خلال إنشاء ThreadPool.

نريد تشغيل هياكل Worker التي أنشأناها للبحث عن شيفرة من الرتل في ThreadPool وأن ترسل تلك الشيفرة إلى خيطها لينفّذها.

ستكون القنوات التي تعلمناها سابقًا في المقال استخدام ميزة تمرير الرسائل Message Passing لنقل البيانات بين الخيوط Threads في لغة رست -والتي تُعد طريقة بسيطة للتواصل بين خيطين- طريقةً ممتازةً لحالتنا، إذ سنستعمل قناةً لتعمل مثل رتل للوظائف، وترسل execute وظيفة من ThreadPool إلى نسخة Worker التي ترسل بدورها الوظيفة إلى خيطها. ستكون الخطة على النحو التالي:

  1. يُنشئ ThreadPool قناة ويحتفظ بالمرسل.
  2. يحتفظ كل Worker بالمستقبل.
  3. ننشئ هيكل Job جديد يحتفظ بالمغلف الذي نريد إرساله عبر القناة.
  4. يرسل تابع execute الوظيفة المراد تنفيذها عبر المرسل.
  5. سيتكرر مرور Worker على المستقبل وينفذ المغلف لأي وظيفة يستقبلها في الخيط.

لنحاول إنشاء قناة في ThreadPool::new والاحتفاظ بالمرسل في نسخة ThreadPool كما في الشيفرة 16. لا يحتوي هيكل Job أي شيء الآن، لكنه سيكون نوع العنصر المُرسل عبر القناة.

  • اسم الملف: src/lib.rs
use std::{sync::mpsc, thread};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: mpsc::Sender<Job>,
}

struct Job;

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

[الشيفرة 16: تعديل ThreadPool لتخزين مرسل القناة التي ترسل نسخ Job]

أنشأنا القناة الجديدة في ThreadPool:new وجعلنا المجمع يحتفظ بالمرسل. ستصرّف هذه الشيفرة بنجاح.

لنجرب تمرير مستقبل القناة إلى كل عامل عندما ينشئ مجمع الخيط القناة. نعرف أننا نريد استخدام المستقبل في الخيط الذي أنشأه العامل، لذا سنشير إلى معامل receiver في المغلف بمرجع reference. لن تُصرّف الشيفرة 17.

  • اسم الملف: src/lib.rs
impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, receiver));
        }

        ThreadPool { workers, sender }
    }
    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker {
        let thread = thread::spawn(|| {
            receiver;
        });

        Worker { id, thread }
    }
}

[الشيفرة 17: تمرير المستقبل إلى العمال workers]

أجرينا بعض التغييرات الصغيرة والمباشرة، إذ مررنا المستقبل إلى Worker::new واستخدمناه داخل المغلف.

عندما نتحقق من الشيفرة سنحصل على هذا الخطأ:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0382]: use of moved value: `receiver`
  --> src/lib.rs:26:42
   |
21 |         let (sender, receiver) = mpsc::channel();
   |                      -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait
...
26 |             workers.push(Worker::new(id, receiver));
   |                                          ^^^^^^^^ value moved here, in previous iteration of loop

For more information about this error, try `rustc --explain E0382`.
error: could not compile `hello` due to previous error

تحاول الشيفرة تمرير receiver لنسخ متعددة من Worker ولكن هذا لن يعمل كما تتذكر سابقًا من المقال استخدام ميزة تمرير الرسائل Message Passing لنقل البيانات بين الخيوط Threads في لغة رست، إذ أن تنفيذ القناة المقدم من رست هو مُنتجين producer متعددين ومستهلك consumer واحد، وهذا يعني أنه لا يمكن نسخ الطرف المستهلك من القناة لإصلاح هذا الخطأ ولا نريد أيضًا إرسال رسائل متعددة لمستهلكين متعددين، بل نحتاج قائمة رسائل واحدة مع عمال متعددين لكي تعالج كل رسالة مرةً واحدةً فقط. إضافةً إلى ذلك، يتطلب أخذ وظيفة من رتل القناة تغيير receiver، لذا تحتاج الخيوط طريقةً آمنةً لتشارك وتعدل receiver، وإلا نحصل على حالات سباق (كما تحدثنا في الفصل السابق المشار إليه).

تذكر المؤشرات الذكية الآمنة للخيوط التي تحدثنا عنها سابقًا في المقال تزامن الحالة المشتركة Shared-State Concurrency في لغة رست وتوسيع التزامن مع Send و Sync؛ فنحن بحاجة لاستخدام Arc<Mutex<T>>‎ لمشاركة الملكية لعدد من الخيوط والسماح للخيوط بتغيير القيمة. يسمح نوع Arc لعدد من العمال من مُلك المستقبل وتضمن Mutex حصول عامل واحد على الوظيفة من المستقبل. تظهر الشيفرة 18 التغييرات التي يجب عملها.

  • اسم الملف: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};
// --snip--

impl ThreadPool {
    // --snip--
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool { workers, sender }
    }

    // --snip--
}

// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--
    }
}

[الشيفرة 18: مشاركة المستقبل بين العمال باستخدام Arc و Mutex]

نضع المستقبل فيThreadPool::new في Arc و Mutex، وننسخ Arc لكل عامل لتزيد عدّ المرجع ليستطيع العمال مشاركة ملكية المستقبل.

تُصرّف الشيفرة بنجاح مع هذه التغييرات، لقد اقتربنا من تحقيق هدفنا.

تنفيذ تابع التنفيذ execute

لننفذ أخيرًا تابع execute على ThreadPool، إذ سغيّر أيضًا Job من هيكل إلى نوع اسم بديل لكائن السمة الذي يحتوي نوع المغلف الذي يستقبله execute. كما تحدثنا في قسم "إنشاء مرادفات للنوع بواسطة أسماء النوع البديلة" في المقال الأنواع والدوال المتقدمة في لغة رست، يسمح لنا نوع الاسم البديل بتقصير الأنواع الطويلة لسهولة الاستخدام كما في الشفرة 19.

  • اسم الملف: src/lib.rs
// --snip--

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    // --snip--

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.send(job).unwrap();
    }
}

// --snip--

[الشيفرة 19: إنشاء نوع اسم بديل Job لـ Box يحتوي كل مغلف وارسال العمل عبر القناة]

بعد إنشاء نسخة Job جديدة باستخدام المغلف نحصل على execute ونرسل الوظيفة عبر الطرف المرسل للقناة. نستدعي unwrap على send في حال فشل الإرسال؛ إذ يمكن حصول ذلك إذا أوقفنا كل الخيوط من التنفيذ، وهذا يعني توقُف الطرف المستقبل عن استقبال أي رسائل جديدة. لا يمكننا الآن إيقاف الخيوط من التنفيذ، إذ تستمر خيوطنا بالتنفيذ طالما المجمع موجود. سبب استخدام unwrap هو أننا نعرف أن حالة الفشل هذه لن تحصل ولكن المصرّف لا يعرف ذلك.

لم ننتهي كليًا بعد، فالمغلف المُمرر إلى thread::spawn يسند الطرف المستقبل من القناة فقط، لكن نريد بدلًا من ذلك أن يتكرر المغلف للأبد ويسأل الطرف المستقبل من القناة عن وظيفة وينفذ الوظيفة عندما يحصل عليها. دعنا نجري التغييرات الموضحة في الشيفرة 20 للدالة Worker::new.

  • اسم الملف: src/lib.rs
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let job = receiver.lock().unwrap().recv().unwrap();

            println!("Worker {id} got a job; executing.");

            job();
        });

        Worker { id, thread }
    }
}

[الشيفرة 20: استقبال وتنفيذ الوظائف في خيط العامل]

نستدعي أولًا lock على receiver للحصول على mutex، ونستدعي unwrap ليهلع على أي خطأ. قد يفشل الحصول على قفل إذا كان mutex في حالة مسمومة poisoned، والتي تحصل إذا هلع أحد الخيوط عند احتفاظه بالقفل بدلًا من ترك القفل، وسيكون استدعاء unwrap في هذه الحالة هو العمل الأفضل. غيّر unwrap إلى expect على راحتك لتظهر رسالة خطأ ذات معنى.

إذا حصلنا على القفل على mutex، نستدعي recv لاستقبال Job من القناة. يتخطى استدعاء unwrap الأخير أي أخطاء أيضًا والتي ربما قد تحصل إذا اُغلق، على نحوٍ مشابه لكيفية إعادة Err من قِبل تابع send إذا أُغلق المستقبل.

إذا لم توجد أي وظيفة في استدعاء كتل recv، سينتظر الخيط حتى تتوفر وظيفة. يضمن Mutex<T>‎ أن يكون هناك خيط Worker واحد يطلب وظيفة.

يعمل مجمع الخيط الآن، جرب cargo run وأرسل بعض الطلبات.

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
warning: field is never read: `workers`
 --> src/lib.rs:7:5
  |
7 |     workers: Vec<Worker>,
  |     ^^^^^^^^^^^^^^^^^^^^
  |
  = note: `#[warn(dead_code)]` on by default

warning: field is never read: `id`
  --> src/lib.rs:48:5
   |
48 |     id: usize,
   |     ^^^^^^^^^

warning: field is never read: `thread`
  --> src/lib.rs:49:5
   |
49 |     thread: thread::JoinHandle<()>,
   |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

warning: `hello` (lib) generated 3 warnings
    Finished dev [unoptimized + debuginfo] target(s) in 1.40s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.
Worker 1 got a job; executing.
Worker 3 got a job; executing.
Worker 0 got a job; executing.
Worker 2 got a job; executing.

لقد نجحنا، ولدينا الآن مجمع خيط ينفذ الاتصالات على نحوٍ غير متزامن. لا يُنشئ أكثر من أربعة خيوط حتى لا يُحمَل النظام بصورةٍ زائدة إذا استقبل الخادم طلبات كثيرة. إذا أرسلنا طلبًا إلى ‎/‎sleep سيكون الخادم قادرًا على خدمة طلبات أُخرى بجعل خيط آخر ينفذهم.

ملاحظة: إذا فتحنا ‎/sleep في نوافذ متعددة في المتصفح بنفس الوقت، ستُحمل واحدةٌ تلو الأُخرى بفواصل زمنية مدتها 5 ثواني لأن بعض المتصفحات تنفذ النسخ المتعددة لنفس الطلب بالترتيب لأسباب التخزين المؤقت. ليس الخادم هو سبب هذا التقصير.

بعد أن تعلمنا عن حلقة while let في المقال الأنماط Patterns واستخداماتها وقابليتها للدحض Refutability في لغة رست، ربما تتساءل لماذا لم نكتب شيفرة الخيط العامل كما في الشيفرة 21.

  • اسم الملف: src/lib.rs
// --snip--

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || {
            while let Ok(job) = receiver.lock().unwrap().recv() {
                println!("Worker {id} got a job; executing.");

                job();
            }
        });

        Worker { id, thread }
    }
}

[الشيفرة 21: طريقة تنفيذ مختلفة لدالة Worker::new باستخدام while let]

تُصرّف الشيفرة وتُنفذ ولكن لا تعطي نتيجة عمل الخيوط المرغوبة، إذ يسبب الطلب البطيء انتظار باقي الطلبات لتُعالج، والسبب بسيطٌ إلى حد ما؛ فليس لدى هيكل Mutex دالة unlock عامة لأن ملكية القفل مبينةٌ على دورة حياة MutexGuard<T>‎ داخل LockResult<MutexGuard<T>>‎ التي يعيدها التابع lock. يطبق متحقق الاستعارة قاعدة أن المورد المحمي بهيكل Mutex لا يمكن الوصول له إلا إذا احتفظنا بالقفل وقت التصريف، ولكن بهذا التنفيذ يمكن أن يبقى القفل مُحتفظًا به أكثر من اللازم إذا لم نكن منتبهين إلى دورة حياة MutexGuard<T>‎.

تعمل الشيفرة في الشيفرة 20 التي تستخدم let job = receiver.lock().unwrap().recv().unwrap();‎ إذ تُسقط أي قيمة مؤقتة مُستخدمة في التعبير على الطرف اليمين من إشارة المساواة "=" مع letعندما تنتهي تعليمة let، ولكن لا تُسقط while let (وأيضًا if let و match) القيم المؤقتة حتى نهاية الكتلة المرتبطة بها. يبقى القفل مُحتفظًا به حتى نهاية فترة استدعاء job()‎ يعني أن العمال الباقين لا يمكن أن يستقبلوا وظائف.

ترجمة -وبتصرف- لقسم من الفصل Final Project: Building a Multithreaded Web Server من كتاب The Rust Programming Language.

اقرأ أيضًا


تفاعل الأعضاء

أفضل التعليقات

لا توجد أية تعليقات بعد



انضم إلى النقاش

يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.

زائر
أضف تعليق

×   لقد أضفت محتوى بخط أو تنسيق مختلف.   Restore formatting

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   جرى استعادة المحتوى السابق..   امسح المحرر

×   You cannot paste images directly. Upload or insert images from URL.


×
×
  • أضف...