At Yelp, we encountered challenges that prompted us to enhance the training time of our ad-revenue generating models, which use a Wide and Deep Neural Network architecture for predicting ad click-through rates (pCTR). These models handle large tabular datasets with small parameter spaces, requiring innovative data solutions. This blog post delves into our journey of optimizing training time using TensorFlow and Horovod, along with the development of ArrowStreamServer, our in-house library for low-latency data streaming and serving. Together, these components have allowed us to achieve a 1400x speedup in training for business critical models compared to using a single GPU with Petastorm.

The Challenge

At Yelp, we encountered several challenges in optimizing the training time of our machine learning models, particularly pCTR model.Our primary tool for machine learning tasks is Spark, and most of our datasets are stored in S3 in Parquet format. Initially, training on 450 million tabular samples took 75 hours per epoch on a p3-2xlarge instance. Our goal was to scale this efficiency to 2 billion tabular samples while also achieving a per epoch training time of less than 1 hour. This required innovative solutions to address several key challenges:

  • Data Storage: Efficiently manage large1 distributed tabular datasets stored in Parquet format on S3 to ensure compatibility with our Spark ecosystem. Petastorm was inefficient for our needs because of the tabular nature of our data, leading to the development of ArrowStreamServer, which improved streaming performance.
  • Distributed Training: Efficiently scale across multiple GPUs by transitioning from TenserFlow’s MirroredStrategy to Horovod.

Technical Implementation

Data Storage

We have opted to store materialized training data in a Parquet dataset on S3 for the following reasons:

  • Compatibility: It integrates seamlessly with Yelp’s Spark ecosystem.
  • Efficiency: Parquet is highly compressed and optimized for I/O operations.
  • Scalability: S3 offers virtually unlimited storage capacity.
  • Performance: S3 can support very high throughput2 when accessed in parallel.

After materializing the training data in S3, it needs to be read and converted into a TensorFlow Dataset. Our first approach was to use Petastorm, which is very easy to use and integrates well with Spark. However, we found that Petastorm did not fit our use case as it was much slower when using the rebatch approach to achieve the desired batch size. This inefficiency is due to Yelp’s focus on tabular datasets with hundreds of features and hundreds of thousands of rows per3, where unbatching causes tensor explosions and creates millions of tensors.

To address this challenge, we explored alternative solutions, and discovered that Tensorflow I/O has an implementation called ArrowStreamDataset. By using TensorFlow Dataset directly, we can avoid relying on a Python generator, which is known for limited portability and scalability issues. Upon testing, we found that this approach performed significantly better for our use case.

Method Time
boto3 read raw 18.5 s
Petastorm(a batch per row group) 76.4 s
Petastorm(rebatch to 4096) 815 s
ArrowStream(batch to 4096) 19.2 s
Time to convert 9 millions samples  

Unfortunately ArrowStreamDataset only supports dataset being stored in a local disk with Feather format which is incompatible with our ecosystem and inhibits scaling to our necessary dataset sizes. To work with ArrowStreamDataset, and to more effectively stream data from S3, we implemented an ArrowStreamServer with PyArrow. ArrowStreamServer reads and batches datasets, serving them as RecordBatch streams over a socket. Each ArrowStreamServer runs in a separate process to have better parallelism.

On the consumer side, we utilized ArrowStreamDataset to read RecordBatches from endpoints, enabling efficient batching and interleaving of datasets.

System diagram of ArrowStream Dataset and Server

Distributed Training

As training data grew from hundreds of millions to billions of samples, we adopted distributed training across multiple GPUs4 to improve training time. Ultimately we choose Horovod as our default distribution method.

Initially, we used TensorFlow’s built-in MirroredStrategy for distributed training. We got almost linear speedup in 4 GPUs. However, as we scaled from 4 GPUs to 8 GPUs, we discovered that MirroredStrategy was not optimal, resulting in low GPU, CPU, and IO metrics. The bottleneck was identified in the Keras data handler, which struggled with sharding the dataset across devices.

