اذهب إلى المحتوى

تنفيذ المهام بالتوازي في dot NET


رضوى العربي

تُسهِل بيئة عمل ‎.NET من البرمجة مُتعددة الأنوية (multi-core programming) من خلال توفير مَكتبة تَوازِي المَهامّ Task Parallel Library، حيث تَسمَح لك المكتبة بكتابة شيفرة -بالإضافة إلى كَوْنها مَقْرُوءة- فهي تُؤقلم نفسها مع العدد المُتاح من الأنوية (cores)، مما يَضمَن الترقية التلقائية للشيفرة وتحسين أدائها مع ترقية البيئة (environment).

توازي البيانات (Data parallelism)

تَوازِي البيانات (Data parallelism) هو التَّنفيذ المُتواقِت (concurrent) لنفس ذات العملية على عدة عناصر مُخزَّنة إِمّا بتَجمِيعة أو بمصفوفة. يُوفِّر إطار العمل ‎.NET كلًا من البنائين البرمجيين (constructs) أو بشكل أدق التابعين Parallel.For و Parallel.Foreach لتَّنْفيذ حلقة (loop) مُتواقِتة من خلال تجزئة البيانات إلى مجموعات تُعالَج بنفس ذات الوقت. تُعدّ كلًا منهما النُسخة المُتواقِتة من الحلقة (loop) باستخدام for و foreach على الترتيب. مثلًا انظر المثال التالي:

// النسخة المتتالية
foreach (var item in sourcecollection){
    Process(item);
}

// النسخة المتواقتة
Parallel.Foreach(sourcecollection, item => Process(item));

صُمم التابع Parallel.ForEach بطريقة تَسمَح له -إن أمكن- باِستخدَام أكثر مِن نواة (core) أثناء التَّنْفيذ مما يُحسِن من أداء الشيفرة.

استخدام التابع Parallel.For

يُنفِّذ البناء البرمجي أو التابع Parallel.For حلقة (loop) مُتواقِتة.

تتوافر عدة بصمات من هذا التابع تَستقبِل جميعها القيمة الأولية والنهائية للحلقة كأول مُعامِلين. تختلف البصمات بعد ذلك في عدد ونوع المُعامِلات لكن بالأخير لابُدّ لهن جميعًا من اِستقبَال مُفوَّض مَتن الحلقة من النوع Delegate، والذي يَحوِي العبارات البرمجية المطلوب تَّنْفيذها. على سبيل المثال، تَستقبِل البصمة، في المثال التالي، مُفوَّض مَتن الحلقة كمُعامِل رابع. تُمرَّر قيمة الحلقة الحالية (counter) لهذا المُفوَّض كأول مُعامِل له.

ملحوظة: تتوفَّر بصمات مُبسطة من التابع Parallel.For تَقتصِر على المُعامِلات الثلاثة المذكورة بالأعلى والتي تُمثِل الأساس الجوهري لأيّ حلقة (loop) سواء كانت مُتواقِتة أم لا.

يُنشِئ التابع Parallel.For عدة خيوط (threads) تُنفَّذ بنفس ذات الوقت. عادةً ما يحتاج كل خيط (thread) منها لمُتغيّر محلي (local) خاص يَتوَافر طوال فترة حياة (lifetime) الخيط. تَسمَح البصمة في المثال التالي بذلك، حيث يُستخدَم مُعامِلها الثالث لتهيئة القيمة المبدئية لذلك المُتغيّر. يُمرَّر هذا المتغير كمُعامِل ثالث إلى مُفوَّض مَتن الحلقة، بحيث يَستطيع التعديل من قيمته وإعادته لتَستقبِله الحلقة التالية ضِمْن نفس الخيط. تستمر العملية حتى ينتهي تَّنْفيذ جميع الحلقات ضِمْن نفس الخيط، وعندها تُمرَّر القيمة النهائية للمُتغيّر إلى مُفوَّض أخير (Delegate) من النوع Action، والذي يُنفَّذ مرة واحدة فقط قبل انتهاء فترة حياة الخيط. هذا المفوَّض الأخير هو المُعامِل الخامس للبصمة في المثال التالي.

في المثال التالي، يُحسَب مجموع الأعداد من 1 إلى 10000 بصورة مُتواقِتة. بعد انتهاء كل خيط من تَّنْفيذ حلقاته، تُضاف القيمة النهائية للمُتغيّر المحلي الخاص بالخيط localSum إلى القيمة الكلية الفعلّية total باِستخدَام التابع Interlocked.Add وذلك لضَمان الأمن الخيطي (thread-safety).

using System.Threading;

int Foo()
{
    int total = 0;
    Parallel.For(1, 
                 10001,
                 () => 0, // القيمة المبدئية,
                 (num, state, localSum) => num + localSum,
                 localSum => Interlocked.Add(ref total, localSum));
    return total; 
}

استخدام التابع Parallel.ForEach

كالمثال السابق، يُنفِّذ التابع Parallel.ForEach حلقة (loop) مُتواقِتة، لكنه يُجريها على تَجمِيعَة بحيث تُقسَّم عناصر التَجمِيعَة وتُوزَّع على الخيوط المُنشئة.

