İçeriğe geç →

Python Async IO ile Asenkron Programlamayı Anlamak – async/await Sentaksı & Task & Future

Bir önceki yazımda asenkron programlamanın temeli olan generator yapılarını ve basit bir event loop’u oluşturmayı öğrenmiştik. Eğer okumadıysanız asenkron programlama çalışma prensibini ve Async IO kullanımının avantaj ve dezavantajlarını öğrenmek adına okumanızı tavsiye ediyorum.

Bir önceki yazımda kendi event loop implementasyonumuzu oluşturmuş ve birden fazla iş parçacığını eş zamanlı olarak çalıştırmanın generator yöntemini keşfetmiştik. Aslında python içerisinde event loop oluşturma, işletme, mesajlaşma, ağ işlemleri vb. bir çok nokta için hali hazırda bir paket var. Bu paketin ismi Async IO ve bu paket de temelde generator nesneleriyle çalışmakta. Biliyorsunuz ki __next__ veya next() metoduyla generator nesnelerini bir ileri yield’a kadar taşıyabiliyorduk. Yani kendi oluşturduğumuz event loop canlandırmasında başlatılıp duraklatılabilen ve eş zamanlı çalıştırılabilen kod bloklarını gerçekleştirebiliyoruz.

Coroutine

Asyncio eş zamanlılığı coroutine üzerinden sağlamakta. Her hangi bir çağrılabilir nesne generator olup olmaması fark etmeksizin @asyncio.coroutine dekoratörüyle oluşturulursa Async IO içinde kullanılabilmesi için generator haline dönüştürülür ve _is_coroutine değişkeni atanır.

>>> import asyncio
>>> @asyncio.coroutine
... def a():
...     print("Merhaba")
...
>>> a()
<generator object a at 0x000001EE99B00B10>
>>> a._is_coroutine
<object object at 0x000001EE9794E2F0>
>>> @asyncio.coroutine
... def b():
...     #Normal şartlar altında da generator
...     yield None
...
>>> b()
<generator object b at 0x000001EE99B00B10>
>>> b._is_coroutine
<object object at 0x000001EE9794E2F0>
>>> class C:
...     def __call__(self):
...         print("Merhaba ben callable")
...
>>> c = C()
>>> c = asyncio.coroutine(c)
>>> c()
<generator object coroutine.<locals>.coro at 0x000001EE99B00B10>
>>> c._is_coroutine
<object object at 0x000001EE9794E2F0>

Genel olarak baktığımızda coroutine Async IO’nun yapıtaşıdır ve asyncio.coroutine dekoratörüyle girişin generator, callable veya function olmasına bakmaksızın, çıktı olarak özelleştirilmiş generator(coroutine) üretir. Lambda fonksiyonlarındaysa farklı bir davranış söz konusu. Dönüş ifadesi coroutine olmaz fakat _is_coroutine özelliği değiştirilir.

Basit bir Async IO programı çalıştıralım.

>>> import asyncio
>>>
>>> @asyncio.coroutine
... def main():
...     yield from asyncio.sleep(1)
...     print("1 saniye sonra çalıştı")
...     yield from asyncio.sleep(1)
...     print("2 saniye sonra çalıştı")
...
>>> asyncio.run(main())
1 saniye sonra çalıştı
2 saniye sonra çalıştı

Yukarıda ne yaptık diye soracak olursanız bir çeşit generator olan asyncio.sleep fonksiyonuyla beklemeler yaptık ve sonunda da ekrana yazdırdık. Async IO ise söz konusu coroutine nesnesini bir Task haline getirdi ve varsayılan event loop üzerinde çalıştırdı.

Coroutine nesneleri üzerinde bir problem ortaya çıkmakta. Generator yield işlemi asyncio ve bizim daha önce gerçeklediğimiz event loop kurulumunda mantığı bir adım işletmek anlamına geliyor. Peki ya biz event loop üzerinde çalışan generator oluşturmak isteseydik? Mesela bir generator ile http üzerinden birden fazla medya indirip her bir istekte yield etmek istersek ne olacak? Böyle bir ihtiyaç az karşılaşılan bir durum değil ve bu tarz ihtiyaçları custom iterator ile atlatmaya çalışmak okunabilirliği oldukça etkileyecek. Bu nokta ve benzer ihtiyaçların karşılığını bulabilmesi için async/await sentaksına ihtiyaç duyuluyor. Python 3.7 ile beraber gelen bu sentaksta async fonksiyon tanımları yapılıyor ve yield from yerine await getiriliyor.

