mirror of
https://github.com/Tafkas/fritzbox-munin.git
synced 2023-10-10 13:36:55 +02:00
166 lines
5.9 KiB
Python
Executable File
166 lines
5.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
fritzbox_helper - A munin plugin for Linux to monitor AVM Fritzbox
|
|
Copyright (C) 2015 Christian Stade-Schuldt
|
|
Author: Christian Stade-Schuldt
|
|
Like Munin, this plugin is licensed under the GNU GPL v2 license
|
|
http://www.opensource.org/licenses/GPL-2.0
|
|
Add the following section to your munin-node's plugin configuration:
|
|
|
|
[fritzbox_*]
|
|
env.fritzbox_ip [ip address of the fritzbox]
|
|
env.fritzbox_username [fritzbox username]
|
|
env.fritzbox_password [fritzbox password]
|
|
|
|
This plugin supports the following munin configuration parameters:
|
|
#%# family=auto contrib
|
|
#%# capabilities=autoconf
|
|
|
|
The initial script was inspired by
|
|
https://www.linux-tips-and-tricks.de/en/programming/389-read-data-from-a-fritzbox-7390-with-python-and-bash
|
|
framp at linux-tips-and-tricks dot de
|
|
|
|
New authentication supporting username and password, as well as PBKDF2 support, has been inspired by
|
|
https://avm.de/fileadmin/user_upload/Global/Service/Schnittstellen/AVM%20Technical%20Note%20-%20Session%20ID_englisch.pdf
|
|
"""
|
|
|
|
import hashlib
|
|
import sys
|
|
import time
|
|
|
|
import requests
|
|
from lxml import etree
|
|
|
|
USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X x.y; rv:10.0) Gecko/20100101 Firefox/10.0"
|
|
|
|
LOGIN_SID_ROUTE = "/login_sid.lua?version=2"
|
|
|
|
|
|
class LoginState:
|
|
def __init__(self, challenge, blocktime):
|
|
self.challenge = challenge
|
|
self.blocktime = blocktime
|
|
self.is_pbkdf2 = challenge.startswith("2$")
|
|
|
|
|
|
def get_session_id(server, username, password, port=80):
|
|
""" Get a sid by solving the PBKDF2 (or MD5) challenge-response process. """
|
|
box_url = "http://{}:{}".format(server, port)
|
|
try:
|
|
state = get_login_state(box_url)
|
|
except Exception as ex:
|
|
raise Exception("failed to get challenge")
|
|
if state.is_pbkdf2:
|
|
# print("PBKDF2 supported")
|
|
challenge_response = calculate_pbkdf2_response(state.challenge, password)
|
|
else:
|
|
# print("Falling back to MD5")
|
|
challenge_response = calculate_md5_response(state.challenge, password)
|
|
if state.blocktime > 0:
|
|
# print(f"Waiting for {state.blocktime} seconds...")
|
|
time.sleep(state.blocktime)
|
|
try:
|
|
sid = send_response(box_url, username, challenge_response)
|
|
except Exception as ex:
|
|
raise Exception("failed to login") from ex
|
|
if sid == "0000000000000000":
|
|
raise Exception("wrong username or password")
|
|
return sid
|
|
|
|
|
|
def get_login_state(box_url):
|
|
""" Get login state from FRITZ!Box using login_sid.lua?version=2 """
|
|
url = box_url + LOGIN_SID_ROUTE
|
|
r = requests.get(url)
|
|
xml = etree.fromstring(r.content)
|
|
challenge = xml.find("Challenge").text
|
|
blocktime = int(xml.find("BlockTime").text)
|
|
return LoginState(challenge, blocktime)
|
|
|
|
|
|
def calculate_pbkdf2_response(challenge, password):
|
|
""" Calculate the response for a given challenge via PBKDF2 """
|
|
challenge_parts = challenge.split("$")
|
|
# Extract all necessary values encoded into the challenge
|
|
iter1 = int(challenge_parts[1])
|
|
salt1 = bytes.fromhex(challenge_parts[2])
|
|
iter2 = int(challenge_parts[3])
|
|
salt2 = bytes.fromhex(challenge_parts[4])
|
|
# Hash twice, once with static salt...
|
|
# Once with dynamic salt.
|
|
hash1 = hashlib.pbkdf2_hmac("sha256", password.encode(), salt1, iter1)
|
|
hash2 = hashlib.pbkdf2_hmac("sha256", hash1, salt2, iter2)
|
|
return "{}${}".format(challenge_parts[4], hash2.hex())
|
|
|
|
|
|
def calculate_md5_response(challenge, password):
|
|
""" Calculate the response for a challenge using legacy MD5 """
|
|
response = challenge + "-" + password
|
|
# the legacy response needs utf_16_le encoding
|
|
response = response.encode("utf_16_le")
|
|
md5_sum = hashlib.md5()
|
|
md5_sum.update(response)
|
|
response = challenge + "-" + md5_sum.hexdigest()
|
|
return response
|
|
|
|
|
|
def send_response(box_url, username, challenge_response):
|
|
""" Send the response and return the parsed sid. raises an Exception on error """
|
|
# Build response params
|
|
post_data = {"username": username, "response": challenge_response}
|
|
# post_data = urllib.parse.urlencode(post_data_dict).encode()
|
|
headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
|
url = box_url + LOGIN_SID_ROUTE
|
|
r = requests.post(url, headers=headers, data=post_data)
|
|
# Parse SID from resulting XML.
|
|
xml = etree.fromstring(r.content)
|
|
return xml.find("SID").text
|
|
|
|
|
|
def get_page_content(server, session_id, page, port=80):
|
|
"""Fetches a page from the Fritzbox and returns its content
|
|
|
|
:param server: the ip address of the Fritzbox
|
|
:param session_id: a valid session id
|
|
:param page: the page you are regquesting
|
|
:param port: the port the Fritzbox webserver runs on
|
|
:return: the content of the page
|
|
"""
|
|
|
|
headers = {"Accept": "application/xml", "Content-Type": "text/plain", "User-Agent": USER_AGENT}
|
|
|
|
url = "http://{}:{}/{}?sid={}".format(server, port, page, session_id)
|
|
try:
|
|
r = requests.get(url, headers=headers)
|
|
r.raise_for_status()
|
|
except requests.exceptions.HTTPError as err:
|
|
print(err)
|
|
sys.exit(1)
|
|
return r.content
|
|
|
|
|
|
def get_xhr_content(server, session_id, page, port=80):
|
|
"""Fetches the xhr content from the Fritzbox and returns its content
|
|
|
|
:param server: the ip address of the Fritzbox
|
|
:param session_id: a valid session id
|
|
:param page: the page you are regquesting
|
|
:param port: the port the Fritzbox webserver runs on
|
|
:return: the content of the page
|
|
"""
|
|
|
|
headers = {
|
|
"Accept": "application/xml",
|
|
"Content-Type": "application/x-www-form-urlencoded",
|
|
"User-Agent": USER_AGENT,
|
|
}
|
|
|
|
url = "http://{}:{}/data.lua".format(server, port)
|
|
data = {"xhr": 1, "sid": session_id, "lang": "en", "page": page, "xhrId": "all", "no_sidrenew": ""}
|
|
try:
|
|
r = requests.post(url, data=data, headers=headers)
|
|
except requests.exceptions.HTTPError as err:
|
|
print(err)
|
|
sys.exit(1)
|
|
return r.content
|