Creating a graph of performance data from a Hadoop cluster sounds like a fairly straightforward task, but Pepperdata’s instrumentation on a typical cluster can generate over a million data points per minute – especially if you’re collecting node-level metrics like CPU, memory, disk, and network statistics along with job- and task-level data. A cluster with 500 nodes could easily produce billions of time series data points per day (the “time series” nature of the data poses some other unique issues – more on that in a minute). Even a simple visualization like drawing a single line is a challenge.
When I joined Pepperdata last November, I knew I would work on the data wrangling and visualization side of the software (rather than the instrumentation and control components that interface directly with Hadoop). I immediately (and naively) began thinking about fun ways to visualize and explore massive amounts of time series data (VizTree, Time Series Spirals). It wasn’t until I actually began working on the Pepperdata software that I realized the scale of the data involved, and all the system architecture and basic plumbing required to enable even simple visualization and manipulation of this amount of data.
To increase visibility into a Hadoop cluster, Pepperdata monitors not only the cluster nodes, but the individual tasks that compose a Hadoop job. This produces a lot of data. And displaying this data in a meaningful way, in a reasonable amount of time, is a challenge.
To get a sense of the amount of data, let’s focus on Hadoop task data. Imagine a cluster with 500 nodes, where each node may be running 10 tasks at any moment, and there may be 100 different task attributes or metrics we care about: memory consumed, HDFS reads, CPU time, etc. If we sample each of these metrics every 5 seconds for every task on the cluster, we generate about 6 million data points every minute (that’s billions of data points per day). These aren’t just time-stamped data points; they are time series – each series is a discrete unit, with each data point dependent on (or at least closely related to) its neighbors within the series. Time series require special treatment in data analysis, and the time series we’re working with – with relatively low (5-second) sampling rates and a dubious relationship to underlying harmonic functions – don’t lend themselves to traditional signal processing techniques. We easily generate millions of unique time series per day, which confounds existing time series storage systems.
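If you want to check the arithmetic, here’s the back-of-the-envelope version (using the hypothetical cluster shape described above):

```java
// Back-of-the-envelope data volume for the hypothetical cluster described above.
public class DataVolume {
    public static void main(String[] args) {
        long nodes = 500;
        long tasksPerNode = 10;          // tasks running on a node at any moment
        long metricsPerTask = 100;       // attributes we sample for each task
        long samplesPerMinute = 60 / 5;  // one sample every 5 seconds

        long pointsPerMinute = nodes * tasksPerNode * metricsPerTask * samplesPerMinute;
        long pointsPerDay = pointsPerMinute * 60 * 24;

        System.out.println(pointsPerMinute);  // 6000000    (~6 million per minute)
        System.out.println(pointsPerDay);     // 8640000000 (billions per day)
    }
}
```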
At a high level, the data flow through our Dashboard visualization system is pretty straightforward. The top half of the diagram below illustrates the pipeline we use for ingesting data. Pepperdata components running on each Hadoop node feed data we’ve gathered into OpenTSDB, an open source time series database that stores data in HBase.
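To make that ingest path concrete, here’s roughly what pushing a single sample into a TSD looks like over OpenTSDB’s telnet-style “put” interface. The metric name, tags, and value below are invented for illustration, and our actual collectors batch and buffer far more carefully than this sketch:

```java
import java.io.PrintWriter;
import java.net.Socket;

// Illustrative only: push one data point to a TSD using OpenTSDB's line-based
// "put" command (put <metric> <unix-seconds> <value> <tag=value> ...).
public class TsdbPutSketch {
    public static void main(String[] args) throws Exception {
        try (Socket sock = new Socket("tsd.example.com", 4242);  // 4242 is the default TSD port
             PrintWriter out = new PrintWriter(sock.getOutputStream(), true)) {
            long now = System.currentTimeMillis() / 1000L;
            // Hypothetical metric and tags, not our real schema.
            out.println("put task.mem.rss " + now + " 734003200"
                    + " host=node042 user=alice jobid=job_0042");
        }
    }
}
```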
When querying the time series data, our Dashboard servlet pulls data from OpenTSDB (which reads from HBase), does a bit more processing of the raw data, and provides JSON output to JavaScript code in the user’s browser.
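Stripped of the post-processing, the servlet’s fetch step amounts to something like the sketch below: issue an HTTP query to a TSD and pass the JSON body along. The URL is left as a caller-supplied placeholder rather than a specific OpenTSDB endpoint, and none of this is our production code:

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

// Skeleton of the servlet's fetch step: hit a TSD's HTTP interface and return the
// response body. In reality the servlet also reshapes the raw series before handing
// JSON to the browser.
public class TsdFetch {
    static String fetchJson(String tsdQueryUrl) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(tsdQueryUrl).openConnection();
        conn.setRequestProperty("Accept", "application/json");
        StringBuilder body = new StringBuilder();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            String line;
            while ((line = in.readLine()) != null) {
                body.append(line);
            }
        }
        return body.toString();
    }
}
```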
I imagine a plumbing analogy for system architecture where you have “data pressure” instead of water pressure (the internet is, after all, just a “series of tubes”). When your system is over its maximum planned capacity, you’re likely to experience leaks (or OOM errors, long GC cycles, and other random component failures). When you reinforce one component along your data pipeline, you may inadvertently add more pressure on another component. Each component in the pipeline needs to be stable (and easy to debug!), but the components also need to be tuned as a system – you can’t always increase capacity or change behavior in one component without causing ripples that travel throughout the system. We do a couple of things to manage the “data pressure”:
- All the nodes in a cluster write data to one or more OpenTSDB daemons. If one TSD can’t handle the load, you can add more. But this puts more pressure on HBase, which may result in failure of its RegionServer, or of the HDFS DataNodes that store its data.
- On the query side, we have a JavaScript layer (running in a user’s browser) that talks to a servlet layer, which in turn talks to OpenTSDB. We reduce pressure on OpenTSDB by splitting queries at the servlet layer and parallelizing them across multiple TSDs (see the sketch after this list). But then the servlet must hold the sliced responses while it reassembles them, and may quickly run into memory issues.
- Offloading some of the processing from the servlet to the JavaScript client might help, but that means sending more data over the network, and time-consuming data operations on the client make for slow rendering and an unpleasant user experience.
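Here’s a rough sketch of that split-and-parallelize step using a plain Java thread pool. The pool size is arbitrary and fetchJson is a placeholder for an HTTP fetch like the one sketched earlier; this is illustrative, not our actual servlet code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Fan one dashboard query out to several TSDs in parallel, then reassemble the pieces.
public class ParallelTsdQuery {
    private static final ExecutorService pool = Executors.newFixedThreadPool(8);

    static List<String> queryAll(List<String> tsdQueryUrls) throws Exception {
        List<Future<String>> futures = new ArrayList<>();
        for (String url : tsdQueryUrls) {
            futures.add(pool.submit(() -> fetchJson(url)));
        }
        List<String> pieces = new ArrayList<>();
        for (Future<String> f : futures) {
            // Every slice is held in memory until all of them arrive and get merged --
            // exactly where the servlet can start running out of heap.
            pieces.add(f.get());
        }
        return pieces;
    }

    static String fetchJson(String url) throws Exception {
        return "";  // placeholder for an HTTP GET against one TSD
    }
}
```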
There are plenty of opportunities for clever ways to optimize this system and improve performance in the data visualization dashboard (yet another source of the interesting problems that make my job fun):
- OpenTSDB does a good job generating HBase row keys, but maybe we can customize it for better performance with our data.
- We can pre-compute some data before inserting into OpenTSDB, though this requires careful balancing of the improved query response against the additional resources used at ingestion time.
- We can cache some query data (sketched after this list), although the cache expiration policy gets tricky because it’s hard to know when data from all nodes is complete (in a 500-node cluster, it’s entirely likely that one or more nodes have hiccups).
…and there are a few others we’ve thought of and many others we haven’t yet thought of. (Suggestions are welcome!)
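Coming back to the caching item above: to make the expiration headache concrete, here’s a toy time-bounded cache. The one-minute TTL is an arbitrary placeholder; the real question is how long to wait for straggling nodes before trusting a cached answer:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy TTL cache keyed by query string; illustrative only, not our production code.
class QueryCache {
    private static final long TTL_MS = 60_000;  // arbitrary one-minute expiration

    private static final class Entry {
        final String json;
        final long storedAtMs;
        Entry(String json, long storedAtMs) { this.json = json; this.storedAtMs = storedAtMs; }
    }

    private final ConcurrentMap<String, Entry> cache = new ConcurrentHashMap<>();

    String get(String queryKey) {
        Entry e = cache.get(queryKey);
        if (e == null || System.currentTimeMillis() - e.storedAtMs > TTL_MS) {
            return null;  // missing or expired: recompute from OpenTSDB/HBase
        }
        return e.json;
    }

    void put(String queryKey, String json) {
        cache.put(queryKey, new Entry(json, System.currentTimeMillis()));
    }
}
```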
So far I’ve just talked about the plumbing for handling high data volume. But manipulating time-series data has peculiarities as well. We need to downsample and aggregate our data. For example:
When we want to show a graph to help a cluster admin determine which user hogged CPU time over the past 12 hours, we must aggregate all the individual task time series into series representing the users who started the jobs that spawned the tasks. To fit 12 hours of data into a 400-pixel-wide chart, we also need to downsample the data (12 hours at one sample every 5 seconds is 8,640 samples per series).
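Taking just the downsampling half of that (and ignoring the harder problem of aggregating across task series that start and stop), a naive bucket-average reduction from 8,640 samples to 400 pixels might look like this sketch:

```java
// Naive bucket-average downsampler: squeeze `samples` into `width` buckets by
// averaging whatever falls into each bucket. Illustrative only; it ignores gaps,
// timestamps, and all the cross-series aggregation discussed below.
public class Downsampler {
    static double[] downsample(double[] samples, int width) {
        double[] sums = new double[width];
        int[] counts = new int[width];
        for (int i = 0; i < samples.length; i++) {
            int bucket = (int) ((long) i * width / samples.length);
            sums[bucket] += samples[i];
            counts[bucket]++;
        }
        for (int b = 0; b < width; b++) {
            if (counts[b] > 0) {
                sums[b] /= counts[b];
            }
        }
        return sums;  // e.g. 8,640 raw samples -> 400 pixel values
    }
}
```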
…but this turns out to be non-trivial: our time series data doesn’t play nicely with traditional signal processing tools like Fourier analysis, and aggregation is a lot more than just executing a SQL “group by.” OpenTSDB describes some of the challenges on its site, but our situation, with millions of task series starting and stopping every day, makes the problem even more complex.
Because downsampling aggregated data does not give the same result as aggregating downsampled data, this really deserves (and will soon get!) its own blog post…
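As a tiny preview, here’s a made-up example of why the order matters when samples go missing. Assume downsampling means averaging over a two-sample bucket, aggregation means summing across two task series, and series B has a gap at the second timestamp:

```java
// Made-up numbers showing that downsample-then-aggregate != aggregate-then-downsample
// when a series has a gap. Series A reports at t1 and t2; series B only at t1.
public class OrderMatters {
    public static void main(String[] args) {
        double[] a = {10, 10};  // series A at t1, t2
        double[] b = {20};      // series B at t1 only (missed its t2 sample)

        // Downsample each series first (bucket average), then aggregate (sum across series):
        double downsampleThenAggregate = avg(a) + avg(b);                  // 10 + 20 = 30

        // Aggregate at each timestamp first, then downsample the aggregate series:
        double aggregateThenDownsample = avg(new double[] {10 + 20, 10});  // avg(30, 10) = 20

        System.out.println(downsampleThenAggregate);  // 30.0
        System.out.println(aggregateThenDownsample);  // 20.0
    }

    static double avg(double[] xs) {
        double sum = 0;
        for (double x : xs) sum += x;
        return sum / xs.length;
    }
}
```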