>>> import asyncio
>>>
>>> async def main():
...     await asyncio.sleep(1)
...     print("1 saniye sonra çalıştı")
...     await asyncio.sleep(1)
...     print("2 saniye sonra çalıştı")
...
>>> asyncio.run(main())
1 saniye sonra çalıştı
2 saniye sonra çalıştı
>>> print(main())
<coroutine object main at 0x000002169201C748>

await anahtar kelimesi sadece async def olarak tanımlanmış fonksiyonların içerisinde kulanılabilmekte. Bu anahtar kelimeyle eski yöntemle tanımlanmış (yani asyncio.coroutine decorator ile tanımlanmış) veya yeni sentaks olan async def ile tanımlanmış her hangi bir coroutine işletmesi bitene kadar beklenir ve devam edilir.

Javascript temelli biriyseniz karşılaştırma yapmak adına şöyle bir bilgi vermekte yarar var. Javascript üzerinde async olarak tanımladığınız bir fonksiyon çalıştırıldığında -içerideki kod Promise döndürmediği takdirde- Promise geri döndürüyordu. Promise’in karşılığı python için Future olarak düşünülebilir. Fakat burada fark olarak python programında async def olarak tanımlama yapıldığında coroutine geri döndürüyor. Bunun sonucu olarak await ile sonuç bekleme işlemi başlatılmadan coroutine çalışmaya başlamaz.

>>> import time
>>> import asyncio
>>> async def main():
...     #İlk sleep nesnesi tutuluyor fakat çalışmaya başlamıyor
...     print(f"Başlangıç {time.time()}")
...     sleep1 = asyncio.sleep(5)
...     await asyncio.sleep(2.5)
...     await sleep1
...     print(f"Bitiş {time.time()}")
...
>>> asyncio.run(main())
Başlangıç 1611021410.1263363
Bitiş 1611021417.6289043

Yukarıdaki örneğin başlangıç ve bitiş zamanlarına baktığımızda yaklaşık 7.5 saniye sürdüğünü görüyoruz. Javascript üzerinde benzer bir uygulama gerçekleştirseydik 5 saniye süreceğini söyleyebiliriz çünkü JS üzerindeki event loop modelinde asenkron bloğu çalıştırdığınızda event loop içerisine gönderilir ve istediğiniz zaman await edebileceğiniz bir Promise geri döndürülür. Python üzerindeki modelde ise event loop üzerine eş zamanlı çalışmak üzere coroutine göndermek için manuel olarak bir işlem yapmanız gerekmektedir. Bunları da Task nesneleriyle gerçekleştiriyoruz.

Task ve Future

Task nedir? Coroutine nasıl Task’a dönüşür?

Task teknik ifadeyle Future sınıfından türetilmiş özel amaçlı bir sınıf. Event loop üzerine eş zamanlı çalışacak şekilde gönderilmiş bir coroutine nesnesini temsil eder ve await nesnesiyle sonuçlanması beklenerek sonuç alınabilir. Nasıl ki yield for anahtarından türeyen await, coroutine içerisinde exception çıktığında await zamanında bu hatayı fırlatıyorsa, Task nesneleri de await zamanında exception fırlatır. Yani event loop üzerinde çalışan Task arkaplanda çalışırken hata fırlatmışsa ancak ve ancak await edildiğinde hata bulunulan kapsam içerisinde görülebilir.

Açıkçası bu denli karmaşa içerisinden anlam çıkarmanın tek yöntemi denemeler yapmak.

