اذهب إلى المحتوى

استخدام ميزة تمرير الرسائل لنقل البيانات بين الخيوط Threads في لغة رست Rust


Naser Dakhel

يُعد تمرير الرسائل message passing أحد الطرق الشائعة لضمان أمن التزامن، إذ تتواصل الخيوط threads أو المنفذون actors فيما بينهم بإرسال رسائل تحتوي على بيانات. يمكن توضيح هذه الفكرة باقتباس من شعار في توثيق لغة البرمجة جو Go: "لا تتواصل بمشاركة الذاكرة، شارك الذاكرة بالتواصل".

تؤمّن مكتبة رست القياسية تنفيذًا للقنوات channels لتحقيق عملية تزامن إرسال الرسائل message-sending concurrency، والقناة هي مفهوم برمجي عام تُرسل به البيانات من خيط إلى آخر. يمكنك تخيل القناة في البرمجة مثل قناة مياه باتجاه واحد مثل جدول أو نهر، فإذا وضعت بطة مطاطية في النهر ستنتقل في المجرى إلى نهاية القناة المائية.

تنقسم القناة إلى نصفين أحدها مُرسل transmitter والآخر مُستقبل receiver؛ والمُرسل هو القسم الموجود أعلى النهر الذي تضع فيه البطة المطاطية؛ والمُستقبل هو ما تصل إليه البطة المطاطية في نهاية النهر. يستدعي قسمٌ من الشيفرة البرمجية التوابع على المُرسل مع البيانات المراد إرسالها، والقسم الآخر يتحقق من وصول الرسالة في القسم المُستقبل. يمكن القول إن القناة قد أُغلقت إذا سقط أي من القسمين المُرسل أو المستقبل.

سنعمل هنا على برنامج يحتوي على خيط واحد لتوليد القيم وإرسالها عبر القناة وخيط آخر سيستقبل القيم ويطبعها، وسنرسل قيمًا بسيطةً بين الخيوط باستخدام القناة وذلك بهدف توضيح هذه الميزة فقط، ويمكنك استخدام القناة -بعد أن نتعرف عليها جيدًا- على أي خيوط تحتاج لأن تتواصل مع بعضها بعضًا، مثل نظام محادثة أو نظام يكون فيه عدة خيوط تجري حسابات وإرسال هذه الأجزاء إلى خيط واحد يجّمع القيم.

نُنشئ في الشيفرة 6 قناةً دون استخدامها. لاحظ أن الشيفرة البرمجية لن تُصرف بعد، لأن رست لا تستطيع معرفة ما هو نوع القيم التي نريد إرسالها عبر هذه القناة.

اسم الملف: src/main.rs

use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel();
}

الشيفرة 6: انشاء قناة وإسناد القسمين tx و rx إليها

أنشأنا قناةً جديدةً باستخدام دالة mpsc::channel، ويعني الاختصار "mpsc" عدة منتجين multiple producer ومستهلك واحد single consumer. باختصار، طريقة تطبيق القنوات في مكتبة رست القياسية هي أن كل قناة تحتوي على عدة مُرسلين ينتجون القيم ولكن هناك مُستقبل واحد لاستهلاك تلك القيم. تخيّل عدة مجاري streams تلتقي في نهرٍ واحد كبير: أي سينتهي كل شيء يُرسل من المجاري في نهر واحد في النهاية. نبدأ بمنتج واحد وبعدها نضيف عدة منتجين بعد عمل هذا المثال.

تُعيد الدالة mpsc::channel صفًا tuple، العنصر الأول هو طرف الإرسال (المُرسل) والعنصر الثاني هو طرف الاستقبال (المُستقبل)، وتُستخدم عادةً الاختصارات tx و rx في العديد من الحقول fields للمُرسل والمُستقبل على التوالي، لذا نُسمي متغيراتنا وفقًا لهذا الاصطلاح للدلالة على كل طرف.

نستخدم التعليمة let بنمط pattern يدمّر هيكلية الصفوف، وسنتحدث عن استخدام الأنماط في تعليمة let وتدمير الهيكلية لاحقًا، ويكفي الآن معرفة أن استخدام تعليمة let هي الطريقة الأفضل لاستخراج أقسام من الصف المُعاد باستخدام mpsc::channel.

لننقل الطرف المُرسل إلى خيط مُنشأ ولنجعله يرسل سلسلةً نصيةً لكي يتواصل الخيط المُنشأ مع الخيط الرئيسي كما هو موضح في الشيفرة 7. يُماثل هذا الأمر وضع بطة مطاطية أعلى النهر أو إرسال رسالة من خيط إلى آخر.

اسم الملف: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });
}

الشيفرة 7: نقل tx إلى خيط مُنشأ وإرسال "hi"

