الصفحات

2021/04/02

التحقق من فعالية الدوال اللامتزامنة async functions

 مقدمة

في مقالة سابقة عن الدوال اللامتزامنة تحدثت عن هذا المفهوم وكيف يمكنه المساهمة في تشغيل عدة مهمات في نفس الوقت. وقلت أننا بحاجة لمقالة نبين فيها كيف نتحقق من أن الدوال تعمل معا بالتوازي وكيف نستخدم ال contextlib.asynccontextmanager وما هي عيوب الدوالي اللامتزامنة وكيف نتجنبها. يجب أن تقرأ المقالة السابقة قبل هذه المقالة.

محاكاة عدة طلبات

لنحاكي عملية وصول عدة طلبات متزامنة للخادم. كل طلب سيقوم باستعلام ما في قاعدة بيانات بعيدة وهذا يستغرق بين 100 ميلي-ثانية إلى 200 ميلي-ثانية (بمتوسط 150). في هذه المحاكاة معالجة كل طلب سيعمل غفوة عشوائية بذلك المقدار. كل طلب سيكون له رقم متسلسل يطبع عبارة "دخول المهمة رقم كذا" ثم يأخذ الغفوة ثم يطبع "الخروج من المهمة رقم كذا".
import time
import random
import asyncio

async def rndsleep(msg):
  print(">>> entering ", msg)
  seconds = random.randrange(100, 200)/1000.0
  await asyncio.sleep(seconds)
  print("<<< leaving ", msg)

async def myloop(n):
    await asyncio.sleep(0.1)
    for i in range(n):
        task = asyncio.create_task(rndsleep(i))
        yield task

دعونا نرى ماذا سيحصل

إضافة تقنيات جديدة لا يحل المشاكل

لو قمنا بتنفيذ 20 مهمة كل منها 150 ميلي-ثانية بالمتوسط بشكل متسلسل فإنه سيستغرق حوالي 20 * 0.150 ويساوي 3 ثوان. كذلك فإنه لن يبدأ المهمة رقم 2 إلا بعد أن ينهي مهمة رقم 1. وهنا لا يوجد أي فائدة.


بالمقابل لو بدأت المهمة التالية قبل أن تنتهي المهمة الأولى وكان الوقت المستغرق لانهاء كل المهمات أقل من مجموعها منفردة فإننا هنا نرى فعالية في معالجة عدة مهمات في نفس الوقت


دعونا من الكلام ولننظر للكود

async def main():
  t0 = time.time()
  async for task in myloop(20):
    await task
  print("took ", time.time()-t0, " seconds")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

عند تجربة الكود فإن النتيجة تبدو هكذا

››› entering 0
‹‹‹ leaving  0
››› entering 1
‹‹‹ leaving  1
››› entering 2
‹‹‹ leaving  2
››› entering 3
‹‹‹ leaving  3
››› entering 4
‹‹‹ leaving  4
››› entering 5
‹‹‹ leaving  5
››› entering 6
‹‹‹ leaving  6
››› entering 7
‹‹‹ leaving  7
››› entering 8
‹‹‹ leaving  8
››› entering 9
‹‹‹ leaving  9
››› entering 10
‹‹‹ leaving  10
››› entering 11
‹‹‹ leaving  11
››› entering 12
‹‹‹ leaving  12
››› entering 13
‹‹‹ leaving  13
››› entering 14
‹‹‹ leaving  14
››› entering 15
‹‹‹ leaving  15
››› entering 16
‹‹‹ leaving  16
››› entering 17
‹‹‹ leaving  17
››› entering 18
‹‹‹ leaving  18
››› entering 19
‹‹‹ leaving  19
took 3.01 seconds

أعلاه فإننا نلاحظ أن المهمات تم تنفيذها بالتسلسل واحدة ثم ينتظر حتى تتم ثم يشغل التي تليها وأنه أخذ في تنفيذ 20 منها 3 ثوان تقريبا (أي أنها كانت تعمل بالتسلسل وليس بالتوازي)