Event loop üzerine coroutine gönderip eş zamanlı çalışan bir Task nesnesine dönüştürmek için üç temel yöntem bulunmakta.

  • loop.create_task() – Low level bir işlemdir. Birden fazla event loop olan bir senaryo kullanılmayacaksa önerilmiyor. Sadece coroutine nesnelerini kabul eder.
  • asyncio.ensure_future() – Low level bir yöntemdir. Okunabilirliği azalttığı için artık tercih edilmiyor. Hem Future (ve türetilmiş sınıfları) hem de coroutine destekliyor.
  • asnycio.create_task() – High level bir yöntemdir. Python 3.7 ve üzerinde çalışmaktadır. Sadece coroutine kabul etmektedir.

Event loop üzerine arkaplanda çalışmak üzere task gönderimini örnekle görelim.

>>> import asyncio
>>> import time
>>> async def background(task_num):
...     await asyncio.sleep(5)
...     print(f"{task_num} bitti.")
...
>>> async def main():
...     start_time = time.time()
...     task_list = []
...     # 10 ayrı coroutine nesnesini task olarak event loop..
...     # içerisine gönderiyoruz
...     for i in range(10):
...         task_list.append(asyncio.create_task(background(i)))
...     # Toplu olarak await ediyoruz.
...     for t in task_list:
...         await t
...     # Geçen süre
...     print(f"Toplam süre: {time.time()-start_time} saniye")
...
>>> asyncio.run(main())
0 bitti.
2 bitti.
6 bitti.
9 bitti.
8 bitti.
5 bitti.
7 bitti.
4 bitti.
1 bitti.
3 bitti.
Toplam süre: 5.003758668899536 saniye

Her bir task create_task adımında çalışmaya başladığı için toplam bekleme süresi yaklaşık 5 saniye olarak hesaplandı. Bu sonuç bize create_task fonksiyonunun event loop üzerine eş zamanlı iş gönderildiğini gösteriyor.

Future nedir? Future ile Task arasındaki fark nedir?

Task nesnelerinin, event loop üzerinde çalışan coroutine süreci olduğunu açıklamıştım. Task yukarıda belirttiğim üç yöntemden birisiyle temel olarak oluşturulabilir. Task nesnesinin asyncio.Future sınıfından türediğini de belirtmiştim. Yani her Task aslında event loop üzerinde eş zamanlı çalışan bir bloğun(coroutine veya tekrar kapsama alınmış bir veya daha fazla Future) senkronizasyon nesnesidir. Task nesnesi durdurulabilir await edilebilir.

Future ise düşük seviye senkronizasyon paradigmasıdır. Future temel olarak asenkron bir işlemin olay bağlamlı sonucudur. set_result veya set_exception metoduyla await eden bağlam içerisine sonuç döndürür. Konumuz olan asyncio.Future python standart kütüphanesindeki concurrent.futures.Future nesnesini taklit eder. Temel olarak farkları şunlardır.

  • asyncio.Future thread-safe değildir. Event loop işletilen thread dışındaki bir thread içinde kullanmak güvenli değildir.
  • asyncio.Future.result veya asyncio.future.exception timeout parametresi kabul etmez.
  • concurrent.futures.Future.result veya concurrent.futures.Future.exception bulunulan bağlamı sonuç gelene ve işlem gerçekleştirilene kadar bloklarken, asyncio.Futures.result veya asyncio.Futures.exception sonuçlanmadıysa InvalidStateException fırlatır.
  • asyncio.Future.add_done_callback üzerinden kaydedilen nesneler hemen çağırılmaz, loop.call_soon() metoduyla çağrılır.
  • asyncio.Future concurrent.futures.wait() veya concurrent.futures.as_completed() metodlarını desteklemez.
  • asyncio.Future.cancel fonksiyonu ekstra olarak msg parametresi alabilir.

Basit bir kullanımını görelim.

>>> import asyncio
>>> async def sub_task(f:asyncio.Future):
...     await asyncio.sleep(5)
...     f.set_result("Sub task bitti.")
...
>>> async def main():
...     current_future = asyncio.Future()
...     # Oluşturulan future nesnesiyle başka bir task
...     # içerisindeki işlem callback'ini takip edeceğiz.
...     asyncio.create_task(sub_task(current_future))
...     # Bu bağlam içerisinde örnek işlemler.
...     await asyncio.sleep(1)
...     print("1 saniye bekledi")
...     # Diğer bağlam içerisindeki işlemin bitmesini bekleyelim.
...     print(await current_future)
...
>>> asyncio.run(main())
1 saniye bekledi
Sub task bitti.

