Bienvenidos sean a este post, como dijimos anteriormente las corutinas permiten una especie de multiproceso colaborativo, tambien comentamos como cada proceso equivale a una corutina y un par yield-resume cambia el control de un nuevo proceso a otro, sin embargo a diferencia de las corutinas de multiproceso las corutinas comunes son no preventivas esto significa que mientras una corutina esta corriendo no puede ser detenida desde afuera, la ejecucion solo se suspende cuando se solicita explicitamente a traves de una llamada a yield, para muchas aplicaciones no es un problema sino todo lo contrario, nuestra unica preocupacion es asegurarnos que una corutina produzca solo cuando es por fuera de la region critica.

Anuncios

Sin embargo, con el multiprocesamiento no preventivo cuando cualquier proceso llama a una operacion de bloqueo todo el programa se bloquea hasta que se complete la operacion pero para las mayoria de las aplicaciones esta es una condicion inaceptable, la cual lleva a muchos programadores a ignorar corutinas como una real alternativa al multiproceso convencional, pero eso es harina de otro costal y vamos a asumir el siguiente ejemplo: queremos descargar multiples archivos remotos por medio de HTTP, para poder ejecutar este ejemplo debemos conocer como descargar un archivo de esta forma, para este caso usaremos la libreria LuaSocket desarrollado por Diego Nehab, para descargar un archivo debemos abrir una conexion al sitio, enviar una solicitud del archivo, recibir el archivo (en bloques) y cerrar la conexion, en Lua podemos realizar esta tarea como sigue:

socket = require("socket")

De esta forma cargaremos esta libreria, luego definimos el host y el archivo que deseamos descargar, tomemos el siguiente ejemplo:

host = "www.w3.org"
file = "/TR/REC-html32.htm"

Despues abriremos una conexion TCP al puerto 80 del sitio:

c = assert(socket.connect(host,80))

Esta operacion devuelve un objeto de conexion, la cual usaremos para enviar la solicitud:

c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")

Lo proximo sera leer el archivo en bloques de 1 KB, escribiendo cada bloque a la salida estandard:

while true do
	local s, estado, parcial = c:receive(2^10)
	io.write(s or parcial)
	if estado == "closed" then break end
end

En este caso hablaremos primero del metodo receive la cual devuelve una cadena cuando lo lee o un valor nil en caso de un error, en el caso posterior tambien devuelve un codigo de error (estado) y que lee hasta el error (parcial), por ultimo cuando el host cierra la conexion imprimimos el remanente de la entrada y cierra el bucle de recepcion, una vez finalizada la descarga cerramos la conexion:

c:close()

Con esto completamos el concepto de descargar un solo archivo pero todavia tenemos el problema original de la descarga de multiples archivos, una solucion podria ser el enfoque secuencial donde comenzamos con la lectura de un archivo solo cuando finaliza el anterior, este proceso resulta demasiado lento porque cuando leemos un archivo remoto desperdicia mucho tiempo esperando por la recepcion de la informacion, en realidad la perdida de tiempo es en la llamada de receive, es decir que el programa podria correr mas rapidamente si descargamos todos los archivos concurrentemente, mientra una conexion no tiene datos disponibles el programa puede leer de otra conexion.

Anuncios

Para este caso las corutinas ofrecen una manera conveniente para estructurar estas descargas simultaneas, creamos un nuevo proceso por cada tarea de descarga, cuando una tarea no tiene informacion disponible, produce un control para un simple despachador, el cual sera el encargado de invocar un nuevo proceso. Para reescribir el programa anterior con corutinas deberemos modificar el codigo anterior como una funcion, veamos el codigo:

function descarga(host, file)
	local c = assert(socket.connect(host, 80))
	local contar = 0
	c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
	while true do
		local s, estado, parcial = recibir(c)
		contar = contar + #(s or parcial)
		if estado == "closed" then break end
	end
	c:close()
	print(file, contar)
end

Como no estamos interesado en el contenido del archivo remoto, la funcion cuenta e imprime el tamaño del archivo, en lugar de escribir el archivo a la salida estandar, si mostraramos muchos archivos la salida mezclaria todos los archivos), para este codigo usaremos una nueva funcion (recibir) para recibir la informacion desde la conexion, en el enfoque secuencial su codigo seria algo asi:

function recibir (conexion)
	return conexion:receive(2^10)
end

Para la implementacion concurrente esta funcion debe recibir informacion sin bloqueos, en su lugar si no hay suficiente informacion disponible deberemos usar a yield, para esto debemos modificar el codigo de la siguiente forma:

function recibir(conexion)
	conexion:settimeout(0)
	local s, estado, parcial = conexion:receive(2^10)
	if estado == "timeout" then
		coroutine.yield(conexion)
	end
	return s or parcial, estado
end
Anuncios

La llamada a settimeout(0) hace que cualquier operacion sobre la conexion sea una operacion no bloqueadora, cuando el estado de la operacion es «timeout» significa que la operacion regreso sin completarlo, en este caso el proceso produce (yields), el argumento no falso pasado a las señales del yield y de ahi al dispatcher que es el proceso que ejecuta la tarea, inclusive en un caso de timeout la conexion devuelve lo que leyo hasta el timeout, para lo cual esta la variable parcial, veamos a continuacion al dispatcher y un codigo adicional:

