App Engine, meet Redis on AWS

Since snappy performance is critical to providing a good user experience, we try to keep the latency of all common Pulse backend API requests under 500ms. Most of the time we achieve this by using Google App Engine’s memcache to cache all data which might be reused by many requests. Less commonly requested data is pulled from the datastore, resulting in such requests taking a bit longer than we like.

When these slower requests are rare, we accept them. However, for features that access a broad range of data, the likelihood of missing the cache increases. Some of the data required for a request may be cached, but some of it almost certainly will not be, resulting in high latency for most requests.

To implement these types of features efficiently, one option is to dramatically increase the size of our memcache. This would allow us to keep all required data in cache. However, it would be expensive and is somewhat at odds with the LRU cache policy we like to use for other features. This approach is also currently unsupported on Google App Engine (since memcache capacity is not directly tunable).

We investigated several other options and finally settled on using Redis as a persistent, in-memory datastore. Redis strikes a great balance between simplicity, powerful primitives, and proven stability. Instead of increasing our memcache or switching entirely to a larger in-memory store, we created a second Redis-based system on AWS. This system is specifically designed to hold data that must be available at in-memory speeds (with no expected misses). Achieving this is more expensive than providing a similar LRU cache (which could be smaller), so we reserve it for features that require such guarantees.

Architecture

We wanted to use Redis, but also to make sure that our implementation was both scalable and easily recoverable in the case of failure. From here on out, we will discuss the infrastructure and tools we use to build this system. Here’s a visual overview of the system:

 

Amazon Elastic Load Balancer

This is a really nice utility that AWS gives us. We set up an ELB that points to as many EC2 machines as we need (we’ll call them redis frontends). The ELB balances requests across these machines round-robin, detects failing machines, gives us a warning, and shifts their load to the machines that are still running. Some important tips:

  1. The load balancer can handle HTTPS requests, so use them! Some security is always better than none.
  2. Make sure that the machines you provide to the load balancer are distributed among the different availability zones in your AWS region.
  3. You can also scale dynamically by putting instances into an Auto Scaling group and attaching the group to the load balancer.


HA Proxy

Our redis frontend machines use Tornado as the web server. Tornado is fast (great!) and single-threaded. Being single-threaded avoids many headaches, scales predictably, and has minimal overhead, but it doesn’t benefit from multiple cores on a machine. The larger Amazon machines have multiple cores, and we want to take advantage of them. Enter HA Proxy, a nice utility that lets you build a reverse proxy that balances load across several processes. Here’s a barebones version of the configuration we use:

global
    maxconn 1024
    daemon
    log 127.0.0.1 local0

frontend load_balancer
    # We process all requests hitting port 8080
    bind *:8080
    # We will point them to the backend we describe later
    default_backend tornado_servers
    mode http
    option httplog
    option dontlognull
    # timeout client/connect/server (in ms) replace the deprecated
    # clitimeout/contimeout/srvtimeout keywords
    timeout client 20000

backend tornado_servers
    # The balancing strategy
    balance roundrobin
    # The tornado servers, in this case, the machine has 4 cores.
    # Every server line needs a unique name.
    server tornado_1 127.0.0.1:13371 check rise 2 fall 5
    server tornado_2 127.0.0.1:13372 check rise 2 fall 5
    server tornado_3 127.0.0.1:13373 check rise 2 fall 5
    server tornado_4 127.0.0.1:13374 check rise 2 fall 5
    retries 1
    mode http
    timeout connect 5000
    timeout server 20000
    # We also get stats from HA Proxy about our tornado servers
    stats enable
    stats uri /lb?stats
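HA Proxy needs a distinct name and port for each Tornado process, so the server lines can be generated from the core count instead of being copied by hand. This is a minimal sketch. The helper haproxy_server_lines is hypothetical, and it assumes the processes listen on consecutive ports starting at 13371, as in the config above.

```python
import os


def haproxy_server_lines(num_processes=None, base_port=13371, host="127.0.0.1"):
    """Return one HA Proxy 'server' line per Tornado process.

    Hypothetical helper: each process gets a unique server name and its
    own port, counting up from base_port. By default it assumes one
    process per CPU core.
    """
    if num_processes is None:
        num_processes = os.cpu_count() or 1
    lines = []
    for i in range(num_processes):
        name = "tornado_%d" % (i + 1)  # unique name per server line
        port = base_port + i           # one port per Tornado process
        lines.append("server %s %s:%d check rise 2 fall 5" % (name, host, port))
    return lines


if __name__ == "__main__":
    # Print the backend server lines for a 4-core machine.
    for line in haproxy_server_lines(4):
        print("    " + line)
```

Each Tornado process would then be started with its matching port, one per line generated here.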

Tornado Frontends

Each of these Tornado instances provides a thin Python API layer. The implementation is simple and very specific to our own use cases, so I won’t go into the details. Broadly, the frontend takes care of all of the security and implements the internal API we provide to our client teams. It also handles general tasks such as deserialization, error handling, and batching requests before they hit the backend, all of which turned out to be very important. We run as many instances as there are cores on the machine, and they all rely on the sharded redis interface to actually access the data.

Sharded Redis Interface

This is based heavily on redis-py by Andy McCurdy, so many thanks to him. You can take a look at https://github.com/andymccurdy/redis-py/

What we needed to add was the ability to split our data among several different machines. A general solution for this, Redis Cluster, is in the works, but we opted to go with something simpler in the meantime.

The first thing was to implement the actual sharding, something like:

import zlib

def find_shard(key, num_machines):
    # Any stable hash works (zlib.crc32 here); Python's hash() is per-process.
    return zlib.crc32(key.encode("utf-8")) % num_machines

With that little snippet, it was easy to send operations to a wrapper class around StrictRedis (from redis-py) and have all the Tornado frontends behave as if a single machine were serving the data. This works as long as you don’t want to use pipelines.
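The following is a minimal sketch of such a wrapper. It assumes the clients are redis-py StrictRedis instances (one per backend machine) and uses zlib.crc32 as the stable hash. The class name ShardedRedis and its methods are illustrative rather than Pulse's actual code, and only a few single-key commands are shown.

```python
import zlib


def find_shard(key, num_machines):
    # Stable hash: identical on every frontend, unlike Python's built-in
    # hash(), which is randomized per process for str keys.
    if isinstance(key, str):
        key = key.encode("utf-8")
    return zlib.crc32(key) % num_machines


class ShardedRedis(object):
    """Route single-key commands to the client that owns the key.

    Hypothetical wrapper: `clients` is assumed to be a list of
    redis.StrictRedis instances, one per backend machine; any object
    with the same get/set/incr methods works.
    """

    def __init__(self, clients):
        self.clients = list(clients)

    def client_for(self, key):
        return self.clients[find_shard(key, len(self.clients))]

    def get(self, key):
        return self.client_for(key).get(key)

    def set(self, key, value):
        return self.client_for(key).set(key, value)

    def incr(self, key, amount=1):
        return self.client_for(key).incr(key, amount)
```

With real servers, construction would look like `ShardedRedis([redis.StrictRedis(host=h, password=pw) for h in hosts])`, where `hosts` and `pw` are placeholders. Multi-key commands such as MGET cannot be routed this simply, because their keys may live on different machines.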

However, it turns out that you really do want to use pipelines. Whenever you have multiple requests that you can send out at the same time, a pipeline saves you the round-trip time of each individual request. Without pipelines, it doesn’t matter how blazingly fast Redis is: you are stuck waiting on network latency.

Getting pipelines to work is a little more involved. When a command comes in on a pipeline, we record its position in the overall order and queue it on a separate pipeline for the machine that owns its key, keeping the recorded positions alongside that machine’s pipeline. An example with two machines:

command1 key1 value1 (key1 -> machine 1)
command2 key2 value2 (key2 -> machine 2)
command3 key3 value3 (key3 -> machine 1)
command4 key4 value4 (key4 -> machine 1)

We will remember it like this:
Pipeline index for machine 1:
[1, 3, 4]
Pipeline for machine 1 will contain:
command1 key1 value1
command3 key3 value3
command4 key4 value4
Pipeline index for machine 2:
[2]
Pipeline for machine 2 will contain:
command2 key2 value2

When we execute all the pipelines, we can reassemble the return values in the order the commands came in to the sharded_redis interface. With solutions to both sharding and pipelines, we now have an interface that hides the fact that we need multiple machines to serve all the data. Note that since each Tornado frontend uses the interface independently, we need to update all of them at the same time whenever we make changes to it, such as changing the number of machines!
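The bookkeeping described above can be sketched as follows. This is an illustration, not Pulse's implementation. ShardedPipeline and its command method are hypothetical names. Positions are 0-based, so the [1, 3, 4] above becomes [0, 2, 3]. Each client is assumed to offer a redis-py style pipeline() whose execute() returns the results of its queued commands in order.

```python
import zlib


def find_shard(key, num_machines):
    # Same stable hash used for single commands.
    if isinstance(key, str):
        key = key.encode("utf-8")
    return zlib.crc32(key) % num_machines


class ShardedPipeline(object):
    """Split one logical pipeline into one real pipeline per machine.

    Hypothetical sketch; create a new instance for each batch.
    """

    def __init__(self, clients):
        self.pipes = [client.pipeline() for client in clients]
        # positions[i] holds the global (0-based) order of every command
        # queued on machine i's pipeline.
        self.positions = [[] for _ in clients]
        self.count = 0

    def command(self, name, key, *args):
        shard = find_shard(key, len(self.pipes))
        getattr(self.pipes[shard], name)(key, *args)  # queue, don't send yet
        self.positions[shard].append(self.count)
        self.count += 1
        return self

    def execute(self):
        results = [None] * self.count
        for pipe, positions in zip(self.pipes, self.positions):
            if not positions:
                continue  # nothing was queued for this machine
            # Put each machine's results back at their original positions.
            for position, value in zip(positions, pipe.execute()):
                results[position] = value
        return results
```

Here each machine's sub-pipeline still costs one round trip, and the round trips run one after another. Sending them concurrently (for example from a thread pool) would cut latency further. Also note that redis-py pipelines wrap their commands in MULTI/EXEC by default, so any atomicity holds only per machine.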

Redis Backend

Here are a few tips for setting up redis:

  1. Use a password, and make it a long password
  2. Set a memory limit and a reasonable policy to deal with exceeding max memory
  3. Change your machine overcommit_memory setting to 1
    sysctl -w vm.overcommit_memory=1
  4. Don’t run anything except redis on this machine
  5. If you are using AOF files and backup machines (recommended), don’t bother with persistence on the master! Instead, make sure you have an aggressive fsync policy (everysec works) for the slave.
For those who want the “why” behind each of the tips:
  1. From Redis Documentation:

    The password is set by the system administrator in clear text inside the redis.conf file. It should be long enough to prevent brute force attacks for two reasons:

    • Redis is very fast at serving queries. Many passwords per second can be tested by an external client.
    • The Redis password is stored inside the redis.conf file and inside the client configuration, so it does not need to be remembered by the system administrator, and thus it can be very long.

    The goal of the authentication layer is to optionally provide a layer of redundancy. If firewalling or any other system implemented to protect Redis from external attackers fail, an external client will still not be able to access the Redis instance without knowledge of the authentication password.

    Note: The AUTH command, like every other Redis command, is sent unencrypted, so it does not protect against an attacker that has enough access to the network to perform eavesdropping.

  2. We actually monitor the machine’s memory usage as well as redis’s memory usage so that we can add shards to our redis backend as needed. Even so, it’s safer to set a reasonable limit on the memory redis may use, so that we never end up in a scenario where redis uses all available memory on a machine and then crashes.
  3. From Redis Documentation:

    Redis background saving schema relies on the copy-on-write semantic of fork in modern operating systems: Redis forks (creates a child process) that is an exact copy of the parent. The child process dumps the DB on disk and finally exits. In theory the child should use as much memory as the parent being a copy, but actually thanks to the copy-on-write semantic implemented by most modern operating systems the parent and child process will share the common memory pages. A page will be duplicated only when it changes in the child or in the parent. Since in theory all the pages may change while the child process is saving, Linux can’t tell in advance how much memory the child will take, so if the overcommit_memory setting is set to zero fork will fail unless there is as much free RAM as required to really duplicate all the parent memory pages, with the result that if you have a Redis dataset of 3 GB and just 2 GB of free memory it will fail.

    Setting overcommit_memory to 1 says Linux to relax and perform the fork in a more optimistic allocation fashion, and this is indeed what you want for Redis.

  4. Because of the large memory footprint we expect redis to use and the fact that we have to use an optimistic memory allocation setting, running anything else that might use up a lot of memory on the same machine can lead to failures.
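  5. This is an optimization to make sure the master Redis instance does not bottleneck on disk writes. The work associated with persistence is offloaded as much as possible to a backup machine. That being said, it’s important that the slave/backup machine is robust. It also matters that a crashed master is not automatically restarted as a master with an empty dataset, because its slaves would then replicate the empty dataset.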
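Tips 1, 2 and 5 map onto a handful of redis.conf directives. The sketch below renders them for a master and its backup slave. Several details are assumptions, since the post doesn't list Pulse's actual settings: the function name, the memory limit, and the choice of `maxmemory-policy noeviction`. That policy makes writes fail once the limit is hit rather than silently evicting data, which suits a store that should have no misses.

```python
import secrets


def render_redis_conf(role, password, maxmemory, master_host=None,
                      master_port=6379, maxmemory_policy="noeviction"):
    """Return redis.conf text for a serving "master" or its backup "slave"."""
    lines = [
        "requirepass %s" % password,               # tip 1: long password
        "maxmemory %s" % maxmemory,                # tip 2: hard memory cap
        "maxmemory-policy %s" % maxmemory_policy,  # tip 2: policy at the cap
    ]
    if role == "master":
        # Tip 5: no persistence on the master (no RDB snapshots, no AOF).
        lines += ['save ""', "appendonly no"]
    elif role == "slave":
        if master_host is None:
            raise ValueError("a slave needs master_host")
        lines += [
            # Replicate from the master over its internal IP address
            # (Redis 5+ also accepts the name "replicaof").
            "slaveof %s %d" % (master_host, master_port),
            "masterauth %s" % password,  # the master requires AUTH too
            # Tip 5: persistence lives on the slave, fsynced every second.
            "appendonly yes",
            "appendfsync everysec",
        ]
    else:
        raise ValueError("role must be 'master' or 'slave'")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    password = secrets.token_urlsafe(48)  # 64 random URL-safe characters
    print(render_redis_conf("master", password, "6gb"))
    # 10.0.0.12 is a placeholder internal address for the master.
    print(render_redis_conf("slave", password, "6gb", master_host="10.0.0.12"))
```

Tip 3 is a kernel setting rather than a Redis one, so it still has to be applied with sysctl on each machine.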

Backup

This is simply a second machine running Redis, configured as a slave of the master Redis instance. In AWS, remember to use internal IP addresses when setting this up, since it saves you money. Backups are a must when you are running redis in production for several reasons:

  1. It’s a backup! If your serving machine goes down, you fail over to the backup while you fix the first machine. When you are running on AWS, more often than not you can simply promote the backup and set up a new backup.
  2. If you ever need to expand the number of serving machines, you can promote your backup to a serving machine and set up new backups for both machines. I would be remiss not to mention that you then have to go through both machines and delete the keys that no longer belong on each one, or else you won’t actually have expanded your memory limit.
  3. You can run data analytics on the backup without affecting the all-important performance of the actual serving machine.
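With a modulo-based find_shard like the one in the sharding section, promoting backups works cleanly only when the number of machines doubles. A key on machine i (crc32(key) % N == i) lands on either machine i or machine i + N under crc32(key) % 2N, so the backup of machine i can become machine i + N. The following is a minimal sketch of the cleanup step. It assumes redis-py clients and zlib.crc32 as the hash, and delete_foreign_keys is a hypothetical helper.

```python
import zlib


def find_shard(key, num_machines):
    # redis-py returns keys as bytes unless decode_responses is set.
    if isinstance(key, str):
        key = key.encode("utf-8")
    return zlib.crc32(key) % num_machines


def delete_foreign_keys(client, shard_index, num_machines, batch_size=500):
    """Delete every key on `client` that no longer hashes to `shard_index`.

    Assumes `client` is a redis-py StrictRedis connected to machine
    `shard_index` after the resize. SCAN (via scan_iter) is used instead
    of KEYS so that no single command blocks the server for long.
    Returns the number of keys deleted.
    """
    doomed = []
    deleted = 0
    for key in client.scan_iter(count=batch_size):
        if find_shard(key, num_machines) != shard_index:
            doomed.append(key)
        if len(doomed) >= batch_size:
            deleted += client.delete(*doomed)  # delete in batches
            doomed = []
    if doomed:
        deleted += client.delete(*doomed)
    return deleted
```

Going from one machine to two, you would run `delete_foreign_keys(old_master, 0, 2)` and `delete_foreign_keys(promoted_backup, 1, 2)`, but only after every frontend has switched to `num_machines = 2`.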

Scaling to 10M on AWS

This post complements the recent article about Pulse on the Amazon Web Services Blog.

As Pulse crosses the 10M user mark (up 10x since last year), we’d like to share a bit more about how we’ve built and scaled Pulse’s backend systems. In this article we will discuss the important role AWS plays in our infrastructure.

Today there are more infrastructure choices than ever. They include running your own hardware, leasing virtual machines, subscribing to higher level platforms and software services, and often a combination of all of the above. It is important to consider the trade-offs and choose the right tool for the job. In our experience, AWS provides an exceptional capability to build systems as close to the metal as you like, while still avoiding the burden and inelasticity of owning your own hardware. It also provides some useful abstraction layers and services above the machine level.

Event Logging

Amazon’s Elastic Compute Cloud (Amazon EC2) instances make it easy to run low-level processes that can write directly to disk, and Amazon Simple Storage Service (Amazon S3) provides great long-term file storage. This combination makes an excellent choice for most flat-file logging systems. At Pulse, we’ve built a simple logging system that is blazingly fast on one machine and easy to scale horizontally. Using Tornado to handle HTTP requests and Scribe to buffer and write files, we are able to store logs at near-disk speeds (more than 50 MB/s per instance). Once the logs have been written to disk, we regularly move them to Amazon S3 for reliable long-term storage and easy access. Amazon S3’s low cost and scalable nature allow us to save all of our data without worrying too much about size.

By provisioning an Elastic Load Balancer (ELB), we are able to easily divide our load over as many logging servers as necessary and automatically direct load away from failing machines. Provisioning these machines in multiple AWS availability zones also makes it easy to achieve fault tolerance.

Pulse’s implementation easily handles millions of events per hour and has been running continuously for over a year without any downtime.

Data Analytics

Another major reason we decided to build our event logging system on Amazon S3 was to leverage Amazon Elastic MapReduce and Apache Hive. Now that our data is getting bigger, it is much more efficient to query it with a cluster of machines. AWS lets us quickly spin up a cluster of tens to hundreds of machines, without configuring and maintaining our own Hadoop cluster or moving our data out of Amazon S3.

With a large cluster, we are able to query a significant portion of our data in minutes instead of hours or days. Because the AWS cluster can simply be turned off when we are done, the cost to run big queries is usually quite reasonable. Consider a cluster of 100 m1.large machines. A set of queries that takes 45 minutes to run on this cluster would cost us $11 – $34 (depending on whether we bid on spot instances or use regular on-demand instances). Assuming you’re not running jobs all the time, this is preferable to the cost of buying and continuously maintaining your own cluster.
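The cost range follows from simple arithmetic. At the time, EC2 and Elastic MapReduce billed every started instance-hour in full, so a 45-minute run is billed as one hour per machine. For 100 machines, $11 to $34 therefore works out to roughly $0.11 to $0.34 per instance-hour. Those rates are back-calculated from the figures above, not quoted prices, and the helper below is a hypothetical sketch.

```python
import math


def cluster_cost(instances, minutes, hourly_rate):
    """Dollar cost of one run when every started instance-hour is billed in full."""
    billed_hours = math.ceil(minutes / 60.0)  # 45 minutes -> 1 billed hour
    return round(instances * billed_hours * hourly_rate, 2)


# Back-calculated, all-in per-instance-hour rates (assumptions, not price quotes).
SPOT_RATE = 0.11
ON_DEMAND_RATE = 0.34

if __name__ == "__main__":
    print(cluster_cost(100, 45, SPOT_RATE))       # 11.0
    print(cluster_cost(100, 45, ON_DEMAND_RATE))  # 34.0
```

Under hourly billing, a run that spills just past the hour doubles the bill, so it pays to size the cluster so that the queries finish inside a whole number of hours.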

Apache Hive makes this process even easier by taking simple SQL queries and converting them into what would often be relatively complex, multi-step Amazon Elastic MapReduce jobs. These SQL queries can be run directly by our business team, avoiding the need for engineering support.

For batch jobs, such as regularly extracting the top read and shared stories, the Pulse backend team likes to use mrjob, an open source framework developed at Yelp. mrjob lets us write mappers and reducers in Python (instead of Java) and integrates seamlessly with Amazon Elastic MapReduce. Python is our language of choice because it is more consistent with our codebase and it provides a simple representation for common MapReduce data structures such as tuples and dictionaries. Because our jobs are usually I/O-bound, the interpreted runtime doesn’t slow things down much.
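The post doesn't show Pulse's jobs, but the shape of a "top read and shared stories" job is easy to sketch. Below is a stdlib-only version of the mapper and reducer logic, run locally in a single process. The log format (one JSON object per line with hypothetical `story_id` and `action` fields) is an assumption. In mrjob, mapper and reducer would become methods of an MRJob subclass, and mrjob would handle the shuffle that the sort and groupby perform here.

```python
import json
from itertools import groupby
from operator import itemgetter


def mapper(line):
    # One hypothetical log event per line, e.g.
    # {"story_id": "s1", "action": "read"}
    event = json.loads(line)
    if event.get("action") in ("read", "share"):
        yield (event["story_id"], event["action"]), 1


def reducer(key, counts):
    # Sum all the 1s emitted for one (story_id, action) pair.
    yield key, sum(counts)


def run_local(lines):
    """Simulate map -> shuffle/sort -> reduce in a single process."""
    mapped = sorted(pair for line in lines for pair in mapper(line))
    totals = {}
    for key, group in groupby(mapped, key=itemgetter(0)):
        for out_key, total in reducer(key, (count for _, count in group)):
            totals[out_key] = total
    return totals


def top_stories(totals, action, n=10):
    """Story ids with the most events of the given action, best first."""
    ranked = [(count, story) for (story, act), count in totals.items()
              if act == action]
    ranked.sort(key=lambda pair: (-pair[0], pair[1]))
    return [story for _, story in ranked[:n]]
```

Because the reducer is a plain sum, the same function could also serve as a combiner, which cuts the data shuffled between mappers and reducers.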

Recommendations

Beyond curating our top story feeds, we’ve recently started developing several exciting new user-facing features using Amazon Elastic MapReduce, mrjob, and our data on Amazon S3. As part of our last major release, we announced a new feature called Smart Dock, which recommends new sources to millions of users based on their reading history. This feature makes it much easier to discover relevant content and has been extremely well received by our users. Our newest full-time backend engineer, Leonard Wei, led this project and built it almost entirely on AWS.

Our recommendations pipeline processes over 250GB of the raw log data we have in Amazon S3. We reduce this data down to about 1GB of relevant features via an Amazon Elastic MapReduce job. We then use an LDA-based approach to predict which sources a user is likely to add next. We run this portion of the pipeline on AWS using a single High-CPU Extra Large instance.

Once the model is generated for each user and some additional post-processing is complete, we upload each user’s recommended sources to our serving infrastructure on App Engine. From there, the recommendations are combined with the latest catalog data and sent to the app to be presented in the Smart Dock. One run of the whole pipeline costs us a very reasonable $20 of AWS compute time.

Other Tasks

Beyond event logging, analytics and recommendations, we also use AWS for lots of smaller tasks that just make sense to run directly on one or more machines, rather than through a higher level service. Some examples include parsing html pages with node-readability and continuously monitoring all of our systems to make sure we’re aware of any problems. Recently, we also started working on a new real-time analytics infrastructure based on Redis, which will leverage the High-Memory instances Amazon EC2 offers.

To learn more about Pulse’s infrastructure, check out some of the backend team’s other posts. Our recent article on how we scaled up for the Kindle Fire launch complements this one and talks more about our content serving, client APIs and Pulse.me web hosting.

 

Using Data Analysis to Discover Top Stories

For the last six months, Pulse has curated Top Stories feeds to help users discover interesting content when it is most relevant. Top Stories has been one of our most popular features, receiving millions of update requests per day. To help start off our new engineering blog, I’ll share some of the technical details behind how we build these feeds. Yesterday we also announced the ability to follow our Top Stories by category via Twitter. 

We start building the Top Stories feeds by analyzing anonymous data on what stories are currently being read and shared the most through Pulse. Since the volume of data coming from Pulse is always growing, we’ve built several highly scalable systems for collection and analysis.  Most of these systems are currently built on Amazon Web Services (AWS).

Collecting Data

The first system is built on Amazon’s Elastic Compute Cloud (EC2) and Simple Storage Service (S3).  It uses Tornado and Scribe to efficiently log data from HTTP requests to disk.  This data is then moved to S3 on a regular basis for permanent storage and analysis.  All data is stored in two separate S3 buckets for additional reliability. To make the system horizontally scalable, we replicate the Tornado/Scribe stack on multiple EC2 instances and use an Elastic Load Balancer (ELB) to distribute our request load across these instances. The ELB also provides a heartbeat service which automatically redirects requests away from any malfunctioning instances.

Analyzing with Hadoop

Once we have our data on S3, it is important for us to be able to extract breaking stories quickly. To achieve this on our rapidly growing dataset, we built our analysis engine on top of Elastic MapReduce (EMR) and Hadoop. Using a set of relatively simple mappers and reducers based on the mrjob Python framework from Yelp, we are able to quickly scan through hours’ or days’ worth of data to surface the top read and shared stories. We regularly run these Hadoop jobs directly against our data on S3. To improve performance, this data could be staged on Elastic Block Store (EBS) first, but in practice we haven’t found this necessary. Since our MapReduce operations are highly parallelizable, we can get faster results simply by spinning up a few extra EMR instances. The Amazon EMR interface makes this process extremely simple and cost-effective compared to running a dedicated Hadoop cluster on EC2 or on our own hardware.

Serving Feeds

In contrast to our data collection and analysis infrastructure, most of our feed serving and client API request handling is built on Google App Engine (GAE). When our EMR jobs finish, they push the final top stories for each of our categories to the datastore on GAE. Here we achieve horizontal scalability through GAE’s automatic provisioning of web servers based on the current request load, as well as its solid non-relational datastore and memcache.

This data is then normalized against the current ratio of reading to sharing in the given category and used to create a ranked feed of the current top stories that quickly surfaces breaking news.  The feeds for each category are then memcached and efficiently served to users via the Pulse application on each mobile platform.
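The post doesn't give the exact normalization. One plausible reading, shown here purely as an assumption, is to scale each story's shares by the category-wide ratio of reads to shares, so that shares and reads carry equal total weight within a category. The function name rank_category and the input shape are hypothetical.

```python
def rank_category(stories, n=10):
    """Return up to n story ids from one category, best first.

    stories maps story id -> (reads, shares). Each share is weighted by
    the category-wide reads/shares ratio (a hypothetical normalization),
    so a category's shares carry as much total weight as its reads.
    """
    total_reads = sum(reads for reads, _ in stories.values())
    total_shares = sum(shares for _, shares in stories.values())
    share_weight = total_reads / total_shares if total_shares else 0.0
    scores = {
        story: reads + shares * share_weight
        for story, (reads, shares) in stories.items()
    }
    # Highest score first; ties broken by story id for a stable feed.
    return sorted(scores, key=lambda story: (-scores[story], story))[:n]
```

For example, in a category with 150 reads and 12 shares, each share counts as 12.5 reads. A story with 40 reads and 10 shares (score 165) would then outrank one with 100 reads and no shares.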

Check out our post on the Pulse Blog to follow our Top Stories on Twitter!