يُعد تمرير الرسائل message passing أحد الطرق الشائعة لضمان أمن التزامن، إذ تتواصل الخيوط threads أو المنفذون actors فيما بينهم بإرسال رسائل تحتوي على بيانات. يمكن توضيح هذه الفكرة باقتباس من شعار في توثيق لغة البرمجة جو Go: "لا تتواصل بمشاركة الذاكرة، شارك الذاكرة بالتواصل".
تؤمّن مكتبة رست القياسية تنفيذًا للقنوات channels لتحقيق عملية تزامن إرسال الرسائل message-sending concurrency، والقناة هي مفهوم برمجي عام تُرسل به البيانات من خيط إلى آخر. يمكنك تخيل القناة في البرمجة مثل قناة مياه باتجاه واحد مثل جدول أو نهر، فإذا وضعت بطة مطاطية في النهر ستنتقل في المجرى إلى نهاية القناة المائية.
تنقسم القناة إلى نصفين أحدها مُرسل transmitter والآخر مُستقبل receiver؛ والمُرسل هو القسم الموجود أعلى النهر الذي تضع فيه البطة المطاطية؛ والمُستقبل هو ما تصل إليه البطة المطاطية في نهاية النهر. يستدعي قسمٌ من الشيفرة البرمجية التوابع على المُرسل مع البيانات المراد إرسالها، والقسم الآخر يتحقق من وصول الرسالة في القسم المُستقبل. يمكن القول إن القناة قد أُغلقت إذا سقط أي من القسمين المُرسل أو المستقبل.
سنعمل هنا على برنامج يحتوي على خيط واحد لتوليد القيم وإرسالها عبر القناة وخيط آخر سيستقبل القيم ويطبعها، وسنرسل قيمًا بسيطةً بين الخيوط باستخدام القناة وذلك بهدف توضيح هذه الميزة فقط، ويمكنك استخدام القناة -بعد أن نتعرف عليها جيدًا- على أي خيوط تحتاج لأن تتواصل مع بعضها بعضًا، مثل نظام محادثة أو نظام يكون فيه عدة خيوط تجري حسابات وإرسال هذه الأجزاء إلى خيط واحد يجّمع القيم.
نُنشئ في الشيفرة 6 قناةً دون استخدامها. لاحظ أن الشيفرة البرمجية لن تُصرف بعد، لأن رست لا تستطيع معرفة ما هو نوع القيم التي نريد إرسالها عبر هذه القناة.
اسم الملف: src/main.rs
use std::sync::mpsc; fn main() { let (tx, rx) = mpsc::channel(); }
الشيفرة 6: انشاء قناة وإسناد القسمين tx
و rx
إليها
أنشأنا قناةً جديدةً باستخدام دالة mpsc::channel
، ويعني الاختصار "mpsc" عدة منتجين multiple producer ومستهلك واحد single consumer. باختصار، طريقة تطبيق القنوات في مكتبة رست القياسية هي أن كل قناة تحتوي على عدة مُرسلين ينتجون القيم ولكن هناك مُستقبل واحد لاستهلاك تلك القيم. تخيّل عدة مجاري streams تلتقي في نهرٍ واحد كبير: أي سينتهي كل شيء يُرسل من المجاري في نهر واحد في النهاية. نبدأ بمنتج واحد وبعدها نضيف عدة منتجين بعد عمل هذا المثال.
تُعيد الدالة mpsc::channel
صفًا tuple، العنصر الأول هو طرف الإرسال (المُرسل) والعنصر الثاني هو طرف الاستقبال (المُستقبل)، وتُستخدم عادةً الاختصارات tx
و rx
في العديد من الحقول fields للمُرسل والمُستقبل على التوالي، لذا نُسمي متغيراتنا وفقًا لهذا الاصطلاح للدلالة على كل طرف.
نستخدم التعليمة let
بنمط pattern يدمّر هيكلية الصفوف، وسنتحدث عن استخدام الأنماط في تعليمة let
وتدمير الهيكلية لاحقًا، ويكفي الآن معرفة أن استخدام تعليمة let
هي الطريقة الأفضل لاستخراج أقسام من الصف المُعاد باستخدام mpsc::channel
.
لننقل الطرف المُرسل إلى خيط مُنشأ ولنجعله يرسل سلسلةً نصيةً لكي يتواصل الخيط المُنشأ مع الخيط الرئيسي كما هو موضح في الشيفرة 7. يُماثل هذا الأمر وضع بطة مطاطية أعلى النهر أو إرسال رسالة من خيط إلى آخر.
اسم الملف: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); }
الشيفرة 7: نقل tx
إلى خيط مُنشأ وإرسال "hi"
استخدمنا thread::spawn
مجددًا لإنشاء خيط جديد، ثم استخدمنا move
لنقل tx
إلى مُغلّف closure بحيث يمتلك الخيط المُنشأ القيمة tx
. يجب على الخيط المُنشأ أن يمتلك الطرف المُرسل لكي يستطيع إرسال رسائل عبر القناة، ولدى المرسل تابع send
الذي يأخذ القيمة المُراد إرسالها، إذ يعيد التابع send
قيمةً من النوع Result<T, E>
، لذا إذا كان المُستقبل قد اُسقط وليس هناك مكان لإرسال القيمة، تعيد عملية الإرسال خطأً. استدعينا في هذا المثال unwrap
ليهلع في حال الخطأ، ولكن يجب التعامل مع حالة الهلع في التطبيقات الحقيقية بطريقة مناسبة، راجع مقال الأخطاء والتعامل معها في لغة رست لمراجعة استراتيجيات التعامل المناسب مع الأخطاء.
سنحصل في الشيفرة 8 على القيمة من المُستقبل في الخيط الأساسي، وتشابه هذه العملية استرجاع البطة المطاطية من نهاية النهر أو استقبال الرسالة.
اسم الملف: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
الشيفرة 8: استقبال القيمة "hi" في الخيط الأساسي وطباعتها
للمستقبل تابعان مفيدان، هما recv
و try_recv
، استخدمنا recv
-وهو اختصارٌ لكلمة استقبال receive- الذي يمنع تنفيذ الخيط الرئيسي وينتظر حتى تصل القيمة إلى نهاية القناة، ويعيد التابع recv
بعد وصول القيمة إلى نهاية القناة القيمة ذاتها داخل النوع Result<T,E>
، وعندما يُغلق المُرسل يعيد التابع recv
خطأ للإشارة إلى أنه لا يوجد المزيد من القيم قادمة.
لا يحجب التابع try_recv
الخيط الرئيسي وإنما يعيد قيمةً من النوع Result<T,E>
مباشرةً، وتحتوي قيمة Ok
رسالةً إذا كان هناك رسالة متوفرة وإلا فقيمة Err
إذا لا يوجد أي رسائل هذه المرة. استخدام try_recv
مفيدٌ إذا كان للخيط أعمالٌ أخرى لينفذها بينما ينتظر الرسائل، وبإمكاننا كتابة حلقة تستدعي try_recv
بصورةٍ متكررة لتتعامل مع الرسائل في حال قدومها، أو تنفّذ أعمالًا أخرى قبل تحققها مجدداً.
استخدمنا في مثالنا التابع recv
لبساطته، وليس لدينا أي عمل آخر للخيط الرئيسي غير انتظار الرسائل، لذا فإن حجب الخيط الرئيس مناسب.
عندما ننفذ الشيفرة البرمجية في الشيفرة 8 نلاحظ القيمة المطبوعة في الخيط الرئيسي:
Got: hi
هذا ممتاز!
القنوات ونقل الملكية
تلعب الملكية دورًا مهمًا في إرسال الرسائل لأنها تساعد في كتابة شيفرة آمنة ومتزامنة، إذ يتطلب منع الأخطاء في البرمجة المتزامنة الانتباه على الملكية ضمن برنامجك باستخدام لغة رست. لنكتب مثالًا يظهر كيف تعمل كل من القنوات والملكية لمنع المشاكل، وسنستخدم قيمة val
في الخيط المُنشأ بعد أن أرسلناه عبر القناة. جرّب تصريف الشيفرة البرمجية في الشيفرة 9 لترى سبب كون هذه الشيفرة غير مسموحة.
اسم الملف: src/main.rs
use std::sync::mpsc; use std::thread; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hi"); tx.send(val).unwrap(); println!("val is {}", val); }); let received = rx.recv().unwrap(); println!("Got: {}", received); }
الشيفرة 9: محاولة استخدام val
بعد إرسالها عبر القناة
حاولنا هنا طباعة val
بعد أن أرسلناها عبر القناة باستخدام tx.send
، وسيكون السماح بذلك فكرةً سيئة، إذ يستطيع الخيط التعديل على القيمة أو إسقاطها بعد إرسالها عبره، ومن الممكن أن يسبّب تعديل الخيط الآخر أخطاءً أو قيمًا غير متوقعة أو بيانات غير موجودة، إلا أن رست تعطينا رسالة خطأ إذا جربنا تصريف الشيفرة البرمجية في الشيفرة 9.
$ cargo run Compiling message-passing v0.1.0 (file:///projects/message-passing) error[E0382]: borrow of moved value: `val` --> src/main.rs:10:31 | 8 | let val = String::from("hi"); | --- move occurs because `val` has type `String`, which does not implement the `Copy` trait 9 | tx.send(val).unwrap(); | --- value moved here 10 | println!("val is {}", val); | ^^^ value borrowed here after move | = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info) For more information about this error, try `rustc --explain E0382`. error: could not compile `message-passing` due to previous error
حدث خطأ وقت التصريف نتيجة حدوث خطأ في التزامن. تأخذ دالة send
ملكيتها من معاملاتها، وعندما تُنقل القيمة، يأخذ المستقبل ملكيتها، وهذا يمنعنا من استخدامها مرةً أخرى بعد إرسالها؛ وبالنتيجة يتحقق نظام الملكية من أن كل شيء على ما يرام.
إرسال قيم متعددة مع انتظار المستقبِل
استطعنا تصريف وتنفيذ الشيفرة 8، ولكن لم يظهر لنا أن خيطين مستقلين كانا يتكلمان مع بعضهما عبر القناة. أجرينا بعض التعديلات في الشيفرة 10 لنبين أن الشيفرة البرمجية في الشيفرة 8 تُنفَّذ بصورةٍ متزامنة. سيُرسل الخيط المُنشأ الآن عدة رسائل وسيتوقف لمدة ثانية بين كل رسالة.
اسم الملف: src/main.rs
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } }
الشيفرة 10: إرسال رسائل متعددة مع التوقف بين كل عملية إرسال
للخيط المُنشأ هذه المرة شعاع vector من السلاسل النصية التي نريد إرسالها إلى الخيط الأساسي. نمرّ على السلاسل النصية ونرسلها بصورةٍ إفرادية ونتوقف بين كل واحدة وأخرى باستدعاء دالة thread::sleep
مع قيمة Duration
مساوية إلى 1 ثانية.
لم نستدعِ في الخيط الأساسي الدالة recv
صراحةً بل عاملنا rx
مثل مكرّر iterator، إذ نطبع كل قيمة نستقبلها، وتنتهي عملية التكرار عندما تُغلق القناة.
نلاحظ الخرج التالية عند تنفيذ الشيفرة البرمجية في الشيفرة 10 مع توقف لمدة ثانية بين كل سطر وآخر:
Got: hi Got: from Got: the Got: thread
يمكننا معرفة أن الخيط الأساسي ينتظر استلام القيم من الخيط المُنشأ لأنه ليس لدينا شيفرة تتوقف أو تتأخر في الحلقة for
ضمن الخيط الأساسي.
إنشاء عدة منتجين بواسطة نسخ المرسل
قلنا سابقًا أن mpsc
هو اختصار لعدة منتجين ومستهلك واحد، دعنا نستخدم mpsc
للبناء على الشيفرة 10 وذلك لإنشاء خيوط متعددة ترسل كلها قيمًا المُستقبل ذاته، ونستطيع عمل ذلك بنسخ المُرسِل كما هو موضح في الشيفرة 11:
اسم الملف: src/main.rs
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { // --snip-- let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn(move || { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("thread"), ]; for val in vals { tx1.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); thread::spawn(move || { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); thread::sleep(Duration::from_secs(1)); } }); for received in rx { println!("Got: {}", received); } // --snip-- }
الشيفرة 11: إرسال عدّة رسائل من عدّة منتجين
نستدعي هذه المرة clone
على المُرسِل قبل إنشاء أول خيط، وسيمنحنا ذلك مُرسلًا جديدًا يمكننا تمريره إلى الخيط المُنشأ الأول، ثم نمرر المُرسل الأصلي إلى الخيط المنشأ الثاني، وهذا يعطينا خيطين يرسل كل منهما رسالة مختلفة إلى مستقبل واحد.
يجب أن يكون الخرج كما يلي عندما ننفذ الشيفرة السابقة:
Got: hi Got: more Got: from Got: messages Got: for Got: the Got: thread Got: you
قد تجد القيم بترتيب مختلف حسب نظامك، وهذا ما يجعل التزامن مثيرًا للاهتمام وصعبًا في الوقت ذاته، وإذا حاولت تجربة التزامن باستخدام thread::sleep
بإعطائه قيمًا مختلفة على خيوط مختلفة، فكل تنفيذ سيكون غير محدد أكثر، مما يعطيك خرج مختلف في كل مرة.
الآن وبعد تعلمنا كيفية عمل القنوات سنتعلم في المقالات التالية نوعًا آخرًا من التزامن.
ترجمة -وبتصرف- لقسم من الفصل Fearless Concurrency من كتاب The Rust Programming Language.
اقرأ أيضًا:
- المقال التالي: تزامن الحالة المشتركة Shared-State Concurrency في لغة رست وتوسيع التزامن مع Send و Sync
- المقال السابق: استخدام الخيوط Threads لتنفيذ شيفرات رست بصورة متزامنة آنيًا
- الأخطاء والتعامل معها في لغة رست Rust
- استخدام الهياكل structs لتنظيم البيانات في لغة رست Rust
- الحزم packages والوحدات المصرفة crates في لغة رست Rust
أفضل التعليقات
لا توجد أية تعليقات بعد
انضم إلى النقاش
يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.