RabbitMQ Monitoring: Pushing Queue Metrics to Elasticsearch with a Python Script


Monitoring RabbitMQ queues is critical for maintaining the health and performance of a RabbitMQ distributed system. This post is about a Python script that collects RabbitMQ queue metrics through its Management API and sends them to Elasticsearch as a data stream. I wrote the script so that I can keep a history of the consumer nodes for each queue.

Technologies I used:
RabbitMQ: The message broker providing the metrics via its Management API.
Python (requests, json, logging): For fetching, processing, and ingesting the data.
Elasticsearch (ES): The robust search and analytics engine used for storing and querying data.

The process, orchestrated by the metrics_rabbitmq() function, begins by defining the target RabbitMQ Management hosts and Elasticsearch nodes. The script then iterates through each RabbitMQ host, first retrieving a list of all queues across every vhost using the Management API’s /api/queues endpoint. Next, for every queue identified, it makes a subsequent call to the detailed queue endpoint (/api/queues/{vhost}/{queue_name}) to fetch comprehensive metrics, including message counts, consumer information, and more. This data is further processed by adding a standardized @timestamp and user-friendly field names: node_name, queue_name and consumer_ip. Finally, the processed document is ingested into Elasticsearch through a data stream. The script authenticates to Elasticsearch with an API key and to RabbitMQ with basic authentication credentials, both of which are read from a .env file.

Create a .env file in the project directory first.

ES_API_KEY="<your ES KEY>"
RABBIT_USER="admin"
RABBIT_PASS="pass"
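
The script writes to the logs-rabbitmq-prod data stream. On recent Elasticsearch clusters the built-in logs-*-* index template usually creates that data stream automatically on the first write; if your cluster has no matching template, you can create one up front. Below is a minimal sketch using the same requests/API-key approach as the script (the template name rabbitmq-metrics-template is just an illustrative choice):

import os
import requests
from dotenv import load_dotenv

load_dotenv()

ES_HOST = "http://es01.abc.com:9200"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {os.getenv('ES_API_KEY')}"
}

# Index template that makes Elasticsearch treat matching indices as a data stream
template = {
    "index_patterns": ["logs-rabbitmq-*"],
    "data_stream": {},
    "priority": 500
}

resp = requests.put(
    f"{ES_HOST}/_index_template/rabbitmq-metrics-template",
    headers=HEADERS,
    json=template,
    timeout=10
)
resp.raise_for_status()
print(resp.json())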

main.py file.

import os
import requests
import json
from datetime import datetime, timezone
from dotenv import load_dotenv
import logging

# RabbitMQ hosts
RABBIT_HOSTS = [
    "http://srv01.abc.com:15672",
    "http://srv02.abc.com:15672",
    "http://srv03.abc.com:15672",
    "http://srv04:15672",
    "http://srv05.com:15672"
]

# Elasticsearch hosts
ES_NODES = [
    "http://es01.abc.com:9200",
    "http://es02.abc.com:9200"
]

load_dotenv()
RABBIT_USER = os.getenv("RABBIT_USER")
RABBIT_PASS = os.getenv("RABBIT_PASS")
ES_API_KEY = os.getenv("ES_API_KEY")
DATA_STREAM = "logs-rabbitmq-prod"

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logging.getLogger("urllib3").setLevel(logging.WARNING)

# Headers for Elasticsearch REST API
ES_HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {ES_API_KEY}"
}


def fetch_queues(host):
    """Fetch the list of all queues from a RabbitMQ Management host."""
    url = f"{host}/api/queues"
    try:
        logging.info(f"Fetching queues from {host}")
        response = requests.get(url, auth=(RABBIT_USER, RABBIT_PASS), timeout=10)
        response.raise_for_status()
        queues = response.json()
        logging.info(f"Successfully fetched {len(queues)} queues from {host}")
        return queues
    except requests.RequestException as e:
        logging.error(f"Failed to fetch queues from {host}: {e}")
        return []


def fetch_queue_details(host, vhost, queue_name):
    """Fetch detailed metrics for a single queue."""
    # The default vhost "/" must be URL-encoded in the Management API path
    if vhost == "/":
        vhost = "%2F"
    url = f"{host}/api/queues/{vhost}/{queue_name}"
    try:
        response = requests.get(url, auth=(RABBIT_USER, RABBIT_PASS), timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch queue detail {queue_name} from {host}: {e}")
        return None


def clean_queue_data(data):
    """Add @timestamp and user-friendly field names before indexing."""
    data["@timestamp"] = datetime.now(timezone.utc).isoformat()
    if "node" in data and isinstance(data["node"], str):
        data["node_name"] = data["node"].split("@")[1] if "@" in data["node"] else data["node"]
    if "name" in data:
        data["queue_name"] = data["name"]
    if "slave_nodes" in data and isinstance(data["slave_nodes"], list):
        data["slave_nodes_name"] = [node.split("@")[1] if "@" in node else node for node in data["slave_nodes"]]
    # Collect the peer IPs of all consumers attached to the queue
    consumer_details = data.get("consumer_details", [])
    consumer_ips = []
    if isinstance(consumer_details, list):
        for consumer in consumer_details:
            channel_details = consumer.get("channel_details", {})
            peer_host = channel_details.get("peer_host")
            if peer_host:
                consumer_ips.append(peer_host)
    if consumer_ips:
        data["consumer_ip"] = consumer_ips
    # "delta" is not a plain scalar; stringify it so the field mapping stays consistent
    if "backing_queue_status" in data and "delta" in data["backing_queue_status"]:
        data["backing_queue_status"]["delta"] = str(data["backing_queue_status"]["delta"])
    return data


def index_to_es(document):
    """Index a document into the data stream, trying each ES node until one succeeds."""
    success = False
    for es_host in ES_NODES:
        url = f"{es_host}/{DATA_STREAM}/_doc"
        try:
            response = requests.post(url, headers=ES_HEADERS, data=json.dumps(document), timeout=10)
            if response.status_code in (200, 201):
                success = True
                break
            else:
                logging.warning(f"Failed to index to {es_host}: {response.status_code} {response.text}")
        except requests.RequestException as e:
            logging.warning(f"Error connecting to {es_host}: {e}")
    if not success:
        logging.error("Failed to index document to all ES hosts")


def metrics_rabbitmq():
    """Collect queue metrics from every RabbitMQ host and push them to Elasticsearch."""
    for host in RABBIT_HOSTS:
        queues = fetch_queues(host)
        for q in queues:
            vhost = q.get("vhost")
            qname = q.get("name")
            details = fetch_queue_details(host, vhost, qname)
            if details:
                details = clean_queue_data(details)
                index_to_es(details)


if __name__ == "__main__":
    metrics_rabbitmq()

Output of the script:

2025-12-10 13:22:58 [INFO] Fetching queues from http://srv01.abc.com:15672 
2025-12-10 13:22:58 [INFO] Successfully fetched 365 queues from http://srv01.abc.com:15672 
2025-12-10 13:23:41 [INFO] Fetching queues from http://srv02.abc.com:15672 
2025-12-10 13:23:41 [ERROR] Failed to fetch queues from http://srv02.abc.com:15672: 401 Client Error: Unauthorized for url: http://srv02.abc.com:15672/api/queues 
2025-12-10 13:23:41 [INFO] Fetching queues from http://srv03.abc.com:15672 
2025-12-10 13:23:41 [INFO] Successfully fetched 28 queues from http://srv03.abc.com:15672
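
Once a run completes, a quick search against the data stream confirms that documents are arriving. This is a minimal verification sketch (not part of main.py) that pulls the newest document with the fields added by the script plus the consumers and messages counters returned by the RabbitMQ API:

import os
import requests
from dotenv import load_dotenv

load_dotenv()

ES_HOST = "http://es01.abc.com:9200"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {os.getenv('ES_API_KEY')}"
}

# Fetch the most recent document from the data stream
query = {
    "size": 1,
    "sort": [{"@timestamp": "desc"}],
    "_source": ["@timestamp", "queue_name", "node_name", "consumer_ip", "consumers", "messages"]
}

resp = requests.get(
    f"{ES_HOST}/logs-rabbitmq-prod/_search",
    headers=HEADERS,
    json=query,
    timeout=10
)
resp.raise_for_status()
for hit in resp.json()["hits"]["hits"]:
    print(hit["_source"])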


Screenshot from Kibana.
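
The consumer history per queue that a Kibana dashboard visualizes can also be pulled straight from Elasticsearch with an aggregation. A sketch, assuming queue_name was indexed as a keyword (with plain dynamic mappings you may need queue_name.keyword instead):

import os
import requests
from dotenv import load_dotenv

load_dotenv()

ES_HOST = "http://es01.abc.com:9200"
HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {os.getenv('ES_API_KEY')}"
}

# Hourly consumer-count history per queue over the last 24 hours
agg_query = {
    "size": 0,
    "query": {"range": {"@timestamp": {"gte": "now-24h"}}},
    "aggs": {
        "per_queue": {
            "terms": {"field": "queue_name", "size": 20},
            "aggs": {
                "over_time": {
                    "date_histogram": {"field": "@timestamp", "fixed_interval": "1h"},
                    "aggs": {"consumers": {"max": {"field": "consumers"}}}
                }
            }
        }
    }
}

resp = requests.get(
    f"{ES_HOST}/logs-rabbitmq-prod/_search",
    headers=HEADERS,
    json=agg_query,
    timeout=10
)
resp.raise_for_status()
print(resp.json()["aggregations"])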


