اذهب إلى المحتوى

بدأنا في مقال الجزء الأول ببناء مشروع عملي بلغة رست وهو عبارة عن خادم ويب متعدد مهام المعالجة، إذ بنينا الخادم الأساسي وكان أحادي خيط المعالجة، وعملنا في مقال الجزء الثاني على تحويله إلى خادم متعدد خيوط المعالجة، وسننهي في هذا المقال بناء الخادم ليصبح جاهزًا، فإذا لم تكن قرأت المقالات السابقة، فارجع لها قبل قراءة هذا المقال.

الإغلاق الرشيق وتحرير الذاكرة

تستجيب الشيفرة 20 للطلبات بصورةٍ غير متزامنة عبر استخدام مجمع خيط كما نريد، إذ نحصل على بعض التحذيرات من حقول workers و id و thread التي لن نستخدمها مباشرةً وتذكرنا أننا لم نحرر أي شيء من الذاكرة. عندما نستخدم الحل البدائي الذي هو استخدام مفتاحي "ctrl-c" لإيقاف الخيط الرئيسي، تتوقف الخيوط مباشرةً حتى لو كانوا يخدّمون طلبًا.

سننفّذ سمة Drop لاستدعاء join على كل خيط في المجمع لكي ننهي الطلبات التي تعمل قبل الإغلاق، ثم سننفّذ طريقةً لإخبار الخيوط ألا تقبل طلبات جديدة قبل الإغلاق. لرؤية عمل هذا الكود سنعدّل الخادم ليقبل طلبين فقط قبل أن يغلق مجمع الخيط thread pool.

تنفيذ سمة Drop على مجمع خيط

لنبدأ بتنفيذ Drop على مجمع الخيط الخاص بنا. عندما يُسقط المجمع يجب أن تجتمع كل الخيوط للتأكد من أن عملهم قد انتهى. تظهر الشيفرة 22 المحاولة الأولى لتطبيق Drop، إذ لن تعمل الشيفرة حاليًا.

  • اسم الملف: src/lib.rs
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            worker.thread.join().unwrap();
        }
    }
}

[الشيفرة 22: ضم كل خيط عندما يخرج المجمع خارج النطاق]

أولًا، نمرّ على كل workers في مجمع الخيط، واستُخدمت ‎&mut هنا لأن self هو مرجع متغيّر، ونريد أيضًا تغيير worker. نطبع لكل عامل رسالةً تقول أن هذا العامل سيُغلق، ثم نستدعي join على خيط العمال. إذا فشل استدعاء join نستخدم unwrap لجعل رست تهلع وتذهب إلى إغلاق غير رشيق.

سنحصل على هذا الخطأ عند تصريف هذه الشيفرة:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference
  --> src/lib.rs:52:13
   |
52 |             worker.thread.join().unwrap();
   |             ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call
   |             |
   |             move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait
   |
note: this function takes ownership of the receiver `self`, which moves `worker.thread`
  --> /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/thread/mod.rs:1581:17

For more information about this error, try `rustc --explain E0507`.
error: could not compile `hello` due to previous error

يوضّح الخطأ أننا لا يمكن أن نستدعي join لأنه لدينا استعارة متغيرة على كل worker وتأخذ join ملكية وسطائها، ولمعالجة هذه المشكلة نحن بحاجة لنقل الخيط خارج نسخة Worker التي تملك thread حتى تستطيع join استهلاك الخيط، وقد فعلنا ذلك في الشيفرة 15 من المقال تنفيذ نمط تصميمي Design Pattern كائني التوجه Object-Oriented في لغة رست. إذا احتفظ Worker بـ Option<thread::JoinHandle<()>>‎، يمكننا استدعاء تابع take على Option لنقل القيمة خارج المتغاير Some وإبقاء المتغاير None في مكانه، بمعنى آخر سيحتوي Worker عامل على متغاير Some في Thread الخاص به وعندما نريد تحرير ذاكرة Worker نستبدل Some بالقيمة None حتى لا يوجد لدى Worker أي خيط لينفذه.

لذا نحن نعرف أننا نريد تحديث تعريف Worker على النحو التالي.

  • اسم الملف: src/lib.rs
struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

الآن لنتابع المصرّف لنجد أية أماكن أُخرى تحتاج تغيير، وبالتحقق من الشيفرة نجد خطأين:

