Leveraging PySpark for High-Volume Data Aggregation at Halodoc
At Halodoc, we process data from our process zone and transform it into well-structured data models, consisting of both dimension and fact tables. This data transformation pipeline is built on Amazon EMR on EKS, with Apache Spark serving as the core processing engine. Initially, we utilized Spark SQL to perform the majority of our transformations into dimension and fact tables.
As our data volume grew and business requirements became more complex, this naturally translated into increased demand for computational resources. We began to experience limitations with Spark SQL due to its relatively restricted control over query execution behavior. In particular, Spark SQL abstracts away many low-level optimizations, meaning it does not provide the same level of granular control as PySpark. While this abstraction simplifies query writing, it limits our ability to fine-tune performance. For example, with Spark SQL, we couldn’t:
- Fine-tune broadcast decisions, such as broadcasting a table only when it is small enough, because SQL hints cannot be combined with Python logic to make data-driven decisions at runtime.
- Cache CTEs (Common Table Expressions) that were reused multiple times.
As we adopted modern table formats like Apache Hudi, we found that while Spark SQL is backed by the powerful Catalyst Optimizer and handles predicate pushdown, it does not always generate the most optimal execution plan, especially when interacting with Hudi's internal mechanisms such as indexing and data skipping. PySpark's DataFrame API provides more granular control, allowing us to apply filters explicitly, for example on the table's partition column, to more directly guide Hudi's data skipping behavior.
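A minimal sketch of this pattern is shown below; the Hudi table path, column names, and date value are hypothetical and only illustrate applying the filter explicitly at read time.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical Hudi table path and partition column (pdate).
profiles_df = (
    spark.read.format("hudi")
    .load("s3://example-processed-zone/user_profiles/")
    # An explicit filter on the partition column is applied before any other
    # logic, so file/partition skipping happens at read time.
    .filter(F.col("pdate") >= "2024-01-01")
    .select("profile_id", "user_id", "pdate")
)
```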
To overcome these limitations, we migrated several transformation queries to PySpark, which gives us more control over how Spark executes them. We also make use of broadcast joins to optimize small-table joins, persist() to cache intermediate results, and column pruning and partition pruning to reduce the amount of data scanned during query execution. These enhancements allow us to build more efficient data pipelines while making better use of memory and compute resources within our EMR on EKS environment.
Broadcast Joins
Before implementation, we had already established a performance baseline by collecting detailed execution metrics from our Spark jobs. As described in this blog post, we tracked key indicators such as the total number of tasks, total memory used, and total core utilization across different workloads.
One of the features we implemented during the migration to PySpark was the use of broadcast joins. In our data model, it is common to join large tables with relatively small lookup tables. Under the default Spark behavior, even small tables are shuffled across the cluster during a join.
By explicitly broadcasting small tables, we instruct Spark to send a full copy of the smaller table to all worker nodes. This eliminates the need for shuffling the large table and enables local joins on each executor, significantly reducing job stages.
The performance improvement results from eliminating shuffle operations. Shuffles force Spark to redistribute data across the cluster and create additional tasks. Broadcast joins optimize this by distributing the smaller dataset to all nodes just once, where it remains available in memory. This approach reduced our task count from 12 to 4 and cut processing time from 12 seconds to just 2 seconds.
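A minimal sketch of an explicit broadcast join is shown below; the table and column names are hypothetical stand-ins for our fact and lookup tables.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Hypothetical large fact table and small lookup table.
transactions_df = spark.table("fact_transactions")
payment_methods_df = spark.table("dim_payment_method")

# broadcast() hints Spark to ship the full lookup table to every executor,
# so the join is performed locally and the large side is never shuffled.
enriched_df = transactions_df.join(
    broadcast(payment_methods_df),
    on="payment_method_id",
    how="left",
)
```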
Key Considerations for Broadcast Joins
- Full control configuration: To manually control broadcast behavior, disable auto-broadcast. This ensures joins are only broadcast when explicitly specified via broadcast() hints.
- Broadcast size: Tables that are too large may cause Out-of-Memory (OOM) errors. Always verify the table size before broadcasting; the sketch below shows a simple runtime check.
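The sketch below combines both considerations: auto-broadcast is disabled so only explicit hints take effect, and plain Python logic decides at runtime whether the lookup side is small enough to broadcast. The table names and threshold are illustrative assumptions.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast

spark = SparkSession.builder.getOrCreate()

# Disable automatic broadcasting so joins are only broadcast when we ask for it.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

large_df = spark.table("fact_consultations")   # hypothetical fact table
lookup_df = spark.table("dim_clinic")          # hypothetical lookup table

# Data-driven decision at runtime: only broadcast when the lookup side is
# small enough. A row count is used as a rough proxy here; catalog statistics
# or file sizes could be used instead to avoid the extra action.
MAX_BROADCAST_ROWS = 1_000_000  # illustrative threshold
if lookup_df.count() <= MAX_BROADCAST_ROWS:
    joined_df = large_df.join(broadcast(lookup_df), "clinic_id", "left")
else:
    joined_df = large_df.join(lookup_df, "clinic_id", "left")
```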
Persist / Cache
PySpark offers capabilities for optimizing performance, especially when dealing with repetitive operations on large datasets. One such feature is the ability to persist or cache intermediate results. By storing the DataFrame in memory (or optionally on disk), PySpark avoids re-reading data from external sources like Amazon S3 on subsequent actions. This significantly reduces I/O overhead and improves execution time, particularly in workflows that reuse the same data across multiple stages.
The choice of MEMORY_AND_DISK strikes a balance between speed and reliability. When the data fits in memory, it can be accessed quickly; otherwise, Spark will automatically spill the excess to disk. While reading from disk is slower than memory, it still avoids the more expensive operation of reloading data from S3. This storage level is particularly useful when dealing with moderately large datasets that may partially fit in memory but are reused across multiple stages. For significantly larger datasets that are unlikely to benefit from memory caching due to size, using StorageLevel.DISK_ONLY can be a more stable and memory-efficient option.
The example below demonstrates a common use case for caching. We derive two separate DataFrames, place_df and locality_df, by applying additional filters and column selections specific to each use case. Since profile_sections_df is already cached, these operations do not trigger repeated reads or computations from the original S3 source; instead, Spark accesses the data directly from memory or disk.
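Since the original example code is not reproduced here, the sketch below reconstructs the pattern described above; the S3 path, filters, and column names are assumptions.

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder.getOrCreate()

# Read the intermediate result once and keep it for reuse across stages.
profile_sections_df = (
    spark.read.format("hudi")
    .load("s3://example-processed-zone/profile_sections/")  # hypothetical path
    .persist(StorageLevel.MEMORY_AND_DISK)
)

# Both derived DataFrames reuse the cached data instead of re-reading from S3.
place_df = (
    profile_sections_df
    .filter(F.col("section_type") == "place")      # illustrative filter
    .select("profile_id", "place_id", "pdate")
)

locality_df = (
    profile_sections_df
    .filter(F.col("section_type") == "locality")   # illustrative filter
    .select("profile_id", "locality_id", "pdate")
)
```

Note that persist() is lazy: the data is only materialized on the first action against profile_sections_df, and calling unpersist() once the derived DataFrames have been written releases the cached blocks.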
We can also verify that caching is working as expected by inspecting the Spark UI. In the query plan section, cached DataFrames will appear as InMemoryTableScan or InMemoryRelation, indicating that Spark is reading data from the cache rather than re-scanning the original table.
The screenshot above shows the opposite case, where persist() or cache() is not applied: Spark repeatedly reads the same intermediate data from S3 every time the DataFrame is used in a different stage.
Column Pruning
As part of the migration, we also applied column pruning by explicitly selecting only the columns required for each transformation. Instead of using broad queries like SELECT *, we rewrote the logic to target the specific columns relevant to the business logic, as shown in the sketch after this list. This strategy provides several benefits:
- Reduced I/O and memory usage: By reading only the required columns from the source, Spark minimizes the amount of data transferred and held in memory.
- Faster query execution: With less data to process, Spark can execute transformations and actions more efficiently.
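A minimal sketch of column pruning, assuming a hypothetical wide source table:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical wide source table; only the columns needed downstream are selected.
appointments_df = (
    spark.table("processed.appointments")
    .select("appointment_id", "user_id", "doctor_id", "status", "pdate")
)
```

With columnar file formats such as Parquet, which Hudi typically uses for its base files, the pruned columns are never read from S3 at all.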
Partition Pruning
We applied partition pruning on several tables to optimize read performance. In our use case, some tables only need data from the last 3 months based on the creation date, and the source tables are partitioned by a column called PDATE (partition date).
By leveraging the PDATE column, we applied filters explicitly at the beginning of the transformation process. This allows Spark to read only the relevant partitions and skip the rest, significantly reducing the amount of data scanned.
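A sketch of this filter, assuming a hypothetical table partitioned by PDATE (stored as yyyy-MM-dd) and a rolling 90-day window as an approximation of the last 3 months:

```python
from datetime import date, timedelta

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# Rolling 90-day window as an approximation of "last 3 months" (illustrative).
cutoff = (date.today() - timedelta(days=90)).isoformat()

orders_df = (
    spark.table("processed.orders")  # hypothetical table partitioned by pdate
    # Filtering on the partition column up front lets Spark skip whole
    # partitions instead of scanning the full table.
    .filter(F.col("pdate") >= cutoff)
)
```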
Lineage Extraction
We already have an MWAA (Airflow) job, using the Kubernetes Operator, that is responsible for running SQLLineage. SQLLineage reads SQL queries from our S3 bucket and stores the extracted results into tables, which we refer to as table dependency and column dependency mappings. As mentioned earlier, some scripts in our repository are written in PySpark after the migration, which makes lineage extraction unavailable for the tables they produce.
To address this, we created a new job in MWAA that converts PySpark scripts into equivalent SQL representations so that lineage can still be populated. We use Amazon Bedrock with the Nova Pro model, where we send the PySpark code along with a prompt. The generated SQL outputs are then stored in a separate S3 bucket and later integrated into our existing lineage extraction workflow.
We chose Nova Pro because PySpark transformation code can be highly complex and sometimes very long, requiring a model that excels at deep code understanding and long-context reasoning.
As shown in the diagram above, we developed an MWAA job that leverages the Bedrock API and GitLab API to automatically convert PySpark transformation code into SQL. This enables us to extract lineage metadata from PySpark scripts by translating them into a SQL format compatible with our existing lineage tools; this job must run before the lineage extraction job.
The job consists of the following key steps:
- Retrieve PySpark Files: Transformation scripts (.py files) are fetched from S3 using Boto3.
- Check Commit Timestamp: We use the GitLab API to check whether the file was last updated within the previous day (D-1). Only files modified yesterday are processed, to minimize Bedrock cost.
- Convert to SQL: The Bedrock API is used to translate eligible PySpark scripts into SQL (a simplified sketch of this step follows the list). To ensure high-quality output, we must provide a well-defined prompt that includes the desired SQL dialect; without clear instructions, the generated SQL can be messy, inconsistent, or even incorrect.
- Store Results: Converted SQL files are stored in a separate S3 bucket.
- Run Lineage Verification: We verify the SQL using LineageRunner to ensure correctness and syntax validity. The process runs via KubernetesPodOperator, allowing us to easily scale or add custom validation logic when needed.
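The sketch below illustrates the convert, verify, and store steps; the bucket names, model identifier, prompt, and helper functions are assumptions for illustration rather than our production code, and the GitLab commit-timestamp check is omitted for brevity.

```python
import boto3
from sqllineage.runner import LineageRunner

# Bucket names and model ID below are illustrative assumptions.
SOURCE_BUCKET = "example-transformation-scripts"
OUTPUT_BUCKET = "example-converted-sql"
MODEL_ID = "amazon.nova-pro-v1:0"  # Nova Pro; the exact ID/inference profile may differ

s3 = boto3.client("s3")
bedrock = boto3.client("bedrock-runtime")


def convert_pyspark_to_sql(script_key: str) -> str:
    """Translate one PySpark script into a SQL representation via Bedrock."""
    obj = s3.get_object(Bucket=SOURCE_BUCKET, Key=script_key)
    pyspark_code = obj["Body"].read().decode("utf-8")

    # A well-defined prompt, including the target SQL dialect, is essential
    # for consistent, parseable output.
    prompt = (
        "Convert the following PySpark transformation into equivalent Spark SQL. "
        "Return only the SQL statements, using fully qualified table names.\n\n"
        + pyspark_code
    )
    response = bedrock.converse(
        modelId=MODEL_ID,
        messages=[{"role": "user", "content": [{"text": prompt}]}],
        inferenceConfig={"maxTokens": 4096, "temperature": 0},
    )
    return response["output"]["message"]["content"][0]["text"]


def store_and_verify(script_key: str) -> None:
    """Verify the generated SQL parses with SQLLineage, then store it in S3."""
    sql_text = convert_pyspark_to_sql(script_key)

    # Parsing errors surface here if the generated SQL is invalid.
    LineageRunner(sql_text).source_tables()

    s3.put_object(
        Bucket=OUTPUT_BUCKET,
        Key=script_key.replace(".py", ".sql"),
        Body=sql_text.encode("utf-8"),
    )
```

In the actual DAG this logic runs inside a KubernetesPodOperator task, so additional validation can be added without changing the Airflow workers.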
Challenges
When planning to adopt PySpark to get more flexibility, we encountered challenges around table and column dependencies. We had already adopted SQLLineage to extract lineage information directly from SQL queries, which helped us track both table-level and column-level dependencies across our data models. However, SQLLineage only works with raw SQL files and does not support lineage extraction from PySpark scripts. This limitation posed a challenge, as some of our transformation logic would be implemented using PySpark.
To address the lineage gap introduced by our shift to PySpark, we utilized Amazon Bedrock with the Nova Pro model to convert PySpark transformation scripts into equivalent SQL representations. These generated SQL versions are then stored in an S3 bucket and integrated into our existing lineage extraction workflow.
Conclusion
By migrating from Spark SQL to PySpark, we achieved the same query results with significantly lower resource usage. This shift gave us greater control over Spark’s execution, enabling more efficient use of broadcast joins, partitioning, and column pruning, especially for tables with heavy transformations that require full control over execution behavior.
As a result, we reduced total task execution time by 25% and core utilization by 14%, leading to faster performance and improved resource efficiency.
This transition also introduced a gap in lineage extraction, since our existing system relied on raw SQL. To address this, we built a new job using Amazon Bedrock with the Nova Pro model to convert PySpark scripts into SQL, allowing lineage to remain populated and consistent.
While generative AI accelerates this process, it's important to review the results carefully and regularly. Outputs may contain syntax or logic issues if prompts are not specific enough. Keep in mind that AI usage comes with underlying token costs, so it’s best to use it selectively and only when needed to maximize both cost-efficiency and value.
Join us
Scalability, reliability, and maintainability are the three pillars that govern what we build at Halodoc Tech. We are actively looking for engineers at all levels and if solving hard problems with challenging requirements is your forte, please reach out to us with your resumé at careers.india@halodoc.com.
About Halodoc
Halodoc is the number one all-around healthcare application in Indonesia. Our mission is to simplify and deliver quality healthcare across Indonesia, from Sabang to Merauke. Since 2016, Halodoc has been improving health literacy in Indonesia by providing user-friendly healthcare communication, education, and information (KIE). In parallel, our ecosystem has expanded to offer a range of services that facilitate convenient access to healthcare, starting with Homecare by Halodoc as a preventive care feature that allows users to conduct health tests privately and securely from the comfort of their homes; My Insurance, which allows users to access the benefits of cashless outpatient services in a more seamless way; Chat with Doctor, which allows users to consult with over 20,000 licensed physicians via chat, video or voice call; and Health Store features that allow users to purchase medicines, supplements and various health products from our network of over 4,900 trusted partner pharmacies. To deliver holistic health solutions in a fully digital way, Halodoc offers Digital Clinic services including Haloskin, a trusted dermatology care platform guided by experienced dermatologists. We are proud to be trusted by global and regional investors, including the Bill & Melinda Gates Foundation, Singtel, UOB Ventures, Allianz, GoJek, Astra, Temasek, and many more. With over USD 100 million raised to date, including our recent Series D, our team is committed to building the best personalized healthcare solutions — and we remain steadfast in our journey to simplify healthcare for all Indonesians.