İçeriğe geç →

Python Async IO ile Asenkron Programlamayı Anlamak – Giriş ve Coroutine Temelleri

Async IO Python 3.3 ile beraber resmi olarak desteklenen ve 3.5 sürümüyle beraber async/await sentaksını kullanmaya başlayan, sürekli değişen ve gelişen asenkron programlama kütüphanesi. Async IO threading, multiprocessing, paralel programlama, single threaded design ve senkronizasyon gibi yeterince kafa karıştırıcı olan kavramların neresinde diye soracak olursanız bu kavramları ve farklarını anlamak gerekiyor.

Paralellik hesaplama yükünün birden fazla çekirdekte dağıtılarak aynı anda işlem yapmasıdır. C dilindeki fork ve spawn işleçlerinin kullanılmasıyla oluşur ve python’da multiprocessing paketi altında bulunur. Paralel çalışma modelinde ilgili process işletim sisteminin katmanında yeni bir process oluşturur ve işletim sistemi poliçesine göre farklı CPU çekirdekleri içinde çalışır. İşletim sistemi üzerindeki processler fork veya spawn yöntemine bağlı olarak bir kereye mahsus kopya hafızayı alırlar ve birden fazla process birbiriyle shared memory ve pipe aracılığıyla veri transferi yapar. Senkronizasyon için mutex, semaphore vb. yapılar kullanır.

Threading işletim sistemi seviyesinde tek process ve çekirdek altında birden fazla iş parçacığının kapsam değişim(context switching) poliçeleriyle aynı anda çalışıyor hissi vermesidir. Unix sistemlerde POSIX thread, windows sistemlerdeyse windows.h altındaki iş parçacıkları threading gerçekleştirmek için kullanılır. Bu tip iş parçacıklarında global hafıza alanları ortaktır. Python’da threading kütüphanesi altında üst seviye implementasyonlar bulunabilir. Bir process birden fazla thread çalıştırabilir. Python özelinde olmayan threading modelinde threadler başka çekirdeklere dağıtılabilir, fakat Python’da GIL’den dolayı hesap avantajları kullanılamayacaktır.

Eş zamanlılık iş yükünün aynı anda ortak zamanda çalışmasıdır. Paralel programlama ve threading de eş zamanlılık prensibine uymaktadır.

Async IO hangi gruptadır?

Async IO dil seviyesinde eş zamanlılık sağlamak için kullanılan yöntemleri barındıran python kütüphanesi. Bir çok senaryoda tek thread dolayısıyla da tek process üzerinde çalışır. Async IO için kısacası tek thread ve process altında eş zamanlı iş yapmayı sağlayan yöntem bütünü diyebiliriz. Async IO bir çok noktada kolaylıklar sağlasa da tek thread üzerinde bir çok iş yapmayı sağlayan bir sihir değildir. CPU’nun yoğun kullanıldığı uygulamalara (görüntü işleme, tensor hesaplama vb.) uygun değildir.

Avantajları

  • Düşük veri transferi yüksek beklemelerin olduğu eş zamanlı ağ trafiklerinin yönetilmesinde oldukça iyidir.
  • Birden fazla thread kullanıldığı durumlarda ortaya çıkan işletim sistemi kaynaklı context switch maliyeti oldukça azdır, sebebiyse geliştirici asenkron odaklı geliştirirken context switch işlemine özel sentaks(async/await) aracılığıyla kendisi karar verir.
  • Uzun süren TCP, websocket gibi yapıların threadler, worker process ve queue üzerinden ölçeklenmesi işletim sistemi kaynaklarını tüketirken asenkron yöntemlerle çok yüksek eş zamanlılık elde edilebilir. Son zamanlarda yüksek eş zamanlılık konusunun bence en göze çarpan örnekleri NodeJS ve Erlang (XMPP, ejabberd) gibi görünüyor.
  • Kaynak üstüne tampon bellek kullanılarak yazma ve okuma (Buffered) işlemleri haricinde çoğu senaryoda yarış durumu hatalarını minimize eder. Bunun sebebiyse kısa bloklayan veri manipulasyonlarında olay döngüsü ilgili metod veya birimde olduğu için sadece tek aktör kaynak üzerinde çalışır.

