Graphing Palo Alto Firewall statistics via Python, Telegraf and Grafana - Part 1
Today we are looking into how to fetch, store and visualize various counters on Palo Alto firewalls using the PAN-OS API. The examples use the XML API rather than the "new" REST API, as the REST API does not offer nearly as much functionality as the XML API.
I use this data gathering to visualize and detect issues much faster than relying on SNMP data or other means, especially since these scripts make it easy to gather, for example, throughput data, session data, global counters and more.
The tools we are using are Python scripts to fetch the data via the Palo Alto PAN-OS XML API, Telegraf to run the scripts on a schedule, InfluxDB to store the data and finally Grafana to visualize it.
In part 1 of this topic we will cover fetching the global counter and session info data. Part 2 will include fetching the CPU and packet buffer data.
Prerequisites
I'm running all the software on an Ubuntu 20.04 server with Python 3.10.2. Older versions should also work, though Python 2 has not been tested (and should not be used anymore anyway).
- Palo Alto firewalls (PAN-OS 9.0, 9.1, 10.0 and 10.1 tested with these scripts)
- InfluxDB 2.x installed (1.x will also work but the Telegraf and Grafana configs are a bit different)
- Telegraf installed
- Grafana installed
Getting Palo Alto statistics using Python scripts
These Python scripts fetch the statistics via the PAN-OS XML API, parse the XML-formatted result and print it in InfluxDB line protocol format. The firewalls are read from a JSON file (the example format is listed later on).
Authentication to the XML API uses a pre-set API key stored in the config.json file. You could easily add a login function to fetch the API key for each firewall instead; in this example all firewalls share the same master encryption key, so one API key works for all of them (provided the same user is configured on the firewalls or via a remote authentication server).
You can get the API key easily with the following curl command:
curl -k -X GET 'https://<firewall>/api/?type=keygen&user=<username>&password=<password>'
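If you'd rather fetch the key from Python (for example, per firewall at script startup), a minimal sketch could look like the one below. The firewall address and credentials are placeholders, and the parsing assumes the standard <response><result><key> structure of the keygen response.

import requests
import urllib3
import xml.etree.ElementTree as ET

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def get_api_key(firewall, username, password):
    # Call the keygen API and parse the key from the XML response
    result = requests.get("https://{}/api/?type=keygen&user={}&password={}".format(firewall, username, password), verify=False, timeout=5)
    result.raise_for_status()
    return ET.fromstring(result.content).find(".//key").text

# Example usage with placeholder values
print(get_api_key("192.168.1.1", "apiuser", "secret"))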
Also worth noting in the Python scripts below is the use of threading, which allows the API queries to run against multiple firewalls simultaneously and speeds up the data gathering significantly. A semaphore is used when outputting the data, so the output does not get garbled or overlapped when multiple threads write to it at the same time.
Note: You can easily modify the scripts to save the data anywhere you like, or to perform calculations on the data before outputting it. Fetching other data via the XML API is also easy to implement, as shown below.
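For instance, the same op API can return general system information. Here is an illustrative curl call using the standard "show system info" op command, with the firewall address and API key as placeholders:

curl -k -X GET 'https://<firewall>/api/?type=op&cmd=<show><system><info></info></show>&key=<apikey>'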
Global counters
#!/usr/bin/env python
import datetime
import logging
import requests
import json
import time
import urllib3
import argparse
import textwrap
import threading
import xml.etree.ElementTree as ET
sem = threading.Semaphore()
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logging.basicConfig(level=logging.WARNING, format='[%(asctime)s] [%(levelname)s] (%(threadName)-10s) %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
# Set the API Call URL
PAOpApiCallUrl = "https://{}/api/?type={}&cmd={}&key={}"
# Function for parsing the configuration
def parse_config(config_path):
with open(config_path, 'r') as config_file:
config_data = json.load(config_file)
return config_data
# Palo Alto API call function
def pa_apicall(url, calltype, cmd, key, firewall, unixtime):
    logging.info('Querying firewall %s (%s)', firewall, url)
    try:
        result = requests.get(PAOpApiCallUrl.format(url, calltype, cmd, key), verify=False, timeout=5)
    except requests.exceptions.RequestException as err:
        logging.warning("Palo Alto API call to %s failed: %s", firewall, err)
        return 1
    if result.status_code != 200:
        logging.warning("Palo Alto API call failed - status code: %i", result.status_code)
        return 1
# Acquire semaphore and parse the output
sem.acquire()
parse_output(firewall, unixtime, result)
sem.release()
return 1
# Parse and print output
def parse_output(firewall, unixtime, gc_info):
# Get the XML Element Tree
gc_info_tree = ET.fromstring(gc_info.content)
# Parse the Global Counter info from the XML Tree and print it in InfluxDB output format
for gc in gc_info_tree.findall(".//entry"):
print("paglobalcounters,firewall=" + firewall + "," + "counter=" + gc.find('name').text + " value=" + gc.find('value').text + " " + str(unixtime))
def main():
# Print help to CLI and parse the arguments
parser=argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description=textwrap.dedent('''\
Palo Alto Get Global Counters info for Telegraf
'''))
parser.add_argument('-f', type=str, dest='firewall_db', default="firewalls.json", help='File to read the firewall database from')
args=parser.parse_args()
# Config filenames
config_filename = "config.json"
firewall_filename = args.firewall_db
unixtime = time.time_ns()
# Output logging information for debug purposes
logging.info('Starting Palo Alto Get Global Counters info for Telegraf')
logging.info('Parsing config %s and firewall database %s', config_filename, firewall_filename)
# Parse configuration files
config = parse_config(config_filename)
fw_config = parse_config(firewall_filename)
# Initiate jobs list
jobs = []
try:
# Parse the firewalls list
for firewall in fw_config["firewalls"]:
# Save the function calls into jobs list
# We are getting the severity "drop" counters. Change the severity to get events of lesser severity
thread_apicall = threading.Thread(target=pa_apicall, args=(firewall["ip"], "op", "<show><counter><global><filter><severity>drop</severity></filter></global></counter></show>", config["apikey"], firewall["name"], unixtime))
jobs.append(thread_apicall)
# Start the jobs in list
for j in jobs:
j.start()
# Join the jobs in list
for j in jobs:
j.join()
except KeyboardInterrupt:
logging.info('KeyboardInterrupt')
logging.info('Ending')
if __name__ == "__main__":
main()
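When run, the script prints one line per counter in InfluxDB line protocol format, along these lines (the counter names, values and timestamp below are illustrative):

paglobalcounters,firewall=Firewall\ 1,counter=flow_policy_deny value=1234 1650000000000000000
paglobalcounters,firewall=Firewall\ 2,counter=flow_dos_drop_ip_blocked value=56 1650000000000000000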
Session information
Parsing the session information can be done by making the following changes to the script. The full scripts are included in the Git repository.
# Parse and print output
def parse_output(firewall, unixtime, session_info):
# Get the XML Element Tree
session_info_tree = ET.fromstring(session_info.content)
# Get session info and parse the XML
for sess in session_info_tree.findall("./"):
print("pasessioninfo,firewall=" + firewall + " pps=" + sess.find('pps').text + ",active=" + sess.find('num-active').text + ",udp=" + sess.find('num-udp').text + ",tcp=" + sess.find('num-tcp').text + ",icmp=" + sess.find('num-icmp').text + ",cps=" + sess.find('cps').text + ",kbps=" + sess.find('kbps').text + ",totalsessions=" + sess.find('num-installed').text + " " + str(unixtime))
# In the main function the API calls have to be modified as below
thread_apicall = threading.Thread(target=pa_apicall, args=(firewall["ip"], "op", "<show><session><info></info></session></show>", config["apikey"], firewall["name"], unixtime))
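The session info script then outputs one line per firewall with multiple fields, for example (the values and timestamp are illustrative):

pasessioninfo,firewall=Firewall\ 1 pps=1500,active=20000,udp=8000,tcp=11000,icmp=100,cps=50,kbps=250000,totalsessions=20500 1650000000000000000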
Example config JSON
{
"apikey": "APIKEY"
}
Example firewall source JSON
{
"firewalls": [
{
"name": "Firewall 1",
"ip": "192.168.1.1"
},
{
"name": "Firewall 2",
"ip": "192.168.1.2"
}
]
}
Automate the data gathering with Telegraf
We are using Telegraf to run the Python scripts at specific intervals, fetch the data and save it to InfluxDB.
In the configurations below, only the outputs and inputs are defined; the rest of the Telegraf configuration must also be present.
# Define InfluxDB v2 output
[[outputs.influxdb_v2]]
urls = ["http://localhost:8086"]
token = "TOKEN"
organization = "ORG"
bucket = "panos_counters"
tagexclude = ["tag1"]
[outputs.influxdb_v2.tagpass]
tag1 = ["panoscounters"]
# Palo Alto global counters fetch
[[inputs.exec]]
command = "/usr/bin/python3 /panos_scripts/get_panos_global_counter.py"
data_format = "influx"
timeout = "15s"
interval = "30s"
[inputs.exec.tags]
tag1 = "panoscounters"
# Palo Alto session counters fetch
[[inputs.exec]]
command = "/usr/bin/python3 /panos_scripts/get_panos_session_info.py"
data_format = "influx"
timeout = "15s"
interval = "30s"
[inputs.exec.tags]
tag1 = "panoscounters"
Verify data input in InfluxDB browser
Browse the InfluxDB Data Explorer to verify that the data is arriving in the proper bucket and is being written correctly.
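Alternatively, a quick Flux query in the Data Explorer script editor will spot-check the most recent data, for example:

from(bucket: "panos_counters")
  |> range(start: -15m)
  |> filter(fn: (r) => r["_measurement"] == "paglobalcounters")
  |> limit(n: 10)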
Visualize data in Grafana
Below are example graphs and the InfluxDB Flux queries used to get and format the data. In addition, I'm using dashboard variables (firewall names); you must configure these in the dashboard variable settings.
You don't have to use the variables, however: you can change or remove the firewall filters in the Flux queries below. The queries will then display everything in the same view, without the ability to filter the data in Grafana (which can result in a lot of data in one graph when multiple firewalls are visualized).
// Palo Alto - Global Counters Graph - 30sec data (Time series graph)
from(bucket: "panos_counters")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "paglobalcounters")
|> filter(fn: (r) => r["firewall"] =~ /${firewalls:regex}$/)
|> filter(fn: (r) => r["_field"] == "value")
|> derivative(unit: 30s, nonNegative: true, columns: ["_value"], timeColumn: "_time")
|> yield(name: "last")
// Palo Alto - Session Info Graph - 30sec data (Time series graph)
from(bucket: "panos_counters")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "pasessioninfo" and r["_field"] != "totalsessions")
|> filter(fn: (r) => r["firewall"] =~ /${firewalls:regex}$/)
|> aggregateWindow(every: v.windowPeriod, fn: last, createEmpty: false)
|> yield(name: "last")
Here is an example of how to populate the dashboard variables from the InfluxDB data for use in the Flux queries. Configure these under Dashboard -> Variables.
from(bucket: "panos_counters")
|> range(start: v.timeRangeStart, stop: v.timeRangeStop)
|> filter(fn: (r) => r["_measurement"] == "pasessioninfo")
|> keep(columns: ["firewall"])
|> distinct(column: "firewall")
|> keep(columns: ["_value"])
Conclusion
Following the steps above allows you to gather and visualize some very informative data from Palo Alto firewalls via the PAN-OS XML API. The 30-second interval data lets you see spikes and anomalies in the global counters in much more detail.
All the scripts above can be found in the Git repository.
Part 2 will be posted in the near future and will cover fetching CPU and packet buffer data.