Scaling Collaborative Filtering with PySpark
-
Shafi Bashar and Alex Gillmor, Machine Learning Engineers
- May 7, 2018
Here at Yelp our core mission is to connect users with great local businesses. On the engineering side, that requires us to tackle big data and machine learning problems. One of the natural ways to apply machine learning to our core mission is to deliver relevant and personalized business recommendations. This helps us deliver great content to users. For example, we can let users know when a hot and new coffee shop we think they will love opens in their neighborhood through push notifications or features like collections.
This is an area of active research and development at Yelp, and there are many promising ways to think about these problems. One of the techniques that we have found to be useful is collaborative filtering. In the first part of this blog series, we present a case study of creating a large scale personalized business recommendations framework using collaborative filtering and Apache Spark and discuss our experience working on this project and lessons learned.
There are tens of millions of users interacting with tens of millions of businesses on Yelp. To train a collaborative filtering model of this size, a distributed framework like Apache Spark seemed a natural choice for us. Our tool of choice was PySpark - the Python API for Spark.
A widely-adopted approach for building a collaborative filtering model is matrix factorization. The Spark ML library contains an implementation of a collaborative filtering model using matrix factorization based on the ALS (Alternative Least-Square) algorithm.
In the matrix factorization model, we start with a matrix in which each user is represented as a row and each business as a column, and entries represent the user’s interactions with a specific business. For example, to find a dinner reservation for a special occasion, user A may swipe through photos of restaurant B of their liking. So the entry corresponding to the row/user A, column/business B, i.e. MAB = 1. Since a user interacts with only a few of the tens of millions of businesses on Yelp, this matrix is sparse. The objective of the matrix factorization model is to decompose this sparse user-business matrix into a user matrix U (where each row represents a latent factor representation of that user, also known as user-embeddings) and a business matrix B (where each column represents a latent factor representation of that business, also known as business-embeddings) as shown in the following figure:
Users can express their interest in different businesses on Yelp through various interactions. These interactions vary and can be either explicit feedback, such as writing reviews or rating a business, or implicit feedback, like checking in at or bookmarking a business, adding it to a collection, viewing a business page, browsing through business photos, or using features like request a quote (RAQ). In our matrix factorization model, we used the implicit feedback signals because the amount of implicit feedback signals exceeds the amount of explicit signals.
Spark’s ALS module worked seamlessly to train the user-business matrix factorization model. The implicit feedback model has a few hyperparameters - rank or dimension of the latent space, regularization parameters, alpha or weight on the positive interaction terms. In order to determine the proper set of hyper-parameters, we performed grid-search hyper-parameter tuning over a subset of the input data. We chose the set of hyper-parameters that maximizes our desired metric in the validation set.
Once the matrix factorization model is trained, we can determine a user’s affinity or interest for a business as the inner-product of the user-embedding (u), and the business embedding (b) found from the model.
user-business affinity-score = u ⋅ b
To serve the exact top-K business recommendations for all users, we need to calculate the affinity-scores for all (user, business) pairs. This is a special case of maximum inner product search (MIPS) problem. Serving exact top-K recommendation efficiently is still an active area of research.
Spark ML has a built-in function to calculate the exact top-K item recommendations for all users. In our experience, however, the function doesn’t scale well for our dataset. To predict for all users, the built-in function took a significant amount of time. We also found the function would frequently throw OOM (Out of Memory) errors.
Challenges
While digging into the source code of the built-in function, we discovered a few potential issues:
-
To get all combinations of (user, item) pairs, the function uses the crossJoin operation between user DataSet and item DataSet. The crossJoin operation is very expensive, resulting in lots of shuffling and is usually not recommended. For example, a crossJoin between 1000 user rows and 1000 item rows will produce 1,000,000 = (1000 x 1000) combined rows. Even though the function groups the user and item sets into blocks to reduce the possible combinations per crossJoin operations, for a large dataset like ours, the crossJoin operation can still explode.
-
The function calculates the inner-product for each (user, item) pair individually. Instead, optimized BLAS libraries can take advantage of the cache hierarchy of modern CPUs/GPUs when performing block matrix multiplications.
To solve these problems, we implemented a top-K prediction algorithm in PySpark using a block matrix-multiplication based technique as shown in the figure below:
The idea behind the block matrix multiplication technique is to row-partition the tall and skinny user matrix and column-partition the short and wide business matrix. Once partitioned, we can parallelize matrix multiplications over these partitions.
We decided to use PySpark’s mapPartitions operation to row-partition and parallelize the user matrix. Each spark executor (located in worker nodes) will then operate on a partition, aka a chunk of rows from the user matrix.
The next challenge is to find a mechanism to make the business matrix available across all worker nodes without using the expensive crossJoin operation. One possible solution is to use the broadcast operation to distribute the business matrix. However, to use broadcasting, the entire matrix first needs to be loaded in the memory of the master node. In our use case, this wasn’t an option because the business matrix was too large in size. Instead, we copied the business matrix file directly from S3 to the worker nodes using the addFile operation.
For a given number of worker nodes, the trade-off we need to be aware of here is the number of executors vs. available resources for each executor. Ideally, we want as many executors as possible so that we can compute large number of recommendations in parallel. However, the more executors we assign to a worker node, the less resources (CPU, memory etc.) each of these executors receive. We want to ensure that each executor has sufficient resources to load the business matrix file in memory and perform fast matrix multiplication.
To speed up the computation and reduce the memory requirements in each executor, we can column-partition the business matrix and split it into multiple files (shown in the figure above). During the mapPartitions operation, each executor will then sequentially load these business matrix files one at a time into memory, perform block matrix multiplication with the user matrix block, determine and maintain a rolling top-K recommendations.
As a bonus, using PySpark makes it possible to leverage popular python libraries like NumPy and SciPy, which come with very fast implementations of matrix multiplication. With proper setup of the BLAS and LAPACK modules, block matrix multiplication can outperform individual dot product calculation.
The pseudo-code of our implementations steps are summarized below:
-
Step 1: Divide and save business matrix into p different files
- Step 2: Add these files to worker nodes
sc.addFile(BUSINESS_MATRIX_FILES_S3_PATH)
- Step 3: Load user matrix row as rdd
user_row_rdd = sc.textFile(USER_MATRIX_S3_PATH)
- Step 4: Use mapPartitions operation to calculate the exact top-K business recommendations for all users
user_topK_biz_recs = user_row_rdd.mapPartitionsWithIndex( get_topK_biz_recs_for_users)
- Step 5: Save results to S3
user_topK_biz_recs.saveAsTextFile(OUTPUT_S3_PATH)
The pseudo-code of the logic inside the mapPartitions is provided below:
In summary
leveraging the Spark ML library, we built a distributed collaborative filtering model for our large user-business interactions dataset. Even though in our experience, the out of the box exact top-K recommendation function didn’t scale well, we were able to leverage PySpark’s highly parallel framework in conjunction with NumPy and SciPy to implement the well-known trick of block matrix multiplication to produce exact top-K recommendations for all users. In the next post in this series, we will discuss our experience with deploying PySpark code in production as well as how we unit test our Spark code. In the meantime, check out some of the ML and Spark tooling we’ve open sourced.
Acknowledgements
We would like to thank the following for their feedback and review: Eric Liu, Niloy Gupta, Srivathsan Rajagopalan, Daniel Yao, Xun Tang, Chris Farrell, Jingwei Shen, Ryan Drebin, Tomer Elmalem.
Interested in working on Apache Spark and scaling machine learning?
We're hiring! Check out our current job openings. We’d like to hear from you!
View Job