At Yelp, we often need to analyze data in real time. Right now most of our data is aggregated and logged using a system called Scribe. This means that real-time processes currently depend on tailing Scribe, reading messages as they are added to the end of the logs.

Simply tailing the logs means that any time a tailer is not running, due to maintenance or an unexpected error, it misses all the information logged during that time. We want a more flexible system that allows processes to pick up where they left off, so our analyses account for all of the data available.

In comes Kafka, a different kind of logging system. One of the big differences from Scribe is that Kafka lets you start tailing from any point in the log, so past messages can be reprocessed if necessary. We’re already starting to use Kafka for some applications at Yelp, and all the data being written to Scribe is also available from Kafka.

One thing has been stopping us from switching to Kafka for real-time processing: the lack of a simple, efficient way to tail Kafka. What we do have is Franz, an internal service that exposes an HTTP API for interacting with our Kafka clusters. As an HTTP service, though, it can only respond to requests, not initiate communication. That means a client tailing Kafka with this API must poll continuously to see whether new messages have been written. In addition, Franz’s current API requires clients to keep track of low-level, Kafka-specific details about their position in the logs, further inhibiting adoption.

This summer, I worked on adding a high-level WebSocket endpoint to Franz, designed to improve the Kafka tailing experience. WebSocket is a relatively new internet protocol that lets clients and servers establish long-lived, two-way connections with each other. With such an endpoint, a client can simply open a single WebSocket connection with Franz to start tailing Kafka. As messages become available, Franz uses the connection to forward them, without any further action from the client. The endpoint also manages each client’s position, so that a reconnecting client automatically resumes where it left off.

Because this is the first time we’re using WebSocket at Yelp, I had a lot of freedom in the implementation. The existing parts of Franz were implemented in Java with Dropwizard, but Dropwizard is only designed for regular HTTP endpoints. Ultimately, I decided to use Atmosphere, a Java framework that supports WebSocket, and added an Atmosphere servlet to the Dropwizard environment.

Using the endpoint is fairly straightforward: you can use a Python client such as ws4py or tornado.websocket, or even a Chrome extension, to establish a connection to the WebSocket endpoint. Once you’re connected, you send a topic message:

{
    "topic": "service_log",
    "batch_size": 20
}

where the batch_size specifies the maximum number of Kafka messages you want per message from Franz. Franz will then start streaming messages back to you, at most batch_size at a time.
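
For example, a minimal tailer sketch using ws4py might look something like the following. The URL is a placeholder rather than Franz’s real address, and production code would want error handling and reconnection logic.

import json

from ws4py.client.threadedclient import WebSocketClient


class FranzTailer(WebSocketClient):
    def opened(self):
        # Ask Franz to start streaming the topic, at most 20 Kafka messages per batch.
        self.send(json.dumps({"topic": "service_log", "batch_size": 20}))

    def received_message(self, message):
        # Each WebSocket message from Franz is one JSON batch (see the format below).
        print(str(message))

    def closed(self, code, reason=None):
        print("Connection closed: %s %s" % (code, reason))


if __name__ == "__main__":
    tailer = FranzTailer("ws://franz.example.com/stream")  # placeholder URL
    tailer.connect()
    tailer.run_forever()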

A response from Franz is pretty simple: it’s just an object containing an array of messages with some metadata on them:

{
    "messages": [
        { "topic": "<topic_name>", "partition_id": <partition_id>, "offset": <offset>, "message": <json_dict> },
        ...
    ]
}
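
Consuming a batch is then just a matter of decoding the JSON and walking the messages array. Here is a small sketch of a handler that could be dropped into the received_message method above; it only assumes the response format shown here, and what you do with each payload is up to the application.

import json


def handle_batch(raw_batch):
    """Process one WebSocket message (a batch of Kafka messages) from Franz."""
    batch = json.loads(raw_batch)
    for record in batch["messages"]:
        # Kafka metadata pinpointing where the message came from.
        topic = record["topic"]
        partition_id = record["partition_id"]
        offset = record["offset"]
        # The logged payload itself, already decoded into a dict.
        payload = record["message"]
        print("%s[%s] @ %s: %r" % (topic, partition_id, offset, payload))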

We’re currently wrapping up the project, and many people and teams at Yelp are excited to use it. The first user will likely be the real-time ad metrics display. Since most Yelp applications are written in Python, a Python client is also in progress to make it easy for services to use the new interface. I’m looking forward to seeing my project used across the organization!