¿Qué es Microsoft Azure Event Hubs?

Imagina escenarios donde necesitas telemetría a gran escala, información a tiempo real de un conjunto de datos de un tamaño o a una velocidad importante. Esto puede ser para un gran sitio web, una aplicación móvil con muchísimos usuarios,  el mundo IoT, donde recopilar todos los datos de cada uno de ellos puede ser todo un reto, mucha información llegando al mismo tiempo o que se esté recopilando desde diferentes sitios. Todo ello puede estar sobrecargando tu máquina virtual o tu sitio web sólo con este tipo de información.

Event Hubs es una de las posibilidades dentro del abanico de Service Bus, donde además se encapsula la parte que queues, topics y el servicio de relay. Muchos podrían pensar que Event hubs no deja de ser un sistema de queues y topics, pero va mucho más allá. Antes de tener Event Hubs la solución podría ser almacenar esta información en una queue del mismo servicio con un tratamiento posterior (justo ayer salió Service Bus Premium que te permite beneficiarte de la arquitectura de Event hubs en queues). El inconveniente en ese escenario es que las colas no están preparadas para escalar del mismo modo que lo hace Event hubs, ya que la arquitectura de las colas y topics utiliza un patrón de competing consumer (es como una base de datos, todos están compitiendo por bloquear registros en la tabla) y este otro utiliza una arquitectura basada en particiones.

De manera simplificada, el servicio se compone de las siguientes piezas:

event ingestor

  • Event Publishers: se trata de todo aquello que envie datos o eventos a un Event Hub, los cuales pueden usar HTTPS o AMQP 1.0 para la ingesta. El mecanismo de autenticación es través de una Shared Access Signature (SAS) con un conjunto de permisos establecidos (Administrar, escribir, leer o todos los anteriores). Se puede publicar un único evento o en batch. Cada publicación tiene un límite de 256KB, independientemente de si es uno solo o un lote de ellos.
  • Event Hub: se trata del servicio en si, el cual gestiona la entrada y salida de datos.
  • Event consumer: toda entidad que lea de un Event hub toma este rol, que es el de leer la información que ha sido proporcionada por los event publishers. Se conectan a través de una sesión AMQP 1.0.

El servicio está montado sobre un patrón de particionado con el objetivo de mejorar la escalabilidad. Esto significa que deberías tener tantas particiones como event consumers estén leyendo del servicio.

Event hub partitions

El número de particiones debe estar entre 4 y 32 y no es modificable una vez que el servicio es creado, por lo que debes tener una estimación de cuántos consumers van a estar leyendo en paralelo. También puedes ampliar el limite de 32 abriendo un ticket de soporte.

Los mensajes recibidos se almacenan en un formato conocido como event data, el cual está formado por:

  • Offset: es la posición de un evento dentro de una partición en Event Hub. Puedes pensar en el como un cursor. Se puede utilizar por el consumer para indicar a partir de qué punto se quiere empezar a leer. Se puede especificar el offset como un timestamp o un valor offset.

    Event Hub offset
    Offset: posición de un evento dentro de una partición de Event Hub.
  • Sequence number: Se utiliza para facilitar el checking point, de lo que hablaremos después.
  • Body: se trata del cuerpo del mensaje.
  • User properties
  • System properties:

Otro concepto que debes conocer es el de consumer groups. Se tratan de vistas de un Event Hub. Te permite que varias aplicaciones puedan consumir la información utilizando una vista separada del flujo de datos por cada consumer. Siempre que hay que leer de un event hub es necesario tener un consumer group. Por defecto hay uno creado, llamado Default, pero puedes crear hasta 20 en el tipo Standard.

Por último hay que hablar del proceso check pointing por el cual los consumer marcan (hacen un commit) de su posición dentro de la secuencia de una partición. Esto significa que por cada consumer group, cada lector de cada partición debe informar de su posición actual cada cierto tiempo y así poder avisar al servicio cuando todos los datos hayan sido leídos. Si un consumer se desconecta de una partición, cuando vuelva a estar operativo comenzará desde el último checkpoint que fue enviado al servicio por el último reader de esa partición en ese consumer group.

Aclarados todos estos conceptos vamos a aterrizarlos a un caso práctico: el objetivo es enviar eventos con la cantidad de usuarios por ciudades alrededor del mundo, simulando las visitas de usuarios desde diferentes localizaciones. Lo primero que necesitamos es una aplicación que haga las veces de Event Publisher. Para este ejemplo he creado una aplicación de consola, SenderUserEvents.