Dezavantajları

  • Bloklama hatalarına oldukça açık ve coroutine veya en azından generator tabanlı eş zamanlılık konusunu algılama eksikliğinden ötürü debug edilmesi zor hatalar ortaya çıkabiliyor.
  • CPU yoran işlemleri yapmak için uygun değil veya en azından worker process çalıştırarak asenkron mesajlaşması sağlanmalı.

Bu noktadan sonra asenkron programlama sırasında karşınıza çıkacak olan bloklama, single-threaded model vb. teorik derinliğe odaklanıyor olacağız. Doğrudan AsyncIO kullanımını öğrenmek istiyorsanız aşağıdaki yazı linkine tıklayabilirsiniz. Fakat ben kesinlikle asenkron doğasını öğrenmenizi tavsiye ediyorum.

O zaman başlayalım

Don't block
Keep moving.

Async IO’nun sistem üzerinde thread veya process oluşturmadan eş zamanlı iş yapabildiğini kabul ettik. Fakat bu nasıl gerçekleşiyor? Nasıl hem tek thread üzerinde çalışıp hem de IO operasyonlarında thread ve process’lerden çok daha fazla paralelliğe erişebiliyor?

The Queen’s Gambit örneği…

Netflix’te The Queen’s Gambit dizisini izleyenler bilir. Dizinin baş karakteri Beth Harmon yetimhaneden çıkıp kendisinden yaşça büyük kolej öğrencilerinin kurduğu satranç takımının 20 civarı öğrencisiyle aynı anda satranç oynuyor. Dizinin bir kaç yerinde bu gösteriyi tekrarlıyor. Nasıl ki Beth Harmon aynı anda tek hamle yapabilirken birden fazla satranç oyuncusuyla satranç oynayabiliyorsa, Async IO birden fazla IO oyuncusuyla o şekilde veri alışverişi yapabiliyor.

Bloklayan işlem

Asenkron yöntemde Harmon’ın olay döngüsü (daha sonra öğreneceğimiz event loop) Round Robin kuralına göre işleri sıralıyor diye varsayarsak, 1’den n’e kadar giden masa numaralarında 1 numaralı masada hamle yapıyor sonra sırayla n numaralı masaya kadar hamleler yapıyor. 1’den n numaralı masaya kadar her masa hamle yaptı mı diye kontrol ediyor ve bu döngü sürüyor. 1 ile n arasındaki masalardan herhangi birisi hamle yapmışsa hamlesini gerçekleştiriyor ve sırayla kontrol etmeye devam ediyor. Asenkron dünyasında sıkça duyduğumuz senkron veya bloklayan(blocking) iş bu döngünün arasına girerse diğer masalar beklemek zorunda kalıyor.

Örneğin; Harmon karşı hamleyi düşünme süresini uzatıp masalardan herhangi birinde çakılı kalırsa diğer masalar onun hamlesini yapmasını bekleyecektir. Tabiki de bu döngüde bir çok işlem bloklayan olarak ilerliyor. Fakat her bir masada kısa süreli bloklayan iş yapması göz ardı edilebilir bir şey. Eğer toplamda bloklayan işlem süresi kontrol etme(yielding) süresine yakınlaşıyorsa (bir web uygulamasında çok fazla trafik aktarmak, yüksek disk IO oluşması gibi) asenkron avantajını kaybetmeye başlıyor.

Eğer aynı problemde process eş zamanlılığı uygulansaydı n tane Beth Harmon her bir masanın karşısına geçip oynar ve karşı hamleyi beklerdi. Her Beth Harmon’ın ayrı hafızası olurdu.

Eğer thread eş zamanlılığı uygulansaydı Harmon’ın dışındaki bir kuvvet(işletim sistemi) Harmon’ın yerini kendi kararıyla değiştirmesinden belki 1000 kat fazla değiştirirdi ve bu değişimlerin bir kısmı Harmon’ın hamle yaptığı ana denk gelirdi. Yani Harmon’ın yeri, Harmon atı ileri almaya başlayıp bitmesi arasında geçen sürede dahi onlarca defa değişebilirdi. Harmon ise nereye gideceğine karar veremez, ne zaman yer değiştireceğini, nereye gideceğini bilemezdi.

