[rfd116] covers telemetry provided by the Oxide Rack and various ways we expect to use this data. This RFD seeks to take the next step of categorizing telemetry, identifying specific requirements for that data, and identifying existing building blocks we might use to implement these requirements. Our eventual goal (beyond this RFD) is to make some determinations on architecture for collecting, storing, and using telemetry in the rack. The initial sections cover our uses of telemetry and the requirements of our desired system for storing and querying telemetry data. The latter sections, especially the building blocks, are a relatively accurate historical record of the analysis of possible technologies that might underlie our telemetry system.
Use cases and requirements
The use cases below describe the broad kinds of things we want to do with telemetry. Not all telemetry needs to support all of these use cases — see below for details.
Use case | Description |
---|---|
Graphs, other visuals, and API access | We can think of this as "telemetry for its own sake", where the telemetry itself (or a visualization derived from it) is exposed directly to developers or operators. |
Alerting | We want to notify customers when particular events happen or when metrics exceed or fall below some configurable thresholds. |
Automated response: synchronous | These events require an immediate automatic response from the system in the context where the event was generated. A typical example is an uncorrectable DIMM error. |
Automated response: asynchronous but immediate | These events require an automatic response not in the context where the event was generated, but where timeliness affects system availability. An example might be a determination that a server has failed. This might require that the control plane identify resources running on that server and migrate them elsewhere. |
Automated response: asynchronous and not immediate | These events require an automatic response but not within any tight time bound. An example might be that when the long-term utilization of a server has exceeded some threshold, then some workloads might be moved elsewhere. |
Aiding active support cases | Telemetry will be useful for Oxide support to resolve open cases. |
Product iteration | Telemetry will be used to inform Oxide’s development of the product. This includes identifying components with unexpectedly high failure rates (for proactive root cause analysis and mitigation), developing diagnosis engines for better detecting hardware failures, and prioritizing feature development based on usage. (See also [rfd94].) |
This is just one way to slice the use cases. There are even higher-level functions like "Capacity Planning" and "Determining if the system is meeting expectations". We bucket these into one or more of the above categories. For example, capacity planning involves "graphs, visuals, and API access" and "alerting".
The use cases above impose certain requirements on the systems that collect, store, and support querying telemetry:
Requirement | Description |
---|---|
Real-time vs. historical (or both) | Some system functions need real-time data (e.g., alerting on system unavailability). Some functions need historical data (e.g., capacity planning). Some functions probably need both (e.g., delivering virtual CPU utilization graphs to developers). |
High availability storage and querying | In principle, we’d like all data to be highly available; however, this requirement may be stricter for some functions than others. Alerting on system unavailability absolutely needs to be HA. Capacity planning may not need HA. |
Horizontal scalability | In principle, we’d like all data to be stored in a horizontally scalable way. This is most critical for telemetry where either the data itself or its access scales proportionally to factors outside of our control (e.g., number of user instances or volume of user requests). This is least critical for data whose size and access patterns we can reliably estimate up front (e.g., hardware error counters used only for support cases). |
Precise measurements vs. trends | For alerting, it’s useful for the system to calculate trends using approximation and extrapolation. Concretely, an operator that wants to alert on CPU utilization above some threshold should not have to specify how to handle edge cases like data for a particular time bucket being missing. For many other use cases, it’s better if the system provides precise reporting of exactly the data that’s available, even if the result is much more complex for the consumer. |
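To make the distinction between precise reporting and trending concrete, here is a minimal sketch, assuming a series is a list of fixed-width time buckets in which `None` marks a missing bucket. The function names and the choice of linear interpolation are illustrative assumptions, not a description of any particular system.

```python
def precise(buckets):
    """Report exactly what was recorded; missing buckets stay None."""
    return list(buckets)


def fill_gaps(buckets):
    """Trend view: linearly interpolate gaps between known neighbors.

    Leading and trailing gaps are left as None because there is nothing
    to interpolate between.
    """
    filled = list(buckets)
    known = [i for i, v in enumerate(buckets) if v is not None]
    for left, right in zip(known, known[1:]):
        step = (buckets[right] - buckets[left]) / (right - left)
        for i in range(left + 1, right):
            filled[i] = buckets[left] + step * (i - left)
    return filled


cpu_percent = [70.0, None, 90.0, 95.0]  # hypothetical utilization samples
trend_view = fill_gaps(cpu_percent)
precise_view = precise(cpu_percent)
```

An alerting rule can evaluate `trend_view` without deciding how to treat the missing bucket, while a support engineer reading `precise_view` still sees that the bucket was never recorded.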
These requirements will be useful for identifying building blocks we can use for telemetry data.
Summarizing these two, here’s a rough idea of what requirements we might expect to need for the use cases above:
Use case | Real-time? | Historical? | H/A? | Horizontal scale? | Trends? | Precise? |
---|---|---|---|---|---|---|
Graphs, other visuals, and API access | Yes | Some | Yes? | Yes | Yes | No |
Alerting | Yes | Some | Yes | No? | Yes | No |
Automated response: synchronous | Yes | No | No | No | No | Yes |
Automated response: asynchronous but immediate | Yes | No | Yes | No? | Yes? | No? |
Automated response: asynchronous and not immediate | Yes | Some | No? | No? | Yes | No |
Aiding active support cases | Some | Some | No? | No | No | Yes |
Product iteration | No | Yes | No | No? | Yes | Yes |
There are several other considerations worth mentioning.
Other considerations
Event-based vs. metric-based
Telemetry often describes events that happen in the system. These events can also be aggregated into metrics (specifically, counters) that describe how many times the event happened. Examples:
Event-based | Metric-based | Example metadata |
---|---|---|
|
| the service (an id, maybe with IP address, port), the specific RPC, the user that made the RPC |
|
| the server (maybe: fleet, AZ, cluster, rack, server), slot, type of error, etc. |
|
| the server (maybe: fleet, AZ, cluster, rack, server), NIC, IP address, next layer protocol, etc. |
Both events and metrics have their advantages: events provide details that are often necessary to debug broken systems. Metrics are far more scalable and enable people to talk about trends and define SLOs. It’s possible to compute metrics based on an event stream: one could always compute `myservice.nrpcs_done` from a stream of the `myservice.rpc_done` events generated by the RPC server. You can’t go the other way around.
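The one-way conversion from events to counters can be sketched as follows. The event names come from the text; the record layout and field names (`rpc`, `user`) are assumptions made for illustration.

```python
from collections import Counter

# Hypothetical event records; the field names are illustrative only.
events = [
    {"event": "myservice.rpc_done", "rpc": "get", "user": "alice"},
    {"event": "myservice.rpc_done", "rpc": "put", "user": "bob"},
    {"event": "myservice.rpc_done", "rpc": "get", "user": "alice"},
]


def count_rpcs_done(events, by=()):
    """Aggregate rpc_done events into counters keyed by the fields in `by`."""
    counts = Counter()
    for ev in events:
        if ev["event"] == "myservice.rpc_done":
            counts[tuple(ev[k] for k in by)] += 1
    return counts


nrpcs_done = count_rpcs_done(events)                 # a single overall counter
nrpcs_by_rpc = count_rpcs_done(events, by=("rpc",))  # one counter per RPC type
```

Given only `nrpcs_done`, there is no way to recover which user made which RPC, which is the sense in which the conversion is lossy.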
In a sense, most behaviors begin as events and the question is where the lossy conversion to metrics happens (if at all). At the application level, systems like OpenTelemetry (or OpenTracing) preserve the event representation all the way to query time. They collect an event stream from an application, store that, and compute any needed summary metrics. On the other hand, programs that expose Prometheus endpoints have computed summaries (and thrown out the event data) before the data leaves the program.
DTrace directly exposes events. (DTrace probes are essentially points in the system that can generate an event.) DTrace makes it very easy to summarize events (often by counting them) at the point of instrumentation. In this way, DTrace lets the user decide when to aggregate them.
Regarding data collection: we propose developing a common component (i.e., Rust crate) that we can use to define events in a program and expose them as a combination of DTrace probes, a raw event stream, or aggregated metrics. This behavior can be configurable based on the program and the event. This approach allows us to build rich instrumentation into applications even when the transmission or storage of that data would generally be prohibitive. We can adjust the policy based on the anticipated data volume or other details. This technique has been used in the past to enable extra verbose debug logging to an out-of-band channel without restarting a program. This facility could potentially be used (in some form) in applications, system libraries, or potentially the kernel.
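As a rough sketch of the idea (in Python for brevity, although the proposal is for a Rust crate), an instrumentation point might carry per-event policy that can be changed while the program runs. The class and attribute names here are hypothetical, and the DTrace probe output is omitted.

```python
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class EventPoint:
    """One instrumentation point whose output policy can change at runtime."""

    name: str
    emit_raw: bool = False     # forward every event to a raw stream
    emit_metric: bool = True   # maintain an aggregated counter
    raw_stream: list = field(default_factory=list)
    counter: Counter = field(default_factory=Counter)

    def fire(self, **metadata):
        """Record one occurrence of the event according to current policy."""
        if self.emit_raw:
            self.raw_stream.append({"event": self.name, **metadata})
        if self.emit_metric:
            self.counter[self.name] += 1


rpc_done = EventPoint("myservice.rpc_done")
rpc_done.fire(rpc="get")   # counted only
rpc_done.emit_raw = True   # policy change without restarting the program
rpc_done.fire(rpc="put")   # counted and streamed
```

The instrumentation call site stays the same in both cases; only the policy decides whether the detailed event leaves the program.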
What about data storage and querying? We’ll almost certainly want some kind of event-based debug tracing for the control plane. For most other use cases, it’s not clear that it matters whether a system is event-based or metric-based. It’s worth noting that the volume of some events may be so high that we could never expect to emit events from the system under normal operation (e.g., arrival of network packets).
What about gauges? The above discussion applies to events and counters. Gauges are different. These are quantities that must be sampled to obtain any value, and the sample doesn’t tell you anything about the value at any other time. Temperature, memory usage, and firmware revision might all be modeled as gauges. You might have data for these at specific points in time, but that doesn’t tell you anything about any other times. Gauges can be modeled by saying that the measurement itself is an event. That measurement might happen on some periodic basis or because some device determined that a threshold was exceeded (e.g., a temperature sensor exceeded a particular threshold).
Predictable performance and resource utilization
In systems that store time series data, it’s common to associate key-value pairs with each item (whether a numeric quantity or an event). This can lead to an explosion in cardinality, the number of distinct key-value pairs in the system. Depending on the design of the system, this can lead to significant increases in memory and storage used for the time series data. The CPU required to serve queries and query latency overall can be affected, too. Sometimes, once the cardinality has grown large (intentionally or otherwise), few mitigations are available to restore performance.
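A small worked example shows how quickly this grows. It measures cardinality as the number of distinct label combinations, which is one common way to count it; the label names and value counts are invented for illustration.

```python
from math import prod


def series_cardinality(label_values):
    """Upper bound on distinct series: product of distinct values per key."""
    return prod(len(values) for values in label_values.values())


labels = {
    "rpc": ["get", "put", "delete"],
    "status": ["ok", "error"],
    "server": [f"server-{i}" for i in range(32)],
}
base = series_cardinality(labels)        # 3 * 2 * 32 combinations

# Adding one high-cardinality key multiplies the total.
labels["user"] = [f"user-{i}" for i in range(1000)]
with_users = series_cardinality(labels)
```

Here a single extra key takes the bound from 192 series to 192,000, which is why keys whose values scale with customer activity deserve particular care.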
Many of our use cases require very predictable, real-time performance. We may define SLOs for API requests, particularly those used to render real-time graphs, that set expectations like 99% of queries will complete within a few hundred milliseconds. Other use cases (like alerting) don’t require quite so tight deadlines, but still demand good performance. At the other end of the spectrum, complex queries over hardware telemetry that are used for product iteration could be allowed to take days.
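An SLO of this shape can be checked mechanically. The sketch below assumes a 300 ms threshold as a stand-in for "a few hundred milliseconds"; the function name and the sample data are hypothetical.

```python
def meets_latency_slo(latencies_ms, threshold_ms=300, target=0.99):
    """True if at least `target` of the queries finished within the threshold."""
    if not latencies_ms:
        return True  # no queries, so nothing violated the objective
    within = sum(1 for latency in latencies_ms if latency <= threshold_ms)
    return within / len(latencies_ms) >= target


good_day = [40] * 995 + [1200] * 5   # 99.5% of queries are fast
bad_day = [40] * 980 + [1200] * 20   # 98.0% of queries are fast
```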
Physical resources used for time series collection, storage, and querying come from the pool of hardware made available to operators and developers. Efficiency is important to maximize value to the customer. Predictability is also important for the customer’s capacity planning. Systems that support schedules for re-aggregating data are particularly useful for historical storage.
Decoupling data collection from storage
Where possible, we will decouple the software that produces telemetry from the system that stores that telemetry. Two approaches have been discussed:
a client-side library used in software that exposes telemetry. This library would be dynamically configurable to send specific telemetry to different storage systems based on policy.
an additional service that accepts telemetry from other components in the system and makes it available to different storage systems based on policy.
Decoupling telemetry producers from consumers enables us to evolve the storage system over time with minimal impact to producers. It also makes it easier to use a combination of systems — e.g., Prometheus for a very small set of data requiring high availability for alerting and InfluxDB for more general-purpose telemetry (where richer controls over retention and resampling are valuable).
Regardless of where this functionality is implemented, we’ll likely want:
a client-side library we can use in as much of our software as possible to expose telemetry. Ideally, this library would buffer data locally to smooth over transient failures of the transport.
a mechanism for routing telemetry to the appropriate storage system
policy for controlling those routing rules, and potentially tools for observing and modifying the policy. (This would not necessarily need to be customer-facing, although it’s possible that customer actions could indirectly modify policy — e.g., if a customer adding a new alerting rule requires us to send some telemetry to somewhere else.)
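The three pieces listed above (a client-side library with local buffering, a routing mechanism, and a policy) might fit together roughly as follows. This is a sketch under stated assumptions: the class, the backend names, and the metric names are all hypothetical, and a real transport would replace the in-memory lists.

```python
from collections import deque


class TelemetryRouter:
    """Routes samples to named backends by policy, buffering on failure."""

    def __init__(self, backends, policy, default):
        self.backends = backends   # backend name -> callable(sample)
        self.policy = policy       # metric name -> list of backend names
        self.default = default     # backend names for unlisted metrics
        self.pending = deque()     # (backend name, sample) awaiting delivery

    def send(self, sample):
        """Queue the sample for every backend the policy selects, then flush."""
        for name in self.policy.get(sample["name"], self.default):
            self.pending.append((name, sample))
        self.flush()

    def flush(self):
        """Try each queued delivery once; keep failures for the next flush."""
        for _ in range(len(self.pending)):
            name, sample = self.pending.popleft()
            try:
                self.backends[name](sample)
            except ConnectionError:
                self.pending.append((name, sample))


alerting, general = [], []
router = TelemetryRouter(
    backends={"alerting": alerting.append, "general": general.append},
    policy={"rack.unavailable": ["alerting", "general"]},
    default=["general"],
)
router.send({"name": "rack.unavailable", "value": 1})
router.send({"name": "vcpu.utilization", "value": 0.4})
```

Changing `policy` at runtime (for example, when a customer adds an alerting rule) redirects telemetry without touching the code that produces it.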
Building blocks
The table below provides a summary of properties of the building blocks that have been thoroughly investigated, but does not contain details on every possible option. See the relevant section for details.
Key for icons:
✅: Yes
❌: No
⭕️: Not applicable, not tested, or partial support
Property | illumos FMA | Prometheus | Thanos | InfluxDB | ClickHouse | VictoriaMetrics |
---|---|---|---|---|---|---|
Real-time | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Historical | ✅ | ✅ | ✅ | ✅ | ✅ | ✅ |
Event-based | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ |
Provides metric summaries | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
Supports queries | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
H/A storage and querying | ❌ | ❌ | ❌ | ❌ | ✅ (requires ZooKeeper) | ✅ |
Horizontally scalable | ❌ | ❌ | ❌ | ❌ | ✅ | ✅ |
Precise measurements? | ✅ | ❌ | ❌ | ✅ | ✅ | ✅ |
Trending (for alerts) | ❌ | ✅ | ✅ | ✅ | ✅ | ✅ |
Predictable performance & utilization | ✅ | ⭕️ | ❌ | ❌ | ✅ | ❌ |
Can re-aggregate / downsample historical data | ❌ | ❌ | ⭕️ (caveats) | ✅ | ✅ | ✅ |
illumos FMA
illumos includes a subsystem called FMA, the Fault Management Architecture. This is described in some detail in [rfd26]. Both kernel and userland components of the system can generate events, which are expected to conform to a hierarchical schema. A daemon called `fmd` contains diagnosis engines that can subscribe to event streams, make diagnoses (which are other events), and potentially take actions like removing a device from service. FMA records these events in a persistent log.
With illumos as the host operating system, we’ll be using FMA at the very least for synchronous response to hardware failures and probably asynchronous response to explicit hardware failures when local decisions are possible (e.g., a disk has failed). What’s less clear is what FMA’s role is for asynchronous automated response to other events and as a primary store and query backend for data generated both by FMA and other event sources.
One possible approach is to say that the operating system will continue generating telemetry with FMA, that fmd will still be used for synchronous and some asynchronous automatic response within the same system, and that FMA’s events (or summarized metrics) are forwarded to a more distributed telemetry system for further analysis, canonical storage, and later querying by the rest of the system. (That system could even be implemented with fmd.)
Summary: FMA is event-based, does not support historical querying (except to dump the whole log), and performs vital functions beyond the scope of this RFD (e.g., taking components out of service). It’s not obviously intended to support a very high volume of events nor events not related to faults.
Prometheus
Prometheus is a widely-used system for collecting, storing, querying, and alerting for metric data. Its sweet spot is monitoring request rates and error counts from distributed services and alerting based on these values. The model is pretty simple: you define collectors, which "scrape" remote components (fetching metric data from them). You can use PromQL to query the stored data. You can configure alerts. You can also configure rules, which are essentially predefined queries that are evaluated as data arrives (generating a new metric).
There are a bunch of things that are too simple about this model:
There’s only one scrape interval for all collectors and all metrics. This is also the time interval into which metric data is bucketed. If you have some data that changes much less frequently than others, you can’t collect it less frequently or aggregate it into bigger time buckets to save on storage space. Relatedly, it’s not possible to adjust the aggregation interval of older data.
Approximation and extrapolation are deeply baked into Prometheus’s query model. See [hai20] for details. This is helpful for the alerting use case, and some graphing use cases, but it’s often very confusing for less common events (like errors), particularly those that don’t happen at least once per second. (It’s common to have fractional rates: suppose you have 8 instances of a service that you scrape every 10 seconds. If one of them reports one error in that 10-second window, when you query for the error metric for that window, you’ll get a rate of 1/80 = 0.0125 errors per second. You can see how this happens, but it sure is confusing to see a graph showing 0.0125 errors per second from a service.)
Original versions of Prometheus were extremely resource-heavy. All data was buffered in memory, all the time. However, as of version 2.19, memory consumption has been significantly improved. Data samples are organized hierarchically into chunks, and chunks into blocks. Only the latest ("head") block is actually in memory; the rest are on disk. The latest chunk in the latest block is truly in memory; the rest are backed by a memory-mapped file. This suggests that the explicit resource usage has been significantly improved in recent versions, though the degree to which the resource usage may be constrained in actual operation remains to be seen. See the repo changelog and this series of posts on the topic for details on the file format and the updates related to memory consumption. Benchmarking, specifically comparisons with previous versions, is available here, though be aware this compares 1.x to 2.0, so its relevance to our particular use case is unclear.
There’s no built-in horizontal scalability.
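The fractional-rate arithmetic from the example in the list above (8 instances, a 10-second scrape window, one error) can be reproduced directly. This sketch only mirrors that arithmetic; it is not Prometheus’s actual rate implementation, and the function name is made up.

```python
def aggregate_rate(increase_per_instance, window_seconds):
    """Average per-second rate across instances over one scrape window."""
    per_instance = [inc / window_seconds for inc in increase_per_instance]
    return sum(per_instance) / len(per_instance)


# One of eight instances reports a single error in a 10-second window.
rate = aggregate_rate([1, 0, 0, 0, 0, 0, 0, 0], window_seconds=10)
```

The result is 1/80 errors per second, even though what actually happened is that exactly one error occurred on one instance.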
Many of these deficiencies can be worked around by setting up additional Prometheus instances, some of which might pull from others. For example, you can achieve HA by having two completely independent Prometheus instances, although this presents issues such as deduplication of alerts. (That particular problem seems to generally be solved by systems like Thanos, which provide additional alerting tools that implement deduplication.) You can potentially scale horizontally by pulling different metrics into different instances, or by having tiers of Prometheus servers. (Tiers can also be used to reaggregate data with different time buckets or retention times.) But there’s no way to move metrics from one to another. This is a very manual process.
It’s also worth pointing out some major values clashes we’d likely have with the Prometheus community. There’s been a lot of controversy over the approximation and extrapolation behavior. The developers' interactions seem rather at odds with our own values of empathy, responsibility, and teamwork.
Thanos
Thanos is a separate set of components for providing additional storage capabilities to Prometheus. The main benefits of Thanos are long-term (e.g., infinite) storage of metric data, with optional components of "compacting" data and providing consistent queries across data from multiple Prometheus instances.
There are two important components and a third group of components. The Sidecar component is used for local queries and to transfer data to a long-term storage solution, most commonly cloud buckets. The Compactor coalesces data files into larger blocks, among those already transferred to buckets. Additionally, the Compactor is used to reaggregate data into a new time resolution. Importantly, this does not remove the original resolution of data, but adds a new series.
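The "adds a new series" behavior can be illustrated with a toy downsampler. Averaging each bucket is an assumption made to keep the sketch short, and the series names `raw` and `1m` are hypothetical; this is not a description of Thanos’s on-disk format.

```python
from collections import defaultdict


def downsample(samples, bucket_seconds):
    """Average (timestamp, value) samples into fixed-width time buckets."""
    buckets = defaultdict(list)
    for ts, value in samples:
        buckets[ts - ts % bucket_seconds].append(value)
    return [(start, sum(vs) / len(vs)) for start, vs in sorted(buckets.items())]


raw = [(0, 1.0), (10, 3.0), (20, 5.0), (60, 2.0), (70, 4.0)]
series = {"raw": raw}
series["1m"] = downsample(raw, 60)  # stored alongside; the raw series is kept
```

Because the original resolution is retained, storage use grows rather than shrinks unless a separate retention policy removes the older, finer-grained data.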
There are additional components to Thanos, which provide things like a consistent query layer on top of the data collected by one or more Prometheus instances. However, it’s important to note that Thanos itself provides no tools to manage a highly-available Prometheus cluster. To the extent it’s required, availability guarantees would need to be provided by another component (or Oxide); Thanos is designed for storage and consistent querying of such data, but not management of such a cluster.
InfluxDB
InfluxDB is a timeseries DB for collecting, storing, processing, querying and alerting on the basis of event or metric data. Data is organized into buckets (analogous to tables in a traditional DB), from which data can be queried or on which tasks can be run (scheduled queries). Data can be transformed in many ways, such as resampling, aggregation or other statistics. This processed data can be pushed into new buckets or used to generate alerts. In contrast to Prometheus, where data is pulled from remote components, Influx uses a push-based approach by default, but also supports pulling data via scrapers. (Interestingly, these are supposed to provide data in the Prometheus format, which suggests an intended interoperability.)
Influx has availability and scalability concerns similar to those of Prometheus. Clustered solutions are provided as paid services, with the open-source implementation providing only a single server using the local filesystem for storage. Oxide would bear the responsibility of providing availability guarantees.
Pain points: In the process of evaluating InfluxDB, a number of small issues have cropped up. Any single point is minor, but they are worrying when taken together, and may indicate poor-quality code, rough edges, reliability or stability issues, or other problems. The main problems are documented here for reference.
InfluxDB includes an OpenAPI spec, from which one can ostensibly build client SDKs. However, the spec seems broken out of the box. For example, they provide no method of discriminating between possible return types in some API calls. Worse, there are documented, open issues of mismatches between the spec and the actual behavior of the server. This has been observed directly by snooping the requests between their provided server and CLI using Wireshark. Their own tools definitely do not conform to the API in their specification. While there may only be a few such cases, this calls the entire OpenAPI specification into question, and raises the possibility that Oxide would be responsible for implementing and maintaining a client-side library for interacting with InfluxDB.
There is a huge number of outstanding issues on the project’s GitHub page. New issues seem to be opened regularly, but there is rarely discussion or commit activity related to them.
The authorization model seems straightforward, but in practice it’s been incredibly difficult to accomplish simple tasks. For example, creating a non-root user with useful permissions has been challenging. A default, root-like user is created when a DB is instantiated. New users may be added, and different permission policies created and attached to the users. For example, a `read:user` policy is documented as: "Grants the permission to perform read actions against organization users". However, giving a user that policy and then hitting the API endpoint that returns information about that user returns an error due to bad authorization.
The InfluxDB server has some inconsistent behavior. For example, hitting a nonexistent endpoint sometimes returns a 404 and sometimes returns a 200 with a "sitemap" of the valid endpoints in the body. Similarly, requests requiring authentication usually fail with a 403 status code, but a 401 is also possible, as is a 404. Lastly, the exact same JSON response body is formatted differently depending on the endpoint it is returned from (sometimes indent-formatted with newlines, others not).
The project seems to be in a state of transition. Version 2.0 is quite different from 1.x, but both still seem largely supported, with 1.x being the dominant version in the wild. Fixes and features are being ported between the versions, which suggests minimal code reuse. Additionally, the core database engine appears to be in the process of a rewrite in Rust, but there’s no release roadmap. These all taken together suggest a lot of duplicated engineering effort, poor coordination or leadership, and a lack of focus.
ClickHouse
ClickHouse is a column-oriented database management system designed for efficient online analytical processing (OLAP). While not strictly a timeseries database, it has been used as such due to its efficiency and architecture, and seems to stack up well in terms of performance. ClickHouse has a number of excellent features:
The documentation is extensive, if sometimes a bit opaque.
It supports many data types, and tries very hard to select efficient data structures and algorithms for each type.
All aspects of the server are highly configurable, via any combination of XML configuration files and command-line parameters. This is a good illustration of ClickHouse’s attention to detail: the server watches its configuration files for changes, and automatically reloads them and reapplies any changes to the entire cluster.
TLS and user authentication and authorization are supported.
It has an extensive set of introspection capabilities via "system tables". This stores metadata about the entire database, from cluster information to the libraries (and their licenses) against which the server is linked.
It provides native support for the MySQL wire protocol as well as a simple HTTP API. There is also at least partial, though undocumented, support for other interfaces, including postgres and gRPC. It should be noted that this is just the wire protocol. ClickHouse itself only emulates a portion of MySQL. For example, many of the standard commands such as `SHOW COLUMNS FROM my_table` are not available, nor does ClickHouse use MySQL’s `information_schema` metadata database, opting for the `system` database instead. This leads to lots of friction when using client libraries that actually expect a MySQL server on the other end of the line, and suggests that we may have to write stringly-typed SQL via ClickHouse’s HTTP API or build our own strongly-typed library. More importantly, a native Rust client is available.
Though alerts themselves are missing from ClickHouse, the experimental `WATCH` statement provides a notification-like mechanism. An arbitrary select-statement can be used to create a "live view," which is updated whenever the results of the underlying query change. This obviates the need for polling.
ClickHouse’s most attractive feature at this point is native support for clustering, without requiring a commercial license. It uses asynchronous multi-master replication to distribute data in an available and fault-tolerant way, and supports simultaneous sharding and replication. This is a standout feature, among the options thus far considered. (Only CrateDB also implements this.)
However, the native support for clustering highlights ClickHouse’s only obvious potential drawback: clustering requires Apache ZooKeeper, which has a reputation for high management overhead. While the server itself supports extremely tight control over resource utilization (e.g., memory usage), ZooKeeper may be more difficult to constrain. ZooKeeper is not part of the data path: the documentation clearly states this, and this is borne out by experimentation (see below). Additionally, ZooKeeper’s reputation stems from the difficulty of managing it directly. Experimentation with a small ClickHouse cluster shows that ZK is largely invisible to the client, managed entirely by the nodes of the cluster.
Note: At this point, ClickHouse seems a likely development target. Its documentation is solid; the system is highly configurable and easily introspected; it supports clustering, TLS, authentication/authorization; and it can be used with existing client-side libraries via its MySQL interface (and a native Rust client exists). See below for details on experimentation and testing.
VictoriaMetrics
VictoriaMetrics is an open-source monitoring solution and timeseries database. Though not a fork, it is closely related to Prometheus, supporting the same query language and providing interoperability via the `remote_write` protocol. VM also integrates directly with `Alertmanager`, the alerting component that ships with Prometheus.
VM has several attractive features. It is fast and scalable, claiming to provide both higher throughput and lower resource utilization than Prometheus; VM fixes several long-standing issues with Prometheus’s query language (e.g., the rate function); it supports a wide range of wire protocols and ingestion methods (both push and pull). Its most attractive feature is native support for clustering.
The drawbacks of VM are largely around performance, resource utilization, and its general alignment with Oxide values. See Detailed experimentation for a more complete description of the former two points — misalignment with Oxide’s values is described here.
The most prominent features of the documentation page appear to be articles or talks celebrating VictoriaMetrics and "case studies" from users who’ve successfully deployed VM. While it can certainly be useful to see the system deployed in the wild, such links would be more appropriate as addenda rather than the first section.
One of VM’s key selling points is a better compression ratio for on-disk data, compared to Prometheus (and others). However, this investigation makes clear that this is at best misleading. VictoriaMetrics appears to round floating-point values, including timestamps, on insertion — so we get better compression, of course, but our data is wrong. NaN values are also ignored without warning, apparently on the assumption that these will be ignored when plotting or aggregating data in any case.
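To show why rounding on insertion is a correctness problem and not only a storage optimization, here is a sketch of a lossy ingest path. It is not VictoriaMetrics’s actual algorithm: rounding to 12 significant decimal digits is an assumption chosen for illustration, and both function names are hypothetical.

```python
import math


def round_significant(value, digits):
    """Round to `digits` significant decimal digits (a stand-in for lossy storage)."""
    if value == 0 or not math.isfinite(value):
        return value
    exponent = math.floor(math.log10(abs(value)))
    return round(value, digits - 1 - exponent)


def lossy_ingest(samples, digits=12):
    """Silently drop NaNs and round everything else before storing."""
    return [round_significant(v, digits) for v in samples if not math.isnan(v)]


original = [1234567890.123456789, float("nan"), 0.5]
stored = lossy_ingest(original)
```

Three samples go in and two come out, and the first stored value no longer equals what was written, with no error reported to the producer in either case.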
As another strike against VM’s rigor, its implementation of several key query-language functions appears flawed. The simplest example is the `scalar` function, which is designed to return the only element of a single-element array, or a NaN otherwise (for example, if the array’s length is greater than 1). VM’s implementation can return a vector! This kind of type or category error arises in several situations, as the above blog post shows, and is indicative of a general lack of attention to rigor and detail. (Contrast this with ClickHouse, which is extremely precise throughout.)
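For reference, the intended contract of `scalar` is small enough to state in a few lines. This sketch models an instant vector as a plain Python list, which is a simplification.

```python
import math


def scalar(vector):
    """Return the sole element of `vector`, or NaN if it has any other length."""
    if len(vector) == 1:
        return vector[0]
    return math.nan


single = scalar([4.0])
multiple = scalar([1.0, 2.0])
```

Whatever the input, the return value is always a single float, which is the type-level guarantee the text describes VM’s implementation as violating.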
For a more detailed analysis of the performance of the DB, please see below.
Elasticsearch
Elasticsearch is an open-source search and analytics engine, originally designed to support fast indexing and search of text documents. It is widely used as part of the ELK stack for collecting, analyzing and monitoring text-based log file data streams. Despite the heavy text-orientation, Elasticsearch supports a wide variety of data types. Fast text-search is enabled by inverted indices, while other data types are indexed by more appropriate structures (KD-trees for certain numeric data, for example).
Elasticsearch presents several significant advantages. It provides extremely fast search (at the storage cost of precomputed indices); its documentation is extensive and complete; availability, consistency, and clustering are part of the vanilla free and open version. There are notable downsides, however. It’s not clear how resource-intensive Elasticsearch is, though presumably storage costs will be incurred to pay for speedy search capabilities. (As a reference, Lucene, on which Elasticsearch is built, claims that text indices occupy 20-30% of the size of the indexed text. While not trivial, that’s a relatively benign overhead. Note that Elasticsearch itself may entail additional overhead.) Finally, the Elasticsearch ecosystem runs on the JVM, which may mean other resource-utilization issues. On the other hand, one may strictly enforce memory limitations on the JVM at startup time.
Another downside is that, while Elasticsearch provides the core storage and search capabilities, a number of other useful features are either missing or only available in enterprise versions. For example, encrypting traffic via TLS requires an enterprise subscription. Similarly, automated alerting mechanisms are only available via subscription. (See here for the full feature-matrix.) These aspects would be built and maintained by Oxide.
CrateDB
CrateDB is an open-source SQL-compatible database intended for large volumes of machine-generated data, such as from IoT sensor networks. The database provides clustering, high-availability, replication and sharding, behind a Postgres-compatible SQL interface. The database is configurable, high performance, and supports both fixed and dynamic table schemas. CrateDB is compatible with the Postgres wire protocol, with a few exceptions, and supports a simple HTTP interface as well, which should make application development straightforward.
CrateDB also has a number of drawbacks. It is built on top of Lucene and Elasticsearch, and so presents similar drawbacks in terms of resource usage. There are also a number of features, such as user management and authentication, which are only available in the enterprise version.
Other metric-based systems
Monarch is Google’s most recently published time series system. There are many appealing aspects to it, including the data model, distribution of queries, high availability, horizontal scalability, etc., but there’s no public implementation we can use. See [jc-monarch].
Other systems that we eliminated early:
OpenTSDB: operating HBase and ZooKeeper is more complex and hands-on than we want in this context.
TimescaleDB: operating PostgreSQL is complex, and TimescaleDB relies on operators to manage PostgreSQL instances that provide whatever high availability guarantees are desired.
Event-based: pub/sub, nats.io
As mentioned above, one approach for decoupling telemetry producers from consumers is to run a service that accepts telemetry from other components in the system and applies some sort of routing policy to other components. This is a reasonable operational definition of a message bus, which suggests investigating options in this space.
One such system that has been floated is NATS. This is a highly-available, secure, and flexible message bus, with multi-tenancy, availability, and performance as primary design goals. The vanilla system, however, does not fit our needs. There are a few minor points, but the most important disqualifying factor is that it guarantees only at-most-once delivery. The documentation explicitly states that guaranteed delivery must be implemented in the application, via acknowledgements, sequence numbers, or similar mechanisms.
The distribution includes an additional binary, however, called the streaming server, which does provide different guarantees around message delivery. Messages may be persisted to durable storage which, along with what NATS calls "durable subscriptions", would support reliable message delivery between endpoints in the face of failure on either end. The NATS documentation makes clear in several sections that the delivery is guaranteed to be at-least-once. That is, NATS core provides at-most-once and NATS streaming provides at-least-once message semantics. Applications are responsible for using these building blocks to implement exactly-once semantics.
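One way an application might build exactly-once processing on top of at-least-once delivery is to tag each message with a per-producer sequence number and drop redeliveries on the consumer side. The sketch below is illustrative only: the Message and Deduplicator types are hypothetical and not part of any NATS client, and it assumes that each producer's messages arrive in sequence order.

```rust
use std::collections::HashMap;

/// A message as delivered by an at-least-once transport. The `producer` and
/// `seq` fields are hypothetical: they would be assigned by the sending
/// application, not by the bus.
#[derive(Debug, Clone, PartialEq)]
pub struct Message {
    pub producer: String,
    pub seq: u64,
    pub payload: String,
}

/// Tracks the highest sequence number applied per producer so that
/// redelivered messages are processed only once.
#[derive(Default)]
pub struct Deduplicator {
    last_applied: HashMap<String, u64>,
}

impl Deduplicator {
    /// Returns `true` if the message is new and should be processed, or
    /// `false` if it is a redelivery of something already applied.
    /// Assumes each producer's messages arrive in sequence order.
    pub fn accept(&mut self, msg: &Message) -> bool {
        let is_duplicate = self
            .last_applied
            .get(&msg.producer)
            .map_or(false, |&last| msg.seq <= last);
        if is_duplicate {
            return false;
        }
        self.last_applied.insert(msg.producer.clone(), msg.seq);
        true
    }
}
```

In a real consumer, the last-applied table would need to be persisted together with the effects of processing; otherwise a consumer restart could reprocess messages.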
As a final note, while NATS core supports subscribing to topics with wildcards, NATS streaming does not support this. All subscriptions must be directed to an actual topic.
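For reference, NATS core subjects are dot-separated tokens, where `*` matches exactly one token and `>` matches one or more trailing tokens. If wildcard subscriptions were unavailable, a consumer could apply the same rule itself to decide which concrete topics to subscribe to. The function below is a minimal sketch of that matching rule; the function name and the example subjects are hypothetical.

```rust
/// Returns true if `subject` matches `pattern` under NATS-style wildcard
/// rules: tokens are separated by '.', `*` matches exactly one token, and
/// `>` matches one or more trailing tokens (valid only as the last token).
pub fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` needs at least one remaining subject token and must be
            // the final token of the pattern.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            // `*` consumes exactly one token, whatever it is.
            (Some("*"), Some(_)) => continue,
            // Literal tokens must be equal.
            (Some(p), Some(s)) if p == s => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            // Length mismatch or differing literal tokens.
            _ => return false,
        }
    }
}
```

For example, a pattern such as `telemetry.*.cpu` would match `telemetry.sled0.cpu` but not `telemetry.sled0.vm1.cpu`, while `telemetry.>` would match both.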
The conclusion here is that the NATS streaming server may be useful, but it’s not clear. It offers a number of features that we may or may not need (authentication, encryption, multi-tenancy, rate-limiting). It seems clear this is intended to be used to communicate between many parties over untrusted networks, where logical segregation of those communication streams is required. The documentation and quickstarts often reference systems like an IoT sensor mesh. These features don’t seem relevant for our goals. NATS will likely provide some useful building blocks as a basic message bus, but will undoubtedly require lots of application code in any case.
It’s not yet clear if this would be used for metric data in addition to event-based telemetry. If only for event-based telemetry, is this necessary? The systems discussed above primarily store metric-oriented data. FMA provides rich, detailed event-based telemetry based largely on hardware telemetry. Some of this will be used for synchronous automated response on the system where the data is generated. Much of this data can be modeled as metric data and exported off the system.
Still, it may be useful to expose the original event data to the control plane. One could imagine an NVME device producing a non-critical event (or a group of them) that causes a control-plane-level diagnosis engine to move blocks from a network volume off that device and onto another one elsewhere in the rack. This approach is particularly important for issues that require data from multiple systems to diagnose.
influx-spout
The combination of InfluxDB and a message-bus of some kind is attractive. This would support routing of messages, including duplication, to replicated databases. While building an available system out of the database and a bus is feasible, it’s worth mentioning that one already exists in the form of influx-spout. This includes multiple Influx instances on top of which a NATS message bus sits. The bus distributes data to the individual DBs, provides routing rules, filtering/downsampling, and more. This system hasn’t been evaluated rigorously, but we should definitely do so before deciding to reinvent this particular wheel.
Rust tracing ecosystem
The Tokio project has spawned a collection of crates under the tracing-rs heading. The ecosystem contains a core crate which defines the concept of a span, a duration of time within a program, and an event, which is a particular moment in time within a span. These are generally used to define an ordering of events, particularly in concurrent applications, and metadata associated with those spans and events. Developers generally instrument their code with macros similar to logging (such as event!() or span!()), which define when those spans are entered/exited or an event occurs. Metadata can be attached to both spans and events, making them quite rich, strongly-typed indicators of program state.
The most interesting thing for Oxide’s purposes is the Collect trait, which provides precise control over what happens when some instrumentation is triggered. Implementors can register their interest in a particular callsite, based on static information such as function name, line number, and more, or on the basis of dynamic, runtime information associated with the implementor. There are many crates which implement the Collect trait in various ways, such as those which emit log messages, send events to distributed tracing PaaS tools like honeycomb.io, and much more.
One could imagine Oxide implementing its telemetry on top of the tracing ecosystem. We could define collectors which include dynamically-configurable actions such as: sending data to new endpoints; emitting log messages; manipulating counters or other contextual data; or changing the "log level" of the handlers. The various handlers implementing the above can be easily composed using the related tracing-subscriber crate, for example. Because the ecosystem is part of the Tokio stack, integration with async code should be straightforward.
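The kind of handler composition described above can be pictured without committing to the tracing API. The following std-only sketch uses entirely hypothetical names (Event, Handler, Counter, LevelFilter, Fanout); none of them come from the tracing crates, and a real implementation would presumably build on the traits those crates provide instead.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical severity levels, ordered from least to most severe.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Level {
    Debug,
    Info,
    Warn,
}

/// Hypothetical event record passed to every handler.
pub struct Event<'a> {
    pub level: Level,
    pub target: &'a str,
    pub message: &'a str,
}

/// Something that reacts to an event (stand-in for a collector).
pub trait Handler {
    fn on_event(&self, event: &Event<'_>);
}

/// Counts every event it sees.
#[derive(Default)]
pub struct Counter {
    pub seen: AtomicU64,
}

impl Handler for Counter {
    fn on_event(&self, _event: &Event<'_>) {
        self.seen.fetch_add(1, Ordering::Relaxed);
    }
}

/// Forwards only events at or above `min` to the inner handler.
pub struct LevelFilter<H> {
    pub min: Level,
    pub inner: H,
}

impl<H: Handler> Handler for LevelFilter<H> {
    fn on_event(&self, event: &Event<'_>) {
        if event.level >= self.min {
            self.inner.on_event(event);
        }
    }
}

/// Delivers each event to both handlers, allowing handlers to be composed.
pub struct Fanout<A, B>(pub A, pub B);

impl<A: Handler, B: Handler> Handler for Fanout<A, B> {
    fn on_event(&self, event: &Event<'_>) {
        self.0.on_event(event);
        self.1.on_event(event);
    }
}
```

A pipeline such as `Fanout(Counter::default(), LevelFilter { min: Level::Warn, inner: Counter::default() })` would count all events on one side and only warnings on the other.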
The details of the proposed crate(s) for implementing our telemetry system remain to be seen. However, the tracing-rs ecosystem seems to provide a large amount of important functionality, as well as powerful and widely-used abstractions for building custom tracing toolkits.
Detailed experimentation
This section describes the results of detailed experimentation running against the two likeliest database candidates: ClickHouse and VictoriaMetrics. We used a small tool to generate fake metric data — CPU, disk, memory and network statistics for a number of VMs, which were grouped by (VM, sled, project, team). Random data was inserted on an interval of 10 seconds, from multiple clients, to test handling of concurrent inserts.
Test environment details
Tests were run on the following machine:
MacBook Pro (mid-2014) running macOS 10.15.7
2.5GHz quad-core i7, 8 logical cores
16GB memory
Apple 500GB SM0512F SSD, over PCIx2 5.0GT/s
ClickHouse server version: 21.3.1.5976
VictoriaMetrics version: victoria-metrics-20210327-003453—dirty-e6b4b0d3
Numbers reported below focus on CPU and memory utilization. CPU is reported as a percentage, where 100% represents one logical core, for a theoretical maximum of 800% on this machine. Memory refers to the resident set size, unless otherwise noted.
Tests limiting memory resources, and the ClickHouse clustering tests, were run using Docker Desktop for Mac, version 20.10.5 (build 55c4c88). Note that this uses Apple’s hypervisor framework directly; there’s no other VM between the host OS and the container. Access to CPU was not restricted, and thus 100% represents a full logical core.
It’s not clear from Docker’s documentation what measurement the memory utilization really refers to, but some articles indicate it measures RSS. The VictoriaMetrics Docker image used had ID victoriametrics/, and the ClickHouse image had ID yandex/. Both were running a LinuxKit kernel, version 4.19.121.
In all cases, the same client application was used — a Rust binary running directly on the host machine.
Test data
Tests were run with 10 projects, teams, and sleds, with either 10, 20, 50, or 100 VMs. 22 total measurements were generated, which are stored as 22 columns (in 4 tables) in the case of ClickHouse, or distinct timeseries in VictoriaMetrics. The total data size in these cases was 1K-100K rows per insertion period (10s) for ClickHouse, or 22K-2.2M total timeseries per insertion period for VictoriaMetrics. (The total amount of data is identical; only the storage details differ.) Data was inserted for one hour, during which time a range of queries was run manually, to keep the system busy and experiment with the query languages. Data types were very simple: a millisecond-precision timestamp along with unsigned 32-bit integers for the remaining fields. The data we’ll store will almost certainly include other fixed-width types, as well as variable-length data such as strings. The simple types were chosen to keep the tests straightforward and to allow more direct comparison between the databases. Each uses fairly complicated encoding and compression schemes, and I thought it most helpful to sidestep those details when comparing disk and memory usage.
Resource utilization: 10 VMs
ClickHouse's resource utilization was very predictable. It generally consumed about 10-50% of a CPU, and memory and disk utilization grew linearly as data was inserted. There were brief spikes of CPU utilization, up to about 75%, as queries were run and new data was inserted. These are likely due to CH’s background tasks for merging and compressing data or, in some cases, to larger queries. All queries were snappy, even large (and pointless) joins across multiple tables. Using the system was subjectively "nice", as CH supports SQL, albeit with some oddities and lots of additional functions and features.
VictoriaMetrics: Resource utilization was overall similar to ClickHouse, though the pattern was markedly different. Immediately upon starting to insert data, CPU jumped to about 100-150% and memory usage to 400MiB. CPU then settled down over the first few minutes to a steady state of about 5-20% throughout the rest of the test. Memory inched up a bit higher than CH, to 500-600MiB in total by the end of the hour. Querying was also subjectively fast, and the compact expressions for historical queries are extremely nice.
Stress-testing
The behavior for 20 and 50 VMs was largely the same (though scaled up) as the 10-VM case. To effectively stress the systems, they were run with strict resource limitations and 100 VMs in total. Containers for each database server were run using the official images available on Docker Hub, using the command docker run --memory 500000000, i.e., limited to 500MB of memory. Using 100 unique VMs to generate metrics results in 100K unique instances, which equates to 100K rows in 22 columns inserted per period for ClickHouse, or 2.2M unique time points per period in VictoriaMetrics.
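As a quick check on these figures, the per-period volume for VictoriaMetrics follows from the instance count and the 22 measurements, and the Docker memory limit can be converted to binary units. The function and constant names below are illustrative only.

```rust
/// Measurements generated per unique instance in these tests.
pub const MEASUREMENTS: u64 = 22;

/// Time points written per insertion period when every measurement of every
/// instance is stored as its own time series (the VictoriaMetrics layout).
pub fn points_per_period(instances: u64) -> u64 {
    instances * MEASUREMENTS
}

/// Convert a byte count, as passed to `docker run --memory`, to MiB.
pub fn bytes_to_mib(bytes: u64) -> f64 {
    bytes as f64 / (1024.0 * 1024.0)
}
```

With 100K instances this gives 2.2M points per period, and the 500000000-byte limit works out to roughly 477 MiB.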
ClickHouse effectively maxes out the memory of its container, hitting about 95% immediately and staying there. The CPU was not pegged, however, and the server remained responsive to client queries. Inserts appeared to return just as quickly (though this was not rigorously confirmed), and command-line queries to the DB were as snappy as in the unconstrained case. Overall, CH performed extremely well here.
VictoriaMetrics did not fare as well. It also maxed out the resources of its environment immediately, but inserts were obviously much slower: the metric generator client slowed to a crawl and within a minute or two started generating the following errors:
Failed to insert disk metrics: remoteAddr: "172.17.0.1:63662"; error in "/write": cannot handle more than 16 concurrent inserts during 1m0s; possible solutions: increase `-insert.maxQueueDuration`, increase `-maxConcurrentInserts`, increase server capacity
Though this is only an educated guess, the exhaustion is likely a result of memory limitations. The Docker container was given access to all cores, and CPU utilization was well below the theoretical maximum of 800% on this machine, while memory usage was above 97% for the duration of this short test.
Clustering
Most surveyed solutions either do not offer clustering at all or offer it only as part of a paid plan. VictoriaMetrics and ClickHouse were the only exceptions, which is the main reason for comparing these two in detail.
VictoriaMetrics’s clustering is fairly simplistic: users are required to manage the components of the cluster themselves, including the separate storage nodes, one or more vminsert instances to insert data (possibly behind a load-balancer), and one or more vmselect instances against which queries are run. Data is entirely replicated between the storage nodes, with no support for sharding. Multitenancy is supported.
ClickHouse’s clustering is more full-featured. Sharding and replication are independently controlled, with any level of either aspect supported. ZooKeeper is used to control data replication, but is not involved in non-modifying queries of the data. The set of servers involved in clustering, including which shards or replicas they manage, is defined in a static configuration file. However, the actual distribution of data is defined per table, so that one may support separate sharding or replication patterns for any table.
Given the drastic performance differences above, and the fact that ClickHouse is already the leading candidate, only ClickHouse was evaluated for its clustering behavior. Testing was done again using Docker containers, specifically following the procedure described here, which used Docker Compose to orchestrate multiple servers. A single ZooKeeper container was run, along with 6 ClickHouse servers (3 shards, each with two replicas). The hits table as described in the official clustering guide was created, and the example dataset was broken into 1000-line chunks to test the behavior of incremental insertions into the cluster.
The setup of CH’s clustering is very flexible. In this instance, a local table was created on each server which contained a shard of the total data. Each of these tables used the ReplicatedMergeTree table engine to transparently replicate the data between the different servers. On top of this, a Distributed table was created; this contains no additional data, instead providing a view onto the local tables. Data can be inserted directly into the local tables, and will appear in the distributed table, or vice versa. (The former might be done if the client can better shard the data than the server, for example.) The advantage of the distributed table is that it can be created on each server, allowing them all to serve a consistent view of the data: no server is special, and clients may operate against any one of them.
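If a client did choose to shard data itself and insert into the local tables directly, it would need some way to map a row to a shard and then pick a replica of that shard. The sketch below illustrates one such mapping for the 3-shard, 2-replica layout used in this test. It is a hypothetical client-side scheme, not ClickHouse's own sharding logic, and the host names are invented for illustration.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Cluster layout mirroring the test setup: each inner Vec lists the
/// replicas of one shard. Host names are hypothetical.
pub fn test_layout() -> Vec<Vec<String>> {
    (1..=3)
        .map(|shard| {
            (1..=2)
                .map(|replica| format!("clickhouse-s{}-r{}", shard, replica))
                .collect::<Vec<String>>()
        })
        .collect()
}

/// Pick the shard for a sharding key by hashing it.
/// Assumes `n_shards` is greater than zero.
pub fn shard_for<K: Hash>(key: &K, n_shards: usize) -> usize {
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % n_shards as u64) as usize
}

/// Choose a server for a key. Any replica of the shard will do, since
/// replication within a shard is handled by the servers; `attempt` lets a
/// caller rotate to another replica after a failure.
pub fn target_server<'a, K: Hash>(
    layout: &'a [Vec<String>],
    key: &K,
    attempt: usize,
) -> &'a str {
    let replicas = &layout[shard_for(key, layout.len())];
    replicas[attempt % replicas.len()].as_str()
}
```

Rotating `attempt` gives a simple way to fall back to the other replica when one server is unreachable, as in the paused-container test described below.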
In general, data inserted into the distributed table appears more or less immediately on any of the servers. All queries are consistent, fast, and correct. As a more interesting test, one of the Docker containers in the cluster was paused, to simulate a network partition. The cluster continued to perform without errors in this case, both for read-only queries and when inserting new data. Queries occasionally exhibited increased latency, though this was uncommon.
After unpausing the container, its CH server immediately rejoined the cluster and began receiving any new data. Queries against the newly-restarted server occasionally returned partial results (e.g., only 300 of 1000 new rows were available), though this was very quickly resolved (less than a second), and often not seen at all.
As a final stress test, the ZooKeeper node itself was paused. The behavior here was interesting (and extremely reassuring!). Read-only queries against any node in the cluster continued to perform well, returning results quickly and accurately. Inserting new data, however, resulted in an error:
Received exception from server (version 21.3.4): Code: 242. DB::Exception: Received from clickhouse-server:9000. DB::Exception: Table is in readonly mode (zookeeper path: /clickhouse/tables/01/tutorial/hits_local).
Because CH cannot guarantee the consistency of the replicated table, it converts the table to read-only mode. Note that all queries, even those which required accessing multiple shards, continued to work. ZK is only used to consistently manage the cluster state when the underlying tables change, while the different servers communicate directly to serve read queries. Once the ZK container was unpaused, data could be inserted successfully without issue.
Overall, CH’s clustering is excellent. Though a bit tedious and error-prone to set up, the system performed extremely well: normal usage saw little to no noticeable client-side performance impact, and network partitions were gracefully handled with minimal impact on the user and zero impact on the data itself.
Determination of underlying database
At this point, we’ve decided to move forward with ClickHouse as the underlying storage technology for our metric and telemetry data. Its performance is extremely impressive, even in highly constrained environments. It is extremely flexible, supporting the full SQL standard as well as a large number of extension and custom functions. The documentation is sometimes a bit opaque (clearly written by a non-native English speaker, for example), but it is extensive and detailed. And finally, though a bit difficult to set up, ClickHouse’s support for clustering and HA is extremely robust and flexible.
Outstanding questions
A number of outstanding questions remain, especially relating to access to telemetry data for both customers and Oxide. This section attempts to constrain possible answers to these questions, in the context of using ClickHouse as the underlying storage system. No determinations have been or will be made at this time — this section simply provides notes about important open questions and avenues for tackling them using ClickHouse.
How will we expose raw data from all the components in the system that generate it? (See Event-based vs. metric-based for a partial proposal.)
This is still unclear. We’d like a Rust library that allows easy definition of metric data, and that exposes this via some network endpoint. But no details beyond the above section are clear at this time.
How will this data be collected? Pushed or pulled? Using what protocol(s)? To where?
This depends a bit on the data source, but a pull-model seems most plausible. For example, the sled agent will likely poll the hypervisor for telemetry on the virtual machines the hypervisor is currently managing, and then insert that into ClickHouse (or possibly via Nexus). (See [rfd152] for details.) On the other hand, the service processor, which has minimal buffering capacity, will likely push its data to the sled agent, which will batch and insert this into ClickHouse using some policy. In terms of protocols, ClickHouse supports HTTP as well as a lower-level TCP protocol, for which a Rust client exists.
How will this data be stored within the rack? Reaggregated to save space?
Storage is not yet known, but the approach could conceivably mirror that taken for the general control plane database. ClickHouse supports several methods for reaggregating data. Views may be created which roll up data into larger chunks. Expiration dates may be set on both individual columns and entire tables, using the TTL feature. The same feature can be used to automatically move expired (and possibly aggregated) data either to new tables or to disk.
How will this data be made available for the public API and console?
This is not clear. ClickHouse supports a SQL-like interface, as well as table-, row-, and action-level permissions. Authentication can be done using username and password or keys, and traffic may be encrypted. Furthermore, queries can be given strict resource limits on complexity, and the number of queries itself can be restricted. All this is to say that it’s plausible ClickHouse itself could be exposed to the API. On the other hand, security considerations make that unlikely — in which case these features could be leveraged along with additional constraints imposed by an interposing API provided by Nexus itself. This provides additional security and access controls, and will likely be desirable to constrain the queries that customers may run.
How will this data be made available for automated response?
This is not yet clear, but see the next bullet for related notes.
How will we alert based on this data?
ClickHouse supports materialized views, "live views" (a beefed up version of materialized views), and an experimental WATCH statement which enables notifications on the basis of another query. All of these may be useful when implementing alerting or other systems which rely on or benefit from notification instead of polling.
How will this data be made available to Oxide (subject to the considerations in [rfd94]) for support and product iteration?
This is unclear, and outside the scope of this RFD.
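Two of the ideas in the answers above can be made concrete with a small sketch: a batching policy of the kind the sled agent might apply before inserting data, and a rollup that reaggregates samples into coarser windows. Everything here is hypothetical (the Sample, Batcher, and rollup names, and the flush-on-size-or-age policy); in practice the rollup would more likely be done inside ClickHouse using views or TTL rules, and this code only illustrates what such a rollup computes.

```rust
use std::collections::BTreeMap;
use std::time::{Duration, Instant};

/// One metric sample: a millisecond timestamp and an unsigned value,
/// matching the simple types used in the experiments above.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct Sample {
    pub timestamp_ms: u64,
    pub value: u32,
}

/// Buffers samples and hands back a batch when it should be inserted,
/// either because the batch is full or because its first sample is too old.
pub struct Batcher {
    max_len: usize,
    max_age: Duration,
    oldest: Option<Instant>,
    buf: Vec<Sample>,
}

impl Batcher {
    pub fn new(max_len: usize, max_age: Duration) -> Self {
        Self { max_len, max_age, oldest: None, buf: Vec::new() }
    }

    /// Add a sample; returns the batch if the policy says to flush now.
    /// `now` is passed in so the policy can be exercised without sleeping.
    pub fn push(&mut self, sample: Sample, now: Instant) -> Option<Vec<Sample>> {
        // Remember when the first sample of the current batch arrived.
        let first = *self.oldest.get_or_insert(now);
        self.buf.push(sample);
        let too_old = now.duration_since(first) >= self.max_age;
        if self.buf.len() >= self.max_len || too_old {
            self.oldest = None;
            Some(std::mem::take(&mut self.buf))
        } else {
            None
        }
    }
}

/// Roll samples up into fixed windows, keeping the mean of each window.
/// Returns (window_start_ms, mean) pairs in time order.
/// Assumes `window_ms` is greater than zero.
pub fn rollup(samples: &[Sample], window_ms: u64) -> Vec<(u64, f64)> {
    // Maps window start to (sum of values, number of samples).
    let mut windows: BTreeMap<u64, (u64, u64)> = BTreeMap::new();
    for s in samples {
        let start = s.timestamp_ms - s.timestamp_ms % window_ms;
        let entry = windows.entry(start).or_insert((0, 0));
        entry.0 += u64::from(s.value);
        entry.1 += 1;
    }
    windows
        .into_iter()
        .map(|(start, (sum, count))| (start, sum as f64 / count as f64))
        .collect()
}
```

A size limit bounds memory on the producer, while the age limit bounds how stale data in the database can be; both thresholds would need tuning against the insert rates ClickHouse handles well.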
Appendix: Example telemetry data
Here’s a non-exhaustive list of telemetry provided by the rack, provided just to have concrete examples in mind:
Hardware telemetry:
  for basic system functioning: link speeds, presence detection
  sensors: power usage, temperature, fan speeds
  errors and other telemetry related to fault diagnosis (e.g., correctable errors)
  performance counters
  utilization
  firmware and microcode revisions
Virtual network: packets, bytes, drops and other errors, with enough metadata to support desired visualizations and graphs (e.g., breakouts by IP address and/or TCP port)
Virtual block storage: I/O operations, bytes, and latency; errors; storage utilization, I/O utilization ("%busy")
Virtual CPU: utilization
Misc OS metrics: memory pressure, physical network (see virtual network), physical I/O (see virtual block storage)
Control plane services: requests, latency, errors, various events of interest (e.g., VM provisioned, VM migrated, block device attached)
Service processor: TBD
Root of trust: TBD
References
[hai20] Hai, Zaar. Making peace with Prometheus rate().
[jc-monarch] Oxide Computer Company. Journal Club: Monarch. https://github.com/oxidecomputer/papers/issues/12
[rfd26] Oxide Computer Company. RFD 26 Host Operating System & Hypervisor. https://rfd.shared.oxide.computer/rfd/0026
[rfd94] Oxide Computer Company. RFD 94 Collection of Customer Data and Telemetry. https://rfd.shared.oxide.computer/rfd/0094
[rfd116] Oxide Computer Company. RFD 116 A Midsummer Night’s Metric. https://rfd.shared.oxide.computer/rfd/0116
[rfd152] Oxide Computer Company. RFD 152 Sled Agent and Propolis APIs. https://rfd.shared.oxide.computer/rfd/0152