Gestionar peticiones de larga duración con Azure Durable Functions y SignalR

Hay veces que por motivos ajenos a nuestra voluntad tenemos que esperar más de lo que nos gustaría. Esto también puede pasarte con APIs que no ofrecen un resultado para nada inmediato y es nuestra aplicación la que debe ingeniárselas para lidiar con ello. Esta es una consulta que me llegó esta semana donde efectivamente nos encontrábamos una llamada que tarda minutos en procesarse y hace que el cliente irremediablemente de un timeout antes de obtener la respuesta que tanto estaba esperando. En este artículo te cuento una posible solución utilizando Azure Durable Functions y SignalR.

Azure Durable Functions

Hace tiempo te conté que existe una extensión llamada Azure Durable Functions que te permite gestionar diferentes escenarios a través de diferentes patrones. Para este caso he utilizado el patrón Chaining, porque necesito que una función se ejecute detrás de otra, y Monitoring porque quiero ir informando al cliente web del estado del proceso, a través de SignalR como veremos después.

Cuando trabajas con Azure Durable Functions debes tener en cuenta que siempre deberían de entrar en juego al menos tres funciones:

  1. Un trigger: es el encargado de generar una nueva instancia de nuestro flujo.
  2. Un orquestador/es: son los que manejarán las actividades.
  3. Actividades: que son las que hacen el trabajo.

El trigger

Para este ejemplo voy a utilizar una petición HTTP para comenzar el proceso, aunque podría ser con cualquier otro. El código sería el siguiente:

using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using DurableFunctions.Model;
using DurableFunctions.Orchestrators;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;

namespace DurableFunctions.Triggers
{
    public static class HttpTrigger
    {
        [FunctionName(nameof(StartLongRunningCall))]
        public static async Task<HttpResponseMessage> StartLongRunningCall([HttpTrigger(AuthorizationLevel.Anonymous, "post")] HttpRequestMessage req, [DurableClient] IDurableOrchestrationClient starter, ILogger log)
        {
            var requestBody = await req.Content.ReadAsStringAsync();
            dynamic data = JsonConvert.DeserializeObject(requestBody);


            //Execute long running call using an orchestrator
            string instanceId = await starter.StartNewAsync(nameof(Workflows.LongRunningWorkflow), null, data?.wait);

            //Monitor the previous workflow
            await starter.StartNewAsync(nameof(Workflows.MonitorWorkflow), null, new Info { UserName = data?.userName, WorkflowId = instanceId });

            log.LogInformation($"Started orchestration with ID = '{instanceId}'.");

            return req.CreateResponse(HttpStatusCode.OK);
        }
    }
}

Como puedes ver, lo único que hago es recuperar el contenido del body y primeramente inicio el workflow que ejecutará las llamadas de larga duración. Una vez iniciado, con el id de la instancia inicio un segundo workflow que monitorizará el primero.

Workflows.LongRunningWorkflow

El primer método del que genero una instancia se trata de un orquestador que hace lo siguiente:

        [FunctionName(nameof(LongRunningWorkflow))]
        public static async Task<string> LongRunningWorkflow([OrchestrationTrigger] IDurableOrchestrationContext context)
        {

            var wait = context.GetInput<int>();

            //Long running operations
            var outputs = new List<string>
            {
                await context.CallActivityAsync<string>(nameof(LongRunningActivities.SayHello), wait),
                await context.CallActivityAsync<string>(nameof(LongRunningActivities.SayHello), wait * 2),
                await context.CallActivityAsync<string>(nameof(LongRunningActivities.SayHello), wait * 3)
            };            

            return string.Join("<br/>", outputs.ToArray());
        }

Lo único que ocurre dentro de esta función es la llamada secuencial a la actividad SayHello. Es decir, que llamaremos una primera vez a esta, esperaremos el tiempo que se ha recuperado a través de la variable wait, volveremos a lanzarla de nuevo con ese mismo tiempo multiplicado por dos y una tercera vez por tres. La actividad en sí, como lo que quiero es simular peticiones con una espera larga, es así de sencilla:

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using System.Threading;

