Hava Akışı: Daha Az Bilinen İpuçları, Püf Noktaları ve En İyi Uygulamalar

Uzun süre kullandıktan sonra bile bilmeyeceğiniz tüm araçlarla ilgili belli şeyler var. Ve bir kez bunu biliyorsanız, “Keşke bunu önceden bilseydim” gibisiniz, çünkü müşterinize daha iyi bir şekilde yapılamayacağını zaten söylemiştiniz . Diğer araçlar gibi hava akımı da farklı değil, hayatınızı kolaylaştıracak ve DAG geliştirmeyi eğlenceli hale getirebilecek bazı gizli taşlar var.

Bunlardan bazılarını zaten biliyor olabilirsiniz ve hepsini tanıyorsanız - o zaman bir PRO'sunuz.

(1) Bağlam Yöneticisi ile DAG

Görevinize dag = dag eklemeyi unuttuğunuzda ve Airflow hatasında rahatsız mı kaldınız? Evet, her görev için eklemeyi unutmak kolaydır. Aşağıdaki örnekte gösterildiği gibi aynı parametreyi eklemek de gereksizdir (example_dag.py dosyası):

Yukarıdaki örnekte (example_dag.py dosyası) yalnızca 2 görev vardır, ancak 10 veya daha fazla varsa, artıklık daha belirgin hale gelir. Bundan kaçınmak için Airflow DAG'ları bağlam yöneticileri olarak kullanarak, yukarıdaki örnekte (example_dag_with_context.py) ifade ile birlikte bu DAG'ye yeni operatörler otomatik olarak atamak için kullanabilirsiniz.

(2) Görev bağımlılıklarını ayarlamak için Listeyi kullanma

Aşağıdaki resimde gösterilene benzer bir DAG oluşturmak istediğinizde, görev bağımlılıklarını ayarlarken görev adlarını tekrarlamanız gerekir.

Yukarıdaki kod parçacığında gösterildiği gibi, görev bağımlılıklarını ayarlama yolumuzu kullanmak görev_two ve sonunun 3 kez tekrarlandığı anlamına gelir. Bu, aynı sonucu daha şık bir şekilde elde etmek için python listeleri kullanılarak değiştirilebilir.

(3) Argümanları tekrarlamaktan kaçınmak için varsayılan argümanları kullanın

Bu DAG’daki tüm görevler için kullanılabilecek bir parametre sözlüğünün geçmesine izin veren hava akımı

Örneğin, DataReply'de, DataWareshouse ile ilgili tüm DAG'larımız için BigQuery kullanıyoruz ve her göreve etiketler, bigquery_conn_id gibi parametreler iletmek yerine, bunu yalnızca DAG'de gösterildiği gibi, fail_args sözlüğünü geçiyoruz.

Bu, Airflow'ta Slack Alerts Entegre Etme hakkındaki son blog yazımda zaten bahsettiğim DAG hataları yerine bireysel görev hataları hakkında uyarılar istediğinizde de faydalıdır.

(4) “Params” argümanı

“Params”, şablonlarda erişilebilir hale getirilen DAG seviyesi parametreleri sözlüğüdür. Bu paramlar görev seviyesinde geçersiz kılınabilir.

Bu son derece yararlı bir argümandır ve kişisel olarak params.param_name kullanarak jinja templating ile templated alanında erişilebildiği için çok kullanıyordum. Örnek bir kullanım aşağıdaki gibidir:

Sabit kodlama değerleri yerine parametreli DAG yazmanızı kolaylaştırır. Ayrıca yukarıdaki örneklerde gösterildiği gibi, params sözlük 3 yerde tanımlanabilir: (1) DAG nesnesinde (2) default_args sözlüğünde (3) Her görev.

(5) Hassas verileri Bağlantılarda Kaydetme

Kullanıcıların çoğu bunun farkında ama ben DAG'ın içinde düz metin olarak saklanan şifreleri gördüm. Tanrı aşkına, bunu yapma. DAG'lerinizi halka açık bir depoda saklayacak kadar kendinize güvenebileceğiniz şekilde yazmalısınız.

Varsayılan olarak, Airflow, bağlantı parolalarını meta veri veritabanında düz metin olarak kaydeder. Kripto paketi Hava Akışı kurulumu sırasında şiddetle tavsiye edilir ve basitçe pip yükleme apache-airflow [kripto] ile yapılabilir.

