Scaling a polling Python application with parallelism

A case study around threads

Tuesday, 23 January 2018

A few weeks ago, Alon contacted me and asked me the following:

It so happened that I'm currently working on scaling some Python app. Specifically, now I'm trying to figure out the best way to scale SSH connections - when one server has to connect to thousands (or even tens of thousands) of remote machines in a short period of time (say, several minutes).

How would you write an application that does that in a scalable way?

Alon is using such an application to gather information on the hosts it connects to, though that's not important in this case.

In a series of blog posts, I'd like to help Alon solve this problem! We're going to write an application that can manage millions of hosts.

Well, if you have enough hardware, obviously.

The job

Writing a Python application that connects to a host over SSH can be done using, for example, Paramiko. That will not be the focus of this blog post, since it is pretty straightforward to do.
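
For reference, a minimal sketch of such an SSH job using Paramiko could look like the following – the hostname, the username and the uptime command are placeholders I picked for illustration, and this function is not used in the rest of the post:

import paramiko


def ssh_uptime(hostname):
    # Hypothetical example: run "uptime" on a remote host over SSH.
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(hostname, username="admin")
    try:
        stdin, stdout, stderr = client.exec_command("uptime")
        return stdout.read().decode()
    finally:
        client.close()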

To keep this exercise simple, we'll just use a ping function that looks like this:

import subprocess


def ping(hostname):
    p = subprocess.Popen(["ping", "-c", "3", "-w", "1", hostname],
                         stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    return p.wait() == 0


The function ping returns True if the host is reachable and alive, or False if an error occurs (bad hostname, network unreachable, ping timeout, etc.). We're also not trying to make ping fast by specifying a lower timeout or a smaller number of packets. The goal is to scale this task while knowing it takes time to execute.

So ping is going to be the job to be executed by our application. It'll replace ssh in this example, but you'll see it'll be easy to replace it with any other job you might have.

We're going to use this job to accomplish a bigger mission: determine which hosts in my home network are up:

for i in range(255):
    ip = "192.168.2.%d" % i
    if ping(ip):
        print("%s is alive" % ip)


Running this program alone and pinging all 255 IP addresses takes more than 10 minutes.

It is pretty slow because each time we ping a host, we wait for the ping to succeed or time out before starting the next one. So if you need 3 seconds on average to ping each host, then pinging 255 nodes takes 3 seconds × 255 = 765 seconds, which is more than 12 minutes.

The solution

If 255 hosts need 12 minutes to be pinged, you can imagine how long it's going to take when we want to test which hosts are alive on the whole IPv4 Internet – 4 294 967 296 addresses to ping!

Since those ping (or ssh) jobs are not CPU intensive, we can consider that a single multi-processor host is going to be powerful enough – at least to start with.

The real issue here is that those tasks are I/O intensive: executing them serially takes a very long time.

So let's run them in parallel!

To do this, we're going to use threads. Threads are not efficient in Python when your tasks are CPU intensive, because of the global interpreter lock, but when the tasks block on I/O they are good enough.
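
As a first illustration, here is a minimal sketch that starts one thread per host with the threading module, reusing the ping function defined above. It works, but managing the threads and collecting the results by hand quickly becomes tedious, which is why we'll switch to concurrent.futures below:

import threading

# Assumes the ping() function defined earlier.
results = {}


def ping_and_record(ip):
    # Each thread pings a single host and stores the result.
    results[ip] = ping(ip)


threads = [
    threading.Thread(target=ping_and_record, args=("192.168.2.%d" % i,))
    for i in range(255)
]
for t in threads:
    t.start()
for t in threads:
    t.join()

for ip, alive in results.items():
    if alive:
        print("%s is alive" % ip)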

Using concurrent.futures

With concurrent.futures, it's easy to manage a pool of threads and schedule the execution of tasks. Here's how we're going to do it:

import functools
from concurrent import futures
import subprocess


def ping(hostname):
    p = subprocess.Popen(["ping", "-q", "-c", "3", "-W", "1",
                          hostname],
                         stdout=subprocess.DEVNULL,
                         stderr=subprocess.DEVNULL)
    return p.wait() == 0


with futures.ThreadPoolExecutor(max_workers=4) as executor:
    futs = [
        (host, executor.submit(functools.partial(ping, host)))
        for host in ("192.168.2.%d" % i for i in range(255))
    ]

    for ip, f in futs:
        if f.result():
            print("%s is alive" % ip)


The ThreadPoolExecutor is an engine, called an executor, that allows us to submit tasks to it. Each submitted task is put into an internal queue by the executor.submit method. This method takes as argument the function to execute – here we use functools.partial to bind each host to the ping function before submitting it.

Then, the executor pulls jobs out of its queue and executes them. In order to execute them, it starts threads that are responsible for the execution. The maximum number of threads to start is controlled by the max_workers parameter.

executor.submit returns a Future object, which holds the future result of the submitted task. Future objects expose methods to check whether the task is finished or not; here we just use Future.result() to get the result. This method blocks until the result is ready.
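
Futures also make it easy to handle results as soon as they are ready instead of in submission order. Here is a small variation – my own sketch, not part of the original listing – that maps each future back to its host and uses futures.as_completed:

from concurrent import futures

# Assumes the ping() function defined earlier.
with futures.ThreadPoolExecutor(max_workers=4) as executor:
    future_to_ip = {
        executor.submit(ping, "192.168.2.%d" % i): "192.168.2.%d" % i
        for i in range(255)
    }
    # as_completed yields each future as soon as it finishes,
    # regardless of submission order.
    for f in futures.as_completed(future_to_ip):
        if f.result():
            print("%s is alive" % future_to_ip[f])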

There's no magic recipe to find how many max workers you should use. It really depends on the nature of the tasks that are submitted. In this case, using a value of 4 brings the execution time down to about 3 minutes – roughly 12 minutes divided by 4, which makes sense. Setting max_workers to 255 (i.e. the number of tasks submitted) starts all the pings at the same time, producing a CPU usage spike, but bringing the total execution time down to less than 5 seconds!
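
If you want to measure this on your own network, a rough way to compare pool sizes is to time the same batch of pings with different max_workers values – a sketch, assuming the ping function defined above:

import time
from concurrent import futures

hosts = ["192.168.2.%d" % i for i in range(255)]

for workers in (4, 16, 64, 255):
    start = time.monotonic()
    with futures.ThreadPoolExecutor(max_workers=workers) as executor:
        # Consuming the iterator waits for every ping to finish.
        list(executor.map(ping, hosts))
    print("%3d workers: %.1f seconds" % (workers, time.monotonic() - start))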

Obviously, you wouldn't be able to start 4 billion threads in parallel, but if your system is big and fast enough, and your tasks use more I/O than CPU, you can use a pretty high value. Memory should also be taken into account – here it stays low, since the ping task does not use a lot of memory.

Just a first step

As already said, this ping job does not use a lot of CPU time or I/O bandwidth, and neither would Alon's original ssh case. However, if it did, this method would hit its limits pretty quickly. Threads are not always the best option to maximize your throughput, especially with Python.

These are just the first steps of the distribution and scalability mechanisms that you can implement using Python. There are a few other options available on top of this one – I've covered those in my book Scaling Python, if you're interested in learning more!

If you're curious, go read the next article of this series!