using System.Threading;
int Foo()
{
    int total = 0;
    var numbers = Enumerable.Range(1, 10000).ToList();
    Parallel.ForEach(numbers,
                     () => 0, // القيمة المبدئية
                     (num, state, localSum) => num + localSum,
                     localSum => Interlocked.Add(ref total, localSum));
    return total; 
}

تَتوفَّر بصمات أبسط من التابع Parallel.ForEach، كالمثال التالي:

Parallel.Foreach(sourcecollection, item => Process(item));

يتوفَّر التابع Parallel.ForEach بلغة الفيجوال بيسك كالتالي:

For Each row As DataRow In FooDataTable.Rows
Me.RowsToProcess.Add(row)
Next

Dim myOptions As ParallelOptions = New ParallelOptions()
myOptions.MaxDegreeOfParallelism = environment.processorcount
Parallel.ForEach(RowsToProcess, myOptions, Sub(currentRow, state)
    ProcessRowParallel(currentRow, state)
End Sub)

توازي المهام (Task parallelism)

تَوازِي المَهامّ (Task parallelism) هو التَّنْفيذ المُتواقِت (concurrent) لعمليات مُنفصلة.

تَعتمِد مكتبة تَوازِي المَهامّ (Task Parallel Library) بشكل رئيسي على مفهوم المَهامّ (tasks)، والتي يُمكن مُوازنتِها بالخيوط (threads) ولكن على مُستوى أعلى من التجريد (abstraction).

تَكون المَهامّ إمّا من النوع Task إذا كانت لا تُعيد قيمة أو من النوع Task<T>‎ إذا كانت تُعيد قيمة من النوع T.

استخدام التابع Parallel.Invoke

يُعدّ التابع Parallel.Invoke أحد أبسط وسائل تَحقيق تَوازِي المَهامّ، حيث لا يَتطلَّب أي تَعامُل مباشر مع النوع Task. فقط مَرِّر إليه أيّ عدد من العمليات، كلًا منها من النوع Delegate<Action>‎، وسيُحوِّلها إلى عدة مَهامّ (tasks) ثم يُنفِّذها بصورة مُتواقِتة، كالمثال التالي:

var actions = Enumerable.Range(1, 10).Select(
    n => 
    new Action(
        () =>
        {
            Console.WriteLine("I'm task " + n);

            if((n & 1) == 0)
                throw new Exception("Exception from task " + n);
        }
    )).ToArray();

try
{
    Parallel.Invoke(actions);
}
catch(AggregateException ex)
{
    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine("Task failed: " + inner.Message);
}

لمزيد من التَحكُم ستحتاج إلى التَعامُل مع النوع Task مباشرة.

تنشئة مُهِمّة Task باستخدام باني كائنات النوع Task

يُمكنك إنشاء نُسخة من النوع Task مباشرة باِستخدَام باني الكائنات. فقط مَرِّر مُعامِلًا من النوع Delegate إليه يَحوِي العبارات البرمجية المُراد تَّنْفيذها خلال المُهِمّة. تحتاج لإستدعاء التابع Start اِستدعاءً صريحًا لبدء تَّنْفيذ المُهِمّة. كالتالي:

var task = new Task(
    () =>
    {
        Console.WriteLine("Task code starting...");
        Thread.Sleep(2000);
        Console.WriteLine("...task code ending!");
    });

Console.WriteLine("Starting task...");
task.Start();
task.Wait();
Console.WriteLine("Task completed!");

تنشئة مُهِمّة Task باستخدام التابع Task.Run

بالمثل يَستقبِل التابع Task.Run مُعامِلًا من النوع Delegate يَحوِي العبارات البرمجية المُراد تَّنْفيذها خلال المُهِمّة، وينشئ مُهِمّة Task ويُنفِّذها في آن واحد. لذا، أنت لست في حاجة لاِستدعاء التابع Start.

إذا كان الـ Delegate المُمرَّر من النوع Action، يُنشَئ مُتغيّر من النوع Task. كالتالي:

Console.WriteLine("Starting task...");

var task = Task.Run(
    () =>
    {
        Console.WriteLine("Task code starting...");
        Thread.Sleep(2000);
        Console.WriteLine("...task code ending!");
    });

task.Wait();
Console.WriteLine("Task completed!");

تنشئة مُهِمّة Task<T>‎ باستخدام التابع Task.Run

إذا كان الـ Delegate المُمرَّر من النوع Func<T>‎، تُنشَئ مُهمِة (task) من النوع Task<T>‎، بحيث تكون القيمة العائدة من المُهِمّة من النوع T. كالتالي:

Task<int> t = Task.Run(
    () =>
    {
        int sum = 0;
        for(int i = 0; i < 500; i++)
            sum += i;
        return sum;
    });

Console.WriteLine(t.Result); // 124750

كما رأينا بالمثال السابق، تُتيِح الخاصية Result بالنوع Task الوُلوج للقيمة العائدة من المُهِمّة.