Yukarıdaki örneklerden asenkron yöntemde Harmon bulunduğu masalardaki durumları aklında tutmak zorunda. Yani her masadaki süreç python’daki karşılığıyla ayrı bir iterator olmalı ve iterator süreci ayrı ayrı saklanmalı. Tabi her problem için __iter__ ve __next__ metodlarıyla sınıf oluşturmak ve sınıfın hafızasına (state) karar vermek zahmetli olacağından bir çeşit iterator olan generator kullanmak daha mantıklı.

Python Generator

Temel anlamda baktığınızda içerisinde herhangi bir satırda yield veya yield from geçen bütün metodlar generator geri döndürür ve çalışmaz. yield anahtar kelimesi metodun ilerleyişini kaydeder ve hemen yanındaki nesneyi iterasyonun yapıldığı yere gönderir. Metodun içerisinde uğranacak yield noktası kalmadıysa çalışmayı bitirir ve iterasyonun yapıldığı yere StopIteration hatasını fırlatır.

Aşağıdaki konsol denemelerinde akışı anlayacağınıza eminim.

>>> #Örnek bir generator oluşturuyoruz.
>>> def sample_generator():
...     yield 1
...     yield 2
...     yield 3
...
>>> generator_object = sample_generator() #generator başlatıyoruz.
>>> type(generator_object) #Bu bir generator nesnesi.
<class 'generator'>
>>> next(generator_object) #ilk yield anahtarına kadar çalışıyor.
1
>>> Aşağıdaki komut next() ile birebir aynı.
>>> generator_object.__next__() #ikinci yield anahtarına kadar çalışıyor. 
2
>>> next(generator_object) #Son yield anahtarına kadar çalışıyor.
3
>>> #StopIteration hatası fırlatılacak.
>>> next(generator_object) #yield anahtarı kalmadı fonksiyon dönüş yapacak. 
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

Generator nesneleri aynı zamanda iterator nesnelerinin __next__ metodunu kullandığı için aşağıdaki gibi doğrudan döngülerde ve iterator bekleyen işlemlerde kullanılabilir.

>>> for num in sample_generator():
...     print(num)
...
1
2
3
>>> list(sample_generator())
[1, 2, 3]
>>> list(map(lambda x: x+1, sample_generator()))
[2, 3, 4]

Generator nesneleri iterable nesneleri ve bu nesnelerin __iter__ metoduyla döndürdükleri iterator nesnelerinin özelleştirilmiş ve basitleştirilmiş halidir. Bulunduğu noktayı kaydeder ve kolayca tembel çalışan blok (lazy execution) oluşturabileceği için hafıza problemlerinin önüne geçer.

Aşağıdaki örnekte aynı çıktıyı lazy execution ve eager execution yöntemleriyle gerçekleştirince oluşan hafıza farkını görebiliriz.

>>> import sys
>>> eager_count = [x for x in range(1000000)] #Tüm sayıları hemen alır ve hafızaya kaydeder.
>>> print(f"{sys.getsizeof(eager_count)} byte yer ayrıldı.")
8697464 byte yer ayrıldı.
>>> def lazy_count():
#Sayıları iterasyon ilerletildikçe alır.
...     counter = 0
...     while counter<1000000:
...             yield counter
...             counter += 1
...
>>> print(f"{sys.getsizeof(lazy_count())} byte yer ayrıldı.")
120 byte yer ayrıldı.

Generator kullanımı sayesinde hafızaya sığmayacak veriler işlenebilmektedir. Resimler veya medya dosyaları üzerinde derin öğrenme modeli eğitenlerin sıkça karşılaştığı durum olan verinin hafızaya sığmaması durumu da bu yöntemle veya alternatif lazy execution yöntemleriyle atlatılabiliyor.

Generator nesneleri thread-safe değil. Yani birden fazla thread üzerinde aynı anda işletilmesi güvenli değil. Böyle bir durumda “ValueError: generator already executing” hatasıyla karşılaşmanız yüksek muhtemel. Bunun için bir generator nesnesini birden fazla thread üzerinde kullanmak için senkronizasyonu kendiniz yapmak durumundasınız.

yield from anahtar kelimesi

“yield” anahtar kelimesinin metodun bulunduğu noktayı kaydederek iterasyon kaynağına veri gönderdiğini öğrenmiştik. “yield from” ise verilen iterator nesnesinin üzerinde ilerler ve yield gibi bulunduğu noktayı kaydeder. Aşağıdaki örnekte tam olarak ne dediğimi anlayacaksınız.

