سنكمل في هذا المقال ما تحدثنا عنه في المقال السابق الجزء الأول عملية بناء خادم ويب متعدد مهام المعالجة، فإذا لم تكن قد قرأت المقال السابق، فاقرأه قبل قراءة هذا المقال.
تحويل خادم ويب ذو خيط وحيد إلى خادم متعدد المهام
يعالج الآن خادم الويب كل طلب بدوره، يعني أنه لن يعالج اتصال ثاني حتى ينتهي من معالجة الطلب الأول. سيصبح التنفيذ التسلسلي أقل كفاءةً كلما زادت الطلبات على الخادم؛ فإذا استقبل الخادم طلبًا يتطلب وقتًا طويلًا لمعالجته ستنتظر الطلبات التالية وقتًا أطول حتى ينتهي الطلب الطويل حتى لو كانت الطلبات التالية تُنفذ بسرعة. يجب حل هذه المشكلة ولكن أولًا لنلاحظها أثناء العمل.
محاكاة طلب بطيء في تنفيذ الخادم الحالي
لنلاحظ كيف يؤثر طلب بطيء المعالجة على الطلبات الأخرى المقدمة إلى تنفيذ الخادم الحالي. تنفذ الشيفرة 10 طلب معالجة إلى /sleep بمحاكاة استجابة بطيئة التي تسبب سكون الخادم لخمس ثوانٍ قبل الاستجابة.
- اسم الملف: src/main.rs
use std::{ fs, io::{prelude::*, BufReader}, net::{TcpListener, TcpStream}, thread, time::Duration, }; // --snip-- fn handle_connection(mut stream: TcpStream) { // --snip-- let (status_line, filename) = match &request_line[..] { "GET / HTTP/1.1" => ("HTTP/1.1 200 OK", "hello.html"), "GET /sleep HTTP/1.1" => { thread::sleep(Duration::from_secs(5)); ("HTTP/1.1 200 OK", "hello.html") } _ => ("HTTP/1.1 404 NOT FOUND", "404.html"), }; // --snip-- }
[الشيفرة 10: محاكاة استجابة بطيئة عن طريق سكون الخادم لخمس ثوان]
بدلنا من if
إلى match
إذ لدينا ثلاث حالات. يجب أن نطابق صراحةً مع قطعة من request_line
لمطابقة النمط مع قيم السلسلة النصية المجردة. لا تُسند match
ولا تُحصل تلقائيًا كما تفعل توابع المساواة.
تكون الذراع الأولى هي نفس كتلة if
من الشيفرة 9، وتطابق الذراع الثانية الطلب إلى /sleep ويسكن الخادم لخمس ثوان عندما يُستقبل الطلب قبل تصيير صفحة HTML الناجحة، والذراع الثالثة هي نفس كتلة else
من الشيفرة 9.
يمكن ملاحظة أن الخادم بدائي، لكن تعالج المكاتب الحقيقية طلبات متعددة بطريقة مختصرة أكثر.
شغل الخادم باستخدام cargo run
، ثم افتح نافذتي متصفح واحدة من أجل "/http://127.0.0.1:7878" وأُخرى من أجل "http://127.0.0.1:7878/sleep". إذا أدخلت / URI عدة مرات كما سابقًا سترى أنه يستجيب بسرعة، لكن إذا أدخلت /sleep ومن ثم حمّلت "/" سترى أن "/" ينتظر حتى يسكن sleep
خمس ثوان كاملة قبل أن يُحمّل.
هناك تقنيات متعددة لتفادي التراكم خلف طلب بطيء، والطريقة التي سنتبعها هي مجمع خيط thread pool.
تحسن الإنتاجية باستخدام مجمع خيط
مجمع خيط thread pool هو مجموعة من الخيوط المُنشأة التي تنتظر معالجة مهمة. عندما يستقبل البرنامج مهمةً، يُعيّن واحد من الخيوط في المجمع لأداء المهمة ومعالجتها وتبقى باقي الخيوط في المجمع متاحةً لمعالجة أي مهمة تأتي أثناء معالجة الخيط الأول للمهمة، وعندما ينتهي الخيط من معالجة المهمة يعود إلى مجمع الخيوط الخاملة جاهزًا لمعالجة أي مهمة جديدة. يسمح مجمع خيط معالجة الاتصالات بصورةٍ متزامنة ويزيد إنتاجية الخادم.
سنحدد عدد الخيوط في المجمع برقم صغير لحمايتنا من هجوم حجب الخدمة Denial of Service -أو اختصارًا DoS. إذا طلبنا من البرنامج إنشاء خيط لكل طلب قادم، يمكن لشخص إنشاء 10 مليون طلب مستهلكًا كل الموارد المتاحة وموقفًا معالجة الطلبات نهائيًا.
بدلًا من إنشاء عدد لا نهائي من الخيوط، سننشئ عددًا محددًا من الخيوط في المجمع، وستذهب الطلبات المرسلة إلى المجمع للمعالجة. يحافظ المجمع على ترتيب الطلبات القادمة في رتل، كل خيط يأخذ طلبًا من الرتل يعالجه ثم يطلب من الرتل طلبًا آخر. يمكن معالجة N طلب بهذه الطريقة، إذ تمثّل N عدد الخيوط. إذا كان كل خيط يعالج طلبًا طويل التنفيذ يمكن أن تتراكم الطلبات في الرتل ولكننا بذلك نكون قد زدنا عدد الطلبات طويلة التنفيذ التي يمكن معالجتها قبل أن نصل إلى تلك المرحلة.
هذه إحدى طرق زيادة إنتاجية خادم ويب، ويمكن استكشاف طرق أُخرى مثل نموذج اشتقاق/جمع fork/join أو نموذج الدخل والخرج للخيط الواحد غير المتزامن single-threaded async I/O أو نموذج الدخل والخرج للخيوط المتعددة غير المتزامن multi-threaded async I/O model. يمكنك قراءة وتنفيذ هذه الحلول إذا كنت مهتمًا بهذا الموضوع وكل هذه الخيارات ممكنة مع لغة برمجية ذات مستوى منخفض مثل رست.
قبل البدء بتنفيذ مجمع خيط، لنتحدث كيف يجب أن يكون استخدام المجمع. عند بداية تصميم الشيفرة، تساعدك كتابة واجهة المستخدم في التصميم. اكتب واجهة برمجة التطبيق API للشيفرة بهيكلية تشبه طريقة استدعائها، ثم نفذ الوظيفة داخل الهيكل بدلًا من تنفيذ الوظيفة ثم بناء واجهة برمجة التطبيق العامة.
سنستخدم طريقة تطوير مُقادة بالمصرف compiler-driven، هذه طريقة مشابهة لاستخدامنا التطوير المُقاد بالاختبار test-driven كما فعلنا في مشروع سابق في الفصل 12. سنكتب الشيفرة التي تستدعي الدالة المُرادة، ومن ثم ننظر إلى الأخطاء من المصرّف لتحديد ماذا يجب أن نغير حتى تعمل الشيفرة. يجب الحديث بدايةً عن التقنيات التي لن نستعملها.
إنشاء خيط لكل طلب
لنلاحظ بدايةً كيف ستكون الشيفرة في حال أنشأنا خيطًا لكل طلب. كما ذكرنا سابقًا، لن تكون هذه خطتنا النهائية وذلك لمشكلة إنشاء عدد لا نهائي من الخيوط ولكنها نقطة بداية لإنشاء خادم متعدد الخيوط قادر على العمل، ثم سنضيف مجمع الخيط مثل تحسين وستكون مقارنة الحلين أسهل. تظهر الشيفرة 11 التغييرات لجعل main
تُنشئ خيط جديد لمعالجة كل مجرى داخل حلقة for
.
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); for stream in listener.incoming() { let stream = stream.unwrap(); thread::spawn(|| { handle_connection(stream); }); } }
[الشيفرة 11: إنشاء خيط جديد لكل مجرى]
كما تعلمنا سابقًا في مقال سابق استخدام الخيوط Threads لتنفيذ شيفرات رست بصورة متزامنة آنيًا، يُنشئ thread::spawn
خيطًا جديدًا وينفذ الشيفرة في المُغلف في الخيط الجديد. إذا نفذت الشيفرة وحملت "/sleep" في المتصفح ومن ثم "/" في نافذتي متصفح أُخريين ستلاحظ أن الطلبات إلى "/" لا تنتظر "/sleep" لينتهي ولكن كما ذكرنا سابقًا سيطغى هذا على النظام لأننا ننشئ خيوطًا دون حد.
إنشاء عدد محدد من الخيوط
نريد من مجمع الخيط أن يعمل بطريقة مشابهة ومألوفة حتى لا يحتاج التبديل من الخيوط لمجمع خيط أي تعديلات كبيرة للشيفرة التي تستخدمها واجهة برمجة التطبيق. تظهر الشيفرة 12 واجهة افتراضية لهيكل ThreadPool
الذي نريد استخدامه بدلًا عن thread::spawn
.
- اسم الملف: src/main.rs
fn main() { let listener = TcpListener::bind("127.0.0.1:7878").unwrap(); let pool = ThreadPool::new(4); for stream in listener.incoming() { let stream = stream.unwrap(); pool.execute(|| { handle_connection(stream); }); } }
[الشيفرة 12: واجهة ThreadPool
المثالية]
استخدمنا ThreadPool::new
لإنشاء مجمع خيط جديد بعدد خيوط يمكن تعديله وفي حالتنا أربعة. لدى pool.execute
واجهة مماثلة للدالة thread:spawn
في حلقة for
إذ تأخذ مغلفًا يجب أن ينفذه المجمع لكل مجرى. نحتاج لتنفيذ pool.execute
أن تأخذ مغلفًا وتعطيه لخيط في المجمع لينفذه. لن تُصرّف هذه الشيفرة ولكن سنجربها كي يدلنا المصرف عن كيفية إصلاحها.
إنشاء مجمع خيط باستخدام التطوير المقاد بالمصرف
أجرِ التغييرات في الشيفرة 12 على الملف src/main.rs واستخدم أخطاء المصرّف من cargo check
لقيادة التطوير. هذه أول خطأ نحصل عليه:
$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0433]: failed to resolve: use of undeclared type `ThreadPool` --> src/main.rs:11:16 | 11 | let pool = ThreadPool::new(4); | ^^^^^^^^^^ use of undeclared type `ThreadPool` For more information about this error, try `rustc --explain E0433`. error: could not compile `hello` due to previous error
عظيم، يبين هذ الخطأ أننا بحاجة إلى نوع أو وحدة ThreadPool
. سيكون تنفيذ Threadpool
الخاص بنا مستقل عن عمل خادم الويب، لذا لنبدّل الوحدة المصرفة hello
من وحدة ثنائية مصرفة إلى وحدة مكتبة مصرفة لاحتواء تنفيذ Threadpool
. يمكننا بعد ذلك استخدام مكتبة مجمع الخيط المنفصلة لفعل أي عمل نريده باستخدام مجمع خيط وليس فقط لطلبات خادم الويب.
أنشئ src/lib.rs الذي يحتوي التالي، وهو أبسط تعريف لهيكل ThreadPool
يمكن الحصول عليه.
- اسم الملف: src/lib.rs
pub struct ThreadPool;
ثم عدل ملف main.rs لجلب ThreadPool
من المكتبة إلى النطاق بإضافة الشيفرة التالية في مقدمة الملف src/main.rs.
- اسم الملف: src/main.rs
use hello::ThreadPool;
لن تعمل هذه الشيفرة ولكن لننظر مجددًا إلى الخطأ التالي الذي نريد معالجته:
$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no function or associated item named `new` found for struct `ThreadPool` in the current scope --> src/main.rs:12:28 | 12 | let pool = ThreadPool::new(4); | ^^^ function or associated item not found in `ThreadPool` For more information about this error, try `rustc --explain E0599`. error: could not compile `hello` due to previous error
يشير هذا الخطأ أننا نحتاج إلى انتاج دالة مرتبطة اسمها new
من أجل ThreadPool
. يمكننا معرفة أن new
تحتاج معامل يقبل 4 مثل وسيط ويجب أن يعيد نسخة ThreadPool
. لننفذ أبسط دالة new
التي تحتوي هذه الصفات characteristics.
- اسم الملف: src/lib.rs
pub struct ThreadPool; impl ThreadPool { pub fn new(size: usize) -> ThreadPool { ThreadPool } }
اخترنا نوع usize
للمعامل size
لأننا نعرف أن العدد السالب للطلبات غير منطقي ونعرف أيضًا أننا سنستخدم 4 ليمثّل عدد العناصر في مجموعة الخيوط وهذا هو عمل نوع usize
كما تحدثنا سابقًا في قسم "أنواع الأعداد الصحيحة" في المقال أنواع البيانات Data Types في لغة رست.
لنتحقق من الشيفرة مجددًا:
$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0599]: no method named `execute` found for struct `ThreadPool` in the current scope --> src/main.rs:17:14 | 17 | pool.execute(|| { | ^^^^^^^ method not found in `ThreadPool` For more information about this error, try `rustc --explain E0599`. error: could not compile `hello` due to previous error
يحدث الخطأ الآن لأنه ليس لدينا تابع execute
على ThreadPool
. تذكر من قسم "إنشاء عدد محدد من الخيوط" أننا قررنا أن مجمع الخيط يجب أن يكون له واجهة تشبه thread::spawn
، وقررنا أيضّأ أننا سننفذ التابع execute
ليأخذ المغلف المعُطى له ويعطيه لخيط خامل في المجمع لينفذه.
سنعرّف تابع execute
على ThreadPool
ليأخذ المغلف مثل معامل. تذكر من القسم "نقل القيم خارج المغلف وسمات Fn
" في المقال المغلفات closures في لغة رست أننا بإمكاننا أخذ المغلفات مثل معاملات باستخدام ثلاث سمات هي Fn
أو FnMut
أو FnOnce
. يجب أن نحدد أي نوع مغلف نريد استخدامه هنا، نحن نعرف أننا سنفعل شيئًا يشابه تنفيذ المكتبة القياسية لدالةthread::spawn
، لذلك دعنا نرى ما هي القيود الموجودة لبصمة thread::spawn
على معاملاتها. تظهر التوثيقات التالي:
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,
المعامل F
هو الذي يهمنا، والمعامل T
متعلق بالقيمة المُعادة ولسنا مهتمين بها. يمكننا أن نرى أن spawn
تستخدم FnOnce
مثل قيد سمة على F
، وهذا ما نريده أيضًأ لأننا نريد تمرير الوسيط الذي نأخذه في execute
إلى spawn
. يمكننا التأكد أيضًا أن FnOnce
هي السمة المُراد استخدامها لأن خيط تنفيذ الطلب سينفِّذ فقط طلب المغلف مرةً واحدة، والذي يطابق Once
في FnOnce
.
لدى معامل نوع F
أيضًا قيد سمة Send
وقيد دورة حياة static'
المفيدان في حالتنا؛ فنحن بحاجة Send
لنقل المغلف من خيط لآخر، و static'
لأننا لا نعرف الوقت اللازم ليُنفذ الخيط. لننشئ تابع execute
على ThreadPool
التي تأخذ معامل معمم للنوع F
مع هذه القيود.
- اسم الملف: src/lib.rs
impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { } }
استخدمنا ()
بعد FnOnce
لأن FnOnce
تمثل مغلفًا لا يأخذ معاملات ويعيد نوع الوحدة ()
. يمكن إهمال النوع المُعاد من البصمة كما في تعريفات الدالة، ولكن حتى لو لم يوجد أي معاملات نحن بحاجة الأقواس.
هذا هو أبسط تنفيذ لدالة execute
، فهي لا تعمل شيئًا، لكننا فقط بحاجة أن تُصرّف شيفرتنا، لنتحقق منها مجددًا.
$ cargo check Checking hello v0.1.0 (file:///projects/hello) Finished dev [unoptimized + debuginfo] target(s) in 0.24s
إنها تُصرّف، لكن لاحظ إذا جربت cargo run
وأجريت طلبًا في المتصفح، سترى الأخطاء في المتصفح نفسها التي رأيناها في بداية الفصل. لم تستدع المكتبة المغلف المُمرر إلى execute
حتى الآن.
ملاحظة: هناك مقولة عن لغات البرمجة ذات المُصرّفات الحازمة مثل هاسكل Haskell ورست وهي "إذا صُرفت الشيفرة فإنها تعمل" ولكن هذه المقولة ليست صحيحة إجمالًا، إذ يُصرّف مشروعنا، لكنه لا يعمل شيئًا إطلاقًا. إذا كنا نريد إنشاء مشروع حقيقي ومكتمل، الآن هو الوقت المثالي لكتابة وحدات اختبار للتحقق من أن الشيفرة تُصرّف ولها السلوك المرغوب.
التحقق من صحة عدد الخيوط في new
لن نغيّر شيئًا للمعاملين new
و parameter
. لننفذ متن الدوال بالسلوك الذي نريده، ولنبدأ بالدالة new
. اخترنا سابقًا نوع غير مؤشر للمعامل size
لأن مجمع بعدد خيوط سلبي هو غير منطقي، ولكن مجمع بعدد خيوط صفر ليس منطقيًا أيضًا ولكن unsize
صالح. سنضيف الشيفرة التي تتحقق من أن size
أكبر من الصفر قبل إعادة نسخة من ThreadPool
وجعل البرنامج يهلع إذا حصل على قيمة صفر باستخدام ماكرو assert!
كما في الشيفرة 13.
- اسم الملف: src/lib.rs
impl ThreadPool { /// Create a new ThreadPool. /// /// The size is the number of threads in the pool. /// /// # Panics /// /// The `new` function will panic if the size is zero. pub fn new(size: usize) -> ThreadPool { assert!(size > 0); ThreadPool } // --snip-- }
[الشيفرة 13: تنفيذ Threadpool:new
ليهلع إذا كان size
صفر]
أضفنا بعض التوثيق إلى ThreadPool
باستخدام تعليقات doc. لاحظ أننا اتبعنا خطوات التوثيق الجيدة بإضافة قسم يستدعي الحالات التي يمكن للدالة أن تهلع فيها كما تحدثنا في الفصل 14. جرب تنفيذ cargo run --open
واضغط على هيكل ThreadPool
لرؤية كيف تبدو المستندات المُنشأة للدالة new
.
يمكننا تغيير new
إلى build
بدلًا من إضافة ماكرو assert!
، ونعيد Result
كما فعلنا في Config::build
في مشروع الدخل والخرج في الشيفرة 9 في المقال كتابة برنامج سطر أوامر بلغة رست: إعادة بناء التعليمات البرمجية لتحسين النمطية Modularity والتعامل مع الأخطاء، لكننا قررنا في حالتنا أن إنشاء مجمع خيط بدون أي خيوط هو خطأ لا يمكن استرداده. إذا كنت طموحًا جرب كتابة دالة اسمها build
مع البصمة التالية لمقارنته مع الدالة new
.
pub fn build(size: usize) -> Result<ThreadPool, PoolCreationError> {
إنشاء مساحة لتخزين الخيوط
لدينا الآن طريقة لمعرفة أنه لدينا عدد صالح من الخيوط لتخزينها في المجمع، إذ يمكننا إنشاء هذه الخيوط وتخزينها في هيكل ThreadPool
قبل إرجاعها إلى الهيكل، ولكن كيف نخزن الخيوط؟ لنلاحظ بصمة thread::spawn
.
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static,
يُعاد JoinHandle<T>
من الدالة spawn
، إذ تمثّل T
النوع الذي يعيده المغلف. لنستعمل JoinHandle
أيضًا لنرى ما سيحدث، إذ سيعالج المغلف الذي نمرره إلى مجمع الخيط الاتصال ولا يعيد أي شيء لذا T
ستكون نوع وحدة ()
.
ستُصرّف الشيفرة في الشيفرة 14 ولكن لا تُنشئ أي خيوط. غيّرنا تعريف ThreadPool
لتحتوي شعاعًا من نسخة thread::JoinHandle<()>
وهيأنا الشعاع بسعة size
وضبطنا حلقة for
التي تعيد بعض الشيفرة لإنشاء الخيوط وتعيد نسخة ThreadPool
تحتويهم.
- اسم الملف: src/main.rs
use std::thread; pub struct ThreadPool { threads: Vec<thread::JoinHandle<()>>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut threads = Vec::with_capacity(size); for _ in 0..size { // create some threads and store them in the vector } ThreadPool { threads } } // --snip-- }
[الشيفرة 14: إنشاء شعاع للهيكل ThreadPool
الذي يحتوي الخيوط]
جلبنا std::thread
إلى النطاق في وحدة المكتبة المصرفة لأننا نستخدم Thread::JoinHandle
بمثابة نوع العنصر في الشعاع في ThreadPool
. تُنشئ ThreadPool
شعاعًا جديدًا يحتوي عناصر size
عندما يُستقبل حجم صالح.
تعمل الدالة with_capacity
نفس مهام Vec::new
ولكن بفرق مهم هو أنها تحجز مسبقًا المساحة في الشعاع لأننا نريد تخزين عناصر size
في الشعاع. إجراء هذا الحجز مسبقًا هو أكثر كفاءة من استخدام Vec::new
الذي يغير حجمه كلما اُضيفت عناصر.
عندما تنفذ cargo check
مجدّدًا ينبغي أن تنجح.
هيكل عامل Worker Struct مسؤول عن ارسال شيفرة من مجمع الخيط إلى خيط
تركنا تعليقًا في حلقة for
متعلق بإنشاء الخيوط في الشيفرة 14. سننظر هنا إلى كيفية إنشاء الخيوط حقيقةً، إذ تؤمن المكتبة القياسية thread::spawn
بمثابة طريقة لإنشاء الخيوط ويتوقع thread::spawn
الحصول على بعض الشيفرة لكي ينفذها الخيط بعد إنشائه فورًا، ولكن نريد في حالتنا إنشاء خيوط وجعلهم ينتظرون شيفرةً سنرسلها لاحقًا. لا يقدم تنفيذ المكتبة القياسية للخيوط أي طريقة لعمل ذلك، إذ يجب تنفيذها يدويًا.
سننفذ هذا السلوك عن طريق إضافة هيكلية بيانات جديدة بين ThreadPool
والخيوط التي ستدير هذه السلوك الجديد، وسندعو هيكل البيانات هذا "العامل Worker" وهو مصطلح عام في تنفيذات مجمّع الخيوط. يأخذ العامل الشيفرة التي بحاجة لتنفيذ وينفّذها في خيط العامل. فكر كيف يعمل الناس في مطبخ المطعم، إذ ينتظر العاملون طلبات الزبائن ويكونوا مسؤولين عن أخذ هذه الطلبات وتنفيذها.
بدلًا من تخزين شعاع نسخة JoinHandle<()>
في مجمع الخيط، نخزن نسخًا من هيكل Worker
. يخزن كل Worker
نسخة JoinHandle<()>
واحدة، ثم ننفذ تابع على Worker
الذي يأخذ مغلف شيفرة لينفذه ويرسله إلى خيط يعمل حاليًا لينفذه. سنعطي كل عامل رقمًا معرّفًا id
للتمييز بين العمال المختلفين في المجمع عند التسجيل أو تنقيح الأخطاء.
هكذا ستكون العملية الجديدة عند إنشاء ThreadPool
. سننفذ الشيفرة التي ترسل المغلف إلى الخيط بعد إعداد Worker
بهذه الطريقة:
-
عرّف هيكل
Worker
الذي يحتويid
وJoinHandle<()>
. -
عدّل
ThreadPool
لتحتوي شعاع من نسخWorker
. -
عرّف دالة
Worker::new
التي تأخذ رقمid
وتعيد نسخةWorker
التي تحتويid
وخيط مُنشأ بمغلف فارغ. -
استخدم عداد حلقة
for
لإنشاءid
وإنشاءWorker
جديد مع ذلك الرقمid
وخرن العامل في الشعاع.
إذا كنت جاهزًا للتحدي، جرّب تنفيذ هذه التغييرات بنفسك قبل النظر إلى الشيفرة في الشيفرة 15.
جاهز؟ يوجد في الشيفرة 15 إحدى طرق عمل التعديلات السابقة.
- اسم الملف:src/lib.rs
use std::thread; pub struct ThreadPool { workers: Vec<Worker>, } impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers } } // --snip-- } struct Worker { id: usize, thread: thread::JoinHandle<()>, } impl Worker { fn new(id: usize) -> Worker { let thread = thread::spawn(|| {}); Worker { id, thread } } }
[الشيفرة 15: تعديل ThreadPool
بحيث تحتوي نسخة Worker
بدلًا من احتواء الخيط مباشرةً]
عدّلنا اسم حقل ThreadPool
من threads
إلى workers
لأنه يحتوي نسخ Worker
بدلًا من نسخ JoinHandle<()>
. استخدمنا العداد في حلقة for
مثل وسيط لدالة Worker::new
وخزّنا كل Worker
جديد في شعاع اسمه workers
.
لا تحتاج الشيفرة الخارجية (كما في الخادم في src/main.rs) أن تعرف تفاصيل التنفيذ بما يتعلق باستخدام هيكل Worker
داخل ThreadPool
، لذا نجعل كل من هيكل Worker
ودالة new
خاصين private. تستخدم الدالة Worker::new
المعرّف id
المُعطى وتخزن نسخة JoinHandle<()>
المُنشأة عن طريق إنشاء خيط جديد باستخدام مغلف فارغ.
ملاحظة: سيهلع thread::spawn
إذا كان نظام التشغيل لا يستطيع إنشاء خيط بسبب عدم توفر موارد كافية، وسيؤدي هذا إلى هلع كامل الخادم حتى لو كان إنشاء بعض الخيوط ممكنًا. للتبسيط يمكن قبول هذا السلوك ولكن في تنفيذ مجمع خيط مُنتج ينبغي استخدام std::thread::Builder
ودالة spawn
الخاصة به التي تعيد Result
.
تُصرّف هذه الشيفرة وتخزن عددًا من نسخ Worker
الذي حددناه مثل وسيط إلى ThreadPool::new
. لكننا لم نعالج المغلف الذي نحصل عليه في execute
. لنتعرف على كيفية عمل ذلك تاليًا.
إرسال طلبات إلى الخيوط عن طريق القنوات
المشكلة التالية التي سنتعامل معها هي أن المغلفات المُعطاة إلى thread::spawn
لا تفعل شيئًا إطلاقًا، وسنحصل حاليًا على المغلف الذي نريد تنفيذه في تابع execute
، لكن نحن بحاجة لإعطاء thread::spawn
مغلفًا لينفذه عندما ننشئ كل Worker
خلال إنشاء ThreadPool
.
نريد تشغيل هياكل Worker
التي أنشأناها للبحث عن شيفرة من الرتل في ThreadPool
وأن ترسل تلك الشيفرة إلى خيطها لينفّذها.
ستكون القنوات التي تعلمناها سابقًا في المقال استخدام ميزة تمرير الرسائل Message Passing لنقل البيانات بين الخيوط Threads في لغة رست -والتي تُعد طريقة بسيطة للتواصل بين خيطين- طريقةً ممتازةً لحالتنا، إذ سنستعمل قناةً لتعمل مثل رتل للوظائف، وترسل execute
وظيفة من ThreadPool
إلى نسخة Worker
التي ترسل بدورها الوظيفة إلى خيطها. ستكون الخطة على النحو التالي:
-
يُنشئ
ThreadPool
قناة ويحتفظ بالمرسل. -
يحتفظ كل
Worker
بالمستقبل. -
ننشئ هيكل
Job
جديد يحتفظ بالمغلف الذي نريد إرساله عبر القناة. -
يرسل تابع
execute
الوظيفة المراد تنفيذها عبر المرسل. -
سيتكرر مرور
Worker
على المستقبل وينفذ المغلف لأي وظيفة يستقبلها في الخيط.
لنحاول إنشاء قناة في ThreadPool::new
والاحتفاظ بالمرسل في نسخة ThreadPool
كما في الشيفرة 16. لا يحتوي هيكل Job
أي شيء الآن، لكنه سيكون نوع العنصر المُرسل عبر القناة.
- اسم الملف: src/lib.rs
use std::{sync::mpsc, thread}; pub struct ThreadPool { workers: Vec<Worker>, sender: mpsc::Sender<Job>, } struct Job; impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id)); } ThreadPool { workers, sender } } // --snip-- }
[الشيفرة 16: تعديل ThreadPool
لتخزين مرسل القناة التي ترسل نسخ Job
]
أنشأنا القناة الجديدة في ThreadPool:new
وجعلنا المجمع يحتفظ بالمرسل. ستصرّف هذه الشيفرة بنجاح.
لنجرب تمرير مستقبل القناة إلى كل عامل عندما ينشئ مجمع الخيط القناة. نعرف أننا نريد استخدام المستقبل في الخيط الذي أنشأه العامل، لذا سنشير إلى معامل receiver
في المغلف بمرجع reference. لن تُصرّف الشيفرة 17.
- اسم الملف: src/lib.rs
impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, receiver)); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: mpsc::Receiver<Job>) -> Worker { let thread = thread::spawn(|| { receiver; }); Worker { id, thread } } }
[الشيفرة 17: تمرير المستقبل إلى العمال workers]
أجرينا بعض التغييرات الصغيرة والمباشرة، إذ مررنا المستقبل إلى Worker::new
واستخدمناه داخل المغلف.
عندما نتحقق من الشيفرة سنحصل على هذا الخطأ:
$ cargo check Checking hello v0.1.0 (file:///projects/hello) error[E0382]: use of moved value: `receiver` --> src/lib.rs:26:42 | 21 | let (sender, receiver) = mpsc::channel(); | -------- move occurs because `receiver` has type `std::sync::mpsc::Receiver<Job>`, which does not implement the `Copy` trait ... 26 | workers.push(Worker::new(id, receiver)); | ^^^^^^^^ value moved here, in previous iteration of loop For more information about this error, try `rustc --explain E0382`. error: could not compile `hello` due to previous error
تحاول الشيفرة تمرير receiver
لنسخ متعددة من Worker
ولكن هذا لن يعمل كما تتذكر سابقًا من المقال استخدام ميزة تمرير الرسائل Message Passing لنقل البيانات بين الخيوط Threads في لغة رست، إذ أن تنفيذ القناة المقدم من رست هو مُنتجين producer متعددين ومستهلك consumer واحد، وهذا يعني أنه لا يمكن نسخ الطرف المستهلك من القناة لإصلاح هذا الخطأ ولا نريد أيضًا إرسال رسائل متعددة لمستهلكين متعددين، بل نحتاج قائمة رسائل واحدة مع عمال متعددين لكي تعالج كل رسالة مرةً واحدةً فقط. إضافةً إلى ذلك، يتطلب أخذ وظيفة من رتل القناة تغيير receiver
، لذا تحتاج الخيوط طريقةً آمنةً لتشارك وتعدل receiver
، وإلا نحصل على حالات سباق (كما تحدثنا في الفصل السابق المشار إليه).
تذكر المؤشرات الذكية الآمنة للخيوط التي تحدثنا عنها سابقًا في المقال تزامن الحالة المشتركة Shared-State Concurrency في لغة رست وتوسيع التزامن مع Send و Sync؛ فنحن بحاجة لاستخدام Arc<Mutex<T>>
لمشاركة الملكية لعدد من الخيوط والسماح للخيوط بتغيير القيمة. يسمح نوع Arc
لعدد من العمال من مُلك المستقبل وتضمن Mutex
حصول عامل واحد على الوظيفة من المستقبل. تظهر الشيفرة 18 التغييرات التي يجب عملها.
- اسم الملف: src/lib.rs
use std::{ sync::{mpsc, Arc, Mutex}, thread, }; // --snip-- impl ThreadPool { // --snip-- pub fn new(size: usize) -> ThreadPool { assert!(size > 0); let (sender, receiver) = mpsc::channel(); let receiver = Arc::new(Mutex::new(receiver)); let mut workers = Vec::with_capacity(size); for id in 0..size { workers.push(Worker::new(id, Arc::clone(&receiver))); } ThreadPool { workers, sender } } // --snip-- } // --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { // --snip-- } }
[الشيفرة 18: مشاركة المستقبل بين العمال باستخدام Arc
و Mutex
]
نضع المستقبل فيThreadPool::new
في Arc
و Mutex
، وننسخ Arc
لكل عامل لتزيد عدّ المرجع ليستطيع العمال مشاركة ملكية المستقبل.
تُصرّف الشيفرة بنجاح مع هذه التغييرات، لقد اقتربنا من تحقيق هدفنا.
تنفيذ تابع التنفيذ execute
لننفذ أخيرًا تابع execute
على ThreadPool
، إذ سغيّر أيضًا Job
من هيكل إلى نوع اسم بديل لكائن السمة الذي يحتوي نوع المغلف الذي يستقبله execute
. كما تحدثنا في قسم "إنشاء مرادفات للنوع بواسطة أسماء النوع البديلة" في المقال الأنواع والدوال المتقدمة في لغة رست، يسمح لنا نوع الاسم البديل بتقصير الأنواع الطويلة لسهولة الاستخدام كما في الشفرة 19.
- اسم الملف: src/lib.rs
// --snip-- type Job = Box<dyn FnOnce() + Send + 'static>; impl ThreadPool { // --snip-- pub fn execute<F>(&self, f: F) where F: FnOnce() + Send + 'static, { let job = Box::new(f); self.sender.send(job).unwrap(); } } // --snip--
[الشيفرة 19: إنشاء نوع اسم بديل Job
لـ Box
يحتوي كل مغلف وارسال العمل عبر القناة]
بعد إنشاء نسخة Job
جديدة باستخدام المغلف نحصل على execute
ونرسل الوظيفة عبر الطرف المرسل للقناة. نستدعي unwrap
على send
في حال فشل الإرسال؛ إذ يمكن حصول ذلك إذا أوقفنا كل الخيوط من التنفيذ، وهذا يعني توقُف الطرف المستقبل عن استقبال أي رسائل جديدة. لا يمكننا الآن إيقاف الخيوط من التنفيذ، إذ تستمر خيوطنا بالتنفيذ طالما المجمع موجود. سبب استخدام unwrap
هو أننا نعرف أن حالة الفشل هذه لن تحصل ولكن المصرّف لا يعرف ذلك.
لم ننتهي كليًا بعد، فالمغلف المُمرر إلى thread::spawn
يسند الطرف المستقبل من القناة فقط، لكن نريد بدلًا من ذلك أن يتكرر المغلف للأبد ويسأل الطرف المستقبل من القناة عن وظيفة وينفذ الوظيفة عندما يحصل عليها. دعنا نجري التغييرات الموضحة في الشيفرة 20 للدالة Worker::new
.
- اسم الملف: src/lib.rs
// --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || loop { let job = receiver.lock().unwrap().recv().unwrap(); println!("Worker {id} got a job; executing."); job(); }); Worker { id, thread } } }
[الشيفرة 20: استقبال وتنفيذ الوظائف في خيط العامل]
نستدعي أولًا lock
على receiver
للحصول على mutex، ونستدعي unwrap
ليهلع على أي خطأ. قد يفشل الحصول على قفل إذا كان mutex في حالة مسمومة poisoned، والتي تحصل إذا هلع أحد الخيوط عند احتفاظه بالقفل بدلًا من ترك القفل، وسيكون استدعاء unwrap
في هذه الحالة هو العمل الأفضل. غيّر unwrap
إلى expect
على راحتك لتظهر رسالة خطأ ذات معنى.
إذا حصلنا على القفل على mutex، نستدعي recv
لاستقبال Job
من القناة. يتخطى استدعاء unwrap
الأخير أي أخطاء أيضًا والتي ربما قد تحصل إذا اُغلق، على نحوٍ مشابه لكيفية إعادة Err
من قِبل تابع send
إذا أُغلق المستقبل.
إذا لم توجد أي وظيفة في استدعاء كتل recv
، سينتظر الخيط حتى تتوفر وظيفة. يضمن Mutex<T>
أن يكون هناك خيط Worker
واحد يطلب وظيفة.
يعمل مجمع الخيط الآن، جرب cargo run
وأرسل بعض الطلبات.
$ cargo run Compiling hello v0.1.0 (file:///projects/hello) warning: field is never read: `workers` --> src/lib.rs:7:5 | 7 | workers: Vec<Worker>, | ^^^^^^^^^^^^^^^^^^^^ | = note: `#[warn(dead_code)]` on by default warning: field is never read: `id` --> src/lib.rs:48:5 | 48 | id: usize, | ^^^^^^^^^ warning: field is never read: `thread` --> src/lib.rs:49:5 | 49 | thread: thread::JoinHandle<()>, | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ warning: `hello` (lib) generated 3 warnings Finished dev [unoptimized + debuginfo] target(s) in 1.40s Running `target/debug/hello` Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing. Worker 1 got a job; executing. Worker 3 got a job; executing. Worker 0 got a job; executing. Worker 2 got a job; executing.
لقد نجحنا، ولدينا الآن مجمع خيط ينفذ الاتصالات على نحوٍ غير متزامن. لا يُنشئ أكثر من أربعة خيوط حتى لا يُحمَل النظام بصورةٍ زائدة إذا استقبل الخادم طلبات كثيرة. إذا أرسلنا طلبًا إلى /sleep سيكون الخادم قادرًا على خدمة طلبات أُخرى بجعل خيط آخر ينفذهم.
ملاحظة: إذا فتحنا /sleep في نوافذ متعددة في المتصفح بنفس الوقت، ستُحمل واحدةٌ تلو الأُخرى بفواصل زمنية مدتها 5 ثواني لأن بعض المتصفحات تنفذ النسخ المتعددة لنفس الطلب بالترتيب لأسباب التخزين المؤقت. ليس الخادم هو سبب هذا التقصير.
بعد أن تعلمنا عن حلقة while let
في المقال الأنماط Patterns واستخداماتها وقابليتها للدحض Refutability في لغة رست، ربما تتساءل لماذا لم نكتب شيفرة الخيط العامل كما في الشيفرة 21.
- اسم الملف: src/lib.rs
// --snip-- impl Worker { fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker { let thread = thread::spawn(move || { while let Ok(job) = receiver.lock().unwrap().recv() { println!("Worker {id} got a job; executing."); job(); } }); Worker { id, thread } } }
[الشيفرة 21: طريقة تنفيذ مختلفة لدالة Worker::new
باستخدام while let
]
تُصرّف الشيفرة وتُنفذ ولكن لا تعطي نتيجة عمل الخيوط المرغوبة، إذ يسبب الطلب البطيء انتظار باقي الطلبات لتُعالج، والسبب بسيطٌ إلى حد ما؛ فليس لدى هيكل Mutex
دالة unlock
عامة لأن ملكية القفل مبينةٌ على دورة حياة MutexGuard<T>
داخل LockResult<MutexGuard<T>>
التي يعيدها التابع lock
. يطبق متحقق الاستعارة قاعدة أن المورد المحمي بهيكل Mutex
لا يمكن الوصول له إلا إذا احتفظنا بالقفل وقت التصريف، ولكن بهذا التنفيذ يمكن أن يبقى القفل مُحتفظًا به أكثر من اللازم إذا لم نكن منتبهين إلى دورة حياة MutexGuard<T>
.
تعمل الشيفرة في الشيفرة 20 التي تستخدم let job = receiver.lock().unwrap().recv().unwrap();
إذ تُسقط أي قيمة مؤقتة مُستخدمة في التعبير على الطرف اليمين من إشارة المساواة "=" مع let
عندما تنتهي تعليمة let
، ولكن لا تُسقط while let
(وأيضًا if let
و match
) القيم المؤقتة حتى نهاية الكتلة المرتبطة بها. يبقى القفل مُحتفظًا به حتى نهاية فترة استدعاء job()
يعني أن العمال الباقين لا يمكن أن يستقبلوا وظائف.
ترجمة -وبتصرف- لقسم من الفصل Final Project: Building a Multithreaded Web Server من كتاب The Rust Programming Language.
أفضل التعليقات
لا توجد أية تعليقات بعد
انضم إلى النقاش
يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.