namespace DurableFunctions.Activities
{
    public static class LongRunningActivities
    {

        [FunctionName(nameof(SayHello))]
        public static string SayHello([ActivityTrigger] int wait)
        {
            Thread.Sleep(wait); // :-P

            return $"Hello me with {wait / 60000} minutes of delay";
        }
    }
}

Por resumir: el flujo que implementa el patrón chaining haría la llamada al trigger StartLongRunningCall, este generaría una instancia de la función LongRunningWorkflow, que es el orquestador, y este último llamaría a tres actividades del tipo SayHello.

Workflows.MonitorWorkflow

Una vez que está inicializado el primero podemos arrancar el segundo. El objetivo principal de este es revisar cada X tiempo el estado del workflow anterior:

        [FunctionName(nameof(MonitorWorkflow))]
        public static async Task MonitorWorkflow([OrchestrationTrigger] IDurableOrchestrationContext context)
        {
            var info = context.GetInput<Info>();

            await context.CallActivityAsync(nameof(SignalRActivities.SendUpdate), new Status { UserName = info.UserName, Message = $"Start monitoring for user {info.UserName}" });

            var expiryTime = context.CurrentUtcDateTime.AddHours(EXPIRY_TIME);

            var timeout = false;

            while (context.CurrentUtcDateTime < expiryTime)
            {
                timeout = false;

                var status = await context.CallActivityAsync<DurableOrchestrationStatus>(nameof(MonitorActivities.CheckWorkflowStatus), info.WorkflowId);

                if (status.RuntimeStatus == OrchestrationRuntimeStatus.Completed)
                {
                    await context.CallActivityAsync(nameof(SignalRActivities.SendUpdate), new Status { UserName = info.UserName, Message = $"Result: <br/> {status.Output}" });
                    await context.CallActivityAsync(nameof(SignalRActivities.SendUpdate), new Status { UserName = info.UserName, Message = $"Completed! It took {(context.CurrentUtcDateTime - status.CreatedTime).TotalMinutes} minutes." });
                    break;
                }
                else
                {
                    await context.CallActivityAsync(nameof(SignalRActivities.SendUpdate), new Status { UserName = info.UserName, Message = $"The call {status.Name}'s still running since {status.CreatedTime}. Last check time: {context.CurrentUtcDateTime}" });

                    var nextCheck = context.CurrentUtcDateTime.AddSeconds(POOLING);
                    await context.CreateTimer(nextCheck, CancellationToken.None);
                    timeout = true;
                }
            }

            if (timeout)
                await context.CallActivityAsync(nameof(SignalRActivities.SendUpdate), new Status { UserName = info.UserName, Message = "Time out!" });

        }

Esta función es algo más extensa pero para nada más complicada. Lo primero que hace es recuperar la información relativa al workflow anterior y al usuario que la ha iniciado, con el objetivo de poder informarle del estado de su petición. En diferentes puntos se hacen llamadas a una actividad llamada SignalRActivities.SendUpdate, que te puedes imaginar que lo que hace es mandar las actualizaciones a la aplicación cliente, que veremos después. Defino cuál será el tiempo de expiración de esta monitorización, donde utilizo EXPIRY_TIME con el número de horas que quiero, aunque puedo cambiarlo a minutos o segundos según necesite. Fijo una variable timeout a false que me ayudará a saber si se acabó el tiempo de monitorizar o por el contrario estoy dentro de plazo. Hago una llamada a la actividad MonitorActivities.CheckWorkflowStatus a la que solo debo enviarle el id de la instancia del flujo que quiero monitorizar:

using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

namespace DurableFunctions.Activities
{
    public static class MonitorActivities
    {
        [FunctionName(nameof(CheckWorkflowStatus))]
        public static async Task<DurableOrchestrationStatus> CheckWorkflowStatus([ActivityTrigger] string instaceId, [DurableClient] IDurableClient client, ILogger logger)
        {
            return await client.GetStatusAsync(instaceId);
        }
    }
}

Aquí hago uso de uno de los métodos que me provee IDurableClient que es para comprobar justamente el estado de un workflow.