Instala los paquetes WindowsAzure.ServiceBus y Newtonsoft.Json a través de Nuget , además de la librería System.Configuration. El código que te propongo es el siguiente:

using Microsoft.ServiceBus.Messaging;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Text;
using System.Threading;
namespace SenderUserEvents
{
    class Program
    {
        static void Main(string[] args)
        {
            //1. Create an Event Hub client
            var client = EventHubClient.CreateFromConnectionString(ConfigurationManager.AppSettings["EventHub"], "returngiseventhub");
            //2. Mock locations
            var locations = new List<string>() { "Madrid", "Veracruz", "Melbourne", "Moscow", "Marseilles", "Manchester", "Munich", "Osaka" };
            //3. Random objects
            var usersRandom = new Random();
            var locationRandom = new Random();

            while (true)
            {
                try
                {
                    //4. Get a random location
                    var index = locationRandom.Next(locations.Count);
                    //5. Create an user event 
                    var usersEvent = new UsersEvent
                    {
                        Users = usersRandom.Next(0, 200),
                        City = locations[index]
                    };
                    //6. Serialize the objeto to json
                    var json = Newtonsoft.Json.JsonConvert.SerializeObject(usersEvent);
                    //7. Show the info to send
                    Console.WriteLine("'{0}' Sending > '{1}'", DateTime.Now, json);
                    //7. Send an event data 
                    client.Send(new EventData(Encoding.UTF8.GetBytes(json)));
                    //8. Wait one second to start over
                    Thread.Sleep(1000);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
        }
    }
}
  1. Lo primero que necesitas es crear un servicio de Event Hub. Para ello basta con ir al portal https://manage.windowsazure.com y seleccionar NEW > APP SERVICES > SERVICE BUS > EVENT HUB y selecciona QUICK CREATE para esta demo, donde debes elegir  el nombre del Event Hub, la región y el namespace.
    Create an Azure Event Hub
    Create an Azure Event Hub

    Una vez que el servicio esté activo debes pulsar sobre el namespace, seleccionar la parte de Event Hubs y hacer clic sobre el nombre del nuevo elemento. Selecciona la sección CONFIGURE donde verás la siguiente información:

    Event hub - Configure section
    Event hub – Configure section
    • MESSAGE RETENTION: Se trata del número de días que los mensajes serán retenidos en el servicio. Por defecto el valor es de un día y es ampliable hasta 7.
    • EVENT HUB STATE: Te permite habilitar/deshabilitar un event hub. Por defecto está habilitado cuando se crea el servicio.
    • PARTITION COUNT: Se habló al inicio del post. Cuando creas un event hub a través de quick create asigna por defecto 4 particiones, lo que supondría que podríamos tener hasta 4 consumers leyendo en paralelo.
    • Shared access policies: Esta es la parte que realmente nos importa ahora mismo: aquí deberías administrar las políticas de acceso a tu Event hub. Al menos deberías de tener dos: una que solo se utilice para enviar eventos y otra que pueda leer, administrar y enviar. Crea dos políticas con esta configuración:
      Event hub - Shared access policies
      Event hub – Shared access policies

      Una vez que hagas clic en el botón SAVE podrás recuperar las cadenas de conexión en el apartado DASHBOARD a través del botón Connection Information.

      Event hub - Connection information
      Event hub – Connection information

      Copia la llamada SendPolicy  y añadela en el apartado appSettings del archivo de configuración del proyecto con key EventHub. En el primer punto del código ya está asociada esta clave. Además debes especificar el nombre del servicio en la misma línea como segundo parámetro como se muestra en el código anterior.