إذا كانت المُهِمّة تُنفَّذ بشكل غيْر مُتزامِن (asynchronous)، فإن اِستخدَام await لانتظار انتهائها من العَمَل يُعيد القيمة الفعلّية والتي تكون من النوع T، كالتالي:

public async Task DoSomeWork()
{
    WebClient client = new WebClient();
    // ‫نظرًا لاستخدام await تسند نتيجة المهمة إلى المتغير 
    string response = await client.DownloadStringTaskAsync("http://somedomain.com");
}

إلغاء مهمة باستخدام CancellationToken

يُمكن إلغاء مُهِمّة أثناء تَّنْفيذها من خلال مُفتاح الإلغاء CancellationToken.

تُنشِئ المُهِمّة الأساسية مُفتاح إلغاء (cancellation token) وتُمرِّره لإحدى مُهِمّاتها الفرعية أثناء تَنشِئتها من خلال المُعامِل state بباني كائنات النوع Task. وفي حالة أَرَادت إلغائها، يُمكِنها اِستدعاء التابع cancellationTokenSource.Cancel.

في المقابل، ينبغي للمُهِمّة الفرعية أن تَستدعِي التابع ThrowIfCancellationRequested دَوريًّا لضَمان الاستجابة لطلب الإلغاء. كالمثال التالي:

var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

var task = new Task(
    (state) =>
    {
        int i = 1;
        var myCancellationToken = (CancellationToken)state;
        while(true)
        {
            Console.Write("{0} ", i++);
            Thread.Sleep(1000);
            myCancellationToken.ThrowIfCancellationRequested();
        }
    },
    cancellationToken: cancellationToken,
    state: cancellationToken);

Console.WriteLine("Counting to infinity. Press any key to cancel!");

task.Start();

Console.ReadKey();
cancellationTokenSource.Cancel();

try
{
    task.Wait();
}
catch(AggregateException ex)
{
    ex.Handle(inner => inner is OperationCanceledException);
}

Console.WriteLine($"{Environment.NewLine}You have cancelled! Task status is: {task.Status}");
//Canceled

تستطيع المُهِمّة الفرعية أيضًا فحص وجود طلب الإلغاء باستخدام التابع IsCancellationRequested، والذي يُعيد قيمة منطقية. يُمكِنها بعد ذلك التبلّيغ عن اعتراض من النوع OperationCanceledException، كالمثال التالي:

//New task delegate
int i = 1;
var myCancellationToken = (CancellationToken)state;

while(!myCancellationToken.IsCancellationRequested)
{
    Console.Write("{0} ", i++);
    Thread.Sleep(1000);
}

Console.WriteLine($"{Environment.NewLine}Ouch, I have been cancelled!!");
throw new OperationCanceledException(myCancellationToken);

لاحظ أنه بالإضافة إلى تمرير مفتاح الإلغاء (cancellation token) للمُعامِل state بباني كائنات النوع Task، فقد مُرِّر مفتاح الإلغاء أيضًا للمُعامِل cancellationToken. يُعدّ ذلك ضروريًا لضَمان انتقال المُهِمّة إلى الحالة Canceled بدلًا من Faulted عند اِستدعاء التابع ThrowIfCancellationRequested. مُرِّر أيضًا مفتاح الإلغاء لمُعامِل باني كائنات النوع OperationCanceledException تمريرًا صريحًا لنفس السبب.

ضمان منطقية السياق التنفيذي باستخدام النوع AsyncLocal

يُمكن للمُهِمّة الأساسية (parent task) تمرير بعض البيانات إلى مَهامّها الفرعية (children tasks) باستخدام النوع AsyncLocal الذي يَسمَح بتَوفُّر نُسخة محلية (local) من تلك البيانات لكل مُهِمّة فرعية بشكل لا يُؤثر على البيانات الأصلية ضِمْن المُهِمّة الأساسية (parent task) مما يَضمَن منطقية السياق التَّنْفيذي.

في المثال التالي:

(1): لا يُؤثِر تَغيير قيمة المُتغيّر على بقية المَهامّ الفرعية؛ نظرًا لكَوْنه محلي (local).

(2): تَدَفَّقت القيمة من التابع main إلى المُهِمّة task1 ومِنْ ثَمَّ إلى هذه المُهِمّة الفرعية دون أن تَتأثَر بأي تَغيّير قد يقع ضِمْن مُهمات اخرى.

(3): بالمثل، تَدَفَّقت القيمة من التابع main إلى المُهِمّة task1 ومِنْ ثَمَّ إلى هذه المُهِمّة الفرعية، لكن في هذه الحالة حَدَث تعديل ضمن المُهِمّة task1 والتي تُعدّ ضِمْن سياقها التَّنْفيذي ولذلك تَأثَرت بالتَغيّير.

void Main()
{
    AsyncLocal<string> user = new AsyncLocal<string>();
    user.Value = "initial user";

    Task.Run(() => user.Value = "user from another task"); // (1)

    var task1 = Task.Run(
        () =>
        {
            Console.WriteLine(user.Value); // "initial user"

            Task.Run(
                () =>
                {
                    // "initial user"
                    Console.WriteLine(user.Value); // (2)
                }).Wait();

            user.Value = "user from task1";

            Task.Run(
                () =>
                {
                    // "user from task1"
                    Console.WriteLine(user.Value); // (3)
                }).Wait();
        });

    task1.Wait();

    // "initial user" 
    Console.WriteLine(user.Value);
}

