Esto trata sobre manejar con buen gusto los efectos secundarios de las operaciones asíncronas -- Erik Meijer sobre Rx

La programación reactiva es un concepto relativamente nuevo que está revolucionando el mundo del software. Hoy en día, cualquier sistema serio involucra muchos subsistemas asíncronos que necesitan ser coordinados eficientemente, algo tremendamente complicado con las técnicas de programación clásicas.

En este artículo trataré de introducir la filosofía de programación reactiva y las posibilidades que nos brinda JavaScript, junto a RxJS, en este aspecto. Repasaremos algunos conceptos clave para entender la programación reactiva y veremos algunos ejemplos pragmáticos sobre cómo usar RxJS.

Manejando la asincronía con JavaScript

En los últimos años, JavaScript se ha convertido en uno de los lenguajes más utilizados del mundo, se encuentra en infraestructuras críticas de las empresas más importantes, en las cuales un correcto manejo de la asincronía se vuelve esencial.

Antes de profundizar en conceptos relacionados con la reactividad, veamos un pequeño resumen sobre los callbacks y los promises, dos de los mecanismos clásicos para manejar la asincronía en JavaScript.

Callbacks

Los callbacks son la forma más antigua de gestionar la asincronía. Como ya sabréis, un callback no es más que una función que recibe como argumento otra función y la ejecuta. En otras palabras, una función "a" se usa como argumento de otra función "b". Cuando se llama a "b", esta ejecuta "a".

function b(callback){
    //do something
    callback()
}

function a(){
     console.log('hello');
}

Los callbacks son muy sencillos de entender y es por ello que son la forma más extendida de manejar la asincronía en JavaScript. No obstante, los callbacks tienen varios inconvenientes, como pueden ser la gestión de errores o el aumento de la complejidad a la hora de manejar la concurrencia; por no hablar del temido callback hell.

callback-hell

Promises

Las promises o promesas son un patrón que nos ayuda a realizar operaciones asíncronas sin muchos de los inconvenientes de los callbacks. Una promesa representa un valor que puede estar disponible ahora, en el futuro o nunca. Vinieron a salvarnos del tan temido callback hell. Generalmente hacen que los programas sean más claros al mantener un estilo de código síncrono, reduciendo la necesidad de anidar bloques y simplificando el manejo del estado.

Las promesas pueden encontrarse en 3 estados diferentes:

  • Pending: Estado inicial
  • Fulfilled: Representa una operación satisfactoria
  • Rejected: Representa una operación fallida

Veamos un ejemplo de cómo se crean y cómo se consumen:

//Creando la promesa
const requestPosts = (url) =>
	new Promise((resolve, reject) => {
		const req = new XMLHttpRequest();
		req.open('GET', url);

		req.onload = () =>
			(req.status == 200)
				? resolve((req.response)
				: reject(req.status);

		req.send();
	})

//Consumiendo la promesa
const url = 'https://jsonplaceholder.typicode.com/posts'

requestPosts(url)
	.then(r =>
    	console.log(JSON.parse(r))
  	)
  	.catch((e) =>
    	console.log(`Error: ${e}`)
  	);

Desafortunadamente, las promesas no son la panacea. Representan una mejora con respecto a los callbacks, pero tienen una deficiencia importante: solo producen un valor único. Esto es un factor limitante a la hora de manejar eventos recurrentes, como clics del mouse o flujos de datos provenientes del servidor, ya que tendríamos que preocuparnos de cada evento por separado en lugar de controlar el flujo de eventos tal como viene dado.

Async / Await

Desde la versión ES8 de JavaScript, ya tenemos disponible el patrón async/await. Esta característica heredada de otros lenguajes como c# nos permite resolver promesas escribiendo código con un estilo síncrono, es decir, añade azúcar sintáctico a las promesas.

Para usar async/await debemos declarar una función con el modificador async. Esto nos permite añadir el modificador await delante de la expresión asíncrona en dicha función, deteniendo la ejecución hasta que se resuelva la expresión.

La promesa del ejemplo anterior la podríamos consumir así:

async function main(){
    const url = 'https://jsonplaceholder.typicode.com/posts'
    const response = await requestPosts(url)

    console.log(response)
}

A priori, queda un código mucho más elegante, además de resolver el problema de la anidación de promesas. El problema viene con el tratamiento de los errores, y es que debemos “envolver” nuestro código en bloques try catch, con lo cual el manejo de errores se vuelve muy tedioso:

async function main(){
	const url = 'https://jsonplaceholder.typicode.com/posts'
	try{
        const response = await requestPosts(url)
        console.log(response)
    }
    catch(error){
        console.log(error)
    }
}

En los siguientes párrafos veremos cómo RxJS nos puede ayudar a solucionar estas limitaciones, pero antes veamos qué es la programación reactiva.

¿Qué es la programación reactiva?

Según el Reactive Manifesto, la programación reactiva es un paradigma enfocado en el trabajo con stream de datos de manera asíncrona. En este se establece las bases de los sistemas reactivos, los cuales deben ser:

  • Responsivos: aseguran la calidad del servicio cumpliendo unos tiempos de respuesta establecidos.
  • Resilientes: se mantienen responsivos incluso cuando se enfrentan a situaciones de error.
  • Elásticos: se mantienen responsivos incluso ante aumentos en la carga de trabajo.
  • Orientados a mensajes: minimizan el acoplamiento entre componentes al establecer interacciones basadas en el intercambio de mensajes de manera asíncrona.

¿Qué son los streams?

Para entender la programación reactiva, debemos entender lo que son los streams. Podríamos decir que un stream es un tipo de colección, al igual que un array o un string, en este caso, de eventos o elementos futuros. La diferencia radica en que los stream no están disponibles de forma síncrona y que, además, desconocemos su tamaño.

Un stream puede provenir de múltiples fuentes, como por ejemplo:

  • Eventos del DOM - (eventos del ratón, eventos del teclado, eventos de formularios, etc.)
  • Animaciones
  • Peticiones HTTP
  • WebSockets
  • Lectura o escritura de ficheros.
  • Llamadas a base de datos

¿Qué es RxJS?

RxJS es una implementación para JavaScript de las Reactive Extensions. Estas fueron desarrolladas por Erik Meijer en Microsoft en el 2009. Básicamente son una librería para trabajar con streams mediante el uso de observables.

Las Rx están implementadas en más de 18 lenguajes de programación. En el mundo de Javascript también son conocidas como el “Lodash” de los eventos.

Esta librería nos proporciona un marco de trabajo en el que todo gira alrededor del tipo básico, el “observable”, el cual simplemente representa un stream de datos. Además, disponemos de otros tipos complementarios como “observer, schedulers, subjects” y operadores inspirados en Arraysmap, filter, reduce”, etc; los cuales nos permiten el manejo de eventos asíncronos como colecciones. En párrafos posteriores, veremos los operadores más importantes.

observable

Observer e Iterator

Para entender qué son y de dónde provienen los observables, debemos entender sus bases, el patrón observer y el patrón iterator.

Patrón observer

La filosofía del patrón observador es sencilla: un objeto, denominado sujeto (subject), posee un estado. Si dicho estado cambia, es capaz de “notificar” a sus suscriptores (observers) de este cambio. Gracias a ello, los objetos suscritos al objeto subject no tienen por qué preocuparse de cuándo se produce un cambio de estado, ya que este se encargará de informar de forma activa a todos aquellos objetos que hayan decidido suscribirse.

Veamos una implementación básica de dicho patrón:

class Subject {
    constructor(){
        this.observers = []
    }

    add(obs){
	  this.observers = this.observers.concat(obs)
    }

    delete(obs){
	  this.observers = this.observers.filter(l => l !== obs)
    }

    notify(msg){
	  this.observers.map(obs => obs.update(msg))
    }
}

Como podemos comprobar, la implementación es muy sencilla. Si creamos una instancia de esta clase, el objeto subject contiene una lista de observers. Estos observers se pueden añadir a través de add, o eliminar a través del método delete. Además, por medio del método notify, podemos notificar a dichos observers.

Veamos un ejemplo de cómo funciona:

const observer1 = {
	update: msg => console.log(`observer 1: ${msg}`)
}

const observer2 = {
	update: msg => console.log(`observer 2: ${msg}`)
}

const subject = new Subject()
subject.add(observer1)
subject.add(observer2)
subject.notify("Hello");
//observer 1: Hello
//observer 2: Hello

Como podemos observar en el código anterior, observer1 y observer2 son notificados cada vez que el objeto subject actualiza su estado interno. Esta implementación es muy sencilla, pero nos sirve para ilustrar cómo el patrón observador nos permite desacoplar los eventos y la reacción de los objetos que están a la escucha.

Patrón Iterator

La otra pieza fundamental del puzle es el patrón iterator. El objetivo de este patrón es proporcionarnos una manera de acceder a los elementos de un objeto agregado, de forma secuencial, sin exponer sus detalles. Es decir, proporciona a una colección un medio para navegar por sus datos sin exponer su estructura interna.

La implementación del iterador es muy simple, tan solo necesita la especificación de dos métodos: next (), para obtener el siguiente elemento en la colección, y hasNext (), para verificar si quedan elementos en dicha colección.

class CustomIterator {
    constructor(collection) {
        this.index = 0;
        this.collection = collection;
    }

    next = () =>
        this.hasNext()
            ? this.collection[this.index++]
            : null

    hasNext = () =>
        this.index + 1 < this.collection.length;
}

const customIterator = new CustomIterator([1,2,3,4])
console.log(customIterator.next(), consumer.hasNext()) //1, true

Como vemos, es un patrón extremadamente simple, pero nos proporciona una excelente forma de encapsular la lógica mediante la cual recorremos cualquier tipo de estructura de datos. La combinación de este patrón junto con el patrón observer nos es tremendamente efectiva y es la base de los observables de las Reactive Extensions.

Qué es un observable

Tras analizar sus fundamentos, veamos qué entendemos por observable.  Como podemos imaginar, el tipo observable es el eje central de Rx. Simplemente representa la idea de una colección de valores o eventos futuros.  

Los valores o eventos se emiten en orden, igual que en el patrón iterator. En lugar de que sean los objetos que lo consumen los que solicitan el siguiente elemento, es el propio observable el que “empuja” los siguientes elementos a los objetos suscritos, a medida que estos están disponibles, tal como sucedía en el subject del patrón observer.

Características:

  • Proporcionan soporte para enviar mensajes entre publishers y subscribers.
  • Ofrecen beneficios significativos sobre otras técnicas a la hora de trabajar con eventos y manejar la asincronía.
  • Los observables son lazy (perezosos). No comienzan a emitir datos hasta que te suscribes a ellos.
  • Al igual que los iteradores, un observable puede indicar cuándo se completa la secuencia.
  • Nos permiten declarar cómo reaccionar a una secuencia de elementos, en lugar de tener que reaccionar a los elementos de forma individual.

Ejemplo de observable

Antes de continuar, veamos una implementación sencilla de cómo consumir un observable:

import { fromEvent } from 'rxjs';
const link = document.getElementById("customLink");

const obs = {
    next: function(value) {
        console.log(value);
    },
    error: function(err) {
        console.error(err);
    },
    complete: function() {
        console.log("Completed");
    }
};

// Create an Observable from event
const observable = fromEvent(link, "click");
// Subscribe to begin listening for async result
observable.subscribe(obs);

Lo primero que hacemos es crear un observable a partir de un evento, en concreto del evento clic de un enlace con la id “customLink”. A continuación, nos suscribimos a dicho observable por medio de la función subscribe, pasándole el objeto “obs”, que cumple con la interfaz observer.

Subscription y observer

En el ejemplo anterior aparecen dos conceptos básicos de Rx: subscription y observer.

Subscription representa la ejecución de un observable. Como hemos comentado, los observables son lazy, por lo tanto, hasta que no nos suscribimos a ellos, no comienzan a emitir valores. Los suscriptores tienen que implementar la interfaz de observador.

observable-observers

Observer es un objeto que sabe reaccionar a los valores entregados por el observable. Para ello implementa la interfaz de observador. Dicha interfaz contiene los métodos next(), error() y complete().  

observable process

Next() es el equivalente al método update en el patrón observer. Se llama cuando el observable emite un nuevo valor. Complete() se ejecuta cuando no hay más datos disponibles.

El método error() se ejecuta cuando se produce un error en el observable, al igual que complete(). Una vez ejecutado, las siguientes llamadas no tendrán efecto.

Visualización de observables

Para entender el comportamiento de los observables nos apoyaremos en los diagramas de marbles o canicas.  Estos diagramas son muy clarificadores, ya que nos ayudan a simplificar el comportamiento de los observables y de los operadores que aplicamos sobre ellos, los cuales nos ayudarán a visualizar de forma más clara el comportamiento de los operadores.

En este tipo de diagramas el tiempo viene representado por la línea horizontal y los valores emitidos por el observable se encuentran representados por canicas.

observable marble

En el ejemplo tenemos un observable que emite los valores de un evento. Cada una de las canicas representa un elemento emitido, la “x” representa un error y la línea vertical indica que el observable ha dejado de emitir valores.

Operadores

Un operador es, en esencia, una función pura que toma un observable como entrada y genera otro observable como salida. Existen docenas de operadores divididos en ocho categorías: creacionales, de transformación, de filtrado, condicionales, de combinación, multidifusión, manejo de errores y de utilidad.

A continuación, veremos los más importante desde un punto de vista pragmático, mediante los cuales podremos realizar el 90% de las operaciones en cualquier proyecto real.

Operadores de creación

RxJS nos ofrece una serie de operadores destinados a la creación de observables. Estas funciones simplifican el proceso de creación de observables a partir de elementos como arrays, eventos, callbacks o promesas.

Create

Es el operador de creación más básico. Crea un observable a partir de una función recibida y esta define cómo el observable va a emitir los valores:

// RxJS v6+
import { Observable } from 'rxjs';
/*
  Create an observable that emits 'Hello' and 'World' on
  subscription.
*/
const hello = Observable.create((observer) => {
  observer.next('Hello');
  observer.next('World');
});

//output: 'Hello'...'World'
const subscribe = hello.subscribe(val => console.log(val));

From

Este operador de creación convierte una colección o una promesa en un observable:

// RxJS v6+
import { from } from 'rxjs';

//emit array as a sequence of values
const arraySource = from([1, 2, 3, 4, 5]);
//output: 1,2,3,4,5
arraySource.subscribe(val => console.log(val));

//emit result of promise
const promiseSource = from(new Promise(resolve => resolve('Hello World!')));
//output: 'Hello World'
const subscribe = promiseSource.subscribe(val => console.log(val));

//emit string as a sequence
const source = from('Hello World');
//output: 'H','e','l','l','o',' ','W','o','r','l','d'
const subscribe = source.subscribe(val => console.log(val));

fromEvent

Como su propio nombre indica, este operador convierte cualquier tipo de evento en un observable:

// RxJS v6+
import { fromEvent } from 'rxjs';
//convert the mousemove event stream of the DOM into an observable sequence.
const source = fromEvent(document, 'mousemove');
source.subscribe(e => console.log(e.clientX + ', ' + e.clientY));

Operadores de filtrado

Como su propio nombre indica, son operadores destinados al filtrado de los valores emitidos por el observable. Estos operadores son los más sencillos y fáciles de utilizar, ya que simplemente aceptan o rechazan ciertos valores según los criterios de filtrado aplicados. Sin duda, en este apartado el más importante es filter.

Filter

Filter se comporta igual que el operador estándar de JavaScript. Solo emitirá valores si cumple la condición dada por el predicado.

filter
// RxJS v6+
import { from } from 'rxjs';
import { filter } from 'rxjs/operators';

const numbers = from([2, 30, 22, 5, 60, 1])


numbers
    .pipe(filter(n => n > 10))
    .subscribe(n => console.log(n))

En este caso, el predicado devuelve solo los elementos de entrada mayor que 10. Por lo tanto, solo se emiten los valores 30, 22 y 60.

Operadores de transformación

Existen multitud de operadores que nos permiten aplicar transformaciones elegantes a los observables, aunque en la práctica los más utilizados son los clásicos map y reduce, además de algún otro que deriva de estos como mergeMap (flatmap) y concatMap. Siendo pragmáticos, con estos operadores tenemos lo necesario para la mayoría de los casos. Los demás los estudiaría bajo demanda, es decir, mientras los fuera necesitando.

Map

Transforma los elementos emitidos por un observable aplicando una función a cada uno de los mismos.

map
// RxJS v6+
import { from } from 'rxjs';
import { map } from 'rxjs/operators';

const numbers = from([1, 2, 3])

numbers
   .pipe(map(x => 10 * x))
   .subscribe(n => console.log(n))

En el ejemplo, el observable de origen contiene tres elementos numéricos (1, 2 y 3). Map toma el valor de cada uno de ellos y le aplica la función recibida. En este caso multiplica por 10 cada uno de los elementos.

Reduce

Funciona igual que en los Arrays de JavaScript. Aplica una función a un acumulador y a cada valor (de izquierda a derecha) para reducirlo a uno solo.

reduce
// RxJS v6+
import { from } from 'rxjs';
import { reduce } from 'rxjs/operators';


const numbers = from([1, 2, 3, 4, 5])
numbers
	.pipe(reduce((x, y) => x + y))
	.subscribe(n => console.log(n))

mergeMap

También como flatMap. Es un operador tremendamente potente. En muchas ocasiones se da la casuística en la que tenemos un observable cuyos elementos emitidos son también observables, con la complejidad que ello conlleva. Lo que va a hacer mergeMap es transformar un observable en otros observables y unificar la salida de los mismos bajo un solo stream.

flatmap
// RxJS v6+
import { from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const values = from([
    of(1,2,3),
    of(4,5,6),
    of(7,8,9)
])

values
	.pipe(mergeMap(v =>v))
	.subscribe(v => console.log(v))

En este ejemplo, value es un observable que emite a su vez otros tres observables. Flatmap lo que hace es un unwrap de cada uno de estos observables emitiendo en la secuencia principal uno solo.

Es importante tener en cuenta que flatMap no garantiza el orden en el stream resultante.

ConcatMap

Es similar a flatMap, aunque en este caso transforma un solo observable de origen a la vez. Por lo tanto, garantiza que los elementos emitidos en el stream resultante mantengan el mismo orden.

concatmap
​// RxJS v6+
import { from, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const values = from([
    of(1,2,3),
    of(4,5,6),
    of(7,8,9)
])

values
	.pipe(concatMap(v =>v))
	.subscribe(v => console.log(v))

Al igual que sucedía con flatMap, concatMap hace un unwrap de cada uno de estos observables, emitiendo en la secuencia principal un solo stream con la diferencia de que en este caso asegura el orden.

Operadores de combinación

En multitud de ocasiones nos veremos en la casuística de tener que combinar más de un stream a la vez y es aquí donde entra este tipo de operadores. Mi operador de combinación preferido es zip.

Zip

Combina las emisiones de múltiples observables aplicando la operación de la función opcional especificada en el último parámetro.

zip
​// RxJS v6+
import { from, of } from 'rxjs';
import { zip } from 'rxjs/operators';


const numbers = of(1,2,3,4,5)
const strings = of('a', 'b', 'c', 'd')

zip(numbers, strings, (n,s)=>(n+s))
	.subscribe(v => console.log(v))

El operador zip emitirá un observable resultante cuando cada observable de origen emita un nuevo elemento (previamente descomprimido). En la práctica, muchas veces se suele utilizar para resolver varios observables a la vez, obteniendo los valores en la función opcional.

Resumen

Este es el artículo más largo que he escrito hasta el momento, pero creo que este es el post que me hubiera gustado encontrar hace unos años cuando empecé a estudiar la programación reactiva. Creo que es un buen resumen de cómo manejar la complejidad de la asincronía en general y, en particular, haciendo uso del paradigma reactivo junto con RxJS. Quizás se echa en falta algún ejemplo en un escenario real, pero eso lo dejaré para futuras entradas.

Si te ha gustado la entrada, valora y comparte en tus redes sociales. No dudes en comentar dudas, aportes o sugerencias. ¡Estaré encantado de responder!

Este artículo se distribuye bajo una Licencia Creative Commons Reconocimiento-CompartirIgual 4.0 Internacional (CC BY-SA 4.0)

licencia-cc

Por dónde seguir

Si quieres continuar mejorando como desarrollador Javascript te recomendamos nuestro e-book de Clean Code aplicado a JavaScript. Puedes empezar a leer los primeros capítulos gratis.

e-book de Clean Code aplicado a JavaScript