Backend Tips – Conquering Big Tables with MapReduce

As some of our readers already know, Pulse uses Google App Engine (GAE) to serve content from thousands of publishers to millions of users. We have been very happy with the minimal operational overhead App Engine requires and were thrilled to see App Engine scale without hiccups when we were preloaded on the Kindle Fire.

As backend engineers, we inevitably face tasks that involve heavy data processing, and in our case that data usually lives in the App Engine datastore. We have always relied on the flexible, easy-to-use remote shell for this kind of work. However, this approach is too slow for many use cases, especially those that touch millions of records.

For larger tasks, the App Engine MapReduce library is often the right tool: it lets us operate on millions of datastore entities in a short amount of time. For example, we use MapReduce to migrate existing data from legacy datastore models to new models after architectural changes, to load test our system with hundreds of shards simulating millions of users, and to tell our users about Pulse's latest updates by sending out millions of emails or push notifications.

Data Migration

When making product changes, we sometimes need to move large amounts of data out of a legacy django-nonrel model. Because MapReduce is fast, the transition takes little time, and it is painless enough that migrating is preferable to simply living with the wrong data model.
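Because a migration mapper runs once per entity and its tasks may be retried, it helps to write it as a small, idempotent function. The sketch below is a pure-Python illustration under stated assumptions, not our production code: LegacyUser, User, migrate_user and run_locally are hypothetical names, and a dict stands in for the datastore. In the appengine-mapreduce library itself, the mapper would yield a datastore write operation (mapreduce.operation.db.Put) instead of a plain object.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Optional


@dataclass
class LegacyUser:
    """Hypothetical legacy model: one free-form name field, optional email."""
    key: str
    email: Optional[str]
    full_name: str


@dataclass
class User:
    """Hypothetical new model with split name fields."""
    key: str
    email: str
    first_name: str
    last_name: str


def migrate_user(legacy: LegacyUser) -> Iterator[User]:
    """Mapper: called once per legacy entity, yields zero or one new entity."""
    if not legacy.email:
        return  # Skip records that cannot be migrated instead of failing the shard.
    first, _, last = legacy.full_name.partition(" ")
    # Reusing the legacy key makes the mapper idempotent: a retried task
    # overwrites the same new entity instead of creating a duplicate.
    yield User(key=legacy.key, email=legacy.email.lower(),
               first_name=first, last_name=last)


def run_locally(entities: Iterable[LegacyUser]) -> Dict[str, User]:
    """Local stand-in for a MapReduce run; the dict plays the datastore."""
    store: Dict[str, User] = {}
    for entity in entities:
        for new_entity in migrate_user(entity):
            store[new_entity.key] = new_entity
    return store
```

Feeding the same legacy record through twice leaves exactly one migrated entity, which is the property that makes retries safe.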

Load Testing

We use MapReduce to run load tests that would be impractical to generate from only a few physical machines. A simple load test might use MapReduce to make thousands of requests within a very short period, simulating the traffic of millions of users using Pulse throughout a day.
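Sizing such a test is mostly arithmetic. A minimal sketch, assuming each simulated user issues a fixed number of requests and that load is spread evenly across shards (the function, field names, and all figures are hypothetical): replaying 1,200,000 users at 3 requests each over one hour means 3,600,000 requests, or 1,000 requests per second, which 100 shards deliver at 10 requests per second each.

```python
from dataclasses import dataclass


@dataclass
class LoadPlan:
    total_requests: int
    requests_per_second: float
    per_shard_rate: float


def plan_load_test(simulated_users: int, requests_per_user: int,
                   duration_seconds: int, shard_count: int) -> LoadPlan:
    """Compress a day's worth of simulated traffic into a short test window."""
    total = simulated_users * requests_per_user
    rps = total / duration_seconds
    # Assumes every shard carries an equal share of the load.
    return LoadPlan(total, rps, rps / shard_count)


plan = plan_load_test(1_200_000, 3, 3600, 100)
```

If each mapper input triggers exactly one request, the overall rate is also roughly what the processing_rate parameter in mapreduce.yaml would need to allow.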

Lessons Learned

Plan and test any large MapReduce job that will consume quota-limited resources before running it in full. It's a good idea to estimate the number of datastore reads and writes, URL Fetch calls, and other API requests beforehand. In some cases, you may need to contact App Engine support to raise quotas that cannot be increased from the admin console.
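One way to do that estimate is to measure per-entity API costs on a small sample run, then scale up. A minimal sketch, where the resource names and quota figures are hypothetical placeholders (real limits are shown in the admin console):

```python
from typing import Dict


def estimate_usage(entity_count: int,
                   ops_per_entity: Dict[str, float]) -> Dict[str, float]:
    """Multiply per-entity API costs by the number of entities to map over."""
    return {name: entity_count * per for name, per in ops_per_entity.items()}


def quota_shortfall(usage: Dict[str, float],
                    remaining: Dict[str, float]) -> Dict[str, float]:
    """Return each resource whose estimate exceeds the remaining quota, and by how much."""
    return {
        name: amount - remaining.get(name, 0)
        for name, amount in usage.items()
        if amount > remaining.get(name, 0)
    }


# Hypothetical job: 2M entities, measured costs from a sample run.
usage = estimate_usage(2_000_000, {"datastore_reads": 1,
                                   "datastore_writes": 2,
                                   "urlfetch_calls": 0.5})
short = quota_shortfall(usage, {"datastore_reads": 5_000_000,
                                "datastore_writes": 3_000_000,
                                "urlfetch_calls": 1_000_000})
```

Here the writes come up 1,000,000 short, which is the signal to throttle the job, split it across days, or ask for more quota before launching.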

For those using a framework on top of App Engine, make sure you initialize the framework at the top of your handler file. In some cases, you may also need to add the initialization code to the mapreduce module itself (at the top of mapreduce/main.py). In Django-nonrel, the initialization line looks like this:

from djangoappengine import main  # sets up the Django-nonrel environment before models are used

Getting Started

For those of you new to MapReduce on App Engine, here's how to create jobs of your own. The App Engine team has made it pretty easy.

Download the mapreduce library via svn and add it to your app:

 svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce

Register the MapReduce handler in your app.yaml:

handlers:
- url: /mapreduce(/.*)?
  script: mapreduce/main.py
  login: admin

url – The URL pattern routed to the MapReduce library (every path under /mapreduce).
script – The MapReduce library's own request handler; your job's code lives in a separate file.
login – Restricts access to app admins only.

Create a handler file for your job's code (here, mr_email_users.py). It defines a function that the library calls once for each entity of the model you map over:

def run(user_entity):
    # Called once per entity; send_email is your own helper, not part of the library.
    send_email(user_entity.email)

Note: See the official MapReduce guide linked below for more advanced options and examples.

Register and configure the MapReduce job in mapreduce.yaml:

mapreduce:
- name: MapReduce Email Users Job
  mapper:
    input_reader: mapreduce.input_readers.DatastoreInputReader
    handler: mr_email_users.run
    params:
    - name: entity_kind
      default: user
    - name: shard_count
      default: 50
    - name: processing_rate
      default: 1000

input_reader – The input reader for this job; other reader types are available in the mapreduce.input_readers module.
handler – The function called once for each input (here, each user entity).
entity_kind – The datastore model being mapped over.
shard_count – The number of shards, that is, mapper workers running in parallel.
processing_rate – The combined maximum number of inputs processed per second across all mappers. Lowering it keeps a job from using up all of your quota or interfering with online users.
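These two settings together bound how long a job takes. A minimal back-of-the-envelope sketch, assuming the rate cap is split evenly across shards (the function name and the 3,000,000-user count are hypothetical): with processing_rate 1000 and shard_count 50 as configured above, each shard handles about 20 entities per second, and 3,000,000 users take at least 3,000 seconds, or 50 minutes.

```python
from typing import Tuple


def estimate_runtime(entity_count: int, processing_rate: float,
                     shard_count: int) -> Tuple[float, float]:
    """Return (per-shard rate, minimum runtime in seconds).

    processing_rate is a ceiling, so the runtime is a lower bound; the
    per-shard figure assumes the cap is divided evenly among shards.
    """
    per_shard_rate = processing_rate / shard_count
    min_seconds = entity_count / processing_rate
    return per_shard_rate, min_seconds
```

If the estimate is too slow, raising processing_rate is only worthwhile when the quota estimate above still leaves headroom.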

Open the MapReduce status page to view and launch jobs:

http://(your app name).appspot.com/mapreduce/status

More Info

You may be interested in the official MapReduce Get Started Guide for Python or Java. In addition, a 2011 Google I/O talk includes many useful MapReduce tips. Please leave any questions and comments below; we will be happy to answer and discuss them!

Bringing Location-based Deals to Your Phone

Savvy shoppers have always been on the lookout for new deals, but sites such as Groupon, Gilt City, and LivingSocial have made deal-hunting even more popular. Over the last few weeks, we've been working hard to make great deals easier for our users to find. By aggregating local deals from these sources, we hope to make them less intrusive than email alerts and easier to browse. Scanning deals from several different sites at once should also be painless: simply add each of them in Pulse.

Where are the deals?

Since most deal sites are location-based, we want to let our users see deals that are nearby. The first step is knowing where the deals are. We can get a list of cities (either through API calls to the deal sites or through RSS feeds they provide) and store the longitude and latitude of each city. If a site cannot provide a list of cities or their coordinates, it is relatively straightforward to scrape the list of cities from its website (with permission, of course!) and look up each city's coordinates through a service such as the Google Geocoding API. Once we have this data, we store the city/coordinate pairs in the Google App Engine datastore (Figure 1).
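The geocoding step can be sketched as building a request URL and pulling the coordinates out of the JSON reply. This assumes Google's current Geocoding web service (a JSON endpoint taking address and key query parameters, with coordinates under results[0].geometry.location); the function names and the API key are hypothetical, and the HTTP call itself is left out so the parsing can be checked offline.

```python
import json
from typing import Optional, Tuple
from urllib.parse import urlencode

GEOCODE_URL = "https://maps.googleapis.com/maps/api/geocode/json"


def geocode_request_url(city_name: str, api_key: str) -> str:
    """Build the Geocoding request URL for one city name."""
    return GEOCODE_URL + "?" + urlencode({"address": city_name, "key": api_key})


def parse_geocode_response(body: str) -> Optional[Tuple[float, float]]:
    """Return (lat, lng) from the first result, or None if nothing matched."""
    data = json.loads(body)
    if data.get("status") != "OK" or not data.get("results"):
        return None
    location = data["results"][0]["geometry"]["location"]
    return location["lat"], location["lng"]
```

Cities that come back as None are worth reviewing by hand before storing, since a scraped name may be ambiguous or misspelled.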

Figure 1: City coordinates in datastore

Where is the user?

On GPS-enabled devices, location coordinates are relatively easy to come by. A client application can simply query the server with the user's coordinates (with the user's permission) using an API call such as: http://<server>/api/nearby?long=-73&lat=42&max=10. After some server-side calculation, the call returns a list of cities near the location the user provided, which makes them the most likely to be relevant.
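On the server, the first job is turning that URL into validated numbers. A minimal sketch using only the standard library; parse_nearby_query, the default of 10, and the hard limit of 50 are hypothetical choices, and example.com stands in for the real server.

```python
from typing import Tuple
from urllib.parse import parse_qs, urlparse


def parse_nearby_query(url: str, default_max: int = 10,
                       hard_limit: int = 50) -> Tuple[float, float, int]:
    """Parse long, lat and max from a /api/nearby request URL."""
    params = parse_qs(urlparse(url).query)
    try:
        lng = float(params["long"][0])
        lat = float(params["lat"][0])
        max_results = int(params.get("max", [str(default_max)])[0])
    except (KeyError, ValueError):
        raise ValueError("long and lat are required; max must be an integer")
    if not (-180.0 <= lng <= 180.0 and -90.0 <= lat <= 90.0):
        raise ValueError("coordinates out of range")
    # Clamp max so a single request cannot ask for an unbounded list.
    return lng, lat, max(1, min(max_results, hard_limit))
```

Rejecting bad input here keeps the distance calculation free of special cases.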

What is nearby?

Given that we have each city’s coordinates stored on the server and the user’s coordinates from the client application, how do we determine which cities are closest? We use the following algorithm to figure out which cities we should show to the user:

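import heapq

result = []  # min-heap of (squared distance, tiebreak index, city)
for i, city in enumerate(cities):
    dist = (city.long - user.long) ** 2 + (city.lat - user.lat) ** 2
    heapq.heappush(result, (dist, i, city))  # i avoids comparing cities on ties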

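The result is a min-heap keyed on squared distance, so popping from it (with heapq.heappop, or heapq.nsmallest for just the top few) yields cities from closest to farthest. Comparing squared distances gives the same order as comparing true distances, so the square root can be skipped. We return this list (or a subset of it) to the client application, and the user can then choose to add deals from a city near them.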
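The same ranking can be packaged as a self-contained function. A minimal sketch, assuming cities are simple records with name, lat and long fields (City and nearest_cities are hypothetical names); it keeps the squared-degree distance from the loop above and lets heapq.nsmallest do the selection.

```python
import heapq
from typing import List, NamedTuple


class City(NamedTuple):
    name: str
    lat: float
    long: float


def nearest_cities(cities: List[City], lat: float, lng: float,
                   max_results: int = 10) -> List[City]:
    """Return up to max_results cities, closest first.

    Uses squared Euclidean distance on raw degrees. Skipping the square
    root is safe because it does not change the ordering.
    """
    def squared_distance(city: City) -> float:
        return (city.long - lng) ** 2 + (city.lat - lat) ** 2

    # nsmallest returns a sorted list and keeps only max_results items.
    return heapq.nsmallest(max_results, cities, key=squared_distance)
```

One caveat: a degree of longitude shrinks toward the poles (roughly by the cosine of the latitude), so distance in raw degrees can misrank cities far from the equator. Scaling the longitude difference by cos(latitude) is a common refinement, though it is not part of the approach described above.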

The key to making this operation efficient is keeping the city coordinates in memory (in our case, in App Engine's Memcache). This simple optimization works well for any small- or medium-sized list of locations and turns a resource-intensive calculation, which can be quite involved to implement on App Engine's datastore, into a simple, scalable operation that needs no datastore calls while the list is cached.
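A minimal sketch of that cache-aside read, assuming the cache object offers get(key) and set(key, value, time=...) in the style of App Engine's memcache module. The cache key, the one-hour TTL, and the load_from_datastore callback are hypothetical, and DictCache is only an in-memory stand-in for local testing.

```python
from typing import Callable, Dict, List, Optional, Tuple

CityCoords = List[Tuple[str, float, float]]  # (name, lat, long)
CACHE_KEY = "city_coordinates"  # hypothetical key name


def get_city_coordinates(cache, load_from_datastore: Callable[[], CityCoords],
                         ttl_seconds: int = 3600) -> CityCoords:
    """Serve the city list from the cache, hitting the datastore only on a miss."""
    cities = cache.get(CACHE_KEY)
    if cities is None:
        # Miss (cold start or eviction): reload and repopulate the cache.
        cities = load_from_datastore()
        cache.set(CACHE_KEY, cities, time=ttl_seconds)
    return cities


class DictCache:
    """In-memory stand-in for memcache; ignores the expiry time."""

    def __init__(self) -> None:
        self._data: Dict[str, object] = {}

    def get(self, key: str) -> Optional[object]:
        return self._data.get(key)

    def set(self, key: str, value: object, time: int = 0) -> bool:
        self._data[key] = value
        return True
```

Because memcache entries can be evicted at any time, the datastore fallback is what keeps the lookup correct; the cache only makes it cheap.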

We hope to give our users a better experience by letting them aggregate deal sites in Pulse. This was a short look at how we implemented the feature. If you have great ideas that could further improve this experience, let us know!