Future üzerinde set_exception ile de sonuç alınabilir.

>>> import asyncio
>>> async def sub_task(f:asyncio.Future):
...     try:
...             await asyncio.sleep(5/0)
...             f.set_result("Sub task bitti")
...     except Exception as e:
...             f.set_exception(e)
...
>>> async def main():
...     current_future = asyncio.Future()
...     # Oluşturulan future nesnesiyle başka bir task
...     # içerisindeki işlem callback'ini takip edeceğiz.
...     asyncio.create_task(sub_task(current_future))
...     # Bu bağlam içerisinde örnek işlemler.
...     await asyncio.sleep(1)
...     print("1 saniye bekledi")
...     # Diğer bağlam içerisindeki işlemin bitmesini bekleyelim.
...     print(await current_future)
...
>>> asyncio.run(main())
1 saniye bekledi
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Python39\lib\asyncio\runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "C:\Python39\lib\asyncio\base_events.py", line 642, in run_until_complete
    return future.result()
  File "<stdin>", line 10, in main
  File "<stdin>", line 3, in sub_task
ZeroDivisionError: division by zero

Görüldüğü üzere farklı bağlamlar içerisindeki işlemler asyncio.Future nesnesi kullanılarak senkronize edilebiliyor. Farklı bir thread üzerindeki alt işlemleri de bu şekilde takip edebilirsiniz. asyncio.Future thread-safe olmadığı için asyncio.wrap_future fonksiyonuyla concurrent.futures.Future nesneleri kapsanabilir ve farklı bir thread ile senkronizasyon sağlanabilir.

>>> import asyncio
>>> import concurrent
>>> import threading
>>> import time
>>> def sub_thread(half_future:concurrent.futures.Future, full_future:concurrent.futures.Future):
...     time.sleep(5)
...     half_future.set_result("Yarısı bitti.")
...     # Bütün işlem bitince de kalan mesaj gönderilebilir.
...     time.sleep(5)
...     full_future.set_result("Tamamı bitti.")
...
>>> async def main():
...     half_future = concurrent.futures.Future()
...     full_future = concurrent.futures.Future()
...     # Bu future nesnelerinin asenkron contekst içerisinde kullanılabilen
...     # asyncio.Future haline dönüştürülmesi.
...     half_future_async = asyncio.wrap_future(half_future)
...     full_future_async = asyncio.wrap_future(full_future)
...     # Farklı bir thread başlatalım.
...     t = threading.Thread(target=sub_thread, args=(half_future, full_future))
...     t.start()
...     print(await half_future_async)
...     print(await full_future_async)
...
>>> asyncio.run(main())
Yarısı bitti.
Tamamı bitti.

Hem concurrent.futures hem de asyncio.futures altındaki Future nesneleri thread üzerinden bilgi alabilmek ve gönderebilmek adına çok esnek yetenekler sunuyor.

Asenkron bağlam ve başka process ve thread arasında daha basit bir mesajlaşma bağı kurabilmek adına aktif olarak geliştirdiğim https://pypi.org/project/async-message-handler/ eklentisini inceleyebilirsiniz.

Async IO temel yeteneklerini öğrendik. Bir sonraki yazımda gelişmiş Async IO özelliklerinden bahsedeceğiz.

İyi kodlamalar 😁

https://docs.python.org/3/library/asyncio-future.html

https://stackoverflow.com/questions/715758/coroutine-vs-continuation-vs-generator#:~:text=A%20generator%20is%20essentially%20a,whereas%20a%20generator%20can’t.&text=I%20was%20outlining%20how%20one%20might%20implement%20asymmetric%20coroutines%20in%20Python.)

https://stackoverflow.com/questions/36342899/asyncio-ensure-future-vs-baseeventloop-create-task-vs-simple-coroutine#:~:text=ensure_future%20vs%20create_task,an%20abstract%20method%20of%20AbstractEventLoop%20.&text=You%20should%20use%20ensure_future%20to%20create%20tasks.

https://docs.python.org/3/library/asyncio-future.html

Kategori: Async IO

Yorumlar

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir