Airflow Cluster Architecture at Halodoc - Challenges and Learnings

data-engineering Jun 08, 2020

In this blog post, we explain the Airflow installation steps, the architecture of the Airflow cluster at Halodoc, and the challenges we faced and lessons we learned while using Airflow over the past couple of months. In our previous blog, we gave an overview of how we evolved our batch pipeline using Airflow. If you haven’t already, you can read it here: Evolution of Batch Data Pipeline at Halodoc.

Airflow is a platform to programmatically author, schedule, and monitor workflows. A workflow is a directed acyclic graph (DAG) of tasks, and Airflow is built around this DAG concept. It can distribute the tasks across a cluster of nodes.
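To make the DAG idea concrete, here is a minimal, library-free Python sketch (our illustration, not Airflow's own code). It models a workflow as a mapping from each task to its upstream dependencies and derives a valid execution order with Kahn's algorithm. It also rejects cycles, which is exactly what "acyclic" rules out. The task names are hypothetical.

```python
from collections import deque
from typing import Dict, List, Set


def topological_order(upstream: Dict[str, Set[str]]) -> List[str]:
    """Return tasks ordered so that every task comes after its upstream tasks.

    `upstream` maps each task to the set of tasks it depends on.
    Raises ValueError if the graph contains a cycle (i.e. it is not a DAG).
    """
    # Collect every task, including ones that only appear as dependencies.
    tasks = set(upstream)
    for deps in upstream.values():
        tasks |= deps

    # Count unmet dependencies and build the reverse (downstream) edges.
    remaining = {t: len(upstream.get(t, set())) for t in tasks}
    downstream: Dict[str, List[str]] = {t: [] for t in tasks}
    for task, deps in upstream.items():
        for dep in deps:
            downstream[dep].append(task)

    # Start with tasks that have no dependencies (sorted for a stable result).
    ready = deque(sorted(t for t, n in remaining.items() if n == 0))
    order: List[str] = []
    while ready:
        task = ready.popleft()
        order.append(task)
        for child in sorted(downstream[task]):
            remaining[child] -= 1
            if remaining[child] == 0:
                ready.append(child)

    if len(order) != len(tasks):
        raise ValueError("cycle detected: this workflow is not a DAG")
    return order


# Hypothetical ETL workflow: extract -> (transform_a, transform_b) -> load
workflow = {
    "transform_a": {"extract"},
    "transform_b": {"extract"},
    "load": {"transform_a", "transform_b"},
}
print(topological_order(workflow))  # → ['extract', 'transform_a', 'transform_b', 'load']
```

In an actual Airflow DAG file, the same dependencies would be declared between operators, for example with the >> operator, and the scheduler works out the ordering.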

We used the following selection criteria when evaluating Airflow:

  • Open source is preferred.
  • Managed service on AWS is preferred.
  • Should be cost-effective.
  • Should have broad community support.

Halodoc's tech stack is hosted on AWS, so we would have preferred a managed Airflow service on AWS. However, AWS did not offer one at the time of writing, so we went with a self-hosted Airflow on a cluster of EC2 instances that can scale.

To set up an Airflow cluster, we need to install the following components and services:

  1. Airflow Webserver: a web interface that queries the metadata database to monitor and execute DAGs.
  2. Airflow Scheduler: checks the status of DAGs and tasks in the metadata database, creates new task instances when needed, and sends the tasks to the queues.
  3. Airflow Metadata Database: stores the status of DAG runs and task instances.
  4. Airflow Message Broker: stores the task commands to be run in queues.
  5. Airflow Workers: retrieve the commands from the queues, execute them, and update the metadata.
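The division of labour between these components can be illustrated with a toy, single-process Python model. It is only an illustration under simplifying assumptions: a dict stands in for the metadata database, a queue.Queue stands in for the message broker, and two plain functions stand in for the scheduler and a worker. None of these names are Airflow APIs.

```python
import queue

# Stand-in for the metadata database: task_id -> state.
metadata = {"extract": "none", "load": "none"}
# Stand-in for the message broker: a FIFO of task commands.
broker = queue.Queue()
# Hypothetical dependencies: load may run only after extract succeeds.
upstream = {"extract": [], "load": ["extract"]}


def scheduler_tick():
    """Queue every task that has not started and whose upstreams succeeded."""
    for task, state in metadata.items():
        deps_done = all(metadata[d] == "success" for d in upstream[task])
        if state == "none" and deps_done:
            metadata[task] = "queued"
            broker.put(task)


def worker_drain():
    """Pull commands off the broker, 'run' them, and record the result."""
    while not broker.empty():
        task = broker.get()
        metadata[task] = "running"
        # ... the real task command would execute here ...
        metadata[task] = "success"


# Two scheduler/worker rounds: the first runs extract, the second runs load.
for _ in range(2):
    scheduler_tick()
    worker_drain()
print(metadata)
```

In the real cluster these loops run in separate processes on separate machines, which is why the metadata database and the broker have to be shared services.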

Airflow - Deployment Architecture

We host Airflow on a cluster of EC2 instances. Currently, we run an 8-node cluster with 3 master nodes and 5 worker nodes. The number and size of the nodes were decided through performance testing against the expected load and the amount of data to be processed. The number of nodes may increase in the future as the business and the volume of data grow.

The components used by Airflow are as follows:

  1. Airflow Webserver, Worker and Scheduler - AWS EC2
  2. Metadata - AWS RDS
  3. Message Broker - AWS managed Redis

Figure: Current Airflow cluster setup at Halodoc
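Because the metadata database (RDS) and the message broker (managed Redis) live outside the EC2 nodes, every node needs network access to both endpoints. The Python sketch below is our illustration rather than part of the original setup: it extracts host and port from connection URLs in the style of sql_alchemy_conn, result_backend and broker_url, and tries a TCP connection. The example URLs are placeholders.

```python
import socket
from typing import Tuple
from urllib.parse import urlsplit

# Ports assumed when a URL omits one (MySQL and Redis defaults).
DEFAULT_PORTS = {"mysql": 3306, "redis": 6379}


def endpoint(url: str) -> Tuple[str, int]:
    """Extract (host, port) from a SQLAlchemy- or Celery-style URL."""
    parts = urlsplit(url)
    port = parts.port
    if port is None:
        # Schemes like "db+mysql" or "mysql+mysqldb" still map to "mysql".
        for name in parts.scheme.split("+"):
            if name in DEFAULT_PORTS:
                port = DEFAULT_PORTS[name]
                break
        else:
            raise ValueError(f"no port in {url!r} and no default for its scheme")
    return parts.hostname, port


def reachable(url: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the URL's host and port succeeds."""
    host, port = endpoint(url)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Placeholder endpoints; substitute the values from your airflow.cfg.
metadata_url = "mysql://airflow:secret@airflow-db.example.internal:3306/airflow"
broker_url = "redis://airflow-broker.example.internal:6379/0"
print(endpoint(metadata_url), endpoint(broker_url))
```

Run on each node, reachable(metadata_url) and reachable(broker_url) should both return True; a False result often points to a security group or network rule that blocks the port.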

Airflow - Installation Steps

Key points to consider while creating an Airflow cluster:

  • Airflow configs must be identical (homogeneous) across all the nodes in the cluster.
  • Python 3.6 or above is recommended.
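One way to enforce the first point is to compare the airflow.cfg of every node against a reference copy. The Python sketch below is an assumption of ours, not Halodoc's tooling: it parses each file with configparser and reports every (section, key) whose value differs from the reference node. How the files are collected from the nodes (for example over SSH) is left out, and the node names are hypothetical.

```python
import configparser
from typing import Dict, List, Tuple


def load_cfg(text: str) -> Dict[Tuple[str, str], str]:
    """Flatten an INI document into {(section, key): value}."""
    # interpolation=None: values such as passwords may contain '%'.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    return {
        (section, key): value
        for section in parser.sections()
        for key, value in parser.items(section)
    }


def config_drift(nodes: Dict[str, str], reference: str) -> List[str]:
    """List the settings on each node that differ from the reference node."""
    expected = load_cfg(nodes[reference])
    problems = []
    for node, text in sorted(nodes.items()):
        if node == reference:
            continue
        actual = load_cfg(text)
        for key in sorted(expected.keys() | actual.keys()):
            if expected.get(key) != actual.get(key):
                section, option = key
                problems.append(
                    f"{node}: [{section}] {option} = {actual.get(key)!r} "
                    f"(reference: {expected.get(key)!r})"
                )
    return problems


master = "[core]\nparallelism = 128\n[celery]\nworker_concurrency = 16\n"
worker = "[core]\nparallelism = 128\n[celery]\nworker_concurrency = 8\n"
for line in config_drift({"master-1": master, "worker-1": worker}, reference="master-1"):
    print(line)
```

Running such a check before restarting services catches a node that was edited by hand and never synced with the rest of the cluster.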

Step 1: Create a user for Airflow
$ sudo adduser airflow
$ sudo usermod -aG sudo airflow   # give the user sudo rights
$ sudo su - airflow               # log in as the airflow user

Step 2: Install the required packages
$ sudo apt install python3.6
$ sudo apt install python3-pip
$ sudo apt-get install build-essential libssl-dev libffi-dev python3-dev
$ sudo apt-get install python3.6-dev libmysqlclient-dev   # MySQL client headers for the MySQL metadata DB

Step 3: Install Airflow using pip:
$ pip3 install 'apache-airflow[celery,redis,mysql]'   # extras for CeleryExecutor, Redis and MySQL; pin a version if needed

Step 4: After installation, make the following config changes based on your requirements:
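Step 4.1: Set up the Airflow home directory
$ export AIRFLOW_HOME=/home/airflow/airflow
Since export only applies to the current shell, also add the line AIRFLOW_HOME=/home/airflow/airflow to /etc/environment; the systemd services in Step 5 read their environment from that file.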

Step 4.2: Update the airflow.cfg config file
$ cd $AIRFLOW_HOME

airflow.cfg

[core]
# Replace [AIRFLOW_HOME] with the absolute path set in Step 4.1
dags_folder = [AIRFLOW_HOME]/dags
plugins_folder = [AIRFLOW_HOME]/plugins

# Store task logs remotely in S3
remote_logging = True
remote_log_conn_id = aws_s3_conn_id
remote_base_log_folder = s3://airflow/airflow-logs
encrypt_s3_logs = False

# Set this based on your location. We operate in Indonesia, hence Jakarta
default_timezone = Asia/Jakarta

executor = CeleryExecutor
# Metadata database connection (keep comments on their own line, not after a value)
sql_alchemy_conn = mysql://airflow:passwd@host:port/airflow
load_examples = False
parallelism = 128
dag_concurrency = 32
max_active_runs_per_dag = 1
default_impersonation = airflow
dag_run_conf_overrides_params = True
worker_precheck = True

[operators]
default_owner = data-team

[cli]
# Webserver URL used by the CLI's API client (not the port the webserver listens on)
endpoint_url = http://localhost:8080

[webserver]
# The port on which to run the webserver
web_server_port = 8080
# Gunicorn settings for the webserver process (not the Celery workers)
worker_refresh_interval = 30
worker_refresh_batch_size = 1
workers = 4

[celery]
# Message broker connection
broker_url = redis://host:port/db
result_backend = db+mysql://airflow:passwd@host:port/airflow
worker_concurrency = 16
# Format is max,min; when set, worker_autoscale takes precedence over worker_concurrency
worker_autoscale = 16,12
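Two details in this file are easy to get wrong. Python's configparser, on which Airflow's config parser is based, does not strip inline # comments by default, so (as far as we can tell) a comment written on the same line as a value becomes part of that value. And when worker_autoscale is set, it takes precedence over worker_concurrency. The Python sketch below is a hypothetical pre-deployment check, not an Airflow command, that flags both.

```python
import configparser
from typing import List


def check_airflow_cfg(text: str) -> List[str]:
    """Return warnings for two common airflow.cfg mistakes (hypothetical helper)."""
    # A plain ConfigParser keeps inline '#' comments unless
    # inline_comment_prefixes is set, so they end up inside the values.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    warnings = []

    # 1. A ' #' inside a value usually means an inline comment leaked into it.
    for section in parser.sections():
        for key, value in parser.items(section):
            if " #" in value:
                warnings.append(f"[{section}] {key}: value contains an inline comment")

    # 2. worker_autoscale is 'max,min' and overrides worker_concurrency.
    if parser.has_option("celery", "worker_autoscale"):
        raw = parser.get("celery", "worker_autoscale")
        max_c, min_c = (int(v) for v in raw.split(","))
        if min_c > max_c:
            warnings.append("[celery] worker_autoscale: min is greater than max")
        if parser.has_option("celery", "worker_concurrency"):
            warnings.append("[celery] worker_concurrency is ignored because worker_autoscale is set")
    return warnings


sample = """
[core]
sql_alchemy_conn = mysql://airflow:passwd@host:3306/airflow  #metadata information

[celery]
worker_concurrency = 16
worker_autoscale = 16,12
"""
for warning in check_airflow_cfg(sample):
    print(warning)
```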

Step 5: Configure systemd services to start the Airflow webserver, worker, and scheduler on their respective EC2 instances. The benefit of using systemd is that we can set a restart strategy for each component in case of failure.

$ sudo vim /etc/systemd/system/airflow-scheduler.service

[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
EnvironmentFile=/etc/environment
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow scheduler
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target

$ sudo vim /etc/systemd/system/airflow-webserver.service

[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
EnvironmentFile=/etc/environment
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow webserver
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target

$ sudo vim /etc/systemd/system/airflow-worker.service

[Unit]
Description=Airflow worker daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
EnvironmentFile=/etc/environment
User=airflow
Group=airflow
Type=simple
ExecStart=/usr/local/bin/airflow worker
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target
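The three unit files differ only in the component name used in the description and in the airflow sub-command, so they can also be generated from a single template. A minimal Python sketch, assuming the same binary path, user, and dependencies as the units above; writing the files to /etc/systemd/system requires root and is left to the caller.

```python
from typing import Dict

# Same contents as the hand-written units; {component} and {airflow_bin} vary.
UNIT_TEMPLATE = """\
[Unit]
Description=Airflow {component} daemon
After=network.target postgresql.service mysql.service redis.service rabbitmq-server.service
Wants=postgresql.service mysql.service redis.service rabbitmq-server.service

[Service]
EnvironmentFile=/etc/environment
User=airflow
Group=airflow
Type=simple
ExecStart={airflow_bin} {component}
Restart=always
RestartSec=5s

[Install]
WantedBy=multi-user.target
"""


def render_units(airflow_bin: str = "/usr/local/bin/airflow") -> Dict[str, str]:
    """Map each unit file name to its rendered contents."""
    return {
        f"airflow-{component}.service": UNIT_TEMPLATE.format(
            component=component, airflow_bin=airflow_bin
        )
        for component in ("webserver", "scheduler", "worker")
    }


for name, body in render_units().items():
    print(f"--- {name}")
    print(body)
```

Keeping one template means a change such as a new RestartSec value is applied to all three services at once, which also helps keep the nodes homogeneous.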

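Step 6: Start the Airflow components.
Step 6.1: Initialize the metadata database (run once, from a single node).
$ airflow initdb

Step 6.2: Start the webserver.
$ sudo systemctl start airflow-webserver.service

Step 6.3: Start the scheduler.
$ sudo systemctl start airflow-scheduler.service

Step 6.4: Start the worker (on each worker node).
$ sudo systemctl start airflow-worker.service

To have the services come back after a reboot, also run sudo systemctl enable <service-name> for each unit, and run sudo systemctl daemon-reload whenever a unit file changes.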
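Once the services are up, a quick smoke test is to query the webserver's /health endpoint, which reports the status of the metadata database and the scheduler's latest heartbeat. This endpoint exists in later 1.10.x releases; check that your version has it. The Python sketch below works under that assumption: it fetches the endpoint with urllib and reduces the JSON to one status per component. The base URL is a placeholder, and the sample response is illustrative.

```python
import json
import urllib.request
from typing import Dict


def parse_health(payload: str) -> Dict[str, str]:
    """Reduce the /health JSON to {component: status}."""
    data = json.loads(payload)
    return {component: info.get("status", "unknown") for component, info in data.items()}


def fetch_health(base_url: str = "http://localhost:8080", timeout: float = 5.0) -> Dict[str, str]:
    """GET <base_url>/health and return the per-component status."""
    with urllib.request.urlopen(f"{base_url}/health", timeout=timeout) as resp:
        return parse_health(resp.read().decode("utf-8"))


# Illustrative response in the documented shape of the /health endpoint.
sample = (
    '{"metadatabase": {"status": "healthy"}, '
    '"scheduler": {"status": "healthy", '
    '"latest_scheduler_heartbeat": "2020-06-08 10:00:00+00:00"}}'
)
print(parse_health(sample))
```

On a live node, fetch_health() returning "healthy" for both components confirms that the webserver can reach the metadata database and that the scheduler is heartbeating.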

Airflow - Monitoring & Alerting

As of today, we have more than 150 DAGs and 750 tasks running in our production environment, and we expect these numbers to go up in the future due to the exponential growth of our business and data. The Data Engineering (DE) team is responsible for ensuring that the service has no downtime. To achieve this, we set up monitoring for each of the services, along with a monitoring dashboard and an alerting mechanism that sends failure alerts to Slack, our primary channel for failure notifications. Here is how we’ve set up alerting for the different components of Airflow.
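As a complement, one common way to send task-failure alerts to Slack directly from Airflow (a sketch, not necessarily how our alerting is wired) is an on_failure_callback that posts to a Slack incoming webhook. Airflow calls the callback with the task's context dictionary, whose task_instance entry carries dag_id, task_id, execution_date and log_url. The webhook URL, DAG ID, and task ID below are placeholders, and the local dry run uses a stand-in task instance instead of a real one.

```python
import json
import urllib.request
from types import SimpleNamespace
from typing import Any, Dict

# Placeholder: replace with your own Slack incoming-webhook URL.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"


def build_failure_message(context: Dict[str, Any]) -> Dict[str, str]:
    """Build a Slack message body from an Airflow task context."""
    ti = context["task_instance"]
    text = (
        ":red_circle: Task failed\n"
        f"DAG: {ti.dag_id}\n"
        f"Task: {ti.task_id}\n"
        f"Execution date: {ti.execution_date}\n"
        f"Log: {ti.log_url}"
    )
    return {"text": text}


def notify_slack_on_failure(context: Dict[str, Any]) -> None:
    """Intended for on_failure_callback; posts the message to the webhook."""
    body = json.dumps(build_failure_message(context)).encode("utf-8")
    request = urllib.request.Request(
        SLACK_WEBHOOK_URL, data=body, headers={"Content-Type": "application/json"}
    )
    with urllib.request.urlopen(request, timeout=10):
        pass


# Local dry run with a stand-in task instance (no Airflow or network needed).
fake_ti = SimpleNamespace(
    dag_id="orders_daily_etl",
    task_id="load_orders",
    execution_date="2020-06-08T00:00:00+07:00",
    log_url="http://localhost:8080/log-url-placeholder",
)
print(build_failure_message({"task_instance": fake_ti})["text"])
```

In a DAG file, the callback would be attached to every task through default_args, e.g. default_args = {"owner": "data-team", "on_failure_callback": notify_slack_on_failure}.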

Challenges Faced During the Cluster Set-up

  • Right-sizing the cluster, as the number of DAGs to be deployed and the amount of data to be processed were not predictable.
  • Configuring dag_concurrency and worker_concurrency during the cluster set-up.
  • Setting up logging with the EFK (Elasticsearch/Fluent Bit/Kibana) stack, because Airflow job logs do not follow a consistent message pattern. We therefore enabled S3 remote logging instead, which also lets us view the logs in the Airflow UI itself.
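A back-of-the-envelope calculation helps when tuning these values. Under the simplifying assumptions that every worker runs at its worker_autoscale maximum and that pools are ignored, the number of tasks running at once is bounded by both parallelism and the total number of Celery worker slots, and a single DAG is further capped by dag_concurrency. The Python sketch below plugs in the values from our airflow.cfg and the five worker nodes described earlier; the function names are ours.

```python
from typing import Dict


def parse_autoscale(value: str) -> Dict[str, int]:
    """worker_autoscale is written as 'max,min'."""
    max_c, min_c = (int(part) for part in value.split(","))
    return {"max": max_c, "min": min_c}


def concurrency_limits(parallelism: int, dag_concurrency: int,
                       worker_nodes: int, per_worker_slots: int) -> Dict[str, int]:
    """Upper bounds on concurrently running task instances (pools ignored)."""
    celery_slots = worker_nodes * per_worker_slots
    cluster_cap = min(parallelism, celery_slots)
    return {
        "celery_slots": celery_slots,
        "cluster_cap": cluster_cap,
        "per_dag_cap": min(dag_concurrency, cluster_cap),
    }


autoscale = parse_autoscale("16,12")
limits = concurrency_limits(parallelism=128, dag_concurrency=32,
                            worker_nodes=5, per_worker_slots=autoscale["max"])
print(limits)  # → {'celery_slots': 80, 'cluster_cap': 80, 'per_dag_cap': 32}
```

With these numbers, the 80 Celery slots rather than parallelism (128) are the binding limit, so any tasks beyond 80 wait in the queue. Raising parallelism alone would not help; adding worker nodes or slots per worker would.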

Operating Airflow - Key Takeaways

  1. Set the following configurations in the airflow.cfg config file carefully, based on the resources available on the servers.
    - parallelism: the maximum number of task instances that can run simultaneously across the whole Airflow installation.
    - dag_concurrency: the number of task instances the scheduler allows to run concurrently within a single DAG.
    - max_active_runs_per_dag: the maximum number of active DAG runs per DAG.
  2. Run all the Airflow services under systemd. The alternative is to start the services as daemon processes with the -D flag, i.e.:
    airflow webserver -D
    airflow worker -D
    airflow scheduler -D

    The drawbacks of this approach are:
    - If any of the services goes down due to a failure, we need to restart it manually.
    - If the server is rebooted, the services have to be started again manually.
    - If the server goes down and we restart the service, we won’t be able to find the logs, as they are printed to stdout.

    Advantages of systemd:
    - A restart strategy on any failure.
    - It is easy to check the status of, start, restart, or stop a service:
    systemctl status|start|restart|stop <service-name>
    - Service output is captured by the systemd journal and can be read with journalctl -u <service-name>.
  3. Alert when tasks stay queued for more than 15 minutes. We achieve this with a Prometheus metrics exporter and send the alerts to Slack. The rule fires when the number of queued tasks has stayed above zero for 15 minutes.
    Alert rule in Prometheus:
    groups:
      - name: airflow
        rules:
          - alert: AirflowTaskQueued
            expr: airflow_num_queued_tasks > 0
            for: 15m
            annotations:
              summary: "Instance: `{{ $labels.instance }}`\nJob: `{{ $labels.job }}`\n"
              description: "Alarm: `Task getting queued for more than 15 minutes`"
  4. Try to reduce the number of variables used in a DAG. Variables are stored in the metadata database (MySQL or Postgres), and each Variable.get call queries it; calls at the top level of a DAG file run every time the scheduler parses that file, which can push the database to its maximum connection limit. We store all the variables used by a single DAG as one JSON structure and read it wherever needed in the DAG using Variable.get(DAG_ID, deserialize_json=True).
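The effect of this pattern can be illustrated without a running Airflow. In the Python sketch below, a small in-memory store with a lookup counter stands in for the metadata database; the store class, the DAG ID, and the variable names are hypothetical. It compares reading three settings as three separate variables with reading one JSON variable keyed by the DAG ID, which in a real DAG file would be from airflow.models import Variable followed by Variable.get(DAG_ID, deserialize_json=True).

```python
import json
from typing import Any, Dict


class CountingVariableStore:
    """Hypothetical stand-in for Airflow's Variable table that counts reads."""

    def __init__(self, rows: Dict[str, str]):
        self.rows = rows
        self.lookups = 0

    def get(self, key: str, deserialize_json: bool = False) -> Any:
        self.lookups += 1  # in Airflow, each call is a metadata-DB query
        value = self.rows[key]
        return json.loads(value) if deserialize_json else value


DAG_ID = "orders_daily_etl"  # hypothetical DAG ID

# Anti-pattern: one variable, and one query, per setting.
per_key = CountingVariableStore({
    "orders_source_table": "orders",
    "orders_target_bucket": "s3://dw/orders",
    "orders_batch_size": "5000",
})
source = per_key.get("orders_source_table")
bucket = per_key.get("orders_target_bucket")
batch_size = int(per_key.get("orders_batch_size"))

# Pattern: one JSON variable per DAG, read once and reused.
single = CountingVariableStore({
    DAG_ID: json.dumps({"source_table": "orders",
                        "target_bucket": "s3://dw/orders",
                        "batch_size": 5000}),
})
config = single.get(DAG_ID, deserialize_json=True)

print(per_key.lookups, single.lookups)  # → 3 1
print(config["source_table"], config["batch_size"])  # → orders 5000
```

Because top-level DAG code runs on every parse, the saving multiplies with the parse frequency and the number of DAG files.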

Summary

In this post, we described how we created our Airflow cluster and how we manage it, with monitoring and alerting as the key aspects. We also listed some of our learnings and the best practices we follow when using Airflow. Learning and improving our existing environment is a never-ending process; we evolve as our tech evolves.

Our future plans include:

  • Migrating from the Celery Executor to the Kubernetes Executor, to save the cost of the Airflow workers when they are idle.
  • Making the Airflow scheduler highly available (HA).
  • Upgrading to Airflow 2.0 (not yet released at the time of writing) to leverage its latest features.

Scalability, reliability and maintainability are the three pillars that govern what we build at Halodoc Tech. We are actively looking for data engineers/architects, and if solving hard problems with challenging requirements is your forte, please reach out to us with your resumé at careers.india@halodoc.com.


About Halodoc

Halodoc is the number 1 all-around healthcare application in Indonesia. Our mission is to simplify and bring quality healthcare across Indonesia, from Sabang to Merauke.
We connect 20,000+ doctors with patients in need through our Tele-consultation service. We partner with 2500+ pharmacies in 100+ cities to bring medicine to your doorstep. We've also partnered with Indonesia's largest lab provider to provide lab home services, and to top it off we have recently launched a premium appointment service that partners with 500+ hospitals and allows patients to book a doctor appointment inside our application.
We are extremely fortunate to be trusted by our investors, such as the Bill & Melinda Gates Foundation, Singtel, UOB Ventures, Allianz, Gojek and many more. We recently closed our Series B round and in total have raised USD$100 million for our mission.
Our team works tirelessly to make sure that we create the best healthcare solution personalised for all of our patients' needs, and we are continuously on a path to simplify healthcare for Indonesia.