Antes de este, debes leer los dos artículos anteriores de esta serie. En el primero, desarrollamos un simple flujo de mensajes y en el segundo, estructuramos dichos mensajes como trabajos.
Ahora vamos a afrontar otra de las funcionalidades que requiere cualquier cola de trabajos: debe haber un mecanismo para que el proceso consumidor pueda confirmar que un trabajo se ha completado, y en caso contrario, sea reasignado a otro proceso.
Se ruega acuse de recibo
La confirmación, acuse de recibo o acknowledgment, es un concepto usado en prácticamente todos los protocolos (incluso los no digitales) para aquellas operaciones en las que queremos asegurar que se completan correctamente.
En el programa que hemos hecho en esta serie de artículos, hasta ahora le estábamos indicando a la cola de RabbitMQ que no hiciera acuse de recibo, mediante la opción no_ack=True en la llamada a basic_consume(). Esto hacía que la cola no esperase ninguna confirmación y eliminase los trabajos tan pronto como eran despachados al consumidor. Esto es muy eficiente, pero evidentemente corremos el peligro de que algún trabajo quede sin hacer y sea silenciosamente olvidado.
Para hacer el acuse de recibo, eliminamos la opción no_ack (por defecto es False) y añadimos en la función callback una llamada a ch.basic_ack(). Esta llamada avisa a la cola que el trabajo se ha completado y ya puede ser retirado. Para identificar el trabajo al que nos referimos, se incluye el argumento delivery_tag, que a su vez hemos recibido a través del argumento method.
En resumen, el código cambia de esta manera:
Si ejecutamos este consumidor, en principio no notaremos ningún cambio, a no ser que miremos en la cola de mensajes y nos demos cuenta que el mensaje se elimina después de pasar el tiempo de espera, no antes.
Desde la línea de comandos de rabbitmqctl, hay una forma para ver no solo los mensajes en espera, sino cuántos han sido despachados y están esperando confirmación:
Para hacer el acuse de recibo, eliminamos la opción no_ack (por defecto es False) y añadimos en la función callback una llamada a ch.basic_ack(). Esta llamada avisa a la cola que el trabajo se ha completado y ya puede ser retirado. Para identificar el trabajo al que nos referimos, se incluye el argumento delivery_tag, que a su vez hemos recibido a través del argumento method.
En resumen, el código cambia de esta manera:
# Definir la función callback def procesar(ch, method, properties, body): datos = body.decode('utf-8') print("Se ha recibido un trabajo: %s" % datos) t = Trabajo.importar(datos) ejecutar(t) ch.basic_ack(delivery_tag=method.delivery_tag) # Enganchar el callback ch.basic_consume(procesar, queue='trabajos')
Si ejecutamos este consumidor, en principio no notaremos ningún cambio, a no ser que miremos en la cola de mensajes y nos demos cuenta que el mensaje se elimina después de pasar el tiempo de espera, no antes.
Desde la línea de comandos de rabbitmqctl, hay una forma para ver no solo los mensajes en espera, sino cuántos han sido despachados y están esperando confirmación:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
En nuestro ejemplo, los posibles valores arrojados por esta consulta pueden ser:
a) No hay trabajos.
b) Se ha enviado un trabajo con productor.py.
c) Se ha lanzado consumidor.py y está procesando el trabajo.
d) El trabajo se ha terminado de procesar, y el acuse de recibo se ha completado.
¿No es maravilloso? ¡Seguimos para bingo!
Competencia entre consumidores
El ejemplo anterior es muy trivial, debido a que sólo contamos con un consumidor. La cosa se pone más interesante cuando hay varios consumidores procesando los mensajes, lo que inevitablemente genera una "competencia" entre ellos. Veremos que RabbitMQ puede lidiar con esa competencia de forma brillante.
Para simular una competencia, vamos a modificar nuestros inocentes programas para que fuercen la máquina. Vamos a hacer lo siguiente:
a) No hay trabajos.
Listing queues ... trabajos 0 0
b) Se ha enviado un trabajo con productor.py.
Listing queues ... trabajos 1 0
c) Se ha lanzado consumidor.py y está procesando el trabajo.
Listing queues ... trabajos 0 1
d) El trabajo se ha terminado de procesar, y el acuse de recibo se ha completado.
Listing queues ... trabajos 0 0
¿No es maravilloso? ¡Seguimos para bingo!
Competencia entre consumidores
El ejemplo anterior es muy trivial, debido a que sólo contamos con un consumidor. La cosa se pone más interesante cuando hay varios consumidores procesando los mensajes, lo que inevitablemente genera una "competencia" entre ellos. Veremos que RabbitMQ puede lidiar con esa competencia de forma brillante.
Para simular una competencia, vamos a modificar nuestros inocentes programas para que fuercen la máquina. Vamos a hacer lo siguiente:
- Haremos que productor.py genere trabajos de manera constante.
- Haremos que consumidor.py, además de ejecutar trabajos en espera simulando que trabaja, aleatoriamente falle lanzando una excepción. De esta manera comprobaremos si dicho trabajo se redirige a otro consumidor.
- Lanzaremos varios productores y varios consumidores a la vez, en distintas terminales.
Nuestros programas se quedan de esta manera:
productor.py
import pika from trabajo import Trabajo from random import randint import time # Establecer conexión con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) ch = con.channel() # Declarar la cola ch.queue_declare(queue='trabajos') # Bucle infinito while True: # Espera aleatoria antes de lanzar el siguiente trabajo time.sleep(randint(1,4)) # Generar un trabajo t = Trabajo('esperar', randint(1,10)) # Publicar el mensaje ch.basic_publish(exchange='', routing_key='trabajos', body=t.exportar().encode('utf-8')) print("Trabajo enviado: %s" % t.exportar())
consumidor.py
import pika from trabajo import Trabajo from proceso import ejecutar # Establecer conexión con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) ch = con.channel() # Declarar la cola ch.queue_declare(queue='trabajos') # Definir la función callback def procesar(ch, method, properties, body): datos = body.decode('utf-8') print("Se ha recibido un trabajo: %s" % datos) t = Trabajo.importar(datos) ejecutar(t) ch.basic_ack(delivery_tag=method.delivery_tag) # Enganchar el callback ch.basic_consume(procesar, queue='trabajos') # Poner en espera print('Esperando trabajos...') ch.start_consuming()
proceso.py
import time import random def ejecutar(trabajo): if trabajo.operacion == 'esperar': print('Esperando %d segundos...' % trabajo.entrada) time.sleep(trabajo.entrada) # Simular un fallo el 20% de las veces if random.random() < 0.2: raise Exception print('Hecho!') else: raise NotImplementedError('Operación "%s" no soportada.' % trabajo.operacion)
Si, con esta configuración, lanzamos, por ejemplo, dos productores y tres consumidores, veremos cómo los trabajos van fluyendo y cuando, eventualmente, uno de los consumidores aborta, los otros siguen procesando con normalidad, sin que se pierda ningún trabajo.
Dos productores y tres consumidores trabajando a la vez. Podemos observar que, a pesar de que los consumidores 1 y 2 han abortado, el segundo sigue en la brecha procesando trabajos. |
No hay comentarios:
Publicar un comentario
Expresa tu opinión respetando a los demás y a unas mínimas reglas del lenguaje castellano.
Nota: solo los miembros de este blog pueden publicar comentarios.