ملحوظة: المُمارسة المُثلى تكون بقَصر اَستخدَام النوع AsyncLocal على المُتغيّرات من نوع القيمة (value types) أو من النوع غيْر المُتغير (immutable)؛ لأن الخاصية AsynLocal.Value تَحمِل نُسخة من المُتغيّر، بالتالي إذا كان المُتغيّر من النوع المَرجِعي (reference type)، فإنها ستَحمِل مَرجِعًا إلى المُتغيّر الأصلي. يعني ذلك أن أي تَغْيير على قيمة AsynLocal.Value ضِمْن مُهِمّة فرعية معينة -في حالة النوع المَرجِعي- سينعكِس مباشرة على المُتغيّر الأصلى وتباعًا على قيمته ضِمْن المَهامّ الفرعية الأُخرى.

التعامل مع تجميعة مهام

يُوفِّر كلا من النوعين Task و Task<T>‎ العديد من التوابع للتَعامُل مع تَجمِيعَة مَهامّ.

ينتظر التابعين WaitAll و WhenAll انتهاء تَّنْفيذ جميع المَهامّ ضمن التَجمِيعَة. في المُقابل، ينتظر التابعين WhenAny و WaitAny انتهاء تَّنْفيذ مُهِمّة واحدة فقط.

يَعمَل التابعين WaitAll و WaitAny بصورة مُتزامِنة (synchronous) مما يَتسبب بحُدوث تَعطِّيل (blocking). يُعدّ كلًا من التابعين WhenAll و WhenAny النسخة اللا مُتزامِنة وغيْر المُعَطِّلة (non-blocking/asynchronous) منهما على الترتيب ويُعيدان قيمة من النوع Task يُمكنك انتظار انتهائها من العَمَل باستخدَام await.

استخدام التابع Task.WaitAll

var tasks = Enumerable.Range(1, 5).Select(
    n => new Task<int>(
        () => {
            Console.WriteLine("I'm task " + n);
            return n;
        }
    )).ToArray();

foreach(var task in tasks) 
    task.Start();

Task.WaitAll(tasks);

foreach(var task in tasks)
    Console.WriteLine(task.Result);

استخدام التابع WaitAny

var allTasks = Enumerable.Range(1, 5).Select(
    n => new Task<int>(() => n)
).ToArray();

var pendingTasks = allTasks.ToArray();

foreach(var task in allTasks) 
    task.Start();

while(pendingTasks.Length > 0)
{
    var finishedTask = pendingTasks[Task.WaitAny(pendingTasks)];
    Console.WriteLine("Task {0} finished", finishedTask.Result);
    pendingTasks = pendingTasks.Except(new[] {finishedTask}).ToArray();
}

Task.WaitAll(allTasks);

ملحوظة: يُعدّ استدعاء WaitAll ضروريًا نظرًا لأن WaitAny لا تُبلِّغ عن الاعتراضات.

استخدام التابع WhenAll

var random = new Random();

IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(
    n => Task.Run(
        () =>
        {
            Console.WriteLine("I'm task " + n);
            return n;
        }));

Task<int[]> task = Task.WhenAll(tasks);
int[] results = await task;
Console.WriteLine(string.Join(",", results.Select(n => n.ToString())));
// Output: 1,2,3,4,5

استخدام التابع WhenAny

var random = new Random();

IEnumerable<Task<int>> tasks = Enumerable.Range(1, 5).Select(
    n => Task.Run(async() => {
        Console.WriteLine("I'm task " + n);
        await Task.Delay(random.Next(10,1000));
        return n;
    }));

Task<Task<int>> whenAnyTask = Task.WhenAny(tasks);
Task<int> completedTask = await whenAnyTask;
Console.WriteLine("The winner is: task " + await completedTask);

await Task.WhenAll(tasks);
Console.WriteLine("All tasks finished!");

معالجة الاعتراضات (Exception Handling)

استخدام التابع WaitAll

يُبلِّغ التابع WaitAll عن اعتراض من النوع AggregateException إذا بَلَّغت مُهِمّة واحدة أو أكثر عن اعتراض. وبالتالي، تستطيع تَضْمِين العبارة البرمجية Task.WaitAll داخل كتلة try..catch، لتتمكن من مُعالجة الاعتراضات، كالمثال التالي:

var task1 = Task.Run(
    () =>
    {
        Console.WriteLine("Task 1 code starting...");
        throw new Exception("Oh no, exception from task 1!!");
    });

var task2 = Task.Run(
    () =>
    {
        Console.WriteLine("Task 2 code starting...");
        throw new Exception("Oh no, exception from task 2!!");
    });

Console.WriteLine("Starting tasks...");