After we observed bottlenecks with TensorFlow’s built in strategies, we decided to try Horovod’s more sophisticated distributed training capabilities. In our tests, we found Horovod provided linear performance scaling up to 8 GPUs compared to using a single GPU. We believe the reasons for Horovod’s superior performance are as follows:

  • Efficient Process Management: Horovod uses one process per device, which optimizes resource utilization and avoids GIL in Python.
  • Gradient Conversion: By converting sparse gradients to dense gradients, Horovod significantly improves memory efficiency and speed during all-reduce operations. We recommend converting sparse gradients to dense gradients for models with a large batch size and numerous categorical features5.

Switching to Horovod presented several challenges, primarily due to the use of Keras’s premade WideDeepModel and the complexities of resource management on a multi-GPU machine.

To support Keras, Horovod implemented DistributedOptimizer to wrap and override the _compute_gradients method, but Keras’s premade WideDeepModel directly calls GradientTape instead of calling minimize to support two optimizer cases. To address this issue, we had to override the train_step of the WideDeepModel.

We also encountered a “threading storm” and out-of-memory issues as thousands of threads were created. To prevent oversubscription of cores and memory, we devised a way to downsize thread pools used by these libraries from all available resources to only the available resources per GPU.

setting value
tf.data.Options.threading.private_threadpool_size number of cpu cores per GPU
tf.data.Options.autotune.ram_budget available host memory per GPU
OMP_NUM_THREADS number of cpu cores per GPU
TF_NUM_INTEROP_THREADS 1
TF_NUM_INTRAOP_THREADS number of cpu cores per GPU
Settings for splitting resources based on GPU  

Summarize

To put all of this together and incorporate it into Yelp’s existing Spark ML ecosystem, we designed a KerasEstimator to act as the Spark Estimator in our ML pipeline. As shown in the following figure, we first materialize transformed features into S3 and serve it with ArrowStreamServer on the training Spark Executors, then we make use of ArrowStreamDataset to stream training data from the ArrowStreamServer. TFMirrorRunner6 and HorovodRunner7, as two concrete implementations of the SparkRunner, are used to set up and train Keras Model in Spark executor.

Overview of KerasEstimator fits into Spark Pipeline

Results and Benefits

Performance Improvements

Our benchmarks on a dataset with 2 billion samples with the pCTR model demonstrated substantial improvements. By optimizing data storage with ArrowStream, we achieved an 85.8x speedup from a starting point of 75 hours with 450 million samples. Additionally, implementing distributed training provided a further 16.9x speedup, resulting in a total speedup of approximately 1,400x. These results underscore the effectiveness of our approach in optimizing both speed and cost.

note: all tests are done using AWS P3 instances except A10G, which is a G5dn instance.

Conclusion

Our transition to using TensorFlow with Horovod has significantly accelerated our machine learning training processes, reducing costs and improving developer velocity. This approach not only addresses the challenges we faced but also sets a foundation for future scalability and efficiency improvements. IO is often the bottleneck for Neural Network training, especially for tabular datasets and relatively simple models. Improving IO equates to improving training performance. For training small to medium-sized models, AWS G series instances often outperform P series instances especially if we take cost into account.

Acknowledgements

Thanks Nathan Sponberg for implementing the KerasEstimator.

Footnotes

  1. Up to 10s of terabytes. 

  2. AWS S3 can provide up to 100Gb/s to a single EC2 instance. Introduction - Best Practices Design Patterns: Optimizing Amazon S3 Performance 

  3. Parquet row group is the default batch size in Petastorm 

  4. We are using p3.8xlarge and p3.16xlarge 

  5. In our case we have hundreds categorical features and a batch size of 32K-256K. 

  6. Wrapper for Tensorflow MirrorStrategy on Spark 

  7. Wrapper for Horovod on Spark 

Become an Engineer at Yelp

We work on a lot of cool projects at Yelp. If you're interested, apply!

View Job

Back to blog