Mover archivos entre Storage Accounts con Azure Functions y Event Grid

Otra de las tareas a la que muchos desarrolladores se enfrentan cuando trabajan con Azure Storage es el movimiento de archivos entre cuentas. Muchos clientes suben archivos al cloud, llevan a cabo un procesamiento sobre los mismos y posteriormente guardan en un almacenamiento cold el original, lo cual permite que sea más económico. En este artículo te cuento cómo transferir blobs entre cuentas con la ayuda de Azure Functions y Event Grid.

Azure Function para la copia del archivo

A día de hoy, no existe una acción mover como tal, programáticamente hablando, sino que es necesario llevar a cabo las dos acciones que implican un movimiento, esto es: copia del archivo en destino y su posterior eliminación en origen.
El motivo por el cual se lanza esta copia pueden ser diversos y pueden llegar a tu función desde diferentes vías. Para este ejemplo, voy a utilizar un script en Python y el trigger para Event Grid para que simplemente dejando el archivo en una cuenta de almacenamiento comience todo el proceso.
El código del ejemplo es el siguiente:

import json
import logging
import os

import azure.functions as func
from azure.storage.blob import BlobServiceClient, generate_blob_sas, AccessPolicy, BlobSasPermissions
from azure.core.exceptions import ResourceExistsError
from datetime import datetime, timedelta


def main(event: func.EventGridEvent):
    result = json.dumps({
        'id': event.id,
        'data': event.get_json(),
        'topic': event.topic,
        'subject': event.subject,
        'event_type': event.event_type,
    })

    logging.info('Python EventGrid trigger processed an event: %s', result)

    blob_service_client = BlobServiceClient.from_connection_string(
        os.environ.get('ARCHIVE_STORAGE_CONNECTION_STRING'))

    # Get the URL and extract the name of the file and container
    blob_url = event.get_json().get('url')
    logging.info('blob URL: %s', blob_url)
    blob_name = blob_url.split("/")[-1].split("?")[0]
    container_name = blob_url.split("/")[-2].split("?")[0]
    archived_container_name = container_name + '-' + os.environ.get('AZURE_STORAGE_ARCHIVE_CONTAINER')

    blob_service_client_origin = BlobServiceClient.from_connection_string(os.environ.get('ORIGIN_STORAGE_CONNECTION_STRING'))

    blob_to_copy = blob_service_client_origin.get_blob_client(container=container_name, blob=blob_name)

    sas_token = generate_blob_sas(
        blob_to_copy.account_name,
        blob_to_copy.container_name,
        blob_to_copy.blob_name,
        account_key=blob_service_client_origin.credential.account_key,        
        permission=BlobSasPermissions(read=True),
        start=datetime.utcnow() + timedelta(seconds=1),
        expiry=datetime.utcnow() + timedelta(hours=1))

    logging.info('sas token: %s',sas_token)

    archived_container = blob_service_client.get_container_client(archived_container_name)

    # Create new Container
    try:
        archived_container.create_container()
    except ResourceExistsError:
        pass

    copied_blob = blob_service_client.get_blob_client(
        archived_container_name, blob_name)

    blob_to_copy_url = blob_url + '?' + sas_token

    logging.info('blob url: ' + blob_to_copy_url)

    # Start copy
    copied_blob.start_copy_from_url(blob_to_copy_url)

Como ves, una vez que el evento ha sido capturado por la función, lo primero que hace es mostrar la información que viene como parte del evento. He instalado el modulo azure.storage.blob para crear dos clientes: uno que utiliza la cadena de conexión del origen y otra del destino. La del origen la necesito porque tengo mis blobs en contenedores privados y necesito crear un token para su acceso. La del destino es necesaria para poder hacer la copia.
La información sobre el blob que necesitas mover la recupero de la propiedad url, que viene como parte del evento y creo un contenedor en destino terminado en -archived (este valor también lo he incluido como una variable de entorno más, por si se quiere utilizar otro nombre). Una vez hecho esto, comienzo la copia del blob, la cual la hace el propio servicio de Azure Storage de manera asíncrona.

En el archivo requirements.txt he añadido los siguientes módulos:

azure-functions
azure-storage
azure-storage-blob
azure-core

Estos serán utilizados tanto por la función de copia como por la función de eliminación, que te mostraré más adelante.