  2. Esta línea de código se trata de un conjunto de ciudades que de manera aleatoria asignará un número de usuarios en la ingesta.
  3. Se crear un par de objetos de tipo random para asignar una ciudad y número de usuarios en cada uno de los eventos que se envíen.
  4. Se recupera el indice de una de las ciudades.
  5. Se crea un objeto del tipo UsersEvent, que sólo tiene las propiedades Users y City que será la información que se envíe.
  6. Se serializa el objeto anterior a formato JSON.
  7. Se crea un objeto EventData que encapsula el json anterior y se envía a Event Hub.
  8. Espera un segundo para volver a realizar un nuevo envio.

Cuando lances la aplicación el resultado debe ser similar al siguiente:

Event hub - Sender - Output
Event hub – Sender – Output

Ahora sólo quedael extremo que lea todos esos datos que se han mandado a Event hub. Lo ideal es que estos se exploten de una forma sencilla y organizada, pero para este ejemplo vamos a utilizar otra aplicación de consola con el fin de comprobar que es posible obtenerlos correctamente. Crea un nuevo proyecto llamado ReceiverUsersEvents, instala el paquete Microsoft.Azure.ServiceBus.EventProcessorHost y agrega la librería System.Configuration. El código para el consumer es el siguiente:

using Microsoft.ServiceBus.Messaging;
using System;
using System.Configuration;
namespace ReceiverUsersEvents
{
    class Program
    {
        static void Main(string[] args)
        {
            //1. Create an event processor host
            var eventProcessorHost = new EventProcessorHost(
                "myeventprocessorhost", //Event Processor host name
                "returngiseventhub", //Your event hub name
                EventHubConsumerGroup.DefaultGroupName, //Consumer group
                ConfigurationManager.AppSettings["EventHub"], //Event hub connection string
                ConfigurationManager.AppSettings["StorageAccount"]); //Storage connection string

            Console.WriteLine("Registering the event processor...");
            //2. Register the event processor
            eventProcessorHost.RegisterEventProcessorAsync<SimpleEventProcessor>().Wait();
            Console.WriteLine("Press any key to stop the worker.");
            Console.ReadLine();
            //3. Unregistering the event processor
            eventProcessorHost.UnregisterEventProcessorAsync().Wait();
        }
    }
}
  1. Lo primero que necesitas es crear un objeto del tipo EventProcessorHost, donde debemos pasarle un nombre para el mismo, el nombre del event hub, el consumer group que en este caso será el que tenemos por defecto, la cadena de conexión de Event Hub (la creada anteriormente llamada ManagePolicy) y la cadena de conexión de una cuenta de Azure Storage.
  2. Registra el nuevo event processor host como lector de tu event hub. Debes de tener una clase que implemente IEventProcessorHost la cual gestionará lo que ocurre cuando se cierra el proceso, cuando se inicia y cuando se recibe un evento cómo debe procesarlo.
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.ServiceBus.Messaging;
    using System.Diagnostics;
    using System.Text;
    namespace ReceiverUsersEvents
    {
        class SimpleEventProcessor : IEventProcessor
        {
            Stopwatch checkpointStopWatch;
            async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
            {
                Console.WriteLine("IEventProcessor.CloseAsync: Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
                if (reason == CloseReason.Shutdown)
                {
                    await context.CheckpointAsync();
                }
            }
            public Task OpenAsync(PartitionContext context)
            {
                Console.WriteLine("IEventProcessor.OpenAsync: Partition '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
                checkpointStopWatch = new Stopwatch();
                checkpointStopWatch.Start();
                return Task.FromResult<object>(null);
            }
            async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
            {
                Console.WriteLine("IEventProcessor.ProcessEventsAsync");
                foreach (EventData eventData in messages)
                {
                    string data = Encoding.UTF8.GetString(eventData.GetBytes());
                    Console.WriteLine(string.Format("Message received. Partition: '{0}', Data: '{1}'", context.Lease.PartitionId, data));
    
                    //Every 5 minutes CheckpointAsync
                    if (checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
                    {
                        await context.CheckpointAsync();
                        checkpointStopWatch.Restart();
                    }
                }
            }
        }
    }
  3. Una vez haya finalizado la lectura de datos, debes desenlazarte del event hub.

Este será el resultado que verás por pantalla:

Event hub - Receiver - Output
Event hub – Receiver – Output

Por último es importante mencionar cómo escala Event hub: La capacidad de ancho de banda se mide por unidades pre reservadas. Una sola unidad incluye:

  • Entrada: hasta 1MB por segundo o 1000 eventos por segundo, lo que ocurra primero. Si se sobrepasa se lanzará una excepción «quota exceeded».
  • Salida: hasta 2MB por segundo. Esta no produce excepciones

Las unidades se facturan por hora y como mínimo te van a cobrar una hora. Por defecto hay un límite de 20 unidades por cuenta de Azure pero es posible ampliarlo a través de soporte.

¡Saludos!