الحل

نحن في المثال السابق كنا نشغل المهمة ثم مباشرة ننتظرها await قبل أن تبدأ المهمة التالية بالعمل. ماذا لو كومنا كل المهمات ثم في قائمة ثم انتظرنا هذه القائمة كما في هذا المثال

async def main():
  t0 = time.time()
  tasks = []
  async for task in myloop(20):
    tasks.append(task)
  await asyncio.gather(*tasks)
  print("took ", time.time()-t0, " seconds")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

والنتيجة

››› entering  0
››› entering  1
››› entering  2
››› entering  3
››› entering  4
››› entering  5
››› entering  6
››› entering  7
››› entering  8
››› entering  9
››› entering  10
››› entering  11
››› entering  12
››› entering  13
››› entering  14
››› entering  15
››› entering  16
››› entering  17
››› entering  18
››› entering  19
‹‹‹ leaving   15
‹‹‹ leaving   2
‹‹‹ leaving   18
‹‹‹ leaving   6
‹‹‹ leaving   1
‹‹‹ leaving   10
‹‹‹ leaving   4
‹‹‹ leaving   16
‹‹‹ leaving   3
‹‹‹ leaving   9
‹‹‹ leaving   13
‹‹‹ leaving   19
‹‹‹ leaving   11
‹‹‹ leaving   8
‹‹‹ leaving   0
‹‹‹ leaving   7
‹‹‹ leaving   14
‹‹‹ leaving   5
‹‹‹ leaving   17
‹‹‹ leaving   12
took  0.29  seconds

ونلاحظ أن المهمات كلها دخلت بالعمل معا حتى قبل أن تنتهي أول واحدة منهم وأن الوقت الإجمالي كان قريب من من زمن مهمة واحدة.

وقعنا في الفخ

في المثال السابق قمنا بتشغيل كل المهمات دفعة واحدة. تخيل لو أنها طلبات لموقع ولدينا عدد كبير من الاتصالات كل عملية تعمل استعلام على قاعدة البيانات وتعالج كمية كبيرة من البيانات وكل هذه البيانات موجودة في الذاكرة دفعة واحدة دون أي حد. ربما علينا أن نضع بعض المكابح حتى نضع سقف بحيث لا تنموا المهمات بحيث تلتهم كامل موارد الجهاز.

كيف وماذا نراقب؟

نحتاج أن نعد كم عملنا. كام واحدة نجحت وكام واحدة فشلت. وكلما دخلنا نزيد الميزان وكلما خرجنا نطرح من الميزان. إذا اشتغلت 3 مهمات فإن الميزان يزاد 3 مرات ثم ان نتهت واحدة يطرح واحد من الميزان ليصبح 2. الميزان يمثل طول طابور المراجعين أو عدد المهمات في الانتظار.

from contextlib import contextmanager

class Stats:
    def __init__(self):
        self.total=0
        self.success=0
        self.failed=0
        self.balance=0
        self.max_balance=0

    def __str__(self):
        return "** Stats total={} success={} failed={} balance={} max_balance={} **".format(
            self.total,
            self.success,
            self.failed,
            self.balance,
            self.max_balance,
        )

    @contextmanager
    def context(self):
        self.total+=1
        self.balance+=1
        self.max_balance = max(self.max_balance, self.balance)
        try:
            print("››› entering stats ", self)
            yield self
        except Exception as e:
            self.failed+=1
            raise e
        else:
            self.success+=1
        finally:
            self.balance-=1
            print("‹‹‹ leaving stats  ", self)

هذا الكود يسمى contextmanager ويستعمل عبر with. في المثال الأول كانت أقصى قيمة للميزان هي 1 والثاني كانت 20. ولو كان عندنا ملايين المهمات ستكون الطريقة الأولى 1 والثانية كلهم. وكلاهما سيء. واحدة تنفذهم بتسلسل وواحدة تنفذهم كلهم (إلى ما لا نهاية).

