This post is part of a series covering Yelp's real-time streaming data infrastructure. Our series explores in depth how we stream MySQL and Cassandra data in real time, how we automatically track & migrate schemas, how we process and transform streams, and finally how we connect all of this into data stores like Redshift, Salesforce, and Elasticsearch.

When you have a system that streams billions of messages a day, in real time, from MySQL into Kafka, how do you effectively manage all of the schemas stored across all the databases? When you are building to connect hundreds of services, you will encounter thousands of different schemas, and manually managing them is unsustainable. An automated solution became necessary to handle schema changes from upstream data sources to all downstream consumers. Confluent’s Schema Registry and Kafka Connect are excellent choices to begin with, except that they did not exist when we started building the Yelp Data Pipeline. This is how the Schematizer was born.

Schematizer… who?

One of the key design decisions of Yelp’s Data Pipeline is to schematize all data. In other words, the pipeline forces all the data flowing through it to conform to predefined schemas instead of existing in free form. Standardizing the data format is crucial to the Yelp Data Pipeline because we want data consumers to know what type of data to expect, and to be shielded from immediate impact when upstream data producers decide to change the data they publish. Having a uniform schema representation also gives the Data Pipeline an easy way to integrate and support various systems that use different data formats.

The Schematizer is a schema store service that tracks and manages all the schemas used in the Data Pipeline and provides features like automatic documentation support. We use Apache Avro to represent our schemas. Avro has several very attractive features we need in the Data Pipeline, particularly schema evolution, which is one of the key ingredients that make decoupling data producing and consuming possible. Each message that flows through the Data Pipeline is serialized with an Avro schema. To reduce the message size, rather than embedding the entire schema in the message, the message only contains a schema ID. Data consumers can then retrieve the schema from the Schematizer and deserialize the messages at runtime. The Schematizer is the single source of truth for all the predefined schemas.
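A minimal sketch of this idea is shown below, using the fastavro library. The envelope format (a four-byte schema ID prefix) and the get_schema_by_id callback standing in for a Schematizer lookup are illustrative assumptions, not the actual Data Pipeline wire format or client.

import io
import struct

from fastavro import parse_schema, schemaless_reader, schemaless_writer

SCHEMA_CACHE = {}  # schema_id -> parsed Avro schema fetched from the Schematizer

def serialize(schema_id, avro_schema, record):
    """Prefix the Avro-encoded payload with a schema ID instead of the full schema."""
    buf = io.BytesIO()
    buf.write(struct.pack(">i", schema_id))  # illustrative 4-byte schema ID header
    schemaless_writer(buf, parse_schema(avro_schema), record)
    return buf.getvalue()

def deserialize(message, get_schema_by_id):
    """Look up the writer schema by ID (e.g. from the Schematizer), then decode."""
    buf = io.BytesIO(message)
    schema_id = struct.unpack(">i", buf.read(4))[0]
    if schema_id not in SCHEMA_CACHE:
        SCHEMA_CACHE[schema_id] = parse_schema(get_schema_by_id(schema_id))
    return schemaless_reader(buf, SCHEMA_CACHE[schema_id])

Illustrative schema-ID-based serialization and deserialization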

Look at all these schemas. We can organize them in various ways.

The Schematizer associates and organizes schemas in two ways: one from the data producer’s perspective and the other from the data consumer’s perspective.

The first method groups the schemas based on the data origin. Each group is defined by a namespace and a source. The data producers must specify a namespace and a source when they register the schemas with the Schematizer. For instance, a service that wishes to publish its database data into the Data Pipeline can choose its service name as the namespace, and each table as a source.

Group schemas based on namespace and source

The second method is based on the data destination. Each data destination, such as a Redshift cluster or a MySQL database, is a data target, and can have one or more data origins. Each data origin is a namespace and source pair as defined in the first method, and therefore groups one or more schemas.

Group schemas based on data origins of single data target

These two different approaches allow us to search and query related schemas based on different needs. For instance, an application may want to know all the topics to which it is publishing, while another service may want to know the sources of all the data in its Redshift cluster.

Let’s register schemas, shall we?

The Data Pipeline requires all the data to be schematized and serialized with predefined Avro schemas. Therefore, when a data producer would like to publish data into the Data Pipeline, the first thing the producer does is register the schema with the Schematizer. The most common way to do this is to register an Avro schema directly.
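For illustration, here is what a direct registration call might look like. The endpoint path, payload fields, host, and port are assumptions for this sketch; the Schematizer's own API is the real interface.

import json

import requests

SCHEMATIZER_URL = "http://schematizer.local:8888"  # hypothetical host and port

avro_schema = {
    "type": "record",
    "name": "business",
    "namespace": "yelp_main",
    "fields": [
        {"name": "id", "type": "int", "doc": "ID of the business."},
        {"name": "name", "type": "string", "doc": "Name of the business."},
    ],
}

# Register the schema under a namespace/source pair; the Schematizer responds
# with the schema ID and the Kafka topic it assigned.
response = requests.post(
    SCHEMATIZER_URL + "/v1/schemas/avro",  # illustrative endpoint path
    json={
        "namespace": "yelp_main",
        "source": "business",
        "schema": json.dumps(avro_schema),
        "source_owner_email": "biz-team@yelp.com",
    },
)
registered = response.json()
print(registered["schema_id"], registered["topic"])

Hypothetical schema registration call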

For the data producers that do not have or cannot create Avro schemas, schema converters can be easily added into the Schematizer to convert a non-Avro schema to an Avro schema. A good example is the MySQLStreamer, a service that pumps data from MySQL databases into the Data Pipeline, which only knows about MySQL table schemas. The Schematizer can take a MySQL table schema definition and create the corresponding Avro schema. The data producer must also re-register the new schema whenever there is a schema change.
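As a rough illustration of the conversion, the Avro record below is the kind of schema a converter could derive from a small MySQL table. The exact type mapping and field documentation are assumptions, not the Schematizer's actual output.

mysql_table = """
CREATE TABLE business (
    id   INT         NOT NULL PRIMARY KEY,
    name VARCHAR(64) NULL
);
"""

# One plausible Avro equivalent: NOT NULL columns map to plain types, nullable
# columns to a union with "null" and a null default.
converted_avro_schema = {
    "type": "record",
    "name": "business",
    "namespace": "yelp_main",
    "fields": [
        {"name": "id", "type": "int", "doc": "ID of the business."},
        {"name": "name", "type": ["null", "string"], "default": None,
         "doc": "Name of the business."},
    ],
}

Illustrative MySQL-to-Avro conversion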

OMG, the upstream schema has changed! Will my service break?

A common pain point that every data pipeline must address is how to deal with upstream schema changes. Oftentimes such an event requires a lot of communication and coordination between upstream data producers and downstream data consumers. Yelp is not immune to this problem. We have batch jobs and systems that ingest data coming from other batch jobs and systems. Every upstream data change has become a painful event because it may crash the downstream batch jobs or systems, or necessitate a backfill. The entire process is pretty labor-intensive.

We tackle this problem with schema compatibility. During schema registration, the Schematizer determines which topic should be assigned to the new schema based on schema compatibility. Only compatible schemas can share the same topic. When an incompatible schema is registered, the Schematizer creates a new topic in the same namespace and source for the new schema. How does the Schematizer determine compatibility? Avro resolution rules. The resolution rules ensure that, within the same topic, messages serialized with newer versions of the schemas can always be deserialized with older versions of the schemas, and vice versa.

Different topics are assigned to incompatible schemas

Right now, Yelp’s main database data flows through the Data Pipeline via the MySQLStreamer. Let’s say that at some point we decide to add a column with a default value to the Business table. The MySQLStreamer will re-register the updated Business table schema with the Schematizer. Since such a schema change is a compatible change based on the resolution rules, the Schematizer will create a new Avro schema and assign the latest existing topic of the same namespace and source to it. Later, if someone decides to change one column type of the Business table from int to varchar, this will cause an incompatible schema change, and the Schematizer will create a new topic for the updated Business table schema.
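The two changes in this example would look roughly like the following; the is_open field and its values are made up for illustration.

# Version 1 of the Business schema (abbreviated).
business_v1 = {
    "type": "record", "name": "business",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
    ],
}

# Compatible change: a new column with a default. Old readers simply ignore the
# extra field and new readers fill it in from the default, so the schema stays
# in the same topic.
business_v2 = {
    "type": "record", "name": "business",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "is_open", "type": "int", "default": 1},  # hypothetical column
    ],
}

# Incompatible change: int -> varchar (string). Avro resolution cannot promote
# int to string, so the Schematizer assigns a new topic for this schema.
business_v3 = {
    "type": "record", "name": "business",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "is_open", "type": "string", "default": "1"},
    ],
}

A compatible change (added field with a default) versus an incompatible change (type change)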

The guarantee of schema compatibility within each topic ensures that when an upstream schema changes, downstream data consumers can continue to consume data from the same topic using their current schema, without worrying that the change will break their systems. They also have the flexibility to transition to the newer topics on their own timeline. This gives the pipeline more automation and requires less human intervention during schema change events.

Besides the resolution rules, we also built custom rules into the Schematizer to support some Data Pipeline features. The primary key fields of a schema are used for log compaction in the Data Pipeline. Because the key for log compaction must remain the same within a single topic, any change to the primary key fields is considered an incompatible change, and the Schematizer will create a new topic for the new schema. Likewise, when a non-PII (Personally Identifiable Information) schema starts to include a PII field, that change qualifies as incompatible as well. PII data and non-PII data are stored in separate topics, which simplifies the security implementation for the PII data and prevents downstream consumers from accidentally accessing data they do not have permission to access.

Logic flow to decide whether a new topic is required
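A heavily simplified sketch of this decision is shown below. The helper functions, the field annotations (pkey, pii), and the stand-in resolvability check are assumptions for illustration; the Schematizer applies the full Avro resolution rules.

def primary_keys(schema):
    # Fields annotated as primary keys (the annotation name is illustrative).
    return [f["name"] for f in schema["fields"] if f.get("pkey")]

def contains_pii(schema):
    # PII marking via a field-level attribute (the attribute name is illustrative).
    return any(f.get("pii", False) for f in schema["fields"])

def is_resolvable(new_schema, old_schema):
    # Stand-in for full Avro resolution: accept only added fields that carry
    # defaults and reject any type change on an existing field.
    old_types = {f["name"]: f["type"] for f in old_schema["fields"]}
    for field in new_schema["fields"]:
        if field["name"] in old_types:
            if field["type"] != old_types[field["name"]]:
                return False
        elif "default" not in field:
            return False
    return True

def requires_new_topic(new_schema, latest_schema):
    """Simplified version of the decision depicted above."""
    if latest_schema is None:
        return True  # first schema registered for this namespace/source
    if primary_keys(new_schema) != primary_keys(latest_schema):
        return True  # the log compaction key must stay stable within a topic
    if contains_pii(new_schema) != contains_pii(latest_schema):
        return True  # PII and non-PII data are kept in separate topics
    return not is_resolvable(new_schema, latest_schema)

Simplified sketch of the new-topic decision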

One thing worth noting is that the schema registration operation is idempotent. If identical schema registration calls are made multiple times, only the first one creates a new schema; subsequent calls simply return the already registered schema. This also provides a simpler way for an application or a service to initially set up its Avro schemas. Most applications and services have Avro schemas defined in files or in code, but they do not hardcode schema IDs, since schema IDs are assigned by the Schematizer. Instead of first querying the Schematizer for the schema information and registering it only if it doesn’t exist yet, the application or service can use the schema registration endpoint to achieve both operations in a single call.

Streamline all the way.

To fully streamline the pipeline for schema change events, the Schematizer can also generate a schema migration plan for downstream systems, based on the existing schema and the new schema. Currently the Schematizer is only able to create schema migration plans for Redshift tables. A downstream system that loads data into a Redshift cluster from the Data Pipeline can simply query and apply the schema migration plan when a schema change event occurs, automatically picking up the new table schema without any manual process. This feature is designed to be easily extensible, and each schema migration plan generator is an exchangeable component, so later we can add more generators to support different schema types, or switch to ones that use more sophisticated algorithms to generate the migration plans.
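To make the idea concrete, here is a toy generator that only handles newly added columns. The real planner covers far more cases (type changes, dropped columns, defaults, sort and distribution keys, and so on), and the type mapping below is an assumption.

AVRO_TO_REDSHIFT = {
    "int": "INTEGER",
    "long": "BIGINT",
    "float": "REAL",
    "double": "DOUBLE PRECISION",
    "boolean": "BOOLEAN",
    "string": "VARCHAR(65535)",
}

def redshift_migration_plan(table, old_schema, new_schema):
    """Emit ALTER TABLE statements for columns that exist only in the new schema."""
    old_names = {f["name"] for f in old_schema["fields"]}
    plan = []
    for field in new_schema["fields"]:
        if field["name"] in old_names:
            continue
        avro_type = field["type"]
        if isinstance(avro_type, list):  # e.g. ["null", "string"] for nullable columns
            avro_type = next(t for t in avro_type if t != "null")
        plan.append("ALTER TABLE {} ADD COLUMN {} {};".format(
            table, field["name"], AVRO_TO_REDSHIFT[avro_type]))
    return plan

# Using the Business schemas from the earlier example:
# redshift_migration_plan("business", business_v1, business_v2)
# -> ['ALTER TABLE business ADD COLUMN is_open INTEGER;']

Toy Redshift migration plan generator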

Who’s the data producer? Who consumes these data? The Schematizer knows it all.

In addition to registered schemas, the Schematizer also tracks the data producers and data consumers in the Data Pipeline, including which team and which service is responsible for producing or consuming the data, and how often they expect to publish the data. We use this data to effectively contact and communicate with the corresponding teams when events that require human intervention occur. This information also allows us to monitor and decide which schemas and topics may be out of service and can be deprecated or removed. As a result, it simplifies the compatibility validation during schema registration. The Schematizer can now skip deprecated schemas and check the schema compatibility only against active ones instead of every single schema for the same topic.

The data producers and consumers are required to provide this information when they start up. Initially, we planned to store this information only in the Schematizer. Since this data is also very valuable for exploratory analysis and alerting, we instead decided to write it into separate Kafka topics outside the Data Pipeline. The data can then be ingested into Redshift and Splunk, as well as loaded into the Schematizer and displayed through the front-end web interface. We chose the async Kafka producer, a non-blocking Kafka producer developed at Yelp that writes data through Clog, because it does not interfere with the normal producer publishing messages. In addition, it avoids the circular dependency that would arise if the normal producer tried to register itself by using another copy of the same producer, which in turn would also try to register itself.

Wait, which Kafka topic should I use? The Schematizer takes care of it for you.

Unlike regular Kafka producers, the data producers of the Data Pipeline do not need to know in advance which Kafka topics they should publish their data to. Since the Schematizer dictates which topic each registered schema uses, the producers only need to tell the pipeline the schema they use to serialize the messages. The pipeline then asks the Schematizer for the topic information and publishes the messages to the correct topic. Abstracting away the topic awareness makes the interface simpler and easier to use.

The same mechanism exists for data consumers. Although they may specify a particular Kafka topic to consume from, the more common use case is to let the Schematizer provide the correct topics based on the group the data consumer is interested in. We introduced the Schematizer’s grouping mechanisms earlier. The data consumer can specify either a namespace and a source, or a data target, and the Schematizer will figure out the right topics in that group. This is especially useful when the data consumer is interested in a group of topics that may change over time due to incompatible schema changes. It relieves data consumers of the burden of keeping track of every single topic in the group.
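The sketch below shows what resolving topics by group could look like from a consumer's point of view; the endpoint paths and response shapes are assumptions, not the Schematizer's actual API.

import requests

SCHEMATIZER_URL = "http://schematizer.local:8888"  # hypothetical host and port

def topics_for_source(namespace, source):
    # Resolve all topics that belong to one namespace/source pair.
    resp = requests.get("{}/v1/namespaces/{}/sources/{}/topics".format(
        SCHEMATIZER_URL, namespace, source))
    return [topic["name"] for topic in resp.json()]

def topics_for_data_target(data_target_id):
    # Resolve all topics whose data origins feed a single data target.
    resp = requests.get("{}/v1/data_targets/{}/topics".format(
        SCHEMATIZER_URL, data_target_id))
    return [topic["name"] for topic in resp.json()]

# A consumer subscribes to everything in its group and refreshes the list
# periodically so new topics created by incompatible schema changes get picked up.

Hypothetical topic resolution by namespace/source or data target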

Schemas are good. Documentation is even better!

Schemas standardize the data format but may not provide enough information for people who want to understand the meaning of the data. We noticed that the people who use the data are usually not the same people who produce it, and they often don’t know where to find information about the data they are trying to use. Since the Schematizer already knows about all the schemas in the Data Pipeline, it is an excellent candidate to store information about the data.

Meet our knowledge explorer, Watson.

The Schematizer requires schema registrars to include documentation along with their schemas. The documentation is then extracted and stored in the Schematizer. To make the schema information and data documentation in the Schematizer accessible to all the teams at Yelp, we created Watson, a webapp that users across the company can use to explore this data. Watson is a visual frontend for the Schematizer and retrieves its information through a set of RESTful APIs exposed by the Schematizer.

Watson provides valuable information about the state of the Data Pipeline: existing namespaces, sources, and the Avro schema information within those sources. Most importantly, Watson provides an easy interface to view documentation on every source and schema the Schematizer is aware of.

Those docs aren’t going to write themselves.

The majority of the data flowing through the Data Pipeline right now comes from databases. To document the sources and the schemas of this data, we leverage SQLAlchemy models. At Yelp, SQLAlchemy is used to describe all the models in our databases. Besides the docstring, SQLAlchemy also allows users to include additional information for the columns of a model. It is therefore a natural place to document the purpose and usage of both the model and its fields.

A new ownership field is also introduced to the SQLAlchemy model to capture the maintainers and the experts for each model. We think the people who generate the data are in the best position to document it. This approach also encourages us to keep the actual data models and their descriptions in sync at all times.

from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

# Ownership and TEAM_OWNERS are Yelp-internal helpers for attributing models to teams.
Base = declarative_base()

class BizModel(Base):
    __yelp_owner__ = Ownership(
        teams=[TEAM_OWNERS['biz_team']],
        members=[],
        contacts=[]
    )
    __tablename__ = 'my_biz_table'
    __doc__ = 'Business information.'
    id = Column(Integer, primary_key=True, doc=r"""ID of the business.""")
    name = Column(String(64), doc=r"""Name of the business.""")

Simple SQLAlchemy model with documentation and ownership information

Developers may not always remember to include the documentation when they work on the SQLAlchemy models. To prevent that, we set up automated tests to enforce that the model is attributed and documented. Hard checks are also put in place to ensure that we never regress. Whenever a new model is added, the test suite will fail if the model is not properly documented or is missing ownership information. These automated tests and checks have moved us much closer to our goal of 100% documentation coverage.
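A test along these lines might look like the sketch below; the Base import and the exact attributes checked are assumptions based on the model example above.

import pytest
from sqlalchemy import inspect

from myservice.models import Base  # hypothetical: the project's declarative base

def all_models():
    # Direct subclasses of the declarative base; a real suite would walk the full registry.
    return Base.__subclasses__()

@pytest.mark.parametrize("model", all_models())
def test_model_is_documented_and_owned(model):
    assert getattr(model, "__yelp_owner__", None) is not None, (
        "%s is missing ownership information" % model.__name__)
    assert (model.__doc__ or "").strip(), "%s is missing a docstring" % model.__name__
    for column in inspect(model).columns:
        assert (column.doc or "").strip(), (
            "%s.%s is missing column documentation" % (model.__name__, column.name))

Sketch of an automated documentation and ownership check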

Extract delicious documentation to feed Watson.

Once the documentation is available in the data models, we are ready to get it into the Schematizer and eventually present it in Watson. Before diving into the extraction process, we first introduce another component that plays an important role in this process: the Application Specific Transformer, or AST for short. The AST, as its name suggests, is a framework that takes in a stream of messages from one or more Data Pipeline topics, applies transformation logic to the messages’ schema and data payloads, and then outputs the transformed messages to a new set of Data Pipeline topics. Transformation components that provide specific transformation logic are chainable, and therefore multiple components can be combined to perform more sophisticated transformation logic.

We use a set of transformation components in the AST to generate more understandable data using SQLAlchemy model reflection. Since the components are chainable, we simply created a transformation component that extracts the documentation and ownership information from the SQLAlchemy models and added it to the existing transformation chain. The documentation and ownership information of the models is then automatically extracted and loaded into the Schematizer through the existing pipeline. The implementation is surprisingly simple and integrates seamlessly into the entire pipeline, making it a very effective way to produce good quality documentation.

Transformation components in AST
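As a rough sketch of the chaining idea, the component interface and the documentation-extracting component below are illustrative; the AST's real interfaces differ.

class TransformationComponent(object):
    """One link in the chain: transforms a message's schema and payload."""

    def transform(self, message):
        raise NotImplementedError

class DocumentationExtractor(TransformationComponent):
    """Illustrative component that attaches docs pulled from a SQLAlchemy model."""

    def __init__(self, model):
        self.model = model

    def transform(self, message):
        message.setdefault("metadata", {})["doc"] = (self.model.__doc__ or "").strip()
        message["metadata"]["owner"] = getattr(self.model, "__yelp_owner__", None)
        return message

def apply_chain(components, message):
    # Components run in order; each receives the previous component's output.
    for component in components:
        message = component.transform(message)
    return message

Sketch of chainable transformation components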

As mentioned above, some transformation components already exist in the AST to generate more meaningful data for end users. The bit flag field transformation component flattens a single integer flags field into multiple boolean fields, each representing what one bit of the integer value means. Similarly, the enum field transformation component converts a numeric enum value into a readable string representation. A nice bonus is that these transformation components also produce self-explanatory, self-documenting schemas, and consequently better documentation.
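For illustration, flattening a bit flag field could look like the snippet below; the flag names and bit positions are made up.

FLAG_NAMES = ["is_open", "accepts_credit_cards", "has_wifi"]  # bits 0, 1, 2 (made up)

def flatten_flags(record, flags_field="flags"):
    """Replace a packed integer flags field with one boolean field per bit."""
    flags = record.pop(flags_field)
    for bit, name in enumerate(FLAG_NAMES):
        record[name] = bool(flags & (1 << bit))
    return record

print(flatten_flags({"id": 1, "flags": 0b101}))
# {'id': 1, 'is_open': True, 'accepts_credit_cards': False, 'has_wifi': True}

Illustrative bit flag flattening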

Collaborate, contribute, and find.

The story doesn’t end with developer documentation. Watson also provides mechanisms for end users to collaborate and contribute toward making all of Yelp’s data easily understandable.

The first mechanism is tagging. Watson allows users to tag any source with a relevant category. A source may be a MySQL database table or a data model. For example, a Business source can be tagged with the “Business Information” category, and a User source may be tagged with the “User Information” category. End users can tag related sources with the same category and organize them in a way that makes the most sense to them. The effect of tagging is a richer understanding of how our own data sources relate and connect to each other.

Business source tagged with “Business Info”

Adding notes is the second mechanism Watson provides. This enables users, especially non-technical users, to contribute their own documentation to a source or a field. Users such as business analysts often have valuable experience using the data, and notes have been a great way for them to share gotchas, edge cases, and time-sensitive information.

The number one feature end users requested for Watson was a more nuanced search. To support that, we implemented a simple search engine in Watson that allows users to search various aspects of the data, such as schemas, topics, and data model descriptions. We chose the Whoosh Python package to back the search functionality, as opposed to something like Elasticsearch, because it allowed us to get development moving quickly, and it provides decent performance for the volume of search data we have so far. As that volume grows, we will consider switching to a more scalable engine.
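A minimal example of the kind of Whoosh index backing such a search is sketched below; the field names and documents are made up, and the real Watson index covers many more aspects.

import os

from whoosh.fields import ID, TEXT, Schema
from whoosh.index import create_in
from whoosh.qparser import MultifieldParser

# Index source names and their documentation so both are searchable.
index_schema = Schema(source=ID(stored=True), doc=TEXT(stored=True))
if not os.path.exists("watson_index"):
    os.mkdir("watson_index")
ix = create_in("watson_index", index_schema)

writer = ix.writer()
writer.add_document(source="business", doc="Core business information such as name and location.")
writer.add_document(source="user", doc="User account and profile information.")
writer.commit()

with ix.searcher() as searcher:
    query = MultifieldParser(["source", "doc"], ix.schema).parse("business")
    for hit in searcher.search(query):
        print(hit["source"], "-", hit["doc"])

Minimal Whoosh index and search sketch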

Conclusion

The Schematizer is a key component of Yelp’s Data Pipeline. Its schema registration operation enables the major features of the pipeline, including mitigating the impact of upstream schema changes on downstream consumer applications and services. The Schematizer also takes care of topic assignment for data publishing, removing the need for users to determine which topic to use. Finally, it requires and provides documentation for every piece of data flowing through the pipeline to facilitate knowledge sharing across the entire organization. Combined with Watson, it gives all employees at Yelp a powerful tool for accessing up-to-date information.

We have now dug into the Schematizer and its front-end documentation system, Watson. Next, we are going to explore our stream processor: Paastorm. Stay tuned!

Acknowledgements

Many thanks to Josh Szepietowski, Will Cheng, and Bo Wu for bringing Watson to life. Special thanks to Josh, the author of the Application Specific Transformer, for providing invaluable input on the AST and Watson sections of this blog post.

