Scale It to Billions: What They Don’t Tell You in the Cassandra README

At Threat Stack, our engineering and operations teams have embraced the concept of the polyglot data platform, recognizing that no single solution can meet all of our needs. Those needs include rapid, ideally linear, scaling to support growing customer demand and the elastic workloads of our new-economy customers. We also require different forms of analysis: stream analysis for our IDS feature set, efficient lookup tables and pre-materialized views for our ETDR feature set, and offline analysis for research.

A core component of our data platform for several years has been Cassandra, which we upgraded to DataStax Enterprise (DSE) through their startup program last year. Originally we expected to use it as the single source of truth for all of our time series data, but this turned out to be an anti-pattern. Instead we have found it very useful for lookup tables and pre-materialized views (more on this later).

Our initial Cassandra cluster was three i2.2xlarge nodes spread across 3 Availability Zones in AWS, but within a few short months we had expanded to over thirty Cassandra nodes. At the rate at which we were taking in new data, our overall scaling strategy looked similar to this:

 

 

The team did have extensive experience with other eventually consistent databases though, so with some advice from DataStax, Cassandra has now become one of the most stable and pleasurable components of our polyglot data platform to work with.

This content has been a long time coming, so we definitely ran longer than a generally accepted blog post might. We decided to ironically embrace the chaos by providing you with a table of contents:

  1. Quick Tips
  2. Monitoring Gotcha and Quirks
  3. AWS Instance Type and Disk Configuration
  4. Drowning Your Cluster with Multi-Tenant Tables
  5. Streaming Failures, JVM Tuning, and GC Woes
  6. Schema design, or These Aren’t the Rows You’re Looking For
  7. Only Have 2 to 3 Seed Nodes per Data Center
  8. Conclusion

1. Quick Tips

Some of these are well known, but are worth repeating.

  • Batch writes by partition key, then by batch size (5120 bytes, the Cassandra warn threshold).
  • Don’t understaff Cassandra. This is hard as a startup, but recognize going in that it could require 1 to 2 FTEs as you ramp up, maybe more depending on how quickly you scale. While we can see the benefits of this investment on the other side, it was an uncomfortable place to be while we were also scaling up customer acquisition.
  • Don’t adopt new versions too quickly. Minor version bumps have been known to cause havoc and new features only work roughly as expected the first time (DTCS is a great example of this). We regularly run new versions in our development environment for a month or two before promoting to production. 
  • Paradoxically, don’t fall too far behind with new versions. Sadly we have friends who are still stuck on Cassandra 1.x because of their Thrift usage. We light a candle for them. 
  • Many smaller nodes beat fewer larger nodes. Not only does this make your Dynamo-style quorum math more resilient to per-server failure, which is important because AWS will eventually decommission a node in your cluster, but it also decreases the mean time to recovery, because you can stream data from more nodes in the cluster to achieve a higher aggregate data transfer rate.
  • Budget days to bring a node into the cluster. If you’ve vertically scaled, then it will take over a week. Regardless, you will want to uncap streaming and compaction throughput with nodetool (setstreamthroughput and setcompactionthroughput, where 0 disables throttling). You’ll take a slight performance hit, but it’s worth it to finish node streaming in a reasonable time. This is another good reason to run more, smaller instances with less data per system.
  • Don’t rely on hinted handoffs (for example, writes at the ANY consistency level). In fact, just disable them in the configuration. It’s too easy to lose data during a prolonged outage or load spike, and if a node went down because of the load spike you’re just going to pass the problem around the ring, eventually taking multiple or all nodes down. We never experienced this on Cassandra, but we have on other systems that supported hinted handoffs.
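
The first tip, batching by partition key and then by size, can be sketched roughly as follows. This is a minimal illustration rather than our production code: plan_batches, partition_key, and size_of are hypothetical names, the event layout is made up, and the size estimate is whatever your serializer reports.

```python
from collections import defaultdict

WARN_THRESHOLD_BYTES = 5120  # the 5 KB batch warn threshold mentioned above


def plan_batches(rows, partition_key, size_of, max_bytes=WARN_THRESHOLD_BYTES):
    """Group rows by partition key, then split each group into size-capped batches.

    rows          -- iterable of row objects (any type)
    partition_key -- function returning a row's partition key (hypothetical helper)
    size_of       -- function estimating a row's serialized size in bytes
    """
    by_partition = defaultdict(list)
    for row in rows:
        by_partition[partition_key(row)].append(row)

    batches = []
    for key, group in by_partition.items():
        current, current_bytes = [], 0
        for row in group:
            row_bytes = size_of(row)
            # Close the current batch before it would cross the threshold.
            if current and current_bytes + row_bytes > max_bytes:
                batches.append((key, current))
                current, current_bytes = [], 0
            current.append(row)
            current_bytes += row_bytes
        if current:
            batches.append((key, current))
    return batches


# Illustrative data: (agent_id, payload) tuples, invented for this example.
events = [
    ("agent-a", "x" * 3000),
    ("agent-b", "y" * 100),
    ("agent-a", "z" * 3000),
    ("agent-a", "w" * 1000),
]
batches = plan_batches(
    events,
    partition_key=lambda e: e[0],
    size_of=lambda e: len(e[1]),
)
```

Each resulting batch touches a single partition and stays under the threshold, except that a single row larger than the threshold still ends up alone in its own batch.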

2. Monitoring Gotcha and Quirks

DataStax graciously offers OpsCenter and its agent for both open source Cassandra and DSE. Setup and deployment are incredibly simple, and when you are a high-growth startup, time is at a premium. OpsCenter acted as our initial metrics source and gave us pretty high quality metrics early in our scaling timeline. Because the OpsCenter collector agents were easy to set up and deploy, we were able to get meaningful data without spending a lot of time on it.

A few months after we launched our service, new user sign-ups increased our data ingestion by an order of magnitude within just 2 weeks. We were over 20 Cassandra nodes by this point and ingesting a few TB per day. Our original way of handling multi-tenancy was at the table level, essentially storing each customer’s data in its own table (we talk more below about why this was a bad idea). As our customer count exploded we were creating more and more tables (we had a few per customer). By default the OpsCenter agent collects data for all of your tables, which allows for some pretty awesome granular metrics capture. But when you have that many tables, it turns into a lot of metrics. Which brings us to our first gotcha.

Don’t use the same Cassandra cluster to store OpsCenter metrics and application data.

Now, the problem is that using a separate cluster for metrics collection with OpsCenter is currently an Enterprise-only option. If you are lucky enough to be using DSE then here is how you can set it up.

If you don’t have DSE you could potentially put the OpsCenter data in a keyspace on nodes in a separate datacenter. I haven’t tried that, so YMMV. We didn’t want to lose potentially valuable metrics, so we sent them to a separate Cassandra cluster. The graph below of the cluster load average shows a pretty large drop when we essentially stopped DDoSing our cluster with our own metrics.

 


If you don’t have DSE, you can (and should) disable collection on specific keyspaces and column families.

But the value of OpsCenter goes down quite a bit once you start nerfing its functionality and the metrics you’re collecting.

Metrics Collection

A month after launch, when we had cluster issues, we quickly found that OpsCenter was great when the cluster was running optimally, but when the cluster had issues our visibility went down to zero. We are a happy Librato customer and soon started collecting additional metrics using the Sensu community plugin for Cassandra. It’s a very basic way to collect metrics by parsing nodetool status output, and it didn’t require us to mess around with the JMX console.
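
To give a feel for how little is needed for this style of collection, here is a rough Python sketch that counts nodes by state from nodetool status text. It is not the Sensu plugin's code (that plugin is a separate project), the sample output is illustrative rather than captured from our cluster, and the column layout is assumed to match the 2.x-era format.

```python
import re

# Matches node lines such as "UN  10.0.1.12  1.21 TB  256 ...".
# First letter: U(p) or D(own). Second: N(ormal), L(eaving), J(oining), M(oving).
NODE_LINE = re.compile(r"^([UD])([NLJM])\s+(\S+)")


def summarize_status(status_output):
    """Count nodes per status/state code and list the addresses that are down."""
    counts = {}
    down = []
    for line in status_output.splitlines():
        match = NODE_LINE.match(line)
        if not match:
            continue  # headers, separators, and blank lines
        up_down, state, address = match.groups()
        code = up_down + state
        counts[code] = counts.get(code, 0) + 1
        if up_down == "D":
            down.append(address)
    return counts, down


# Illustrative sample only; host IDs are shortened placeholders.
SAMPLE = """\
Datacenter: us-east
===================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load     Tokens  Owns   Host ID   Rack
UN  10.0.1.12   1.21 TB  256     33.1%  aaaa      1a
UJ  10.0.2.40   310 GB   256     ?      bbbb      1b
DN  10.0.3.7    1.18 TB  256     33.5%  cccc      1c
"""

counts, down = summarize_status(SAMPLE)
```

In practice the text would come from running nodetool status on a node, and the counts would be shipped to whatever metrics service you use.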

Over the last few months we have built an internal high-resolution metrics cluster, and we wanted higher resolution metrics for Cassandra as well, so we moved metrics collection from Sensu over to collectd using the JMX plugin. This has worked pretty well for us, and SignalFX has an example of how you might set that up.

Additionally, Cassandra has supported pluggable metrics reporting since 2.0.2, which you can use to send metrics to your local metrics cluster. Assuming this works well, we’ll likely move over to it so that we push metrics to Graphite rather than pulling them.

Metrics that you should care about

This is where the most learning about Cassandra really took place.  There are a TON of metrics available via JMX and if you can afford the storage of those metrics I’m a fan of capturing as many as possible at as high a resolution as is feasible for your environment.  

You can use JConsole to navigate around and see what kinds of metrics exist. There is some more info on the DataStax site.

Additionally there is some great info on the Cassandra wiki describing some of the metrics you can capture.

This is one blog post in particular that was helpful for me in the beginning of our Cassandra usage.

3. AWS Instance Type and Disk Configuration

The original cluster we launched in October 2014 was built on i2.2xlarge servers (8 vCPUs, 61GB of RAM, and 2 x 800GB SSDs). While the local SSDs were a treat, especially when we had to force a hard compaction or stream data, they weren’t cost-efficient.

Earlier this spring AWS launched its D2 line of instances, which targets exactly our situation: needing a large amount of locally attached storage at a low price point without sacrificing CPU and RAM density. We rebuilt the cluster in place by replacing i2.2xlarge instances with d2.2xlarge instances, giving us the same CPU and RAM but with 6 x 2TB spinning disks. This was a time-intensive process as we waited for all the data to stream, but it was cheaper than running two whole clusters in parallel.

The other impactful change that we made was dedicating a single spinning disk to the commit log. Every operation that touches the disk sped up as a result. The remaining 5 disks are striped together, giving us 10TB usable or 5TB effective disk space (leaving 50% of the disk free for SSTable compaction). If we were to fill the effective 5TB, node replacement would likely take days or weeks, which is too long. As a result we are deciding whether or not to double the size of the cluster using d2.xlarge instances.
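
The disk arithmetic above, and a back-of-the-envelope streaming estimate, can be written down as a short Python sketch. The function names are hypothetical, and the 200 Mbit/s per-sender rate and the sender counts are assumed figures for illustration, not measurements from our cluster. Real streaming is also slowed by compaction, GC pauses, and retries, so treat the result as a lower bound.

```python
def usable_and_effective_tb(disks, disk_tb, commit_log_disks=1, compaction_headroom=0.5):
    """Return (usable_tb, effective_tb) for a node.

    One disk is reserved for the commit log; the rest are striped for data.
    compaction_headroom is the fraction of the data volume kept free.
    """
    data_disks = disks - commit_log_disks
    usable = data_disks * disk_tb
    effective = usable * (1 - compaction_headroom)
    return usable, effective


def stream_hours(data_tb, senders, per_sender_mbit_s):
    """Idealized hours to stream data_tb when `senders` nodes each send at a fixed rate."""
    bits = data_tb * 1e12 * 8
    rate_bits_per_s = senders * per_sender_mbit_s * 1e6
    return bits / rate_bits_per_s / 3600


usable_tb, effective_tb = usable_and_effective_tb(disks=6, disk_tb=2)
one_sender = stream_hours(effective_tb, senders=1, per_sender_mbit_s=200)
ten_senders = stream_hours(effective_tb, senders=10, per_sender_mbit_s=200)
```

With these assumed numbers a full 5TB node needs more than two days from a single throttled sender, which is the same reason spreading data over more, smaller nodes shortens recovery.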

4. Drowning Your Cluster with Multi-Tenant Tables

The entire Threat Stack platform is multi-tenant, including the storage. Our initial implementation of tenancy on Cassandra was at the table level, allowing for logical tracking of access and growth rates per customer.

However, we found that handling more than a few hundred tables was outside of Cassandra’s capabilities. Compactions would regularly fail, the DataStax read repair service never completed, and the JVM was spending far too much time tracking state. This problem scaled rapidly as we added customers, because the number of tables was equal to the number of customers multiplied by the number of table types.

Luckily we already had agent IDs (sensors) in the partition key, and these have a one-to-one relationship to organization IDs (tenants), so it was trivial to collapse down to multi-tenant tables.

The other benefit was that we no longer had to dynamically create new tables. This was a painful and error-prone process: issuing too many duplicate CREATE TABLE statements could crash Cassandra.

5. Streaming Failures, JVM Tuning, and GC Woes

*obligatory disclaimer about JVM tuning and other dark arts

When you are in a state of high growth, adding nodes quickly becomes critical. When nodes fail to stream, the stress level goes up at about the same rate as your total available disk goes down. As our node count grew and the number of writes per second increased, we reached a point where a few default settings left us unable to stream new nodes. Prior to this point new nodes would take about 10-15 hours to fully join the cluster. Mainly because of our (incorrect) usage of per-account column families, we were streaming about 70,000 SSTables to new nodes. Part of the streaming issue was related to that, but it was compounded by some of the default settings in our Cassandra cluster.

Your HEAP_NEWSIZE is probably too low

There were old recommendations that said 100m per CPU up to 800m total. For us, this was not enough: the real reason nodes were unable to complete bootstrap and stream from other nodes was that we were hitting long GC pauses that timed out the stream from one or two nodes. The most painful part is that when you have (for example) 10 or 15 stream sessions going and ONE fails, the whole thing fails and you need to start over again. CASSANDRA-8150 is still one of the best tickets to review; it has a bunch of additional JVM options, as well as newer tuning options for G1GC.
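
For reference, here is where the 800m figure comes from on an 8-core, 61GB instance. The Python below mirrors the default sizing logic as I understand it from the stock cassandra-env.sh of the 2.x line; treat it as an approximation and check the script shipped with your version. The function name is hypothetical.

```python
def default_heap_sizes_mb(system_memory_mb, cpu_cores):
    """Approximate the stock heap sizing; returns (max_heap_mb, heap_newsize_mb)."""
    # Max heap: max(min(1/2 RAM, 1024 MB), min(1/4 RAM, 8192 MB)).
    half_ram = min(system_memory_mb // 2, 1024)
    quarter_ram = min(system_memory_mb // 4, 8192)
    max_heap = max(half_ram, quarter_ram)
    # Young generation: 100 MB per core, capped at a quarter of the heap.
    heap_new = min(100 * cpu_cores, max_heap // 4)
    return max_heap, heap_new


# An i2.2xlarge or d2.2xlarge: 61 GB of RAM and 8 vCPUs.
max_heap, heap_new = default_heap_sizes_mb(61 * 1024, 8)
```

Under these defaults the young generation lands at 800 MB on our instance type. Raising it means setting the heap variables explicitly in cassandra-env.sh instead of relying on the calculation.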

You can also monitor for stream failures by grabbing the Stream Session ID in the Cassandra system logs, then searching for that ID in all your logs. If there is a failure, it will show up very quickly, so you don’t have to wait a day for what will eventually be a failed bootstrapping/streaming session.
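
A rough Python sketch of that log search follows. It assumes stream messages carry a "[Stream #<uuid>]" tag, and the sample lines are illustrative rather than copied from a real system.log; adjust the pattern and the failure test to what your version actually writes.

```python
import re
from collections import defaultdict

# Assumed log shape: stream messages tagged "[Stream #<uuid>]".
STREAM_ID = re.compile(r"\[Stream #([0-9a-fA-F-]{36})\]")


def failed_stream_sessions(log_lines):
    """Return {session_id: [lines]} for sessions with an ERROR line or the word 'failed'."""
    by_session = defaultdict(list)
    for line in log_lines:
        match = STREAM_ID.search(line)
        if match:
            by_session[match.group(1)].append(line.rstrip("\n"))
    return {
        session_id: lines
        for session_id, lines in by_session.items()
        if any("ERROR" in entry or "failed" in entry.lower() for entry in lines)
    }


# Illustrative log lines with made-up session IDs and addresses.
SAMPLE_LOG = [
    "INFO  [main] [Stream #11111111-2222-3333-4444-555555555555] Executing streaming plan for Bootstrap",
    "INFO  [main] [Stream #aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee] Executing streaming plan for Bootstrap",
    "ERROR [STREAM-IN-/10.0.2.40] [Stream #11111111-2222-3333-4444-555555555555] Streaming error occurred",
]

failed = failed_stream_sessions(SAMPLE_LOG)
```

Feeding it the concatenated logs from the joining node and every sending node gives one view per session, so a single failed sender is visible within minutes.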

Increase memtable_flush_writers when you have lots of data to stream

The way it was described to us was that, since we were streaming a lot of data from many nodes, we wanted to increase the number of flush writers because the streams were all hitting the memtables. If you do not have enough writers to deal with a larger-than-normal amount of data hitting them, your streams can fail. The recommendation we were given was to set this equal to the number of CPUs on the node, and it has worked well for us.

streaming_socket_timeout_in_ms should be set to a non-zero number

Finally, we ran into the fun issue described in CASSANDRA-8472, where stream sessions would hang and not time out. This was after we made the changes to the heap memory amount, and we’re chalking it up to odd AWS networking across AZs.

Luckily this issue is now fixed and upgrading will solve it, but if you haven’t upgraded yet (or can’t), then set your streaming_socket_timeout_in_ms to some non-zero number. We set this to 1 hour (3600000 ms), which is coincidentally the default in the now-accepted patch in CASSANDRA-8611.

6. Schema design, or These Aren’t the Rows You’re Looking For

The more applications we bring up on top of Cassandra, the more we find it being well suited for models that are either wide and shallow or narrow and deep. It is very similar to CouchDB’s incremental map/reduce in this way.

A lot of Threat Stack’s UI requires building relationships between different types of events, which can be difficult when the underlying data model is an insert-only, partially ordered, write-ahead log. For example, when you view a process’s details we have to collect all of the syscall, network, TTY, login/logout, and FIM information for that single process invocation and all of its historical runs. Additionally, you could be entering that process’s context from any of those data points, not just the process start. Therefore we maintain multiple lookup tables, tracking a process’s state as it changes over milliseconds or over years (the longest running process we’ve seen is from 2011).

Sparse matrices are one example of a wide and shallow model. We have fewer examples of this in production today, instead driving specific widgets off pre-materialized views, but we plan to migrate some of the most popular lookups to this model.

7. Only Have 2 to 3 Seed Nodes per Data Center

Back in February we were seeing very odd behavior that was preventing a node from bootstrapping. This was a big concern because we were having to scale our cluster for disk utilization reasons, so time was against us.

The short story is that we were hitting the bug documented in CASSANDRA-5836, where seed nodes were prevented from bootstrapping. The root cause for our cluster was that we were setting every node as a seed node, which is not the expected gossip configuration.

We have since moved to one seed node per rack (AWS Availability Zone). This is technically one more than the DataStax-recommended count, but it gives us cleaner failure scenarios if an AZ becomes unavailable or splits from the rest of the cluster. Additionally, DataStax’s documentation has since been improved to state that you should not make every node a seed node.

From their docs:

Attention: In multiple data-center clusters, the seed list should include at least one node from each data center (replication group). More than a single seed node per data center is recommended for fault tolerance. Otherwise, gossip has to communicate with another data center when bootstrapping a node. Making every node a seed node is not recommended because of increased maintenance and reduced gossip performance. Gossip optimization is not critical, but it is recommended to use a small seed list (approximately three nodes per data center).

http://docs.datastax.com/en/cassandra/2.0/cassandra/operations/ops_add_node_to_cluster_t.html

It might seem fine to make every node a seed node when you only have 3 nodes in a few different AZs, but if your configuration management is set up to make every node a seed node and you add more nodes, you’ll soon experience problems similar to ours. Our advice: take the extra time to configure specific seed nodes in your cluster, and when you add new nodes ensure they are not seeds.
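
One way to encode that rule in configuration management is sketched below in Python. It is an illustration under assumptions, not our actual tooling: pick_seeds is a hypothetical helper, the addresses are made up, and it assumes your inventory can list nodes in the order they joined so that newly added nodes never displace an existing seed.

```python
def pick_seeds(nodes_in_join_order):
    """Pick one seed per rack (availability zone).

    nodes_in_join_order -- list of (ip_address, availability_zone) tuples, oldest first.
    The oldest node in each AZ wins, so appending new nodes never changes the seed list.
    """
    seeds = {}
    for ip, az in nodes_in_join_order:
        seeds.setdefault(az, ip)
    return sorted(seeds.values())


# Illustrative inventory; the last entry is a newly added node.
cluster = [
    ("10.0.1.10", "us-east-1a"),
    ("10.0.2.10", "us-east-1b"),
    ("10.0.3.10", "us-east-1c"),
    ("10.0.1.11", "us-east-1a"),
]

seeds = pick_seeds(cluster)
# Comma-separated value for the seeds parameter of the seed provider in cassandra.yaml.
seeds_value = ",".join(seeds)
```

Every node, including the new one, is rendered with the same three-address seed list, and the new node is not in it.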

8. Conclusion

Scaling any distributed database is always going to be a challenge, and Cassandra was no different. As with most technologies, learning how things work is the first step towards making the right choices when it comes to increasing the overall footprint of your cluster. Following the basic system administration principle of making small changes over time, while monitoring the results of those changes, can take you a long way toward scaling while maintaining availability and data integrity.

We continue to ingest billions of unique events daily into our platform, and Cassandra continues to be one of its most stable parts. That is mainly because we’ve invested time and energy to learn from the community about operating it properly. If this kind of data scaling challenge sounds interesting to you, check out our jobs page, or reach out to us directly to hear about current open positions.

 

Author’s note: This post was co-authored by Pete Cheslock, Senior Director of Ops and Support at Threat Stack, whose support was instrumental in getting this post put together.