Engineering Blog

October 15th, 2014

Introducing Pyleus: An Open-source Framework for Building Storm Topologies in Pure Python

Yelp loves Python, and we use it at scale to power our websites and process the huge amount of data we produce.

Pyleus is a new open-source framework that aims to do for Storm what mrjob, another open-source Yelp project, does for Hadoop: let developers process large amounts of data in pure Python and iterate quickly, spending more time solving business-related problems and less time concerned with the underlying platform.

First, a brief introduction to Storm. From the project’s website, “Apache Storm is a free and open source distributed realtime computation system. Storm makes it easy to reliably process unbounded streams of data, doing for realtime processing what Hadoop did for batch processing.”

A Pyleus topology consists of, at minimum, a YAML file describing the structure of the topology, declaring each component and how tuples flow between them. The pyleus command-line tool builds a self-contained Storm JAR which can be submitted to any Storm cluster.

When it comes to massive data processing demos, “word count” is a classic. Because Storm operates on “unbounded streams of data”, we can’t calculate a final total count for each unique word: the input stream could continue indefinitely. Instead, our topology will maintain a monotonically increasing counter for each unique word and log the new value each time it sees that word.
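As a rough illustration of what a per-word, monotonically increasing counter means, here is a plain-Python sketch with no Storm or Pyleus involved. The `WordCounter` class is a hypothetical name, not part of Pyleus; it keeps a running count per word and logs the new value every time the word is seen.

```python
import logging
from collections import Counter

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("word_count")


class WordCounter:
    """Hypothetical stand-in for the counting logic; not the Pyleus API."""

    def __init__(self):
        # Counts only ever go up: an unbounded stream has no "final" total.
        self.counts = Counter()

    def process(self, word):
        self.counts[word] += 1
        log.info("%s: %d", word, self.counts[word])
        return self.counts[word]


if __name__ == "__main__":
    counter = WordCounter()
    for w in ["lorem", "ipsum", "lorem"]:
        counter.process(w)
```

Each call returns the running count, so the logged values for a given word form an increasing sequence (1, 2, 3, ...) rather than a single total.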

So how can you build a word count Storm topology using Pyleus? All you need is a pyleus_topology.yaml and a few Python components.

For a simple demonstration, you need to know about the three core concepts in Storm:

  • A tuple is the unit of data within a Storm topology, flowing into and out of processing components.
  • Spouts are components that feed tuples into a topology. Usually, a spout consumes data from an external source, like Kafka or Kinesis, then emits records as tuples.
  • Bolts subscribe to the output streams of one or more spouts or other bolts, do some processing, then emit tuples of their own.

This topology has three components: a spout that emits a random line of “lorem ipsum” text, a bolt that splits lines into words, and a bolt that counts and logs occurrences of each word.
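To make the data flow concrete, the sketch below mimics the first two components with plain Python generators. The names `line_spout` and `split_words` and the sample text are illustrative assumptions; a real Pyleus spout or bolt is a component run by Storm, not a generator chained inside one process.

```python
import random

# Hypothetical sample text; the real example's spout has its own line source.
LINES = [
    "lorem ipsum dolor sit amet",
    "consectetur adipiscing elit",
    "sed do eiusmod tempor incididunt",
]


def line_spout(n, rng=None):
    """Emit n random lines, each wrapped as a one-field tuple (line,)."""
    if rng is None:
        rng = random.Random()
    for _ in range(n):
        yield (rng.choice(LINES),)


def split_words(tuples):
    """For each incoming (line,) tuple, emit one (word,) tuple per word."""
    for (line,) in tuples:
        for word in line.split():
            yield (word,)


if __name__ == "__main__":
    for tup in split_words(line_spout(3, random.Random(0))):
        print(tup)
```

Wrapping values in tuples mirrors the idea that a tuple, not a bare value, is the unit of data flowing between components.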

Here are the contents of pyleus_topology.yaml:

The spout configuration is self-explanatory, but the bolts must indicate the tuple streams to which they subscribe. The split-words bolt subscribes to line-spout with a shuffle_grouping: tuples emitted from line-spout are distributed evenly and randomly among all instances of split-words, be there one, five, or fifty.

count-words, however, uses a fields_grouping on the ‘word’ field. This forces all tuples emitted from split-words with the same value of ‘word’ to go to the same instance of count-words, which allows the code in word_count.count_words to assume that it will “see” all occurrences of a given word within the same process.
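The difference between the two groupings can be sketched in a few lines of Python. This is a simplification under stated assumptions, not Storm's actual implementation: `shuffle_grouping` here picks a random instance per tuple, while `fields_grouping` hashes the grouping field's value so that equal values always map to the same instance index.

```python
import random
import zlib


def shuffle_grouping(tup, num_instances, rng=random):
    # Any instance may receive the tuple; load spreads out randomly.
    return rng.randrange(num_instances)


def fields_grouping(tup, field_index, num_instances):
    # Stable hash of the field value (crc32 is consistent across runs,
    # unlike Python's salted hash() for strings), reduced modulo the
    # number of instances: equal values always land on the same instance.
    value = str(tup[field_index]).encode("utf-8")
    return zlib.crc32(value) % num_instances


if __name__ == "__main__":
    words = [("lorem",), ("ipsum",), ("lorem",), ("dolor",), ("lorem",)]
    for tup in words:
        print(tup[0], "-> count-words instance", fields_grouping(tup, 0, 5))
```

Every "lorem" tuple is routed to the same instance, which is exactly the property count-words relies on to keep a correct per-word counter in one process.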



word_count/ is left as an exercise for the reader. (Or you could just check out the full example on GitHub.)

Now, running pyleus build in this directory will produce a file word_count.jar which you can submit to a Storm cluster with pyleus submit, or run locally for testing with pyleus local.

The code for a Pyleus topology can be very simple, but one feature in particular we are excited about is built-in virtualenv integration. Simply include a requirements.txt file alongside your pyleus_topology.yaml, and pyleus build will build a virtualenv that your code can use and embed it in the JAR. You can even re-use packaged Pyleus components, and refer to them directly in pyleus_topology.yaml!

Another team at Yelp has already developed a Pyleus spout for consuming data from an internal source and built a Python package for it. Now, others within the company can add one line to their requirements.txt and use the spout in their pyleus_topology.yaml without writing a single line of code.

Pyleus is beta software, but feedback and pull requests are happily accepted.

Get started using Pyleus by installing it with pip install pyleus, then check out the source on GitHub for more examples.

October 9th, 2014

Using MOE, the Metric Optimization Engine, to Optimize an A/B Testing Experiment Framework

A/B Testing Experiment Frameworks and MOE

We recently open sourced MOE, the Metric Optimization Engine, a machine learning tool for solving global, black box optimization problems. An example application for such a system is optimally running online A/B experiments.

A/B testing segments the users that come to a site into buckets, or cohorts, and shows different versions of the site to different cohorts of users. For example, we can show 50% of users one version of a site (version A) and 50% of users another version (version B). After some amount of time we can see which version performs better on various metrics (like Click Through Rate (CTR), conversions, or total revenue) and shift all of our traffic over to the better version. This can be repeated as we converge towards the best version of the site under the chosen metric(s).
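One common way to segment users (an assumption here; the post does not say how Yelp buckets users) is to hash a stable user ID together with the experiment name and map the result onto the cohort allocation, so each user consistently sees the same version. The `assign_cohort` helper below is hypothetical.

```python
import hashlib


def assign_cohort(user_id, experiment, allocation):
    """Map a user to a cohort name given {cohort: traffic fraction}.

    Hashing (experiment, user_id) keeps the assignment stable across visits
    and independent between experiments. Fractions should sum to 1.
    """
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode("utf-8")).hexdigest()
    # Uniform point in [0, 1) derived from the first 64 bits of the hash.
    point = int(digest[:16], 16) / 16 ** 16
    cumulative = 0.0
    for cohort, fraction in allocation.items():
        cumulative += fraction
        if point < cumulative:
            return cohort
    # Guard against floating-point rounding when fractions sum to ~1.
    return list(allocation)[-1]
```

For a 50/50 test you would call `assign_cohort("user-123", "homepage_test", {"A": 0.5, "B": 0.5})`; changing the fractions later shifts traffic without needing to store per-user assignments.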

It can take a long time to attain statistically significant results for experiments: we want to know with high confidence that one version of the site is better than another, and depending on user traffic it can take days, or even weeks, to run a single iteration. These experiments are also expensive; by showing a suboptimal version of your site, you are sacrificing potential revenue. There is also an opportunity cost associated with creating, developing, and running any particular experiment. So the fewer experiment iterations it takes to reach the optimum, the better.
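To see why significance takes so long, consider a standard pooled two-proportion z-test, which is one common choice (the post does not name the test Yelp uses). The sketch computes a two-sided p-value for the CTR difference between two cohorts; the click and view counts in the example are made up.

```python
import math


def two_proportion_p_value(clicks_a, views_a, clicks_b, views_b):
    """Two-sided p-value for H0: CTR_A == CTR_B (pooled z-test)."""
    p_a = clicks_a / views_a
    p_b = clicks_b / views_b
    pooled = (clicks_a + clicks_b) / (views_a + views_b)
    se = math.sqrt(pooled * (1 - pooled) * (1 / views_a + 1 / views_b))
    z = (p_b - p_a) / se
    # 2 * (1 - Phi(|z|)) written with the complementary error function.
    return math.erfc(abs(z) / math.sqrt(2))


if __name__ == "__main__":
    # The same 2.0% -> 2.1% CTR lift, observed at two traffic levels.
    print(two_proportion_p_value(200, 10_000, 210, 10_000))
    print(two_proportion_p_value(20_000, 1_000_000, 21_000, 1_000_000))
```

With 10,000 views per cohort the 5% relative lift is indistinguishable from noise (p is large), while with a million views it is overwhelmingly significant; that gap is the traffic, and therefore the time, each experiment iteration costs.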

Furthermore, many A/B tests are merely parameter selection problems (an A/A’ test, where a feature stays the same, and only the underlying parameters change). Given a feature, we want to find the optimal configuration values for its parameters as quickly as possible.

MOE was designed to optimally solve problems like this, where we want to find an optimal set of parameters (inputs) of a function (a metric, like CTR) and sampling this function is time-consuming or expensive (like running an A/B test experiment on live traffic). By leveraging MOE we can build a general experiments platform that automatically promotes good cohorts, pushes traffic away from underperforming cohorts, and replaces them with new, winning configurations.


Figure 1: MOE can be used to optimally assign traffic allocations in an A/B testing framework and to suggest new parameters to sample given the historical values already sampled.

Using Multi-Armed Bandits for Optimal Traffic Allocation

Instead of naively allocating traffic in some uniform way (like a 50/50 split), we can use information about how an individual cohort is performing to dynamically and optimally allocate the best percentage of traffic to it. This is fundamentally a tradeoff between exploration and exploitation. Exploration is equivalent to gaining more knowledge about the system by continuing to allocate traffic to a wide variety of points. Exploitation, on the other hand, is using the knowledge we have already gained to get the most expected return out of the system as we currently understand it. Others have shown that using Multi-Armed Bandits can achieve better results, faster, in online A/B tests.

MOE implements many different bandit policies and allows the user to select the policy that best fits their desired trade-off between the exploration and exploitation of the data. By using a Multi-Armed Bandit approach we can limit the amount of suboptimal traffic allocation in an A/B test, leaving as little gain on the table as possible.

Using Bayesian Global Optimization to Suggest Parameters

As the Multi-Armed Bandit system starts lowering the traffic allocation for a certain cohort, we have two choices. We can turn off that cohort, and allow the remaining cohorts to battle it out until we have one clear winner (within whatever confidence we desire). Alternatively, we can have MOE suggest a new cohort using Bayesian Global Optimization (with a new set of underlying parameters) for the system to try. This allows the system to play an internal game of king-of-the-hill, where better and better parameters are sampled as the objective continues to rise higher and higher.

MOE uses the historical data gained so far to decide which points to sample next. It suggests new parameters to try that have the highest Expected Improvement under a Gaussian Process model. These are the parameters that are expected to beat the current best set of parameters by the most, using whatever metric we provide as the objective function. This method converges much faster to global optima than other heuristic methods like grid search, random search, or basic hill climbing.
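For reference, here is the textbook closed-form Expected Improvement for a maximization problem, given the Gaussian Process posterior mean `mu` and standard deviation `sigma` at a candidate point and the best value observed so far. This is a plain-Python sketch of the formula, not MOE's implementation, which also deals with details such as observation noise and choosing several points at once.

```python
import math


def normal_pdf(z):
    return math.exp(-0.5 * z * z) / math.sqrt(2 * math.pi)


def normal_cdf(z):
    return 0.5 * (1 + math.erf(z / math.sqrt(2)))


def expected_improvement(mu, sigma, best):
    """E[max(f(x) - best, 0)] when f(x) ~ Normal(mu, sigma**2)."""
    if sigma <= 0:
        # No uncertainty: the improvement is known exactly.
        return max(mu - best, 0.0)
    z = (mu - best) / sigma
    return (mu - best) * normal_cdf(z) + sigma * normal_pdf(z)


if __name__ == "__main__":
    # A confident small gain versus an uncertain point with mean equal to best.
    print(expected_improvement(mu=0.01, sigma=0.001, best=0.0))
    print(expected_improvement(mu=0.0, sigma=0.05, best=0.0))
```

The first term rewards points the model already believes are good (exploitation) and the second rewards uncertain points (exploration), so in the example the high-variance point scores higher even though its mean is no better than the current best.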

By constantly spinning down underperforming cohorts in our experiment and replacing them with optimal new cohorts we allow our experiment framework to continually search the underlying parameter space for the best possible parameters.

Putting it all Together

Let’s assume that we want to optimize some underlying parameter that affects our ad Click Through Rate (CTR) in some way. This parameter could represent anything from a threshold in our system to a hyperparameter or a feature in our ad targeting system. Let’s say we are currently running with this parameter set to 0.2 in production; this is our status quo.

We would like to run an experiment where we sample other values of the parameter in an attempt to maximize CTR. Every CTR measurement requires a time-consuming and expensive experiment, so we will use MOE to help us find the optimal value in as few samples as possible. We simulate the observed CTR of the system for 1 million users by sampling from a Bernoulli distribution with the given true CTR.
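The simulation step can be sketched by treating each simulated user as an independent Bernoulli trial that clicks with probability equal to the true CTR. The function name and the use of Python's `random` module are assumptions for illustration; for 1 million users, a vectorized binomial sampler (for example NumPy's) would be much faster than this loop.

```python
import random


def simulate_ctr(true_ctr, num_users, rng=None):
    """Observed CTR when each of num_users clicks with probability true_ctr."""
    if rng is None:
        rng = random.Random()
    # One Bernoulli(true_ctr) draw per user; count the clicks.
    clicks = sum(1 for _ in range(num_users) if rng.random() < true_ctr)
    return clicks / num_users
```

Calling `simulate_ctr(true_ctr, 1_000_000)` for each cohort's parameter value yields a noisy observed CTR, which is all the optimizer ever sees of the hidden true function.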

In a real-world system we do not know exactly how the underlying parameter affects CTR, which is why we need to sample various values with real traffic to find the optimum. For this example, we define the exact function of CTR with respect to the underlying parameter for simulation, keeping it hidden from MOE. Note that the function need not be convex or even continuous.


Figure 2: The graph of the true CTR vs the underlying parameter. There is a local CTR maximum at 0.143 and a global CTR maximum at 0.714.

We begin the experiment by asking MOE for 2 new parameter values to sample, in addition to our current status quo value of 0.2. We will set our objective function for MOE to optimize to be the relative gain over the status quo CTR. We note that this gives our status quo parameter a value of 0, and any parameter that has a higher CTR than the status quo will have a positive value, while any parameter that has a lower CTR will have a negative value.
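One natural reading of "relative gain over the status quo CTR" is (CTR - CTR_status_quo) / CTR_status_quo. That formula is an assumption, since the post gives none, but it has exactly the properties described: zero for the status quo, positive for better cohorts, negative for worse ones. The CTR values in the example are hypothetical.

```python
def relative_gain(ctr, status_quo_ctr):
    """Relative CTR gain over the status quo; 0.0 means "same as status quo"."""
    if status_quo_ctr <= 0:
        raise ValueError("status quo CTR must be positive")
    return (ctr - status_quo_ctr) / status_quo_ctr


if __name__ == "__main__":
    status_quo = 0.020  # hypothetical observed CTR for the parameter value 0.2
    for name, ctr in [("status quo", 0.020), ("cohort 1", 0.023), ("cohort 2", 0.018)]:
        print(f"{name}: {relative_gain(ctr, status_quo):+.1%}")
```

Normalizing by the status quo makes the objective unitless, so the same optimization setup works whether the baseline CTR is 0.5% or 5%.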

Our initial representation of the underlying system, and the Gaussian Process, looks like Figure 3.


Figure 3: The blue (mean) and green (variance) plot is the initial Gaussian Process representation of the space that MOE optimizes. The dashed gray line is the true, unknown objective function, the relative CTR gain vs status quo.

MOE suggests two points as the initial parameters to test. These will be the corresponding parameters for cohort 1 and cohort 2 respectively. Initially we will conservatively allocate new cohorts to have 5% of traffic each, using the epsilon greedy bandit policy with epsilon set to 0.15. The remaining 90% of traffic will go towards the best parameter value observed so far.
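The 5%/90% split follows from epsilon-greedy arithmetic with three arms (the status quo plus two new cohorts): the 15% exploration share is divided evenly, 5% per arm, and the best arm also receives the remaining 85%, for 90% in total. A minimal sketch of that allocation rule follows; it is a plain-Python illustration rather than MOE's bandit API, and the arm names and objective values are hypothetical.

```python
def epsilon_greedy_allocation(objective_by_arm, epsilon):
    """Traffic fractions under an epsilon-greedy bandit policy.

    The exploration share epsilon is split evenly across all arms, and the
    arm with the best observed objective also gets the exploitation share
    1 - epsilon. Ties go to the first best arm encountered.
    """
    arms = list(objective_by_arm)
    explore = epsilon / len(arms)
    best = max(arms, key=lambda arm: objective_by_arm[arm])
    allocation = {arm: explore for arm in arms}
    allocation[best] += 1 - epsilon
    return allocation


if __name__ == "__main__":
    observed = {"status_quo": 0.0, "cohort_1": -0.01, "cohort_2": -0.02}
    print(epsilon_greedy_allocation(observed, epsilon=0.15))
```

A smaller epsilon exploits more aggressively; a larger one keeps more traffic flowing to new cohorts so their CTR estimates improve faster.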

Once we have the simulated data we can determine which cohorts have performed well and which ones should be turned off. We turn off any cohort that has a CTR more than two standard deviations worse than the current best observed CTR. We then query MOE for new, optimal parameters to sample given how all historical parameters have performed. This is repeated multiple times as poorly performing parameter values are culled and new parameter values are suggested by MOE. See Figure 4 for the evolution of the underlying Gaussian Process and watch MOE converge to the globally optimal parameter value.
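A sketch of the culling rule, under one possible reading of "two standard deviations": each cohort's standard deviation is taken to be the binomial standard error of its observed CTR, sqrt(p(1 - p) / n), and a cohort is turned off when its CTR sits more than two of those standard errors below the best observed CTR. The choice of statistic and the `cohorts_to_turn_off` name are assumptions; the post does not spell them out.

```python
import math


def cohorts_to_turn_off(stats, num_std=2.0):
    """Return cohorts whose CTR is more than num_std standard errors below the best.

    stats maps cohort name -> (clicks, views). The standard error is the
    binomial one for each cohort's own CTR estimate (an assumption).
    """
    ctrs = {name: clicks / views for name, (clicks, views) in stats.items()}
    best_ctr = max(ctrs.values())
    culled = []
    for name, (clicks, views) in stats.items():
        p = ctrs[name]
        std_err = math.sqrt(p * (1 - p) / views)
        if best_ctr - p > num_std * std_err:
            culled.append(name)
    return culled
```

With made-up counts such as `{"status_quo": (2000, 100_000), "cohort_1": (1000, 100_000), "cohort_2": (1990, 100_000)}`, only `cohort_1` is culled: `cohort_2` trails the best CTR by less than two standard errors, so it stays in the experiment.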


Figure 4: The new points sampled each day (red x) and previous points (blue x) influence the Gaussian Process representation of the relative CTR gain that MOE uses to optimize the underlying parameter. We can see the Gaussian Process getting more accurate with respect to the true function (dashed gray) as more points are sampled.

By iterating to the globally optimal parameter value in as few samples as possible, we can increase the system CTR far beyond the initial status quo value, or even the local maximum. In this example, by the second set of sampled parameters MOE is beating both the status quo CTR and the CTR of the best local parameter value, and it reaches the globally optimal parameter value within the third set of sampled points.


Figure 5: The simulated CTR for each round of sampled parameter values. Initially the system CTR is equal to the status quo CTR. MOE quickly finds better and better values of the underlying parameter, converging to the global optimum in 3 short sets of samples.

Using MOE in your experiment framework

October 6th, 2014

October Events at Yelp

October will be a busy month for all of us, with a lot of great conferences and events happening back-to-back. We start the month off with an SF Python meetup in collaboration with PyLadies. Zach Musgrave will be presenting on performance profiling in production. Later this month, Scott Clark will be presenting on MOE for SF Machine Learning here at Yelp HQ too! If you’re not familiar with MOE, it’s our black box optimization engine to help you with real-world metric optimization.

We’ll also be at Grace Hopper on October 8-10, StrataConf on October 15-17, and HTML5DevConf on October 20-24. Michael Stoppelman will be presenting at StrataConf on Yelp’s Data Pipeline and Ken Struys will be presenting at HTML5DevConf on Scaling Selenium Testing at Yelp, so make sure to catch both of those!

Events happening at Yelp HQ

  • Wednesday, October 8, 2014 – 6:15PM – Python in Handling E-Crime and Massive Web Requests in Production (SF Python)
  • Thursday, October 9, 2014 – 6:00PM – Android Tour plus Hands-On Session (Learning Android Development)
  • Wednesday, October 15, 2014 – 6:00PM – Introducing the Metric Optimization Engine (MOE) (SF Machine Learning)
  • Thursday, October 16, 2014 – 6:30PM – Designing for Longevity (Designers + Geeks)
  • Wednesday, October 22, 2014 – 6:45PM – How Product Managers Make Your Business Scale (Products That Count)
  • Thursday, October 30, 2014 – 6:00PM – Large-scale Linear Classification: Status and Challenges by Prof. C.J. Lin (SF Machine Learning)

Conferences Yelp is attending

October 1st, 2014

Yelp At Grace Hopper

Yelp is happy to announce that we’re headed to the Grace Hopper Celebration of Women in Computing conference in October!

Admiral Grace Hopper was a pioneer in computer science. She was a driving force behind the development of English-based programming languages and an inspiring example of how intelligence and perseverance can overcome technical and cultural adversity.

The Grace Hopper conference, presented by the Anita Borg Institute and ACM, is the largest gathering of women in computing in the world. Its mission to “connect, inspire, and guide women in computing and organizations that view technology innovation as a strategic imperative” is a perfect tribute to the incredible Grace Hopper.

Yelp’s delegation this year is composed of engineers from a diverse selection of teams, from mobile to frontend to backend to infrastructure. We’re looking forward to meeting tech women from all over the world to swap stories, experience, and expertise.

We’re also excited about this year’s talks. Our ‘must see’ lists include:

If you’d like to learn more about life as a Yelp Engineer, have great ideas about how to make our app better, want to know what’s up with all those dog photos, or have the urge to shoot the technological breeze, come find us – we’d love to chat! We’ll be wandering around the conference and you can also come find us at booth 532 in the Career Fair.

Speaking of the Career Fair, we will be accepting resumes there as well as online. If you’re attending Grace Hopper, here’s a Super Special Grace Hopper link. The page might say New Grad, but we’re happy to take all applications through this special link.

Stay awesome, keep coding, and see you in Phoenix!


September 29th, 2014

Intern Project: Real-Time Log Tailing with Franz, our Kafka Wrapper

At Yelp, we often need to analyze data in real time. Right now most of our data is aggregated and logged using a system called Scribe. This means that real-time processes currently depend on tailing Scribe, reading messages as they are added to the end of the logs.

Simply tailing the logs means that any time a tailer is not running, due to maintenance or an unexpected error, it misses all the information logged during that time. We want a more flexible system that allows processes to pick up where they left off, so our analyses account for all of the data available.

In comes Kafka, a different kind of logging system. One of the big differences from Scribe is that Kafka provides you with the ability to start tailing from any point in the log, allowing past messages to be reprocessed if necessary. We’re already starting to use Kafka for some applications at Yelp, and all the data being written to Scribe is also available from Kafka.

One thing has been stopping us from switching to Kafka for real-time processing: the lack of a simple, efficient way to tail Kafka. What we do have is an internal service with an HTTP API for interacting with our Kafka clusters, named Franz. As an HTTP service, though, it can only respond to requests, not initiate communication. That means a client tailing Kafka with this API must poll continuously to see if new messages have been written. In addition, Franz’s current API requires clients to keep track of low-level, Kafka-specific details about their position in the logs, further inhibiting adoption.

This summer, I worked on adding a high-level WebSocket endpoint to Franz, designed to improve the Kafka tailing experience. WebSocket is a relatively new internet protocol that lets clients and servers establish long-lived, two-way connections with each other. With such an endpoint, a client could simply open one WebSocket connection with Franz to start tailing Kafka. As messages became available, Franz could then use the connection to forward them, without any further action from the client. This endpoint also manages the position of each client, so that reconnecting clients automatically pick up where they left off.
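The position-management idea can be sketched as a small in-memory store: the endpoint remembers, per client and per partition, the next offset to read, and a reconnecting client resumes from there. Everything here (the `PositionStore` class, keying positions by a client ID, keeping them in a dict, the `read_batch` helper over a list standing in for a Kafka partition) is a hypothetical illustration in Python; the real endpoint is Java with Atmosphere, and the post does not say how positions are stored.

```python
from collections import defaultdict


class PositionStore:
    """Hypothetical per-client offset tracking; not Franz's implementation."""

    def __init__(self):
        # client_id -> {(topic, partition_id): next offset to read}
        self._positions = defaultdict(dict)

    def next_offset(self, client_id, topic, partition_id, default=0):
        return self._positions[client_id].get((topic, partition_id), default)

    def commit(self, client_id, topic, partition_id, last_offset):
        # Store the offset *after* the last message delivered to the client.
        self._positions[client_id][(topic, partition_id)] = last_offset + 1


def read_batch(log, start, batch_size):
    """Return up to batch_size (offset, message) pairs from an in-memory log."""
    end = min(start + batch_size, len(log))
    return [(offset, log[offset]) for offset in range(start, end)]
```

Because the server, not the client, owns the position, a client that disconnects and reconnects simply asks for more messages and gets the next unread batch, with no Kafka-specific bookkeeping on its side.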

Because this is the first time we’re using WebSocket at Yelp, I had a lot of freedom in the implementation. The existing parts of Franz were implemented in Java with Dropwizard, but Dropwizard is only designed for regular HTTP endpoints. Ultimately, I decided to use Atmosphere, a Java framework that supports WebSocket, and added an Atmosphere servlet to the Dropwizard environment.

Using the endpoint is fairly straightforward: you can use a Python client like ws4py, tornado.websocket, or even a Chrome extension to establish a connection to the WebSocket endpoint. Once you’re connected, you send a topic message:

    {
        "topic": "service_log",
        "batch_size": 20
    }

where batch_size specifies the maximum number of Kafka messages you want in each message from Franz. Franz will then start streaming messages back to you, at most batch_size at a time. A response from Franz is pretty simple: it’s just an object containing an array of messages, each with some metadata:

    {
        "messages": [
            { "topic": "<topic_name>", "partition_id": <partition_id>, "offset": <offset>, "message": <json_dict>}
        ]
    }
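On the client side, both message shapes are easy to produce and consume with Python's standard `json` module. The helper names below are hypothetical, and the sketch deliberately leaves out the WebSocket connection itself (which a library such as ws4py or tornado.websocket would handle) so it can run on its own against a made-up sample payload.

```python
import json


def make_topic_message(topic, batch_size):
    """Serialize the subscription message sent right after connecting."""
    return json.dumps({"topic": topic, "batch_size": batch_size})


def parse_batch(raw):
    """Yield (topic, partition_id, offset, message) from one Franz response."""
    payload = json.loads(raw)
    for item in payload["messages"]:
        yield item["topic"], item["partition_id"], item["offset"], item["message"]


if __name__ == "__main__":
    print(make_topic_message("service_log", 20))
    # Made-up sample response, shaped like the one described above.
    sample = json.dumps({"messages": [
        {"topic": "service_log", "partition_id": 0, "offset": 41, "message": {"status": 200}},
    ]})
    for record in parse_batch(sample):
        print(record)
```

In a real client, `parse_batch` would be called from the WebSocket library's on-message callback for each frame Franz pushes.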

We’re currently wrapping up the project, and many people and teams at Yelp are excited to use it. The first user will likely be the real-time ad metrics display. Since most Yelp applications are written in Python, there is also a Python client in progress to make it easier for services to use the new interface. I’m looking forward to my project being used across the organization!