Bienvenidos sean a este post, hoy nos centraremos en hablar sobre los pools para threads y procesos.
Cuando hablamos de pools en este post dijimos que estructuras para contener N objetos (threads, procesos, etc) y cuando se alcanza la capacidad ningun trabajo es asignado al objeto hasta que alguno de los que este trabajando se vuelve disponible, por lo tanto son una gran forma para limitar a los threads y procesos que pueden ejecutarse al mismo tiempo evitando que el sistema se quede sin recursos disponibles o que afecte la performance del equipo debido al continuo cambio de contexto, como siempre vamos a comenzar con nuestro ejemplo para entender este tema, para ello vamos a crear un archivo llamado piscina_th.py y le agregaremos el siguiente codigo:
piscina_th.py
from concurrent.futures import ThreadPoolExecutor, as_completed
from random import randint
import threading
def correr(nombre):
valor = randint(0, 10**2)
tnombre = threading.current_thread().name
print(f'Hola, soy {nombre} ({tnombre}) y mi valor es {valor}')
return (nombre, valor)
with ThreadPoolExecutor(max_workers=3) as ejecutor:
futuros = [
ejecutor.submit(correr,f'T{nombre}') for nombre in range(5)
]
for futuro in as_completed(futuros):
nombre, valor = futuro.result()
print(f'Thread {nombre} returned {valor}')
Para esta ocasion vamos a usar el modulo concurrent.futures es una interfaz de alto nivel la cual nos provee la capacidad de poder correr distintas llamadas en paralelo de forma asincronica, de este modulo utilizaremos dos objetos:
- ThreadPoolExecutor, una clase para ejecutar una pool de threads, tiene un parametro llamado max_workers que nos permite delimitar la cantidad de threads que se ejecutaran
- as_completed, este nos devolvera un iterador de los elementos a futuro a medida que se completan, ya hablaremos enseguida de esto
Despues tenemos a randint para crear numeros al azar y threading para manejar nuestros threads, lo siguiente sera definir una funcion llamada correr y esta recibira un atributo llamado nombre, dentro de la funcion generamos un numero al azar por medio de randint, lo siguiente sera crear un objeto llamado tnombre que almacenara el nombre del thread actual, despues mostraremos una cadena donde pasaremos el nombre informado como argumento, seguido del nombre del thread y por ultimo el valor, con esto realizado pasamos a devolver el nombre y el valor, pasemos a la siguiente parte.
En este tenemos un with donde ejecutara a ThreadPoolExecutor con un maximo de 3 threads y lo asignaremos al objeto ejecutor, dentro de este with crearemos una lista donde a traves de submit llamaremos a la funcion antes definida y le pasaremos el nombre que sera T seguido del numero del range en el bucle for que tenemos, con esta lista generada podemos pasar a la siguiente parte que es un bucle for donde por medio de as_completed generaremos los objetos iterables de futuros y lo pasaremos a un objeto llamado futuro, aqui asignaremos los dos valores que retorna a nombre y valor respectivamente y por ultimo mostraremos un texto donde indicara el nombre del thread y el valor asociado, probemos de ejecutar para ver su salida:
tinchicus@dbn001vrt:~/lenguajes/python$ python3 piscina_th.py
Hola, soy T0 (ThreadPoolExecutor-0_0) y mi valor es 33
Hola, soy T1 (ThreadPoolExecutor-0_0) y mi valor es 63
Hola, soy T2 (ThreadPoolExecutor-0_0) y mi valor es 52
Thread T1 returned 63
Thread T2 returned 52
Thread T0 returned 33
Hola, soy T3 (ThreadPoolExecutor-0_0) y mi valor es 1
Hola, soy T4 (ThreadPoolExecutor-0_0) y mi valor es 33
Thread T3 returned 1
Thread T4 returned 33
tinchicus@dbn001vrt:~/lenguajes/python$
Por deduccion sabemos que el bucle genera cinco threads (t0…t4) pero limitamos al ThreadPoolExecutor para que trabaje con 3, si observan nos devuelve los tres mensajes en correr de t0 a t2, luego muestra los tres mismos por medio del bucle for que existe en la ultima parte del codigo, lo siguiente que vemos muestra los mensaje de los ultimos threads y los ultimos mensajes del for, como podran observar se cumplio lo solicitado es decir que una vez lleno el pool espera a que se vacie o se creen espacios disponibles y continua con la ejecucion, en otras circunstancias esto puede ser invisible para el usuario y solo ver el resultado final, pasemos al siguiente tema.
En este caso vamos a hacer lo mismo pero con los procesos, para ello vamos a crear otro archivo llamado piscina_pr.py y le agregaremos el siguiente codigo:
piscina_pr.py
from concurrent.futures import ProcessPoolExecutor, as_completed
from random import randint
from time import sleep
def correr(nombre):
sleep(.05)
valor = randint(0, 10**2)
print(f'Hola, soy {nombre} y mi valor es {valor}')
return (nombre, valor)
with ProcessPoolExecutor(max_workers=3) as ejecutor:
futuros = [
ejecutor.submit(correr,f'P{nombre}') for nombre in range(5)
]
for futuro in as_completed(futuros):
nombre, valor = futuro.result()
print(f'Proceso {nombre} devolvio {valor}')
Es muy parecido a lo visto en el ejemplo anterior pero en lugar de ThreadPoolExecutor importamos a ProcessPoolExecutor y en lugar de importar a threading importamos a sleep, definimos una funcion correr nuevamente pero en este caso agregamos un sleep para hacer una demora, volvemos a obtener un valor al azar, luego mostramos un mensaje y por ultimo los seguimos retornando, volvemos a usar el mismo with con el limitador pero con la nueva clase, volvemos a crear una lista llamada futuros de la misma manera que antes y volvemos a usar un mismo bucle for para mostrar otro mensaje de procesado, vamos a probarlo pero con un video para ver la particularidad.
A diferencia del caso anterior siempre que ejecutamos el programa veremos que los datos se procesan de forma distinta porque ahora no espera a que se llene el pool sino que cuando procesó alguno y encontró un lugar disponible ejecuta el procesamiento sin esperar a que se vacie completamente el pool, esta version mas agresiva es mejor en performance que la de thread pero para evitar posibles errores se agrega un sleep para generar una pausa para poder procesarlo, es decir que cada uno tiene sus pros y contras a pesar de trabajar de una forma similar y cada uno debe adoptarse dependiendo de nuestras necesidades.
En resumen, hoy hemos visto que son los pool de threads y procesos, cual es el modulo encargado de manejarlos, hemos visto que con dos clases distintas los manejamos pero de manera muy similares, ambas poseen un limitador para poder establecer cuantos objetos (threads o procesos) pueden trabajar los pools, tambien hemos visto que a pesar de manejarse de la misma forma pero sus conductas son muy distintas, espero les haya sido util sigueme en tumblr, Twitter o Facebook para recibir una notificacion cada vez que subo un nuevo post en este blog, nos vemos en el proximo post.


Donación
Es para mantenimento del sitio, gracias!
$1.50
