Scaling a polling Python application with tooz

This article is the final one in the series I wrote about handling a large number of connections in a Python application. In case you don't remember the problem we're trying to solve, here it is again, as stated by one of my followers:

It so happened that I'm currently working on scaling some Python app. Specifically, now I'm trying to figure out the best way to scale SSH connections - when one server has to connect to thousands (or even tens of thousands) of remote machines in a short period of time (say, several minutes).
How would you write an application that does that in a scalable way?

The first blog post was exploring a solution based on threads, while the second blog post was exploring an architecture around asyncio.

In the first two articles, we wrote programs that could handle this problem by using multiple threads or asyncio – or both. While this worked pretty well, it had one major limitation: everything ran on a single computer. So this time, we're going to take a different approach and use multiple computers!

The job

As covered in the previous articles, a Python application can connect to a host over SSH using Paramiko or asyncssh. Once again, that will not be the focus of this blog post, since it is pretty straightforward to do.
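
For the curious, a minimal Paramiko version of such a connection might look like the following sketch (the hostname and the command here are placeholders, not part of the original problem):

import paramiko


def run_ssh_command(hostname, command="uptime"):
    # Open an SSH connection, run one command and return its output.
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(hostname)
    try:
        stdin, stdout, stderr = client.exec_command(command)
        return stdout.read().decode()
    finally:
        client.close()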

To keep this exercise simple, we'll reuse our ping function from the first article. It looked like this:

import subprocess


def ping(hostname):
    p = subprocess.Popen(["ping", "-c", "3", "-w", "1", hostname],
                         stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    return p.wait() == 0

As a reminder, pinging 255 IP addresses serially with this program takes more than 10 minutes. Let's try to make it faster by running the pings in parallel.
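
For reference, the serial run is nothing more than a loop over the addresses, sketched here with the same 192.168.2.0/24 range used later in this article:

# Serial version: ping each address one after the other.
# This is the run that takes more than 10 minutes.
for i in range(255):
    ping("192.168.2.%d" % i)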

The architecture

Remember: if pinging 255 hosts takes 10 minutes, pinging the whole Internet is going to take forever – around three hundred years at this rate.

With our ping experiment, we already divided our mission (i.e. "who's alive on the Internet?") into very small tasks ("ping one host"). If we want to ping 4 billion hosts, we need to run those tasks in parallel. But one computer is not going to be enough: we need to distribute those tasks across different hosts, so we can use some massive parallelism to go even faster!

There are two ways to distribute such a set of tasks:

  • Use a queue. That works well for jobs that are not determined in advance, such as user-submitted tasks, or jobs that are only going to be executed once.

  • Use a distribution algorithm. That works only for tasks that are determined in advance and that are scheduled regularly, such as polling.

We are going to pick the second option here, as those ping tasks (or the polling tasks in the original problem) need to run regularly. That approach will allow us to spread the jobs across several processes, which can themselves be spread across several nodes over a network. We also won't have to maintain a queue (i.e. run it and monitor it), so that's a bonus point.

That's infinite horizontal scalability!

The distribution algorithm

The algorithm we're going to use to distribute this task is based on a consistent hashring.

Here's how it works in short. Picture a circular ring. We map objects onto this ring. The ring is then split into partitions. Those partitions are distributed among all the workers. The workers take care of jobs that are in the partitions they are responsible for.

In the case where a new node joins the ring, it is inserted between 2 nodes and takes over a bit of their workload. In the case where a node leaves the ring, the partitions it was taking care of are reassigned to its adjacent nodes.

If you want more details, there are plenty of explanations of how this algorithm works. Feel free to look online!
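
To get a feel for it, here is a small sketch using the hashring implementation that ships with Tooz (the module and call names are based on current Tooz releases; the node names and hashed keys are arbitrary):

from tooz import hashring

# A ring with three nodes: each object is mapped to a node by hashing it.
ring = hashring.HashRing(["node1", "node2", "node3"])
for key in (b"192.168.2.1", b"192.168.2.2", b"192.168.2.3"):
    print(key, ring.get_nodes(key))

# If node2 disappears, only the objects it owned get a new owner;
# the objects owned by node1 and node3 do not move.
smaller_ring = hashring.HashRing(["node1", "node3"])
for key in (b"192.168.2.1", b"192.168.2.2", b"192.168.2.3"):
    print(key, smaller_ring.get_nodes(key))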

However, to make this work, we need to know which nodes are alive or dead. This is another problem to solve, and the best way to tackle it is to use a coordination mechanism. There are plenty of those, from Apache ZooKeeper to etcd.

Without going into too much detail, those pieces of software provide a network service that every node can connect to in order to manage its state. If a client gets disconnected or crashes, it is then easy to consider it removed. That enables the application to get the full list of nodes and split the ring accordingly. There's no need to have any shared state between the nodes other than who's alive and running.

Using group membership

To get a list of nodes that are available to help us ping the Internet, we need a service that provides this information and a library to interact with it. Since the use case is pretty simple and I don't know which backend you like the most, we're going to use the Tooz library.

Tooz provides a coordination mechanism on top of a large variety of backends: ZooKeeper or etcd, as suggested earlier, but also Redis or memcached for those who want to live more dangerously. Indeed, while ZooKeeper and etcd can be set up as replicated clusters, memcached, on the other hand, is a single point of failure.

For the sake of the exercise, we're going to use a single instance of etcd here. Thanks to Tooz, switching to another backend would be a one-line change anyway.

Tooz provides a tooz.coordination.Coordinator object that represents the connection to the coordination subsystem. It then exposes an API based on groups and members. A member is a node connected through a Coordinator instance. A group is a place that members can join or leave.

Here's a first implementation of a member joining a group and printing the member list:

import sys
import time

from tooz import coordination

# Check that a client and group ids are passed as arguments
if len(sys.argv) != 3:
    print("Usage: %s <client id> <group id>" % sys.argv[0])
    sys.exit(1)

# Get the Coordinator object
c = coordination.get_coordinator(
    "etcd3://localhost",
    sys.argv[1].encode())
# Start it (initiate connection).
c.start(start_heart=True)

group = sys.argv[2].encode()

# Create the group
try:
    c.create_group(group).get()
except coordination.GroupAlreadyExist:
    pass

# Join the group
c.join_group(group).get()

try:
    while True:
        # Print the members list
        members = c.get_members(group)
        print(members.get())
        time.sleep(1)
finally:
    # Leave the group
    c.leave_group(group).get()

    # Stop when we're done
    c.stop()

Don't forget to run etcd on your machine before running this program. Running a first instance of this program will print set(['client1']) every second. As soon as you run a second instance, both start printing set(['client1', 'client2']). If you shut down one of the clients, the remaining one will print the member list with only one member in it.

This works with any number of clients. If a client crashes rather than disconnecting properly, its membership will automatically expire after a few seconds – you can configure this expiration period by passing a timeout value in the Tooz URL.
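
For instance, assuming the option is simply named timeout for the driver you use (worth double-checking in the Tooz documentation for your backend), that would look like:

# Hypothetical example: ask the backend to expire members after 5 seconds.
c = coordination.get_coordinator(
    "etcd3://localhost/?timeout=5",
    sys.argv[1].encode())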

Using consistent hashing

Now that we have a group, which will serve as our ring, we can implement a consistent hashring on top of it. Fortunately, Tooz also provides a ready-to-use implementation of this. Rather than using the join_group method, we're going to use the join_partitioned_group method.

import sys
import time

from tooz import coordination

# Check that a client and group ids are passed as arguments
if len(sys.argv) != 3:
    print("Usage: %s <client id> <group id>" % sys.argv[0])
    sys.exit(1)

# Get the Coordinator object
c = coordination.get_coordinator(
    "etcd3://localhost",
    sys.argv[1].encode())
# Start it (initiate connection).
c.start(start_heart=True)

group = sys.argv[2].encode()

# Join the partitioned group
p = c.join_partitioned_group(group)

try:
    while True:
        # Print which member(s) each of the ten objects is mapped to
        for i in range(10):
            print("%d handled by %s" % (i, p.members_for_object(i)))
        time.sleep(1)
finally:
    # Leave the group
    c.leave_group(group).get()

    # Stop when we're done
    c.stop()

Running this program on one node (or just one terminal) will output the following every second:

$ python distribution.py client1 foobar
0 handled by set(['client1'])
1 handled by set(['client1'])
2 handled by set(['client1'])
3 handled by set(['client1'])
4 handled by set(['client1'])
5 handled by set(['client1'])
6 handled by set(['client1'])
7 handled by set(['client1'])
8 handled by set(['client1'])
9 handled by set(['client1'])

As soon as a second member joins (just run another copy of the script in another terminal), the output changes and both running programs output the same thing:

0 handled by set(['client2'])
1 handled by set(['client1'])
2 handled by set(['client1'])
3 handled by set(['client1'])
4 handled by set(['client1'])
5 handled by set(['client2'])
6 handled by set(['client2'])
7 handled by set(['client1'])
8 handled by set(['client1'])
9 handled by set(['client2'])

They just shared the ten objects between them. They did not communicate with each other: they only know about each other's presence, and since they use the same algorithm to compute which member an object belongs to, they compute the same results. You can test this with a third copy of the program:

0 handled by set(['client2'])
1 handled by set(['client1'])
2 handled by set(['client1'])
3 handled by set(['client1'])
4 handled by set(['client1'])
5 handled by set(['client2'])
6 handled by set(['client2'])
7 handled by set(['client3'])
8 handled by set(['client1'])
9 handled by set(['client3'])

Here we got a third client in the mix, excellent! If we stop one of the clients, the rebalancing is done automatically.

While the consistent hashing approach is great, it has a few characteristics you might want to know about:

  • The distribution algorithm is not made to be perfectly even. If you have a vast number of objects, it will look pretty even statistically, but if you are trying to distribute two objects on two nodes, it is quite possible that one node handles both objects and the other one none (the small experiment after this list illustrates this).

  • The distribution is not done in real time, meaning there's a small chance that an object might be owned by two nodes at the same time. This is not a problem in a scenario such as this one, since pinging a host twice is not a big deal, but if your jobs need to be executed once and only once, this is not an adequate distribution method. In that case, use a queue with the proper delivery guarantees instead.
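
Here is the small experiment mentioned above, again using Tooz's hashring directly (same assumptions as the earlier sketch; the node names and the number of objects are arbitrary):

from tooz import hashring

ring = hashring.HashRing(["node1", "node2"])

# Two objects on two nodes: nothing guarantees they get different owners.
for obj in (b"host-a", b"host-b"):
    print(obj, ring.get_nodes(obj))

# With many objects, the split between the two nodes evens out statistically.
counts = {"node1": 0, "node2": 0}
for i in range(10000):
    for node in ring.get_nodes(b"host-%d" % i):
        counts[node] += 1
print(counts)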

Distributed ping

Now that we have our hashring ready to distribute our job, we can implement our final program!

import sys
import subprocess
import time

from tooz import coordination


# Check that a client and group ids are passed as arguments
if len(sys.argv) != 3:
    print("Usage: %s <client id> <group id>" % sys.argv[0])
    sys.exit(1)


# Get the Coordinator object
c = coordination.get_coordinator(
    "etcd3://localhost",
    sys.argv[1].encode())
# Start it (initiate connection).
c.start(start_heart=True)

group = sys.argv[2].encode()

# Join the partitioned group
p = c.join_partitioned_group(group)


class Host(object):
    def __init__(self, hostname):
        self.hostname = hostname

    def __tooz_hash__(self):
        """Returns a unique byte identifier so Tooz can distribute this object."""
        return self.hostname.encode()

    def __str__(self):
        return "<%s: %s>" % (self.__class__.__name__, self.hostname)

    def ping(self):
        p = subprocess.Popen(["ping", "-q", "-c", "3", "-W", "1",
                              self.hostname],
                             stdout=subprocess.DEVNULL,
                             stderr=subprocess.DEVNULL)
        return p.wait() == 0


hosts_to_ping = [Host("192.168.2.%d" % i) for i in range(255)]

try:
    while True:
        for host in hosts_to_ping:
            c.run_watchers()
            if p.belongs_to_self(host):
                print("Pinging %s" % host)
                if host.ping():
                    print("  %s is alive" % host)
        time.sleep(1)
finally:
    # Leave the group
    c.leave_group(group).get()

    # Stop when we're done
    c.stop()

When the first client starts, it starts iterating over the hosts, and since it is alone, all the hosts belong to it, so it starts pinging them all:

$ python3 ping.py client1 ping
Pinging <Host: 192.168.2.0>
  <Host: 192.168.2.0> is alive
Pinging <Host: 192.168.2.1>
  <Host: 192.168.2.1> is alive
Pinging <Host: 192.168.2.2>

Then, a second client starts pinging too, and the jobs are automatically split. The client1 instance starts skipping the hosts that now belong to client2:

# client1 output
Pinging <Host: 192.168.2.8>
  <Host: 192.168.2.8> is alive
Pinging <Host: 192.168.2.9>
Pinging <Host: 192.168.2.11>
Pinging <Host: 192.168.2.12>

# client2 output
Pinging <Host: 192.168.2.7>
Pinging <Host: 192.168.2.10>
Pinging <Host: 192.168.2.13>
  <Host: 192.168.2.13> is alive

On the other hand, client2 skips the hosts that belong to client1. If we want to scale our application further, we can start new clients on other machines on the network and expand our pinging system!

Just a first step

This ping job does not use a lot of CPU time or I/O bandwidth, and neither would Alon's original SSH use case. However, if it did, this method would be even more valuable, since the ability to add more machines would be key to scaling.

These are just the first steps of the distribution and scalability mechanisms that you can implement using Python. There are a few other options available on top of this mechanism, such as defining different weights for different nodes or using replicas to achieve high-availability scenarios. I've covered those in my book Scaling Python, if you're interested in learning more!
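
To give a rough idea of what those two options look like, here is a hedged sketch using the same hashring primitive as above (the weight values, the replica count and the exact keyword argument names are assumptions to be checked against the Tooz documentation):

from tooz import hashring

# A node declared with a higher weight receives proportionally more
# partitions, which helps when machines have different capacities.
ring = hashring.HashRing([])
ring.add_node("small-node", weight=1)
ring.add_node("big-node", weight=4)

# Asking for two replicas returns two owners per object, so the job
# can survive the loss of one of them.
print(ring.get_nodes(b"192.168.2.42", replicas=2))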