تعرف إلى الحواجز semaphore 

الحاجز asyncio.Semaphore هو مجرد عداد يبدأ برقم ما نمرره له عند استهلاله مثلا 5 هذا الرقم يمثل عدد الموارد المتاحة. كلما طلب أحدهم حجزه يتم طرح واحد من العداد وعندما يحرره أحدهم يجمع له واحد. عند حجزه عدة مرات دفعة واحدة إلى أن يصل إلى الصفر. بمجرد أن يصل للصفر فإنه لا يعود فورا كما كان يعمل بل يتوقف ويأخذ غفوة إلى أن يتم تحرير عدد من الموارد ويعود الرقم ليصير موجبا. مثلا لدينا 20 مهمة تم تشغيلها معا. كل مهمة منها تطلب حجز هذا الحاجز الذي بدأ ب 5 فإنه سينفذ أول 5 مهمات ذات الأرقام 0 و 1 و 2 و 3 و 4 ثم يأخذ غوة حتى تنتهي بعض هذه المهمات.

sem = asyncio.Semaphore(5)
stats = Stats()

async def rndsleep(msg):
  async with sem:
    with stats.context():
      print("››› entering ", msg)
      seconds = random.randrange(100, 200)/1000.0
      await asyncio.sleep(seconds)
      print("‹‹‹ leaving  ", msg)

هنا حددنا قيمة العداد ب 5 يعني أقصى طول لطابور الانتظار هو 5. ولن تمر المهمة رقم 6 إلا بعد انتهاء أي مهمة من المهمات التي قبلها. هذا يعني أن ال 20 مهمة ستعمل كل 5 معا كأنها موزعة في 4 دفعات. وستستغرق 4 ضرب طول المجموعة الواحدة والتي هي 0.200 يعني أقل من ثانية بقليل. المثال متوفر هنا

دفعات غير مرتبة من خلال asyncio.as_completed 

هذه الدالة تأخذ قائمة من المهمات وتعيد generator يمكن السير عبر for لكن ليس بالترتيب بل تعيد أيهما تكتمل أولا. يعني لو اول مهمة تستغرق 200 ميلي-ثانية والثانية 120 والثالثة 130 فإنه تعيد الثانية ثم الثالثة ثم الأولى. كلما اكتملت واحدة تعيدها بغض النظر عن ترتيبها.

async def unordered_bulks(size, aiterable):
    tasks = set()
    async for task in aiterable:
        tasks.add(task)
        if len(tasks)>=size:
            for coro in asyncio.as_completed(tasks):
                yield coro
            tasks.clear()
    if len(tasks):
        for coro in asyncio.as_completed(tasks):
            yield coro
        tasks.clear()

هذه الدالة التي عرفناها تأخذ حجم الدفعة ومتوالية async generator (حتى لو كانت لا نهائية) وتشغلها لما يتجمع عنها حجم الدفعة المطلوبة وتعيدها واحدة واحدة لما تكتمل

وأخيرا هذا المثال

sem = asyncio.Semaphore(10)
stats = Stats()

async def rndsleep(msg):
  async with sem:
    with stats.context():
      print("››› entering ", msg)
      seconds = random.randrange(100, 200)/1000.0
      await asyncio.sleep(seconds)
      print("‹‹‹ leaving  ", msg)

async def myloop(n):
    await asyncio.sleep(0.1)
    for i in range(n):
        task = asyncio.create_task(rndsleep(i))
        yield task

async def main():
  t0 = time.time()
  gen = myloop(20)
  async for task in unordered_bulks(5, gen):
    await task
  print("took ", time.time()-t0, " seconds")


مع أن حجم الحاجز هو 10 إلا أن ال max_balance كان 5. لأننا نشغلها في دفعات حجم كل منها 5. ولاحظ أنها ستنجح حتى لو كان ال gen لانهائي.


ليست هناك تعليقات:

إرسال تعليق