>>> def sample_generator():
...     yield from [1, 2, 3] #list nesneleri aynı zamanda iterator nesnesidir.
...
>>> generator_object = sample_generator()
>>> next(generator_object) # listenin 1. elemanı gelir.
1
>>> next(generator_object) # listenin 2. elemanı gelir.
2
>>> next(generator_object) # listenin 3. elemanı gelir.
3
>>> next(generator_object) # yield edilecek başka element kalmaz.
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

yield from ile generator nesnesi içerisinde başka generator nesnelerini de işletebilirsiniz. Bunun da örneği aşağıda bulunmaktadır.

>>> def sub_generator():
 # Bu generator sample_generator altında işletilecek.
...     for i in range(10):
...             yield i
...
>>> def sample_generator():
 # Bu generator ise üzerinde dönülecek iterator.
...     yield from sub_generator()
...
>>> for i in sample_generator():
...     print(i)
...
0
1
2
3
4
5
6
7
8
9
Eureka

Şimdi de asenkron programlamanın generator nesneleriyle ilişkisini aklınıza yerleştirecek ayrıntıya geliyor sıra. yield from anahtarı işlettiği generator nesnesinin return değerini getirir. Bir başka deyişle yield from anahtarının fonksiyonu işlettiği generator nesnesinin bitmesini beklemektir ve sonucunu almaktır. Aşağıda bunun da örneğini görüyorsunuz.

>>> def sample_generator(times):
 # Gelen sayının 2 katını adım adım hesaplar.
...     counter = 0
...     for i in range(times):
...             counter += 2
...             yield
...     return counter
...
>>> def main():
 # Ana metod
...     result = yield from sample_generator(200)
...     print(result)
...
>>> for _ in main():
...     pass
...
400

Async IO Coroutine temeli generator eş zamanlılığı

İşleri biraz karıştırmakta yarar var diye düşünüyorum. Mesela generator nesneleriyle 2 ayrı eş zamanlı task oluşturulsun ve 1. task 2 saniyede bir ekrana “Ben 1. taskım”, 2. task ise 4 saniyede bir “Ben 2. taskım” yazsın. Bunu tabiki bloklayan işlem olan time.sleep metoduyla yapmamamız gerekiyor.

Yazının başında Beth Harmon’ın eş zamanlı satranç oynarken sırayla (Round Robin) masalara gidip oyunu ilerlettiğini anlatmıştık. Burada da mantık aynı olacak.

>>> import time
>>> def async_sleep(seconds):
 # Asenkron uyuma.
...     start_time = time.time()
...     while True:
...             if (time.time()-start_time) < seconds:
...                     yield
...             else:
...                     return
...
>>> def task_1():
 # 2 saniyede bir çalışacak
...     while True:
...             yield from async_sleep(2)
...             print("Ben 1. taskım")
...
>>> def task_2():
 # 4 saniyede bir çalışacak
...     while True:
...             yield from async_sleep(4)
...             print("Ben 2. taskım")
...
>>> import itertools
>>> # Round robin olarak tasklar işletiliyor.
>>> # Aşağıdaki döngü Event loop'a örnektir.
>>> for task in itertools.cycle([task_1(), task_2()]):
...     next(task)
...
Ben 1. taskım
Ben 2. taskım
Ben 1. taskım
Ben 1. taskım
Ben 2. taskım
Ben 1. taskım
Ben 1. taskım
Ben 2. taskım
Ben 1. taskım
Ben 1. taskım
.
.
.

Asenkron çalışmanın hilesi tam olarak budur. Gördüğünüz üzere threading kullanmadık ve 2 ayrı task eş zamanlı bir şekilde çalışıyor. Yukarıdaki tasklardan herhangi birinde bloklayan işlem (time.sleep vb.) kullansaydık tasklar arasında geçiş olamayacaktı ve event loop ilerleyemeyecekti.

Yazının devamında async/await sentaksı ve Async IO yapıtaşlarını tartışıyor olacağız.

İyi kodlamalar 😀

https://asyncio-notes.readthedocs.io/en/latest/asyncio-history.html

https://realpython.com/async-io-python/

https://docs.python.org/3/library/asyncio.html

Kategori: Async IO

Yorumlar

Bir cevap yazın

E-posta hesabınız yayımlanmayacak. Gerekli alanlar * ile işaretlenmişlerdir