En el anterior artículo de esta serie, aprendimos a crear un flujo de mensajes entre un proceso que denominamos productor y otro que llamamos consumidor. Pero eso dista mucho de lo que podríamos llamar una cola de trabajos. Para eso, necesitamos añadir ciertas funcionalidades.
Una de esas funcionalidades, que es imprescindible para nuestra cola de trabajo, es que los mensajes (que denominaremos trabajos a partir de ahora) deben tener un formato estructurado y un significado concreto, reconocido por todos los procesos implicados.
Dando formato a los mensajes
Vamos a convertir los simples mensajes que manejábamos hasta ahora en auténticas órdenes de trabajo. Para ello, hace falta estructurar la información. Tendremos que definir el objeto Trabajo y luego insertarlo en la cola como mensaje.
Dado que RabbitMQ acepta mensajes binarios, somos libres de elegir el formato que más nos plazca. En cualquier caso, ese formato deberá saber representar ciertos objetos de los que maneja el programa. En programación, la acción de convertir objetos de programa en bloques de datos almacenables o transmisibles se denomina serialización o marshalling. Algunos buenos candidatos para esto en Python son:
- XML: Es un estándar muy utilizado para representar objetos, independientemente del lenguaje de programación.
- JSON: Pronunciado "yeison", es similar a XML pero mucho más simple y legible para los humanos. Es menos estándar, pero está implementado en casi todos los lenguajes.
- YAML: Tiene una filosofía parecida a JSON, pero más flexible. Hay quien considera a JSON un subconjunto de YAML.
- Pickle: Es un mecanismo desarrollado específicamente para Python. Es complejo y muy potente, pero no es legible por humanos (son datos binarios), lo que dificulta bastante la depuración del código.
En la elección del formato, hay que tener en cuenta principalmente dos factores: la complejidad de los datos que vamos a serializar y la portabilidad de éstos a otros lenguajes. Este segundo punto no es trivial. Si realizamos un sistema de mensajes que esté atado a un lenguaje determinado, perdemos la libertad de maniobra que nos da una plataforma como RabbitMQ.
Otros factores que podemos tener en cuenta son la legibilidad (a efectos de depuración) y la eficiencia (que el coste de tiempo de proceso en la serialización y deserialización sea bajo).
En este caso, me voy a decantar por JSON, que es un formato muy utilizado en Python, que también tiene soporte en casi todos los lenguajes y además es muy legible y bastante eficiente en cuanto a rendimiento.
Su uso básico en Python es muy simple. Para serializar un objeto en formato cadena (legible) usamos el método json.dumps(), y para deserializar una cadena de vuelta al objeto original, json.loads(). El módulo json viene "de fábrica" en cualquier instalación de Python, y además maneja automáticamente todos los tipos nativos del lenguaje, como listas y diccionarios. Su uso es tan común en Python, que los conversores estándar str() y repr() son sospechosamente similares a él.
Define trabajo
Vamos a definir el objeto genérico Trabajo como una operación con unos datos (opcionales) de entrada. Una primera versión de trabajo.py puede ser algo así:
class Trabajo: def __init__(self, operacion, entrada=None): self.operacion = operacion self.entrada = entrada
En esta definición, hemos abusado sin piedad del duck typing de Python para dejar abierta la elección de qué consideramos una operación y una entrada.
La simple existencia de un objeto no lo hace serializable. Si ahora definimos una instancia de Trabajo, no podremos pasarla a JSON y nos lanzará un TypeError. Hay muchas maneras en Python de hacerlo serializable, por ejemplo usando un JSONEncoder. Aquí vamos a ir a lo más simple, que es definir métodos ad hoc para exportar e importar los objetos. La versión serializada será el resultado de exportar todas las variables en forma de diccionario:
import json class Trabajo: def __init__(self, operacion, entrada=None): self.operacion = operacion self.entrada = entrada def exportar(self): return json.dumps(self.__dict__) @classmethod def importar(cls, datos): dic = json.loads(datos) return cls(dic['operacion'], dic['entrada'])
Con esta implementación, las operaciones posibles son:
Crear un trabajo:
t = Trabajo('hola', 'mundo')
Serializarlo:
cadena = t.exportar()
Deserializarlo:
Crear un trabajo:
t = Trabajo('hola', 'mundo')
Serializarlo:
cadena = t.exportar()
Deserializarlo:
t2 = Trabajo.importar(cadena)
Esta implementación tan simple sólo va a funcionar si los tipos que usamos para la operación y la entrada son a su vez serializables. Para nuestro ejemplo, nos vale perfectamente.
Con todo esto, vamos a modificar los procesos productor y consumidor del anterior artículo para que manejen el objeto que acabamos de definir.
Para simular un trabajo, definiremos la operación "esperar" con un número aleatorio de segundos.
productor.py
Esta implementación tan simple sólo va a funcionar si los tipos que usamos para la operación y la entrada son a su vez serializables. Para nuestro ejemplo, nos vale perfectamente.
Con todo esto, vamos a modificar los procesos productor y consumidor del anterior artículo para que manejen el objeto que acabamos de definir.
Para simular un trabajo, definiremos la operación "esperar" con un número aleatorio de segundos.
productor.py
import pika from trabajo import Trabajo from random import randint # Establecer conexión con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) ch = con.channel() # Declarar la cola ch.queue_declare(queue='trabajos') # Generar un trabajo t = Trabajo('esperar', randint(1,10)) # Publicar el mensaje ch.basic_publish(exchange='', routing_key='trabajos', body=t.exportar().encode('utf-8')) print("Trabajo enviado.") # Cerrar conexión con.close()
consumidor.py
import pika from trabajo import Trabajo from proceso import ejecutar # Establecer conexión con = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) ch = con.channel() # Declarar la cola ch.queue_declare(queue='trabajos') # Definir la función callback def procesar(ch, method, properties, body): datos = body.decode('utf-8') print("Se ha recibido un trabajo: %s" % datos) t = Trabajo.importar(datos) ejecutar(t) # Enganchar el callback ch.basic_consume(procesar, queue='trabajos', no_ack=True) # Poner en espera print('Esperando trabajos...') ch.start_consuming()
Un par de detalles a destacar:
- Dado que el body de los mensajes ha de ser binario, hacemos la correspondiente conversión desde/hacia UTF-8.
- Para mantener el código debidamente modular, hemos separado la gestión de los trabajos por un lado y la propia ejecución de los mismos. Para ello hemos creado un módulo proceso.py con una función ejecutar().
proceso.py
import time def ejecutar(trabajo): if trabajo.operacion == 'esperar': print('Esperando %d segundos...' % trabajo.entrada) time.sleep(trabajo.entrada) print('Hecho!') else: raise NotImplementedError('Operación "%s" no soportada.' % trabajo.operacion)
Con esto, tenemos un precioso consumidor de trabajos:
$ python consumidor.py Esperando trabajos... Se ha recibido un trabajo: {"entrada": 5, "operacion": "esperar"} Esperando 5 segundos... Hecho!
No hay comentarios:
Publicar un comentario
Expresa tu opinión respetando a los demás y a unas mínimas reglas del lenguaje castellano.
Nota: solo los miembros de este blog pueden publicar comentarios.