Si la operación se ha completado aviso al cliente y le muestro el resultado de las tres actividades anteriores. De lo contrario configuraré un Timer, que me provee Azure Durable Functions, para seguir esperando y comprobar el estado del workflow más tarde.

Ahora que ya están claras las dos orquestaciones que necesitamos, solo queda ver cómo SignalR nos ayuda a notificar al cliente.

Enviar notificaciones a través de Azure SignalR

Si nunca has visto Azure SignalR, se trata de un servicio basado en la tecnología SignalR que te permite que el servidor envíe peticiones asíncronas al cliente. Dependiendo de las capacidades de las que disponga el cliente que estemos utilizando, SignalR utilizará Websockets, Server-Sent Events, Pooling, etcétera pero nosotros como desarrolladores no tendremos que preocuparnos por ello.

Por el lado del cliente, este es el código que necesito:

HTML

@{
    ViewData["Title"] = "Home Page";
}

<div class="text-center">
    <h1 class="display-4">Welcome</h1>
    <p>Learn about <a href="https://docs.microsoft.com/aspnet/core">building Web apps with ASP.NET Core</a>.</p>
</div>

<div class="container">
    <div class="row" style="margin:10px;">
        <div class="col-12">
            <div class="input-group">
                <input id="userName" name="userName" class="form-control" type="text" placeholder="Enter your user name" />
                <input id="howLong" name="howLong" class="form-control" type="text" placeholder="How long do you want to wait? (minutes)" />
                <div class="input-group-append">
                    <button id="btnStartLongRunningCall" class="btn btn-danger" disabled>Start Long Running Call</button>
                </div>
            </div>

        </div>
    </div>
    <div class="row">
        <ul id="output" class="list-unstyled" style="font-family:Consolas">
        </ul>
    </div>
</div>

Para este ejemplo he utilizado la plantilla de ASP.NET Core con MVC y esta es la página Index, donde puedes ver que el contenido es muy sencillo. En el archivo Layout.cs debes tener la referencia a la librería de SignalR. Simplemente tengo dos inputs para que el usuario pueda indicar su nombre y cuánto tiempo quiere esperar y un botón para comenzar con el proceso que lanzará lo comentado anteriormente.

JavaScript

//wwwroot/js/site.js

let state = {
    userName: '',
    howLong: null,
    input: {},
    button: {},
    div: {}
};

let connection = null;

state.input.userName = document.getElementById('userName');
state.input.howLong = document.getElementById('howLong');
state.button.startLongRunningCall = document.getElementById('btnStartLongRunningCall');
state.div.output = document.getElementById('output');

const refreshUserNameAndHowLong = () => {

    state.userName = state.input.userName.value;
    state.howLong = state.input.howLong.value;

    if (state.userName.length > 1 && parseInt(state.howLong) > 0) {
        state.button.startLongRunningCall.disabled = false;
    }
    else {
        state.button.startLongRunningCall.disabled = true;
    }
};

state.input.userName.addEventListener('keyup', refreshUserNameAndHowLong);
state.input.howLong.addEventListener('keyup', refreshUserNameAndHowLong);


const CallLongRunningOperation = () => {

    const url = `${apiBase}/StartLongRunningCall`;
    let data = { userName: state.userName, wait: parseInt(state.howLong) * 60000 };

    fetch(url, {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify(data)

    }).then((res) => {
        if (res.ok) {
            UpdateOutput(`Long Running Operation Launched`);
            res.text().then((text) => {
                UpdateOutput(text);
            });
        }

    }).catch((err) => {
        UpdateOutput(`Something bad happened: ${err}`);
    });

};


const UpdateOutput = (msg) => {
    state.div.output.innerHTML += `<li>${msg}</li>`;
};

const connectSignalR = () => {

    connection = new signalR.HubConnectionBuilder()
        .withUrl(`${apiBase}/${state.userName}`)
        .configureLogging(signalR.LogLevel.Information)
        .build();

    connection.on('SendUpdate', SendUpdate);

    connection.onclose(() => UpdateOutput('disconnected'));

    UpdateOutput('Connecting...');

    connection.start()
        .then(() => console.log)
        .then(() => {
            UpdateOutput('You are connected to SignalR');
            UpdateOutput('Calling Long Running Operation...');
            CallLongRunningOperation();
        })
        .catch((err) => UpdateOutput(err));

};

