بدأنا في مقال الجزء الأول ببناء مشروع عملي بلغة رست وهو عبارة عن خادم ويب متعدد مهام المعالجة، إذ بنينا الخادم الأساسي وكان أحادي خيط المعالجة، وعملنا في مقال الجزء الثاني على تحويله إلى خادم متعدد خيوط المعالجة، وسننهي في هذا المقال بناء الخادم ليصبح جاهزًا، فإذا لم تكن قرأت المقالات السابقة، فارجع لها قبل قراءة هذا المقال.
الإغلاق الرشيق وتحرير الذاكرة
تستجيب الشيفرة 20 للطلبات بصورةٍ غير متزامنة عبر استخدام مجمع خيط كما نريد، إذ نحصل على بعض التحذيرات من حقول workers
و id
و thread
التي لن نستخدمها مباشرةً وتذكرنا أننا لم نحرر أي شيء من الذاكرة. عندما نستخدم الحل البدائي الذي هو استخدام مفتاحي "ctrl-c" لإيقاف الخيط الرئيسي، تتوقف الخيوط مباشرةً حتى لو كانوا يخدّمون طلبًا.
سننفّذ سمة Drop
لاستدعاء join
على كل خيط في المجمع لكي ننهي الطلبات التي تعمل قبل الإغلاق، ثم سننفّذ طريقةً لإخبار الخيوط ألا تقبل طلبات جديدة قبل الإغلاق. لرؤية عمل هذا الكود سنعدّل الخادم ليقبل طلبين فقط قبل أن يغلق مجمع الخيط thread pool.
تنفيذ سمة Drop على مجمع خيط
لنبدأ بتنفيذ Drop
على مجمع الخيط الخاص بنا. عندما يُسقط المجمع يجب أن تجتمع كل الخيوط للتأكد من أن عملهم قد انتهى. تظهر الشيفرة 22 المحاولة الأولى لتطبيق Drop
، إذ لن تعمل الشيفرة حاليًا.
- اسم الملف: src/lib.rs
impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); worker.thread.join().unwrap(); } } }
[الشيفرة 22: ضم كل خيط عندما يخرج المجمع خارج النطاق]
أولًا، نمرّ على كل workers
في مجمع الخيط، واستُخدمت &mut
هنا لأن self
هو مرجع متغيّر، ونريد أيضًا تغيير worker
. نطبع لكل عامل رسالةً تقول أن هذا العامل سيُغلق، ثم نستدعي join
على خيط العمال. إذا فشل استدعاء join
نستخدم unwrap
لجعل رست تهلع وتذهب إلى إغلاق غير رشيق.
سنحصل على هذا الخطأ عند تصريف هذه الشيفرة:
$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0507]: cannot move out of `worker.thread` which is behind a mutable reference --> src/lib.rs:52:13 | 52 | worker.thread.join().unwrap(); | ^^^^^^^^^^^^^ ------ `worker.thread` moved due to this method call | | | move occurs because `worker.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait | note: this function takes ownership of the receiver `self`, which moves `worker.thread` --> /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/thread/mod.rs:1581:17 For more information about this error, try `rustc --explain E0507`. error: could not compile `hello` due to previous error
يوضّح الخطأ أننا لا يمكن أن نستدعي join
لأنه لدينا استعارة متغيرة على كل worker
وتأخذ join
ملكية وسطائها، ولمعالجة هذه المشكلة نحن بحاجة لنقل الخيط خارج نسخة Worker
التي تملك thread
حتى تستطيع join
استهلاك الخيط، وقد فعلنا ذلك في الشيفرة 15 من المقال تنفيذ نمط تصميمي Design Pattern كائني التوجه Object-Oriented في لغة رست. إذا احتفظ Worker
بـ Option<thread::JoinHandle<()>>
، يمكننا استدعاء تابع take
على Option
لنقل القيمة خارج المتغاير Some
وإبقاء المتغاير None
في مكانه، بمعنى آخر سيحتوي Worker
عامل على متغاير Some
في Thread
الخاص به وعندما نريد تحرير ذاكرة Worker
نستبدل Some
بالقيمة None
حتى لا يوجد لدى Worker
أي خيط لينفذه.
لذا نحن نعرف أننا نريد تحديث تعريف Worker
على النحو التالي.
- اسم الملف: src/lib.rs
struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, }
الآن لنتابع المصرّف لنجد أية أماكن أُخرى تحتاج تغيير، وبالتحقق من الشيفرة نجد خطأين:
$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no method named `join` found for enum `Option` in the current scope --> src/lib.rs:52:27 | 52 | worker.thread.join().unwrap(); | ^^^^ method not found in `Option<JoinHandle<()>>` | note: the method `join` exists on the type `JoinHandle<()>` --> /rustc/d5a82bbd26e1ad8b7401f6a718a9c57c96905483/library/std/src/thread/mod.rs:1581:5 help: consider using `Option::expect` to unwrap the `JoinHandle<()>` value, panicking if the value is an `Option::None` | 52 | worker.thread.expect("REASON").join().unwrap(); | +++++++++++++++++ error[E0308]: mismatched types --> src/lib.rs:72:22 | 72 | Worker { id, thread } | ^^^^^^ expected enum `Option`, found struct `JoinHandle` | = note: expected enum `Option<JoinHandle<()>>` found struct `JoinHandle<_>` help: try wrapping the expression in `Some` | 72 | Worker { id, thread: Some(thread) } | +++++++++++++ + Some errors have detailed explanations: E0308, E0599. For more information about an error, try `rustc --explain E0308`. error: could not compile `hello` due to 2 previous errors
لنعالج الخطأ الثاني الذي يشير إلى الشيفرة في نهاية Worker::new
، إذ نريد تغليف قيمة thread
في Some
عندما ننشئ Worker
جديد. أجرِ الخطوات التالية لتصحيح هذا الخطأ:
- اسم الملف: src/lib.rs
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- Worker { id, thread: Some(thread), } } }
الخطأ الأول هو في تنفيذ Drop
، وذكرنا سابقًا أننا أردنا استدعاء take
على قيمة Option
لنقل thread
خارج worker
. أجرِ التغييرات التالية لتصحيح هذا الخطأ:
- اسم الملف: src/lib.rs
impl Drop for ThreadPool { fn drop(&mut self) { for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }
كما تحدثنا سابقًا في المقال البرمجة كائنية التوجه OOP في لغة رست، يأخذ التابع take
على Option
المتغاير Some
خارجًا ويبقي None
بدلًا عنه. استخدمنا if let
لتفكيك Some
والحصول على الخيط، ثم استدعينا join
على الخيط. إذا كان خيط العامل هو أساسًا None
نعرف أن العامل قد حرًر ذاكرته ولا يحصل شيء في هذه الحالة.
الإشارة للخيط ليتوقف عن الاستماع إلى الوظائف
تُصرّف الشيفرة بدون تحذيرات بعد كل التغييرات التي أجريناها، ولكن الخبر السيء أنها لا تعمل كما أردنا. النقطة المهمة هي في منطق المغلفات المنفذة بواسطة خيوط نسخ Worker
، إذ نستدعي حتى اللحظة join
لكن لا تُغلق الخيوط لأننها تعمل في loop
للأبد بحثًا عن وظائف. إذا أسقطنا Threadpool
بتنفيذنا الحالي للسمة drop
، سيُمنع الخيط الأساسي للأبد بانتظار الخيط الأول حتى ينتهي، ولحل هذه المشكلة نحتاج لتغيير تنفيذ drop
في ThreadPool
، ثم إجراء تغيير في حلقة Worker
.
أولًا، سنغير تنفيذ drop
في ThreadPool
ليسقِط صراحةً sender
قبل انتظار الخيوط لتنتهي. تظهر الشيفرة 23 التغييرات في ThreadPool
لتسقط صراحةً sender
. استخدمنا نفس تقنياتOption
و take
كما فعلنا مع الخيط لكي يستطيع نقل sender
خارج ThreadPool
.
- اسم الملف: src/lib.rs:
pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } // --snip-- impl ThreadPool { pub fn new(size: usize) -> ThreadPool { // --snip-- ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } }
[الشيفرة 23: إسقاط sender
صراحةً قبل جمع الخيوط الفعالة]
يغلق إسقاط sender
القناة، وهذا يشير بدوره إلى عدم إرسال أي رسائل إضافية، وعندما نفعل ذلك تعيد كل الاستدعاءات إلى recv
التي تجريها الخيوط الفعالة في الحلقة اللانهائية خطأً. نغير حلقة Worker
في الشيفرة 24 لتخرج من الحلقة برشاقة في تلك الحالة، يعني أن الخيوط ستنتهي عندما يستدعي join
عليهم في تنفيذ drop
في ThreadPool
.
- اسم الملف: src/lib.rs
impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } }); Worker { id, thread: Some(thread), } } }
[الشيفرة 24: الخروج صراحةً من الحلقة عندما تعيد recv
خطأ]
لرؤية عمل هذه الشيفرة: سنعدل main
لتقبل فقط طلبين قبل أن تُغلق الخادم برشاقة كما تظهر الشيفرة 25.
- اسم الملف: src/main.rs
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); }
[الشيفرة 25: إغلاق الخادم بعد خدمة طلبين عن طريق الخروج من الحلقة]
لا نريد أن يتوقف خادم حقيقي بعد خدمة طلبين فقط، وتبين هذه الشيفرة أن الإغلاق الرشيق وتحرير الذاكرة يعملان بصورةٍ نظامية.
تُعرّف دالة take
في سمة Iterator
وتحدد التكرار إلى أول عنصرين بالحد الأقصى. سيخرج ThreadPool
خارج النطاق في نهاية main
وستُطبَّق سمة drop
.
شغّل الخادم باستخدام cargo run
وأرسل ثلاثة طلبات. سيعطي الطلب الثالث خطأ وسترى الخرج في الطرفية على النحو التالي:
$ cargo run Compiling hello v0.1.0 (file:///projects/hello) Finished dev [unoptimized + debuginfo] target(s) in 1.0s Running `target/debug/hello` Worker 0 got a job; executing. Shutting down. Shutting down worker 0 Worker 3 got a job; executing. Worker 1 disconnected; shutting down. Worker 2 disconnected; shutting down. Worker 3 disconnected; shutting down. Worker 0 disconnected; shutting down. Shutting down worker 1 Shutting down worker 2 Shutting down worker 3
يمكن أن ترى ترتيبًا مختلفًا للخيوط الفعالة والرسائل المطبوعة. تعمل الشيفرة وفقًا لهذه الرسائل كما يلي: أخذ العاملان 0 و 3 الطلبين الأولين وتوقف الخادم عن قبول الاتصالات بعد ثاني اتصال، وبدأ تنفيذ Drop
في العمل على ThreadPool
قبل أخذ العامل 3 وظيفته. يفصل إسقاط sender
كل العمال ويخبرهم أن يُغلقوا، ويطبع كل عامل رسالةً عندما يُغلقوا ويستدعي مجمع الخيط join
لانتظار كل خيط عامل لينتهي.
لاحظ ميّزة مهمة في هذا التنفيذ، إذ قام ThreadPool
بإسقاط sender
وجرّبنا ضم العامل 0 قبل أن يستقبل أي عامل خطأ. لم يتلق العامل 0 أي خطأ من recv
بعد، لذا تنتظر كتلة الخيط الأساسية أن ينتهي العامل 0. في تلك الأثناء استقبل العامل 3 وظيفة ثم استقبلت كل الخيوط خطأ. ينتظر الخيط الأساسي باقي العمال لينتهوا عندما ينتهي العامل 0. وبحلول هذه النقطة يخرج كل عامل من حلقته ويتوقف.
تهانينا، فقد أنهينا المشروع ولدينا الآن خادم ويب بسيط يستخدم مجمع خيط للاستجابة بصورةٍ غير متزامنة، ونستطيع إجراء إغلاق رشيق للخادم الذي يحرر من الذاكرة كل الخيوط في المجمع.
هذه هي الشيفرة الكاملة بمثابة مرجع.
- اسم الملف: src/main.rs
use hello::ThreadPool; use std::fs; use std::io::prelude::*; use std::net::TcpListener; use std::net::TcpStream; use std::thread; use std::time::Duration; fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming().take(2) { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } println!("Shutting down."); } fn handle_connection(mut stream: TcpStream) { let mut buffer = [0; 1024]; stream.read(&mut buffer).unwrap(); let get = b"GET / HTTP/1.1\r\n"; let sleep = b"GET /sleep HTTP/1.1\r\n"; let (status_line, filename) = if buffer.starts_with(get) { ("HTTP/1.1 200 OK", "hello.html") } else if buffer.starts_with(sleep) { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } else { ("HTTP/1.1 404 NOT FOUND", "404.html") }; let contents = fs::read_to_string(filename).unwrap(); let response = format!( "{}\r\nContent-Length: {}\r\n\r\n{}", status_line, contents.len(), contents ); stream.write_all(response.as_bytes()).unwrap(); stream.flush().unwrap(); }
- اسم الملف: src/lib.rs
use std::{ sync::{mpsc, Arc, Mutex}, thread, }; pub struct ThreadPool { workers: Vec<Worker>, sender: Option<mpsc::Sender<Job>>, } type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender: Some(sender), } } pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.as_ref().unwrap().send(job).unwrap(); } } impl Drop for ThreadPool { fn drop(&mut self) { drop(self.sender.take()); for worker in &mut self.workers { println!("Shutting down worker {}", worker.id); if let Some(thread) = worker.thread.take() { thread.join().unwrap(); } } } } struct Worker { id: usize, thread: Option<thread::JoinHandle<()>>, } impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let message = receiver.lock().unwrap().recv(); match message { Ok(job) => { println!("Worker {id} got a job; executing."); job(); } Err(_) => { println!("Worker {id} disconnected; shutting down."); break; } } }); Worker { id, thread: Some(thread), } } }
يمكننا إجراء المزيد إذا أردنا تحسين المشروع، وإليك بعض الأفكار:
-
أضِف المزيد من التوثيق إلى
ThreadPool
وتوابعه العامة. - أضِف بعض الاختبارات لوظيفة المكتبة.
-
غيّر الاستدعاءات من
unwrap
إلى معالجة خطأ أكثر متانة. -
استخدم
ThreadPool
لتنفيذ أعمال غير خدمة طلبات الويب. - ابحث عن وحدة مجمع خيط مصرفة على creats.io ونفذ خادم ويب باستخدام الوحدة المصرفة، ثم قارن واجهة برمجة التطبيقات API والمتانة بينها وبين مجمع الخيط الذي نفذناه.
خاتمة
عظيم جدًا! فقد وصلنا إلى نهاية سلسلة البرمجة بلغة رست . نريد أن نشكرك لانضمامك إلينا في هذه الجولة في رست. أنت الآن جاهز لتنفيذ مشاريع رست ومساعدة الآخرين في مشاريعهم. تذكر أنه هناك مجتمع مرحب من مستخدمي رست الذين يحبون المساعدة في أي صعوبة يمكن أن تواجهها في استعمالك رست.
ترجمة -وبتصرف- لقسم من الفصل Final Project: Building a Multithreaded Web Server من كتاب The Rust Programming Language.
أفضل التعليقات
لا توجد أية تعليقات بعد
انضم إلى النقاش
يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.