Subscripción de Event Grid a la cuenta de de origen

Una vez que despliegues tu nueva función en tu servicio de Azure Functions, lo siguiente que necesitas es crear una suscripción de Event Grid asociada a la cuenta de almacenamiento que quieres monitorizar. La forma más sencilla de crear esta suscripción es a través del portal, seleccionando la función que acabas de desplegar y haciendo clic sobre el enlace Add Event Grid Subscription.

Añadir una suscripción a Event Grid

En el asistente debes elegir el tipo de recurso, en este caso Azure Storage Accounts, la suscripción, el grupo de recursos y la cuenta de almacenamiento de origen. Por otro lado, selecciona únicamente el tipo de evento Blob Created.

Crear una suscripción a Event Grid

Puedes hacer que el evento se lance para cualquier blob que se cree dentro de dicha cuenta o bien puedes especificar rutas concretas a través del apartado Filters. Por ejemplo, podría hacer que solo los blobs creados en la carpeta processed fueran los que provocaran la invocación de esta función, utilizando este formato /blobServices/default/containers/processed en Subject Begins With.

Filtros para especificar rutas de Azure Storage

Una vez creada la suscripción puedes probar que al subir un archivo al contenedor processed en la cuenta de almacenamiento de origen esta lanzará tu nueva función. Puedes ver todas las ejecuciones en el apartado Monitor, aunque recuerda que puede tardar hasta 5 minutos en aparecer dicho evento.

En ocasiones, puedes encontrarte que la función no se ejecuta correctamente por diferentes motivos (el archivo tiene un nombre incorrecto para un blob, el código está mal, etc.). Esto hace que el evento capturado no se marque como procesado. Por defecto, el número de reintentos es 30 y esto puede hacer que ocurran numerosas llamadas para un archivo o un escenario que no tenemos contemplado. Es por ello, que también es posible controlar el número de reintentos por evento. Durante la creación de la suscripción, puedes configurarlo en el apartado Additional Features, o bien puedes modificar la suscripción existente a través de la cuenta de almacenamiento, en la sección Events seleccionando la suscripción que creaste anteriormente. Después en la sección Features puedes modificar el número de reintentos, entre otros valores:

Eliminación del archivo original una vez copiado

Si antes de seguir con el artículo has probado tu función, el archivo subido debería de haberse copiado en la cuenta de almacenamiento que has elegido como destino, en un contenedor llamado nombre_contenedor_origen-archived. Lo ideal es que dicha cuenta sea de tipo cold, si lo que quieres es almacenar los ficheros originales pero rara vez volverás a acceder a ellos.
Lo último que te queda es la eliminación del archivo original en la cuenta de origen. Para ello, he vuelto a utilizar Event Grid como mecanismo para detectar cuándo un blob nuevo se ha creado:

import json
import logging
import os

import azure.functions as func
from azure.storage.blob import BlobServiceClient, RetentionPolicy

def main(event: func.EventGridEvent):
    result = json.dumps({
        'id': event.id,
        'data': event.get_json(),
        'topic': event.topic,
        'subject': event.subject,
        'event_type': event.event_type,
    })

    logging.info('Python EventGrid trigger processed an event: %s', result)

    blob_service_client = BlobServiceClient.from_connection_string(os.environ.get('ORIGIN_STORAGE_CONNECTION_STRING'))

    # Create a retention policy to retain deleted blobs
    delete_retention_policy = RetentionPolicy(enabled=True, days=1)

    # Set the retention policy on the service
    blob_service_client.set_service_properties(delete_retention_policy=delete_retention_policy)

    # Blob info to delete
    blob_url = event.get_json().get('url')
    container_name = blob_url.split("/")[-2].split("?")[0].split("-")[0]
    blob_name = blob_url.split("/")[-1].split("?")[0]

    blob_to_delete = blob_service_client.get_blob_client(container=container_name,blob=blob_name)   
    
    blob_to_delete.delete_blob()

En este caso, debes seguir el mismo procedimiento para suscribirte al evento de creación del blob pero en la cuenta de destino. En el código lo primero que hago es establecer una retención de un día en la cuenta de origen para que cuando lo borre en las siguientes líneas se produzca un soft blob en lugar de un borrado total, por si fuera necesario recuperarlo en un tiempo de gracia.

El código de ejemplo lo tienes en mi GitHub.

¡Saludos!