Python Timeit

I’m working through the lessons in the free online version of Allen B. Downey’s Think Stats. One thing I like to do whenever I write code is test it. So almost as soon as I began the exercises, I added a module called testwell.

Since performance is often a consideration with statistical computing, my testing module includes a basic timing function called timeit. The Python standard library includes a timeit module, but I don’t find it very friendly, as it runs statements in its own separate namespace rather than the current environment. I just want a timing function that can time comparable pieces of code in the current environment. Here’s my timeit function:

import time

def timeit(f, *args, **kw):
    """time a function over n trials:

    f1 = lambda a: a * a
    def f2(a, b):
        f1(a) + f1(b)
    def f3():
        f2(10, 20)

    USAGE:
        t1 = timeit(f1, a, n=1000)
        t2 = timeit(f2, a, b, n=1000)
        t3 = timeit(f3, n=1000)
        pprint([t1, t2, t3])
    """
    n = kw.get('n', 100)    # number of trials (default: 100)
    print 'timing %s over %s trials' % (f, n)
    t0 = time.time()
    for i in range(n):
        f(*args)

    total_time = time.time() - t0
    per_trial = total_time / n
    return '%.2f (%s trials at %.6f per trial)' % (total_time, n, per_trial)

It takes a function as its first argument, followed by any positional arguments to pass to that function, and an n keyword argument to specify how many times to run it.
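For example, mirroring the docstring usage with a trivial two-argument function:

f2 = lambda a, b: a * b
t = timeit(f2, 10, 20, n=1000)
print t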

The pattern I like best is to enclose the operation you wish to test in a function, then just pass that with the n keyword:

import pprint

num_trials = 100000
f1 = lambda: Percentile(scores, 50)
f2 = lambda: iPercentile(scores, 50)
t1 = timeit(f1, n=num_trials)
t2 = timeit(f2, n=num_trials)
pprint.pprint([t1, t2])

# output
timing <function <lambda> at 0x8950bc4> over 100000 trials
timing <function <lambda> at 0x8950bfc> over 100000 trials
['0.86 (100000 trials at 0.000009 per trial)',
 '0.38 (100000 trials at 0.000004 per trial)']

Updating Model Schema in Google App Engine

Problem

I have an App Engine app that keeps track of some tweets (“statuses” in the API). I decided I wanted to store the time each message was originally tweeted. So I need to update the schema of one of my models, TweetDigest, to add the new property (field). I’d also like to update any existing records to include the value.

Solution

First things first. Let’s update the model. This is easy enough. Just add the new property to the existing model. The relevant code:

from google.appengine.ext import db

class TweetDigest(db.Model):
    tweet_id        = db.StringProperty(required=True, indexed=True)
    user_id         = db.StringProperty(required=True)
    screen_name     = db.StringProperty(required=True)
    text            = db.StringProperty(required=True, multiline=True)
    stored_at       = db.DateTimeProperty(auto_now_add=True)
    
    # new property
    tweeted_at      = db.DateTimeProperty(required=True)

That’s the easy part. Any new records will include that property. Existing records, however, will not have the property, at all. Let me emphasize that: it’s not that the new field is set to null for existing records. Existing records do not have the field at all. A couple queries in the interactive console illustrate:

from datetime import datetime
from google.appengine.ext import db
from pprint import pprint

from project.models.twitter import TweetDigest

print datetime.now()

# filtering on the new property matches nothing: existing records
# don't have it at all, not even as None
count = TweetDigest.all().filter('tweeted_at', None).count()
print count

record = TweetDigest.all().get()
pprint(record.__dict__['_entity'])

Output:

2011-09-05 00:11:25.657055
0
{u'screen_name': u'klenwell',
 u'stored_at': datetime.datetime(2011, 7, 6, 9, 26, 56, 757862),
 u'text': u'Premature optimization is the root of all evil.',
 u'tweet_id': u'1948390000',
 u'user_id': u'1820900'}

So how to fix this? Well, in this case, I have to retrieve the created_at value for each status using the Twitter API, then update each record. I set up an action in a special controller to do this. It queries the datastore to fetch all the existing records, then creates a task for each that will query the Twitter API, get the created_at value, and update the record in the datastore.

Here’s the controller code that creates the tasks:

def add_tweet_created_at_field(self):
    # queue settings
    queue_name = 'tweetdigest-schema-change'
    queue_url_f = '/backend/queue/store_tweet_created_at_value/%s'
    queue_params = {}

    # purge queue
    self.purge_queue(queue_name)
    logging.info('purged queue: %s' % (queue_name))

    # select all TweetDigest records without tweeted_at and add to queue
    queue_count = 0
    query = TweetDigest.all()
    for digest in query:
        if digest.tweeted_at is None:
            queue_url = queue_url_f % (digest.tweet_id)
            added_task = self.queue_task(queue_url, queue_params, queue_name)
            queue_count += 1

    logging.info('queued %s TweetDigest records for update' % (queue_count))

    # output
    response = {
        'queued tasks'          : queue_count,
    }
    self.set('data', pformat(response))
    self.render(self.default_view)

This action runs within the Appswell framework. The task code is left as an exercise for the reader, though a rough sketch follows.
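Here’s a minimal sketch of what such a task handler might look like. This is not the actual task code, and fetch_status is a hypothetical helper standing in for the Twitter API call:

from datetime import datetime

from project.models.twitter import TweetDigest

def store_tweet_created_at_value(self, tweet_id):
    # fetch_status is hypothetical: assume it wraps the Twitter API
    # statuses/show call and returns the status data for tweet_id
    status = fetch_status(tweet_id)

    # look up the existing record by tweet_id
    digest = TweetDigest.all().filter('tweet_id', tweet_id).get()

    # Twitter's created_at format, e.g. 'Wed Aug 27 13:08:45 +0000 2008'
    digest.tweeted_at = datetime.strptime(status['created_at'],
                                          '%a %b %d %H:%M:%S +0000 %Y')
    digest.put()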

References

http://code.google.com/appengine/articles/update_schema.html
http://stackoverflow.com/questions/7037269/check-if-a-field-is-present-in-an-entity
http://appswell.appspot.com/


Google App Engine Memcache Limits

Problem

If you attempt to store an object more than approximately 1 MB in size using memcache on Google App Engine, memcache will raise a ValueError, something like this:

ValueError: Values may not be more than 1000000 bytes in length; received 1088171 bytes

Solution

I’ve added a library to my Appswell framework that gets around this limit by serializing the object into multiple strings and storing these chunks along with an index object under the original key.

Usage Example:

import multicache as memcache

# cache params
cache_data = some_large_nested_dict
cache_key = 'test_multicache'
cache_len = 60      # expiration in seconds

# save data
memcache.set(cache_key, cache_data, cache_len)

# retrieve data
retrieved_data = memcache.get(cache_key)
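To give a sense of the technique, here is a simplified sketch of the chunking approach (not the actual multicache code, just the general idea):

import pickle
from google.appengine.api import memcache

CHUNK_SIZE = 950000     # stay safely under the ~1 MB value limit

def set(key, value, time=0):
    # pickle the object and split the string into chunks
    serialized = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
    chunk_keys = []
    chunks = {}
    for n, offset in enumerate(range(0, len(serialized), CHUNK_SIZE)):
        chunk_key = '%s_%s' % (key, n)
        chunks[chunk_key] = serialized[offset:offset + CHUNK_SIZE]
        chunk_keys.append(chunk_key)

    # store the chunks plus an index object under the original key
    memcache.set_multi(chunks, time)
    memcache.set(key, chunk_keys, time)

def get(key):
    # the index lists the chunk keys; any missing chunk invalidates the value
    chunk_keys = memcache.get(key)
    if chunk_keys is None:
        return None
    chunks = memcache.get_multi(chunk_keys)
    if len(chunks) != len(chunk_keys):
        return None
    return pickle.loads(''.join(chunks[k] for k in chunk_keys))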

The module can be easily extracted from the framework. See these links for additional details:

source code: http://code.google.com/p/appswell/source/browse/appspot/lib/multicache.py
wiki page: http://klenwell.com/is/AppengineMulticache


Python Unit Testing Preamble

A little trick I just came up with for command-line unit test output: it prints the docstring for the module at the beginning of your unit test run. Use this code at the bottom of your unit test file:

#
# MAIN
#
if __name__ == "__main__":
    print sys.modules[globals()['__name__']].__doc__    # print module doc
    suite = unittest.TestLoader().loadTestsFromTestCase(YourTestClassHere)
    unittest.TextTestRunner(verbosity=2).run(suite)
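Put together, a minimal self-contained test file looks like this (TestExample and the docstring are just placeholders):

"""
Unit tests for the example module.
Run from the command line: python test_example.py
"""
import sys
import unittest


class TestExample(unittest.TestCase):

    def test_truth(self):
        self.assertTrue(True)


#
# MAIN
#
if __name__ == "__main__":
    print sys.modules[globals()['__name__']].__doc__    # print module doc
    suite = unittest.TestLoader().loadTestsFromTestCase(TestExample)
    unittest.TextTestRunner(verbosity=2).run(suite)

The docstring prints before the usual verbosity-2 listing of tests.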

Forcing /etc/cron.d to Reload on Ubuntu

There are two options for scheduling a script with cron on Linux. Either edit the crontab:

$ sudo crontab -e

Or link a cron file to /etc/cron.d:

$ sudo ln -sv /home/klenwell/crons/my.cron /etc/cron.d/mycron

I like option two because it provides a more modular and organized way to enable cron tasks for various projects. The only problem with this method is that, when things go wrong, it’s been a huge headache trying to figure out what the problem is and how to get the tasks in the linked cron file running again. The cron documentation and Google are frustratingly tight-lipped on the subject of /etc/cron.d. But I think I have figured out a reliable way to configure and troubleshoot /etc/cron.d.

Some rules that will help in using /etc/cron.d:

  1. You must specify a user in your cron file (see the example below).
  2. The linked cron file must be owned by root (giving it root permissions alone doesn’t seem to be enough).
  3. To force /etc/cron.d to reload, touch the /etc/cron.d dir: sudo touch /etc/cron.d/
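Here’s a minimal example of what such a cron file might look like, matching the test setup described below (note the user field, klenwell, between the schedule and the command):

# /home/klenwell/crons/my.cron
# m h dom mon dow user      command
* * * * *         klenwell  touch /tmp/crontest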

I was able to confirm these rules by linking a cron file to /etc/cron.d, changing the owner to myself (klenwell), and then checking the /var/log/syslog file.

Cron checks /etc/cron.d once a minute, so you have to give it at least 60 seconds to register any changes. I set up my test cron file (/home/klenwell/crons/my.cron) to touch a file in /tmp (/tmp/crontest) every minute.

When the cron file is owned by my user (klenwell), I see this message in the log:

Nov 26 02:55:01 myserver cron[1924]: (*system*mycron) WRONG FILE OWNER (/etc/cron.d/mycron)

The linked cron file will not run again until the owner is corrected and the directory touched:

$ sudo chown -v root /home/klenwell/crons/my.cron
$ sudo touch /etc/cron.d/

After this, I see this in /var/log/syslog:

Nov 26 02:57:01 myserver CRON[21170]: (klenwell) CMD (   touch /tmp/crontest)

It took me a lot of trial and error to figure this out. Hope it saves you an extra headache or two.


Get for Python Lists

With Python’s dictionary objects, you have the useful get method:

>>> d = {'a':1, 'b':2, 'c':3}
>>> d.get('a')
1
>>> d.get('d', 'not found')
'not found'

Sometimes, I find something like this would be useful for lists, using the list’s index in place of the dict’s key:

>>> l = [0,1,2,3]
>>> l.get(2)
Traceback (most recent call last):
  File "", line 1, in 
AttributeError: 'list' object has no attribute 'get'

Here’s a quick lambda that accomplishes that:

list_get = lambda l,i,a=False: (len(l) > i and [l[i]] or a and [a] or [False])[0]

Usage:

>>> list_get = lambda l,i,a=False: (len(l) > i and [l[i]] or a and [a] or [False])[0]
>>> my_list = [0,False,None,3]
>>> list_get(my_list,0)
0
>>> list_get(my_list,1)
False
>>> list_get(my_list,2)  # returns None
>>> list_get(my_list,3)
3
>>> list_get(my_list,4)
False
>>> list_get(my_list,4,0)  # Note: returns False instead of 0
False
>>> list_get(my_list,4,1)
1

It’s not perfect. Be careful with False-equivalent values in the alt parameter, as noted above. But otherwise it should work for most practical cases where such usage is desired.
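If that quirk matters, a plain function with try/except sidesteps it entirely (a simple alternative sketch, not the original one-liner):

def list_get(l, i, alt=None):
    """Return l[i] if the index exists, otherwise alt."""
    try:
        return l[i]
    except IndexError:
        return alt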
