Concurrency & Parallelism
Parallel programming means executing operations across multiple processes (typically on multiple CPU cores) or across multiple threads within a single process. This can speed up a workload roughly in proportion to the number of processes or threads launched.
Python's GIL problem
Python has a Global Interpreter Lock (GIL), which prevents two threads from executing Python bytecode simultaneously in the same process. However, libraries like numpy bypass this limitation by releasing the GIL while running external C code.
Because of the GIL, threads provide no benefit for CPU-intensive tasks and are only useful for input/output (I/O) tasks. A CPU-bound process spends most of its life computing on the CPU, while an I/O-bound process spends most of its life waiting on I/O, e.g. waiting for another program or service to respond.
Why does threading help I/O tasks when only one thread in a process can run at a time? Consider an example: one thread fires off a request to a URL and, while it is waiting for the response, is swapped out for another thread that fires a request to another URL. Since a thread doesn't have to do anything until it receives a response, it doesn't really matter that only one thread executes at a given time. This makes multi-threading in Python a concurrent, or asynchronous, process.
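As a minimal sketch of why threads help I/O-bound work, using `time.sleep` as a stand-in for a network wait (the timings are illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(_):
    # stand-in for an I/O wait, e.g. an HTTP request;
    # sleep releases the GIL, so other threads can run meanwhile
    time.sleep(0.2)

start = time.time()
for i in range(4):
    fake_io(i)
sequential = time.time() - start   # ~0.8s: the waits happen one after another

start = time.time()
with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(fake_io, range(4)))
threaded = time.time() - start     # ~0.2s: all four waits overlap

print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

The threads spend their time blocked, not computing, so the GIL is never the bottleneck here.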
Given all this, we know when to use multiple processes or threads to run tasks with improved latency.
Multiprocess | Multithread
---|---
more overhead than threads, as opening and closing processes takes more time | fast, as threads share memory space and efficiently read and write the same variables
only for CPU-intensive tasks because of the overhead | only for I/O-intensive tasks because of the GIL
e.g. complex calculations | e.g. networking, file read-write, database queries
True Parallel Process | Concurrent Process
Great detailed explanations are provided by Brendan Fortuner, Thilina Rajapakse, stackoverflow, and testdrive.io 1 & 2.
Examples
Below are two examples of embarrassingly parallel tasks, one a CPU-bound job...
```python
from time import sleep

def compute_x(x, y=2):
    sleep(0.8)
    return (x, y)

list_to_iterate = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
sum_all_x = [compute_x(i) for i in list_to_iterate]
```
and an IO-bound job where it is requesting data from a website.
```python
import requests

wiki_page_urls = [
    "https://en.wikipedia.org/wiki/Ocean",
    "https://en.wikipedia.org/wiki/Island",
    "https://en.wikipedia.org/wiki/this_page_does_not_exist",
    "https://en.wikipedia.org/wiki/Shark",
]

def get_wiki_page_existence(wiki_page_url, timeout=10):
    response = requests.get(url=wiki_page_url, timeout=timeout)
    page_status = "unknown"
    if response.status_code == 200:
        page_status = "exists"
    elif response.status_code == 404:
        page_status = "does not exist"
    return wiki_page_url + " - " + page_status
```
Joblib
Joblib makes it very simple to run embarrassingly parallel for-loops written as list comprehensions, with the following syntax.
```python
from joblib import Parallel, delayed

variable_name = Parallel(n_jobs=<no_of_jobs>)(
    delayed(<func_name>)(<arg1>, <arg2>, <arg3>) for i in <list_to_iterate>)
```
Parallel
We can run this in parallel with multiprocessing in joblib as follows. By default it uses the `loky` backend, which is more robust than the `multiprocessing` backend taken from the older `multiprocessing.Pool` library. However, it might not always be faster.
```python
from joblib import Parallel, delayed

sum_all_x = Parallel(n_jobs=4)(
    delayed(compute_x)(i) for i in list_to_iterate)
```
Concurrent
However, if this is an I/O-intensive task, using the `threading` backend to run it asynchronously is preferred.
```python
from joblib import Parallel, delayed

sum_all_x = Parallel(n_jobs=4, backend="threading")(
    delayed(compute_x)(i) for i in list_to_iterate)
```
TQDM
If we want a more informative runtime, we can wrap the iterable with `tqdm` to display a progress bar.
```python
from tqdm import tqdm
from joblib import Parallel, delayed

job = Parallel(n_jobs=threads, backend="threading")(
    delayed(get_jobs)(p, group_id, project_id, year, mth) for p in tqdm(pipes))
```
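A self-contained variant of the same pattern (the `square` helper and the `range(10)` iterable are just illustrative stand-ins):

```python
from tqdm import tqdm
from joblib import Parallel, delayed

def square(a):
    return a * a

# tqdm wraps the iterable, so the bar advances as jobs are dispatched
results = Parallel(n_jobs=2, backend="threading")(
    delayed(square)(i) for i in tqdm(range(10)))

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Note that `Parallel` returns results in input order, regardless of which job finishes first.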
Another promising library is `pqdm`, which wraps `tqdm` and `concurrent.futures` together.
```python
# pip install pqdm
from pqdm.processes import pqdm
# If you want threads instead:
# from pqdm.threads import pqdm

args = [1, 2, 3, 4, 5]
# args = range(1, 6) would also work

def square(a):
    return a * a

result = pqdm(args, square, n_jobs=2)
```
Concurrent Futures
`concurrent.futures` is a built-in library that can easily run multiprocessing or threading tasks.
Parallel
```python
from concurrent.futures import ProcessPoolExecutor, as_completed
from functools import partial

def multiproc(list_to_iterate, workers=4):
    with ProcessPoolExecutor(max_workers=workers) as executor:
        futures = []
        for i in list_to_iterate:
            futures.append(executor.submit(compute_x, x=i))
        result = [future.result() for future in as_completed(futures)]
    return result

# alternatively, we can use the map function for a more concise result;
# however, it does not allow passing fixed extra args to the worker function
def multiproc(list_to_iterate, workers=4):
    with ProcessPoolExecutor(max_workers=workers) as executor:
        # executor.map yields results directly, in input order
        result = list(executor.map(compute_x, list_to_iterate))
    return result

# to pass extra args, we can use functools.partial to wrap the
# worker function and the additional args together
def multiproc(list_to_iterate, workers=4):
    with ProcessPoolExecutor(max_workers=workers) as executor:
        wrap = partial(compute_x, y=3)
        result = list(executor.map(wrap, list_to_iterate))
    return result

if __name__ == "__main__":
    result = multiproc(list_to_iterate)
```
Concurrent
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    for url in wiki_page_urls:
        futures.append(executor.submit(get_wiki_page_existence, wiki_page_url=url))
    result = [future.result() for future in as_completed(futures)]
```
Multiprocessing
The `multiprocessing` module is another built-in library that supports both multiprocessing (`multiprocessing.Pool`) and threading (`multiprocessing.pool.ThreadPool`) tasks.
Parallel
```python
import multiprocessing as mp

def multiproc(list_to_iterate, workers=2):
    # the context manager closes the pool when done
    with mp.Pool(workers) as pool:
        result = pool.map(compute_x, list_to_iterate)
    return result

if __name__ == "__main__":
    result = multiproc(list_to_iterate)
```
Concurrent
```python
from multiprocessing.pool import ThreadPool

def multithread(list_to_iterate, workers=2):
    with ThreadPool(workers) as pool:
        result = pool.map(get_wiki_page_existence, list_to_iterate)
    return result

if __name__ == "__main__":
    result = multithread(wiki_page_urls)
```
Threading Library
Python's default `threading` library provides a simple way to spin off a thread for an I/O-bound task.
```python
import threading

threading.Thread(target=<function>).start()
```
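For example, a rough sketch that spins off one thread per task and waits for them all (the `download` function is a hypothetical stand-in for real I/O work):

```python
import threading
import time

results = []

def download(url):
    time.sleep(0.2)      # stand-in for a network request
    results.append(url)  # list.append is thread-safe in CPython

threads = [threading.Thread(target=download, args=(u,))
           for u in ("url-1", "url-2", "url-3")]
for t in threads:
    t.start()
for t in threads:
    t.join()             # block until all threads have finished

print(sorted(results))
```

Calling `join()` is what makes the main thread wait; without it, the program may reach the end before the workers finish.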
Numba
Numba is one of the unique cases with true parallel multi-threading, as its compiled code is able to run outside the GIL.