App Engine, meet Redis on AWS

Since snappy performance is critical to providing a good user experience, we try to keep the latency of all common Pulse backend API requests under 500ms. Most of the time we achieve this by using Google App Engine’s memcache to cache all data which might be reused by many requests. Less commonly requested data is pulled from the datastore, resulting in such requests taking a bit longer than we like.

When these slower requests are rare, we accept them. However, for features that access a broad range of data, the likelihood of missing the cache increases. Some data required for a request may be cached, but some will almost always not be, resulting in high latency for most requests.

To implement these types of features efficiently, one option is to dramatically increase the size of our memcache. This would allow us to keep all required data in cache. However, it would be expensive and is somewhat at odds with the LRU cache policy we like to use for other features. This approach is also currently unsupported on Google App Engine (since memcache capacity is not directly tunable).

We investigated several other options and finally settled on using Redis as a persistent, in-memory datastore. Redis strikes a great balance between simplicity, powerful primitives, and proven stability. Instead of increasing our memcache or switching entirely to a larger in-memory store, we created a second Redis-based system on AWS. This system is specifically designed to hold data that must be available at in-memory speeds (with no expected misses). Achieving this is more expensive than providing a similar LRU cache (which could be smaller), so we reserve it specifically for features that require such guarantees.

Architecture

We wanted to use Redis, but also to make sure that our implementation was both scalable and easily recoverable in the case of failure. From here on out, we will discuss the infrastructure and tools we use to build this system. Here’s a visual overview of the system:

 

Amazon Elastic Load Balancer

This is a really nice utility that AWS gives us. We set up an ELB that points to as many EC2 machines as we need (we’ll call them redis frontends). The ELB balances requests across those machines round-robin, detects failing machines, gives us a warning, and transfers the load to the machines that are still running. Some important dos:

  1. The load balancer can deal with HTTPS requests, so use them! Some security is always better than none.
  2. You should make sure that the machines you provide to the load balancer are distributed among the different availability zones that AWS offers.
  3. You can also use dynamic scaling by putting dynamic instances into a group and giving the group to the load balancer.


HA Proxy

Our redis frontend machines use Tornado as the webserver. Tornado is fast (great!) and single threaded. Being single threaded prevents many headaches, scales predictably and has minimal overhead, but doesn’t benefit from multiple cores on a machine. The larger Amazon machines have multiple cores, so we really want to use that to our advantage. Enter HA Proxy, a nice utility that allows you to build a reverse proxy. Here’s a barebones version of the configuration we use:

global
    maxconn 1024
    daemon
    log 127.0.0.1 local0

frontend load_balancer
    # We process all requests hitting port 8080
    bind *:8080
    # We will point them to the backend we describe later
    default_backend tornado_servers
    mode http
    option httplog
    option dontlognull
    clitimeout 20000

backend tornado_servers
    # The balancing strategy
    balance roundrobin
    # The tornado servers, one per core; in this case, the machine has 4 cores
    server tornado_1 127.0.0.1:13371 check rise 2 fall 5
    server tornado_2 127.0.0.1:13372 check rise 2 fall 5
    server tornado_3 127.0.0.1:13373 check rise 2 fall 5
    server tornado_4 127.0.0.1:13374 check rise 2 fall 5
    retries 1
    mode http
    contimeout 5000
    srvtimeout 20000
    # We also get stats from HA Proxy about our tornado servers
    stats enable
    stats uri /lb?stats

Tornado Frontends

Each of these Tornado instances provides a thin Python API layer. The implementation is both simple and very specific to our own use cases. I won’t go into the specific details, but the frontend takes care of all of the security and implements the internal API we provide to our client teams. Certain general tasks like deserialization, error handling, and batching requests before hitting the backend were also very important. We run enough instances to match the number of cores on the machine, and they all rely on the sharded redis interface to actually access the data.

Sharded Redis Interface

This is based heavily off of redis-py by Andy McCurdy, so many thanks to him. You can take a look at https://github.com/andymccurdy/redis-py/

The thing we needed to add was the ability to split our data amongst several different machines. Andy is working on a general solution for this called cluster redis, but we opted to go with something simpler in the meantime.

The first thing was to implement the actual sharding, something like:

def find_shard(key):
    hash_value = some_consistent_hash_function(key)
    return hash_value % num_machines

With that little snippet, it was pretty easy to send operations to a wrapper class of StrictRedis (look at redis-py), and just have all the tornado frontends behave as if there was a single machine serving the data. This works as long as you don’t want to use pipelines.

However, it turns out that you really do want to use pipelines. Whenever you have multiple requests that you can send out at the same time, a pipeline will save you all the roundtrip time of single requests. Without pipelines, it doesn’t matter how blazingly fast redis is; you are stuck on network i/o latency.

Getting pipelines to work is a little bit more involved. We keep one pipeline per machine. When a command comes in on the sharded pipeline, we record its position in the overall order and store that position alongside the individual machine pipeline the command was routed to. An example with two machines:

command1 key1 value1 (key1 -> machine 1)
command2 key2 value2 (key2 -> machine 2)
command3 key3 value3 (key3 -> machine 1)
command4 key4 value4 (key4 -> machine 1)

We will remember it like this:

Pipeline index for machine 1:
    [1, 3, 4]
Pipeline for machine 1 will contain:
    command1 key1 value1
    command3 key3 value3
    command4 key4 value4

Pipeline index for machine 2:
    [2]
Pipeline for machine 2 will contain:
    command2 key2 value2
Now when we execute all the pipelines, we will be able to reconstitute the return values in the order they came in to the sharded_redis interface. With solutions to both the sharding and pipelines, we now have an interface that hides the fact that we actually need multiple machines to serve all the data. Notice that since each tornado frontend uses the interface independently we need to update them synchronously when we make changes!
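As a rough illustration of the bookkeeping described above, here is a minimal Python sketch. It is not our production interface. FakePipeline is a hypothetical in-memory stand-in for one machine's pipeline so that the example runs without a Redis server (with redis-py you would pass real pipeline objects instead), crc32 is just one example of a hash that gives the same answer in every process, and all class and function names are made up for this example.

```python
import zlib


def find_shard(key, num_machines):
    # crc32 is stable across processes, unlike Python's built-in hash() for strings.
    return zlib.crc32(key.encode("utf-8")) % num_machines


class FakePipeline(object):
    """Hypothetical in-memory stand-in for one machine's pipeline."""

    def __init__(self, store):
        self.store = store
        self.queued = []

    def set(self, key, value):
        self.queued.append(("set", key, value))

    def get(self, key):
        self.queued.append(("get", key, None))

    def execute(self):
        # Run the queued commands in order and return one result per command.
        results = []
        for op, key, value in self.queued:
            if op == "set":
                self.store[key] = value
                results.append(True)
            else:
                results.append(self.store.get(key))
        self.queued = []
        return results


class ShardedPipeline(object):
    """Fans commands out to per-machine pipelines and restores caller order."""

    def __init__(self, pipelines):
        self.pipelines = pipelines
        # positions[i] holds the caller-side index of each command on machine i.
        self.positions = [[] for _ in pipelines]
        self.count = 0

    def _route(self, key):
        shard = find_shard(key, len(self.pipelines))
        self.positions[shard].append(self.count)
        self.count += 1
        return self.pipelines[shard]

    def set(self, key, value):
        self._route(key).set(key, value)

    def get(self, key):
        self._route(key).get(key)

    def execute(self):
        results = [None] * self.count
        for pipe, index in zip(self.pipelines, self.positions):
            # Each machine returns results in the order its commands were queued.
            for position, value in zip(index, pipe.execute()):
                results[position] = value
        self.positions = [[] for _ in self.pipelines]
        self.count = 0
        return results
```

A caller queues set and get commands on a ShardedPipeline exactly as it would on a single pipeline, and execute() returns the results in the order the commands were queued, whichever machine served them.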

Redis Backend

Here are a few tips for setting up redis:

  1. Use a password, and make it a long password
  2. Set a memory limit and a reasonable policy to deal with exceeding max memory
  3. Change your machine overcommit_memory setting to 1
    sysctl -w vm.overcommit_memory=1
  4. Don’t run anything except redis on this machine
  5. If you are using AOF files and backup machines (recommended), don’t bother with persistence on the master! Instead, make sure you have an aggressive fsync policy (everysec works) for the slave.

For those who want the “why” behind each of the tips:
  1. From Redis Documentation:

    The password is set by the system administrator in clear text inside the redis.conf file. It should be long enough to prevent brute force attacks for two reasons:

    • Redis is very fast at serving queries. Many passwords per second can be tested by an external client.
    • The Redis password is stored inside the redis.conf file and inside the client configuration, so it does not need to be remembered by the system administrator, and thus it can be very long.

    The goal of the authentication layer is to optionally provide a layer of redundancy. If firewalling or any other system implemented to protect Redis from external attackers fail, an external client will still not be able to access the Redis instance without knowledge of the authentication password.

    Note: The AUTH command, like every other Redis command, is sent unencrypted, so it does not protect against an attacker that has enough access to the network to perform eavesdropping.

  2. We actually monitor the machine memory usage as well as the redis memory usage to shard our redis backend more as needed. Even so, it’s safer to set a reasonable limit on the memory that redis can use, so that we don’t have a scenario where redis uses all available memory on a machine and then crashes.
  3. From Redis Documentation:

    Redis background saving schema relies on the copy-on-write semantic of fork in modern operating systems: Redis forks (creates a child process) that is an exact copy of the parent. The child process dumps the DB on disk and finally exits. In theory the child should use as much memory as the parent being a copy, but actually thanks to the copy-on-write semantic implemented by most modern operating systems the parent and child process will share the common memory pages. A page will be duplicated only when it changes in the child or in the parent. Since in theory all the pages may change while the child process is saving, Linux can’t tell in advance how much memory the child will take, so if the overcommit_memory setting is set to zero fork will fail unless there is as much free RAM as required to really duplicate all the parent memory pages, with the result that if you have a Redis dataset of 3 GB and just 2 GB of free memory it will fail.

    Setting overcommit_memory to 1 says Linux to relax and perform the fork in a more optimistic allocation fashion, and this is indeed what you want for Redis.

  4. Because of the large memory footprint we expect redis to use and the fact that we have to use an optimistic memory allocation setting, running anything else that might use up a lot of memory on the same machine can lead to failures.
  5. This is an optimization to make sure the master Redis instance does not bottleneck because of disk writes. The work associated with persistence is offloaded as much as possible to a backup machine. That being said, it’s important that the slave/backup machine is robust.

Backup

This is simply a second machine running Redis that is set as a slave to the master Redis instance. In AWS, remember to use internal IP addresses when setting this up, since it saves you money. Backups are a must when you are running redis in production for several reasons:

  1. It’s a backup! If your machine in front goes down, you fail over to the backup while you try to fix the first machine. More often than not, when you are running on AWS you can just promote the backup and set up a new backup.
  2. If you ever need to expand the number of machines used for serving, you can just promote your backup to a serving machine and set up new backups for both machines. I would be remiss not to mention that you do have to then go through both machines to delete the extra keys later, or else you really won’t have expanded your memory limit.
  3. You can run data analytics on the backup without affecting the all-important performance of the actual serving machine.

Backend Tips – Conquering Big Tables with MapReduce

As some of our readers already know, Pulse uses Google App Engine (GAE) to serve content from thousands of publishers to millions of users. We have been very happy with the minimal operational overhead App Engine requires and were thrilled to see App Engine scale without hiccups when we were preloaded on the Kindle Fire.

As a backend engineer, it is inevitable that some engineering tasks involve heavy data processing. In our case, this often happens on data in the App Engine datastore. We have always relied on the very flexible and easy remote shell to do this type of work. However, this approach is too slow for many use cases, especially those touching millions of records.

For larger tasks, App Engine’s built-in MapReduce is often the right tool. It allows us to operate on millions of datastore entities in a very short amount of time. To give a few examples, we use MapReduce to quickly migrate existing data from legacy datastore models to new models after architectural changes, to load test our system with hundreds of shards simulating millions of users, and to inform our users of Pulse’s latest updates by sending out millions of emails or push notifications.

Data Migration

When making product changes, we sometimes move large amounts of data away from a legacy django-nonrel model. The speed of MapReduce ensures that minimal transition time is required and that the transition is painless enough that it is preferred over simply living with the wrong data model.

Load Testing

We use MapReduce to simulate load tests that would otherwise be unrealistic if we only used a few physical machines. A simple load test might use MapReduce to make thousands of requests within a very short period. These requests can simulate millions of users using Pulse throughout a day.

Lessons Learned

You should plan and test any large MapReduce task that will consume quota-limited resources before running the full job. It’s a good idea to estimate the number of datastore reads/writes, url fetch calls, and other API requests beforehand. In some cases, it may be necessary to contact App Engine support to ask for increased quotas (for those that cannot be increased in the admin console).

For those using a framework on top of App Engine, make sure you initialize it at the top of your handler file (see below). In some cases, you may also need to add the initialization code to the mapreduce module (at the top of mapreduce/main.py). In Django-nonrel, the init line you’ll need looks like this:

from djangoappengine import main

Getting Started

For those of you new to MapReduce on App Engine, here’s how to create jobs of your own. The App Engine team has made it pretty easy.

Download the mapreduce library via svn and add it to your app:

 svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce

Register the MapReduce handler in your app.yaml:

handlers:
- url: /mapreduce(/.*)?
  script: mapreduce/main.py
  login: admin

url – The MapReduce endpoints.
script – The request handler that ships with the mapreduce library.
login – Restricts access to app admins only.

Create the mapper file (mr_email_users.py); the job configuration further down refers to it by name. Its function is called once for each entity of the model you map over:

def run(user_entity):
    send_email(user_entity.email)

Note: See the official MapReduce guide below for more advanced options & examples.

Register and configure the MapReduce job in mapreduce.yaml:

mapreduce:
- name: MapReduce Email Users Job
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: mr_email_users.run
    params:
    - name: entity_kind
      default: user
    - name: shard_count
      default: 50
    - name: processing_rate
      default: 1000

input_reader – The input reader for this job; you can find other types here.
handler – The entry point to this MapReduce job.
entity_kind – The datastore model being mapped over.
shard_count – The number of mapper workers to run concurrently.
processing_rate – The aggregate maximum number of inputs processed per second across all mappers. Use it to avoid exhausting your quota or interfering with online users.
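Before launching a job, a quick back-of-the-envelope estimate helps with quota planning. The sketch below assumes that processing_rate is the only bottleneck and that the rate is divided evenly among shards. The entity count of one million is hypothetical, and the function names are ours for illustration, not part of the mapreduce library.

```python
def min_job_seconds(entity_count, processing_rate):
    """Lower bound on run time, assuming processing_rate is the only limit."""
    return float(entity_count) / processing_rate


def per_shard_rate(processing_rate, shard_count):
    """Inputs per second available to each shard, assuming an even split."""
    return float(processing_rate) / shard_count


if __name__ == "__main__":
    # Hypothetical job: one million user entities with the settings above.
    print(min_job_seconds(1000000, 1000))  # seconds
    print(per_shard_rate(1000, 50))        # inputs per second per shard
```

With the configuration above, a job over one million entities cannot finish faster than about 17 minutes, which is also a handy way to sanity-check how many emails or API calls per second the job will generate.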

Access the MapReduce admin console panel to view and launch jobs:

http://(your app name).appspot.com/mapreduce/status

More Info

You may be interested in the official MapReduce Get Started Guide for Python or Java. In addition, this 2011 Google IO talk includes many new useful MapReduce tips. Please leave any questions and comments below, and we will be happy to answer / discuss!

Backend Tips – Google Cloud Storage

Google App Engine’s datastore meets most of our backend storage needs, but we sometimes find ourselves limited by the maximum entity size of one megabyte. One option for storing larger files is to build a separate system on top of Amazon S3. A downside of this approach, however, is that we cannot take advantage of Google’s edge cache, which acts as a free CDN.

A second option is the new Google Cloud Storage service. Google Cloud Storage is the unofficial successor to the Google App Engine Blobstore, and both services are built on the same underlying infrastructure. Yet unlike the Blobstore, which is bundled with App Engine, Google Cloud Storage is a standalone service for storing and managing data. As such, Cloud Storage is Google’s attempt to roll out an Infrastructure as a Service (IaaS) offering that can compete with Amazon S3.

Getting Started

In order to use Google Cloud Storage with App Engine, the first step is to grant your application access to your storage bucket. The documentation instructs you to add the application’s service account name (application-id@appspot.gserviceaccount.com) as a team member to your Google APIs console project.

However, since we created our project with a Google Apps account, this takes a bit more effort. Only users from our domain (xxx@yourdomain.com) could be added to the team via the console. The solution is to use the GSUtil command line tool to edit the storage bucket’s Access Control List (ACL).

Run the following command to retrieve your bucket’s current ACL: gsutil getacl gs://bucketname > acl.txt. Then add an entry that looks like this:

<Entry>
  <Scope type="UserByEmail">
    <EmailAddress>application-id@appspot.gserviceaccount.com</EmailAddress>
    <Name>Service Account</Name>
  </Scope>
  <Permission>FULL_CONTROL</Permission>
</Entry>

Finally, run this command to set the new ACL: gsutil setacl acl.txt gs://bucketname.

Storing Data

Google provides an experimental API to integrate Cloud Storage with App Engine. This API allows for reading and writing of files to a storage bucket. While testing, I had already preloaded some test files into our bucket using the (barebones, but functional) Cloud Storage Manager web application. I could also have used the GSUtil tool.

Moving forward, we wanted to start loading files programmatically from within App Engine. The API documentation clearly explains how to create, write to, save, and read from Cloud Storage objects. Note that the function provided by the API to create a Google Cloud Storage object, files.gs.create(), takes a number of useful parameters. For instance, this is where you can specify the ACL and Cache-Control header for the object.

The documentation does not address the case in which the object you wish to save is a user upload. Storing uploaded files in a bucket can be accomplished using the Blobstore, as suggested by this StackOverflow answer. The blobstore_helper module is useful for adapting this code for Django.  Simply replace self.get_uploads('file') with blobstore_helper.get_uploads(request, 'file') in order to retrieve the uploaded files.

Serving Content

The Cloud Storage API does not offer a way to serve files directly from a storage bucket. Instead, you can use the Blobstore API to create a url that points at your file.

First, generate a blob key for the Cloud Storage object using the Blobstore API’s create_gs_key() function. Then serve the object as you would a traditional blobstore object. The example given for the Blobstore Python API assumes use of Google’s webapp framework, which provides helper functions (such as self.send_blob()) that obscure the underlying implementation. This makes it a little tricky to understand how to port the code to a different framework, but once again the blobstore_helper module offers some insight. The module defines its own send_blob function, in which the key line of code is response[blobstore.BLOB_KEY_HEADER] = str(blob_key). Essentially, if you put a special header in the response containing the blob key, then App Engine will automatically fill the body of the response with the content of the blob.

To properly serve the blob, it is also necessary to set a correct Content-Type header for the response. Although the Cloud Storage REST API does support retrieving an object’s metadata, it seems that the API for App Engine does not. Currently, we rely on Python’s mimetypes module, which can guess content type from a filename: response['Content-Type'] = mimetypes.guess_type(filename)[0].
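To make the two headers concrete, here is a small framework-agnostic sketch of the response headers described above. The function name is hypothetical. The header name is hard-coded so the example runs without the App Engine SDK; in a real app, use blobstore.BLOB_KEY_HEADER instead. The fallback content type is our own assumption, added because mimetypes.guess_type() returns None for the type when it does not recognize the filename.

```python
import mimetypes

# Value of blobstore.BLOB_KEY_HEADER in the App Engine Python SDK,
# repeated here only so the sketch runs outside App Engine.
BLOB_KEY_HEADER = "X-AppEngine-BlobKey"


def blob_response_headers(blob_key, filename,
                          default_type="application/octet-stream"):
    """Build the headers that ask App Engine to fill in the blob body."""
    # guess_type returns (type, encoding); type is None for unknown names.
    content_type = mimetypes.guess_type(filename)[0]
    return {
        BLOB_KEY_HEADER: str(blob_key),
        "Content-Type": content_type or default_type,
    }
```

In Django, you would copy these entries onto an otherwise empty HttpResponse before returning it.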

An alternative approach to serving files from Cloud Storage, which applies to images only, is to use App Engine’s Image API. As of App Engine version 1.7.0, it is possible to use the get_serving_url() function with Cloud Storage objects. Simply generate the blob key as before, and plug into this function to generate a url for the image. One benefit of using this approach is that the serving url supports cropping and resizing on the fly by supplying optional parameters.

We will continue to investigate the best practices for using Google Cloud Storage with App Engine as a service for storing and serving large files. For others who might be interested, there was a helpful session at Google IO, entitled Storing Your Application’s Data in the Google Cloud, that covers the basics of this new service. Of course, there are other options to consider as well, such as the Blobstore or Amazon S3. It remains to be seen which service will best meet our needs, but we’re glad that there is now a strong option on the Google side.

Backend Tips – The Free CDN

New Blog Post Series

This is the first in a series of blog posts in which we will offer a peek into some of the challenges we tackle on the Backend Team and discuss some tips and tricks we have discovered. These posts will focus on the ways in which we use GAE and AWS to build simple features that have helped us to deliver an amazing product. We plan to dive a little deeper into topics we’ve covered before, as well as highlighting some new ones. Upcoming topics will include GAE MapReduce, Redis, Google Cloud Storage, and duplicate detection via TF-IDF. Our first entry in the series discusses how to use Google’s edge cache as a free content delivery network (CDN).

The Free CDN

At the end of last year, we briefly mentioned Google’s edge cache as a useful feature as part of our guest post on the App Engine blog. Since this is one of our favorite services, I’d like to take a few minutes to explain it in more detail. It is an extremely simple feature that has the potential to significantly improve content serving latency and can be very valuable in terms of cost savings over other CDNs. Hopefully it will be clear by the end of this post why you should think about using it for your next project.

Content Delivery Networks

Content Delivery Networks (CDNs) offer several benefits that are typically desired for both web and mobile apps. They are designed to cache content on many geographically distributed servers, as close to the end user as possible, thereby minimizing latency for requests to the cached content. There are several major CDN providers, but the big ones that come to mind are Akamai and Amazon’s CloudFront. CDNs vary in quality and price, but generally one should expect to pay a premium for this type of service.

Google’s Edge Cache (aka. CDN)

It turns out that if you’re using Google App Engine (or other Google services like the newly announced Google Cloud Storage) and you configure things correctly, you get the same service for free. By simply setting public cache control headers wherever possible, you allow Google’s edge caches to serve unchanged content directly to users. Here’s an example of a set of response headers that will activate the cache:

 Cache-Control: public, max-age=900, must-revalidate

The most important component of the header is the word ‘public’. It tells Google’s network that the content in this response is not specific to a particular user or private in any way, so it’s safe to cache it as aggressively as possible. ‘max-age’ allows you to decide how often this content will be refreshed from your servers, and ‘must-revalidate’ tells caches (including the client’s) not to serve the content past that age without first checking back with the server.
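If you set this header in several handlers, a tiny helper keeps the values consistent. This is only a sketch; the function name and its parameters are hypothetical and not part of any App Engine API.

```python
def public_cache_control(max_age_seconds, must_revalidate=True):
    """Build a Cache-Control value that allows shared (edge) caching."""
    parts = ["public", "max-age=%d" % max_age_seconds]
    if must_revalidate:
        parts.append("must-revalidate")
    return ", ".join(parts)


if __name__ == "__main__":
    # Cache for 15 minutes, matching the example header above.
    print("Cache-Control: " + public_cache_control(900))
```

Only use it on responses that are identical for every user; anything personalized should not be marked public.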

This technique has been mentioned in at least one Google IO talk, but for some reason hasn’t been widely publicized. Because of the scale of Google’s network, this is perhaps the best CDN available. Best of all, there is no cost for this caching. It’s actually a win-win for both you and Google, since it minimizes the traffic that has to cross their internal networks and servers.

At Pulse we use this feature very heavily. It lets us serve high quality, mobile optimized images at < 50ms latency, while also saving us lots of App Engine instance hours by preventing these requests from hitting our frontend servers. As you can see from the graph below, for this particular App Engine app, we are serving the majority of requests out of Google’s edge cache (labeled red). I encourage you to try it out. It’s almost too easy to be true! If you have questions, feel free to leave comments below or ping me @gregbayer.

Scaling to 10M on AWS

This post complements the recent article about Pulse on the Amazon Web Services Blog.

As Pulse crosses the 10M user mark (up 10x since last year), we’d like to share a bit more about how we’ve built and scaled Pulse’s backend systems. In this article we will discuss the important role AWS plays in our infrastructure.

Today there are more infrastructure choices than ever. They include running your own hardware, leasing virtual machines, subscribing to higher level platforms and software services, and often a combination of all of the above. It is important to consider the trade-offs and choose the right tool for the job. In our experience, AWS provides an exceptional capability to build systems as close to the metal as you like, while still avoiding the burden and inelasticity of owning your own hardware. It also provides some useful abstraction layers and services above the machine level.

Event Logging

Amazon’s Elastic Compute Cloud (Amazon EC2) instances make it easy to run low level processes that can write directly to disk, and its Amazon Simple Storage Service (Amazon S3) provides great long-term file storage. This combination makes an excellent choice for most flat-file logging systems. At Pulse, we’ve built a simple logging system that is blazingly fast on one machine and easy to scale horizontally. Using Tornado to handle HTTP requests and Scribe to buffer and write files, we are able to store logs at near-disk speeds (more than 50 MB/s per instance). Once the logs have been written to disk, we regularly move them to Amazon S3 for reliable long-term storage and easy access. Amazon S3’s low cost and scalable nature allows us to save all of our data without worrying too much about size.

By provisioning an Elastic Load Balancer (ELB), we are able to easily divide our load over as many logging servers as necessary and automatically direct load away from failing machines. Provisioning these machines in multiple AWS availability zones also makes it easy to achieve fault tolerance.

Pulse’s implementation easily handles millions of events per hour and has been running continuously for over a year without any downtime.

Data Analytics

Another major reason we decided to build our event logging system on Amazon S3 was to leverage Amazon Elastic MapReduce and Apache Hive. Now that our data is getting bigger, it is much more efficient to query with a cluster of machines. Without having to configure and maintain our own Hadoop cluster or having to move our data from Amazon S3, AWS allows us to quickly spin up a cluster of tens to hundreds of machines.

With a large cluster, we are able to query a significant portion of our data in minutes instead of hours or days. Because the AWS cluster can simply be turned off when we are done, the cost to run big queries is usually quite reasonable. Consider a cluster of 100 m1.large machines. A set of queries that takes 45 minutes to run on this cluster would cost us $11 – $34 (depending on whether we bid on spot instances or use regular on-demand instances). Assuming you’re not running jobs all the time, this is preferable to the cost of buying and continuously maintaining your own cluster.
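The arithmetic behind that range can be sketched as follows. The per-instance hourly rates of 11 and 34 cents are not quoted prices; they are inferred from the totals above under the assumption that a 45 minute run is billed as one full hour per machine, and the sketch ignores any additional Elastic MapReduce or storage charges. The function name is hypothetical.

```python
import math


def cluster_cost_dollars(machines, minutes, cents_per_instance_hour):
    """Estimate compute cost, assuming billing rounds up to whole hours."""
    billed_hours = int(math.ceil(minutes / 60.0))
    return machines * billed_hours * cents_per_instance_hour / 100.0


if __name__ == "__main__":
    # Assumed rates (cents per instance-hour), back-derived from the totals above.
    print(cluster_cost_dollars(100, 45, 11))  # spot-like rate
    print(cluster_cost_dollars(100, 45, 34))  # on-demand-like rate
```

Under these assumptions, a run of 61 minutes would be billed as two hours, so it pays to keep query batches just under an hour boundary.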

Apache Hive makes this process even easier by taking simple SQL queries and converting them into what would often be relatively complex, multi-step Amazon Elastic MapReduce jobs. These SQL queries can be run directly by our business team, avoiding the need for engineering support.

For batch jobs, such as regularly extracting the top read and shared stories, the Pulse backend team likes to use mrjob, an open source framework developed at Yelp. Mrjob allows us to write mappers and reducers in Python (instead of Java) and integrates seamlessly with Amazon Elastic MapReduce. Python is our language of choice because it is more consistent with our codebase and it provides a simple representation for common MapReduce data structures such as tuples and dictionaries. Because our jobs are usually IO-bound, the interpreted runtime doesn’t slow things down much.
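To show the shape of such a batch job, here is a plain Python sketch of a mapper and reducer for counting the most-read stories. It deliberately does not use mrjob or a cluster: the grouping step that a MapReduce framework performs between the two phases is simulated in memory. The event format of (user_id, story_id, action) tuples and all function names are hypothetical.

```python
from collections import defaultdict


def mapper(event):
    """Emit (story_id, 1) for every read event."""
    user_id, story_id, action = event
    if action == "read":
        yield story_id, 1


def reducer(story_id, counts):
    """Sum the counts emitted for one story."""
    yield story_id, sum(counts)


def top_read_stories(events, limit):
    # Stand-in for the framework's shuffle step: group mapper output by key.
    grouped = defaultdict(list)
    for event in events:
        for key, value in mapper(event):
            grouped[key].append(value)
    totals = []
    for key, values in grouped.items():
        totals.extend(reducer(key, values))
    # Highest count first; ties broken by story id so the order is stable.
    totals.sort(key=lambda pair: (-pair[1], pair[0]))
    return totals[:limit]
```

The tuples and dictionaries here are the kind of simple data structures that make Python a comfortable fit for this style of job.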

Recommendations

Beyond curating our top story feeds, we’ve recently started developing several exciting new user-facing features using Amazon Elastic MapReduce, mrjob, and our data on Amazon S3. As part of our last major release, we announced a new feature called Smart Dock, which recommends new sources to millions of users based on their reading history. This feature makes it much easier to discover relevant content and has been extremely well received by our users. Our newest full-time backend engineer, Leonard Wei, led this project and built it almost entirely on AWS.

Our recommendations pipeline processes over 250GB of the raw log data we have in Amazon S3. We reduce this data down to about 1GB of relevant features via an Amazon Elastic MapReduce job. We then use an LDA-based approach to predict which sources a user is likely to add next. We run this portion of the pipeline on AWS using a single High-CPU Extra Large instance.

Once the model is generated for each user and some additional post-processing is complete, we upload each user’s recommended sources to our serving infrastructure on App Engine. From there, the recommendations are combined with the latest catalog data and sent to the app to be presented in the Smart Dock. One run of the whole pipeline costs us a very reasonable $20 of AWS compute time.

Other Tasks

Beyond event logging, analytics and recommendations, we also use AWS for lots of smaller tasks that just make sense to run directly on one or more machines, rather than through a higher level service. Some examples include parsing html pages with node-readability and continuously monitoring all of our systems to make sure we’re aware of any problems. Recently, we also started working on a new real-time analytics infrastructure based on Redis, which will leverage the High-Memory instances Amazon EC2 offers.

To learn more about Pulse’s infrastructure check out some of the backend team’s other posts. Our recent article on how we scaled up for the Kindle Fire launch complements this one and talks more about our content serving, client APIs and Pulse.me web hosting.

 

Scaling with the Kindle Fire

This post was originally published as a guest post on the Google App Engine blog.

As part of the much anticipated Kindle Fire launch, Pulse was announced as one of the only preloaded apps. When you first unbox the Fire, Pulse will be there waiting for you on the home row, next to Facebook and IMDB!

Scale

The Kindle Fire is projected to sell over five million units this quarter alone. This means that those of us who work on backend infrastructure at Pulse have had to prepare for nearly doubling our user-base in a very short period. We also need to be ready for spikes in load due to press events and the holiday season.

Architecture

As I’ve discussed previously on the Pulse Engineering Blog, Pulse’s infrastructure has been designed with scalability in mind from the beginning. We’ve built our web site and client APIs on top of Google App Engine, which has allowed us to grow steadily from 10s to many 1000s of requests per second, without needing to re-architect our systems.

While restrictive in some ways, we’ve found App Engine’s frontend serving instances (running Python in our case) to be extremely scalable, with minimal operational support from our team. We’ve also found the datastore, memcache, and task queue facilities to be equally scalable.

Pulse’s backend infrastructure provides many critical services to our native applications and web site. For example, we cache and serve optimized feed and image data for each source in our catalog. This allows us to minimize latency and data transfer and is especially important to providing an exceptional user experience on limited mobile connections. Providing this service for millions of users requires us to serve hundreds of millions of requests per day. As with any well-designed App Engine app, the vast majority of these requests are served out of memcache and never hit the datastore. Another useful technique we use is to set public cache control headers wherever possible, to allow Google’s edge cache (shown as cached requests on the graph below) and ISP / mobile carrier caches to serve unchanged content directly to users.

Costs

Based on App Engine’s projected billing statements leading up to the recent pricing changes, we were concerned that our costs might increase significantly. To prepare for these changes and the expected additional load from Kindle Fire users, we invested some time in diagnosing and reducing these costs. In most cases, the increases turned out to be an indicator of inefficiencies in our code and/or in the App Engine scheduler. With a little optimization, we have reduced these costs dramatically.

The new tuning sliders for the scheduler make it possible to rein in overly aggressive instance allocation. In the old pricing structure, idle instance time wasn’t charged for at all, so these inefficiencies were usually ignored. Now App Engine charges for all instance time by default. However, any time App Engine runs more idle instances than you’ve allowed, those hours are free. This acts as a hint to the scheduler, helping it reduce unneeded idle instances. By doing some testing to find the optimal cost vs spike latency tolerance and setting the sliders to those levels, we were able to reduce our frontend instance costs to near original levels. Our heavy usage of memcache (which is still free!) also helps keep our instance hours down.

Since datastore operations used to be charged under the umbrella of CPU hours, it was difficult to know the cost of these operations under the old pricing structure. This meant it was easy to miss application inefficiencies, especially for write-heavy workloads where additional indexes can have a multiplicative effect on costs. In our case, the new datastore write operations metric led us to notice some inefficiencies in our design and a tendency to overuse indexes. We are now working to minimize the number of indexes our queries rely on, and this has started to reduce our write costs.

Preparing for the Kindle Fire Launch

We took a few additional steps to prepare for the expected load increase and spikes associated with the Fire’s launch. First, we contacted App Engine’s support team to warn them of the expected increase. This is recommended for any app at or near 10,000 requests per second (to make sure your application is correctly provisioned). We also signed up for a Premier account which gets us additional support and simpler billing.

Architecturally, we decided to split our load across three primary applications, each serving different use cases. While this makes it harder to access data across these applications, those same boundaries serve to isolate potential load-related problems and make tuning simpler. In our case, we were able to divide certain parts of our infrastructure, where cross application data access was less important and load would be significant. Until App Engine provides more visibility into and control of memcache eviction policies, this approach also helps prevent lower priority data from evicting critical data.

I’m hopeful that in the near future such division of services will not be required. Individually tunable load isolation zones and memcache controls would certainly make it a lot more appealing to have everything in a single application. Until then, this technique works quite well, and helps to simplify how we think about scaling.

Bringing Location-based Deals to Your Phone

Savvy shoppers have always been on the lookout for new deals, but sites such as Groupon, Gilt City, and LivingSocial have made deal-hunting even more popular. For the last few weeks, we’ve been working hard to make finding great deals easier for our users. By aggregating local deals from these sources, we hope to make them less intrusive (than email alerts) and easy to peruse. It should also be painless to scan deals from several different sites at once by simply adding each of them in Pulse.

Where are the deals?

Since most deal sites are location-based, we want to allow our users to see deals that are nearby. The first step is to know where the deals are. We can get a list of cities (either through API calls to the deal sites or RSS feeds provided by them) and store the longitude and latitude of these cities. If the sites are unable to provide a list of cities and/or their coordinates, it is relatively straightforward to scrape the list of cities from their website (with permission of course!) and find the coordinates of the cities through services such as the Google Geocoding API. Once we have this data, we store the city/coordinate pairs in the Google AppEngine datastore (Figure 1).

Figure 1: City coordinates in datastore

Where is the user?

On GPS-enabled devices, location coordinates are relatively easy to come by. A client application can simply query the server with a user’s coordinates (with the user’s permission) using an API call such as: http://<server>/api/nearby?long=-73&lat=42&max=10. After some server-side calculations, this call will return a list of cities near the location the user just provided.

What is nearby?

Given that we have each city’s coordinates stored on the server and the user’s coordinates from the client application, how do we determine which cities are closest? We use the following algorithm to figure out which cities we should show to the user:

import heapq
import math

result = []  # min heap of (squared distance, city) pairs
for city in cities:
    dist = math.pow(city.long - user.long, 2) + math.pow(city.lat - user.lat, 2)
    heapq.heappush(result, (dist, city))

The result is a min heap keyed on the squared coordinate distance, so popping items from it yields the cities from the closest to the farthest. We return this list (or a subset of it) to the client application and then allow the user to choose to add deals from a city near them.

The key to making this operation efficient is loading the city coordinates into memory (in our case, into the AppEngine Memcache). This simple optimization works well for any small/medium sized list of locations and turns a resource-intensive calculation (that can be quite involved to implement on AppEngine’s datastore) into a simple and scalable operation that requires no datastore calls.
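The squared difference in degrees is cheap and good enough for ranking nearby cities, but it treats a degree of longitude as the same length everywhere. As a variation (not what the snippet above does), the sketch below swaps in the haversine great-circle distance and uses heapq.nsmallest to pick the closest few cities from an in-memory list. The City tuple, the function names and the sample coordinates are assumptions made for illustration.

```python
import heapq
import math
from collections import namedtuple

# Hypothetical in-memory record for a city and its coordinates.
City = namedtuple("City", ["name", "lat", "long"])


def haversine_km(lat1, long1, lat2, long2):
    """Great-circle distance in kilometres between two lat/long points."""
    radius_km = 6371.0  # mean Earth radius
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(long2 - long1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * radius_km * math.asin(math.sqrt(a))


def nearest_cities(cities, user_lat, user_long, max_results=10):
    """Return up to max_results cities, closest first."""
    return heapq.nsmallest(
        max_results,
        cities,
        key=lambda c: haversine_km(c.lat, c.long, user_lat, user_long),
    )


cities = [
    City("Chicago", 41.88, -87.63),
    City("New York", 40.71, -74.01),
    City("Albany", 42.65, -73.75),
]
closest = nearest_cities(cities, user_lat=42, user_long=-73, max_results=2)
```

Because nsmallest takes a key function, cities are never compared with each other directly, which avoids comparison errors when two cities happen to be at the same distance.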

We hope to give our users a better experience by offering the ability to aggregate deal sites in Pulse. This was a short insight into how we implemented this feature.  If you have great ideas that can help further improve this experience, let us know!

Introducing Livecount

Analytics is ideally a combination of real-time and batch processing. Batch processing, with something like Hadoop, is great for digging into large amounts of past data and asking questions that cannot be anticipated. However, when it is known ahead of time that certain aggregates will be required, the best solution is often to count each event as it happens. Most analytics dashboards are backed by this kind of real-time data.

Nine months ago, Pulse was just starting to experiment with real-time event counting. We didn’t have much server infrastructure yet and were using Google AppEngine to host a few simple APIs. We started reading about the various ways to implement counters on AppEngine and came across two frequently recommended solutions.

Existing Solutions

Sharded counters was the first approach we tried. To compensate for our write-heavy counter workload, we split each counter into several datastore entities. This allowed writes to be parallelized and avoided the single entity write limits of the AppEngine datastore. To read a single counter value, we queried all shards and summed up the values. This worked well, but required hitting the datastore on every request. Our tests still showed unacceptably high latencies. More shards sped things up a bit, but performance was always bottlenecked on the datastore.
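To make the sharding idea concrete, here is a minimal sketch in which a dictionary stands in for the datastore. The shard count and function names are hypothetical; a real implementation would store each shard as its own datastore entity and update it inside a transaction.

```python
import random

NUM_SHARDS = 20  # hypothetical; more shards allow more parallel writes

# Stand-in for the datastore: one entry per (counter name, shard index).
shards = {}


def increment(name, delta=1):
    """Add delta to one randomly chosen shard of the counter."""
    key = (name, random.randrange(NUM_SHARDS))
    shards[key] = shards.get(key, 0) + delta


def get_count(name):
    """Read every shard and sum them; this is the costly read path."""
    return sum(shards.get((name, i), 0) for i in range(NUM_SHARDS))
```

Writes spread across shards and rarely contend with each other, but every read has to fetch all NUM_SHARDS entries, which is why read latency stays tied to the datastore.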

To avoid datastore latencies, counting in memory seemed like an obvious solution. Given a distributed key-value store like Memcached, counting in memory should be quite scalable, while improving both read and write performance. Of course, memcache data is vulnerable to loss when a server goes down, as well as being subject to eviction if available memory runs short. Unfortunately, the eviction problem is amplified in shared environments where it is always possible to be evicted by memory pressure from another app. While we were willing to accept some risk of data loss, our tests showed the probability was too high on AppEngine’s memcache.

Implementing Write-behind Counters

Livecount was developed to leverage the performance of AppEngine’s memcache, while making an effort to maintain the durability of counts. AppEngine’s task queues turned out to be perfect for the job. Each time a count is updated (or optionally when a multiple is reached), Livecount creates a worker task to write that count from the memcache to the datastore in the background. If the count is ever evicted, it is reloaded from the datastore on the next read or write.

Performance

Since counter updates are usually written back to the datastore within seconds, the risk of loss is minimal. Write performance is excellent, since only the memcache must be updated before completing a request. Most reads can also be served from the memcache. Load on the datastore is further reduced by storing a dirty flag along with each memcached count. If more increment events come in than can be written back in real time, only one write is needed to update the datastore with the latest count. After a successful write, the dirty bit is cleared and the other backlogged write tasks for that counter are skipped.

Using Livecount

This simple solution has allowed Pulse’s backend to easily scale to counting hundreds of events per second, with minimal cost and complexity. Livecount’s API requires nothing more than a simple string counter name.

from livecount.counter import load_and_increment_counter

load_and_increment_counter(name=url)

For more advanced use cases, namespaces are supported to keep counters organized and easy to query. Recently, we also added support for time period fields to help support hourly/daily/weekly/monthly aggregates. Here’s a more advanced example.

from datetime import datetime

from livecount.counter import PeriodType, load_and_increment_counter

load_and_increment_counter(name=url, period=datetime.now(),
    period_types=[PeriodType.DAY, PeriodType.WEEK], namespace="starred", delta=1)

Livecount is open-source and easily deployable on Google AppEngine. If you have something to count, give Livecount a try. We’d love to hear your thoughts or suggestions for improvement!

Integrating with External APIs

In February, we released our first set of API driven feeds, including Flickr, YouTube, and Reddit. Every week, Pulse features a new set of feeds to help our users discover new content; RSS feeds served this purpose simply and effectively, since many news sites, blogs, and social sites offer high-quality RSS feeds. However, we realized that there are also many mediocre feeds, in terms of content served, image size/quality, and device compatibility (i.e. article formatting across phones, tablets and Pulse.me).

Fortunately, many of these services offer external APIs for developers, and so we started playing around with these APIs to create custom feeds for our users. This post discusses some of the design choices we made in order to bring API content to Pulse.

Integrating with the Backend

As outlined in a previous post, our custom feeds are served from our backend, which is built on Google AppEngine (GAE). We were able to quickly prototype our API feeds by utilizing the datastore and deferred features. By integrating this component into GAE, we are able to serve the API feeds in the same manner as other RSS feeds, requiring no modifications to our client-side code.

We use GAE Datastore to store individual articles crafted from the APIs, and GAE Deferred/TaskQueue to poll the APIs for updated content. The article content is created in HTML to be easily displayed in a WebView on the client (iOS and Android).

API Issues

Here are some issues that we ran into while integrating the APIs into our backend, and tips on how to deal with them:

1) No APIs are created equal
Since there’s no standard for how APIs are served, you really need to have a separate script for each of them, as they all have different endpoints for different functions. Some return JSON, and some return XML. It’s tedious, but it’s only a one time investment to learn each API. Once you have your script programmed to call the right functions and use the data in the correct way, you don’t have to look at the API ever again…

2) Ever again? APIs change!
The golden rule of using an API is that APIs change! You are always at the mercy of the API providers. Just because you have access to a piece of content now, doesn’t mean you will have access to it in the future. The endpoint for it may change, or it may just be removed entirely. As the developer you have to create code that doesn’t grind to a halt when it doesn’t find the data where it expects it to be. Protip: use try/catch when making API calls so your script has a chance to finish despite missing some calls.

3) Rate Limiting on GAE
From the start, we were very careful about following each API’s rate limit quota, specified by X requests a day or Y requests per second. Using Deferred/Taskqueue functions on GAE, these rate limits can be faithfully followed. However, within the first week, we started getting blocked by YouTube and Digg for surpassing our limit. After digging deeper, we found the culprit: in the GAE implementation, Google sends out URL requests for multiple apps from the same physical server. Therefore, if the API provider only counts the number of requests coming from each IP address as a means of checking the rate limit, this GAE server will quickly hit the limit if many of its apps perform API requests to that same service. You can circumvent this problem in two ways. A short-term solution is to set up a server outside of GAE (for example, on Amazon’s EC2 cluster) to bounce requests for your application. The long-term solution is to ask the API provider to check the rate limit based on API key, not IP address.
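The protip in point 2 can be sketched like this in Python, where the construct is try/except. The fetcher functions and feed names are made up for illustration; the point is only that one failing API call should not stop the rest of the polling run.

```python
import logging


def poll_feeds(fetchers):
    """Call each fetcher; collect results and keep going when one fails."""
    articles, failed = {}, []
    for name, fetch in fetchers.items():
        try:
            articles[name] = fetch()
        except Exception as exc:  # deliberately broad at this boundary
            logging.warning("fetch failed for %s: %s", name, exc)
            failed.append(name)
    return articles, failed


def fetch_photos():
    """Hypothetical fetcher that succeeds."""
    return [{"title": "sunset"}]


def fetch_videos():
    """Hypothetical fetcher whose API response changed shape."""
    raise KeyError("items")


articles, failed = poll_feeds({"photos": fetch_photos, "videos": fetch_videos})
```

Returning the list of failed feeds alongside the results makes it easy to retry them later or to alert when the same API keeps failing.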

Recap

As we push out new feeds every week, we are always looking for better ways to get content to our users. Whether it be through improving on existing RSS feeds or creating new feeds through APIs, we want to provide our users with an experience that they can only get through Pulse.

Ever used another service’s external API? Liked it? Hated it? Comments and suggestions appreciated!

Using Data Analysis to Discover Top Stories

For the last six months, Pulse has curated Top Stories feeds to help users discover interesting content when it is most relevant. Top Stories has been one of our most popular features, receiving millions of update requests per day. To help start off our new engineering blog, I’ll share some of the technical details behind how we build these feeds. Yesterday we also announced the ability to follow our Top Stories by category via Twitter. 

We start building the Top Stories feeds by analyzing anonymous data on what stories are currently being read and shared the most through Pulse. Since the volume of data coming from Pulse is always growing, we’ve built several highly scalable systems for collection and analysis.  Most of these systems are currently built on Amazon Web Services (AWS).

Collecting Data

The first system is built on Amazon’s Elastic Compute Cloud (EC2) and Simple Storage Service (S3).  It uses Tornado and Scribe to efficiently log data from HTTP requests to disk.  This data is then moved to S3 on a regular basis for permanent storage and analysis.  All data is stored in two separate S3 buckets for additional reliability. To make the system horizontally scalable, we replicate the Tornado/Scribe stack on multiple EC2 instances and use an Elastic Load Balancer (ELB) to distribute our request load across these instances. The ELB also provides a heartbeat service which automatically redirects requests away from any malfunctioning instances.

Analyzing with Hadoop

Once we have our data on S3, it is important for us to be able to extract breaking stories quickly. To achieve this on our rapidly growing dataset, we built our analysis engine on top of Elastic MapReduce (EMR) and Hadoop. Using a set of relatively simple mappers and reducers based on the mrjob Python framework from Yelp, we are able to quickly scan through hours’ or days’ worth of data to surface top read and shared stories. We regularly run these Hadoop jobs directly against our data on S3. To improve performance, this data could be staged on Elastic Block Store (EBS) first, but in practice we haven’t found this necessary. Since our MapReduce operations are very parallelizable, we can get faster results simply by spinning up a few extra EMR instances. The Amazon EMR interface makes this process extremely simple and cost effective in comparison to running a dedicated Hadoop cluster on EC2 or our own hardware.
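To give a feel for the shape of such a job, here is a pure-Python sketch of a mapper and reducer that count read and share events per story. It does not use mrjob or Hadoop, and the tab-separated log format, event names and function names are assumptions made for the example.

```python
from collections import defaultdict


def mapper(log_line):
    """Emit (story_url, 1) for each read or share event in a log line."""
    event, story_url = log_line.split("\t")
    if event in ("read", "share"):
        yield story_url, 1


def reducer(story_url, counts):
    """Sum the counts emitted for one story."""
    return story_url, sum(counts)


def run_job(log_lines):
    """Single-process stand-in for the shuffle between map and reduce."""
    grouped = defaultdict(list)
    for line in log_lines:
        for key, value in mapper(line):
            grouped[key].append(value)
    totals = [reducer(key, values) for key, values in grouped.items()]
    # Most active stories first.
    return sorted(totals, key=lambda kv: kv[1], reverse=True)


top = run_job(["read\ta", "share\ta", "read\tb", "open\tc"])
```

Each mapper call looks at one line in isolation and each reducer call looks at one key in isolation, which is what lets the real job spread across as many instances as we choose to start.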

Serving Feeds

In contrast to our data collection and analysis infrastructure, most of our feed serving and client API request handling is built on Google AppEngine (GAE). When our EMR jobs finish, they push the final top stories for each of our categories to the datastore on GAE. Here we achieve horizontal scalability via GAE’s automatic provisioning of web servers based on the current request load, as well as its solid non-relational datastore and memcache.

This data is then normalized against the current ratio of reading to sharing in the given category and used to create a ranked feed of the current top stories that quickly surfaces breaking news.  The feeds for each category are then memcached and efficiently served to users via the Pulse application on each mobile platform.
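The exact scoring is not spelled out here, so the sketch below shows only one possible reading of the normalization step: weight each share by the category’s overall reads-per-share ratio so that reads and shares contribute on a comparable scale. The formula, function name and sample numbers are all assumptions for illustration.

```python
def rank_stories(stories):
    """Rank (url, reads, shares) tuples for one category, best first."""
    total_reads = sum(reads for _, reads, _ in stories)
    total_shares = sum(shares for _, _, shares in stories)
    # Reads per share across the category; 0 if nothing has been shared yet.
    share_weight = total_reads / total_shares if total_shares else 0.0
    scored = [(reads + shares * share_weight, url)
              for url, reads, shares in stories]
    return [url for _, url in sorted(scored, reverse=True)]


ranked = rank_stories([("a", 100, 0), ("b", 60, 5), ("c", 40, 5)])
```

In this sample the category has 200 reads and 10 shares, so each share counts as 20 reads, and a story with fewer reads but several shares can outrank one that is only being read.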

Check out our post on the Pulse Blog to follow our Top Stories on Twitter!