Batch API part 3: Supercomputing at scale
April 04, 2022 | 8 min. read
As Aurora continues to grow and prepare for the launch of Aurora Horizon and Aurora Connect—our autonomous trucking and ride-hailing products—reliable and scalable computing power only becomes more important.
Batch API is designed to efficiently and flexibly manage the massive amounts of computing power we require, allowing our teams to run millions of tasks every day. But to maintain optimal performance, the owners of these tasks also need to be able to analyze the effectiveness of all of this computing power. And to do that, they need quick and reliable access to Batch API data.
In the third and final part of our series on Aurora’s Supercomputer, Batch API, we will discuss the challenges of logging user data and debugging on a massive scale, and how we provide users with tools to evaluate the performance of their jobs via access to historical analytics data. We’ll also talk about the future of our platform and how Batch API is scaling to a billion tasks a day.
System analytics and debugging
Every time a user creates a job with Batch API, the system captures a “log,” or report, of that job and everything that happened while that job was executed. Our engineering teams use these logs to understand how the Aurora Driver is performing on the road, determine what capabilities we need to refine or build, and create simulated scenarios to further test and train our technology. They are also critical to debugging, allowing users to uncover why a particular job failed or produced incorrect results.
Batch API tasks produce several terabytes of log data per month. All of these logs must be stored and made easily accessible in case a user ever needs to review them.
To facilitate log availability, Batch API captures user log data twice: logs are streamed to CloudWatch in real time and then stored in Amazon S3 at task completion.
Logging millions of tasks every day
Streaming logs to CloudWatch means they can be displayed in Batch API’s user interface while a task is progressing. It also means that logs are available even if the node suddenly disappears from the cluster, perhaps due to spot termination or an unexpected crash. This is not possible with Kubernetes pod logs alone, since Kubernetes log data lives on the machine from which it was produced.
Per-account API quota limits in CloudWatch make it difficult to provide reliable multi-tenant programmatic access, without which users cannot use their own tools to directly access and process the log output of their jobs. So we created a custom Kubernetes DaemonSet to allow users to configure Batch API to pipe their logs at task completion time to an S3 bucket and prefix of their own choosing. This means that issues of storage tier, retention, and permissions are user-level concerns rather than administrative concerns.
There are many off-the-shelf products available for capturing and storing Kubernetes logs to external storage mediums, but they typically require:
an unscalable per-node Kubernetes Watch on the Pod API,
an unscalable periodic Pod API list query,
reliance on the unstable Kubelet API, or
some combination of the above three.
Our custom Kubernetes DaemonSet, written in Go and creatively named “Aurora Cluster Logging Daemon,” or ACLD for short, uses the file system itself to discover when a pod has landed on its node. The ACLD then uses an efficient GET operation to the Pod API to discover the pod’s metadata for log-augmentation purposes and creates hard links to the log files it is tailing. This way, the lifetime of the log is decoupled from the lifetime of the container, allowing Batch API to aggressively delete pods and their associated containers without losing the logs, and giving us the ability to quickly scale our Kubernetes clusters.
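The hard-link idea can be illustrated with a minimal sketch. This is not ACLD itself (which is written in Go); it is a Python illustration, and the directory names, file names, and the `preserve_log` helper are hypothetical. It only shows why a hard link keeps log data readable after the original directory entry is deleted.

```python
import os
import tempfile


def preserve_log(container_log_path: str, archive_dir: str) -> str:
    """Hard-link a log file into archive_dir and return the new path.

    Both paths must live on the same filesystem, because hard links
    cannot cross filesystem boundaries.
    """
    os.makedirs(archive_dir, exist_ok=True)
    linked_path = os.path.join(archive_dir, os.path.basename(container_log_path))
    os.link(container_log_path, linked_path)
    return linked_path


def demo() -> str:
    """Simulate a container being deleted after its log was hard-linked."""
    with tempfile.TemporaryDirectory() as root:
        pod_dir = os.path.join(root, "pod-logs")  # hypothetical layout
        os.makedirs(pod_dir)
        original = os.path.join(pod_dir, "task.log")
        with open(original, "w") as f:
            f.write("task finished\n")

        linked = preserve_log(original, os.path.join(root, "log-archive"))

        # Deleting the container removes the original directory entry...
        os.remove(original)

        # ...but the data stays reachable through the second link.
        with open(linked) as f:
            return f.read()


if __name__ == "__main__":
    print(demo(), end="")
```

Because both names point at the same underlying file, the data is only freed once the last link is removed, which is what lets the log outlive the container.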
Historical task data
Because of its ubiquity at Aurora, Batch API’s historical data is very nearly an audit record of all batch computation for most organizations within the company. This historical data is useful for many purposes (tracking job performance, examining usage patterns, etc.), but there are challenges to making it available in a form suitable for general consumption.
Batch API uses a single SQL database with one read-write instance and one read-only standby instance. This one database serves as both a persistence engine and a message-passing engine between the highly available API Gateway and the Batch Runner backend. This single backend would be a bottleneck for the resource-intensive online analytical processing (OLAP) queries that modern data science takes for granted.
Another issue is that the schema of the backend database is heavily optimized for:
running tasks, and
allowing users to create and monitor tasks.
This optimization is necessary to handle the scale expected of Batch API, but it makes analytical queries directly on the SQL backend clunky and opaque. Certain highly nested structures are stored in encoded form, making them completely inaccessible to SQL queries.
To empower teams to access this data, Batch API has a built-in capability to export all of the configuration and status contents of every API element of a job (jobs, workers, tasks, and task attempt statuses) to S3 in Parquet format. The Parquet data is then indexed into Amazon Athena using an AWS Glue crawler.
Figure 1. (Top) This chart is an example of a query of Batch API historical data. It shows the cumulative time for each worker in a single job, broken down by each task that worker ran. (Bottom) This chart shows the volume of resources used over the lifetime of a single job.
Export happens immediately after a job completes. The data is partitioned by the creation date of the job, which allows efficient export deduplication in the case that a job is started and stopped again. This partitioning scheme also allows efficient partition-index utilization for most common historical data queries. The Parquet schema is automatically generated from the protobuf API, so developers accustomed to the public API have a familiar environment when using the SQL interface.
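A small sketch shows why partitioning by creation date makes deduplication cheap. The key layout, the `created_date` and `job_id` partition names, and the `export_key` helper are assumptions for illustration, not Batch API's actual scheme. A job's creation date never changes, so every export of the same job targets the same key, and a re-export overwrites the earlier object instead of adding a duplicate.

```python
from datetime import datetime, timezone
from typing import Dict


def export_key(prefix: str, job_id: str, created_at: datetime) -> str:
    """Build a Hive-style partitioned object key from the job's creation time.

    created_at must be timezone-aware; it is normalized to UTC so the
    partition does not depend on the exporter's local time zone.
    """
    day = created_at.astimezone(timezone.utc).strftime("%Y-%m-%d")
    return f"{prefix}/created_date={day}/job_id={job_id}/job.parquet"


def export_job(bucket: Dict[str, bytes], key: str, payload: bytes) -> None:
    """Stand-in for an object-store upload: writing to the same key overwrites."""
    bucket[key] = payload


if __name__ == "__main__":
    bucket: Dict[str, bytes] = {}  # in-memory stand-in for an S3 bucket
    created = datetime(2022, 4, 4, 15, 30, tzinfo=timezone.utc)
    key = export_key("batch-api/history", "job-123", created)

    export_job(bucket, key, b"first export")
    export_job(bucket, key, b"export after restart")  # same key, no duplicate
    print(key, len(bucket))
```

Queries that filter on the creation date can then skip every partition outside the requested range.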
The future: How to scale to a billion tasks per day
If there’s one thing we’ve learned from years of working in the large-scale computing space, it’s that user demand for resources and capabilities only grows over time. Any business relying on technological innovation to push the boundaries of an industry will progressively require more and more computing power at greater and greater scale.
Figure 2. Our users, when asked how many resources they will need.
To keep ahead of demand, engineering teams typically adhere to the mantra “scaling 10x,” which means designing a system that can handle 10x whatever your current peak is. But the technology industry is moving at such a rapid pace that scaling 10x is insufficient. For the self-driving space, specifically, the rate of technological advancements is accelerating and it is not enough to plan for one or two years into the future. In fact, at our current pace, we might make the same amount of progress in one year as we’ve made over the entire lifetime of the company to date. Therefore, we’re focused on scaling 100x—if we can handle 10 million tasks per day now, let's design a system that can handle a billion.
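To put the 100x target in perspective, the daily figures can be converted to an average rate. This is plain arithmetic on the numbers quoted above. It assumes tasks are spread evenly over the day, which real workloads are not, so actual peaks would be higher.

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400


def tasks_per_second(tasks_per_day: int) -> float:
    """Average task rate implied by a daily task count."""
    return tasks_per_day / SECONDS_PER_DAY


if __name__ == "__main__":
    print(round(tasks_per_second(10_000_000)))     # 116
    print(round(tasks_per_second(1_000_000_000)))  # 11574
```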
Of course, anytime you talk about scaling, doing so “horizontally” (by allowing more things to happen in parallel) is one of the first strategies that is considered, and for good reason. If you can break your problems down into smaller pieces and divide the work among multiple instances, you can handle larger amounts of work at the same time.
As discussed in previous parts of this series, we’ve designed several tools that help us improve our capabilities by adding more capacity:
Placement API can easily add more clusters and more regions to our systems as our needs increase. (In fact, we’ve recently added a third production cluster.)
Gateway API is stateless and can easily scale to more replicas.
The Batch Runner is sharded and can easily add more shards to handle more jobs and more tasks.
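The post does not describe how jobs are assigned to Batch Runner shards, so the following is only a generic sketch of one common approach: hash a stable identifier and take it modulo the shard count. The `shard_for_job` function and the job ID format are hypothetical.

```python
import hashlib


def shard_for_job(job_id: str, shard_count: int) -> int:
    """Map a job ID to a shard index using a stable hash.

    hashlib is used instead of the built-in hash() because hash() is
    randomized per process for strings and would not be stable.
    """
    if shard_count < 1:
        raise ValueError("shard_count must be at least 1")
    digest = hashlib.sha256(job_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % shard_count


if __name__ == "__main__":
    for job_id in ("job-1", "job-2", "job-3"):
        print(job_id, shard_for_job(job_id, shard_count=4))
```

One trade-off of plain modulo assignment is that changing the shard count remaps most IDs. A system that adds shards while jobs are running would need to pin existing jobs to their shard or use a scheme such as consistent hashing.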
When you’ve broken your problem down as much as possible, you might find that some pieces are still too large (for example, jobs with 10 million tasks). So now you need to adjust your system to be able to handle those larger pieces.
Batch API was designed to be as efficient as possible with built-in flexible horizontal scaling. Vertical scaling is constrained by practical limitations like processor speed and memory limits. But even so, there are places we can continue to scale:
The task limit for a single job is based on holding the DAG in memory, so increasing the resources available to Batch Runner shards can enable larger DAGs.
Database queries can be optimized to keep performance acceptable.
APIs can be redesigned to trade ease of use for performance, such as by adding pagination.
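As an illustration of the pagination point, here is a minimal sketch of token-based paging over a sorted list of task IDs. The `list_tasks_page` function, its parameters, and the token format are hypothetical and are not Batch API's actual interface. The caller has to loop over pages, which is less convenient, but each response stays small no matter how many tasks the job has.

```python
import bisect
from typing import List, Optional, Tuple


def list_tasks_page(
    task_ids: List[str], page_size: int, page_token: Optional[str] = None
) -> Tuple[List[str], Optional[str]]:
    """Return one page of task IDs and a token for the next page.

    task_ids must be sorted. The token is the last ID already returned,
    so paging stays correct even if earlier IDs are removed between calls.
    """
    if page_size < 1:
        raise ValueError("page_size must be at least 1")
    start = 0
    if page_token is not None:
        start = bisect.bisect_right(task_ids, page_token)
    page = task_ids[start:start + page_size]
    has_more = start + page_size < len(task_ids)
    next_token = page[-1] if has_more else None
    return page, next_token


if __name__ == "__main__":
    ids = [f"task-{i:03d}" for i in range(5)]
    token: Optional[str] = None
    while True:
        page, token = list_tasks_page(ids, page_size=2, page_token=token)
        print(page)
        if token is None:
            break
```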
Horizontal and vertical scaling can only do so much. Sometimes, in order to enable overall scale, you might have to de-prioritize some things in favor of other, more immediately important things.
In large-scale, heterogeneous workload systems, conflicts around resource usage and timeliness are inevitable.
What happens when we need to run a series of simulations to validate the latest Aurora Driver software so that the trucks that are waiting to head out for the day can safely leave their depots, but all of our resources are currently being utilized by a massive data-mining job? How do you prevent workloads from interfering with each other?
Of course, with a cloud environment, you can throw money at the problem and provision additional nodes to handle the new workloads. However, sometimes that isn’t possible, either because of budget constraints or cloud environment capacity problems.
Our solution is priority-aware worker scheduling. We’re developing a feature that provides a fixed number of priority tiers that job owners can request when configuring a worker. Higher-priority worker pods receive a higher Kubernetes priority class, which puts them ahead of lower-priority pods in the scheduling queue. As part of organization-wide resource quotas (for CPUs, memory, etc.), teams receive a certain number of priority units that limit the number of high-priority workers that can be active at the same time. When a team does not have enough remaining priority units at the requested tier, the worker they create will fall back to the next-highest tier that has sufficient priority units.
Figure 3. A team is hitting the quota limit (set at 150 priority units) for their highest priority tier (blue line), so the additional workers that are created overflow to a lower tier (yellow line).
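The fallback rule can be sketched in a few lines. This is a simplified model, not the feature's implementation: the tier names, the cost of one priority unit per worker, and the choice to return `None` when no tier has capacity are all assumptions made for illustration.

```python
from typing import Dict, List, Optional


def assign_tier(
    requested: str,
    tiers_high_to_low: List[str],
    remaining_units: Dict[str, int],
    cost: int = 1,
) -> Optional[str]:
    """Pick the requested tier, or the next lower tier with enough units.

    Deducts the cost from the chosen tier's remaining units. Returns None
    if neither the requested tier nor any lower tier has enough units.
    """
    start = tiers_high_to_low.index(requested)
    for tier in tiers_high_to_low[start:]:
        if remaining_units.get(tier, 0) >= cost:
            remaining_units[tier] -= cost
            return tier
    return None


if __name__ == "__main__":
    tiers = ["high", "medium", "low"]  # hypothetical tier names
    units = {"high": 1, "medium": 0, "low": 5}
    print(assign_tier("high", tiers, units))  # high
    print(assign_tier("high", tiers, units))  # low
```

In the second call the team has used up its high-tier units and has none at the medium tier, so the worker overflows to the low tier, which mirrors the behavior shown in Figure 3.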
If two workloads in the example above are scaling up at the same time, workers for the high-priority simulations will be scheduled onto nodes first, ahead of the low-priority data mining workers.
But what if the low-priority work began first? We are developing another feature to detect when high-priority workers have been queued for longer than an acceptable threshold. Once this threshold is reached, Batch API would begin to non-disruptively scale down the low-priority workers, shutting them down as they complete tasks to liberate resources for the pending high-priority workers. Of course, this feature would also need to monitor how long low-priority work has been suspended in this way, and balance the Kubernetes queue to prevent that work from starving.
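A rough sketch of the threshold check might look like the following. The `PendingWorker` type, the worker names, and the one-for-one replacement rule are assumptions; a real implementation would compare resource requests rather than worker counts and would also track how long low-priority work has been held back.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PendingWorker:
    name: str
    queued_seconds: float  # how long the worker has waited to be scheduled


def workers_to_drain(
    pending_high: List[PendingWorker],
    running_low: List[str],
    max_wait_seconds: float,
) -> List[str]:
    """Choose low-priority workers to drain, one per overdue high-priority worker.

    Draining is meant to be non-disruptive: a drained worker finishes its
    current task and then exits instead of being killed mid-task.
    """
    overdue = [w for w in pending_high if w.queued_seconds > max_wait_seconds]
    return running_low[:len(overdue)]


if __name__ == "__main__":
    pending = [PendingWorker("sim-1", 400.0), PendingWorker("sim-2", 30.0)]
    print(workers_to_drain(pending, ["mine-1", "mine-2", "mine-3"], 300.0))
```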
Often, users don’t actually know how many resources they need to successfully complete their work. While the amount of work users request continues to increase, that work does not always require a matching increase in resources.
Batch API provides various ways of making resource usage more efficient such as Worker Reuse and Automated Resource Tiers. But those can’t really help if the amount of resources being asked for doesn’t match up with what users are actually using. While we as providers of said resources can’t know what users are going to need, we can provide them with the data they need to right-size their own workloads.
We are currently rolling out a Batch API feature that provides detailed statistical data of resource usage for every task, aggregated “roll-up” metrics at the task, worker, and worker config levels, and any resource waste (asked for 4 GiB but only used 3) or excess (asked for 1 CPU but used 2.5). These stats will inform users about their real resource usage and could help them make smarter decisions on how to use resources more efficiently.
Figure 4. This is an example of task resource statistics for one of our simulation jobs, showing data about CPU usage including waste and excess.
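The waste and excess figures follow directly from comparing what a task requested with what it used. The sketch below reproduces the two examples from the text. The `waste_and_excess` function is hypothetical, and it assumes a single usage number per task (for example, peak usage), which the post does not specify.

```python
from typing import Tuple


def waste_and_excess(requested: float, used: float) -> Tuple[float, float]:
    """Return (waste, excess) for one resource on one task.

    Waste is the requested amount that went unused; excess is usage
    beyond the request. At most one of the two is non-zero.
    """
    waste = max(requested - used, 0.0)
    excess = max(used - requested, 0.0)
    return waste, excess


if __name__ == "__main__":
    print(waste_and_excess(requested=4.0, used=3.0))  # memory in GiB: (1.0, 0.0)
    print(waste_and_excess(requested=1.0, used=2.5))  # CPUs: (0.0, 1.5)
```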
As we have previously discussed, self-driving requires a variety of resource-intensive technologies like machine learning, computer vision, and simulation. Aurora’s teams of engineers, scientists, and developers need to be able to deploy these tools in a reliable and scalable way in order to solve the hard problems facing autonomous vehicles.
In this series, we’ve demonstrated how the Batch API platform allows us to run our computing systems at a massive scale, while also ensuring we use resources efficiently. In Part 1, we discussed how Batch API’s “dynamic DAG” support means our users can create extremely complex and flexible computing jobs that can tackle an almost infinite variety of problems. In Part 2, we revealed how AWS and Kubernetes are the backbone of the Batch API system, and how we use features such as autoscaling and advanced placement techniques to make resource management more scalable, reliable, and efficient. Finally, in this post, we dove into the difficulties that come with logging and analyzing all of the data Batch API generates, and shared some new tools we’re working on to eventually scale to a billion tasks per day.
In future posts, we’ll continue to explore the infrastructures and technologies that enable teams across Aurora to tackle self-driving industry challenges and make progress toward the safe deployment of autonomous vehicles.
the Compute Infrastructure team
Mike Deats, Brandan Greenwood, Justin Pinkul, Cody Yancey, Andrew Gemmel, and Bridget Spitznagel