threads = {}

function tomar(host, file)
	local co = coroutine.create(function()
		descarga(host, file)
	end)
	table.insert(threads, co)
end

function despachar()
	local i = 1
	while true do
		if threads[i] == nil then
			if threads[i] == nil then break end
			i = 1
		end
		local estado, res = coroutine.resume(threads[i])
		if not res then
			table.remove(threads, i)
		else
			i = i + 1
		end
	end
end

La tabla threads mantiene una lista de todos los procesos activos para el dispatcher, la funcion tomar asegura que cada descarga corra en un proceso individual, el despachador en si mismo es principalmente un bucle que va a traves de todos los procesos, reanudandolos uno por uno, y tambien debe removerlos de la lista los procesos que han finalizados sus tareas, y el bucle se detiene cuando no hay mas procesos que correr, finalmente el programa principal crea los procesos que necesita y llama al despachador (despachar), supongamos que tenemos cuatro archivos para descargar desde el sitio de W3C, el programa principal podria ser algo como esto:

host = "www.w3.org"

tomar(host, "/TR/html401/html40.txt")
tomar(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
tomar(host, "/TR/REC-html32.html")
tomar(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")

despachar()
Anuncios

Si lo hacemos asi puede tomarnos alrededor de 6 segundos descargarlo con las corutinas, en cambio con la implementacion secuencial puede tardar unos 15 segundos, igualmente esto siempre va a depender de la velocidad de internet, al margen de la velocidad esta implementacion esta lejos de funcionar correctamente porque todo funcionara bien mientras algun proceso tenga algo para leer, sin embargo cuando ningun proceso tenga algo que leer el despachador hace una espera pesada yendo de proceso a proceso solo para chequear que aun no tiene informacion dando como resultado que esta implementacion de corutinas usa para este caso 30 veces mas de CPU que la solucion secuencial, estudiemos el siguiente listado:

function despachar()
	local i = 1
	local conexiones = {}
	while true do
		if threads[i] == nil then 
			if threads[i] == nil then break end
			i = 1
			conexiones = {}
		end
		local estado, res = coroutine.resume(threads[i])
		if not res then
			table.remove(threads, i)
		else
			i = i + 1
			conexiones[#conexiones + 1] = res
			if #conexiones == #threads then
				socket.select(conexiones)
			end
		end
	end
end

Para evitar esto podemos utilizar select, como se ve en el codigo superior, del socket, esto permite a un programa bloquear mientras espera por un cambio de estado, en un grupo de sockets, donde podemos ver que el cambio en nuestra implementacion es muy chico, tenemos que cambiar el despachador solamente, a lo largo del bucle este nuevo despachador las conexiones con timeout en la tabla conexiones, recordemos que recibir pasa tales conexiones a yield asi resume los devuelve, si todas las conexion estan en timeout el despachador llama a select para esperar por cualquiera de estas conexiones para cambiar el estado, esta implementacion final corre tan rapida como la primera implementacion con corutinas, no genera espera pesadas y solo usa un poco mas de CPU que la implementacion secuencial, veamos el codigo final:

socket = require("socket")

function descarga(host, file)
        local c = assert(socket.connect(host, 80))
        local contar = 0
        c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
        while true do
                local s, estado, parcial = recibir(c)
                contar = contar + #(s or parcial)
                if estado == "closed" then break end
        end
        c:close()
        print(file, contar)
end

function recibir(conexion)
        conexion:settimeout(0)
        local s, estado, parcial = conexion:receive(2^10)
        if estado == "timeout" then
                coroutine.yield(conexion)
        end
        return s or parcial, estado
end

threads = {}

function tomar(host, file)
        local co = coroutine.create(function()
                descarga(host, file)
        end)
        table.insert(threads, co)
end

function despachar()
        local i = 1
        local conexiones = {}
        while true do
                if threads[i] == nil then
                        if threads[i] == nil then break end
                        i = 1
                        conexiones = {}
                end
                local estado, res = coroutine.resume(threads[i])
                if not res then
                        table.remove(threads, i)
                else
                        i = i + 1
                        conexiones[#conexiones + 1] = res
                        if #conexiones == #threads then
                                socket.select(conexiones)
                        end
                end
        end
end
Nota: Para que me funcione tuve que utilizar el siguiente comando en DEBIAN:

tinchicus@dbn001dsk:~/lenguaje/lua$ sudo apt-get install lua-socket

Porque de lo contrario no encuentra la libreria a la hora de compilarlo.
Anuncios

En resumen, hoy hemos visto como manejar multiprocesamiento no preventivo, un ejemplo que fuimos desarrollando poco a poco, como implementar una libreria externa, como la implementamos paso a paso, espero les haya sido util sigueme en Twitter o Facebook para recibir una notificacion cada vez que subo un nuevo post en este blog, nos vemos en el proximo post.

Tambien podes donar

Es para mantenimiento del sitio, gracias!

$1.50

Anuncio publicitario