When running M3DB, it is important to monitor the cluster’s health to understand how queries are performing and whether the cluster requires additional resources to handle the volume of data being written to it. Key M3DB health metrics include the memory utilization of each node in the cluster and query latencies.
These metrics revealed a pattern of the database reliably and periodically suffering from inflated memory and deteriorated query latencies.
One example below shows how memory usage for nodes in a cluster would spike by 20-60% every 2 hours. This spiky behavior is problematic because it requires provisioning these nodes with enough memory to support the peaks, even though the majority of the time the nodes require much less. That means we’re paying for memory that most of the time we aren’t utilizing – and we only pay for it to avoid running out of memory during these peak intervals.
Additionally, these periodic memory spikes were exactly correlated with spikes in query latencies.
The curiously periodic nature of these performance degradations led our team to investigate what operations within M3DB were to blame, and ultimately resolve this behavior to achieve more consistent memory utilization and query speeds.
Why were we seeing these performance degradations every 2 hours? Given this cadence, we theorized, and then verified via heap profiles, that the cause was a process in the database referred to as a block rotation, which happens every 2 hours. To understand what a block rotation is, we first need to understand the database’s index and the index’s blocks.
In M3DB, when a database node receives a query request, executing that query involves two steps:
// QueryIDs resolves the given query into known IDs.
QueryIDs(
ctx context.Context,
namespace ident.ID,
query index.Query,
opts index.QueryOptions,
) (index.QueryResult, error)
// ReadEncoded retrieves encoded segments for an ID.
ReadEncoded(
ctx context.Context,
namespace ident.ID,
id ident.ID,
start, end xtime.UnixNano,
) (series.BlockReaderIter, error)
M3DB’s index is used in step 1 to go from a query (e.g. specific tags and values) to a set of matching series IDs. For example, a query http_request{status_code=~"200|500"} could return from the index the following series results:
{__name__=http_request,status_code=200,service=a}
{__name__=http_request,status_code=200,service=b}
{__name__=http_request,status_code=500,service=a}
{__name__=http_request,status_code=500,service=b}
In order to perform this matching, the M3DB index accumulates the set of all distinct series IDs over time as data is written. This means that every new series write involves an update to the index – whereas a subsequent write for an existing series does not need to update the index, since that series ID is already present and searchable.
Having a single on-disk data structure that contains all series IDs and their tags for all time would be too slow to search. Series IDs and tags are therefore stored in blocks of time. That way, a given query only has to check the blocks within the query’s start/end range, limiting the search space.
For example, the figure below shows 2-hour index blocks and how a query only has to search a subset of them based on the query range.
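To make the idea concrete, here is a minimal sketch (illustrative only, not M3DB’s actual implementation) of an index broken into fixed-size time blocks, where inserting an existing series is a no-op and a query only touches the blocks overlapping its time range:

package main

import (
	"fmt"
	"time"
)

const blockSize = 2 * time.Hour

// index holds one set of series IDs per 2-hour block, keyed by the
// block's start time truncated to the block size.
type index struct {
	blocks map[time.Time]map[string]struct{}
}

func newIndex() *index {
	return &index{blocks: map[time.Time]map[string]struct{}{}}
}

// insert records a series ID in the block covering ts. Re-inserting an
// existing series is a no-op, mirroring how only new series cost an
// index update.
func (ix *index) insert(seriesID string, ts time.Time) {
	start := ts.Truncate(blockSize)
	blk, ok := ix.blocks[start]
	if !ok {
		blk = map[string]struct{}{}
		ix.blocks[start] = blk
	}
	blk[seriesID] = struct{}{}
}

// query returns the union of series IDs from every block that overlaps
// [from, to), limiting the search space to the query range.
func (ix *index) query(from, to time.Time) []string {
	seen := map[string]struct{}{}
	for start, blk := range ix.blocks {
		if start.Add(blockSize).After(from) && start.Before(to) {
			for id := range blk {
				seen[id] = struct{}{}
			}
		}
	}
	out := make([]string, 0, len(seen))
	for id := range seen {
		out = append(out, id)
	}
	return out
}

func main() {
	ix := newIndex()
	now := time.Now()
	ix.insert(`{__name__=http_request,status_code=200,service=a}`, now)
	ix.insert(`{__name__=http_request,status_code=500,service=a}`, now)
	fmt.Println(ix.query(now.Add(-time.Hour), now.Add(time.Hour)))
}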
A consequence of breaking the index up into blocks of time is that we incur a cost whenever time elapses beyond the previous latest block to a new one. This process of opening a new latest block is referred to as a block rotation.
For example, say we have the following series [A,C,D] being actively written to in the latest block.
As soon as the clock strikes 4:00pm, that incoming data now belongs in a new block, and so a new empty block is created.
Since the new block is initially empty, it very rapidly gets seeded with the distinct series that are actively being written to.
Why is it so costly, though, to populate the new empty block? In the example above, having only three active series is not much of a burden to rapidly accumulate in a new block. However, large M3DB clusters can have millions of distinct series being written at any given time – and remember that the index must contain an entry for every new distinct series. That leads to an enormous amount of initial contention on the new block to insert each of these distinct series at the same time.
Not only do the writes lead to contention and back-up on the write path, populating the new block also disrupts the read path because queries must query for series IDs from this same new block. To make matters worse, the most common time range for a query is now (e.g. Jane the engineer wishes to know the error rate happening now, or alert XYZ wants to know if a latency metric has exceeded some threshold now). Since the majority of queries need to fetch data from now, the majority of queries have to read from the new index block after it gets created.
Additionally, for better compression and faster evaluation of which time series match a given query, the actual data structure within an index block is an FST (finite state transducer) (i.e. a compressed trie used to search for time series IDs). To achieve this compression and fast searching, the FST requires that all series be inserted in alphanumeric order (to build the trie) and that it be reconstructed (to recompress) on each update. The cost of updating the FST makes it ill-suited to such aggressive mutation all at once. The metrics below show this cost as writes queue up against the index upon each block rotation.
For more reading on FSTs in general, see this blog post.
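To make the ordering requirement concrete, here is a minimal sketch using the couchbase/vellum FST library (an assumption chosen purely for illustration; M3DB ships its own FST-based segment implementation). Keys must be inserted in sorted order, and adding a series later means rebuilding, and recompressing, the structure from scratch:

package main

import (
	"bytes"
	"fmt"
	"sort"

	"github.com/couchbase/vellum"
)

// buildFST constructs an FST over the given series IDs. The builder
// rejects out-of-order inserts, so the IDs are sorted first, and adding
// a new series afterwards means rebuilding the entire structure.
func buildFST(seriesIDs []string) ([]byte, error) {
	sort.Strings(seriesIDs)

	var buf bytes.Buffer
	b, err := vellum.New(&buf, nil)
	if err != nil {
		return nil, err
	}
	for i, id := range seriesIDs {
		// Insert in sorted order; the value here is just an example offset.
		if err := b.Insert([]byte(id), uint64(i)); err != nil {
			return nil, err
		}
	}
	if err := b.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

func main() {
	data, err := buildFST([]string{
		`{__name__=http_request,status_code=500,service=a}`,
		`{__name__=http_request,status_code=200,service=a}`,
	})
	if err != nil {
		panic(err)
	}
	fst, err := vellum.Load(data)
	if err != nil {
		panic(err)
	}
	val, exists, err := fst.Get([]byte(`{__name__=http_request,status_code=200,service=a}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(exists, val, len(data))
}

Because every new distinct series forces this kind of rebuild, a block rotation that funnels millions of series into a brand-new block concentrates that cost into a short window.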
After understanding the problematic database operation to be the index’s block rotation, we knew we needed a fix that would (A) keep the data structured by blocks of time but also (B) pay a less severe penalty when transitioning to a new block.
Our new approach is to keep an “active” block around permanently that accepts all writes, even once a new block is opened. This way, we never have to rapidly seed a new block from empty to all active series. Instead, series are slowly accumulated over time as they are created. While this could mean that the “active” block grows indefinitely as new series are written, we address this by garbage collecting, in the background, old series that are no longer being written to.
The full active index block code change can be seen in this PR in the M3 open source repo.
To compare this change with the previous block rotation illustration, say we again have 3 actively written series [A,C,D] – but this time we also have the active block present.
Once time reaches 4:00pm, we again need to open a new block – however, writes continue to go into the active block instead of the new latest one.
The new latest block will eventually accumulate the relevant data by time, but it can happen in the background and more slowly, reducing the peak memory required.
To prevent indefinite growth, series that are no longer being written to will be removed from the active block in the background. In the previous example, since series [A,B,C,D] are in the active block, but only series [A,C,D] are being actively written to, series B will eventually be removed from the active block.
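As a rough illustration of the idea (not the actual M3DB implementation), a background loop might periodically sweep the active block and drop series that have not been written to within some expiry window:

package main

import (
	"fmt"
	"sync"
	"time"
)

// activeBlock tracks every series currently accepting writes along with
// the last time each one was written to.
type activeBlock struct {
	mu        sync.Mutex
	lastWrite map[string]time.Time
}

func newActiveBlock() *activeBlock {
	return &activeBlock{lastWrite: map[string]time.Time{}}
}

// write records a write to the given series, creating it if needed.
func (b *activeBlock) write(seriesID string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.lastWrite[seriesID] = time.Now()
}

// gc removes series that have not been written to within the expiry
// window, preventing the active block from growing indefinitely.
func (b *activeBlock) gc(expiry time.Duration) {
	b.mu.Lock()
	defer b.mu.Unlock()
	cutoff := time.Now().Add(-expiry)
	for id, last := range b.lastWrite {
		if last.Before(cutoff) {
			delete(b.lastWrite, id)
		}
	}
}

// runGC sweeps the active block on an interval until stop is closed.
func runGC(b *activeBlock, interval, expiry time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			b.gc(expiry)
		case <-stop:
			return
		}
	}
}

func main() {
	b := newActiveBlock()
	for _, id := range []string{"A", "B", "C", "D"} {
		b.write(id)
	}
	// Simulate series B going quiet for longer than the expiry window.
	b.lastWrite["B"] = time.Now().Add(-3 * time.Hour)
	b.gc(2 * time.Hour)
	fmt.Println(len(b.lastWrite), "series remain in the active block") // 3
}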
Now that all writes are directed to the active block, it is important that we also use this block for reads. This way, we can ensure results are present for series whose IDs are in the active block but not yet in their time-based block.
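Conceptually, a read now consults both the active block and the relevant time-based blocks and unions the results, so a series that so far exists only in the active block is still returned. A minimal, hypothetical sketch of that union:

package main

import "fmt"

// mergeSeriesIDs unions the series IDs found in the active block with
// those found in the time-based blocks for the query range,
// deduplicating so each series is returned only once.
func mergeSeriesIDs(activeBlockIDs, timeBlockIDs []string) []string {
	seen := map[string]struct{}{}
	var out []string
	for _, ids := range [][]string{activeBlockIDs, timeBlockIDs} {
		for _, id := range ids {
			if _, ok := seen[id]; ok {
				continue
			}
			seen[id] = struct{}{}
			out = append(out, id)
		}
	}
	return out
}

func main() {
	// Series D is only present in the active block so far, but still
	// appears in the combined result.
	fmt.Println(mergeSeriesIDs([]string{"A", "C", "D"}, []string{"A", "B", "C"}))
}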
So … did it work? Happily we see dramatic improvements to the M3DB memory and query metrics that revealed this performance issue.
Here we see the memory usage of a cluster before and after the active block change. Before the change, there is reliable memory spikiness every 2 hours; afterwards, we see steady memory.
* The blue window indicates the period during which the change was deployed.
Additionally, we no longer see periodic degraded query latencies every 2 hours.
* The yellow line indicates the point of time when the change was deployed.
This performance improvement goes to show how important it is to monitor the health metrics of your systems! This issue was impacting users’ query experience – but only every so often. Someone could run a query at 4:00pm and have it take well above average – then run it again a minute later and everything would be fine. These types of issues are extremely difficult to surface from anecdotal user reports – high-level tracking of memory and latencies is a much more systematic way of keeping an eye on user-impacting issues.
I gave a breakout session at this year’s KubeCon North America titled “Stream vs. Batch: Leveraging M3 and Thanos for Real-Time Aggregation.” This blog is a recap of the topics and concepts discussed during the session. Visit our KubeCon North America events page for the full session recording.
With monitoring workflows aimed at minimizing time to detect incidents, having real-time insights is critical for maintaining reliable cloud-native applications. But monitoring business-critical applications can become difficult at scale. How do you continue processing large volumes of real time data while maintaining valuable insights? This is where aggregation can help!
Taking the example given during the presentation, when querying a high cardinality metric such as CPU usage, a query can take up to 20 seconds to complete as it fetches 60,000 time series across all pods and labels.
In most metrics monitoring use cases, however, you don’t need to view metrics at the per-pod or per-label level, and an aggregate view is sufficient for understanding how your system is performing at a high level. Continuing with the above example, if you aggregate on only two labels (e.g. container name and namespace) by pre-computing the sum at one-minute intervals, the query becomes real-time (0.4 seconds) and returns roughly 200 time series.
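As a rough sketch of what such a pre-aggregation does (illustrative only, not M3’s implementation), the sum rolls many labeled series up into one series per unique (container, namespace) pair at each interval, dropping the high-cardinality labels:

package main

import "fmt"

// labels identifies a raw series; only container and namespace are kept
// in the aggregate output, while other labels (pod, instance, ...) are
// dropped and their values summed together.
type labels struct {
	container string
	namespace string
	pod       string
}

type sample struct {
	lbls  labels
	value float64
}

// aggregate sums samples for one interval, grouped by (container, namespace).
func aggregate(samples []sample) map[[2]string]float64 {
	out := map[[2]string]float64{}
	for _, s := range samples {
		key := [2]string{s.lbls.container, s.lbls.namespace}
		out[key] += s.value
	}
	return out
}

func main() {
	samples := []sample{
		{labels{"app", "prod", "pod-1"}, 0.4},
		{labels{"app", "prod", "pod-2"}, 0.7},
		{labels{"app", "staging", "pod-3"}, 0.1},
	}
	// Three raw series collapse into two aggregated series.
	fmt.Println(aggregate(samples))
}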
Understanding the value of aggregation for query performance and real-time results, it’s also important to know the two primary approaches to metrics aggregation – stream and batch.
M3 and Thanos each have their own approaches to stream and batch aggregation, both of which are based on how Prometheus performs aggregation via recording rules. Prometheus recording rules allow for pre-computing frequently needed and/or expensive queries and then storing the aggregate metrics back in the TSDB. They execute and pre-compute as a single process in memory at regular intervals, making them especially useful for dashboards. With large scale metrics monitoring, however, you will typically outgrow a single Prometheus instance and turn to a Prometheus remote storage solution like M3, Thanos, or Cortex.
M3 is an open source metrics engine comprised of four main components: M3 Coordinator, M3DB, M3 Query, and M3 Aggregator.
M3’s approach to aggregation uses roll-up rules, which aggregate across multiple time series at regular intervals using the M3 coordinator and, in some use cases, the M3 aggregator. Before writing the newly aggregated series to M3DB, the M3 coordinator will reconstitute the series as a counter, histogram, or gauge metric — all of which are compatible with PromQL (check out our blog on the primary types of Prometheus metrics for more information). With aggregation done in-memory upon the ingest path, the aggregated metrics are immediately available for query with M3.
Similar to M3, Thanos is an open source metrics monitoring solution. It has several main components, including the sidecar, querier, ruler, store gateway, and compactor.
With the Thanos sidecar setup, Prometheus metrics are scraped and stored inside each Prometheus instance. From there, the Thanos query tier pulls data from the instances via the sidecars before aggregating and deduplicating the metrics. Once these metrics have been processed inside the querier, the query results are available for display inside your dashboards (e.g. Grafana). However, for larger scale queries, especially those needed on a regular basis, the querier can be informed by the ruler to execute Prometheus recording rules on the collected metrics. Once the rules have been evaluated, the ruler will send the newly aggregated time series to Thanos object store (e.g. S3) for query and/or longer term storage.
Let’s now take a look at the various benefits and tradeoffs of these two approaches – streaming with M3 and batch with Thanos – and how they compare to one another:
M3 Pros: With M3, metrics are aggregated in-memory on the ingest path, making them immediately available for query. Additionally, with roll-up rules, only the aggregated metrics need to be persisted to M3DB and all other raw data can be dropped. By reducing the query load on M3DB, you are able to scale to a higher number of alerts and recording rules.
M3 Cons: In terms of trade offs, M3 aggregation is more complex to operate and deploy compared to Thanos. It also does not support arbitrary PromQL, but instead reconstitutes the aggregate metrics as counters, histograms, and gauges.
Thanos Pros: Compared to M3, Thanos is simpler to operate, especially when scaling resources up or down. It is also fully PromQL compatible, allowing for arbitrary PromQL queries and aggregation via Prometheus recording rules.
Thanos Cons: In terms of trade offs, Thanos aggregation adds an additional step compared to streaming aggregation, as you need to re-query, read, and then write metrics to storage over the network. This can lead to heavy resource consumption, as well as slow queries. With aggregation performed against the query tier, larger scale queries may also take a while to request metrics from each Prometheus instance and, in some cases, can lead to skipped metrics when recording rule intervals are missed.
Focusing on M3 and Thanos, we’re able to compare some of the major benefits and tradeoffs of using stream and batch processing for large scale metrics monitoring. If interested in learning more, check out the full session recording or visit M3 and Thanos documentation.
Chronosphere is the only observability platform that puts you back in control by taming rampant data growth and cloud-native complexity, delivering increased business confidence. To learn more, request a demo.
We recently published a post detailing how Chronosphere validates releases, what we learnt from the process, and what we needed to fill the gaps we identified. This post digs deeper into what we developed to fill those gaps, a tool called “Query Compare,” or “querycmp” for short. Querycmp solves the problems outlined by emulating a real environment performing under significant real-world write and query load, with the capability to spin up as a load testing suite for custom builds.
TL;DR, querycmp runs a real-world suite of queries against both a real environment and a mirrored environment (an environment cloned from the real one, with matching components and resource capacities, and receiving the same write traffic), then compares results to make sure they match.
The typical lifecycle of a querycmp validation run has three phases: setup, execution, and collection of results.
The setup for querycmp involves bringing up an environment provisioned with the same resources as the dogfooding meta environment and then dual-writing to both environments to ensure they contain the same set of data. Provisioning a mirrored environment makes heavy use of the M3DB Operator to deploy an entire suite of M3 components, leveraging Kubernetes to create a fresh stack and check that it’s healthy. Once the stack is healthy, our LaunchDarkly integration notifies a service called “reflector”. This service receives forwarded write traffic from a real environment and proxies that traffic to any number of configured target environments.
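A heavily simplified sketch of what a write-forwarding proxy like reflector could look like (the target URLs and handler are hypothetical; the real service also handles batching, retries, and backpressure):

package main

import (
	"bytes"
	"io"
	"log"
	"net/http"
)

// targets are the base URLs of the environments that should receive a
// copy of every write (placeholder values for illustration).
var targets = []string{
	"http://meta-env.example.com/api/v1/prom/remote/write",
	"http://test-env.example.com/api/v1/prom/remote/write",
}

// forwardWrites reads the incoming write payload once and re-posts it to
// every configured target environment.
func forwardWrites(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	for _, target := range targets {
		resp, err := http.Post(target, r.Header.Get("Content-Type"), bytes.NewReader(body))
		if err != nil {
			log.Printf("forward to %s failed: %v", target, err)
			continue
		}
		resp.Body.Close()
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	http.HandleFunc("/write", forwardWrites)
	log.Fatal(http.ListenAndServe(":8080", nil))
}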
At this point, tests wait for a specified amount of time before kicking off a querycmp run. This delay is necessary because there needs to be enough metrics written to the test environment, in terms of volume and time range, before we can reasonably expect all query results between meta and the test environment to match. After the wait period, it’s time to run querycmp.
As querycmp starts, it pulls a collated list of queries from a cloud blob store (a Google Cloud Storage bucket). The list consists of the scraped set of all alert and dashboard queries used in the meta environment, plus a set of user-defined queries for functionality that our dashboards rarely exercise. For example, we don’t use many dashboards for predictions or forecasting, so functions like holt_winters() don’t see much use and are added to the list. As automated testing is a case of “the more, the merrier,” we plan to pull ad-hoc queries used by engineers when inspecting data and automatically deduplicate and populate them into the user-defined query list.
Now the comparison logic starts. Each query in the list is concurrently issued against the meta and test environments, using the same start/end/step arguments, and then the results are compared in a point-wise fashion to ensure they match. After the comparison, we increment a metric based on the result (match, mismatch, or skip). Some queries, such as those that effectively select every series (e.g. {__name__!=""}), are artificially limited to a smaller result set. Since this limiting doesn’t always happen in a deterministic fashion, we can’t confidently say that running such a query on meta and the tested environment would yield the same results, so instead we upload a summary file featuring the reason for skipping to a “Skipped” bucket.

This comparison logic runs against every query as an instant query using the instant query endpoint. After these comparisons are complete, we go through the query list again, but instead execute them as range queries, with particular logic for selecting query start and end times that picks an appropriate time range containing a complete set of data (read more in the deep dive section below), emitting similar metrics and summary files with an indication that the query_range endpoint generated them.
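A stripped-down sketch of the point-wise comparison, using the Prometheus model types for range query results (the real tool also handles float tolerance, NaNs, and uploading mismatch summaries):

package main

import (
	"fmt"

	"github.com/prometheus/common/model"
)

// matricesMatch compares two range query results point-wise: every series
// in the meta result must exist in the test result with identical
// timestamps and values, and the two results must have the same size.
func matricesMatch(meta, test model.Matrix) bool {
	if len(meta) != len(test) {
		return false
	}
	// Index the test series by their label sets for direct lookup.
	testBySeries := make(map[string]*model.SampleStream, len(test))
	for _, s := range test {
		testBySeries[s.Metric.String()] = s
	}
	for _, m := range meta {
		t, ok := testBySeries[m.Metric.String()]
		if !ok || len(t.Values) != len(m.Values) {
			return false
		}
		for i, v := range m.Values {
			if t.Values[i].Timestamp != v.Timestamp || t.Values[i].Value != v.Value {
				return false
			}
		}
	}
	return true
}

func main() {
	series := &model.SampleStream{
		Metric: model.Metric{"__name__": "http_request", "status_code": "200"},
		Values: []model.SamplePair{{Timestamp: 1000, Value: 1}, {Timestamp: 2000, Value: 2}},
	}
	fmt.Println(matricesMatch(model.Matrix{series}, model.Matrix{series})) // true
}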
Once the query range comparisons have been completed, comparisons against metadata query endpoints start, which are queries for finding available label names and values. These comparisons run differently from the query endpoints, as there is no pre-built list of “queries”; instead, we generate the list to run from the results themselves: a label_names query runs on the tested and meta environments to get a list of all valid label names, and then, for each of those names, label_values queries run against both environments and their results are compared.

After completing all label_values comparisons, a single “run” ends, and further behavior depends on how the command was triggered. In most common test situations, another run begins from the execution phase. This extends the valid range allowable for comparisons, as we can be sure that the time the first run started (i.e. the time at which there were enough valid metrics for comparisons) will be valid for all subsequent runs, since metrics emission remains enabled.
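A sketch of that metadata comparison loop against a hypothetical labelAPI interface (the interface, the fakeAPI stub, and the function names are illustrative; the real tool talks to the Prometheus-compatible label endpoints of each environment):

package main

import (
	"context"
	"fmt"
	"sort"
)

// labelAPI is a hypothetical client for the label metadata endpoints of
// one environment (meta or test).
type labelAPI interface {
	LabelNames(ctx context.Context) ([]string, error)
	LabelValues(ctx context.Context, name string) ([]string, error)
}

// compareLabels fetches all label names from both environments, then
// compares the values of every label name seen in meta against test.
func compareLabels(ctx context.Context, meta, test labelAPI) error {
	metaNames, err := meta.LabelNames(ctx)
	if err != nil {
		return err
	}
	testNames, err := test.LabelNames(ctx)
	if err != nil {
		return err
	}
	if !equalSets(metaNames, testNames) {
		fmt.Println("label name mismatch")
	}
	for _, name := range metaNames {
		mv, err := meta.LabelValues(ctx, name)
		if err != nil {
			return err
		}
		tv, err := test.LabelValues(ctx, name)
		if err != nil {
			return err
		}
		if !equalSets(mv, tv) {
			fmt.Printf("label value mismatch for %q\n", name)
		}
	}
	return nil
}

// equalSets reports whether two slices contain the same strings, ignoring order.
func equalSets(a, b []string) bool {
	if len(a) != len(b) {
		return false
	}
	sort.Strings(a)
	sort.Strings(b)
	for i := range a {
		if a[i] != b[i] {
			return false
		}
	}
	return true
}

// fakeAPI is an in-memory stub used only to exercise the comparison above.
type fakeAPI map[string][]string

func (f fakeAPI) LabelNames(_ context.Context) ([]string, error) {
	names := make([]string, 0, len(f))
	for n := range f {
		names = append(names, n)
	}
	return names, nil
}

func (f fakeAPI) LabelValues(_ context.Context, name string) ([]string, error) {
	return f[name], nil
}

func main() {
	env := fakeAPI{"status_code": {"200", "500"}}
	_ = compareLabels(context.Background(), env, env)
}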
As mentioned above, querycmp emits metrics to indicate progress. After all, M3 is a metrics store, and when you have a metrics store, you tend to want to deal with metrics. We compile results from mismatched runs and upload them to a cloud storage bucket with one-week retention for manual retrieval and inspection. We rely on querying the emitted metrics to calculate a running match rate to see how closely the new build tracks against the current stable version.
Getting the testing process running involves a few moving parts.
We monitor the metrics querycmp emits to ensure it runs as expected and doesn’t cause mismatches during the whole process. To coordinate this, we use Temporal, the same tool used for scenario testing, to compose these steps as distinct workflows. Temporal makes the querycmp cycle repeatable, automated, and perfect for including in scenario tests, which already rely on Temporal and run continuously against release candidates.
A continuous parallel verification check runs alongside the querycmp process, which ensures there are no mismatches, and that there aren’t too many errors which could indicate a broken build. If this test fails, we label the entire scenario run as failed and signal it as such, flagging it for manual review to find the root cause of the error. Continuously running comparisons in this manner is useful for ensuring no read/write degradations occur when performing the complex actions modeled by scenario tests, such as adding a database node to an active cluster.
On top of querycmp is CLI tooling (called “droidcli”) for starting the testing process by running something like the following command:
droidcli querycmp start --environment rc_01
This helped foster confidence in development builds, especially for long-lived branches and complicated changes that were previously painful to test, like the index active block refactor work.
A tweak to the core flow gives the option to run querycmp as a testbed for query performance under load. In this mode, we turn off comparisons entirely and run only against the test environment. There are some additional levers to tweak, such as the concurrency at which to send queries, to simulate periods of extreme load. Running in this mode has helped expose and reproduce issues with query degradation, as well as cases where some queries are arbitrarily allowed to run faster than others, both of which are otherwise difficult to simulate. Unfortunately, it’s not realistic to keep comparison verifications running in this mode, as it would thrash the test environment and our meta environment.
How long do we wait until there’s enough data to compare results confidently?
Familiarity with Prometheus staleness mechanics helps answer this.
In summary, a point in a given series is valid for 5 minutes after it’s emitted, or until another write to that series arrives within the 5-minute window. Any query, even a simple lookup like foo{bar="baz"}, needs at least 5 minutes of data to account for any sparse series emitted just under 5 minutes before the query start.

Queries that act on range vectors, e.g. sum_over_time(foo{bar="baz"}[10m]), don’t follow these staleness semantics and instead need data that covers the entire range of the matrix. In the example above, this is 10 minutes before the query start. Subqueries complicate this further by requiring an extra range of data on top to service the underlying query.
To account for this complication, we delay for 5 minutes after write forwarding is ready, which ensures there’s enough data to service the majority of instant queries: that covers all queries that either have no explicit range or use range selectors (_over_time functions) with ranges under 5 minutes. We use Prometheus’ parser Inspect function, which walks a parsed query string as a traversable tree, allowing us to calculate the maximum time range required by a query. Queries with range selectors that exceed this standard wait are skipped until enough time has passed from startTime to run them; in later runs, more queries become valid and are included. This is fine in practice, as the 5m window services over 90% of queries.
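For reference, here is a minimal sketch of computing the maximum range selector in a query with the github.com/prometheus/prometheus/promql/parser package (the helper name is ours, and the exact logic querycmp uses may differ, e.g. for nested subqueries):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/prometheus/promql/parser"
)

// maxRangeSelector walks a parsed PromQL expression and returns the
// largest range selector (matrix selector or subquery range) it
// contains. Queries whose maximum range exceeds the wait window can be
// skipped until enough data has accumulated.
func maxRangeSelector(query string) (time.Duration, error) {
	expr, err := parser.ParseExpr(query)
	if err != nil {
		return 0, err
	}
	var longest time.Duration
	parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
		switch n := node.(type) {
		case *parser.MatrixSelector:
			if n.Range > longest {
				longest = n.Range
			}
		case *parser.SubqueryExpr:
			if n.Range > longest {
				longest = n.Range
			}
		}
		return nil
	})
	return longest, nil
}

func main() {
	d, err := maxRangeSelector(`sum_over_time(foo{bar="baz"}[10m])`)
	if err != nil {
		panic(err)
	}
	fmt.Println(d) // 10m0s
}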
Since we mirror writes to the tested environment from meta, we can’t be sure that any query with a “now” endTime has all points available in both environments, due to non-deterministic propagation delays. These delays can arise from network degradation between m3reflector and the test environment, heightened datapoint ingestion delay under heavy load, or any number of other distributed-system issues. Because of this, we introduce an artificial delay for query endTimes, allowing enough time for all queried datapoints to be present in both environments. Choosing an appropriate period took trial and error. Too short results in mismatches that are hard to diagnose, since inspecting them even a few seconds after the reported mismatch shows data that does match. Too long delays the launch of querycmp, since we must wait 5 minutes from startTime to account for staleness, plus the selected liveness delay from endTime, to be sure of a valid comparison window; this, in turn, increases the duration of a testing cycle without providing any additional verification. Ninety seconds turned out to be a fine balance between these concerns.
Running querycmp as validation for new builds, we saw persistent failures in label name and value comparisons. These mismatches were difficult to diagnose because the data showed that these labels didn’t exist in the query period on the tested environment, which is expected for a mismatch, but they were also missing from the same range in the meta environment. After a lot of detective work, we discovered the reason: the architecture of M3’s index and data blocks.
The following summary is a simplified version of the actual issue; for more information on index and data blocks, read the M3 architecture documentation.
At a high level, M3 operates in terms of index and data blocks, where index blocks contain a list of label name/value pairs emitted while that block was active, with an offset pointing into the appropriate data block for the same period. The data block contains the compressed timeseries data for a series, which means a series can “exist” in the index block for a given query time range but not return any results, as its actual datapoints may fall outside the queried range. This caused issues in querycmp because label lookup queries are handled exclusively by index lookups and never touch the data. This in turn led to situations where some sparsely written series had labels added during the “spin-up” period of the tested environment, before those writes were being forwarded. That period falls outside the valid query range, so it wasn’t included in any query comparisons, but it still yielded results for label lookups, since it was still in the same index block. To get around this, for any mismatched label, for example label_foo, that doesn’t exist in the tested environment, we run a data query count({label_foo!=""}) against meta to ensure that the data does exist and was forwarded.
This post covered a deep dive into the query comparator tool and testing in general at Chronosphere, continuing from part 1 of this series. Future posts around these topics will look more closely at how Chronosphere uses Temporal and more detail on the active index block changes touched upon briefly in this post.