¿Qué es Azure Stream Analytics?

En un artículo anterior te conté cómo Event Hubs puede ser útil para la ingesta masiva de datos. El ejemplo era muy simple, utilizando dos aplicaciones de consola: una que fuera el event publisher (el que genera los eventos) y otro el event consumer (el que los lee). Sin embargo, hay herramientas mucho más potentes a la hora de procesar este tipo de eventos, sobre todo aquellos que están en movimiento (Data in motion) y es por ello que hoy quiero hablarte de Azure Stream Analytics.

Cuando tienes una gran cantidad de datos llegando sin parar necesitas un sistema que te permita procesar toda esa información de la forma más efectiva posible. El motor debe ser capaz de gestionar esta información, además de proporcionar las herramientas necesarias para que el procesamiento de estos datos sea lo menos tedioso posible. Para simplificar todo esto Azure Stream Analytics se resume en tres partes:

  • inputs: son las fuentes de donde se obtendrán los datos.
  • query: a través de una consulta se tratan los datos para que se vuelquen en el apartado de output que elijamos de la forma que esperamos.
  • outputs: destino donde se almacena el resultado de la consulta realizada.

Para que veas con un ejemplo cómo funciona, voy a utilizar como referencia el ejemplo que vimos en el post de Event Hubs, que se puede utilizar perfectamente como input.

En ese escenario tenías una aplicación llamada SenderUserEvents que enviaba eventos a un Event Hub con el número de usuarios por ciudad.

SenderUserEvents output
SenderUserEvents output

La he modificado para añadir una nueva propiedad llamada Id del tipo GUID, que veremos después cuál es su uso.

using Microsoft.ServiceBus.Messaging;
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Text;
using System.Threading;

namespace SenderUserEvents
{
    class Program
    {
        static void Main(string[] args)
        {
            //1. Create an Event Hub client
            var client = EventHubClient.CreateFromConnectionString(ConfigurationManager.AppSettings["EventHub"], "returngiseventhub");

            //2. Mock locations
            var locations = new List<string>() { "Madrid", "Veracruz", "Melbourne", "Moscow", "Marseilles", "Manchester", "Munich", "Osaka" };

            //3. Random objects
            var usersRandom = new Random();
            var locationRandom = new Random();


            while (true)
            {
                try
                {
                    //4. Get a random location
                    var index = locationRandom.Next(locations.Count);

                    //5. Create an user event 
                    var usersEvent = new UsersEvent
                    {
                        Id = Guid.NewGuid(),
                        Users = usersRandom.Next(0, 200),
                        City = locations[index]
                    };

                    //6. Serialize the objeto to json
                    var json = Newtonsoft.Json.JsonConvert.SerializeObject(usersEvent);

                    //7. Show the info to send
                    Console.WriteLine("'{0}' Sending > '{1}'", DateTime.Now, json);

                    //7. Send an event data 
                    client.Send(new EventData(Encoding.UTF8.GetBytes(json)));

                    //8. Wait one second to start over
                    Thread.Sleep(1000);
                }
                catch (Exception ex)
                {
                    throw ex;
                }
            }
        }
    }
}

El primer paso es crear una cuenta de Azure Stream Analytics a través NEW > DATA SERVICES > STREAM ANALYTICS > QUICK CREATE.

Create Azure Stream Analytics account
Create Azure Stream Analytics account

Debes elegir un nombre para el servicio y una cuenta de Azure Storage donde se almacenará la monitorización de los jobs dentro de esta cuenta.

Una vez creada puedes ver que existe una sección por cada uno de los puntos mencionados anteriormente.

Azure Stream Analytics: inputs, query y outputs
Azure Stream Analytics: inputs, query y outputs

Añadir input

Como ya te conté al principio de este post, los inputs son las fuentes de donde se consiguen los datos. Añade un nuevo input y selecciona la opción Data Stream.

Stream Analytics - Add an input to your job
Stream Analytics – Add an input to your job

A día de hoy es posible elegir entre un Event Hub y Blob Storage. Selecciona el primero de ellos para utilizar el ejemplo del post anterior.

Add a data stream to your job - Event Hub
Add a data stream to your job – Event Hub

Debes elegir la suscripción donde está tu cuenta de Event Hub, añadir un nombre para el input y seleccionar la política de acceso que creaste en el artículo anterior llamada ManagePolicy.

Analytics - Inputs - Event Hub settings
Analytics – Inputs – Event Hub settings

En este ejemplo, donde se montan los datos en formato JSON, puedes dejar seleccionados los valores que vienen por defecto en el apartado Serialization settings.

Analytics - Inputs - Event Hub - Serialization Settings
Analytics – Inputs – Event Hub – Serialization Settings

Añadir el output

Una vez creado el input, deberías pensar cuál va a ser el destino de esta información. Para este ejemplo voy a utilizar una Tabla del servicio Azure Storage.

Selecciona la sección de OUTPUT y crea uno nuevo. Dentro del listado elige Table Storage.

Analytics - Outputs - Table Storage

Al igual que con el input tienes que seleccionar una suscripción, elegir un nombre para el output, la tabla donde se van a almacenar los datos y, por último, indicar cuál será la partition key y la row key en cada elemento que se inserte.

Output - Table Storage - Settings
Output – Table Storage – Settings

En este ejemplo la partition key será el campo City y la row key el Id, que añadí dentro de la clase UsersEvent.

Ya tienes la entrada de datos y la salida.

Generar la query

Ya sabes desde dónde y hacia dónde van a ir los datos. Lo último que te falta por determinar es el cómo van a llegar estos a destino. Para este cometido está la sección QUERY donde puedes utilizar un lenguaje basado en SQL, del cual puedes encontrar toda la documentación aquí. En este ejemplo vamos a hacerlo fácil, devolviendo únicamente los valores que conocemos de nuestro objeto para que estos se inserten como registros en la tabla de la cuenta de Azure Storage.

Azure Stream Analytics - Query
Azure Stream Analytics – Query

Como puedes ver lo que estamos haciendo es seleccionar todas las columnas, una por una (también se podría utilizar * pero es poco recomendable en cualquier consulta), que tenemos en myeventhub (FROM) y mandándolas a nuestro output llamado tableoutput (INTO).

Comprobando el resultado

Con la aplicación de consola ejecutándose, pulsa el botón Start del menú de la parte inferior para que el sistema comience a funcionar.

Stream Analytics - Start outpu - job start time
Stream Analytics – Start output – job start time

Puedes llegar a tener hasta tres opciones:

  • JOB START TIME: Comienza a leer inmediatamente, incluso si no hay datos.
  • CUSTOM TIME: Se elige una fecha y hora concreta para comenzar.
  • LAST STOPPED TIME: Empieza a leer desde la última vez que se paró el servicio. Esta opción estará disponible a partir de la primera vez.

Selecciona JOB START TIME y espera unos instantes a que se inicie el servicio. Utiliza cualquier cliente para Azure Storage para comprobar el resultado.

Azure Storage Explorer - Azure Analytics Stream
Azure Storage Explorer – Azure Analytics Stream

¡Saludos!