Concurrence en Python

Threads &
Synchronisation

threading, locks, Queue, ThreadPoolExecutor — et pourquoi le GIL change tout.

Processus vs Threads

Un processus est un programme en cours d'exécution avec son propre espace mémoire. Un thread est une unité d'exécution à l'intérieur d'un processus — les threads d'un même processus partagent la même mémoire.

ProcessusThread
MémoireIsolée (copie)Partagée
CréationLente (~50 ms)Rapide (~1 ms)
CommunicationIPC (pipe, socket…)Variables partagées
Crash isoléOuiNon — plante tout
GIL PythonContourné ✓Bloqué ✗ (CPU)
Idéal pourCalcul intensif (CPU)I/O (réseau, fichiers)
ℹ️

Les threads partagent la mémoire — c'est leur force (communication facile) et leur danger (modifications simultanées non contrôlées). Toute variable partagée entre threads doit être protégée.

Cycle de vie d'un thread

Création
t = Thread(target=fn) — le thread est créé mais pas encore démarré
Démarrage
t.start() — le thread entre en file d'attente du scheduler OS
Exécution
Le thread exécute target(*args). Peut être interrompu à tout moment par le scheduler.
Bloqué
En attente d'un lock, d'une I/O, d'un sleep — le scheduler passe à un autre thread
Terminé
La fonction target a retourné. t.join() attend cette étape.

Le GIL — Global Interpreter Lock

Le GIL est un verrou interne à CPython qui n'autorise qu'un seul thread à exécuter du bytecode Python à la fois. Il protège les structures internes de l'interpréteur, mais empêche le vrai parallélisme CPU.

Impact concret du GIL

Tâche CPU-bound — 2 threads (pas de gain)
Thread 1
Thread 2
→ séquentiels malgré 2 threads : pas plus rapide qu'un seul
Tâche I/O-bound — 2 threads (gain réel)
Thread 1
Thread 2
→ pendant l'attente I/O (GIL libéré), l'autre thread tourne
⚠️

