Hello! In a previous blog post, Kevan Dunsmore wrote about the portion of the new Lumos architecture that is built using the Lambda Architecture pattern, and I’d like to dive a bit deeper into what that looks like. This article will go over what tools we use for aggregating all the app load data we get into trends and active user counts, how we implement those solutions, and what kinds of things go wrong. The focus here is on app loads, but the same basic structure is used for aggregating both crashes and handled exceptions.

The Tools We Use: A Brief Introduction to Spark and Lambda

We use Apache Spark for this step in our pipeline for a number of reasons: it parallelizes work well, it connects cleanly to our other components (Cassandra for our production database and Kafka for queuing between services, in particular), and it has built-in streaming capabilities. And while I like all of these things, my favorite thing about using Spark is that we write Spark jobs in Scala, which is a very neat language that can natively use all of our internal Java libraries. It also happens to be the language Spark itself is written in, and we find that the support for Scala is excellent.

Spark is a fairly generic data processing engine. There are a ton of things it can be used for, including machine learning, but for now my team uses it for two simple things:
a) Processing large amounts of Object A into Object B at reasonable speed, and
b) Processing small amounts of Object A into Object B at ridiculous speed.

The first use case is our batch jobs, which run once per day and aggregate all the data from the previous day into counts and summaries that are easier for the API to fetch and easier for the UI to use. The second use case is streaming, which runs continuously and grabs data from Kafka in much smaller batches.

The goal of the streaming layer is speed. If the batch job hasn’t run for the day yet, it hasn’t seen any of the day’s data, so without streaming our data would always be one day behind, and we wouldn’t be able to have live-updating graphs. The tradeoff for that availability is missing the rare crashes that come in much later than when they occurred: “offline” events, or events received “way out of band,” would not be captured. For example, if someone is out playing Pokémon Go in the wilderness with no cell signal and comes back that evening with a load of data for us, streaming is going to ignore it. Streaming has better things to do, like keeping a live crash count on a new app version to see if it needs to be rolled back.
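
To make that concrete, here’s a minimal sketch of what a streaming job pulling small micro-batches from Kafka can look like using Spark Streaming’s Kafka integration (spark-streaming-kafka-0-10). The broker address, topic name, group id, batch interval, and the toy per-batch count are all illustrative stand-ins, not our actual configuration:

```scala
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object AppLoadStreamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("app-load-streaming-sketch")
    // Small micro-batches are what make the layer feel "live"
    val ssc = new StreamingContext(conf, Seconds(30))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "kafka:9092",               // illustrative broker
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "app-load-streaming",       // illustrative group id
      // With no committed position, a fresh (re)start reads from the head of
      // the topic rather than replaying history
      "auto.offset.reset"  -> "latest"
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("app-loads"), kafkaParams)
    )

    // Toy per-batch count keyed by message value; the real job deserializes
    // the payload and writes aggregates to Cassandra instead of printing
    stream.map(record => (record.value, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```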

Batch, on the other hand, is much slower, more accurate, and more resilient. The daily batch jobs will pull all the data from the past 30 days, so our mountaineering Pokémon Go friend will be counted. These jobs take several hours to go from end to end, but they can do things that streaming cannot. For example, monthly active users (MAU) is an important metric for measuring app usage. Constantly streaming calculations of unique users over 30 days would be incredibly expensive, and since the data for 30 days won’t change much from one minute to the next, daily batches get us that data often enough, and at a reasonable cost. (We do generate today’s MAU through a mix of streaming and batch, but it is a temporary value until the batch runs again, so we can be as accurate as possible.) Batch will also recover exceptionally well from failures, which I will talk about in the next big section.
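
As a rough illustration of why batch is the natural home for something like MAU, here’s a minimal sketch of a distinct-user count over a window of app loads. The AppLoad case class, its field names, and the tiny in-memory sample are hypothetical; the real job reads a much larger Dataset from S3:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.countDistinct

// Hypothetical event shape; the real schema has more fields
case class AppLoad(appId: String, deviceId: String, timestamp: Long)

object MauSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("mau-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for 30 days of app loads read from S3
    val loads = Seq(
      AppLoad("app-1", "device-a", 1L),
      AppLoad("app-1", "device-a", 2L),
      AppLoad("app-1", "device-b", 3L),
      AppLoad("app-2", "device-a", 4L)
    ).toDS()

    // MAU = number of distinct devices per app over the whole 30-day window
    val mau = loads.groupBy($"appId").agg(countDistinct($"deviceId").as("mau"))
    mau.show()

    spark.stop()
  }
}
```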

Put together, this means almost all data will go through streaming, and all data that falls within our recalculation window will go through batch (the edge case being data from a phone someone was using for over a month without any cell signal). Both layers put data into our production Cassandra database, and the API decides which layer to grab data from depending on the time window that was requested. For example, a request for data covering last week through today will pull today’s data from streaming and fill in the rest of the week from several batch jobs.

Moving from Spark 1.6 to 2.0

When we started this project we realized pretty quickly that this was a prime chance to write new Spark jobs that took advantage of new features in Spark 2.0. Many of our Spark jobs used to be written against Spark 1.6, and one disadvantage of that was that for every schema Spark had to read from .avro files, we had to compile another set of Java classes for Spark to use. Not hard, but a bit tedious.

Spark 2.0 still allows this, but can be a bit smarter if you ask it to be. So long as your schema is simple enough, you can write a short case class naming the properties it has, and that’s your Spark job’s schema. There are encoders for most if not all primitive Scala types (I only needed Int, String, and a byte array), and they do most of the hard work of converting from our .avro files. These encoders produce Datasets instead of DataFrames for the Spark job to work with, which gives us a nice boost in processing speed in addition to other benefits.
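
Here’s a hedged sketch of what that looks like in practice. The case class, its field names, the S3 path, and the use of the com.databricks:spark-avro reader are all assumptions for illustration; the important part is that the case class itself acts as the schema and .as[...] hands the rest of the job a typed Dataset:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// The case class is the schema: field names and types line up with the
// columns we want out of the avro files. These particular fields are
// illustrative, not our real schema.
case class AppLoadEvent(appId: Int, deviceId: String, payload: Array[Byte])

object AvroToDatasetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("avro-to-dataset-sketch").getOrCreate()
    import spark.implicits._ // brings the case-class Encoders into scope

    // spark-avro (com.databricks:spark-avro in the Spark 2.0 era) reads the
    // files as a DataFrame; .as[AppLoadEvent] applies the derived Encoder to
    // produce a typed Dataset. The path is illustrative.
    val loads: Dataset[AppLoadEvent] = spark.read
      .format("com.databricks.spark.avro")
      .load("s3://some-bucket/app-loads/2017/01/01/")
      .select("appId", "deviceId", "payload")
      .as[AppLoadEvent]

    println(s"events read: ${loads.count()}")
    spark.stop()
  }
}
```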

How It Fails

Spark isn’t perfect, and neither are we. Sometimes, things fail. But we’ve put a lot of safeties in place to make sure a failure in Spark is always recoverable.

First of all, since streaming is about what is happening right now, if we ever have to restart the streaming layer, it picks up again at the current time, reading messages from the head of our Kafka topics. Batch gets its data from the same part of the pipeline that streaming does, so if a streaming job fails, the data that it should have processed is recovered automatically in the batch layer.

The streaming layer is never re-run for an earlier time window, a process called ‘backfilling.’ The batch layer, on the other hand, should be backfilled for any days it misses. Our writes to Cassandra are done so that new data will overwrite old data, so if a batch job failed halfway through, a backfill leaves us with fresh data.

Most importantly, though, the batch layer does not read directly from Kafka topics like streaming does. The Kafka topic that feeds streaming also dumps all of its data into Amazon’s S3, which batch then reads from. Thus the data from Kafka will be persisted at all times, whether the batch runs on time or not. Batch will also write back to S3 at various points during execution so that when we calculate hourly counts, we can use them later to expedite the calculation of daily counts.
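
A minimal sketch of that pattern, with made-up paths, column names, and Parquet as the intermediate format: hourly counts are written back to S3 partway through the job, and daily counts are then rolled up from those persisted hourly counts rather than from the raw events.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{count, sum}

object HourlyToDailySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hourly-to-daily-sketch").getOrCreate()
    import spark.implicits._

    // Raw events for one day that the Kafka pipeline dumped to S3
    // (path, format, and column names are illustrative)
    val raw = spark.read.parquet("s3://some-bucket/raw/app-loads/2017-01-01/")

    // Hourly counts, persisted back to S3 partway through the job...
    val hourly = raw.groupBy($"appId", $"hour").agg(count("*").as("loads"))
    hourly.write.mode(SaveMode.Overwrite)
      .parquet("s3://some-bucket/aggregates/hourly/2017-01-01/")

    // ...so daily counts can be rolled up from the persisted hourly counts
    // instead of re-reading every raw event.
    val daily = spark.read
      .parquet("s3://some-bucket/aggregates/hourly/2017-01-01/")
      .groupBy($"appId")
      .agg(sum($"loads").as("loads"))
    daily.write.mode(SaveMode.Overwrite)
      .parquet("s3://some-bucket/aggregates/daily/2017-01-01/")

    spark.stop()
  }
}
```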

The practical upshot of all of this is that even if either layer fails, no data is actually lost. Data will stop showing up on the UI for as long as it takes to re-run the batch layer or restart the streaming layer, but it will still exist in our platform.

During development, I also found some really interesting ways Spark jobs in general can fail, and they require a slightly deeper dive into how Spark does what it does.

The Small Files Problem

This problem (or at least a close relative of it) apparently originates in HDFS and is also called the “Hadoop Small Files Problem”; it essentially describes a preference for reading from a few large files rather than a bunch of small ones. A lot of people have encountered this problem, in a lot of different contexts. Our context was that although we could read all the files without running out of space, something went wrong when we started testing the batch job against a full thirty days of real production data.

Scale is something we need to care a lot about at Apteligent. Besides cost effectiveness, one of the reasons we run this batch job only once daily is that batch jobs are the longest jobs we run on a regular basis. As of writing, the batch job for aggregating app load counts is the smallest batch job we have, and it takes two and a half hours from start to finish. During that time it spends a lot of its resources reading from and writing to S3.

So we give Spark the batch job, and Spark reads files from S3 onto its cluster of machines. But when it reads these files, it also gives each of these files its own data partition. So when Spark begins actual processing, it will process each data partition as a separate task, and merge those results when it absolutely has to. This would make a lot of sense if there were only, say, 2000 files. 2000 tasks is actually a perfectly acceptable number to Spark, which is one of the reasons we use it. It will multitask by allocating a bit of time to each task until all of them are complete.

But I had given Spark two hundred files for every hour for thirty days: 144,000 files. Spark is significantly less happy with a hundred thousand tasks.

[Spark does a lot of divide and conquer, as shown above. Summarized, the driver controls nodes that run executors that run tasks that are performed on exactly one data partition, and the number of data partitions is decided by how many partitions were made by the previous step of the job. In our case, the previous step was “read 144,000 files.”]

The first symptoms we saw were signs that the garbage collection on the cluster’s nodes was thrashing; it was spending so much time setting up to process data that by the time it was ready to do actual work, the executor was switching to working on the next task for its allotted time, and no actual work was getting done. Credit where credit is due, I finally found help on this Stack Overflow page, where someone suggested using Spark’s .coalesce() method to reduce the number of data partitions Spark would work with. I told it to reduce from 144,000 data partitions to 2,000, and it’s handled the files just fine ever since.
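
In code, the fix looks roughly like this. The path, the format, and the exact partition counts you’d see are illustrative, but the shape of it is just a read followed by a .coalesce() before any heavy processing:

```scala
import org.apache.spark.sql.SparkSession

object CoalesceSmallFilesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("coalesce-small-files-sketch").getOrCreate()

    // Reading the ~144,000 small avro files left us with roughly one data
    // partition, and therefore one task, per file. Path is illustrative.
    val raw = spark.read
      .format("com.databricks.spark.avro")
      .load("s3://some-bucket/raw/app-loads/*/*/*.avro")
    println(s"partitions after read: ${raw.rdd.getNumPartitions}")

    // Collapse them into a manageable number of partitions before doing any
    // real work; coalesce only merges existing partitions, so no full shuffle
    val coalesced = raw.coalesce(2000)
    println(s"partitions after coalesce: ${coalesced.rdd.getNumPartitions}")

    // ...the rest of the aggregation runs on `coalesced` from here
    spark.stop()
  }
}
```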

Repartition vs Coalesce

I promised myself I would try to write this post in plain English, but I do want to make one more technical side note. Spark has two methods (that I know of) for changing the number of data partitions in a set of data (helpfully called a Dataset): .repartition() and .coalesce(). While they can do similar things, the way they go about it is very different. The aptly named repartition will actually shuffle data around and make new data partitions. There is no guarantee after the repartition that any of the partitions will have any of the data they had before. Since it shuffles all of the data, repartition can take a long time and use a lot of resources moving data around.

Coalesce is a lazier kind of repartition for when you only want to reduce the number of partitions, preferably to a number that evenly divides your initial number of partitions. For example, if you have ten partitions and you want five, coalesce is going to quietly tell Spark that each pair of original partitions is actually just one big partition, and Spark will happily accept this as truth and treat it as one partition going forward. If you’re careful about your factors, coalesce will incur no shuffling at all and run very, very quickly. So going from 144,000 down to 2,000 with .coalesce() was both fast and made the next step of computing much easier.
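
A tiny, self-contained way to see the difference is to compare the physical plans Spark produces for each call. This uses toy data generated with spark.range; the point is only that repartition introduces a shuffle (an Exchange in the plan) while coalesce does not:

```scala
import org.apache.spark.sql.SparkSession

object RepartitionVsCoalesce {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("repartition-vs-coalesce")
      .master("local[*]")
      .getOrCreate()

    // Ten partitions of toy data
    val numbers = spark.range(0L, 1000000L, 1L, 10)

    // repartition(5) triggers a full shuffle: every row can move to a new
    // partition, which shows up as an Exchange step in the physical plan
    numbers.repartition(5).explain()

    // coalesce(5) just merges existing partitions in place (here, pairs of
    // the original ten), so no shuffle is performed
    numbers.coalesce(5).explain()

    spark.stop()
  }
}
```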

Other Problems

There were other problems we encountered while writing these Spark jobs, but a lot of those existed between my keyboard and my chair and were easily solved with a bit of head scratching and Google searching. The only other big one was dependency management: the code for the jobs wasn’t compiling the exact same way each time, which made for unreliable builds. That was solved with the Maven Enforcer plugin and a lot of handwritten exclusions in the pom file. I’m not proud of how much extra mess ended up in that file to specify exactly which versions of everything to use, but now the job very reliably builds the exact same way each time, so I’m willing to take that for now.

Most of the jobs are built with Maven, but one of the most recent jobs we made was built with sbt (the Scala Build Tool), which has promising dependency management features that we hope to take advantage of in the future. From what we’ve seen so far, sbt seems to work better with Scala than the Java-based Maven. Surprise!

Spark and Reading/Writing to/from Disk

I don’t think I will ever get over just how much disk read/writes affect how long these jobs take. We do processing and database writing in parallel, so while the data is being processed in the next stage, the previously completed stage is being written to our Cassandra database. So there’s a lot of overlap of processing and writing that lets the whole batch job run in only two and a half hours.

Currently, the first hour is the part that reads those 144,000 files I mentioned earlier, and writes the first chunk of data to S3 that the rest of the job uses. That part consists of ten minutes of reading from S3, thirty-four minutes of processing, and another sixteen of writing to S3. So about forty percent of that part is just reading and writing all the files from S3, and that part must finish for the rest of the batch to run. So there’s a little under half an hour, or twenty percent of the total run time, at the beginning of the batch that is just disk I/O. And it’s a good thing we can run the write-to-Cassandra parts while the next stage of data processing is running, because each Cassandra write is as long as or longer than the data processing. Cassandra could actually be written to much faster, but since we have the time to do it a bit more slowly, we limit how fast Spark can write in case Cassandra happens to be under fairly high load already.
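
For the curious, here’s a hedged sketch of what a throttled Cassandra write can look like with the DataStax Spark Cassandra Connector. The keyspace, table, column names, host, and the 5 MB/s-per-core cap are all illustrative; the connector’s spark.cassandra.output.throughput_mb_per_sec setting is what limits how fast Spark will write:

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object ThrottledCassandraWriteSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("throttled-cassandra-write-sketch")
      .set("spark.cassandra.connection.host", "cassandra-host") // illustrative host
      // Cap write throughput per executor core so an already-busy Cassandra
      // cluster isn't overwhelmed; 5 MB/s per core is an illustrative number
      .set("spark.cassandra.output.throughput_mb_per_sec", "5")

    val sc = new SparkContext(conf)

    // Toy aggregate rows; the real rows come from the batch aggregation stages
    val dailyCounts = sc.parallelize(Seq(
      ("app-1", "2017-01-01", 12345L),
      ("app-2", "2017-01-01", 67890L)
    ))

    // Cassandra writes are upserts keyed by the table's primary key, so a
    // backfilled batch run simply overwrites whatever a failed run left behind
    dailyCounts.saveToCassandra("metrics", "daily_app_loads",
      SomeColumns("app_id", "day", "loads"))

    sc.stop()
  }
}
```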

That’s About It

I’ll wrap up here for this post, though there’s a lot more Spark to be had. Feel free to ask questions or share your experiences with Spark in the comment section!