RabbitMQ Monitoring: Pushing Queue Metrics to Elasticsearch with a Python Script
Monitoring RabbitMQ queues is critical for maintaining the health and performance of a RabbitMQ distributed system. This post is about a Python script that collects RabbitMQ queue metrics through its Management API and sends them to Elasticsearch as a data stream. I wrote the script so that I can keep a history of the consumer nodes for each queue.
Technologies I used for monitoring:
RabbitMQ: The message broker providing the metrics via its Management API.
Python (requests, json, logging): For fetching, processing, and ingesting the data.
Elasticsearch (ES): The robust search and analytics engine used for storing and querying data.
The process, orchestrated by the metrics_rabbitmq() function, begins by defining the target RabbitMQ management hosts and Elasticsearch nodes. The script then iterates through each RabbitMQ host, first retrieving the list of all queues across every vhost using the Management API's /api/queues endpoint. Next, for every queue identified, it makes a follow-up call to the detailed queue endpoint (/api/queues/{vhost}/{queue_name}) to fetch comprehensive metrics, including message counts, consumer information, and other statistics. The data is then enriched with a standardized @timestamp and user-friendly field names (node_name, queue_name, and consumer_ip). Finally, the processed document is ingested into Elasticsearch through a data stream. An API key is used to authenticate against Elasticsearch, and basic authentication credentials are used for RabbitMQ; both are defined in a .env file.
Create a .env file in the project directory first.
ES_API_KEY="<your ES KEY>"
RABBIT_USER="admin"
RABBIT_PASS="pass"
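The script indexes into the logs-rabbitmq-prod data stream, which needs a matching index template with data_stream enabled before the first document arrives (recent Elasticsearch versions ship a built-in logs-*-* template that already covers this name). If your cluster does not have one, here is a minimal sketch, assuming a hypothetical template name rabbitmq-metrics and the same API key as the script:

import os
import requests
from dotenv import load_dotenv

load_dotenv()
ES_API_KEY = os.getenv("ES_API_KEY")

# Hypothetical template; index_patterns must cover the data stream name
# used by the script (logs-rabbitmq-prod).
template = {
    "index_patterns": ["logs-rabbitmq-prod"],
    "data_stream": {},
    "priority": 500
}

response = requests.put(
    "http://es01.abc.com:9200/_index_template/rabbitmq-metrics",
    headers={"Content-Type": "application/json", "Authorization": f"ApiKey {ES_API_KEY}"},
    json=template,
    timeout=10
)
response.raise_for_status()
print(response.json())  # {'acknowledged': True} on success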
main.py file.
import os
import requests
import json
from datetime import datetime, timezone
from dotenv import load_dotenv
import logging

# RabbitMQ hosts
RABBIT_HOSTS = [
    "http://srv01.abc.com:15672",
    "http://srv02.abc.com:15672",
    "http://srv03.abc.com:15672",
    "http://srv04:15672",
    "http://srv05.com:15672"
]

# Elasticsearch hosts
ES_NODES = [
    "http://es01.abc.com:9200",
    "http://es02.abc.com:9200"
]

load_dotenv()
RABBIT_USER = os.getenv("RABBIT_USER")
RABBIT_PASS = os.getenv("RABBIT_PASS")
ES_API_KEY = os.getenv("ES_API_KEY")
DATA_STREAM = "logs-rabbitmq-prod"

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logging.getLogger("urllib3").setLevel(logging.WARNING)

# Headers for Elasticsearch REST API
ES_HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"ApiKey {ES_API_KEY}"
}
def fetch_queues(host):
    """Fetch the list of all queues (across every vhost) from one RabbitMQ host."""
    url = f"{host}/api/queues"
    try:
        logging.info(f"Fetching queues from {host}")
        response = requests.get(url, auth=(RABBIT_USER, RABBIT_PASS), timeout=10)
        response.raise_for_status()
        queues = response.json()
        logging.info(f"Successfully fetched {len(queues)} queues from {host}")
        return queues
    except requests.RequestException as e:
        logging.error(f"Failed to fetch queues from {host}: {e}")
        return []

def fetch_queue_details(host, vhost, queue_name):
    """Fetch detailed metrics for a single queue."""
    # The default vhost "/" must be URL-encoded as %2F in the Management API path.
    if vhost == "/":
        vhost = "%2F"
    url = f"{host}/api/queues/{vhost}/{queue_name}"
    try:
        response = requests.get(url, auth=(RABBIT_USER, RABBIT_PASS), timeout=10)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        logging.error(f"Failed to fetch queue detail {queue_name} from {host}: {e}")
        return None
def clean_queue_data(data):
    """Add @timestamp and user-friendly field names before indexing."""
    data["@timestamp"] = datetime.now(timezone.utc).isoformat()
    if "node" in data and isinstance(data["node"], str):
        data["node_name"] = data["node"].split("@")[1] if "@" in data["node"] else data["node"]
    if "name" in data:
        data["queue_name"] = data["name"]
    if "slave_nodes" in data and isinstance(data["slave_nodes"], list):
        data["slave_nodes_name"] = [node.split("@")[1] if "@" in node else node for node in data["slave_nodes"]]
    # Collect the peer IP of every consumer channel into a single field.
    consumer_details = data.get("consumer_details", [])
    consumer_ips = []
    if isinstance(consumer_details, list):
        for consumer in consumer_details:
            channel_details = consumer.get("channel_details", {})
            peer_host = channel_details.get("peer_host")
            if peer_host:
                consumer_ips.append(peer_host)
    if consumer_ips:
        data["consumer_ip"] = consumer_ips
    # "delta" is a mixed-type value; store it as a string so it maps cleanly in Elasticsearch.
    if "backing_queue_status" in data and "delta" in data["backing_queue_status"]:
        data["backing_queue_status"]["delta"] = str(data["backing_queue_status"]["delta"])
    return data
def index_to_es(document):
    """Index one document into the data stream, trying each ES node until one accepts it."""
    success = False
    for es_host in ES_NODES:
        url = f"{es_host}/{DATA_STREAM}/_doc"
        try:
            response = requests.post(url, headers=ES_HEADERS, data=json.dumps(document), timeout=10)
            if response.status_code in (200, 201):
                success = True
                break
            else:
                logging.warning(f"Failed to index to {es_host}: {response.status_code} {response.text}")
        except requests.RequestException as e:
            logging.warning(f"Error connecting to {es_host}: {e}")
    if not success:
        logging.error("Failed to index document to all ES hosts")

def metrics_rabbitmq():
    """Collect queue metrics from every RabbitMQ host and push them to Elasticsearch."""
    for host in RABBIT_HOSTS:
        queues = fetch_queues(host)
        for q in queues:
            vhost = q.get("vhost")
            qname = q.get("name")
            details = fetch_queue_details(host, vhost, qname)
            if details:
                details = clean_queue_data(details)
                index_to_es(details)

if __name__ == "__main__":
    metrics_rabbitmq()
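Each run takes a single snapshot of every queue. To actually build up the consumer-node history over time, the script has to run periodically; a cron job works, or, as a rough sketch (not part of the original script), a simple loop as an alternative entry point:

import time

# Hypothetical wrapper: take a snapshot every 60 seconds instead of exiting after one pass.
if __name__ == "__main__":
    while True:
        metrics_rabbitmq()
        time.sleep(60)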
Output of the script:
2025-12-10 13:22:58 [INFO] Fetching queues from http://srv01.abc.com:15672
2025-12-10 13:22:58 [INFO] Successfully fetched 365 queues from http://srv01.abc.com:15672
2025-12-10 13:23:41 [INFO] Fetching queues from http://srv02.abc.com:15672
2025-12-10 13:23:41 [ERROR] Failed to fetch queues from http://srv02.abc.com:15672: 401 Client Error: Unauthorized for url: http://srv02.abc.com:15672/api/queues
2025-12-10 13:23:41 [INFO] Fetching queues from http://srv03.abc.com:15672
2025-12-10 13:23:41 [INFO] Successfully fetched 28 queues from http://srv03.abc.com:15672
Screenshot from Kibana.
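Besides the Kibana view, the ingested documents can be sanity-checked with a direct query against the data stream. A minimal sketch, assuming the same Elasticsearch node and API key as in the script:

import os
import requests
from dotenv import load_dotenv

load_dotenv()
ES_API_KEY = os.getenv("ES_API_KEY")

# Fetch the most recent queue documents from the data stream.
query = {
    "size": 5,
    "sort": [{"@timestamp": "desc"}],
    "_source": ["@timestamp", "queue_name", "node_name", "messages", "consumer_ip"]
}

response = requests.post(
    "http://es01.abc.com:9200/logs-rabbitmq-prod/_search",
    headers={"Content-Type": "application/json", "Authorization": f"ApiKey {ES_API_KEY}"},
    json=query,
    timeout=10
)
response.raise_for_status()
for hit in response.json()["hits"]["hits"]:
    print(hit["_source"])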