try
{
    Task.WaitAll(task1, task2);
}
catch(AggregateException ex)
{
    Console.WriteLine("Task(s) failed!");

    foreach(var inner in ex.InnerExceptions)
        Console.WriteLine(inner.Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

بدون استخدام Wait

var task1 = Task.Run(
    () =>
    {
        Console.WriteLine("Task 1 code starting...");
        throw new Exception("Oh no, exception from task 1!!");
    });

var task2 = Task.Run(
    () =>
    {
        Console.WriteLine("Task 2 code starting...");
        throw new Exception("Oh no, exception from task 2!!");
    });

var tasks = new[] {task1, task2};
Console.WriteLine("Starting tasks...");

while(tasks.All(task => !task.IsCompleted));

foreach(var task in tasks)
{
    if(task.IsFaulted)
        Console.WriteLine("Task failed: " +
                          task.Exception.InnerExceptions.First().Message);
}

Console.WriteLine("Task 1 status is: " + task1.Status); //Faulted
Console.WriteLine("Task 2 status is: " + task2.Status); //Faulted

تدفق البيانات (Dataflow)

تُسهِّل مكتبة تَوازِي المَهامّ لتدفق البيانات (TPL Dataflow Library) من البرمجة وفق نموذج تَدَفُّق البيانات (dataflow programming model).

تُوفِّر المكتبة العديد من أنواع كتل تَدَفُّق البيانات (dataflow blocks) والتي يُعدّ كُلا منها هيكل بياني يُستخدَم إمّا كمصدر للبيانات (source) أو كمقصِد (target) أو كليهما (propagator). يُمكن أيضًا تقسيم أنواع كتل التَدَفُّق إلى كتل تَخزين مُؤقت (buffering) تَحمِل البيانات للمستهلكين، وكتل تَّنْفيذية (execution) تُنفِّذ مُفوَّض يُمرَّر إليها، وكتل تجميعية (grouping).

بالإضافة إلى ذلك، تُوفِّر المكتبة العديد من التوابع لإرسال الرسائل واستقبالها من وإلى الكتل المختلفة، البعض منها مُتزامِن (synchronous) والبعض الآخر ليس كذلك.

استخدام النوع BufferBlock لتنشئة متزامنة لنمط المُنتِج والمُستهلِك (producer-consumer pattern)

تُصنَف الكتل من النوع BufferBlock ضِمْن كتل التخزين المؤقت (buffering)، وتَعمَل كمصدر ومَقصِد للبيانات (propagator).

يُستخدَم التابعين Post و Receive لكتابة الرسائل (messages) إلى كتل التَدَفُّق واستقبالها منها على الترتيب بشكل مُتزامِن (synchronous).

تُعرِّف الشيفرة التالية صنف المُنتِج:

public class Producer
{
    private static Random random = new Random((int)DateTime.UtcNow.Ticks);

    // انتج القيمة المُرسلة إلى كتلة التدفق
    public double Produce()
    {
        var value = random.NextDouble();
        Console.WriteLine($"Producing value: {value}");
        return value;
    }
}

تُعرِّف الشيفرة التالية صنف المُستهلك:

public class Consumer
{
    //consume the value that will be received from buffer block
    public void Consume (double value) => Console.WriteLine($"Consuming value: {value}");
}

لاحظ كيف ضُمْنت كلًا من شيفرة المُنتِج (producer) والمُستهلِك (consumer) داخل مُهِمّتين مُنفصلتين بحيث يُصبِح من الممكن تَّنْفيذهما بصورة مُتواقِتة (concurrently)، كالتالي:

class Program
{
    private static BufferBlock<double> buffer = new BufferBlock<double>();

    static void Main (string[] args)
    {
        // انشئ مهمة ترسل قيمة قيمة من المُنتج إلى كتلة التدفق كل ثانية
        var producerTask = Task.Run(
            async () =>
            {
                var producer = new Producer();
                while(true)
                {
                    buffer.Post(producer.Produce());
                    await Task.Delay(1000);
                }
            });

        // انشئ مهمة تستقبل القيم من كتلة التدفق
        var consumerTask = Task.Run(
            () =>
            {
                var consumer = new Consumer();
                while(true)

                {
                    consumer.Consume(buffer.Receive());
                }
            });

        Task.WaitAll(new[] { producerTask, consumerTask });
    }
}

استخدام النوع BufferBlock لتنشئة لا مُتزامِنة لنمط المُنتِج والمُستهلِك

يُستخدَم التابعين SendAsync و ReceiveAsync لكتابة الرسائل (messages) إلى كتل التَدَفُّق واستقبالها منها على الترتيب بشكل لا مُتزامِن (asynchronous).

var bufferBlock = new BufferBlock<int>(
    new DataflowBlockOptions
    {
        BoundedCapacity = 1000
    });

var cancellationToken = new CancellationTokenSource(TimeSpan.FromSeconds(10)).Token;

var producerTask = Task.Run(
    async () =>
    {
        var random = new Random();
        while (!cancellationToken.IsCancellationRequested)
        {
            var value = random.Next();
            await bufferBlock.SendAsync(value, cancellationToken);
        }
    });

var consumerTask = Task.Run(
    async () =>
    {
        while (await bufferBlock.OutputAvailableAsync())
        {
            var value = bufferBlock.Receive();
            Console.WriteLine(value);
        }
    });

await Task.WhenAll(producerTask, consumerTask);

استخدام النوع BlockingCollection لتنشئة حلقة لنمط المنتج والمستهلك

لا يَقَع النوع BlockingCollection ضِمْن مكتبة تَوازِي المَهامّ لتَدَفُّق البيانات (TPL Dataflow Library)، فهو مجرد تجميعة آمنة خيطيًا (thread-safe)، ويُوفِّر تَّنْفيذًا (implementation) لنمط المُنتِج والمُستهلِك (producer/consumer pattern)، لكنه لا يَتمتَع ببقية الميزات الأُخرى التي تُوفِّرها كتل التَدَفُّق. يُمكنك اِستخدَامه لحل المشاكل البسيطة التي لا تَتطلَّب خط أنابيب (pipeline) معقَّد. انظر المثال التالي:

var consumerTask = Task.Run(() => {
    foreach(var item in collection.GetConsumingEnumerable())
    {
        Console.WriteLine("Consumed: " + item);
        Thread.Sleep(random.Next(10,1000));
    }
    Console.WriteLine("Consumer completed!");
});
var collection = new BlockingCollection<int>(5);
var random = new Random();

var producerTask = Task.Run(() => {
    for(int item=1; item<=10; item++)
    {
        collection.Add(item);
        Console.WriteLine("Produced: " + item);
        Thread.Sleep(random.Next(10,1000));
    }

    collection.CompleteAdding();
    Console.WriteLine("Producer completed!");
});

ضُمنت كلًا من شيفرة المُنتِج (producer) والمُستهِلك (consumer) داخل مُهِمّتين منفصلتين بحيث يُصبِح من المُمكن تَّنْفيذهما بصورة مُتواقِتة (concurrently).

Task.WaitAll(producerTask, consumerTask);

Console.WriteLine("Everything completed!");

من الجدير بالذكر أنه في حالة عدم استدعاء التابع CompleteAdding، تستطيع مُتابعة إضافة عناصر للتَجمِيعَة حتى لو أصبح المُستهِلك قيْد التَّنْفيذ. اِستدعِي التابع collection.CompleteAdding()‎ فقط عندما تكون متأكدًا أنك لم تَعدْ في حاجة لإضافة المزيد من العناصر.

يُمكن اِستخدَام هذا النوع لتَّنْفيذ نمط عدة مُنتجِين لمُستهِلك وحيد، والذي يعنيّ وجود عدة مصادر للبيانات، تكون مسئولة عن تغذية التَجمِيعَة -من نفس النوع BlockingCollection- بالعناصر، وفي المقابل يَسَحَب مُستهلِك وحيد العناصر ويقوم بمُعالَجتِها بطريقة ما.

لاحظ أنه في حالة عدم استدعاء التابع CompleteAdding وكانت التجميعة فارغة، فان المُعدَّد العائد من collection.GetConsumingEnumerable()‎ سيتَسَبَّب بحُدوث تَعطّيل (blocking) إلى أن يُضاف عنصر جديد للتجميعة أو إلى أن يُستدعَى التابع BlockingCollection.CompleteAdding()‎.

كتابة رسائل إلى كتلة تدفق من النوع ActionBlock

تُصنَف الكتل من النوع ActionBlock ضِمْن الكتل التَّنْفيذية (execution) حيث تُنفِّذ مُفوَّض Delegate من النوع Action كلما استقبَلت بيانات جديدة، وتَعمَل كمقصِد للبيانات (target) فقط. انظر المثال التالي:

// انشئ كتلة تدفق بحدث لا متزامن
var block = new ActionBlock<string>(
    async hostName =>
    {
        IPAddress[] ipAddresses = await Dns.GetHostAddressesAsync(hostName);
        Console.WriteLine(ipAddresses[0]);
    });

// ارسل البيانات إلى كتلة التدفق للمُعالجة
block.Post("google.com"); 
block.Post("reddit.com");
block.Post("stackoverflow.com");

// بلغ كتلة التدفق بالتوقف عن استقبال عناصر جديدة
block.Complete(); 

// انتظر حتي تنتهي كتلة التدفق من معالجة جميع العناصر
await block.Completion; 

توصيل كتل التدفق لتنشئة خط أنابيب البيانات (data pipelines)

يُستخدَم التابع LinkTo لتوصيل كتلتي تَدَفُّق (dataflow blocks) بحيث تَعمَل الأولى كمصدر للبيانات (source) والاخرى كمقصِد (target). يعني ذلك أنه بمجرد استقبَال الأولى لرسالة جديدة ومُعالَجتِها، فإنها ستُبلِّغ الكتل المتصلة بها بوجود رسالة جديدة. يُمكنك إنشاء خط أنابيب (pipeline) عن طريق توصيل عدة كتل تَدَفُّق ببعضها. كالمثال التالي:

var httpClient = new HttpClient();

// أنشئ كتلة تدفق لاستقبال رابط وإعادة محتوياته كسِلسِلة نصية
var downloaderBlock = new TransformBlock<string, string>(
    async uri => await httpClient.GetStringAsync(uri));

// انشئ كتلة تدفق لاستقبال سلسلة نصية وطباعتها على الطرفية
var printerBlock = new ActionBlock<string>(
    contents => Console.WriteLine(contents));

// اجعل كتلة جلب محتويات الروابط مكتملة بمجرد اكتمال كتلة تدفق الطباعة 
var dataflowLinkOptions = new DataflowLinkOptions {PropagateCompletion = true};

// وصل كتلتي التدفق لإنشاء خط أنابيب
downloaderBlock.LinkTo(printerBlock, dataflowLinkOptions);

// أرسل عدة روابط لكتلة التدفق الأولى والتي ستُمرر محتوياتها إلى كتلة التدفق الثانية
downloaderBlock.Post("http://youtube.com");
downloaderBlock.Post("http://github.com");
downloaderBlock.Post("http://twitter.com");
downloaderBlock.Complete(); 
// Completion will propagate to printerBlock

await printerBlock.Completion; // Only need to wait for the last block in the pipeline

السياق التزامني (Synchronization Contexts)

يُعدّ السياق التزامني تمثيلًا تجريديًا (abstract representation) للبيئة التي تُنفَّذ بها الشيفرة.

يُستخدَم عادةً عندما يُريد خيط ما تَّنْفيذ شيفرة ضِمْن خيط آخر. في هذه الحالة، يُلتقَط سياق الخيط المستهدف SynchronizationContext، ويُمرَّر للخيط الفرعي، الذي يستطيع -عند الحاجة- اِستخدَام التابع SynchronizationContext.Post لإرسال مُفوَّض (delegate) إلى سياق الخيط المُلتقَط لضَمان تَّنْفيذ المفوَّض ضِمْن ذلك الخيط.

يكون ذلك بالأخص ضروريًا عند التَعامُل مع مُكوِنات الواجهة (UI component)، والتي لا يمكن تعديل خاصياتها إلا من خلال الخيط المالك الذي انشأها.

الولوج إلى مُكِونات الواجهة (UI components) من خيوط أخرى

إذا أردت تغيير خاصية مُكِون واجهة أو نموذج تَحكُم (form control)، مثل صندوق نصي (textbox) أو عنوان (label)، من خيط (thread) آخر غير خيط الواجهة الرسومية المُنشِئ لهذا المُكِون. فلابُدّ من اِستخدَام طرائق مُعينة غيْر التعديل المُباشر، وإلا ستحصل على رسالة الخطأ التالية:

اقتباس

"العمليات عبر الخيوط غير صالحة: مُحاولة الولوج إلى مُكِون واجهة من خيط آخر غير الخيط الذي أنشأه"

فمثلًا، ستؤدي الشيفرة التالية إلى التبلِّيغ عن اعتراض يَحمِل الرسالة السابقة:

private void button4_Click(object sender, EventArgs e)
{
    Thread thread = new Thread(updatetextbox);
    thread.Start();
}

private void updatetextbox()
{
    textBox1.Text = "updated"; 
}

استخدام السياق التزامُني SynchronizationContext

يمكن لخيط بالخلفية (background) تنفيذ شيفرة بخيط الواجهة (UI thread) لتَحْديث إحدى مُكوِنات الواجهة (UI component) من خلال استقبال السياق التزامني لخيط الواجهة من النوع SynchronizationContext، كالمثال التالي:

void Button_Click(object sender, EventArgs args)
{
    SynchronizationContext context = SynchronizationContext.Current;

    Task.Run(
        () =>
        {
            for(int i = 0; i < 10; i++)
            {
                Thread.Sleep(500); 

                context.Post(ShowProgress, "Work complete on item " + i);
            }
        }
}        
void UpdateCallback(object state)
{
    // يُستدعى هذا التابع من خلال خيط الواجهة فقط ولذلك تُحدث بنجاح
    this.MyTextBox.Text = state as string;
}

في المثال بالأعلى، إذا حَاول الخيط الفرعي تَحْديث خاصية مُكوِن الواجهة MyTextBox.Text داخل الحلقة for مباشرة، سيُبلَّغ عن خطأ خيطي (threading). أما إذا أرسل الحَدَث UpdateCallback إلى السياق التزامني SynchronizationContext الخاص بخيط الواجهة (UI thread)، ستتم عملية تَحْديث الصندوق النصي ضِمْن نفس خيط مُكوِنات الواجهة بنجاح.

عمليًا، ينبغي أن تَستخدَم الواجهة System.IProgress لتَحْديث شريط التقدم (progress)؛ حيث يَلتقِط التَّنْفيذ الافتراضي لهذه الواجهة System.Progress سياق المُنشِئ التزامني (synchronisation context) تلقائيًا.

استخدام التابعين Control.Invoke أو Control.BeginInvoke

حل آخر هو أن تَستخدِم أحد التابعين Control.Invoke أو Control.BeginInvoke لتَغْيير مُحتوى مُكِون الواجهة -صندوق نصي مثلًا- من خيط آخر غير الخيط المالك لنموذج التحكم (form control). علاوة على ذلك، يُمكنك الاستعانة بالخاصية Control.InvokeRequired لفحص ما إذا كان استدعاء نموذج التحكم ضروريًا أم لا، كالتالي:

private void updatetextbox()
{
    if (textBox1.InvokeRequired)
        textBox1.BeginInvoke((Action)(() => textBox1.Text = "updated"));
    else
        textBox1.Text = "updated";
}

إذا كنت ستَحتَاج لتَّنْفيذ ذلك بشكل مُتكرر، فربما من الأفضل تَضْمِين هذه الشيفرة داخل دالة مُوسِعَة (extension) للأنواع القابلة للاستدعاء (invokeable)، كالتالي:

public static class Extensions
{
    public static void BeginInvokeIfRequired(this ISynchronizeInvoke obj, Action action)
    {
        if (obj.InvokeRequired)
            obj.BeginInvoke(action, new object[0]);
        else
            action();
    }
}

وبالتالي يُصبِح تَحْديث صندوق نصي من أي خيط أمرًا يسيرًا، كالتالي:

private void updatetextbox()
{
    textBox1.BeginInvokeIfRequired(() => textBox1.Text = "updated");
}

الفرق بين التابعين Control.BeginInvoke و Control.Invoke هو أن الأول يُنفَّذ بشكل غير مُتزامِن (asynchronous) أما الثاني فيُنفَّذ بشكل مُتزامِن (synchronous). يعني ذلك أنه إذا اُستخدِم التابع الأول Control.BeginInvoke، فإن الشيفرة المكتوبة بعد استدعاء هذا التابع ستُنفَّذ بغض النظر عن انتهاء المفوَّض المُمرَّر إلى التابع من عَمَله أم لا. في المقابل، لن تُنفَّذ الشيفرة المكتوبة بعد التابع الثاني Control.Invoke إلا بعد انتهاء المفوَّض من العَمَل. يؤدي ذلك إلى حُدوث تَعطّيل بالخيط الأساسي ويَتسبَّب ببطئ الشيفرة إلى حد كبير خاصةً إذا لجأت إلى استخدام هذا التابع بكثرة. ربما أيضًا قد يَتسبَّب بحُدوث قفل ميت (deadlock) في حالة انتظار خيط الواجهة (GUI thread) انتهاء الخيط الفرعي من عمله أو انتظار تحريره لمَورِد معين.

استخدام الواجهة IProgress

لاحظ أن التابع Report مُنفَّذ صراحةً (explicit implementation) ضِمْن الواجهة IProgress، وليس موجودًا ضِمْن النوع المُنفِّذ System.Progress. لذا إمّا أن تَستدعِي التابع من خلال الواجهة أو من خلال النوع لكن بعد أن تُحوّله لنوع الواجهة (casting)، كالمثال التالي:

var p1 = new Progress<int>();
p1.Report(1); // خطأ تصريف

IProgress<int> p2 = new Progress<int>();
p2.Report(2); // تُصرف بنجاح

var p3 = new Progress<int>();
((IProgress<int>)p3).Report(3); // تُصرف بنجاح

مثال بسيط لتحديث التقدم

يُمكن لعملية معينة أن تَستخدِم الواجهة IProgress لتَحْديث شريط التقدم لعملية أخرى عن طريق استقبال كائن من الواجهة IProgress، واستدعاء تابعها Report كلما أرادت تَحْديث التقدم، كالمثال التالي:

void Main()
{
    IProgress<int> p = new Progress<int>(
        progress =>
        {
            Console.WriteLine("Running Step: {0}", progress);
        });
    LongJob(p);
}
public void LongJob(IProgress<int> progress)
{
    var max = 10;
    for (int i = 0; i < max; i++)
    {
        progress.Report(i);
    }
}

الخْرج:

Running Step: 0
Running Step: 3
Running Step: 4
Running Step: 5
Running Step: 6
Running Step: 7
Running Step: 8
Running Step: 9
Running Step: 2
Running Step: 1

يُنفَّذ التابع IProgress.Report()‎ بشكل لا مُتزامِن (asynchronously)، لذلك ربما لا تُطبَع الأرقام بنفس ترتيب الاستدعاء. بالتالي، يُعدّ غير مُلائم عندما يكون التَحْديث وِفق ترتيب الاستدعاء ضروريًا.

ترجمة -وبتصرف- للفصول 38 - 39 - 41 - 42 - 43 - 44 - 31 من كتاب ‎.NET Framework Notes for Professionals


تفاعل الأعضاء

أفضل التعليقات

لا توجد أية تعليقات بعد



انضم إلى النقاش

يمكنك أن تنشر الآن وتسجل لاحقًا. إذا كان لديك حساب، فسجل الدخول الآن لتنشر باسم حسابك.

زائر
أضف تعليق

×   لقد أضفت محتوى بخط أو تنسيق مختلف.   Restore formatting

  Only 75 emoji are allowed.

×   Your link has been automatically embedded.   Display as a link instead

×   جرى استعادة المحتوى السابق..   امسح المحرر

×   You cannot paste images directly. Upload or insert images from URL.


×
×
  • أضف...