Python SDK Tutorial Part 4

Albert Suarez edited this page Jan 23, 2019 · 1 revision

Adding robustness and performance using the token-bucket algorithm

After the above implementation, we have a functional multi-threaded client that implements basic rate limiting. However, it is neither very robust nor very performant: it is limited to the same number of processing threads as the allotted rate (in requests per second). A more robust and performant approach is to allow as many processing threads as desired, so that time spent waiting on I/O is put to use, while still throttling requests to maintain a consistent rate. To implement this approach, we can leverage the token-bucket algorithm, which was designed for the similar problem of limiting network bandwidth.
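Before wiring this into multiprocessing, it may help to see the idea in isolation. The following is a minimal single-process sketch, not the tutorial's implementation: the `TokenBucket` class and its `try_acquire` method are hypothetical names, and the caller passes in the current time so the behavior is easy to follow. Each request needs one token; tokens come back at a fixed rate, up to a fixed capacity.

```python
class TokenBucket(object):
    """Hypothetical single-process token bucket; the caller supplies the time."""

    def __init__(self, rate, capacity, now=0.0):
        self.rate = float(rate)          # tokens added per second
        self.capacity = float(capacity)  # maximum tokens the bucket can hold
        self.tokens = float(capacity)    # start with a full (pre-seeded) bucket
        self.last = now                  # time of the last refill

    def try_acquire(self, now):
        """Refill for the elapsed time, then take one token if available."""
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False


bucket = TokenBucket(rate=4, capacity=4)
# A burst of 6 requests at t=0: only the 4 pre-seeded tokens are available.
burst = [bucket.try_acquire(0.0) for _ in range(6)]
# Half a second later, 2 tokens (4 per second * 0.5 s) have been added back.
later = [bucket.try_acquire(0.5) for _ in range(3)]
print(burst)
print(later)
```

The number of callers does not matter here: however many threads ask for a token, the rate at which tokens come back bounds the request rate.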

For the token-bucket implementation, start by copying the existing script to a new file. Now let's add a variable to represent the number of waiting threads per processing thread:

# set allotted number of requests per second
# this is defined here as it could be retrieved through some external mechanism
__requests_per_second = 4
__waiting_threads = 10

Next, let's add our new token-bucket concurrency control constructs:

    # 3. Define concurrency specific objects
    # stats objects
    lock_stats = mp.Lock()
    counter = mp.Value('i', 0)
    avg_req_time = mp.Value('f', 0)
    time_start = mp.Value('f', 999999999999999)
    time_end = mp.Value('f', 0)
    # concurrency control objects
    lock_bucket = mp.Lock()
    bucket = mp.Queue()
    # pre-seed bucket
    for i in range(__requests_per_second):
        bucket.put('token')  # assumed placeholder; any value other than 'shutdown' works

In addition to the previously added stats objects, we are now adding multiprocessing constructs for the purpose of rate limiting. The Queue object acts as the bucket, with a Lock to gate modifications to its contents. We also pre-seed the bucket with tokens here.
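As a rough illustration of how a pre-seeded bucket behaves, here is a small sketch. It assumes Python 3 and uses the standard-library `queue.Queue` and `threading.Lock` as stand-ins for `mp.Queue` and `mp.Lock` so that it runs in a single process; the helper names `seed_bucket` and `try_take` and the `'token'` placeholder value are hypothetical.

```python
import queue      # stand-in for mp.Queue in this single-process sketch
import threading  # stand-in for mp.Lock

requests_per_second = 4  # mirrors __requests_per_second in the tutorial


def seed_bucket(bucket, count):
    """Pre-seed the bucket so the first `count` requests can start at once."""
    for _ in range(count):
        bucket.put('token')  # placeholder value; only its presence matters


def try_take(lock_bucket, bucket):
    """Return a token if one is available right now, otherwise None."""
    with lock_bucket:
        try:
            return bucket.get(block=False)
        except queue.Empty:
            return None


lock_bucket = threading.Lock()
bucket = queue.Queue()
seed_bucket(bucket, requests_per_second)

# One more attempt than there are tokens: the last one comes back empty.
taken = [try_take(lock_bucket, bucket) for _ in range(requests_per_second + 1)]
print(taken)
```

The final `None` is the signal a processing thread would use to back off until the bucket is replenished.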

Now let's define a new function that will act as our bucket-refilling thread. In a nutshell, the token-bucket algorithm works like this: multiple threads take tokens from the bucket in order to run, while a separate thread replenishes the bucket at a pre-defined rate (in this case, n tokens per second, which corresponds to the rate of n requests per second). Let's call this function bucket_refill_thread:

def bucket_refill_thread(lock_bucket, bucket):
    while True:
        # (re-)establish parameters
        interval = float(1.0)
        tokens = __requests_per_second

        # sleep for interval
        #   the below section breaks intervals apart into smaller chunks in order to smooth out bursting
        #   for example, 8 rps can be broken down into: 2 requests / 250 ms
        if __requests_per_second > 1:
            # walk the divisors from largest to smallest and stop at the first match
            for i in range(__requests_per_second - 1, 1, -1):
                if __requests_per_second % i == 0:
                    interval = 1 / float(i)
                    tokens = __requests_per_second // i
                    break
        time.sleep(interval)

        with lock_bucket:
            # check for poison pill (this drains the bucket; it is refilled below)
            size = bucket.qsize()
            shutdown = False
            for i in range(size):
                try:
                    token = bucket.get(block=False)
                    if 'shutdown' == token:
                        shutdown = True
                except Exception:
                    # nothing left to drain
                    break
            if shutdown:
                break

            # don't let the bucket exceed token capacity
            while bucket.qsize() < min(size + tokens, __requests_per_second):
                bucket.put('token')  # assumed placeholder; any value other than 'shutdown' works

Like the other thread functions above, it looks for a 'poison pill' control signal to shut down, but it also puts tokens into the bucket queue, never filling it beyond the allotted rps rate. It also doesn't simply put in the maximum allotted tokens once per second; it tries to divide them into smaller chunks in order to smooth out request bursting. Doing this maintains a smoother overall request rate and lower response latencies (which yields better overall throughput). For example, with a rate of 12 rps, it will actually end up putting in 2 tokens every 1/6 of a second, which makes for much smoother response handling than 12 tokens (at once) every second.
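The chunking described above can be sketched as a standalone function. This is an illustration under the assumption that the refill thread picks the largest proper divisor of the rate; `refill_schedule` is a hypothetical helper name, not part of the tutorial script.

```python
def refill_schedule(requests_per_second):
    """Return (interval_seconds, tokens_per_interval) for a given rate."""
    interval = 1.0
    tokens = requests_per_second
    # Try divisors from largest to smallest; the largest proper divisor gives
    # the shortest interval that still adds a whole number of tokens.
    for i in range(requests_per_second - 1, 1, -1):
        if requests_per_second % i == 0:
            interval = 1 / float(i)
            tokens = requests_per_second // i
            break
    return interval, tokens


for rps in (4, 7, 8, 12):
    interval, tokens = refill_schedule(rps)
    print('%d rps -> %d token(s) every %.3f s' % (rps, tokens, interval))
```

For 8 rps this yields 2 tokens every 250 ms, and for 12 rps it yields 2 tokens every 1/6 of a second. A prime rate such as 7 has no proper divisor greater than 1, so it falls back to 7 tokens once per second and gets no smoothing.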

We also need to start and stop the bucket_refill_thread just like the other threads in the test_api function:

    #   first start bucket refill thread
    refill_thread = mp.Process(target=bucket_refill_thread, args=(lock_bucket, bucket))
    refill_thread.start()
    pool = []
    for i in range(__requests_per_second * __waiting_threads):  # waiting threads to keep the pipeline full
        # pass in necessary parameters to thread, including client key, etc.
        p = mp.Process(target=image_process_thread,
                       args=(url, client_key, queue, results,
                             lock_stats, counter, avg_req_time, time_start, time_end,
                             lock_bucket, bucket))
        pool.append(p)
        p.start()
    # 5. clean-up after queue has been processed with "poison pill"
    while not queue.empty():
        # wait for queue to be processed
        time.sleep(1)  # polling interval is an arbitrary choice
    for i in pool:
        # seed shutdown messages / poison pills
        queue.put(dict(id=-1, url='shutdown', model='shutdown'))
    for p in pool:
        # enforce clean shutdown of threads
        p.join()
    # stop bucket refill thread
    bucket.put('shutdown')
    refill_thread.join()

Note that we also multiply the number of allotted threads by the previously defined __waiting_threads multiplier, spawning numerous waiting threads to maximize our client's throughput. We also add our new bucket (and corresponding mutex) to our existing image_process_thread invocation loop.

The image_process_thread can now be enhanced to utilize these new rate limiting constructs:

def image_process_thread(url, client_key, queue, results,
                         lock_stats, counter, avg_req_time, time_start, time_end,
                         lock_bucket, bucket):
    while True:
        # acquire token in order to process
        token = None
        lock_bucket.acquire()
        try:
            token = bucket.get(block=False)  # don't do anything with the token, just remove it, as it acts as our "access rights"
        except Exception:
            # bucket is empty, no token available right now
            pass
        # first release lock
        lock_bucket.release()
        # then proceed or sleep
        if not token:
            time.sleep(1 / float(__requests_per_second))
            continue
        # get image URL entry to process

Note that, similarly to how the earlier implementation handled 429s, if a token is not available, the processing thread sleeps for an arbitrary 1 / n seconds (where n is the number of requests per second) and then tries again.
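The back-off behavior can be sketched on its own as follows. This is an illustration rather than the tutorial's code: it assumes Python 3, uses the standard-library `queue.Queue` and `threading.Lock` as single-process stand-ins for the multiprocessing objects, and `acquire_token` and `fake_sleep` are hypothetical names. The sleep function is injectable so the example runs instantly.

```python
import queue
import threading
import time


def acquire_token(lock_bucket, bucket, requests_per_second, sleep=time.sleep):
    """Poll the bucket until a token is taken; return how many sleeps it took."""
    sleeps = 0
    while True:
        with lock_bucket:
            try:
                bucket.get(block=False)  # the token's value is irrelevant
                return sleeps
            except queue.Empty:
                pass
        # The lock is released before sleeping so other threads are not blocked.
        sleep(1 / float(requests_per_second))
        sleeps += 1


delays = []
bucket = queue.Queue()
lock_bucket = threading.Lock()


def fake_sleep(seconds):
    """Record the requested delay; simulate a refill on the third sleep."""
    delays.append(seconds)
    if len(delays) == 3:
        bucket.put('token')


waits = acquire_token(lock_bucket, bucket, 4, sleep=fake_sleep)
print(waits, delays)
```

With a rate of 4 rps each back-off is 250 ms, so a thread that misses a token is never idle for longer than one token's worth of time.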

We also don't need to explicitly sleep if we get a 429 code, so we can remove the previously existing time.sleep() calls:

            elif resp.status_code == 429:
                # handle over-rate limit retrying
                    msg='surpassed rate limit, trying again')
                # re-queue entry and try again

Finally, let's add this 3rd implementation to our existing test runner:

    print('4. running robust rate limiting example')

Now if you run the test suite, you should see higher throughput for this more robust implementation, such as:

[60] requests processed in [16.0] seconds with average time [1033.0] ms, total throughput: [3.75] rps
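The throughput figure in that line is simply the request count divided by the elapsed time. A minimal sketch of the arithmetic follows; the `summarize` helper and its format string are assumptions modeled on the sample output, not taken from the tutorial script.

```python
def summarize(count, elapsed_seconds, avg_request_ms):
    """Format a stats line; throughput is requests divided by elapsed seconds."""
    throughput = count / float(elapsed_seconds)
    return ('[%d] requests processed in [%.1f] seconds with average time '
            '[%.1f] ms, total throughput: [%.2f] rps'
            % (count, elapsed_seconds, avg_request_ms, throughput))


line = summarize(60, 16.0, 1033.0)
print(line)
```

With an allotted rate of 4 rps, 60 requests cannot finish in less than 15 seconds, so 3.75 rps is close to the ceiling.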

And that's it! At this point, we have implemented a basic client, a simple rate-limited client, and a more robust, optimized rate-limited client that implements the token-bucket algorithm.

If you run into any issues with the code examples or have any suggestions for them, please feel free to create an issue on GitHub! Thanks for reading our tutorial!