استخدمنا thread::spawn مجددًا لإنشاء خيط جديد، ثم استخدمنا move لنقل tx إلى مُغلّف closure بحيث يمتلك الخيط المُنشأ القيمة tx. يجب على الخيط المُنشأ أن يمتلك الطرف المُرسل لكي يستطيع إرسال رسائل عبر القناة، ولدى المرسل تابع send الذي يأخذ القيمة المُراد إرسالها، إذ يعيد التابع send قيمةً من النوع Result<T, E>‎، لذا إذا كان المُستقبل قد اُسقط وليس هناك مكان لإرسال القيمة، تعيد عملية الإرسال خطأً. استدعينا في هذا المثال unwrap ليهلع في حال الخطأ، ولكن يجب التعامل مع حالة الهلع في التطبيقات الحقيقية بطريقة مناسبة، راجع مقال الأخطاء والتعامل معها في لغة رست لمراجعة استراتيجيات التعامل المناسب مع الأخطاء.

سنحصل في الشيفرة 8 على القيمة من المُستقبل في الخيط الأساسي، وتشابه هذه العملية استرجاع البطة المطاطية من نهاية النهر أو استقبال الرسالة.

اسم الملف: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

الشيفرة 8: استقبال القيمة "hi" في الخيط الأساسي وطباعتها

للمستقبل تابعان مفيدان، هما recv و try_recv، استخدمنا recv -وهو اختصارٌ لكلمة استقبال receive- الذي يمنع تنفيذ الخيط الرئيسي وينتظر حتى تصل القيمة إلى نهاية القناة، ويعيد التابع recv بعد وصول القيمة إلى نهاية القناة القيمة ذاتها داخل النوع Result<T,E>‎، وعندما يُغلق المُرسل يعيد التابع recv خطأ للإشارة إلى أنه لا يوجد المزيد من القيم قادمة.

لا يحجب التابع try_recv الخيط الرئيسي وإنما يعيد قيمةً من النوع Result<T,E>‎ مباشرةً، وتحتوي قيمة Ok رسالةً إذا كان هناك رسالة متوفرة وإلا فقيمة Err إذا لا يوجد أي رسائل هذه المرة. استخدام try_recv مفيدٌ إذا كان للخيط أعمالٌ أخرى لينفذها بينما ينتظر الرسائل، وبإمكاننا كتابة حلقة تستدعي try_recv بصورةٍ متكررة لتتعامل مع الرسائل في حال قدومها، أو تنفّذ أعمالًا أخرى قبل تحققها مجدداً.

استخدمنا في مثالنا التابع recv لبساطته، وليس لدينا أي عمل آخر للخيط الرئيسي غير انتظار الرسائل، لذا فإن حجب الخيط الرئيس مناسب.

عندما ننفذ الشيفرة البرمجية في الشيفرة 8 نلاحظ القيمة المطبوعة في الخيط الرئيسي:

Got: hi

هذا ممتاز!

القنوات ونقل الملكية

تلعب الملكية دورًا مهمًا في إرسال الرسائل لأنها تساعد في كتابة شيفرة آمنة ومتزامنة، إذ يتطلب منع الأخطاء في البرمجة المتزامنة الانتباه على الملكية ضمن برنامجك باستخدام لغة رست. لنكتب مثالًا يظهر كيف تعمل كل من القنوات والملكية لمنع المشاكل، وسنستخدم قيمة val في الخيط المُنشأ بعد أن أرسلناه عبر القناة. جرّب تصريف الشيفرة البرمجية في الشيفرة 9 لترى سبب كون هذه الشيفرة غير مسموحة.

اسم الملف: src/main.rs

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

الشيفرة 9: محاولة استخدام val بعد إرسالها عبر القناة

حاولنا هنا طباعة val بعد أن أرسلناها عبر القناة باستخدام tx.send، وسيكون السماح بذلك فكرةً سيئة، إذ يستطيع الخيط التعديل على القيمة أو إسقاطها بعد إرسالها عبره، ومن الممكن أن يسبّب تعديل الخيط الآخر أخطاءً أو قيمًا غير متوقعة أو بيانات غير موجودة، إلا أن رست تعطينا رسالة خطأ إذا جربنا تصريف الشيفرة البرمجية في الشيفرة 9.