const SendUpdate = (status) => {
    console.log(status);
    UpdateOutput(status.Message);
};

state.button.startLongRunningCall.addEventListener('click', () => {
    connectSignalR();
});

He procurado no utilizar ninguna librería externa o framework para que el código sea lo más sencillo posible. En el archivo site.js, defino una serie de variables dentro del objeto state, así como una función que me ayudará a refrescar los valores del nombre del usuario y el tiempo de espera según se vayan introduciendo (refreshUserNameAndHowLong). Después tengo una función llamada CallLongRunningOperation que será quien haga la llamada al trigger que comenté al inicio llamado StartLongRunningCall. UpdateOuput simplemente se utiliza para actualizar la interfaz con los mensajes que vaya recibiendo del lado del servidor. La función que hace que arranque todo es connectSignalR, que se asocia al botón btnStartRunningCall al final. Esto es así porque necesito primero estar conectado a SignalR para poder recibir los mensajes relacionados con el workflow por el que estoy esperando. Además, dentro de esta función puedo asociar la conexión a las distintas funciones que quiero que puedan ser invocadas desde el servidor. En este caso asocio ‘SendUpdate’ a la función que me he creado en JavaScript con el mismo nombre. Para poder conectarme al servicio a través de Azure Functions necesito crear un trigger del tipo HTTP de la siguiente forma:

using Microsoft.AspNetCore.Http;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;

namespace DurableFunctions.Triggers
{
    public static class SignalRTrigger
    {
        [FunctionName(nameof(Negotiate))]
        public static SignalRConnectionInfo Negotiate([HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "{userName}/Negotiate")] HttpRequest req, [SignalRConnectionInfo(HubName = "updates", UserId = "{userName}")] SignalRConnectionInfo connectionInfo)
        {
            return connectionInfo;
        }
    }
}

Para que este trigger funcione necesitas añadir la librería Microsoft.Azure.WebJobs.Extensions.SignalRService, además de configurar en las App Settings de la Function la cadena de conexión del servicio de Azure SignalR, que debes crear en el portal de Azure, con el nombre AzureSignalRConnectionString. Esta función simplemente mandará la configuración necesaria de vuelta al cliente para poder recibir futuras notificaciones.

Una vez que el cliente obtiene una conexión de manera satisfactoria, llamo a CallLongRunningOperation para comenzar todo el proceso. Cuando te expliqué el flujo de monitorización, también comenté que en momentos puntuales hacía una llamada a SignalRActivities.SendUpdate, la cual es la actividad que se encargará de hacer las llamadas asíncronas al cliente con el nuevo estado:

using DurableFunctions.Model;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.DurableTask;
using Microsoft.Azure.WebJobs.Extensions.SignalRService;
using System.Threading.Tasks;

namespace DurableFunctions.Activities
{
    public static class SignalRActivities
    {
        const string HUB_NAME = "updates";

        [FunctionName(nameof(SendUpdate))]
        public static Task SendUpdate([ActivityTrigger] Status status, [SignalR(HubName = HUB_NAME)] IAsyncCollector<SignalRMessage> signalRMessages)
        {
            return signalRMessages.AddAsync(new SignalRMessage
            {
                UserId = status.UserName,
                Target = nameof(SendUpdate),
                Arguments = new[] { status }
            });
        }
    }
}

Como ves, utilizo el mismo HubName, updates, que utilicé en la generación de la conexión y a través de signalRMessages.AddAsync mando el mensaje con el estado del workflow a SendUpdate, que si recuerdas se mapeará con una función con el mismo nombre en el cliente y al UserId que corresponda, el cual se le pasó como parte de la ruta al generar la conexión de SignalR.

El resultado de todo esto debería de ser como el siguiente:

El código completo lo tienes en mi GitHub.

¡Saludos!