Programación distribuida
Definiciones
La programación distribuida es una solución ideal para responder a los problemas de concurrencia donde se necesite ejecutar tareas en segundo plano. La solución consiste en tener, en el lado de la aplicación principal, una aplicación secundaria capaz de ejecutar estas tareas. A continuación, hay que transmitir los mensajes de una aplicación a otra para, por un lado, explicar la tarea por ejecutar y por otra, devolver el resultado (esta última parte es opcional).
Para gestionar esta problemática, nos basamos en el protocolo AMQP (Advanced Message Queuing Protocol), que es un estándar para la organización de estos mensajes. Presentamos los principales patrones de diseño que permiten resolver la mayor parte de las necesidades usándolas a bajo nivel. Empezaremos presentando ØMQ, cuya interfaz de programación es relativamente similar al uso de simples sockets de red, y después utilizaremos una solución basada en un mensaje broker RabbitMQ.
En un segundo momento presentaremos Celery, que es una solución de alto nivel para gestionar la problemática de la programación distribuida muy rápida y eficazmente. Terminaremos presentando crossbar y las soluciones que mezclan Python y JavaScript.
ØMQ
1. Presentación general
ØMQ es una solución elegante y con buen rendimiento, que nos va a permitir presentar los patrones de diseño principales, relacionados con esta problemática. Para instalarla:
$ pip install zmq
Cuando se escribe un programa y queremos utilizar esta librería, siempre hay que escribir el mismo tipo de código:
>>> from zmq import Context
>>> context = Context()
>>> socket = context.socket(METHODE)
Este código se encuentra en las dos aplicaciones (o más), que se comunican entre ellas. Permite crear el socket de red en el ámbito en el que tienen lugar los intercambios de mensajes. El método que se ha de utilizar depende del patrón de diseño que queramos utilizar.
Una vez que se crea el socket, se pueden visualizar sus métodos:
>>> dir(socket)
Como se trabaja con un socket, se utilizará el método bind para escuchar en una interfaz de red y un puerto particular. Los protocolos admitidos son TCP, UDP, PGM, EPGM, INPROC y IPC. También existe bind_to_random_port, que permite dejar al programa la elección del puerto. Para cerrar una conexión, será necesario utilizar el método unbind. El atributo closed permite conocer el estado del socket.
El programa que realizará la operación de bind será el servidor, el que escucha el puerto (y solo puede haber uno). El otro programa deberá utilizar connect y será el cliente. Siguiendo el paradigma utilizado, la comunicación es monodireccional o bidireccional. Dicho de otra manera, en algunos protocolos, el cliente y el servidor pueden enviar y recibir al mismo tiempo el dato, mientras que, en los otros casos, uno no hace otra cosa que recibir y el otro enviar. Para cerrar una conexión como esta, hay que utilizar disconnect.
A continuación, los sockets pueden enviar o recibir los datos y admiten varios formatos:
Tipo de objetos |
Método de recepción |
Método de envío |
Bytes |
recv |
send |
Objeto convertible en JSON (usualmente un diccionario) |
recv_json |
send_json |
Lista de bytes (principalmente) |
recv_multipart |
send_multipart |
Objeto Python serializable a través de pickle |
recv_pyobj |
send_pyobj |
Objeto Python (+ método de serialización que se pasa como argumento) |
recv_serialized |
send_serialized |
Cadena... |
AMQP con RabbitMQ
Anteriormente, hemos mostrado los diferentes patrones que se pueden utilizar para escribir aplicaciones distribuidas. Aquí mostraremos cómo utilizar RabbitMQ a bajo nivel y a continuación se mostrará una solución de alto nivel.
Vamos a utilizar Pika para manipular una cola de espera de RabbitMQ y presentar las diferentes maneras de utilizarlo.
1. Instalar
Para instalar RabbitMQ, hay que utilizar un almacén específico, porque los paquetes que resultan de la distribución, si existen en el almacén oficial, normalmente están fechados y sufren vulnerabilidades:
# wget -O - "https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey" | apt-key add -
# apt update
# apt install rabbitmq-server
Para el resto de distribuciones, será necesario dirigirse a la documentación oficial (https://www.rabbitmq.com/download.html).
2. Configurar
Cuando se usa PostgreSQL, por ejemplo, hay que crear un usuario con su contraseña, así como una base de datos. Entonces, se puede utilizar en nuestro programa Python a partir de una URL como esta:
database_url = 'postgres://user:secret@localhost:5432/db_name'
Con RabbitMQ, es lo mismo:
rabbit_broker_url = 'amqp://user:secret@localhost:5672/vhost'
Existen algunos comandos que hay que conocer para crear un usuario, un vhost y para configurar los permisos:
# rabbitmqctl add_user test test
# sudo rabbitmqctl add_vhost vhost
# sudo rabbitmqctl set_permissiones -p vhost test ".*" ".*" ".*"
Ahora podemos utilizar RabbitMQ a partir de la URL definida más atrás.
Para terminar, también hay que instalar pika:
pip install pika
3. Introducción
El primer ejemplo de código es muy básico: un productor y un consumidor que usan una cola de espera (queue) para intercambiar mensajes. Se trata principalmente de ver cómo crear una conexión, enviar o recibir mensajes.
A continuación, se muestra el productor:
from pika import BlockingConnection
from pika.connection import URLParameters
connection = BlockingConnection(
URLParameters('amqp://test:test@localhost:5672/vhost'))
channel = connection.channel()
channel.queue_declare(queue='test1')
while...
Kafka
1. Presentación
Kafka es un sistema de distribución de mensajes en tiempo real utilizado en particular en canales de datos, pero también como sistema de intercambio en sistemas de microservicios.
El ecosistema de Kafka en Python incluye kafka-python y aiokafka, su versión asíncrona, que puede utilizarse para crear productores y consumidores.
También existe el proyecto faust, que gestiona el equivalente del patrón publicar/suscribir (publish/subscribe).
2. Principios generales
Kafka es un sistema de distribución de mensajes. Estos mensajes se aíslan en un tema. Este tema es un mecanismo de almacenamiento que preserva el orden en el que se envían los mensajes, y los mensajes pueden verse como eventos.
Cada tema puede tener una o varias particiones y el orden de los mensajes está garantizado dentro de la misma partición, pero no entre particiones diferentes.
Cuando un productor produce un mensaje, este se enviará en una sola partición.
También existe la noción de grupo. En un sistema en el que varias partes intercambian mensajes, el mensaje debe ser leído por cada grupo que escucha. Si un grupo solo tiene un consumidor, este escuchará todas las particiones. Si tiene varios consumidores, entonces las particiones se distribuirán uniformemente entre cada consumidor.
Por último, también existe la noción de replicación...
Celery
1. Presentación
Celery es un ordenador de tareas:
-
Recibe sus tareas a través de un sistema de mensajes.
-
Los distribuye de manera optimizada a través del uso de los workers.
-
Cada tarea se puede ejecutar de manera síncrona o asíncrona.
-
Está diseñado para funcionar en tiempo real, pero también puede gestionar una planificación.
-
Cada tarea puede devolver un resultado (opcionalmente).
Se trata de una herramienta de alto nivel, que ofrece las funciones y una sintaxis para responder fácilmente a las necesidades de arquitectura de aplicaciones compleja.
2. ¿En qué caso utilizar Celery?
El caso clásico es cuando tenemos una función que debe generar un resultado y que después tiene que hacer algo relacionado con dicho resultado para, finalmente, devolverlo al usuario. Si hacemos estas acciones anexas en el mismo túnel de tratamiento, entonces también retrasamos la respuesta al usuario. Por lo tanto, la idea es poner estas acciones anexas en las tareas distribuidas (ejecutadas en el lado del proceso principal). De esta manera, se lanzan estas tareas que se ejecutarán más tarde, pero no esperamos a que terminen y se puede continuar el proceso principal y devolver el control al usuario.
Las tareas ubicadas en una cola de espera, se ejecutan lo antes posible. Sin embargo, no hay ninguna garantía de la temporalidad de su ejecución ni sobre el hecho de que tengan éxito o fallen.
De esta manera, la experiencia del usuario mejora porque la aplicación responde más rápido, pero también se debe mantener actualizado porque hay tareas en espera y se notificará cuando terminen.
Un ejemplo concreto sería una tarea para enviar un e-mail o generar un documento a partir de los resultados del proceso principal.
Es posible prever un medio para Celery para que restituya el resultado, lo que permite resolver todas las situaciones clásicas donde la programación asíncrona es eficaz, salvo que se ha hecho con una solución de terceros, que permiten una ampliacón importante y tiene posibilidades más interesantes en términos de arquitectura.
3. Instalar
Como se ha explicado anteriormente, Celery funciona recibiendo mensajes pidiéndo realizar una tarea. Por lo tanto, hay que elegir cómo se envían estos mensajes....
Crossbar
1. Presentación
Para terminar este capítulo sobre la programación distribuida, vamos a introducir WAMP (Web Application Messaging Protocol). Se trata de un estándar abierto de WebSocket, que permite a dos aplicaciones intercambiar mensajes RPC (Remote Procedure Call, es decir llamada remota de procedimiento) o de tipo Publish/Subscribe (que ya hemos encontrado dos veces en este capítulo).
El diseño original de este protocolo se ha realizado por crossbar.io en 2012 y es relativamente útil y fácil de poner en marcha por parte de los proyectos web.
La solución presentada aquí, utiliza masivamente la librería asíncrona de Python. Por lo tanto, se aconseja haber leído el capítulo Programación asíncrona: avanzada, antes de atacar esta parte.
Para instalar las herramientas necesarias:
$ pip install autobahn
2. WebSocket
Como introducción, vamos a realizar un WebSocket de tipo Echo. Por lo tanto, este último va a recibir el dato y devolverlo tal cual. A continuación, se muestra este servidor:
import json
import logging
import coloredlogs
from autobahn.asyncio.websocket import (
WebSocketServerProtocol,
WebSocketServerFactory)
logger = logging.getLogger("notifications")
coloredlogs.install(level='DEBUG', logger=logger)
class TestServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
logger.info("Client connecting: {0}".format(request.peer))
def onOpen(self):
logger.info("WebSocket connection open.")
def onMessage(self, payload, isBinary):
if isBinary:
result = self.onMessageBinary(payload)
else:
payload = payload.decode("utf-8")
...