Si bien es cierto que Azure Service Bus proporciona SDKs para diferentes lenguajes (Go, Java, PHP, .NET, Ruby, Python y Node.js), es probable que estemos acostumbrados a trabajar con alguna librería para el protocolo AMQP, con la que ya tengamos algún desarrollo y queramos aprovechar lo que teníamos o que el lenguaje en el que desarrollamos nuestra aplicación no tenga disponible SDK. Hoy quería compartir contigo una pequeña prueba que he preparado para que veas que es posible usar este servicio con un cliente ajeno al SDK de Azure Service Bus.
El que envía
Para este ejemplo he utilizado rhea, una librería para AMQP en Node.js, para demostrarte cuál sería la configuración y cómo trabajar tanto con queues como con topics. El ejemplo no es ni mucho menos productivo pero te ayudará a saber qué paramétros debes configurar para poder conectarte a tu Service Bus y cómo mandar las application properties si estás utilizando filtros en las suscripciones:
const rhea = require('rhea');
require('dotenv').config();
var options = {
transport: 'tls',
host: process.env.SERVICE_BUS_HOSTNAME,
hostname: process.env.SERVICE_BUS_HOSTNAME,
username: process.env.SERVICE_BUS_POLICY_NAME,
password: process.env.SERVICE_BUS_POLICY_KEY,
port: 5671,
reconnect_limit: 100
}
var connection = rhea.connect(options);
connection.once('connection_close', () => console.log('connection_close'));
connection.once('disconnected', () => console.log('disconnected'));
connection.once('connection_open', async function(e) {
console.log('connection_open');
const session = connection.create_session();
// session events
session.once('session_open', () => console.log('session opened'));
session.once('session_close', () => console.log('session closed'));
session.begin();
// Queue
// const receiver = session.attach_receiver('myqueue', { });
// Topic
const sender = session.attach_sender(process.env.SERVICE_BUS_TOPIC_NAME, {});
// sender events
sender.once('sendable', () => console.log('sender opened'));
sender.once('sender_close', () => console.log('sender closed'));
sender.on('accepted', () => console.log('message accepted'));
sender.on('released', () => console.log('message released'));
sender.on('rejected', () => console.log('message rejected'));
sender.on('modified', () => console.log('message modified'));
// Send a message with application properties
await sender.send({
body: Buffer.from(JSON.stringify({
name: "Albert Einstein",
quote: "A person who never made a mistake never tried anything new.",
random_value: Math.random()
})),
application_properties: { "StoreId": "Store1" } // for Topic filters
});
});
Los parámetros que necesito para conectarme a mi servicio los he almacenado en un archivo .env, que tiene esta pinta:
SERVICE_BUS_HOSTNAME="<YOUR_SERVICE_BUS_NAME>.servicebus.windows.net"
SERVICE_BUS_POLICY_NAME="RootManageSharedAccessKey"
SERVICE_BUS_POLICY_KEY="<YOUR_POLICY_KEY>"
SERVICE_BUS_TOPIC_NAME="weather"
SERVICE_BUS_SUBSCRIPTION_NAME="sub1"
Cuando quieres enviar un mensaje y poder aplicar los filtros, si los estás usando en las suscripciones de tus topics, debes utilizar la propiedad application_properties con los valores que están sujetos a evaluación.
El que recibe
Desde el punto de vista del que recibe, la configuración no es muy diferente a la que acabas de ver:
const rhea = require('rhea');
require('dotenv').config();
var options = {
transport: 'tls',
host: process.env.SERVICE_BUS_HOSTNAME,
hostname: process.env.SERVICE_BUS_HOSTNAME,
username: process.env.SERVICE_BUS_POLICY_NAME,
password: process.env.SERVICE_BUS_POLICY_KEY,
port: 5671,
reconnect_limit: 100
}
var connection = rhea.connect(options);
// connection events
connection.once('connection_close', () => console.log('connection_close'));
connection.once('disconnected', () => console.log('disconnected'));
connection.once('connection_open', () => {
console.log('connection_open');
const session = connection.create_session();
// session events
session.once('session_open', () => console.log('session opened'));
session.once('session_close', () => console.log('session closed'));
session.begin();
// Queue
// const receiver = session.attach_receiver('myqueue', { });
// Topics
const receiver = session.attach_receiver({ name: process.env.SERVICE_BUS_SUBSCRIPTION_NAME, source: { address: process.env.SERVICE_BUS_TOPIC_NAME, durable: 2, expiry_policy: 'never' } });
// receiver events
receiver.once('receiver_open', () => console.log('receiver opened'));
receiver.once('receiver_close', () => console.log('receiver closed'));
// When I receive a message
receiver.on('message', ({ message, delivery }) => {
console.log('message received');
console.log('body: ', JSON.parse(message.body.toString()));
console.log(`application properties: ${JSON.stringify(message.application_properties)}`);
delivery.update(undefined, rhea.message.accepted().described()); // Complete
// delivery.update(undefined, rhea.message.rejected().described()); // DeadLetter
// delivery.update(undefined, rhea.message.modified().described({ undeliverable_here: true })); // Abandon
// delivery.update(undefined, rhea.message.released().described()); // Defer
});
});
Lo más importante en este caso es la configuración de la llamada a session.attach_receiver donde debemos indicar la suscripción dentro del topic al que nos queremos suscribir.
El resultado de ambos será el siguiente:

¡Saludos!

Bootcamp Backend
Si tienes ganas de ponerte al día con tecnologías de backend, formo parte del
equipo de docentes del Bootcamp Backend de Lemoncode, ¿Te animas a
aprender con nosotros?