Le GIL est propre à CPython (l'implémentation standard). Jython et IronPython n'ont pas de GIL. Python 3.13 introduit un mode expérimental "free-threaded" sans GIL.

GIL libéré automatiquement pendant…
# I/O système (réseau, fichiers, stdin)
response = requests.get("https://...")  # ← GIL libéré
data = file.read()                        # ← GIL libéré
time.sleep(1)                            # ← GIL libéré

# Extensions C qui le libèrent explicitement
numpy_result = np.dot(a, b)  # NumPy libère le GIL

# Calcul pur Python → GIL NON libéré
total = sum(range(10_000_000))  # bloque les autres threads
💡

Règle pratique : threads Python = idéal pour I/O (requêtes HTTP, lecture fichiers, BDD). Pour du calcul CPU (traitement d'image, ML, maths), utiliser multiprocessing ou numpy.

Quand utiliser quoi ?

threading
module standard
I/O réseau (scraping, API, SSH)
Lecture/écriture de fichiers
Interface graphique responsive
Tâches de fond légères
✗ Calcul CPU intensif
multiprocessing
module standard
Calcul numérique (sans NumPy)
Traitement d'images / vidéo
Parallélisme CPU réel
Isolation mémoire souhaitée
✗ Overhead de création élevé
asyncio
Python 3.4+
Milliers de connexions réseau
Serveurs HTTP (FastAPI, aiohttp)
Un seul thread — coopératif
async/await
✗ Bibliothèques non-async bloquantes

Créer et démarrer un thread

threading — bases
import threading
import time

def telecharger(url: str, duree: float) -> None:
    """Simule un téléchargement."""
    print(f"[{threading.current_thread().name}] Début {url}")
    time.sleep(duree)            # GIL libéré ici
    print(f"[{threading.current_thread().name}] Fin {url}")

# Créer les threads
t1 = threading.Thread(
    target=telecharger,
    args=("https://site-a.com", 2),
    name="T-SiteA"
)
t2 = threading.Thread(
    target=telecharger,
    args=("https://site-b.com", 3),
    name="T-SiteB"
)

# Démarrer (non bloquant)
t1.start()
t2.start()

# Attendre la fin des deux
t1.join()
t2.join()

print("Tous les téléchargements terminés")
# Durée totale ≈ 3s (pas 5s) — parallélisme I/O

Paramètres de Thread()

ParamètreTypeRôle
targetcallableFonction à exécuter
argstupleArguments positionnels
kwargsdictArguments nommés
namestrNom (debug, logs)
daemonboolThread daemon (voir §07)
Infos sur le thread courant
# Dans n'importe quelle fonction
t = threading.current_thread()
print(t.name)      # "T-SiteA"
print(t.ident)     # ID système
print(t.is_alive()) # True / False

# Lister tous les threads actifs
for t in threading.enumerate():
    print(t.name, t.is_alive())
⚠️

join() sans argument attend indéfiniment. Passer un timeout : t.join(timeout=5) puis vérifier t.is_alive() pour détecter un blocage.

Thread par héritage

Pour encapsuler état et comportement, on peut hériter de threading.Thread et surcharger la méthode run().

Classe Thread personnalisée
import threading
import time
import requests

class WorkerThread(threading.Thread):
    def __init__(self, url: str) -> None:
        # Toujours appeler super().__init__()
        super().__init__(name=f"Worker-{url[:20]}")
        self.url     = url
        self.resultat = None
        self.erreur   = None

    def run(self) -> None:
        """Appelée par start() — NE PAS appeler directement."""
        try:
            resp = requests.get(self.url, timeout=10)
            self.resultat = resp.json()
        except Exception as e:
            self.erreur = e

# Utilisation
urls = [
    "https://api.example.com/users",
    "https://api.example.com/products",
]
workers = [WorkerThread(url) for url in urls]

for w in workers: w.start()
for w in workers: w.join()

for w in workers:
    if w.erreur:
        print(f"Erreur {w.url}: {w.erreur}")
    else:
        print(f"OK {w.url}: {w.resultat}")
💡

run() vs start() — ne jamais appeler run() directement : cela exécute la méthode dans le thread courant, sans créer de nouveau thread. Toujours utiliser start().

ℹ️

L'héritage est utile quand le thread a un état propre (stocker un résultat, un statut) ou des méthodes auxiliaires. Pour les cas simples, préférer Thread(target=fn) — plus concis.

Thread avec arrêt propre
class MoniteurThread(threading.Thread):
    def __init__(self) -> None:
        super().__init__(daemon=True)
        self._stop_event = threading.Event()

    def stop(self) -> None:
        self._stop_event.set()

    def run(self) -> None:
        while not self._stop_event.is_set():
            # Travail périodique
            self._stop_event.wait(5)  # sleep interruptible

Threads daemon

Un thread daemon est tué automatiquement quand le programme principal se termine. Les threads non-daemon bloquent la sortie du programme jusqu'à leur fin.

Différence daemon / non-daemon
import threading, time

def tache_longue():
    time.sleep(10)
    print("Fin de la tâche longue")

# Thread NON-daemon (défaut)
# → programme attend 10s même si main() est terminé
t1 = threading.Thread(target=tache_longue)

# Thread DAEMON
# → tué immédiatement à la fin du programme
t2 = threading.Thread(target=tache_longue, daemon=True)

# Ou après création :
t2.daemon = True  # avant start() uniquement

t2.start()
print("Main terminé — le daemon sera tué")
# Programme se ferme ici sans attendre t2
Non-daemon (défaut)Daemon
Fin du programmeAttend la fin du threadTue le thread
RisqueProgramme "pendu"Données non sauvegardées
Usage typiqueTâches critiquesMoniteurs, heartbeats, logs
⚠️

Un thread daemon tué brutalement ne ferme pas ses fichiers, ne flush pas ses buffers, n'exécute pas ses blocs finally. Ne jamais l'utiliser pour des opérations qui nécessitent un nettoyage propre.

Race conditions

Une race condition survient quand deux threads accèdent simultanément à une ressource partagée et que le résultat dépend de l'ordre d'exécution — ce qui n'est jamais garanti.

Exemple : compteur partagé — DANGEREUX
import threading

compteur = 0

def incrementer(n: int) -> None:
    global compteur
    for _ in range(n):
        compteur += 1  # ← PAS atomique !

t1 = threading.Thread(target=incrementer, args=(100_000,))
t2 = threading.Thread(target=incrementer, args=(100_000,))
t1.start(); t2.start()
t1.join();  t2.join()

print(compteur)
# Attendu : 200 000
# Obtenu   : 173 842 (valeur variable !)
Pourquoi compteur += 1 n'est pas atomique
T1 : LOAD compteur (lit 42)
T2 : LOAD compteur (lit 42 aussi)
T1 : ADD 1 → 43
T2 : ADD 1 → 43
T1 : STORE 43
T2 : STORE 43
→ Une incrémentation perdue ! (attendu 44, obtenu 43)

Opérations qui semblent atomiques mais ne le sont pas

Pièges courants
# ✗ Lecture-modification-écriture
x += 1
liste.append(element)  # OK en Python (GIL)

# ✗ Test puis action (check-then-act)
if cle not in dico:    # Thread 2 peut s'insérer ici
    dico[cle] = valeur  # entre le test et l'écriture

# ✗ Séquence de plusieurs opérations
total = lire_solde()    # Thread 2 peut modifier
nouveau = total - 100  # entre ces deux lignes
ecrire_solde(nouveau)
💡

En Python, certaines opérations sont atomiques grâce au GIL : list.append(), dict[key] = val, x = y (affectation simple). Mais toute séquence de plusieurs opérations ne l'est pas.

Lock & RLock

Un Lock (verrou) garantit qu'un seul thread peut exécuter une section critique à la fois. RLock est un verrou réentrant — un même thread peut l'acquérir plusieurs fois.

Lock — compteur thread-safe
import threading

class CompteurSafe:
    def __init__(self) -> None:
        self._valeur = 0
        self._lock   = threading.Lock()

    def incrementer(self) -> None:
        with self._lock:         # acquire() + release() auto
            self._valeur += 1   # section critique

    def valeur(self) -> int:
        with self._lock:
            return self._valeur  # lecture protégée aussi

# Test
compteur = CompteurSafe()
threads = [
    threading.Thread(target=lambda: [compteur.incrementer()
                                       for _ in range(100_000)])
    for _ in range(2)
]
for t in threads: t.start()
for t in threads: t.join()

print(compteur.valeur())  # Toujours 200 000 ✓
RLock — verrou réentrant
# Lock ordinaire → deadlock si même thread acquiert 2x
lock = threading.Lock()
lock.acquire()
lock.acquire()  # ← DEADLOCK ! attend éternellement

# RLock → même thread peut l'acquérir N fois
rlock = threading.RLock()

def methode_a(self):
    with self._rlock:
        self.methode_b()  # OK avec RLock

def methode_b(self):
    with self._rlock:   # même thread → pas de blocage
        ...
Lock avec timeout
lock = threading.Lock()

# Tentative d'acquisition avec timeout
acquis = lock.acquire(timeout=2.0)
if acquis:
    try:
        # section critique
        ...
    finally:
        lock.release()
else:
    print("Impossible d'acquérir le lock")
⚠️

Toujours utiliser with lock: plutôt que acquire()/release() manuels — un release() oublié (après une exception) causerait un deadlock permanent.

Semaphore

Un Semaphore est un compteur qui limite le nombre de threads pouvant accéder simultanément à une ressource. Un Lock est un cas particulier (Semaphore de valeur 1).

Semaphore — pool de connexions DB
import threading, time

# Autoriser au plus 3 accès simultanés à la BDD
sem_db = threading.Semaphore(3)

def requete_db(worker_id: int) -> None:
    print(f"Worker {worker_id} : en attente...")
    with sem_db:  # attend si 3 threads dedans
        print(f"Worker {worker_id} : connexion BDD")
        time.sleep(1)  # simule requête
        print(f"Worker {worker_id} : terminé")

# 10 workers → max 3 en BDD à la fois
threads = [
    threading.Thread(target=requete_db, args=(i,))
    for i in range(10)
]
for t in threads: t.start()
for t in threads: t.join()
ℹ️

BoundedSemaphore lève une exception si release() est appelé plus de fois que acquire() — utile pour détecter les bugs de synchronisation.

Rate limiter avec Semaphore
import threading, time

class RateLimiter:
    """Limite à N appels par seconde."""
    def __init__(self, max_par_sec: int):
        self.sem = threading.BoundedSemaphore(max_par_sec)
        self.max = max_par_sec

    def appel(self, fn, *args):
        self.sem.acquire()
        try:
            return fn(*args)
        finally:
            threading.Timer(1.0, self.sem.release).start()

Event & Condition

Event — signal simple

Un Event est un drapeau booléen partagé. Les threads peuvent attendre qu'il soit levé avec wait().

threading.Event
pret = threading.Event()

def chargement():
    time.sleep(2)         # charge les données
    pret.set()             # signal "c'est prêt"

def traitement():
    print("En attente des données...")
    pret.wait()            # bloque jusqu'à set()
    print("Traitement en cours")

threading.Thread(target=chargement).start()
threading.Thread(target=traitement).start()

# event.clear()  → remet à False
# event.is_set() → vérifie sans bloquer
# event.wait(timeout=5) → attente limitée

Condition — notification ciblée

Une Condition combine un Lock et la capacité de notifier des threads en attente — plus précise qu'un Event.

threading.Condition
cond = threading.Condition()
donnees = []

def producteur():
    for i in range(5):
        with cond:
            donnees.append(i)
            cond.notify()  # réveille 1 consommateur
        time.sleep(0.5)

def consommateur():
    while True:
        with cond:
            # wait() libère le lock et attend notify()
            cond.wait_for(lambda: len(donnees) > 0)
            item = donnees.pop(0)
        print(f"Consommé : {item}")

Queue — Producteur / Consommateur

queue.Queue est la solution idiomatique pour la communication inter-threads. Elle est thread-safe de facto — pas besoin de Lock explicite.

🏭 Producteur
Thread 1..N
──▶
Queue
maxsize=10
[ item1 | item2 | item3 ]
──▶
⚙️ Consommateur
Thread 1..N
Pattern Producteur / Consommateur
import threading
import queue
import time

# Queue bornée : bloque le prod si pleine
q: queue.Queue = queue.Queue(maxsize=10)
POISON_PILL = None  # signal d'arrêt

def producteur(items: list) -> None:
    for item in items:
        q.put(item)          # bloque si queue pleine
        print(f"Produit : {item}")
    q.put(POISON_PILL)       # signal de fin

def consommateur() -> None:
    while True:
        item = q.get()       # bloque si queue vide
        if item is POISON_PILL:
            q.task_done()
            break
        time.sleep(0.1)    # traitement
        print(f"Consommé : {item}")
        q.task_done()        # obligatoire après get()

prod = threading.Thread(
    target=producteur, args=(["a", "b", "c", "d"],)
)
cons = threading.Thread(target=consommateur)

prod.start(); cons.start()
prod.join();  cons.join()

q.join()  # attend que task_done() soit appelé pour chaque item

API Queue essentielle

MéthodeComportement
q.put(item)Ajoute — bloque si pleine (maxsize)
q.put_nowait(item)Ajoute — lève Full si pleine
q.get()Retire — bloque si vide
q.get_nowait()Retire — lève Empty si vide
q.get(timeout=5)Retire — lève Empty après 5s
q.task_done()Signale qu'un item est traité
q.join()Attend que tous les items soient traités
q.qsize()Nombre d'items (approximatif)
q.empty()True si vide (non fiable entre threads)
ℹ️

Pour N consommateurs, envoyer N POISON_PILL — un par consommateur. Chaque consommateur retire une pilule et s'arrête.

ThreadPoolExecutor

concurrent.futures.ThreadPoolExecutor est l'API moderne et de haut niveau pour gérer un pool de threads. Elle gère automatiquement la création, la réutilisation et la récupération des erreurs.

ThreadPoolExecutor — cas d'usage
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests

urls = [
    "https://api.github.com/users/python",
    "https://api.github.com/users/django",
    "https://api.github.com/users/flask",
]

def fetch(url: str) -> dict:
    resp = requests.get(url, timeout=10)
    resp.raise_for_status()
    return resp.json()

── map() : simple, résultats dans l'ordre ──
with ThreadPoolExecutor(max_workers=5) as pool:
    resultats = list(pool.map(fetch, urls))
    # résultats[i] correspond à urls[i]

── submit() : plus de contrôle, gestion erreurs ──
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = {pool.submit(fetch, url): url for url in urls}

    for future in as_completed(futures):
        url = futures[future]
        try:
            data = future.result()
            print(f"✓ {url}: {data['login']}")
        except Exception as e:
            print(f"✗ {url}: {e}")

Future — objet résultat différé

MéthodeRôle
f.result(timeout=N)Attend et retourne le résultat (ou lève l'exception)
f.done()True si terminé (succès ou erreur)
f.running()True si en cours d'exécution
f.cancel()Annule si pas encore démarré
f.exception()Retourne l'exception sans la relancer
Choisir max_workers
import os

# I/O-bound : plus de workers que de CPUs
# (règle empirique : 4 × nb_CPUs)
nb_io = (os.cpu_count() or 1) * 4

# CPU-bound : jamais > nb de CPUs
# (préférer multiprocessing dans ce cas)
nb_cpu = os.cpu_count() or 1

# Défaut Python 3.8+ : min(32, cpu_count + 4)

multiprocessing — contourner le GIL

Pour les tâches CPU-bound, utiliser multiprocessing qui crée de vrais processus parallèles — chacun avec son propre GIL.

ProcessPoolExecutor — calcul parallèle
from concurrent.futures import ProcessPoolExecutor
import math

def est_premier(n: int) -> bool:
    """Calcul CPU intensif."""
    if n < 2: return False
    for i in range(2, int(math.sqrt(n)) + 1):
        if n % i == 0: return False
    return True

nombres = range(10_000_000, 10_000_100)

# ThreadPoolExecutor → séquentiel (GIL)
# ProcessPoolExecutor → vrai parallélisme
with ProcessPoolExecutor() as pool:
    resultats = list(pool.map(est_premier, nombres, chunksize=10))

premiers = [n for n, p in zip(nombres, resultats) if p]
print(premiers)
⚠️

Sur Windows, le code multiprocessing doit être dans un bloc if __name__ == "__main__": pour éviter la récursion infinie lors du spawn des processus.

ThreadPoolExecutorProcessPoolExecutor
GILPartagé → bloque CPUUn par processus → libre
MémoirePartagéeCopiée (pickle)
OverheadFaibleÉlevé (fork/spawn)
I/O-bound✓ IdéalOverkill
CPU-bound✗ Inutile✓ Idéal

asyncio — concurrence coopérative

asyncio réalise de la concurrence dans un seul thread via un event loop. Les coroutines cèdent le contrôle volontairement avec await — pas de préemption, pas de race condition.

asyncio — structure de base
import asyncio
import aiohttp

async def fetch(session, url: str) -> dict:
    # await cède le contrôle pendant l'I/O
    async with session.get(url) as resp:
        return await resp.json()

async def main() -> None:
    urls = ["https://..."] * 10
    async with aiohttp.ClientSession() as session:
        # Lance toutes les coroutines "simultanément"
        tasks = [fetch(session, url) for url in urls]
        resultats = await asyncio.gather(*tasks)
    return resultats

# Point d'entrée
asyncio.run(main())
ℹ️

asyncio est la solution la plus efficace pour des milliers de connexions réseau simultanées (serveurs, scrapers massifs). Pour 10–50 tâches I/O, ThreadPoolExecutor est souvent plus simple à mettre en œuvre.

Conceptthreadingasyncio
ModèlePréemptif (OS)Coopératif (event loop)
ThreadsN threads réels1 thread
Race conditionsPossiblesTrès rares
ScalabilitéCentainesMilliers+
SyntaxeStandardasync/await
Libs requisesStandardasync-compatibles

Pièges classiques

PiègeSymptômeSolution
Deadlock Programme bloqué indéfiniment — deux threads attendent chacun le lock de l'autre Toujours acquérir les locks dans le même ordre. Utiliser timeout. Préférer les structures thread-safe (Queue).
Race condition Résultats incohérents, aléatoires, différents entre exécutions Protéger toute variable partagée avec Lock. Préférer Queue pour la communication.
Thread oublié Programme ne se termine pas (thread non-daemon bloqué) Toujours join() les threads, ou les passer en daemon=True s'ils sont auxiliaires.
Exception silencieuse Thread planté silencieusement, pas de stack trace visible Capturer les exceptions dans run() et les stocker. Vérifier future.exception() avec Executor.
Variable partagée en closure Tous les threads reçoivent la même valeur (la dernière) Capturer la valeur par défaut : lambda i=i: ... ou utiliser args=(i,).
GIL ignoré Threads CPU-bound plus lents qu'un seul thread (overhead) Utiliser multiprocessing ou des bibliothèques qui libèrent le GIL (NumPy).
Piège closure — variable capturée
# ✗ Tous les threads reçoivent i=4
threads = []
for i in range(5):
    t = threading.Thread(target=lambda: print(i))
    threads.append(t)

# ✓ Capturer i par valeur
for i in range(5):
    t = threading.Thread(target=lambda n=i: print(n))
    # ou : target=print, args=(i,)
Deadlock — exemple et prévention
# ✗ Deadlock : ordre différent d'acquisition
def thread_a():
    with lock1:   # acquiert lock1
        with lock2: ...  # attend lock2

def thread_b():
    with lock2:   # acquiert lock2
        with lock1: ...  # attend lock1 → DEADLOCK

# ✓ Même ordre partout
def thread_a():
    with lock1:
        with lock2: ...

def thread_b():
    with lock1:   # même ordre que thread_a
        with lock2: ...

Cheat sheet

threading — Thread

Thread(target, args)Créer un thread
t.start()Démarrer
t.join(timeout)Attendre la fin
t.is_alive()En cours ?
t.daemon = TrueThread daemon
current_thread()Thread courant
enumerate()Tous les threads actifs

Synchronisation

Lock()Verrou exclusif
RLock()Verrou réentrant
Semaphore(n)N accès simultanés
Event()Signal booléen
Condition()Attente conditionnelle
with lock:Acquire + release auto
event.wait()Bloque jusqu'à set()

Queue

Queue(maxsize)File thread-safe
q.put(item)Ajouter (bloque si pleine)
q.get()Retirer (bloque si vide)
q.task_done()Marquer item traité
q.join()Attendre tous les items
POISON_PILL = NoneSignal d'arrêt

concurrent.futures

ThreadPoolExecutor(n)Pool de threads
ProcessPoolExecutor()Pool de processus (CPU)
pool.map(fn, iterable)Map parallèle (résultats ordonnés)
pool.submit(fn, *args)Soumettre → Future
as_completed(futures)Itérer au fil des fins
future.result()Résultat (ou relance exception)