Dynamic DAG Generation in Airflow: Best Practices and Use Cases

Data Engineering Sep 13, 2024

At Halodoc, our primary orchestration tool is AWS Managed Workflows for Apache Airflow (MWAA), a managed platform for orchestrating and monitoring complex workflows efficiently. It provides the scalability, availability, and security features needed for reliable execution of data pipelines.

In this blog, we document best practices for an Airflow environment that lower CPU utilisation and, in turn, reduce cost: minimising top-level code in DAGs, reducing DAG parsing time, and cutting down the total number of DAG Python files. By implementing these optimisations, we reduced overall CPU utilisation and improved worker node efficiency, resulting in significant savings on our MWAA cost.

Airflow top-level code

Top-level code is any code in a DAG file that sits outside operators and tasks and therefore runs whenever the DAG is parsed by the Airflow scheduler. MWAA executes this top-level code at regular intervals to build the DAG and refresh it in the web server. The frequency is controlled by the min_file_process_interval configuration option in the Airflow environment, which sets how often DAG files are re-parsed. DAG parsing logs are available in CloudWatch at the path configured in the MWAA environment.

CloudWatch log path

The dag_processor_manager.log log stream records the DAG processor's parsing of all DAGs in the environment. In addition, there is a per-DAG log stream that can be used to monitor an individual DAG during DAG processing.
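To make the notion of top-level code concrete, here is a hypothetical DAG file (the endpoint and names are placeholders, assuming Airflow 2.4+ for the schedule argument): everything at module level, including the requests import and the API call, runs on every parse cycle, not only when the DAG actually runs.

```python
# Hypothetical DAG file, for illustration only.
import json
from datetime import datetime

import requests  # third-party import at module level: loaded on every parse
from airflow import DAG
from airflow.operators.python import PythonOperator

# Top-level code: this API call executes every time the DAG processor parses
# the file (at most once per scheduler.min_file_process_interval seconds),
# even when no DAG run is scheduled.
configs = requests.get("https://example.com/pipeline-configs", timeout=30).json()


def process(**context):
    # Code inside the callable runs only when the task itself executes.
    print(json.dumps(configs))


with DAG(
    dag_id="example_top_level_code",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="process", python_callable=process)
```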

DAG parsing before optimisation

Because of substandard top-level code, scheduler CPU and memory utilisation climbed and the scheduler was unable to run the jobs and sensors in the DAGs. As a result, DAG runs were incorrectly reported as failed. The CPU usage at this point was as follows.

Scheduler CPU Utilisation graph before optimisation

Best practices

  • As a general best practice, set scheduler.min_file_process_interval to 300 seconds or more to prevent excessive polling and reduce scheduler load.
  • Package import handling: packages and functions imported outside of tasks are also executed during parsing, so avoid unnecessary or heavy imports at the top level of the code.
  • Database calls: if DAG configs or metadata are fetched by running queries in top-level code, those queries execute on every parse. Avoid database calls at the top level so they are not triggered during DAG parsing.
  • API calls: API calls in top-level code are generally discouraged in Airflow due to performance and scalability concerns, as well as API cost (where applicable).
    If API or database calls are unavoidable in certain scenarios, consider the following practice to make things better.
    Caching: store API/database results in a cache to minimise redundant requests (see the sketch after this list).
  • Once these measures are in place and DAG parsing is optimised, DAGs with similar functionality can be grouped so that multiple DAGs are generated from a single Python file.
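A minimal sketch of these practices (same hypothetical endpoint, plus a hypothetical local cache path) moves external calls inside the task callable and shows a TTL cache that can equally guard a parse-time lookup if one is truly unavoidable:

```python
# Sketch of the practices above (hypothetical endpoint and cache path).
import json
import os
import time
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

CACHE_PATH = "/tmp/pipeline_configs_cache.json"  # hypothetical cache location
CACHE_TTL_SECONDS = 3600  # refresh the cached configs at most once per hour


def load_configs_cached():
    """Return cached configs, calling the API only when the cache is stale.

    If such a lookup is truly unavoidable at parse time, a cache like this
    keeps repeated parse cycles from issuing redundant API/database calls.
    """
    if (
        os.path.exists(CACHE_PATH)
        and time.time() - os.path.getmtime(CACHE_PATH) < CACHE_TTL_SECONDS
    ):
        with open(CACHE_PATH) as f:
            return json.load(f)
    import requests  # heavy import kept out of the module top level

    configs = requests.get("https://example.com/pipeline-configs", timeout=30).json()
    with open(CACHE_PATH, "w") as f:
        json.dump(configs, f)
    return configs


def process(**context):
    # External calls live inside the callable, so they run only when the task
    # executes, not on every parse cycle.
    print(load_configs_cached())


with DAG(
    dag_id="example_optimised_top_level",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    PythonOperator(task_id="process", python_callable=process)
```

An Airflow Variable or a small metadata table could serve as the cache just as well; a file with a TTL is used here only to keep the sketch self-contained.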

Dynamic DAG and Deferrable Operator

Dynamic DAGs are DAGs generated at parse time from a single Python file, with their structure varying based on parameters.

After applying all of these top-level code optimisations, we grouped similar DAGs so that they are generated from a single Python file, which significantly reduced DAG parsing time. Before optimisation, a single complex DAG took 5 seconds to parse; after the changes, one DAG file generated four structurally similar DAGs with a combined parsing time of 1 second.

Difference between DAG creation

Below is sample code for dynamic DAG generation based on the dependent parameters. After fetching the list of DAGs and their configurations, the code loops over them to create each individual DAG.

sample code of Dynamic DAG generation
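The original snippet is shown as an image above; a minimal sketch of the same pattern, with hypothetical DAG names and a hard-coded config list standing in for the fetched configurations (assuming Airflow 2.4+), looks like this:

```python
# Sketch of dynamic DAG generation: one Python file registers several
# structurally similar DAGs (hypothetical names and configs).
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# In practice this list would come from a cached config lookup; it is
# hard-coded here to keep the example self-contained.
DAG_CONFIGS = [
    {"dag_id": "ingest_orders", "schedule": "@hourly", "table": "orders"},
    {"dag_id": "ingest_payments", "schedule": "@daily", "table": "payments"},
]


def build_dag(config):
    with DAG(
        dag_id=config["dag_id"],
        start_date=datetime(2024, 1, 1),
        schedule=config["schedule"],
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id=f"load_{config['table']}",
            python_callable=lambda table=config["table"]: print(f"loading {table}"),
        )
    return dag


# Registering each generated DAG in the module's global namespace is what
# makes all of them visible to the Airflow DAG processor from one file.
for cfg in DAG_CONFIGS:
    globals()[cfg["dag_id"]] = build_dag(cfg)
```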

Deferrable operator

Standard Airflow operators, tasks, and sensors occupy an entire worker slot for their whole execution cycle. When a task or sensor is simply waiting on an external condition, the deferrable operator feature optimises resource utilisation by releasing the worker slot.

When execution starts, the operator takes up a worker slot. Once it reaches a point where it must wait, it defers itself and releases the worker slot, and a trigger is created to keep track of the waiting condition. When the trigger fires, indicating the condition has been met, the operator resumes execution on an available worker.
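As an illustration of this flow (not Halodoc's actual operator), a custom operator can defer itself onto one of Airflow's built-in triggers; TimeDeltaTrigger is used here purely as a stand-in for a real external condition:

```python
# Minimal sketch of the deferral mechanism (illustrative only).
from datetime import timedelta
from typing import Any

from airflow.models.baseoperator import BaseOperator
from airflow.triggers.temporal import TimeDeltaTrigger
from airflow.utils.context import Context


class WaitThenRunOperator(BaseOperator):
    """Takes a worker slot, defers while waiting, then resumes to finish."""

    def execute(self, context: Context):
        # Running on a worker here. Instead of sleeping (and holding the
        # slot), hand the wait over to the triggerer and release the slot.
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(minutes=10)),
            method_name="resume_after_wait",
        )

    def resume_after_wait(self, context: Context, event: Any = None):
        # Called on an available worker once the trigger fires.
        self.log.info("Condition met, continuing execution: %s", event)
```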

For instance, a system with 50 concurrently active DAGs, each containing at least one sensor, can saturate a worker pool of 50 and prevent new DAG runs from starting smoothly. Airflow exposes a parameter on most operators to run them in deferrable mode.

Sample code for Deferred operator
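The referenced sample is also an image; below is a minimal sketch of the same idea, assuming a recent apache-airflow-providers-amazon release where EmrContainerOperator accepts the deferrable flag (the cluster ID, role ARN, and S3 path are placeholders):

```python
# Sketch of running EmrContainerOperator in deferrable mode.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrContainerOperator

with DAG(
    dag_id="emr_on_eks_deferrable_example",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    run_spark_job = EmrContainerOperator(
        task_id="run_spark_job",
        name="sample-spark-job",
        virtual_cluster_id="<emr-eks-virtual-cluster-id>",
        execution_role_arn="<emr-execution-role-arn>",
        release_label="emr-6.15.0-latest",
        job_driver={
            "sparkSubmitJobDriver": {
                "entryPoint": "s3://my-bucket/scripts/job.py",
            }
        },
        # With deferrable=True the task frees its worker slot while the EMR
        # job is running; a trigger resumes it once the job finishes.
        deferrable=True,
    )
```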

In our system, EMR sensors kept the scheduler occupied, so further steps in the queue could not be executed, which led to steps being auto-cancelled. Making the EMR operator, EmrContainerOperator, deferrable has helped us use resources efficiently.

Overall, by following these best practices (reducing top-level code in DAGs and cutting the number of DAG Python files through dynamic DAGs), we were able to reduce DAG parsing time.

DAG parsing of 13 DAGs in 1 sec after optimisation

Scheduler CPU utilisation dropped by 21% after all of the aforementioned optimisations were completed.

As deferrable operators reduced the load on worker nodes, MWAA worker cost has dropped by 75%.

Reduction in overall MWAA cost

Summary

In this blog, we have outlined best practices for optimising Airflow performance and resource utilisation. By minimising API calls in DAG top-level code and consolidating similar functionality into dynamic DAGs, we reduced scheduler CPU consumption by 21%. Additionally, running the EMR operator as a deferrable operator has freed up worker nodes, improving overall system efficiency and cutting MWAA worker node cost by 75%. Together, these optimisations have enhanced Airflow's overall performance and scalability.

References

Avoiding pitfalls in airflow

Airflow: Dynamic DAG

Airflow: Top-level code

Join us

Scalability, reliability, and maintainability are the three pillars that govern what we build at Halodoc Tech. We are actively looking for engineers at all levels and if solving hard problems with challenging requirements is your forte, please reach out to us with your resumé at careers.india@halodoc.com.


About Halodoc

Halodoc is the number 1 all-around healthcare application in Indonesia. Our mission is to simplify and bring quality healthcare across Indonesia, from Sabang to Merauke. We connect 20,000+ doctors with patients in need through our Tele-consultation service. We partner with 3500+ pharmacies in 100+ cities to bring medicine to your doorstep. We've also partnered with Indonesia's largest lab provider to provide lab home services, and to top it off we have recently launched a premium appointment service that partners with 500+ hospitals, allowing patients to book a doctor appointment inside our application. We are extremely fortunate to be trusted by our investors, such as the Bill & Melinda Gates Foundation, Singtel, UOB Ventures, Allianz, GoJek, Astra, Temasek and many more. We recently closed our Series C round and in total have raised around USD 180 million for our mission. Our team works tirelessly to make sure that we create the best healthcare solution personalised for all of our patients' needs, and we are continuously on a path to simplify healthcare for Indonesia.

Jitendra Bhat

Data Engineer at Halodoc