Usar las colas de mensajes de Azure Storage en PHP

Ya has visto cómo trabajar con los blobs y con las tablas de Azure Storage en PHP. Para cerrar el círculo, hoy te voy a contar cómo utilizar las colas de mensajes, que también te ofrece el servicio.

Crea un nuevo proyecto con Symfony a través de este comando:

composer create-project symfony/website-skeleton azure-storage-queues-php

Dentro del repositorio Microsoft Azure Storage PHP Client Libraries, esta vez vamos a necesitar la librería azure-storage-queue. Puedes instalarla a través de composer require:

composer require microsoft/azure-storage-queue

Al igual que en los dos ejemplos anteriores, vamos a crear una clase llamada QueueService, donde encapsular toda la lógica que tenga que ver con el servicio.

<?php

namespace App\Service;

use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\PeekMessagesOptions;
use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use Psr\Log\LoggerInterface;

class QueueService
{
    private $logger;
    private $queueClient;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
        $this->queueClient = QueueRestProxy::createQueueService($_SERVER['AZURE_STORAGE_CONNECTION_STRING']);
    }

    public function sendMessage($msg, $queue = 'inbox')
    {
        try {
            $this->createQueue($queue);
            $this->queueClient->createMessage($queue, $msg);
        } catch (ServiceException $exception) {
            $this->logger->error('failed to create the messages: ' . $exception->getCode() . ':' . $exception->getMessage());
            throw $exception;
        }
    }

    public function peekMessages($queue = 'inbox')
    {
        try {
            // OPTIONAL: Set peek message options.
            $message_options = new PeekMessagesOptions();
            $message_options->setNumberOfMessages(10); // Default value is 1.

            $listMessagesResult = $this->queueClient->peekMessages($queue, $message_options);
            return $listMessagesResult->getQueueMessages();
        } catch (ServiceException $exception) {
            $this->logger->error('failed to get the messages: ' . $exception->getCode() . ':' . $exception->getMessage());
            throw $exception;
        }
    }

    public function getMessage($queue = 'inbox')
    {
        try {
            // $message_options = new ListMessagesOptions();
            // $message_options->setNumberOfMessages(5);
            $listMessagesResult = $this->queueClient->listMessages($queue);
            return $listMessagesResult->getQueueMessages();
        } catch (ServiceException $exception) {
            $this->logger->error('failed to get the messages: ' . $exception->getCode() . ':' . $exception->getMessage());
            throw $exception;
        }
    }

    public function deleteMessage($messageId, $popReceipt, $queue = 'inbox')
    {
        try {
            $this->queueClient->deleteMessage($queue, $messageId, $popReceipt);
        } catch (ServiceException $exception) {
            $this->logger->error('failed to delete the message: ' . $exception->getCode() . ':' . $exception->getMessage());
            throw $exception;
        }
    }

    public function createQueue($name)
    {
        try {
            $this->queueClient->createQueue($name);

        } catch (ServiceException $exception) {
            $this->logger->error('failed to create the queue: ' . $exception->getCode() . ':' . $exception->getMessage());
        }
    }
}

La cadena de conexión debe de tener el siguiente formato:

AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=YOUR_ACCOUNT_NAME;AccountKey=YOUR_ACCOUNT_KEY;

Ahora crea un controlador, QueuesController, desde donde utilizarás QueueService.

php bin/console make:controller QueuesController

Reemplaza el contenido de src/Controller/QueuesController.php por el siguiente:

<?php

namespace App\Controller;

use App\Service\QueueService;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\JsonResponse;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\Routing\Annotation\Route;

class QueuesController extends AbstractController
{
    /**
     * @Route("/", name="queues")
     */
    public function index(QueueService $messaging)
    {
        return $this->render('queues/index.html.twig', [
            'controller_name' => 'QueuesController',
            'messages' => $messaging->peekMessages(),
        ]);
    }

    /**
     * @Route("/message")
     */
    public function getMessage(QueueService $messaging)
    {
        $message = $messaging->getMessage();
        return new JsonResponse(array(
            'id' => $message[0]->getMessageId(),
            'popReceipt' => $message[0]->getPopReceipt(),
            'text' => $message[0]->getMessageText(),
            'insertionDate' => $message[0]->getInsertionDate(),
            'dequeueCount' => $message[0]->getDequeueCount(),
        ));
    }

    /**
     * @Route("/message/delete/{id}/{popReceipt}")
     */
    public function delete($id, $popReceipt, QueueService $messaging)
    {
        $messaging->deleteMessage($id, $popReceipt);
        return $this->redirectToRoute('queues');
    }

    /**
     * @Route("/send/message")
     */
    public function send(Request $request, QueueService $messaging)
    {
        $message = $request->get('message');
        $messaging->sendMessage($message);
        return $this->redirectToRoute('queues');
    }
}