$ cargo check
    Checking hello v0.1.0 (file:///projects/hello)
error[E0599]: no method named `join` found for enum `Option` in the current scope
  --> src/lib.rs:52:27
   |
52 |             worker.thread.join().unwrap();
   |                           ^^^^ method not found in `Option<JoinHandle<()>>`
   |
note: the method `join` exists on the type `JoinHandle<()>`
  --> /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/thread/mod.rs:1581:5
help: consider using `Option::expect` to unwrap the `JoinHandle<()>` value, panicking if the value is an `Option::None`
   |
52 |             worker.thread.expect("REASON").join().unwrap();
   |                          +++++++++++++++++

error[E0308]: mismatched types
  --> src/lib.rs:72:22
   |
72 |         Worker { id, thread }
   |                      ^^^^^^ expected enum `Option`, found struct `JoinHandle`
   |
   = note: expected enum `Option<JoinHandle<()>>`
            found struct `JoinHandle<_>`
help: try wrapping the expression in `Some`
   |
72 |         Worker { id, thread: Some(thread) }
   |                      +++++++++++++      +

Some errors have detailed explanations: E0308, E0599.
For more information about an error, try `rustc --explain E0308`.
error: could not compile `hello` due to 2 previous errors

لنعالج الخطأ الثاني الذي يشير إلى الشيفرة في نهاية Worker::new، إذ نريد تغليف قيمة thread في Some عندما ننشئ Worker جديد. أجرِ الخطوات التالية لتصحيح هذا الخطأ:

  • اسم الملف: src/lib.rs
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        // --snip--

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

الخطأ الأول هو في تنفيذ Drop، وذكرنا سابقًا أننا أردنا استدعاء take على قيمة Option لنقل thread خارج worker. أجرِ التغييرات التالية لتصحيح هذا الخطأ:

  • اسم الملف: src/lib.rs
impl Drop for ThreadPool {
    fn drop(&mut self) {
        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

كما تحدثنا سابقًا في المقال البرمجة كائنية التوجه OOP في لغة رست، يأخذ التابع take على Option المتغاير Some خارجًا ويبقي None بدلًا عنه. استخدمنا if let لتفكيك Some والحصول على الخيط، ثم استدعينا join على الخيط. إذا كان خيط العامل هو أساسًا None نعرف أن العامل قد حرًر ذاكرته ولا يحصل شيء في هذه الحالة.

الإشارة للخيط ليتوقف عن الاستماع إلى الوظائف

تُصرّف الشيفرة بدون تحذيرات بعد كل التغييرات التي أجريناها، ولكن الخبر السيء أنها لا تعمل كما أردنا. النقطة المهمة هي في منطق المغلفات المنفذة بواسطة خيوط نسخ Worker، إذ نستدعي حتى اللحظة join لكن لا تُغلق الخيوط لأننها تعمل في loop للأبد بحثًا عن وظائف. إذا أسقطنا Threadpool بتنفيذنا الحالي للسمة drop، سيُمنع الخيط الأساسي للأبد بانتظار الخيط الأول حتى ينتهي، ولحل هذه المشكلة نحتاج لتغيير تنفيذ drop في ThreadPool، ثم إجراء تغيير في حلقة Worker.

أولًا، سنغير تنفيذ drop في ThreadPool ليسقِط صراحةً sender قبل انتظار الخيوط لتنتهي. تظهر الشيفرة 23 التغييرات في ThreadPool لتسقط صراحةً sender. استخدمنا نفس تقنياتOption و take كما فعلنا مع الخيط لكي يستطيع نقل sender خارج ThreadPool.

  • اسم الملف: src/lib.rs:
pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}
// --snip--
impl ThreadPool {
    pub fn new(size: usize) -> ThreadPool {
        // --snip--

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

[الشيفرة 23: إسقاط sender صراحةً قبل جمع الخيوط الفعالة]

يغلق إسقاط sender القناة، وهذا يشير بدوره إلى عدم إرسال أي رسائل إضافية، وعندما نفعل ذلك تعيد كل الاستدعاءات إلى recv التي تجريها الخيوط الفعالة في الحلقة اللانهائية خطأً. نغير حلقة Worker في الشيفرة 24 لتخرج من الحلقة برشاقة في تلك الحالة، يعني أن الخيوط ستنتهي عندما يستدعي join عليهم في تنفيذ drop في ThreadPool.

  • اسم الملف: src/lib.rs
impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

[الشيفرة 24: الخروج صراحةً من الحلقة عندما تعيد recv خطأ]

لرؤية عمل هذه الشيفرة: سنعدل main لتقبل فقط طلبين قبل أن تُغلق الخادم برشاقة كما تظهر الشيفرة 25.

  • اسم الملف: src/main.rs
fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

[الشيفرة 25: إغلاق الخادم بعد خدمة طلبين عن طريق الخروج من الحلقة]

لا نريد أن يتوقف خادم حقيقي بعد خدمة طلبين فقط، وتبين هذه الشيفرة أن الإغلاق الرشيق وتحرير الذاكرة يعملان بصورةٍ نظامية.

تُعرّف دالة take في سمة Iterator وتحدد التكرار إلى أول عنصرين بالحد الأقصى. سيخرج ThreadPool خارج النطاق في نهاية main وستُطبَّق سمة drop.

شغّل الخادم باستخدام cargo run وأرسل ثلاثة طلبات. سيعطي الطلب الثالث خطأ وسترى الخرج في الطرفية على النحو التالي:

$ cargo run
   Compiling hello v0.1.0 (file:///projects/hello)
    Finished dev [unoptimized + debuginfo] target(s) in 1.0s
     Running `target/debug/hello`
Worker 0 got a job; executing.
Shutting down.
Shutting down worker 0
Worker 3 got a job; executing.
Worker 1 disconnected; shutting down.
Worker 2 disconnected; shutting down.
Worker 3 disconnected; shutting down.
Worker 0 disconnected; shutting down.
Shutting down worker 1
Shutting down worker 2
Shutting down worker 3

يمكن أن ترى ترتيبًا مختلفًا للخيوط الفعالة والرسائل المطبوعة. تعمل الشيفرة وفقًا لهذه الرسائل كما يلي: أخذ العاملان 0 و 3 الطلبين الأولين وتوقف الخادم عن قبول الاتصالات بعد ثاني اتصال، وبدأ تنفيذ Drop في العمل على ThreadPool قبل أخذ العامل 3 وظيفته. يفصل إسقاط sender كل العمال ويخبرهم أن يُغلقوا، ويطبع كل عامل رسالةً عندما يُغلقوا ويستدعي مجمع الخيط join لانتظار كل خيط عامل لينتهي.

لاحظ ميّزة مهمة في هذا التنفيذ، إذ قام ThreadPool بإسقاط sender وجرّبنا ضم العامل 0 قبل أن يستقبل أي عامل خطأ. لم يتلق العامل 0 أي خطأ من recv بعد، لذا تنتظر كتلة الخيط الأساسية أن ينتهي العامل 0. في تلك الأثناء استقبل العامل 3 وظيفة ثم استقبلت كل الخيوط خطأ. ينتظر الخيط الأساسي باقي العمال لينتهوا عندما ينتهي العامل 0. وبحلول هذه النقطة يخرج كل عامل من حلقته ويتوقف.

تهانينا، فقد أنهينا المشروع ولدينا الآن خادم ويب بسيط يستخدم مجمع خيط للاستجابة بصورةٍ غير متزامنة، ونستطيع إجراء إغلاق رشيق للخادم الذي يحرر من الذاكرة كل الخيوط في المجمع.

هذه هي الشيفرة الكاملة بمثابة مرجع.

  • اسم الملف: src/main.rs
use hello::ThreadPool;
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
use std::thread;
use std::time::Duration;

fn main() {
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
    let pool = ThreadPool::new(4);

    for stream in listener.incoming().take(2) {
        let stream = stream.unwrap();

        pool.execute(|| {
            handle_connection(stream);
        });
    }

    println!("Shutting down.");
}

fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK", "hello.html")
    } else if buffer.starts_with(sleep) {
        thread::sleep(Duration::from_secs(5));
        ("HTTP/1.1 200 OK", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND", "404.html")
    };

    let contents = fs::read_to_string(filename).unwrap();

    let response = format!(
        "{}\r\nContent-Length: {}\r\n\r\n{}",
        status_line,
        contents.len(),
        contents
    );

    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}
  • اسم الملف: src/lib.rs
use std::{
    sync::{mpsc, Arc, Mutex},
    thread,
};

pub struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

type Job = Box<dyn FnOnce() + Send + 'static>;

impl ThreadPool {
    /// Create a new ThreadPool.
    ///
    /// The size is the number of threads in the pool.
    ///
    /// # Panics
    ///
    /// The `new` function will panic if the size is zero.
    pub fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();

        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    pub fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);

        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);

            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {id} got a job; executing.");

                    job();
                }
                Err(_) => {
                    println!("Worker {id} disconnected; shutting down.");
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

يمكننا إجراء المزيد إذا أردنا تحسين المشروع، وإليك بعض الأفكار:

  • أضِف المزيد من التوثيق إلى ThreadPool وتوابعه العامة.
  • أضِف بعض الاختبارات لوظيفة المكتبة.
  • غيّر الاستدعاءات من unwrap إلى معالجة خطأ أكثر متانة.
  • استخدم ThreadPool لتنفيذ أعمال غير خدمة طلبات الويب.
  • ابحث عن وحدة مجمع خيط مصرفة على creats.io ونفذ خادم ويب باستخدام الوحدة المصرفة، ثم قارن واجهة برمجة التطبيقات API والمتانة بينها وبين مجمع الخيط الذي نفذناه.

خاتمة

عظيم جدًا! فقد وصلنا إلى نهاية سلسلة البرمجة بلغة رست . نريد أن نشكرك لانضمامك إلينا في هذه الجولة في رست. أنت الآن جاهز لتنفيذ مشاريع رست ومساعدة الآخرين في مشاريعهم. تذكر أنه هناك مجتمع مرحب من مستخدمي رست الذين يحبون المساعدة في أي صعوبة يمكن أن تواجهها في استعمالك رست.

ترجمة -وبتصرف- لقسم من الفصل Final Project: Building a Multithreaded Web Server من كتاب The Rust Programming Language.

اقرأ أيضًا


تفاعل الأعضاء

أفضل التعليقات

لا توجد أية تعليقات بعد



انضم إلى النقاش

يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.

زائر
أضف تعليق

×   لقد أضفت محتوى بخط أو تنسيق مختلف.   Restore formatting

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   جرى استعادة المحتوى السابق..   امسح المحرر

×   You cannot paste images directly. Upload or insert images from URL.


×
×
  • أضف...