En un artículo anterior te conté cómo Event Hubs puede ser útil para la ingesta masiva de datos. El ejemplo era muy simple, utilizando dos aplicaciones de consola: una que fuera el event publisher (el que genera los eventos) y otro el event consumer (el que los lee). Sin embargo, hay herramientas mucho más potentes a la hora de procesar este tipo de eventos, sobre todo aquellos que están en movimiento (Data in motion) y es por ello que hoy quiero hablarte de Azure Stream Analytics.
Cuando tienes una gran cantidad de datos llegando sin parar necesitas un sistema que te permita procesar toda esa información de la forma más efectiva posible. El motor debe ser capaz de gestionar esta información, además de proporcionar las herramientas necesarias para que el procesamiento de estos datos sea lo menos tedioso posible. Para simplificar todo esto Azure Stream Analytics se resume en tres partes:
- inputs: son las fuentes de donde se obtendrán los datos.
- query: a través de una consulta se tratan los datos para que se vuelquen en el apartado de output que elijamos de la forma que esperamos.
- outputs: destino donde se almacena el resultado de la consulta realizada.
Para que veas con un ejemplo cómo funciona, voy a utilizar como referencia el ejemplo que vimos en el post de Event Hubs, que se puede utilizar perfectamente como input.
En ese escenario tenías una aplicación llamada SenderUserEvents que enviaba eventos a un Event Hub con el número de usuarios por ciudad.

La he modificado para añadir una nueva propiedad llamada Id del tipo GUID, que veremos después cuál es su uso.
using Microsoft.ServiceBus.Messaging; using System; using System.Collections.Generic; using System.Configuration; using System.Text; using System.Threading; namespace SenderUserEvents { class Program { static void Main(string[] args) { //1. Create an Event Hub client var client = EventHubClient.CreateFromConnectionString(ConfigurationManager.AppSettings["EventHub"], "returngiseventhub"); //2. Mock locations var locations = new List<string>() { "Madrid", "Veracruz", "Melbourne", "Moscow", "Marseilles", "Manchester", "Munich", "Osaka" }; //3. Random objects var usersRandom = new Random(); var locationRandom = new Random(); while (true) { try { //4. Get a random location var index = locationRandom.Next(locations.Count); //5. Create an user event var usersEvent = new UsersEvent { Id = Guid.NewGuid(), Users = usersRandom.Next(0, 200), City = locations[index] }; //6. Serialize the objeto to json var json = Newtonsoft.Json.JsonConvert.SerializeObject(usersEvent); //7. Show the info to send Console.WriteLine("'{0}' Sending > '{1}'", DateTime.Now, json); //7. Send an event data client.Send(new EventData(Encoding.UTF8.GetBytes(json))); //8. Wait one second to start over Thread.Sleep(1000); } catch (Exception ex) { throw ex; } } } } }
El primer paso es crear una cuenta de Azure Stream Analytics a través NEW > DATA SERVICES > STREAM ANALYTICS > QUICK CREATE.

Debes elegir un nombre para el servicio y una cuenta de Azure Storage donde se almacenará la monitorización de los jobs dentro de esta cuenta.
Una vez creada puedes ver que existe una sección por cada uno de los puntos mencionados anteriormente.

Añadir input
Como ya te conté al principio de este post, los inputs son las fuentes de donde se consiguen los datos. Añade un nuevo input y selecciona la opción Data Stream.

A día de hoy es posible elegir entre un Event Hub y Blob Storage. Selecciona el primero de ellos para utilizar el ejemplo del post anterior.

Debes elegir la suscripción donde está tu cuenta de Event Hub, añadir un nombre para el input y seleccionar la política de acceso que creaste en el artículo anterior llamada ManagePolicy.

En este ejemplo, donde se montan los datos en formato JSON, puedes dejar seleccionados los valores que vienen por defecto en el apartado Serialization settings.

Añadir el output
Una vez creado el input, deberías pensar cuál va a ser el destino de esta información. Para este ejemplo voy a utilizar una Tabla del servicio Azure Storage.
Selecciona la sección de OUTPUT y crea uno nuevo. Dentro del listado elige Table Storage.
Al igual que con el input tienes que seleccionar una suscripción, elegir un nombre para el output, la tabla donde se van a almacenar los datos y, por último, indicar cuál será la partition key y la row key en cada elemento que se inserte.

En este ejemplo la partition key será el campo City y la row key el Id, que añadí dentro de la clase UsersEvent.
Ya tienes la entrada de datos y la salida.
Generar la query
Ya sabes desde dónde y hacia dónde van a ir los datos. Lo último que te falta por determinar es el cómo van a llegar estos a destino. Para este cometido está la sección QUERY donde puedes utilizar un lenguaje basado en SQL, del cual puedes encontrar toda la documentación aquí. En este ejemplo vamos a hacerlo fácil, devolviendo únicamente los valores que conocemos de nuestro objeto para que estos se inserten como registros en la tabla de la cuenta de Azure Storage.

Como puedes ver lo que estamos haciendo es seleccionar todas las columnas, una por una (también se podría utilizar * pero es poco recomendable en cualquier consulta), que tenemos en myeventhub (FROM) y mandándolas a nuestro output llamado tableoutput (INTO).
Comprobando el resultado
Con la aplicación de consola ejecutándose, pulsa el botón Start del menú de la parte inferior para que el sistema comience a funcionar.

Puedes llegar a tener hasta tres opciones:
- JOB START TIME: Comienza a leer inmediatamente, incluso si no hay datos.
- CUSTOM TIME: Se elige una fecha y hora concreta para comenzar.
- LAST STOPPED TIME: Empieza a leer desde la última vez que se paró el servicio. Esta opción estará disponible a partir de la primera vez.
Selecciona JOB START TIME y espera unos instantes a que se inicie el servicio. Utiliza cualquier cliente para Azure Storage para comprobar el resultado.

¡Saludos!