Como ves, en este caso tengo cuatro acciones: la acción index, que recuperará todos los mensajes sin afectar a su invisibilidad, llamando a la función peekMessages, getMessage para recuperar uno a uno los mensajes de la cola, delete para eliminar mensajes y send para crear mensajes nuevos.

Para ver que todo funciona correctamente, modifica la plantilla template/base.html.twig para añadir la librería de bootstrap y toastr, que nos ayudará a mostrar notificaciones al usuario.

<!DOCTYPE html>
<html>
    <head>
        <meta charset="UTF-8">
        <title>{% block title %}Welcome!{% endblock %}</title>
        {% block stylesheets %}
        <link rel="stylesheet" href="https://stackpath.bootstrapcdn.com/bootstrap/4.2.1/css/bootstrap.min.css" integrity="sha384-GJzZqFGwb1QTTN6wy59ffF1BuGJpLSa9DkKMp0DgiMDm4iYMj70gZWKYbI706tWS" crossorigin="anonymous">
        <link rel="stylesheet" href="https://cdnjs.cloudflare.com/ajax/libs/toastr.js/latest/css/toastr.min.css" />
        {% endblock %}
    </head>
    <body>
    <div class="container">
        {% block body %}{% endblock %}
     </div>
        <script src="https://code.jquery.com/jquery-3.3.1.min.js" integrity="sha256-FgpCb/KJQlLNfOu91ta32o/NMZxltwRo8QtmkMRdAu8=" crossorigin="anonymous"></script> <script src="https://cdnjs.cloudflare.com/ajax/libs/toastr.js/latest/js/toastr.min.js"></script> {% block javascripts %}{% endblock %} </body></html>

Además, reemplaza el contenido de templates/queues/index.html.twig por el siguiente, donde he utilizado fetch y toastr para ir mostrando los mensajes que se van agregando a la cola de mensajes:

{% extends 'base.html.twig' %}

{% block title %}Hello QueuesController!{% endblock %}

{% block body %}
<div class="row">
<div class="col-lg-12 jumbotron">
    <h1>Azure Queue storage</h1>
    <p class="lead">It's a service for storing large numbers of messages that can be accessed from anywhere in the world via authenticated calls using HTTP or HTTPS. A single queue message can be up to 64 KB in size, and a queue can contain millions of messages, up to the total capacity limit of a storage account.</p>
</div>
</div>
<div class="row">
    <div class="col">
    <form action="/send/message" method="post" class="form-inline">
            <div class="input-group mb-3">
                <input type="text" name="message" class="form-control" placeholder="Send a new message" aria-label="New message" aria-describedby="button-addon2">
                <div class="input-group-append">
                    <button class="btn btn-outline-secondary" type="submit" id="button-addon2">Send</button>            
                </div>
            </div>
        </form>
    </div>
</div>
<div class="row">
     {% for message in messages %}
        <div class="col-sm-6">
            <div class="card p-3 text-right" style="margin-bottom: 5px">
                <blockquote class="blockquote mb-0">
                    <p>{{message.getMessageText()}}</p>
                    <footer class="blockquote-footer">
                        <small class="text-muted">
                            Last enqueued {{date().diff(message.getInsertionDate()).i}} minutes ago.<br/><cite title="Source Title">Dequeued {{message.getDequeueCount()}} times.</cite>
                        </small>
                    </footer>
                </blockquote>
            </div>
        </div>            
      {% endfor %}
</div>
{% endblock %}
{% block javascripts %}
<script>
    toastr.options = {
        "closeButton": true,
        "debug": false,
        "newestOnTop": false,
        "progressBar": true,
        "positionClass": "toast-top-right",
        "preventDuplicates": false,
        "onclick": null,
        "showDuration": "300",
        "hideDuration": "1000",
        "timeOut": "5000",
        "extendedTimeOut": "1000",
        "showEasing": "swing",
        "hideEasing": "linear",
        "showMethod": "fadeIn",
        "hideMethod": "fadeOut"
    };
    setInterval(()=>{
        fetch('http://127.0.0.1:8000/message')
            .then(res=>res.json())
            .then((data)=>{
                console.log(data);
                toastr.options.onCloseClick = () => {
                     fetch(`http://127.0.0.1:8000/message/delete/${data.id}/${data.popReceipt}`)
                     .then(()=>{
                         console.log('message deleted');
                     });
                }
                toastr.info(data.text,'Inbox');
            });
    },10000);
</script>
{% endblock %}

El resultado será parecido a este:

Ejemplo de Azure Storage Queues en PHP
Ejemplo de Azure Storage Queues en PHP

El ejemplo completo lo tienes en Github.

¡Saludos!