Daha sonra aşağıdaki şekilde kolayca erişebilirsiniz:

from airflow.hooks.base_hook İthalat BaseHook
slack_token = BaseHook.get_connection ('slack') şifre

(6) DAG'inizdeki Hava Akışı değişkenlerinin sayısını sınırlayın

Hava Akış Değişkenleri Meta Veri Veritabanında saklanır, bu nedenle değişkenlere yapılan herhangi bir çağrı Meta Veri DB'ye bağlantı anlamına gelir. DAG dosyalarınız her X saniyede bir ayrıştırılır. DAG'inizde çok sayıda değişken kullanmak (ve default_args dosyasında daha da kötüsü), veritabanınıza izin verilen bağlantıların sayısını doyuracağınız anlamına gelebilir.

Bu durumu önlemek için, ya JSON değerine sahip tek bir Airflow değişkeni kullanabilirsiniz. Bir Airflow değişkeni JSON değeri içerebildiğinden, tüm DAG yapılandırmanızı aşağıdaki resimde gösterildiği gibi tek bir değişken içinde saklayabilirsiniz:

Bu ekran görüntüsünde gösterildiği gibi, değerleri ayrı Airflow değişkenlerinde veya JSON alanı olarak tek bir Airflow değişkeninde saklayabilirsiniz.

Onlara Tavsiye edilen yolun altında aşağıda gösterildiği şekilde erişebilirsiniz:

(7) “Bağlam” sözlüğü

PythonOperator'ı çağrılabilir bir işlevle kullanırken, kullanıcılar genellikle içerik sözlüğünün içeriğini unuturlar.

Bağlam, görev örneğiyle ilgili nesnelere referanslar içerir ve şablonlandırılmış alan için de mevcut olduklarından API'nin makrolar bölümünde belgelenir.

{
      'dag': task.dag,
      'ds': ds,
      'next_ds': next_ds,
      'next_ds_nodash': next_ds_nodash,
      'prev_ds': prev_ds,
      'prev_ds_nodash': prev_ds_nodash,
      'ds_nodash': ds_nodash,
      'ts': ts
      'ts_nodash': ts_nodash,
      'ts_nodash_with_tz': ts_nodash_with_tz,
      'dün_ds': dün_ds,
      'yesterday_ds_nodash': yesterday_ds_nodash,
      'tomorrow_ds': tomorrow_ds,
      'tomorrow_ds_nodash': tomorrow_ds_nodash,
      'END_DATE': ds,
      'end_date': ds,
      'dag_run': dag_run,
      'run_id': run_id,
      'execution_date': self.execution_date,
      'prev_execution_date': prev_execution_date,
      'next_execution_date': next_execution_date,
      'latest_date': ds,
      'makrolar': makrolar,
      'params': params,
      'tablolar': tablolar,
      'görev': görev,
      'task_instance': öz,
      'ti': öz
      'task_instance_key_str': ti_key_str,
      'conf': yapılandırma,
      'test_mode': self.test_mode,
      'var': {
          'değer': VariableAccessor (),
          'json': DeğişkenJsonAccessor ()
      },
      'girişler': task.inlets,
      'çıkışlar': task.outlets,
}

(8) Dinamik Hava Akışı Görevleri Oluşturma

StackOverflow'ta dinamik görevlerin nasıl oluşturulacağı ile ilgili birçok soruyu cevapladım. Cevap basit, tüm görevleriniz için benzersiz bir task_id oluşturmanız yeterli. Aşağıda bunun nasıl başarılacağına ilişkin 2 örnek verilmiştir:

(9) "Hava akımı initdb" yerine "hava akımı yükseltildi" yi çalıştırın.

First Apache Airflow London Meetup'taki konuşmasında bu ipucu için Ash Berlin'e teşekkür ederiz.

airflow initdb, üretim veritabanımızda istemediğimiz ve istemediğimiz tüm varsayılan bağlantıları, çizelgeleri vb. oluşturur. Hava akımı yükseltilmiş olan bunun yerine veritabanı tablosuna herhangi bir eksik geçişi uygular. (eksik tabloların oluşturulması vb. dahil) Her seferinde çalıştırmak güvenlidir, hangi geçişlerin uygulanmış olduğunu izler (Alembic modülünü kullanarak).

Bu blog gönderisine eklemeye değecek bir şey biliyorsanız, aşağıdaki yorumlar bölümünde bana bildirin. Mutlu Hava Akışı :-)