Programación distribuida
Definiciones
La programación distribuida es una solución ideal para responder a los problemas de concurrencia donde se necesite ejecutar tareas en segundo plano. La solución consiste en tener, en el lado de la aplicación principal, una aplicación secundaria capaz de ejecutar estas tareas. A continuación, hay que transmitir los mensajes de una aplicación a otra para, por un lado explicar la tarea a ejecutar y por otra, devolver el resultado (esta última parte es opcional).
Para gestionar esta problemática, nos basamos en el protocolo AMQP (Advanced Message Queuing Protocol), que es un estándar para la organización de estos mensajes. Presentamos los principales patrones de diseño que permiten resolver la mayor parte de las necesidades usándolas a bajo nivel. Empezaremos presentando ØMQ, cuya interfaz de programación es relativamente similar al uso de simples sockets de red, y después utilizaremos una solución basada en un mensaje broker RabbitMQ.
En un segundo momento presentaremos Celery, que es una solución de alto nivel para gestionar la problemática de la programación distribuida muy rápida y eficazmente. Terminaremos presentando crossbar y las soluciones que mezclan Python y JavaScript.
ØMQ
1. Presentación general
ØMQ es una solución elegante y con buen rendimiento, que nos va a permitir presentar los patrones de diseño principales, relacionados con esta problemática. Para instalarla:
$ pip install zmq
Cuando se escribe un programa y queremos utilizar esta librería, siempre hay que escribir el mismo tipo de código:
>>> from zmq import Context
>>> context = Context()
>>> socket = context.socket(METHODE)
Este código se encuentra en las dos aplicaciones (o más), que se comunican entre ellas. Permite crear el socket de red en el ámbito en el que tienen lugar los intercambios de mensajes. El método a utilizar depende del patrón de diseño que queramos utilizar.
Una vez que se crea el socket, se puede visualizar sus métodos:
>>> dir(socket)
Como se trabaja con un socket, se utilizará el método bind para escuchar en una interfa de red y un puerto particular. Los protocolos soportados son TCP, UDP, PGM, EPGM, INPROC y IPC. También existe bind_to_random_port, que permite dejar al programa la elección del puerto. Para cerrar una conexión, será necesario utilizar el método unbind. El atributo closed permite conocer el estado del socket.
El programa que realizará la operación de bind será el servidor, el que escucha el puerto (y solo puede haber uno). El otro programa deberá utilizar connect y será el cliente. Siguiendo el paradigma utilizado, la comunicación es monodireccional o bidireccional. Dicho de otra manera, en algunos protocolos, el cliente y el servidor pueden enviar y recibir al mismo tiempo el dato, mientras que en los otros casos, uno no hace otra cosa que recibir y el otro enviar. Para cerrar una conexión como esta, hay que utilizar disconnect.
A continuación, los sockets pueden enviar o recibir los datos y soportan varios formatos:
Tipo de objetos |
Método de recepción |
Método de envío |
Bytes |
recv |
send |
Objeto convertible en JSON (usualmente un diccionario) |
recv_json |
send_json |
Lista de bytes (principalmente) |
recv_multipart |
send_multipart |
Objeto Python serializable a través de pickle |
recv_pyobj |
send_pyobj |
Objeto Python (+ método de serialización que se pasa como argumento) |
recv_serialized |
send_serialized |
Cadena de caracteres... |
AMQP con RabbitMQ
En el capítulo anterior, hemos mostrado los diferentes patrones que se pueden utilizar para escribir aplicaciones distribuidas. Este capítulo va a mostrar cómo utilizar RabbitMQ a bajo nivel y el capítulo siguiente mostrará una solución de alto nivel.
Vamos a utilizar Pika para manipular una cola de espera de RabbitMQ y presentar las diferentes maneras de utilizarlo.
1. Instalación
Para instalar RabbitMQ, hay que utilizar un almacén específico, porque los paquetes que resultan de la distribución, si existen en el almacén oficial, normalmente están fechados y sufren vulnerabilidades:
# wget -O - "https://packagecloud.io/rabbitmq/rabbitmq-server/gpgkey" | apt-key add -
# apt update
# apt install rabbitmq-server
Para el resto de distribuciones, será necesario dirigirse a la documentación oficial (https://www.rabbitmq.com/download.html).
2. Configuración
Cuando se usa PostgreSQL, por ejemplo, hay que crear un usuario con su contraseña, así como una base de datos. Entonces, se puede utilizar en nuestro programa Python a partir de una URL como esta:
database_url = 'postgres://user:secret@localhost:5432/db_name'
Con RabbitMQ, es lo mismo:
rabbit_broker_url = 'amqp://user:secret@localhost:5672/vhost'
Existen algunos comandos que hay que conocer para crear un usuario, un vhost y para configurar los permisos:
# rabbitmqctl add_user test test
# sudo rabbitmqctl add_vhost vhost
# sudo rabbitmqctl set_permissiones -p vhost test ".*" ".*" ".*"
Ahora podemos utilizar RabbitMQ a partir de la URL definida más atrás.
Para terminar, también hay que instalar pika:
pip install pika
3. Introducción
El primer ejemplo de código es muy básico: un productor y un consumidor que usan una cola de espera (queue) para intercambiar mensajes. Se trata principalmente de ver cómo crear una conexión, enviar o recibir mensajes.
A continuación se muestra el productor:
from pika import BlockingConnection
from pika.connection import URLParameters
connection = BlockingConnection(
URLParameters('amqp://test:test@localhost:5672/vhost'))
channel = connection.channel()
...
Celery
1. Presentación
Celery es un ordenador de tareas:
-
recibe sus tareas a través de un sistema de mensajes
-
los distribuye de manera optimizada a través del uso de los workers
-
cada tarea se puede ejecutar de manera síncrona o asíncrona
-
está diseñado para funcionar en tiempo real, pero también puede gestionar una planificación
-
cada tarea puede devolver un resultado (opcionalmente)
Se trata de una herramienta de alto nivel, que ofrece las funciones y una sintaxis para responder fácilmente a las necesidades de arquitectura de aplicaciones compleja.
2. ¿En qué caso utilizar Celery?
El caso clásico es cuando tenemos una función que debe generar un resultado, y después que hacer algo relacionado con este resultado y lo devuelve al usuario. Si hacemos estas acciones anexas en el mismo túnel de tratamiento, entonces también retrasamos la respuesta al usuario. Por lo tanto, la idea es poner estas acciones anexas en las tareas distribuidas (ejecutadas en el lado del proceso principal). De esta manera, se lanzan estas tareas que se ejecutarán más tarde, pero no esperamos a que terminen y se puede continuar el proceso principal y devolver el control al usuario.
Las tareas ubicadas en una cola de espera, se ejecutan lo antes posible. Sin embargo, no hay ninguna garantía de la temporalidad de su ejecución ni sobre el hecho de que tengan éxito o fallen.
De esta manera, la experiencia del usuario mejora porque la aplicación responde más rápido, pero también se debe mantener actualizado porque hay tareas en espera y se notificará cuando terminen.
Un ejemplo concreto sería una tarea para enviar un e-mail o generar un documento a partir de los resultados del proceso principal.
Es posible prever un medio para Celery para que restituya el resultado, lo que permite resolver todas las situaciones clásicas donde la programación asíncrona es eficaz, salvo que se ha hecho con una solución de terceros, que permite un escalado importante y tiene posibilidades más interesantes en términos de arquitectura.
3. Instalación
Como se ha explicado más atrás, Celery funciona recibiendo mensajes pidiéndole realizar una tarea. Por lo tanto, hay que elegir cómo se envían estos mensajes. Las opciones más...
Crossbar
1. Presentación
Para terminar este capítulo sobre la programación distribuida, vamos a introducir WAMP (Web Application Messaging Protocol). Se trata de un estándar abierto de WebSocket, que permite a dos aplicaciones intercambiar mensajes RPC (Remote Procedure Call, es decir llamada remota de procedimiento) o de tipo Publish/Subscribe (que ya hemos encontrado dos veces en este capítulo).
El diseño original de este protocolo se ha realizado por crossbar.io en 2012 y es relativamente útil y fácil de poner en marcha por parte de los proyectos web.
La solución presentada aquí, utiliza masivamente la librería asíncrona de Python. Por lo tanto, se aconseja haber estudiado el capítulo avanzado sobre la asincronía, antes de atacar esta parte.
Para instalar las herramientas necesarias:
$ pip install autobahn
2. WebSocket
Como introducción, vamos a realizar un WebSocket de tipo Echo. Por lo tanto, este último va a recibir el dato y devolverlo tal cual. A continuación se muestra este servidor:
import json
import logging
import coloredlogs
from autobahn.asyncio.websocket import (
WebSocketServerProtocol,
WebSocketServerFactory)
logger = logging.getLogger("notifications")
coloredlogs.install(level='DEBUG', logger=logger)
class TestServerProtocol(WebSocketServerProtocol):
def onConnect(self, request):
logger.info("Client connecting: {0}".format(request.peer))
def onOpen(self):
logger.info("WebSocket connection open.")
def onMessage(self, payload, isBinary):
if isBinary:
result = self.onMessageBinary(payload)
else:
payload = payload.decode("utf-8")
...