$ cargo run
   Compiling message-passing v0.1.0 (file:///projects/message-passing)
error[E0382]: borrow of moved value: `val`
  --> src/main.rs:10:31
   |
8  |         let val = String::from("hi");
   |             --- move occurs because `val` has type `String`, which does not implement the `Copy` trait
9  |         tx.send(val).unwrap();
   |                 --- value moved here
10 |         println!("val is {}", val);
   |                               ^^^ value borrowed here after move
   |
   = note: this error originates in the macro `$crate::format_args_nl` which comes from the expansion of the macro `println` (in Nightly builds, run with -Z macro-backtrace for more info)

For more information about this error, try `rustc --explain E0382`.
error: could not compile `message-passing` due to previous error

حدث خطأ وقت التصريف نتيجة حدوث خطأ في التزامن. تأخذ دالة send ملكيتها من معاملاتها، وعندما تُنقل القيمة، يأخذ المستقبل ملكيتها، وهذا يمنعنا من استخدامها مرةً أخرى بعد إرسالها؛ وبالنتيجة يتحقق نظام الملكية من أن كل شيء على ما يرام.

إرسال قيم متعددة مع انتظار المستقبِل

استطعنا تصريف وتنفيذ الشيفرة 8، ولكن لم يظهر لنا أن خيطين مستقلين كانا يتكلمان مع بعضهما عبر القناة. أجرينا بعض التعديلات في الشيفرة 10 لنبين أن الشيفرة البرمجية في الشيفرة 8 تُنفَّذ بصورةٍ متزامنة. سيُرسل الخيط المُنشأ الآن عدة رسائل وسيتوقف لمدة ثانية بين كل رسالة.

اسم الملف: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }
}

الشيفرة 10: إرسال رسائل متعددة مع التوقف بين كل عملية إرسال

للخيط المُنشأ هذه المرة شعاع vector من السلاسل النصية التي نريد إرسالها إلى الخيط الأساسي. نمرّ على السلاسل النصية ونرسلها بصورةٍ إفرادية ونتوقف بين كل واحدة وأخرى باستدعاء دالة thread::sleep مع قيمة Duration مساوية إلى 1 ثانية.

لم نستدعِ في الخيط الأساسي الدالة recv صراحةً بل عاملنا rx مثل مكرّر iterator، إذ نطبع كل قيمة نستقبلها، وتنتهي عملية التكرار عندما تُغلق القناة.

نلاحظ الخرج التالية عند تنفيذ الشيفرة البرمجية في الشيفرة 10 مع توقف لمدة ثانية بين كل سطر وآخر:

Got: hi
Got: from
Got: the
Got: thread

يمكننا معرفة أن الخيط الأساسي ينتظر استلام القيم من الخيط المُنشأ لأنه ليس لدينا شيفرة تتوقف أو تتأخر في الحلقة for ضمن الخيط الأساسي.

إنشاء عدة منتجين بواسطة نسخ المرسل

قلنا سابقًا أن mpsc هو اختصار لعدة منتجين ومستهلك واحد، دعنا نستخدم mpsc للبناء على الشيفرة 10 وذلك لإنشاء خيوط متعددة ترسل كلها قيمًا المُستقبل ذاته، ونستطيع عمل ذلك بنسخ المُرسِل كما هو موضح في الشيفرة 11:

اسم الملف: src/main.rs

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    // --snip--

    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
            String::from("for"),
            String::from("you"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });

    for received in rx {
        println!("Got: {}", received);
    }

    // --snip--
}

الشيفرة 11: إرسال عدّة رسائل من عدّة منتجين

نستدعي هذه المرة clone على المُرسِل قبل إنشاء أول خيط، وسيمنحنا ذلك مُرسلًا جديدًا يمكننا تمريره إلى الخيط المُنشأ الأول، ثم نمرر المُرسل الأصلي إلى الخيط المنشأ الثاني، وهذا يعطينا خيطين يرسل كل منهما رسالة مختلفة إلى مستقبل واحد.

يجب أن يكون الخرج كما يلي عندما ننفذ الشيفرة السابقة:

Got: hi
Got: more
Got: from
Got: messages
Got: for
Got: the
Got: thread
Got: you

قد تجد القيم بترتيب مختلف حسب نظامك، وهذا ما يجعل التزامن مثيرًا للاهتمام وصعبًا في الوقت ذاته، وإذا حاولت تجربة التزامن باستخدام thread::sleep بإعطائه قيمًا مختلفة على خيوط مختلفة، فكل تنفيذ سيكون غير محدد أكثر، مما يعطيك خرج مختلف في كل مرة.

الآن وبعد تعلمنا كيفية عمل القنوات سنتعلم في المقالات التالية نوعًا آخرًا من التزامن.

ترجمة -وبتصرف- لقسم من الفصل Fearless Concurrency من كتاب The Rust Programming Language.

اقرأ أيضًا:


تفاعل الأعضاء

أفضل التعليقات

لا توجد أية تعليقات بعد



انضم إلى النقاش

يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.

زائر
أضف تعليق

×   لقد أضفت محتوى بخط أو تنسيق مختلف.   Restore formatting

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   جرى استعادة المحتوى السابق..   امسح المحرر

×   You cannot paste images directly. Upload or insert images from URL.


×
×
  • أضف...