تُسهِل بيئة عمل .NET من البرمجة مُتعددة الأنوية (multi-core programming) من خلال توفير مَكتبة تَوازِي المَهامّ Task Parallel Library، حيث تَسمَح لك المكتبة بكتابة شيفرة -بالإضافة إلى كَوْنها مَقْرُوءة- فهي تُؤقلم نفسها مع العدد المُتاح من الأنوية (cores)، مما يَضمَن الترقية التلقائية للشيفرة وتحسين أدائها مع ترقية البيئة (environment).
توازي البيانات (Data parallelism)
تَوازِي البيانات (Data parallelism) هو التَّنفيذ المُتواقِت (concurrent) لنفس ذات العملية على عدة عناصر مُخزَّنة إِمّا بتَجمِيعة أو بمصفوفة. يُوفِّر إطار العمل .NET كلًا من البنائين البرمجيين (constructs) أو بشكل أدق التابعين Parallel.For
و Parallel.Foreach
لتَّنْفيذ حلقة (loop) مُتواقِتة من خلال تجزئة البيانات إلى مجموعات تُعالَج بنفس ذات الوقت. تُعدّ كلًا منهما النُسخة المُتواقِتة من الحلقة (loop) باستخدام for
و foreach
على الترتيب. مثلًا انظر المثال التالي:
// النسخة المتتالية foreach (var item in sourcecollection){ Process(item); } // النسخة المتواقتة Parallel.Foreach(sourcecollection, item => Process(item));
صُمم التابع Parallel.ForEach
بطريقة تَسمَح له -إن أمكن- باِستخدَام أكثر مِن نواة (core) أثناء التَّنْفيذ مما يُحسِن من أداء الشيفرة.
استخدام التابع Parallel.For
يُنفِّذ البناء البرمجي أو التابع Parallel.For
حلقة (loop) مُتواقِتة.
تتوافر عدة بصمات من هذا التابع تَستقبِل جميعها القيمة الأولية والنهائية للحلقة كأول مُعامِلين. تختلف البصمات بعد ذلك في عدد ونوع المُعامِلات لكن بالأخير لابُدّ لهن جميعًا من اِستقبَال مُفوَّض مَتن الحلقة من النوع Delegate
، والذي يَحوِي العبارات البرمجية المطلوب تَّنْفيذها. على سبيل المثال، تَستقبِل البصمة، في المثال التالي، مُفوَّض مَتن الحلقة كمُعامِل رابع. تُمرَّر قيمة الحلقة الحالية (counter) لهذا المُفوَّض كأول مُعامِل له.
ملحوظة: تتوفَّر بصمات مُبسطة من التابع Parallel.For
تَقتصِر على المُعامِلات الثلاثة المذكورة بالأعلى والتي تُمثِل الأساس الجوهري لأيّ حلقة (loop) سواء كانت مُتواقِتة أم لا.
يُنشِئ التابع Parallel.For
عدة خيوط (threads) تُنفَّذ بنفس ذات الوقت. عادةً ما يحتاج كل خيط (thread) منها لمُتغيّر محلي (local) خاص يَتوَافر طوال فترة حياة (lifetime) الخيط. تَسمَح البصمة في المثال التالي بذلك، حيث يُستخدَم مُعامِلها الثالث لتهيئة القيمة المبدئية لذلك المُتغيّر. يُمرَّر هذا المتغير كمُعامِل ثالث إلى مُفوَّض مَتن الحلقة، بحيث يَستطيع التعديل من قيمته وإعادته لتَستقبِله الحلقة التالية ضِمْن نفس الخيط. تستمر العملية حتى ينتهي تَّنْفيذ جميع الحلقات ضِمْن نفس الخيط، وعندها تُمرَّر القيمة النهائية للمُتغيّر إلى مُفوَّض أخير (Delegate) من النوع Action
، والذي يُنفَّذ مرة واحدة فقط قبل انتهاء فترة حياة الخيط. هذا المفوَّض الأخير هو المُعامِل الخامس للبصمة في المثال التالي.
في المثال التالي، يُحسَب مجموع الأعداد من 1 إلى 10000 بصورة مُتواقِتة. بعد انتهاء كل خيط من تَّنْفيذ حلقاته، تُضاف القيمة النهائية للمُتغيّر المحلي الخاص بالخيط localSum
إلى القيمة الكلية الفعلّية total
باِستخدَام التابع Interlocked.Add
وذلك لضَمان الأمن الخيطي (thread-safety).
using System.Threading; int Foo() { int total = 0; Parallel.For(1, 10001, () => 0, // القيمة المبدئية, (num, state, localSum) => num + localSum, localSum => Interlocked.Add(ref total, localSum)); return total; }
استخدام التابع Parallel.ForEach
كالمثال السابق، يُنفِّذ التابع Parallel.ForEach
حلقة (loop) مُتواقِتة، لكنه يُجريها على تَجمِيعَة بحيث تُقسَّم عناصر التَجمِيعَة وتُوزَّع على الخيوط المُنشئة.
using System.Threading; int Foo() { int total = 0; var numbers = Enumerable.Range(1, 10000).ToList(); Parallel.ForEach(numbers, () => 0, // القيمة المبدئية (num, state, localSum) => num + localSum, localSum => Interlocked.Add(ref total, localSum)); return total; }
تَتوفَّر بصمات أبسط من التابع Parallel.ForEach
، كالمثال التالي:
Parallel.Foreach(sourcecollection, item => Process(item));
يتوفَّر التابع Parallel.ForEach
بلغة الفيجوال بيسك كالتالي:
For Each row As DataRow In FooDataTable.Rows Me.RowsToProcess.Add(row) Next Dim myOptions As ParallelOptions = New ParallelOptions() myOptions.MaxDegreeOfParallelism = environment.processorcount Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state) ProcessRowParallel(currentRow, state) End Sub)
توازي المهام (Task parallelism)
تَوازِي المَهامّ (Task parallelism) هو التَّنْفيذ المُتواقِت (concurrent) لعمليات مُنفصلة.
تَعتمِد مكتبة تَوازِي المَهامّ (Task Parallel Library) بشكل رئيسي على مفهوم المَهامّ (tasks)، والتي يُمكن مُوازنتِها بالخيوط (threads) ولكن على مُستوى أعلى من التجريد (abstraction).
تَكون المَهامّ إمّا من النوع Task
إذا كانت لا تُعيد قيمة أو من النوع Task<T>
إذا كانت تُعيد قيمة من النوع T.
استخدام التابع Parallel.Invoke
يُعدّ التابع Parallel.Invoke
أحد أبسط وسائل تَحقيق تَوازِي المَهامّ، حيث لا يَتطلَّب أي تَعامُل مباشر مع النوع Task
. فقط مَرِّر إليه أيّ عدد من العمليات، كلًا منها من النوع Delegate<Action>
، وسيُحوِّلها إلى عدة مَهامّ (tasks) ثم يُنفِّذها بصورة مُتواقِتة، كالمثال التالي:
var actions = Enumerable.Range(1, 10).Select( n => new Action( () => { Console.WriteLine("I'm task " + n); if((n & 1) == 0) throw new Exception("Exception from task " + n); } )).ToArray(); try { Parallel.Invoke(actions); } catch(AggregateException ex) { foreach(var inner in ex.InnerExceptions) Console.WriteLine("Task failed: " + inner.Message); }
لمزيد من التَحكُم ستحتاج إلى التَعامُل مع النوع Task
مباشرة.
تنشئة مُهِمّة Task
باستخدام باني كائنات النوع Task
يُمكنك إنشاء نُسخة من النوع Task
مباشرة باِستخدَام باني الكائنات. فقط مَرِّر مُعامِلًا من النوع Delegate
إليه يَحوِي العبارات البرمجية المُراد تَّنْفيذها خلال المُهِمّة. تحتاج لإستدعاء التابع Start
اِستدعاءً صريحًا لبدء تَّنْفيذ المُهِمّة. كالتالي:
var task = new Task( () => { Console.WriteLine("Task code starting..."); Thread.Sleep(2000); Console.WriteLine("...task code ending!"); }); Console.WriteLine("Starting task..."); task.Start(); task.Wait(); Console.WriteLine("Task completed!");
تنشئة مُهِمّة Task
باستخدام التابع Task.Run
بالمثل يَستقبِل التابع Task.Run
مُعامِلًا من النوع Delegate
يَحوِي العبارات البرمجية المُراد تَّنْفيذها خلال المُهِمّة، وينشئ مُهِمّة Task
ويُنفِّذها في آن واحد. لذا، أنت لست في حاجة لاِستدعاء التابع Start
.
إذا كان الـ Delegate
المُمرَّر من النوع Action
، يُنشَئ مُتغيّر من النوع Task
. كالتالي:
Console.WriteLine("Starting task..."); var task = Task.Run( () => { Console.WriteLine("Task code starting..."); Thread.Sleep(2000); Console.WriteLine("...task code ending!"); }); task.Wait(); Console.WriteLine("Task completed!");
تنشئة مُهِمّة Task<T>
باستخدام التابع Task.Run
إذا كان الـ Delegate
المُمرَّر من النوع Func<T>
، تُنشَئ مُهمِة (task) من النوع Task<T>
، بحيث تكون القيمة العائدة من المُهِمّة من النوع T. كالتالي:
Task<int> t = Task.Run( () => { int sum = 0; for(int i = 0; i < 500; i++) sum += i; return sum; }); Console.WriteLine(t.Result); // 124750
كما رأينا بالمثال السابق، تُتيِح الخاصية Result
بالنوع Task
الوُلوج للقيمة العائدة من المُهِمّة.
إذا كانت المُهِمّة تُنفَّذ بشكل غيْر مُتزامِن (asynchronous)، فإن اِستخدَام await
لانتظار انتهائها من العَمَل يُعيد القيمة الفعلّية والتي تكون من النوع T، كالتالي:
public async Task DoSomeWork() { WebClient client = new WebClient(); // نظرًا لاستخدام await تسند نتيجة المهمة إلى المتغير string response = await client.DownloadStringTaskAsync("http://somedomain.com"); }
إلغاء مهمة باستخدام CancellationToken
يُمكن إلغاء مُهِمّة أثناء تَّنْفيذها من خلال مُفتاح الإلغاء CancellationToken
.
تُنشِئ المُهِمّة الأساسية مُفتاح إلغاء (cancellation token) وتُمرِّره لإحدى مُهِمّاتها الفرعية أثناء تَنشِئتها من خلال المُعامِل state
بباني كائنات النوع Task
. وفي حالة أَرَادت إلغائها، يُمكِنها اِستدعاء التابع cancellationTokenSource.Cancel
.
في المقابل، ينبغي للمُهِمّة الفرعية أن تَستدعِي التابع ThrowIfCancellationRequested
دَوريًّا لضَمان الاستجابة لطلب الإلغاء. كالمثال التالي:
var cancellationTokenSource = new CancellationTokenSource(); var cancellationToken = cancellationTokenSource.Token; var task = new Task( (state) => { int i = 1; var myCancellationToken = (CancellationToken)state; while(true) { Console.Write("{0} ", i++); Thread.Sleep(1000); myCancellationToken.ThrowIfCancellationRequested(); } }, cancellationToken: cancellationToken, state: cancellationToken); Console.WriteLine("Counting to infinity. Press any key to cancel!"); task.Start(); Console.ReadKey(); cancellationTokenSource.Cancel(); try { task.Wait(); } catch(AggregateException ex) { ex.Handle(inner => inner is OperationCanceledException); } Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}"); //Canceled
تستطيع المُهِمّة الفرعية أيضًا فحص وجود طلب الإلغاء باستخدام التابع IsCancellationRequested
، والذي يُعيد قيمة منطقية. يُمكِنها بعد ذلك التبلّيغ عن اعتراض من النوع OperationCanceledException
، كالمثال التالي:
//New task delegate int i = 1; var myCancellationToken = (CancellationToken)state; while(!myCancellationToken.IsCancellationRequested) { Console.Write("{0} ", i++); Thread.Sleep(1000); } Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!"); throw new OperationCanceledException(myCancellationToken);
لاحظ أنه بالإضافة إلى تمرير مفتاح الإلغاء (cancellation token) للمُعامِل state
بباني كائنات النوع Task
، فقد مُرِّر مفتاح الإلغاء أيضًا للمُعامِل cancellationToken
. يُعدّ ذلك ضروريًا لضَمان انتقال المُهِمّة إلى الحالة Canceled
بدلًا من Faulted
عند اِستدعاء التابع ThrowIfCancellationRequested
. مُرِّر أيضًا مفتاح الإلغاء لمُعامِل باني كائنات النوع OperationCanceledException
تمريرًا صريحًا لنفس السبب.
ضمان منطقية السياق التنفيذي باستخدام النوع AsyncLocal
يُمكن للمُهِمّة الأساسية (parent task) تمرير بعض البيانات إلى مَهامّها الفرعية (children tasks) باستخدام النوع AsyncLocal
الذي يَسمَح بتَوفُّر نُسخة محلية (local) من تلك البيانات لكل مُهِمّة فرعية بشكل لا يُؤثر على البيانات الأصلية ضِمْن المُهِمّة الأساسية (parent task) مما يَضمَن منطقية السياق التَّنْفيذي.
في المثال التالي:
(1): لا يُؤثِر تَغيير قيمة المُتغيّر على بقية المَهامّ الفرعية؛ نظرًا لكَوْنه محلي (local).
(2): تَدَفَّقت القيمة من التابع main
إلى المُهِمّة task1
ومِنْ ثَمَّ إلى هذه المُهِمّة الفرعية دون أن تَتأثَر بأي تَغيّير قد يقع ضِمْن مُهمات اخرى.
(3): بالمثل، تَدَفَّقت القيمة من التابع main
إلى المُهِمّة task1
ومِنْ ثَمَّ إلى هذه المُهِمّة الفرعية، لكن في هذه الحالة حَدَث تعديل ضمن المُهِمّة task1
والتي تُعدّ ضِمْن سياقها التَّنْفيذي ولذلك تَأثَرت بالتَغيّير.
void Main() { AsyncLocal<string> user = new AsyncLocal<string>(); user.Value = "initial user"; Task.Run(() => user.Value = "user from another task"); // (1) var task1 = Task.Run( () => { Console.WriteLine(user.Value); // "initial user" Task.Run( () => { // "initial user" Console.WriteLine(user.Value); // (2) }).Wait(); user.Value = "user from task1"; Task.Run( () => { // "user from task1" Console.WriteLine(user.Value); // (3) }).Wait(); }); task1.Wait(); // "initial user" Console.WriteLine(user.Value); }
ملحوظة: المُمارسة المُثلى تكون بقَصر اَستخدَام النوع AsyncLocal
على المُتغيّرات من نوع القيمة (value types) أو من النوع غيْر المُتغير (immutable)؛ لأن الخاصية AsynLocal.Value
تَحمِل نُسخة من المُتغيّر، بالتالي إذا كان المُتغيّر من النوع المَرجِعي (reference type)، فإنها ستَحمِل مَرجِعًا إلى المُتغيّر الأصلي. يعني ذلك أن أي تَغْيير على قيمة AsynLocal.Value
ضِمْن مُهِمّة فرعية معينة -في حالة النوع المَرجِعي- سينعكِس مباشرة على المُتغيّر الأصلى وتباعًا على قيمته ضِمْن المَهامّ الفرعية الأُخرى.
التعامل مع تجميعة مهام
يُوفِّر كلا من النوعين Task
و Task<T>
العديد من التوابع للتَعامُل مع تَجمِيعَة مَهامّ.
ينتظر التابعين WaitAll
و WhenAll
انتهاء تَّنْفيذ جميع المَهامّ ضمن التَجمِيعَة. في المُقابل، ينتظر التابعين WhenAny
و WaitAny
انتهاء تَّنْفيذ مُهِمّة واحدة فقط.
يَعمَل التابعين WaitAll
و WaitAny
بصورة مُتزامِنة (synchronous) مما يَتسبب بحُدوث تَعطِّيل (blocking). يُعدّ كلًا من التابعين WhenAll
و WhenAny
النسخة اللا مُتزامِنة وغيْر المُعَطِّلة (non-blocking/asynchronous) منهما على الترتيب ويُعيدان قيمة من النوع Task
يُمكنك انتظار انتهائها من العَمَل باستخدَام await
.
استخدام التابع Task.WaitAll
var tasks = Enumerable.Range(1, 5).Select( n => new Task<int>( () => { Console.WriteLine("I'm task " + n); return n; } )).ToArray(); foreach(var task in tasks) task.Start(); Task.WaitAll(tasks); foreach(var task in tasks) Console.WriteLine(task.Result);
استخدام التابع WaitAny
var allTasks = Enumerable.Range(1, 5).Select( n => new Task<int>(() => n) ).ToArray(); var pendingTasks = allTasks.ToArray(); foreach(var task in allTasks) task.Start(); while(pendingTasks.Length > 0) { var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)]; Console.WriteLine("Task {0} finished", finishedTask.Result); pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray(); } Task.WaitAll(allTasks);
ملحوظة: يُعدّ استدعاء WaitAll
ضروريًا نظرًا لأن WaitAny
لا تُبلِّغ عن الاعتراضات.
استخدام التابع WhenAll
var random = new Random(); IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select( n => Task.Run( () => { Console.WriteLine("I'm task " + n); return n; })); Task<int[]> task = Task.WhenAll(tasks); int[] results = await task; Console.WriteLine(string.Join(",", results.Select(n => n.ToString()))); // Output: 1,2,3,4,5
استخدام التابع WhenAny
var random = new Random(); IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select( n => Task.Run(async() => { Console.WriteLine("I'm task " + n); await Task.Delay(random.Next(10,1000)); return n; })); Task<Task<int>> whenAnyTask = Task.WhenAny(tasks); Task<int> completedTask = await whenAnyTask; Console.WriteLine("The winner is: task " + await completedTask); await Task.WhenAll(tasks); Console.WriteLine("All tasks finished!");
معالجة الاعتراضات (Exception Handling)
استخدام التابع WaitAll
يُبلِّغ التابع WaitAll
عن اعتراض من النوع AggregateException
إذا بَلَّغت مُهِمّة واحدة أو أكثر عن اعتراض. وبالتالي، تستطيع تَضْمِين العبارة البرمجية Task.WaitAll
داخل كتلة try..catch
، لتتمكن من مُعالجة الاعتراضات، كالمثال التالي:
var task1 = Task.Run( () => { Console.WriteLine("Task 1 code starting..."); throw new Exception("Oh no, exception from task 1!!"); }); var task2 = Task.Run( () => { Console.WriteLine("Task 2 code starting..."); throw new Exception("Oh no, exception from task 2!!"); }); Console.WriteLine("Starting tasks..."); try { Task.WaitAll(task1, task2); } catch(AggregateException ex) { Console.WriteLine("Task(s) failed!"); foreach(var inner in ex.InnerExceptions) Console.WriteLine(inner.Message); } Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
بدون استخدام Wait
var task1 = Task.Run( () => { Console.WriteLine("Task 1 code starting..."); throw new Exception("Oh no, exception from task 1!!"); }); var task2 = Task.Run( () => { Console.WriteLine("Task 2 code starting..."); throw new Exception("Oh no, exception from task 2!!"); }); var tasks = new[] {task1, task2}; Console.WriteLine("Starting tasks..."); while(tasks.All(task => !task.IsCompleted)); foreach(var task in tasks) { if(task.IsFaulted) Console.WriteLine("Task failed: " + task.Exception.InnerExceptions.First().Message); } Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted
تدفق البيانات (Dataflow)
تُسهِّل مكتبة تَوازِي المَهامّ لتدفق البيانات (TPL Dataflow Library) من البرمجة وفق نموذج تَدَفُّق البيانات (dataflow programming model).
تُوفِّر المكتبة العديد من أنواع كتل تَدَفُّق البيانات (dataflow blocks) والتي يُعدّ كُلا منها هيكل بياني يُستخدَم إمّا كمصدر للبيانات (source) أو كمقصِد (target) أو كليهما (propagator). يُمكن أيضًا تقسيم أنواع كتل التَدَفُّق إلى كتل تَخزين مُؤقت (buffering) تَحمِل البيانات للمستهلكين، وكتل تَّنْفيذية (execution) تُنفِّذ مُفوَّض يُمرَّر إليها، وكتل تجميعية (grouping).
بالإضافة إلى ذلك، تُوفِّر المكتبة العديد من التوابع لإرسال الرسائل واستقبالها من وإلى الكتل المختلفة، البعض منها مُتزامِن (synchronous) والبعض الآخر ليس كذلك.
استخدام النوع BufferBlock
لتنشئة متزامنة لنمط المُنتِج والمُستهلِك (producer-consumer pattern)
تُصنَف الكتل من النوع BufferBlock
ضِمْن كتل التخزين المؤقت (buffering)، وتَعمَل كمصدر ومَقصِد للبيانات (propagator).
يُستخدَم التابعين Post
و Receive
لكتابة الرسائل (messages) إلى كتل التَدَفُّق واستقبالها منها على الترتيب بشكل مُتزامِن (synchronous).
تُعرِّف الشيفرة التالية صنف المُنتِج:
public class Producer { private static Random random = new Random((int)DateTime.UtcNow.Ticks); // انتج القيمة المُرسلة إلى كتلة التدفق public double Produce() { var value = random.NextDouble(); Console.WriteLine($"Producing value: {value}"); return value; } }
تُعرِّف الشيفرة التالية صنف المُستهلك:
public class Consumer { //consume the value that will be received from buffer block public void Consume (double value) => Console.WriteLine($"Consuming value: {value}"); }
لاحظ كيف ضُمْنت كلًا من شيفرة المُنتِج (producer) والمُستهلِك (consumer) داخل مُهِمّتين مُنفصلتين بحيث يُصبِح من الممكن تَّنْفيذهما بصورة مُتواقِتة (concurrently)، كالتالي:
class Program { private static BufferBlock<double> buffer = new BufferBlock<double>(); static void Main (string[] args) { // انشئ مهمة ترسل قيمة قيمة من المُنتج إلى كتلة التدفق كل ثانية var producerTask = Task.Run( async () => { var producer = new Producer(); while(true) { buffer.Post(producer.Produce()); await Task.Delay(1000); } }); // انشئ مهمة تستقبل القيم من كتلة التدفق var consumerTask = Task.Run( () => { var consumer = new Consumer(); while(true) { consumer.Consume(buffer.Receive()); } }); Task.WaitAll(new[] { producerTask, consumerTask }); } }
استخدام النوع BufferBlock
لتنشئة لا مُتزامِنة لنمط المُنتِج والمُستهلِك
يُستخدَم التابعين SendAsync
و ReceiveAsync
لكتابة الرسائل (messages) إلى كتل التَدَفُّق واستقبالها منها على الترتيب بشكل لا مُتزامِن (asynchronous).
var bufferBlock = new BufferBlock<int>( new DataflowBlockOptions { BoundedCapacity = 1000 }); var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token; var producerTask = Task.Run( async () => { var random = new Random(); while (!cancellationToken.IsCancellationRequested) { var value = random.Next(); await bufferBlock.SendAsync(value, cancellationToken); } }); var consumerTask = Task.Run( async () => { while (await bufferBlock.OutputAvailableAsync()) { var value = bufferBlock.Receive(); Console.WriteLine(value); } }); await Task.WhenAll(producerTask, consumerTask);
استخدام النوع BlockingCollection
لتنشئة حلقة لنمط المنتج والمستهلك
لا يَقَع النوع BlockingCollection
ضِمْن مكتبة تَوازِي المَهامّ لتَدَفُّق البيانات (TPL Dataflow Library)، فهو مجرد تجميعة آمنة خيطيًا (thread-safe)، ويُوفِّر تَّنْفيذًا (implementation) لنمط المُنتِج والمُستهلِك (producer/consumer pattern)، لكنه لا يَتمتَع ببقية الميزات الأُخرى التي تُوفِّرها كتل التَدَفُّق. يُمكنك اِستخدَامه لحل المشاكل البسيطة التي لا تَتطلَّب خط أنابيب (pipeline) معقَّد. انظر المثال التالي:
var consumerTask = Task.Run(() => { foreach(var item in collection.GetConsumingEnumerable()) { Console.WriteLine("Consumed: " + item); Thread.Sleep(random.Next(10,1000)); } Console.WriteLine("Consumer completed!"); });
var collection = new BlockingCollection<int>(5); var random = new Random(); var producerTask = Task.Run(() => { for(int item=1; item<=10; item++) { collection.Add(item); Console.WriteLine("Produced: " + item); Thread.Sleep(random.Next(10,1000)); } collection.CompleteAdding(); Console.WriteLine("Producer completed!"); });
ضُمنت كلًا من شيفرة المُنتِج (producer) والمُستهِلك (consumer) داخل مُهِمّتين منفصلتين بحيث يُصبِح من المُمكن تَّنْفيذهما بصورة مُتواقِتة (concurrently).
Task.WaitAll(producerTask, consumerTask); Console.WriteLine("Everything completed!");
من الجدير بالذكر أنه في حالة عدم استدعاء التابع CompleteAdding
، تستطيع مُتابعة إضافة عناصر للتَجمِيعَة حتى لو أصبح المُستهِلك قيْد التَّنْفيذ. اِستدعِي التابع collection.CompleteAdding()
فقط عندما تكون متأكدًا أنك لم تَعدْ في حاجة لإضافة المزيد من العناصر.
يُمكن اِستخدَام هذا النوع لتَّنْفيذ نمط عدة مُنتجِين لمُستهِلك وحيد، والذي يعنيّ وجود عدة مصادر للبيانات، تكون مسئولة عن تغذية التَجمِيعَة -من نفس النوع BlockingCollection
- بالعناصر، وفي المقابل يَسَحَب مُستهلِك وحيد العناصر ويقوم بمُعالَجتِها بطريقة ما.
لاحظ أنه في حالة عدم استدعاء التابع CompleteAdding
وكانت التجميعة فارغة، فان المُعدَّد العائد من collection.GetConsumingEnumerable()
سيتَسَبَّب بحُدوث تَعطّيل (blocking) إلى أن يُضاف عنصر جديد للتجميعة أو إلى أن يُستدعَى التابع BlockingCollection.CompleteAdding()
.
كتابة رسائل إلى كتلة تدفق من النوع ActionBlock
تُصنَف الكتل من النوع ActionBlock
ضِمْن الكتل التَّنْفيذية (execution) حيث تُنفِّذ مُفوَّض Delegate
من النوع Action
كلما استقبَلت بيانات جديدة، وتَعمَل كمقصِد للبيانات (target) فقط. انظر المثال التالي:
// انشئ كتلة تدفق بحدث لا متزامن var block = new ActionBlock<string>( async hostName => { IPAddress[] ipAddresses = await Dns.GetHostAddressesAsync(hostName); Console.WriteLine(ipAddresses[0]); }); // ارسل البيانات إلى كتلة التدفق للمُعالجة block.Post("google.com"); block.Post("reddit.com"); block.Post("stackoverflow.com"); // بلغ كتلة التدفق بالتوقف عن استقبال عناصر جديدة block.Complete(); // انتظر حتي تنتهي كتلة التدفق من معالجة جميع العناصر await block.Completion;
توصيل كتل التدفق لتنشئة خط أنابيب البيانات (data pipelines)
يُستخدَم التابع LinkTo
لتوصيل كتلتي تَدَفُّق (dataflow blocks) بحيث تَعمَل الأولى كمصدر للبيانات (source) والاخرى كمقصِد (target). يعني ذلك أنه بمجرد استقبَال الأولى لرسالة جديدة ومُعالَجتِها، فإنها ستُبلِّغ الكتل المتصلة بها بوجود رسالة جديدة. يُمكنك إنشاء خط أنابيب (pipeline) عن طريق توصيل عدة كتل تَدَفُّق ببعضها. كالمثال التالي:
var httpClient = new HttpClient(); // أنشئ كتلة تدفق لاستقبال رابط وإعادة محتوياته كسِلسِلة نصية var downloaderBlock = new TransformBlock<string, string>( async uri => await httpClient.GetStringAsync(uri)); // انشئ كتلة تدفق لاستقبال سلسلة نصية وطباعتها على الطرفية var printerBlock = new ActionBlock<string>( contents => Console.WriteLine(contents)); // اجعل كتلة جلب محتويات الروابط مكتملة بمجرد اكتمال كتلة تدفق الطباعة var dataflowLinkOptions = new DataflowLinkOptions {PropagateCompletion = true}; // وصل كتلتي التدفق لإنشاء خط أنابيب downloaderBlock.LinkTo(printerBlock, dataflowLinkOptions); // أرسل عدة روابط لكتلة التدفق الأولى والتي ستُمرر محتوياتها إلى كتلة التدفق الثانية downloaderBlock.Post("http://youtube.com"); downloaderBlock.Post("http://github.com"); downloaderBlock.Post("http://twitter.com"); downloaderBlock.Complete(); // Completion will propagate to printerBlock await printerBlock.Completion; // Only need to wait for the last block in the pipeline
السياق التزامني (Synchronization Contexts)
يُعدّ السياق التزامني تمثيلًا تجريديًا (abstract representation) للبيئة التي تُنفَّذ بها الشيفرة.
يُستخدَم عادةً عندما يُريد خيط ما تَّنْفيذ شيفرة ضِمْن خيط آخر. في هذه الحالة، يُلتقَط سياق الخيط المستهدف SynchronizationContext
، ويُمرَّر للخيط الفرعي، الذي يستطيع -عند الحاجة- اِستخدَام التابع SynchronizationContext.Post
لإرسال مُفوَّض (delegate) إلى سياق الخيط المُلتقَط لضَمان تَّنْفيذ المفوَّض ضِمْن ذلك الخيط.
يكون ذلك بالأخص ضروريًا عند التَعامُل مع مُكوِنات الواجهة (UI component)، والتي لا يمكن تعديل خاصياتها إلا من خلال الخيط المالك الذي انشأها.
الولوج إلى مُكِونات الواجهة (UI components) من خيوط أخرى
إذا أردت تغيير خاصية مُكِون واجهة أو نموذج تَحكُم (form control)، مثل صندوق نصي (textbox) أو عنوان (label)، من خيط (thread) آخر غير خيط الواجهة الرسومية المُنشِئ لهذا المُكِون. فلابُدّ من اِستخدَام طرائق مُعينة غيْر التعديل المُباشر، وإلا ستحصل على رسالة الخطأ التالية:
اقتباس"العمليات عبر الخيوط غير صالحة: مُحاولة الولوج إلى مُكِون واجهة من خيط آخر غير الخيط الذي أنشأه"
فمثلًا، ستؤدي الشيفرة التالية إلى التبلِّيغ عن اعتراض يَحمِل الرسالة السابقة:
private void button4_Click(object sender, EventArgs e) { Thread thread = new Thread(updatetextbox); thread.Start(); } private void updatetextbox() { textBox1.Text = "updated"; }
استخدام السياق التزامُني SynchronizationContext
يمكن لخيط بالخلفية (background) تنفيذ شيفرة بخيط الواجهة (UI thread) لتَحْديث إحدى مُكوِنات الواجهة (UI component) من خلال استقبال السياق التزامني لخيط الواجهة من النوع SynchronizationContext
، كالمثال التالي:
void Button_Click(object sender, EventArgs args) { SynchronizationContext context = SynchronizationContext.Current; Task.Run( () => { for(int i = 0; i < 10; i++) { Thread.Sleep(500); context.Post(ShowProgress, "Work complete on item " + i); } } }
void UpdateCallback(object state) { // يُستدعى هذا التابع من خلال خيط الواجهة فقط ولذلك تُحدث بنجاح this.MyTextBox.Text = state as string; }
في المثال بالأعلى، إذا حَاول الخيط الفرعي تَحْديث خاصية مُكوِن الواجهة MyTextBox.Text
داخل الحلقة for
مباشرة، سيُبلَّغ عن خطأ خيطي (threading). أما إذا أرسل الحَدَث UpdateCallback
إلى السياق التزامني SynchronizationContext
الخاص بخيط الواجهة (UI thread)، ستتم عملية تَحْديث الصندوق النصي ضِمْن نفس خيط مُكوِنات الواجهة بنجاح.
عمليًا، ينبغي أن تَستخدَم الواجهة System.IProgress
لتَحْديث شريط التقدم (progress)؛ حيث يَلتقِط التَّنْفيذ الافتراضي لهذه الواجهة System.Progress
سياق المُنشِئ التزامني (synchronisation context) تلقائيًا.
استخدام التابعين Control.Invoke
أو Control.BeginInvoke
حل آخر هو أن تَستخدِم أحد التابعين Control.Invoke
أو Control.BeginInvoke
لتَغْيير مُحتوى مُكِون الواجهة -صندوق نصي مثلًا- من خيط آخر غير الخيط المالك لنموذج التحكم (form control). علاوة على ذلك، يُمكنك الاستعانة بالخاصية Control.InvokeRequired
لفحص ما إذا كان استدعاء نموذج التحكم ضروريًا أم لا، كالتالي:
private void updatetextbox() { if (textBox1.InvokeRequired) textBox1.BeginInvoke((Action)(() => textBox1.Text = "updated")); else textBox1.Text = "updated"; }
إذا كنت ستَحتَاج لتَّنْفيذ ذلك بشكل مُتكرر، فربما من الأفضل تَضْمِين هذه الشيفرة داخل دالة مُوسِعَة (extension) للأنواع القابلة للاستدعاء (invokeable)، كالتالي:
public static class Extensions { public static void BeginInvokeIfRequired(this ISynchronizeInvoke obj, Action action) { if (obj.InvokeRequired) obj.BeginInvoke(action, new object[0]); else action(); } }
وبالتالي يُصبِح تَحْديث صندوق نصي من أي خيط أمرًا يسيرًا، كالتالي:
private void updatetextbox() { textBox1.BeginInvokeIfRequired(() => textBox1.Text = "updated"); }
الفرق بين التابعين Control.BeginInvoke
و Control.Invoke
هو أن الأول يُنفَّذ بشكل غير مُتزامِن (asynchronous) أما الثاني فيُنفَّذ بشكل مُتزامِن (synchronous). يعني ذلك أنه إذا اُستخدِم التابع الأول Control.BeginInvoke
، فإن الشيفرة المكتوبة بعد استدعاء هذا التابع ستُنفَّذ بغض النظر عن انتهاء المفوَّض المُمرَّر إلى التابع من عَمَله أم لا. في المقابل، لن تُنفَّذ الشيفرة المكتوبة بعد التابع الثاني Control.Invoke
إلا بعد انتهاء المفوَّض من العَمَل. يؤدي ذلك إلى حُدوث تَعطّيل بالخيط الأساسي ويَتسبَّب ببطئ الشيفرة إلى حد كبير خاصةً إذا لجأت إلى استخدام هذا التابع بكثرة. ربما أيضًا قد يَتسبَّب بحُدوث قفل ميت (deadlock) في حالة انتظار خيط الواجهة (GUI thread) انتهاء الخيط الفرعي من عمله أو انتظار تحريره لمَورِد معين.
استخدام الواجهة IProgress
لاحظ أن التابع Report
مُنفَّذ صراحةً (explicit implementation) ضِمْن الواجهة IProgress
، وليس موجودًا ضِمْن النوع المُنفِّذ System.Progress
. لذا إمّا أن تَستدعِي التابع من خلال الواجهة أو من خلال النوع لكن بعد أن تُحوّله لنوع الواجهة (casting)، كالمثال التالي:
var p1 = new Progress<int>(); p1.Report(1); // خطأ تصريف IProgress<int> p2 = new Progress<int>(); p2.Report(2); // تُصرف بنجاح var p3 = new Progress<int>(); ((IProgress<int>)p3).Report(3); // تُصرف بنجاح
مثال بسيط لتحديث التقدم
يُمكن لعملية معينة أن تَستخدِم الواجهة IProgress
لتَحْديث شريط التقدم لعملية أخرى عن طريق استقبال كائن من الواجهة IProgress
، واستدعاء تابعها Report
كلما أرادت تَحْديث التقدم، كالمثال التالي:
void Main() { IProgress<int> p = new Progress<int>( progress => { Console.WriteLine("Running Step: {0}", progress); }); LongJob(p); }
public void LongJob(IProgress<int> progress) { var max = 10; for (int i = 0; i < max; i++) { progress.Report(i); } }
الخْرج:
Running Step: 0 Running Step: 3 Running Step: 4 Running Step: 5 Running Step: 6 Running Step: 7 Running Step: 8 Running Step: 9 Running Step: 2 Running Step: 1
يُنفَّذ التابع IProgress.Report()
بشكل لا مُتزامِن (asynchronously)، لذلك ربما لا تُطبَع الأرقام بنفس ترتيب الاستدعاء. بالتالي، يُعدّ غير مُلائم عندما يكون التَحْديث وِفق ترتيب الاستدعاء ضروريًا.
ترجمة -وبتصرف- للفصول 38 - 39 - 41 - 42 - 43 - 44 - 31 من كتاب .NET Framework Notes for Professionals
أفضل التعليقات
لا توجد أية تعليقات بعد
انضم إلى النقاش
يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.