مقدمة
في مقالة سابقة عن الدوال اللامتزامنة تحدثت عن هذا المفهوم وكيف يمكنه المساهمة في تشغيل عدة مهمات في نفس الوقت. وقلت أننا بحاجة لمقالة نبين فيها كيف نتحقق من أن الدوال تعمل معا بالتوازي وكيف نستخدم ال contextlib.asynccontextmanager وما هي عيوب الدوالي اللامتزامنة وكيف نتجنبها. يجب أن تقرأ المقالة السابقة قبل هذه المقالة.
محاكاة عدة طلبات
import time import random import asyncio async def rndsleep(msg): print(">>> entering ", msg) seconds = random.randrange(100, 200)/1000.0 await asyncio.sleep(seconds) print("<<< leaving ", msg) async def myloop(n): await asyncio.sleep(0.1) for i in range(n): task = asyncio.create_task(rndsleep(i)) yield task
إضافة تقنيات جديدة لا يحل المشاكل
لو قمنا بتنفيذ 20 مهمة كل منها 150 ميلي-ثانية بالمتوسط بشكل متسلسل فإنه سيستغرق حوالي 20 * 0.150 ويساوي 3 ثوان. كذلك فإنه لن يبدأ المهمة رقم 2 إلا بعد أن ينهي مهمة رقم 1. وهنا لا يوجد أي فائدة.
بالمقابل لو بدأت المهمة التالية قبل أن تنتهي المهمة الأولى وكان الوقت المستغرق لانهاء كل المهمات أقل من مجموعها منفردة فإننا هنا نرى فعالية في معالجة عدة مهمات في نفس الوقت
دعونا من الكلام ولننظر للكود
async def main(): t0 = time.time() async for task in myloop(20): await task print("took ", time.time()-t0, " seconds") if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
عند تجربة الكود فإن النتيجة تبدو هكذا
››› entering 0 ‹‹‹ leaving 0 ››› entering 1 ‹‹‹ leaving 1 ››› entering 2 ‹‹‹ leaving 2 ››› entering 3 ‹‹‹ leaving 3 ››› entering 4 ‹‹‹ leaving 4 ››› entering 5 ‹‹‹ leaving 5 ››› entering 6 ‹‹‹ leaving 6 ››› entering 7 ‹‹‹ leaving 7 ››› entering 8 ‹‹‹ leaving 8 ››› entering 9 ‹‹‹ leaving 9 ››› entering 10 ‹‹‹ leaving 10 ››› entering 11 ‹‹‹ leaving 11 ››› entering 12 ‹‹‹ leaving 12 ››› entering 13 ‹‹‹ leaving 13 ››› entering 14 ‹‹‹ leaving 14 ››› entering 15 ‹‹‹ leaving 15 ››› entering 16 ‹‹‹ leaving 16 ››› entering 17 ‹‹‹ leaving 17 ››› entering 18 ‹‹‹ leaving 18 ››› entering 19 ‹‹‹ leaving 19 took 3.01 seconds
أعلاه فإننا نلاحظ أن المهمات تم تنفيذها بالتسلسل واحدة ثم ينتظر حتى تتم ثم يشغل التي تليها وأنه أخذ في تنفيذ 20 منها 3 ثوان تقريبا (أي أنها كانت تعمل بالتسلسل وليس بالتوازي)
الحل
نحن في المثال السابق كنا نشغل المهمة ثم مباشرة ننتظرها await قبل أن تبدأ المهمة التالية بالعمل. ماذا لو كومنا كل المهمات ثم في قائمة ثم انتظرنا هذه القائمة كما في هذا المثال
async def main(): t0 = time.time() tasks = [] async for task in myloop(20): tasks.append(task) await asyncio.gather(*tasks) print("took ", time.time()-t0, " seconds") if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main())
والنتيجة
››› entering 0 ››› entering 1 ››› entering 2 ››› entering 3 ››› entering 4 ››› entering 5 ››› entering 6 ››› entering 7 ››› entering 8 ››› entering 9 ››› entering 10 ››› entering 11 ››› entering 12 ››› entering 13 ››› entering 14 ››› entering 15 ››› entering 16 ››› entering 17 ››› entering 18 ››› entering 19 ‹‹‹ leaving 15 ‹‹‹ leaving 2 ‹‹‹ leaving 18 ‹‹‹ leaving 6 ‹‹‹ leaving 1 ‹‹‹ leaving 10 ‹‹‹ leaving 4 ‹‹‹ leaving 16 ‹‹‹ leaving 3 ‹‹‹ leaving 9 ‹‹‹ leaving 13 ‹‹‹ leaving 19 ‹‹‹ leaving 11 ‹‹‹ leaving 8 ‹‹‹ leaving 0 ‹‹‹ leaving 7 ‹‹‹ leaving 14 ‹‹‹ leaving 5 ‹‹‹ leaving 17 ‹‹‹ leaving 12 took 0.29 seconds
ونلاحظ أن المهمات كلها دخلت بالعمل معا حتى قبل أن تنتهي أول واحدة منهم وأن الوقت الإجمالي كان قريب من من زمن مهمة واحدة.
وقعنا في الفخ
في المثال السابق قمنا بتشغيل كل المهمات دفعة واحدة. تخيل لو أنها طلبات لموقع ولدينا عدد كبير من الاتصالات كل عملية تعمل استعلام على قاعدة البيانات وتعالج كمية كبيرة من البيانات وكل هذه البيانات موجودة في الذاكرة دفعة واحدة دون أي حد. ربما علينا أن نضع بعض المكابح حتى نضع سقف بحيث لا تنموا المهمات بحيث تلتهم كامل موارد الجهاز.
كيف وماذا نراقب؟
نحتاج أن نعد كم عملنا. كام واحدة نجحت وكام واحدة فشلت. وكلما دخلنا نزيد الميزان وكلما خرجنا نطرح من الميزان. إذا اشتغلت 3 مهمات فإن الميزان يزاد 3 مرات ثم ان نتهت واحدة يطرح واحد من الميزان ليصبح 2. الميزان يمثل طول طابور المراجعين أو عدد المهمات في الانتظار.
from contextlib import contextmanager class Stats: def __init__(self): self.total=0 self.success=0 self.failed=0 self.balance=0 self.max_balance=0 def __str__(self): return "** Stats total={} success={} failed={} balance={} max_balance={} **".format( self.total, self.success, self.failed, self.balance, self.max_balance, ) @contextmanager def context(self): self.total+=1 self.balance+=1 self.max_balance = max(self.max_balance, self.balance) try: print("››› entering stats ", self) yield self except Exception as e: self.failed+=1 raise e else: self.success+=1 finally: self.balance-=1 print("‹‹‹ leaving stats ", self)
هذا الكود يسمى contextmanager ويستعمل عبر with. في المثال الأول كانت أقصى قيمة للميزان هي 1 والثاني كانت 20. ولو كان عندنا ملايين المهمات ستكون الطريقة الأولى 1 والثانية كلهم. وكلاهما سيء. واحدة تنفذهم بتسلسل وواحدة تنفذهم كلهم (إلى ما لا نهاية).
تعرف إلى الحواجز semaphore
الحاجز asyncio.Semaphore هو مجرد عداد يبدأ برقم ما نمرره له عند استهلاله مثلا 5 هذا الرقم يمثل عدد الموارد المتاحة. كلما طلب أحدهم حجزه يتم طرح واحد من العداد وعندما يحرره أحدهم يجمع له واحد. عند حجزه عدة مرات دفعة واحدة إلى أن يصل إلى الصفر. بمجرد أن يصل للصفر فإنه لا يعود فورا كما كان يعمل بل يتوقف ويأخذ غفوة إلى أن يتم تحرير عدد من الموارد ويعود الرقم ليصير موجبا. مثلا لدينا 20 مهمة تم تشغيلها معا. كل مهمة منها تطلب حجز هذا الحاجز الذي بدأ ب 5 فإنه سينفذ أول 5 مهمات ذات الأرقام 0 و 1 و 2 و 3 و 4 ثم يأخذ غوة حتى تنتهي بعض هذه المهمات.
sem = asyncio.Semaphore(5) stats = Stats() async def rndsleep(msg): async with sem: with stats.context(): print("››› entering ", msg) seconds = random.randrange(100, 200)/1000.0 await asyncio.sleep(seconds) print("‹‹‹ leaving ", msg)
هنا حددنا قيمة العداد ب 5 يعني أقصى طول لطابور الانتظار هو 5. ولن تمر المهمة رقم 6 إلا بعد انتهاء أي مهمة من المهمات التي قبلها. هذا يعني أن ال 20 مهمة ستعمل كل 5 معا كأنها موزعة في 4 دفعات. وستستغرق 4 ضرب طول المجموعة الواحدة والتي هي 0.200 يعني أقل من ثانية بقليل. المثال متوفر هنا
دفعات غير مرتبة من خلال asyncio.as_completed
هذه الدالة تأخذ قائمة من المهمات وتعيد generator يمكن السير عبر for لكن ليس بالترتيب بل تعيد أيهما تكتمل أولا. يعني لو اول مهمة تستغرق 200 ميلي-ثانية والثانية 120 والثالثة 130 فإنه تعيد الثانية ثم الثالثة ثم الأولى. كلما اكتملت واحدة تعيدها بغض النظر عن ترتيبها.
async def unordered_bulks(size, aiterable): tasks = set() async for task in aiterable: tasks.add(task) if len(tasks)>=size: for coro in asyncio.as_completed(tasks): yield coro tasks.clear() if len(tasks): for coro in asyncio.as_completed(tasks): yield coro tasks.clear()
هذه الدالة التي عرفناها تأخذ حجم الدفعة ومتوالية async generator (حتى لو كانت لا نهائية) وتشغلها لما يتجمع عنها حجم الدفعة المطلوبة وتعيدها واحدة واحدة لما تكتمل
وأخيرا هذا المثال
sem = asyncio.Semaphore(10) stats = Stats() async def rndsleep(msg): async with sem: with stats.context(): print("››› entering ", msg) seconds = random.randrange(100, 200)/1000.0 await asyncio.sleep(seconds) print("‹‹‹ leaving ", msg) async def myloop(n): await asyncio.sleep(0.1) for i in range(n): task = asyncio.create_task(rndsleep(i)) yield task async def main(): t0 = time.time() gen = myloop(20) async for task in unordered_bulks(5, gen): await task print("took ", time.time()-t0, " seconds")
مع أن حجم الحاجز هو 10 إلا أن ال max_balance كان 5. لأننا نشغلها في دفعات حجم كل منها 5. ولاحظ أنها ستنجح حتى لو كان ال gen لانهائي.
ليست هناك تعليقات:
إرسال تعليق