Key Learnings on Using Apache HUDI in building Lakehouse Architecture @ Halodoc

data-platform Apr 4, 2022

Halodoc data engineering has evolved from the traditional Data Platform 1.0 to a modern Data Platform 2.0 built on Lakehouse architecture. In our previous blogs, we described how we implemented the Lakehouse architecture at Halodoc to serve large-scale analytical workloads, along with the design considerations, best practices, and learnings from the Platform 2.0 journey.

In this blog, we will go through Apache HUDI in detail and how it helped us build a transactional data lake. We will also highlight some of the common challenges people face while building a lake house and how we overcame them using Apache HUDI.

Apache Hudi

Let's start with a basic understanding of Apache HUDI. Hudi is a rich platform for building streaming data lakes with incremental data pipelines on a self-managing database layer, while being optimised for lake engines and regular batch processing.

Apache Hudi brings core warehouse and database functionality directly to a data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexes, streaming ingestion services, data clustering/compaction optimisations, and concurrency, all while keeping your data in open source file formats.

Apache Hudi can easily be used on any cloud storage platform. Apache HUDI's advanced performance optimisations make analytical workloads faster with any of the popular query engines, including Apache Spark, Flink, Presto, Trino, and Hive.

Let's see some of the key challenges while building the lake house and how we solved them using HUDI and AWS cloud services.

Performing Incremental Upserts in Lake House

One of the key challenges everyone faces in building a transactional data lake is identifying the correct primary key for upserting records in the data lake. In most scenarios, the primary key serves as the unique identifier, and a timestamp field is used to filter out duplicate records in incoming batches.

At Halodoc, most of the micro-services use RDS MySQL as a data store. We have 50+ MySQL databases that need to be migrated to the data lake. The transactions go through various states and updates happen frequently in most cases.

Problem:
The MySQL RDS stores the timestamp field with second-level precision, which makes it difficult to order transactions that occur milliseconds or even microseconds apart. With only the business-modified timestamp fields to go on, identifying the latest transaction in an incoming batch was challenging for us.

We tried multiple approaches to solve this, such as using the rank function or combining multiple fields into a compound key. The choice of compound key was not uniform across tables and could require different logic per table to identify the latest transaction.

Solution:
AWS Database Migration Service (DMS) can be configured with transformation rules that add additional headers with custom or predefined properties.

ar_h_change_seq: A unique incrementing number from the source database that consists of a timestamp and an auto-incrementing number. The value depends on the source database system.

The header helps us filter out duplicate records easily, and we were able to update the latest records in the data lake. The header is applied only to ongoing changes. For the full load, records are assigned 0 by default, while each incremental record carries a unique identifier. We configure ar_h_change_seq as the precombine field to remove duplicate records from incoming batches.

HUDI config

precombine = ar_h_change_seq
hoodie.datasource.write.precombine.field: precombine
hoodie.datasource.write.payload.class: 'org.apache.hudi.common.model.DefaultHoodieRecordPayload'
hoodie.payload.ordering.field: precombine
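
The effect of the precombine field on an incoming batch can be sketched in plain Python. This is a simplified illustration, not Hudi's implementation: the `id` record key, the sample rows, and the `precombine_batch` helper are hypothetical, and the change sequence is compared as an integer on the assumption that it arrives as a string of digits.

```python
from typing import Dict, Iterable, List


def precombine_batch(
    records: Iterable[Dict[str, str]],
    key_field: str = "id",  # hypothetical record key column
    ordering_field: str = "ar_h_change_seq",
) -> List[Dict[str, str]]:
    """Keep one record per key: the one with the highest ordering value."""
    latest: Dict[str, Dict[str, str]] = {}
    for record in records:
        key = record[key_field]
        current = latest.get(key)
        # Compare as integers so that "0" (full load) sorts below any change value.
        if current is None or int(record[ordering_field]) >= int(current[ordering_field]):
            latest[key] = record
    return list(latest.values())


# Hypothetical batch: order 1 has a full-load row plus two changes whose
# business timestamps would be identical at second-level precision.
batch = [
    {"id": "1", "status": "CREATED", "ar_h_change_seq": "0"},
    {"id": "1", "status": "PAID", "ar_h_change_seq": "20220404101500000000000000000000012"},
    {"id": "1", "status": "CONFIRMED", "ar_h_change_seq": "20220404101500000000000000000000011"},
    {"id": "2", "status": "CREATED", "ar_h_change_seq": "20220404101500000000000000000000013"},
]

deduplicated = precombine_batch(batch)
for row in deduplicated:
    print(row["id"], row["status"])
```

Only the row with the highest change sequence survives for each key, regardless of the order in which the rows arrive in the batch.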

Small Files Problem in Data Lake

While building the data lake, frequent updates and inserts result in lots of small files in every partition.

Problem:
Let's take a look at how small files cause a problem while querying. When a query is fired to extract or transform the datasets, the driver node has to collect the metadata of each file, which adds performance overhead to the transformation process.

Solution:
Running regular compaction of small files helps maintain file sizes that are efficient for query performance. Apache HUDI supports both synchronous and asynchronous compaction.

Synchronous Compaction: This can be enabled during the writing process itself. It increases the ETL execution time needed to upsert records in HUDI.

Asynchronous Compaction: The compaction is performed by a separate process and requires separate memory. It does not impact the writer process and is a scalable solution too.

At Halodoc, we first adopted synchronous compaction. Gradually, we plan to adopt hybrid compaction based on table sizes, growth, and use cases.

HUDI config

hoodie.datasource.clustering.inline.enable
hoodie.datasource.compaction.async.enable
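
One way a hybrid approach could be expressed is a small helper that picks a compaction mode per table. This is only a sketch: the size threshold and the `compaction_options` helper are hypothetical, and `hoodie.compact.inline` is Hudi's writer option for inline compaction, which is an addition here and not part of the configuration listed above.

```python
from typing import Dict


def compaction_options(table_size_gb: float, async_threshold_gb: float = 100.0) -> Dict[str, str]:
    """Pick synchronous (inline) or asynchronous compaction for a table.

    The 100 GB default threshold is a made-up number for illustration only.
    """
    use_async = table_size_gb >= async_threshold_gb
    return {
        # Inline compaction runs as part of the write and lengthens the ETL job.
        "hoodie.compact.inline": str(not use_async).lower(),
        # Async compaction leaves the work to a separate process.
        "hoodie.datasource.compaction.async.enable": str(use_async).lower(),
    }


small_table = compaction_options(table_size_gb=5)
large_table = compaction_options(table_size_gb=500)
print(small_table)
print(large_table)
```

In practice the decision would also take table growth and the use case into account, as noted above; size alone is used here to keep the example short.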

Maintaining Storage Size for cost reduction

Storage in a data lake is cheap, but that doesn't mean we should store data that is not required for business analytics. Otherwise, we will soon see storage costs climbing. Apache HUDI keeps a version of the file on every upsert operation to provide time-travel queries on the records. Each commit creates a new version of the affected files, which leads to a lot of versioned files.

Problem:
If the cleaner policy is not enabled, the storage size keeps growing with every commit, which directly impacts the storage cost. Older commits have to be purged if they hold no business value.

Solution:
HUDI has two types of cleaner policy: timestamp-based and count-based (number of commits to be retained). At Halodoc, we calculated how frequently writes occur and how long the ETL process takes to complete, and based on this we came up with the number of commits to be retained in the HUDI datasets.

Example: If the data ingestion job into Hudi is scheduled every 5 minutes and the longest-running query can take 1 hour to finish, then the platform should retain at least 60/5 = 12 commits.
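
The same arithmetic can be written as a small helper. This is a minimal sketch: the `commits_to_retain` function is hypothetical, and rounding up when the durations do not divide evenly is an assumption made here to stay on the safe side.

```python
import math
from typing import Dict


def commits_to_retain(ingest_interval_minutes: float, longest_query_minutes: float) -> int:
    """Number of commits that cover the longest-running query."""
    if ingest_interval_minutes <= 0:
        raise ValueError("ingest_interval_minutes must be positive")
    # Round up so that a partially covered interval still counts as a commit.
    return max(1, math.ceil(longest_query_minutes / ingest_interval_minutes))


retained = commits_to_retain(ingest_interval_minutes=5, longest_query_minutes=60)

cleaner_options: Dict[str, str] = {
    "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
    "hoodie.cleaner.commits.retained": str(retained),
}
print(cleaner_options)
```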

HUDI config

hoodie.cleaner.policy: KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained: 12

Or,

hoodie.cleaner.policy: KEEP_LATEST_FILE_VERSIONS
hoodie.cleaner.fileversions.retained: 1

Choosing the right storage type based on latency and business use case

Apache HUDI has two storage types that store datasets for different use cases. Once a storage type is selected, changing to the other requires rewriting the entire dataset, which can be a cumbersome process. So, it is very important to choose the right storage type before migrating the data to HUDI datasets.

Problem:
Choosing the incorrect storage type might impact the ETL execution time and the data latency expected by data consumers.

Solution:
At Halodoc, we utilised both storage types for our workload.

MoR: MoR stands for Merge on Read. We selected MoR for tables that required instant read access once the write is completed. It also reduces upsert time, since HUDI appends changes to an Avro delta log file and does not have to rewrite the existing Parquet files.

MoR provides two views of a dataset, _ro and _rt:

  • _ro for the read optimized table.
  • _rt for the real time table.

CoW: CoW stands for Copy on Write. Storage type CoW was chosen for datasets where data latency, update cost, and write amplification are lower priorities but read performance is a high priority.

HUDI config

type = COPY_ON_WRITE / MERGE_ON_READ
hoodie.datasource.write.table.type: type
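
The relationship between the table type and the tables a query engine sees can be sketched as follows. The `orders` table name and both helper functions are hypothetical, and the sketch assumes the table is synced to a catalog that exposes the `_ro` and `_rt` suffixes described above.

```python
from typing import Dict, List

TABLE_TYPES = ("COPY_ON_WRITE", "MERGE_ON_READ")


def table_type_options(table_type: str) -> Dict[str, str]:
    """Build the writer option for the chosen storage type."""
    if table_type not in TABLE_TYPES:
        raise ValueError(f"table_type must be one of {TABLE_TYPES}")
    return {"hoodie.datasource.write.table.type": table_type}


def queryable_tables(table_name: str, table_type: str) -> List[str]:
    """Table names available to readers for the chosen storage type."""
    if table_type not in TABLE_TYPES:
        raise ValueError(f"table_type must be one of {TABLE_TYPES}")
    if table_type == "MERGE_ON_READ":
        # MoR exposes a read optimized view and a real time view.
        return [f"{table_name}_ro", f"{table_name}_rt"]
    # CoW exposes a single table.
    return [table_name]


print(table_type_options("MERGE_ON_READ"))
print(queryable_tables("orders", "MERGE_ON_READ"))
print(queryable_tables("orders", "COPY_ON_WRITE"))
```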

Is File Listing a heavy process and how does HUDI resolve it

In general, upserts and updates on a distributed object store or file system are expensive because these systems are immutable by nature: the writer has to track and identify the subset of files that need to be updated and overwrite them with new versions containing the latest records. Apache HUDI stores the metadata of each file slice and file group to track the records for upsert operations.

Problem:
As mentioned earlier, having lots of files in different partitions is an overhead for the driver node, which has to collect the file information, and this causes memory/computation problems.

Solution:
To resolve this, HUDI introduces the metadata table concept: all the file information is stored in a separate table that is synced whenever the source changes. This lets Spark perform file listing from one location, resulting in optimal resource utilisation.

This can be easily achieved with the configuration below.

HUDI config

hoodie.metadata.enable: true

Choosing the right index for HUDI dataset

In traditional databases, an index is applied to efficiently retrieve the data from the tables. Apache HUDI also has an indexing concept, but it works slightly differently. Indexes in HUDI map record keys to the files that contain them, and are mostly used to enforce the uniqueness of keys, either within a partition or across all partitions of a table.

Problem:
Preventing duplicate records within each partition, or globally across partitions, is always critical when you want to build a transactional data lake.

Solution:
HUDI solves this problem by employing indexes in HUDI datasets. It provides both global and non-global indexes. By default, it uses the Bloom index. Currently, HUDI supports:

  • Bloom index: Employs bloom filters built out of the record keys, optionally also pruning candidate files using record key ranges.
  • Simple index: Performs a lean join of the incoming update/delete records against keys extracted from the table on storage.
  • HBase index: Manages the index mapping in an external Apache HBase table.

At Halodoc, we leverage the global Bloom index so that records are unique across partitions. The choice of index depends on the source behaviour and on whether duplicates need to be retained.
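
The difference between a global and a non-global index can be illustrated with a toy upsert in plain Python. This is a conceptual sketch, not Hudi's implementation: the `upsert` helper, the `id` and `partition` fields, and the sample rows are hypothetical. The sketch moves an updated record to the incoming partition; what Hudi does with the partition path in that case depends on its configuration. `GLOBAL_BLOOM` is the `hoodie.index.type` value for the global Bloom index.

```python
from typing import Dict, List, Tuple

Record = Dict[str, str]

# Writer option selecting the global Bloom index.
index_options = {"hoodie.index.type": "GLOBAL_BLOOM"}


def upsert(table: List[Record], incoming: List[Record], global_index: bool) -> List[Record]:
    """Upsert records, matching on key (global) or on partition plus key (non-global)."""

    def index_key(record: Record) -> Tuple[str, ...]:
        if global_index:
            return (record["id"],)
        return (record["partition"], record["id"])

    merged = {index_key(record): record for record in table}
    for record in incoming:
        merged[index_key(record)] = record
    return list(merged.values())


# Hypothetical case: the same key arrives again under a different partition value.
existing = [{"id": "1", "partition": "2022-04-03", "status": "CREATED"}]
update = [{"id": "1", "partition": "2022-04-04", "status": "PAID"}]

non_global_result = upsert(existing, update, global_index=False)
global_result = upsert(existing, update, global_index=True)

print(len(non_global_result), "record(s) with a non-global index")
print(len(global_result), "record(s) with a global index")
```

With the non-global match the key ends up in two partitions, which is the duplicate a global index is meant to prevent.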

Conclusion:

At Halodoc, we have been using Apache HUDI for the last 6 months, and it has been serving us well for large-scale data workloads. In the beginning, there was a learning curve involved in choosing the right configurations for Apache Hudi.

In this blog, we shared some of the problems we faced in setting up the lake house architecture, along with best practices for configuring Apache HUDI correctly in a production environment.

Sharing a few of the reference links for a better understanding of the above topics.

Join us

Scalability, reliability and maintainability are the three pillars that govern what we build at Halodoc Tech. We are actively looking for engineers at all levels, and if solving hard problems with challenging requirements is your forte, please reach out to us with your resumé at careers.india@halodoc.com.


About Halodoc

Halodoc is the number 1 all-around healthcare application in Indonesia. Our mission is to simplify and bring quality healthcare across Indonesia, from Sabang to Merauke. We connect 20,000+ doctors with patients in need through our Tele-consultation service. We partner with 3500+ pharmacies in 100+ cities to bring medicine to your doorstep. We've also partnered with Indonesia's largest lab provider to provide lab home services, and to top it off we have recently launched a premium appointment service that partners with 500+ hospitals, allowing patients to book a doctor appointment inside our application. We are extremely fortunate to be trusted by our investors, such as the Bill & Melinda Gates Foundation, Singtel, UOB Ventures, Allianz, GoJek, Astra, Temasek and many more. We recently closed our Series C round and in total have raised around USD$180 million for our mission. Our team works tirelessly to make sure that we create the best healthcare solution personalised for all of our patients' needs, and we are continuously on a path to simplify healthcare for Indonesia.

Jitendra Shah

Data Engineer by profession. Building data infra using open source tools and cloud services.