update v1.7.0

This commit is contained in:
whb0514
2025-07-22 14:09:25 +08:00
parent f8fad844d7
commit 6a6d226413
4 changed files with 4684 additions and 0 deletions

968
moonraker/authorization.py Normal file
View File

@@ -0,0 +1,968 @@
# API Key Based Authorization
#
# Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license
from __future__ import annotations
import asyncio
import base64
import uuid
import hashlib
import secrets
import os
import time
import datetime
import ipaddress
import re
import socket
import logging
from tornado.web import HTTPError
from libnacl.sign import Signer, Verifier
from ..utils import json_wrapper as jsonw
from ..common import RequestType, TransportType, SqlTableDefinition, UserInfo
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Tuple,
Optional,
Union,
Dict,
List,
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..common import WebRequest
from .websockets import WebsocketManager
from tornado.httputil import HTTPServerRequest
from .database import MoonrakerDatabase as DBComp
from .database import DBProviderWrapper
from .ldap import MoonrakerLDAP
IPAddr = Union[ipaddress.IPv4Address, ipaddress.IPv6Address]
IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network]
OneshotToken = Tuple[IPAddr, Optional[UserInfo], asyncio.Handle]
# Helpers for base64url encoding and decoding
def base64url_encode(data: bytes) -> bytes:
return base64.urlsafe_b64encode(data).rstrip(b"=")
def base64url_decode(data: str) -> bytes:
pad_cnt = len(data) % 4
if pad_cnt:
data += "=" * (4 - pad_cnt)
return base64.urlsafe_b64decode(data)
ONESHOT_TIMEOUT = 5
TRUSTED_CONNECTION_TIMEOUT = 3600
FQDN_CACHE_TIMEOUT = 84000
PRUNE_CHECK_TIME = 300.
USER_TABLE = "authorized_users"
AUTH_SOURCES = ["moonraker", "ldap"]
HASH_ITER = 100000
API_USER = "_API_KEY_USER_"
TRUSTED_USER = "_TRUSTED_USER_"
RESERVED_USERS = [API_USER, TRUSTED_USER]
JWT_EXP_TIME = datetime.timedelta(hours=1)
JWT_HEADER = {
'alg': "EdDSA",
'typ': "JWT"
}
class UserSqlDefinition(SqlTableDefinition):
name = USER_TABLE
prototype = (
f"""
{USER_TABLE} (
username TEXT PRIMARY KEY NOT NULL,
password TEXT NOT NULL,
created_on REAL NOT NULL,
salt TEXT NOT NULL,
source TEXT NOT NULL,
jwt_secret TEXT,
jwk_id TEXT,
groups pyjson
)
"""
)
version = 1
def migrate(self, last_version: int, db_provider: DBProviderWrapper) -> None:
if last_version == 0:
users: Dict[str, Dict[str, Any]]
users = db_provider.get_namespace("authorized_users")
api_user = users.pop(API_USER, {})
if not isinstance(api_user, dict):
api_user = {}
user_vals: List[Tuple[Any, ...]] = [
UserInfo(
username=API_USER,
password=api_user.get("api_key", uuid.uuid4().hex),
created_on=api_user.get("created_on", time.time())
).as_tuple()
]
for key, user in users.items():
if not isinstance(user, dict):
logging.info(
f"Auth migration, skipping invalid value: {key} {user}"
)
continue
user_vals.append(UserInfo(**user).as_tuple())
placeholders = ",".join("?" * len(user_vals[0]))
conn = db_provider.connection
with conn:
conn.executemany(
f"INSERT OR IGNORE INTO {USER_TABLE} VALUES({placeholders})",
user_vals
)
db_provider.wipe_local_namespace("authorized_users")
class Authorization:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.login_timeout = config.getint('login_timeout', 90)
self.force_logins = config.getboolean('force_logins', False)
self.default_source = config.get('default_source', "moonraker").lower()
self.enable_api_key = config.getboolean('enable_api_key', True)
self.max_logins = config.getint("max_login_attempts", None, above=0)
self.failed_logins: Dict[IPAddr, int] = {}
self.fqdn_cache: Dict[IPAddr, Dict[str, Any]] = {}
if self.default_source not in AUTH_SOURCES:
self.server.add_warning(
"[authorization]: option 'default_source' - Invalid "
f"value '{self.default_source}', falling back to "
"'moonraker'."
)
self.default_source = "moonraker"
self.ldap: Optional[MoonrakerLDAP] = None
if config.has_section("ldap"):
self.ldap = self.server.load_component(config, "ldap", None)
if self.default_source == "ldap" and self.ldap is None:
self.server.add_warning(
"[authorization]: Option 'default_source' set to 'ldap',"
" however [ldap] section failed to load or not configured"
)
database: DBComp = self.server.lookup_component('database')
self.user_table = database.register_table(UserSqlDefinition())
self.users: Dict[str, UserInfo] = {}
self.api_key = uuid.uuid4().hex
hi = self.server.get_host_info()
self.issuer = f"http://{hi['hostname']}:{hi['port']}"
self.public_jwks: Dict[str, Dict[str, Any]] = {}
self.trusted_users: Dict[IPAddr, Dict[str, Any]] = {}
self.oneshot_tokens: Dict[str, OneshotToken] = {}
# Get allowed cors domains
self.cors_domains: List[str] = []
for domain in config.getlist('cors_domains', []):
bad_match = re.search(r"^.+\.[^:]*\*", domain)
if bad_match is not None:
self.server.add_warning(
f"[authorization]: Unsafe domain '{domain}' in option "
f"'cors_domains'. Wildcards are not permitted in the"
" top level domain."
)
continue
if domain.endswith("/"):
self.server.add_warning(
f"[authorization]: Invalid domain '{domain}' in option "
"'cors_domains'. Domain's cannot contain a trailing "
"slash."
)
else:
self.cors_domains.append(
domain.replace(".", "\\.").replace("*", ".*"))
# Get Trusted Clients
self.trusted_ips: List[IPAddr] = []
self.trusted_ranges: List[IPNetwork] = []
self.trusted_domains: List[str] = []
for val in config.getlist('trusted_clients', []):
# Check IP address
try:
tc = ipaddress.ip_address(val)
except ValueError:
pass
else:
self.trusted_ips.append(tc)
continue
# Check ip network
try:
tn = ipaddress.ip_network(val)
except ValueError as e:
if "has host bits set" in str(e):
self.server.add_warning(
f"[authorization]: Invalid CIDR expression '{val}' "
"in option 'trusted_clients'")
continue
pass
else:
self.trusted_ranges.append(tn)
continue
# Check hostname
match = re.match(r"([a-z0-9]+(-[a-z0-9]+)*\.?)+[a-z]{2,}$", val)
if match is not None:
self.trusted_domains.append(val.lower())
else:
self.server.add_warning(
f"[authorization]: Invalid domain name '{val}' "
"in option 'trusted_clients'")
t_clients = "\n".join(
[str(ip) for ip in self.trusted_ips] +
[str(rng) for rng in self.trusted_ranges] +
self.trusted_domains)
c_domains = "\n".join(self.cors_domains)
logging.info(
f"Authorization Configuration Loaded\n"
f"Trusted Clients:\n{t_clients}\n"
f"CORS Domains:\n{c_domains}")
eventloop = self.server.get_event_loop()
self.prune_timer = eventloop.register_timer(
self._prune_conn_handler)
# Register Authorization Endpoints
self.server.register_endpoint(
"/access/login", RequestType.POST, self._handle_login,
transports=TransportType.HTTP | TransportType.WEBSOCKET,
auth_required=False
)
self.server.register_endpoint(
"/access/logout", RequestType.POST, self._handle_logout,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/refresh_jwt", RequestType.POST, self._handle_refresh_jwt,
transports=TransportType.HTTP | TransportType.WEBSOCKET,
auth_required=False
)
self.server.register_endpoint(
"/access/user", RequestType.all(), self._handle_user_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/users/list", RequestType.GET, self._handle_list_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/user/password", RequestType.POST, self._handle_password_reset,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
# Custom endpoint: find a user by username and reset password (only suitable for ordinary user)
self.server.register_endpoint(
"/access/user/password_by_name", RequestType.POST, self._handle_password_reset_by_name,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/api_key", RequestType.GET | RequestType.POST,
self._handle_apikey_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/oneshot_token", RequestType.GET, self._handle_oneshot_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET
)
self.server.register_endpoint(
"/access/info", RequestType.GET, self._handle_info_request,
transports=TransportType.HTTP | TransportType.WEBSOCKET,
auth_required=False
)
wsm: WebsocketManager = self.server.lookup_component("websockets")
wsm.register_notification("authorization:user_created")
wsm.register_notification(
"authorization:user_deleted", event_type="logout"
)
wsm.register_notification(
"authorization:user_logged_out", event_type="logout"
)
async def component_init(self) -> None:
# Populate users from database
cursor = await self.user_table.execute(f"SELECT * FROM {USER_TABLE}")
self.users = {row[0]: UserInfo(**dict(row)) for row in await cursor.fetchall()}
need_sync = self._initialize_users()
if need_sync:
await self._sync_user_table()
self.prune_timer.start(delay=PRUNE_CHECK_TIME)
async def _sync_user(self, username: str) -> None:
user = self.users[username]
vals = user.as_tuple()
placeholders = ",".join("?" * len(vals))
async with self.user_table as tx:
await tx.execute(
f"REPLACE INTO {USER_TABLE} VALUES({placeholders})", vals
)
async def _sync_user_table(self) -> None:
async with self.user_table as tx:
await tx.execute(f"DELETE FROM {USER_TABLE}")
user_vals: List[Tuple[Any, ...]]
user_vals = [user.as_tuple() for user in self.users.values()]
if not user_vals:
return
placeholders = ",".join("?" * len(user_vals[0]))
await tx.executemany(
f"INSERT INTO {USER_TABLE} VALUES({placeholders})", user_vals
)
def _initialize_users(self) -> bool:
need_sync = False
api_user: Optional[UserInfo] = self.users.get(API_USER, None)
if api_user is None:
need_sync = True
self.users[API_USER] = UserInfo(username=API_USER, password=self.api_key)
else:
self.api_key = api_user.password
for username, user_info in list(self.users.items()):
if username == API_USER:
continue
# generate jwks for valid users
if user_info.jwt_secret is not None:
try:
priv_key = self._load_private_key(user_info.jwt_secret)
jwk_id = user_info.jwk_id
assert jwk_id is not None
except (self.server.error, KeyError, AssertionError):
logging.info("Invalid jwk found for user, removing")
user_info.jwt_secret = None
user_info.jwk_id = None
self.users[username] = user_info
need_sync = True
continue
self.public_jwks[jwk_id] = self._generate_public_jwk(priv_key)
return need_sync
async def _handle_apikey_request(self, web_request: WebRequest) -> str:
if web_request.get_request_type() == RequestType.POST:
self.api_key = uuid.uuid4().hex
self.users[API_USER].password = self.api_key
await self._sync_user(API_USER)
return self.api_key
async def _handle_oneshot_request(self, web_request: WebRequest) -> str:
ip = web_request.get_ip_address()
assert ip is not None
user_info = web_request.get_current_user()
return self.get_oneshot_token(ip, user_info)
async def _handle_login(self, web_request: WebRequest) -> Dict[str, Any]:
ip = web_request.get_ip_address()
if ip is not None and self.check_logins_maxed(ip):
raise HTTPError(
401, "Unauthorized, Maximum Login Attempts Reached"
)
try:
ret = await self._login_jwt_user(web_request)
except asyncio.CancelledError:
raise
except Exception:
if ip is not None:
failed = self.failed_logins.get(ip, 0)
self.failed_logins[ip] = failed + 1
raise
if ip is not None:
self.failed_logins.pop(ip, None)
return ret
async def _handle_logout(self, web_request: WebRequest) -> Dict[str, str]:
user_info = web_request.get_current_user()
if user_info is None:
raise self.server.error("No user logged in")
username: str = user_info.username
if username in RESERVED_USERS:
raise self.server.error(
f"Invalid log out request for user {username}")
jwk_id: Optional[str] = self.users[username].jwk_id
self.users[username].jwt_secret = None
self.users[username].jwk_id = None
if jwk_id is not None:
self.public_jwks.pop(jwk_id, None)
await self._sync_user(username)
eventloop = self.server.get_event_loop()
eventloop.delay_callback(
.005, self.server.send_event, "authorization:user_logged_out",
{'username': username}
)
return {
"username": username,
"action": "user_logged_out"
}
async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]:
sources = ["moonraker"]
if self.ldap is not None:
sources.append("ldap")
login_req = self.force_logins and len(self.users) > 1
request_trusted: Optional[bool] = None
user = web_request.get_current_user()
req_ip = web_request.ip_addr
if user is not None and user.username == TRUSTED_USER:
request_trusted = True
elif req_ip is not None:
request_trusted = await self._check_authorized_ip(req_ip)
return {
"default_source": self.default_source,
"available_sources": sources,
"login_required": login_req,
"trusted": request_trusted
}
async def _handle_refresh_jwt(self,
web_request: WebRequest
) -> Dict[str, str]:
refresh_token: str = web_request.get_str('refresh_token')
try:
user_info = self.decode_jwt(refresh_token, token_type="refresh")
except Exception:
raise self.server.error("Invalid Refresh Token", 401)
username: str = user_info.username
if user_info.jwt_secret is None or user_info.jwk_id is None:
raise self.server.error("User not logged in", 401)
private_key = self._load_private_key(user_info.jwt_secret)
jwk_id: str = user_info.jwk_id
token = self._generate_jwt(username, jwk_id, private_key)
return {
'username': username,
'token': token,
'source': user_info.source,
'action': 'user_jwt_refresh'
}
async def _handle_user_request(
self, web_request: WebRequest
) -> Dict[str, Any]:
req_type = web_request.get_request_type()
if req_type == RequestType.GET:
user = web_request.get_current_user()
if user is None:
return {
"username": None,
"source": None,
"created_on": None,
}
else:
return {
"username": user.username,
"source": user.source,
"created_on": user.created_on
}
elif req_type == RequestType.POST:
# Create User
return await self._login_jwt_user(web_request, create=True)
elif req_type == RequestType.DELETE:
# Delete User
return await self._delete_jwt_user(web_request)
raise self.server.error("Invalid Request Method")
async def _handle_list_request(self,
web_request: WebRequest
) -> Dict[str, List[Dict[str, Any]]]:
user_list = []
for user in self.users.values():
if user.username == API_USER:
continue
user_list.append({
'username': user.username,
'source': user.source,
'created_on': user.created_on
})
return {
'users': user_list
}
async def _handle_password_reset(self,
web_request: WebRequest
) -> Dict[str, str]:
password: str = web_request.get_str('password')
new_pass: str = web_request.get_str('new_password')
user_info = web_request.get_current_user()
if user_info is None:
raise self.server.error("No Current User")
username = user_info.username
if user_info.source == "ldap":
raise self.server.error(
f"Can´t Reset password for ldap user {username}")
if username in RESERVED_USERS:
raise self.server.error(
f"Invalid Reset Request for user {username}")
salt = bytes.fromhex(user_info.salt)
hashed_pass = hashlib.pbkdf2_hmac(
'sha256', password.encode(), salt, HASH_ITER).hex()
if hashed_pass != user_info.password:
raise self.server.error("Invalid Password")
new_hashed_pass = hashlib.pbkdf2_hmac(
'sha256', new_pass.encode(), salt, HASH_ITER).hex()
self.users[username].password = new_hashed_pass
await self._sync_user(username)
return {
'username': username,
'action': "user_password_reset"
}
async def _handle_password_reset_by_name(self,
web_request: WebRequest
) -> Dict[str, str]:
username: str = web_request.get_str('username')
new_pass: str = web_request.get_str('new_password')
user_info = self.users[username]
if user_info.source == "ldap":
raise self.server.error(
f"Can´t Reset password for ldap user {username}")
if username in RESERVED_USERS:
raise self.server.error(
f"Invalid Reset Request for user {username}")
salt = bytes.fromhex(user_info.salt)
new_hashed_pass = hashlib.pbkdf2_hmac(
'sha256', new_pass.encode(), salt, HASH_ITER).hex()
self.users[username].password = new_hashed_pass
await self._sync_user(username)
return {
'username': username,
'action': "user_password_reset_by_name"
}
async def _login_jwt_user(
self, web_request: WebRequest, create: bool = False
) -> Dict[str, Any]:
username: str = web_request.get_str('username')
password: str = web_request.get_str('password')
source: str = web_request.get_str(
'source', self.default_source
).lower()
if source not in AUTH_SOURCES:
raise self.server.error(f"Invalid 'source': {source}")
user_info: UserInfo
if username in RESERVED_USERS:
raise self.server.error(
f"Invalid Request for user {username}")
if source == "ldap":
if create:
raise self.server.error("Cannot Create LDAP User")
if self.ldap is None:
raise self.server.error(
"LDAP authentication not available", 401
)
await self.ldap.authenticate_ldap_user(username, password)
if username not in self.users:
create = True
if create:
if username in self.users:
raise self.server.error(f"User {username} already exists")
salt = secrets.token_bytes(32)
hashed_pass = hashlib.pbkdf2_hmac(
'sha256', password.encode(), salt, HASH_ITER).hex()
user_info = UserInfo(
username=username,
password=hashed_pass,
salt=salt.hex(),
source=source,
)
self.users[username] = user_info
await self._sync_user(username)
action = "user_created"
if source == "ldap":
# Dont notify user created
action = "user_logged_in"
create = False
else:
if username not in self.users:
raise self.server.error(f"Unregistered User: {username}")
user_info = self.users[username]
auth_src = user_info.source
if auth_src != source:
raise self.server.error(
f"Moonraker cannot authenticate user '{username}', must "
f"specify source '{auth_src}'", 401
)
salt = bytes.fromhex(user_info.salt)
hashed_pass = hashlib.pbkdf2_hmac(
'sha256', password.encode(), salt, HASH_ITER).hex()
action = "user_logged_in"
if hashed_pass != user_info.password:
raise self.server.error("Invalid Password")
jwt_secret_hex: Optional[str] = user_info.jwt_secret
if jwt_secret_hex is None:
private_key = Signer()
jwk_id = base64url_encode(secrets.token_bytes()).decode()
user_info.jwt_secret = private_key.hex_seed().decode() # type: ignore
user_info.jwk_id = jwk_id
self.users[username] = user_info
await self._sync_user(username)
self.public_jwks[jwk_id] = self._generate_public_jwk(private_key)
else:
private_key = self._load_private_key(jwt_secret_hex)
if user_info.jwk_id is None:
user_info.jwk_id = base64url_encode(secrets.token_bytes()).decode()
jwk_id = user_info.jwk_id
token = self._generate_jwt(username, jwk_id, private_key)
refresh_token = self._generate_jwt(
username, jwk_id, private_key, token_type="refresh",
exp_time=datetime.timedelta(days=self.login_timeout))
conn = web_request.get_client_connection()
if create:
event_loop = self.server.get_event_loop()
event_loop.delay_callback(
.005, self.server.send_event,
"authorization:user_created",
{'username': username})
elif conn is not None:
conn.user_info = user_info
return {
'username': username,
'token': token,
'source': user_info.source,
'refresh_token': refresh_token,
'action': action
}
async def _delete_jwt_user(self, web_request: WebRequest) -> Dict[str, str]:
username: str = web_request.get_str('username')
current_user = web_request.get_current_user()
if current_user is not None:
curname = current_user.username
if curname == username:
raise self.server.error(f"Cannot delete logged in user {curname}")
if username in RESERVED_USERS:
raise self.server.error(
f"Invalid Request for reserved user {username}")
user_info: Optional[UserInfo] = self.users.get(username)
if user_info is None:
raise self.server.error(f"No registered user: {username}")
if user_info.jwk_id is not None:
self.public_jwks.pop(user_info.jwk_id, None)
del self.users[username]
async with self.user_table as tx:
await tx.execute(
f"DELETE FROM {USER_TABLE} WHERE username = ?", (username,)
)
event_loop = self.server.get_event_loop()
event_loop.delay_callback(
.005, self.server.send_event,
"authorization:user_deleted",
{'username': username})
return {
"username": username,
"action": "user_deleted"
}
def _generate_jwt(self,
username: str,
jwk_id: str,
private_key: Signer,
token_type: str = "access",
exp_time: datetime.timedelta = JWT_EXP_TIME
) -> str:
curtime = int(time.time())
payload = {
'iss': self.issuer,
'aud': "Moonraker",
'iat': curtime,
'exp': curtime + int(exp_time.total_seconds()),
'username': username,
'token_type': token_type
}
header = {'kid': jwk_id}
header.update(JWT_HEADER)
jwt_header = base64url_encode(jsonw.dumps(header))
jwt_payload = base64url_encode(jsonw.dumps(payload))
jwt_msg = b".".join([jwt_header, jwt_payload])
sig = private_key.signature(jwt_msg)
jwt_sig = base64url_encode(sig)
return b".".join([jwt_msg, jwt_sig]).decode()
def decode_jwt(
self, token: str, token_type: str = "access", check_exp: bool = True
) -> UserInfo:
message, sig = token.rsplit('.', maxsplit=1)
enc_header, enc_payload = message.split('.')
header: Dict[str, Any] = jsonw.loads(base64url_decode(enc_header))
sig_bytes = base64url_decode(sig)
# verify header
if header.get('typ') != "JWT" or header.get('alg') != "EdDSA":
raise self.server.error("Invalid JWT header")
jwk_id: Optional[str] = header.get('kid')
if jwk_id not in self.public_jwks:
raise self.server.error("Invalid key ID")
# validate signature
public_key = self._public_key_from_jwk(self.public_jwks[jwk_id])
public_key.verify(sig_bytes + message.encode())
# validate claims
payload: Dict[str, Any] = jsonw.loads(base64url_decode(enc_payload))
if payload['token_type'] != token_type:
raise self.server.error(
f"JWT Token type mismatch: Expected {token_type}, "
f"Recd: {payload['token_type']}", 401)
if payload['iss'] != self.issuer:
raise self.server.error("Invalid JWT Issuer", 401)
if payload['aud'] != "Moonraker":
raise self.server.error("Invalid JWT Audience", 401)
if check_exp and payload['exp'] < int(time.time()):
raise self.server.error("JWT Expired", 401)
# get user
user_info: Optional[UserInfo] = self.users.get(
payload.get('username', ""), None)
if user_info is None:
raise self.server.error("Unknown user", 401)
return user_info
def validate_jwt(self, token: str) -> UserInfo:
try:
user_info = self.decode_jwt(token)
except Exception as e:
if isinstance(e, self.server.error):
raise
raise self.server.error(
f"Failed to decode JWT: {e}", 401
) from e
return user_info
def validate_api_key(self, api_key: str) -> UserInfo:
if not self.enable_api_key:
raise self.server.error("API Key authentication is disabled", 401)
if api_key and api_key == self.api_key:
return self.users[API_USER]
raise self.server.error("Invalid API Key", 401)
def _load_private_key(self, secret: str) -> Signer:
try:
key = Signer(bytes.fromhex(secret))
except Exception:
raise self.server.error(
"Error decoding private key, user data may"
" be corrupt", 500) from None
return key
def _generate_public_jwk(self, private_key: Signer) -> Dict[str, Any]:
public_key = private_key.vk
return {
'x': base64url_encode(public_key).decode(),
'kty': "OKP",
'crv': "Ed25519",
'use': "sig"
}
def _public_key_from_jwk(self, jwk: Dict[str, Any]) -> Verifier:
if jwk.get('kty') != "OKP":
raise self.server.error("Not an Octet Key Pair")
if jwk.get('crv') != "Ed25519":
raise self.server.error("Invalid Curve")
if 'x' not in jwk:
raise self.server.error("No 'x' argument in jwk")
key = base64url_decode(jwk['x'])
return Verifier(key.hex().encode())
def _prune_conn_handler(self, eventtime: float) -> float:
cur_time = time.time()
for ip, user_info in list(self.trusted_users.items()):
exp_time: float = user_info['expires_at']
if cur_time >= exp_time:
self.trusted_users.pop(ip, None)
logging.info(f"Trusted Connection Expired, IP: {ip}")
for ip, fqdn_info in list(self.fqdn_cache.items()):
exp_time = fqdn_info["expires_at"]
if cur_time >= exp_time:
domain: str = fqdn_info["domain"]
self.fqdn_cache.pop(ip, None)
logging.info(f"Cached FQDN Expired, IP: {ip}, domain: {domain}")
return eventtime + PRUNE_CHECK_TIME
def _oneshot_token_expire_handler(self, token):
self.oneshot_tokens.pop(token, None)
def get_oneshot_token(self, ip_addr: IPAddr, user: Optional[UserInfo]) -> str:
token = base64.b32encode(os.urandom(20)).decode()
event_loop = self.server.get_event_loop()
hdl = event_loop.delay_callback(
ONESHOT_TIMEOUT, self._oneshot_token_expire_handler, token)
self.oneshot_tokens[token] = (ip_addr, user, hdl)
return token
def _check_json_web_token(
self, request: HTTPServerRequest, required: bool = True
) -> Optional[UserInfo]:
auth_token: Optional[str] = request.headers.get("Authorization")
if auth_token is None:
auth_token = request.headers.get("X-Access-Token")
if auth_token is None:
qtoken = request.query_arguments.get('access_token', None)
if qtoken is not None:
auth_token = qtoken[-1].decode(errors="ignore")
elif auth_token.startswith("Bearer "):
auth_token = auth_token[7:]
else:
return None
if auth_token:
try:
return self.decode_jwt(auth_token, check_exp=required)
except Exception:
logging.exception(f"JWT Decode Error {auth_token}")
raise HTTPError(401, "JWT Decode Error")
return None
async def _check_authorized_ip(self, ip: IPAddr) -> bool:
if ip in self.trusted_ips:
return True
for rng in self.trusted_ranges:
if ip in rng:
return True
if self.trusted_domains:
if ip in self.fqdn_cache:
fqdn: str = self.fqdn_cache[ip]["domain"]
else:
eventloop = self.server.get_event_loop()
try:
fut = eventloop.run_in_thread(socket.getfqdn, str(ip))
fqdn = await asyncio.wait_for(fut, 5.0)
except asyncio.TimeoutError:
logging.info("Call to socket.getfqdn() timed out")
return False
else:
fqdn = fqdn.lower()
self.fqdn_cache[ip] = {
"expires_at": time.time() + FQDN_CACHE_TIMEOUT,
"domain": fqdn
}
return fqdn in self.trusted_domains
return False
async def _check_trusted_connection(
self, ip: Optional[IPAddr]
) -> Optional[UserInfo]:
if ip is not None:
curtime = time.time()
exp_time = curtime + TRUSTED_CONNECTION_TIMEOUT
if ip in self.trusted_users:
self.trusted_users[ip]["expires_at"] = exp_time
return self.trusted_users[ip]["user"]
elif await self._check_authorized_ip(ip):
logging.info(
f"Trusted Connection Detected, IP: {ip}")
self.trusted_users[ip] = {
"user": UserInfo(TRUSTED_USER, "", curtime),
"expires_at": exp_time
}
return self.trusted_users[ip]["user"]
return None
def _check_oneshot_token(
self, token: str, cur_ip: Optional[IPAddr]
) -> Optional[UserInfo]:
if token in self.oneshot_tokens:
ip_addr, user, hdl = self.oneshot_tokens.pop(token)
hdl.cancel()
if cur_ip != ip_addr:
logging.info(f"Oneshot Token IP Mismatch: expected{ip_addr}"
f", Recd: {cur_ip}")
return None
return user
else:
return None
def check_logins_maxed(self, ip_addr: IPAddr) -> bool:
if self.max_logins is None:
return False
return self.failed_logins.get(ip_addr, 0) >= self.max_logins
async def authenticate_request(
self, request: HTTPServerRequest, auth_required: bool = True
) -> Optional[UserInfo]:
if request.method == "OPTIONS":
return None
# Allow local request
try:
# logging.info(f"request.remote_ip: {request.remote_ip}, is_loopback: {ipaddress.ip_address(request.remote_ip).is_loopback}") # type: ignore
ip = ipaddress.ip_address(request.remote_ip) # type: ignore
if ip.is_loopback:
return None
except ValueError:
logging.exception(
f"Unable to Create IP Address {request.remote_ip}")
ip = None
# Check JSON Web Token
jwt_user = self._check_json_web_token(request, auth_required)
if jwt_user is not None:
return jwt_user
# Check oneshot access token
ost: Optional[List[bytes]] = request.arguments.get('token', None)
if ost is not None:
ost_user = self._check_oneshot_token(ost[-1].decode(), ip)
if ost_user is not None:
return ost_user
# Check API Key Header
if self.enable_api_key:
key: Optional[str] = request.headers.get("X-Api-Key")
if key and key == self.api_key:
return self.users[API_USER]
# If the force_logins option is enabled and at least one user is created
# then trusted user authentication is disabled
if self.force_logins and len(self.users) > 1:
if not auth_required:
return None
raise HTTPError(401, "Unauthorized, Force Logins Enabled")
# Check if IP is trusted. If this endpoint doesn't require authentication
# then it is acceptable to return None
trusted_user = await self._check_trusted_connection(ip)
if trusted_user is not None:
return trusted_user
if not auth_required:
return None
raise HTTPError(401, "Unauthorized")
async def check_cors(self, origin: Optional[str]) -> bool:
if origin is None or not self.cors_domains:
return False
for regex in self.cors_domains:
match = re.match(regex, origin)
if match is not None:
if match.group() == origin:
logging.debug(f"CORS Pattern Matched, origin: {origin} "
f" | pattern: {regex}")
return True
else:
logging.debug(f"Partial Cors Match: {match.group()}")
else:
# Check to see if the origin contains an IP that matches a
# current trusted connection
match = re.search(r"^https?://([^/:]+)", origin)
if match is not None:
ip = match.group(1)
try:
ipaddr = ipaddress.ip_address(ip)
except ValueError:
pass
else:
if await self._check_authorized_ip(ipaddr):
logging.debug(f"Cors request matched trusted IP: {ip}")
return True
logging.debug(f"No CORS match for origin: {origin}\n"
f"Patterns: {self.cors_domains}")
return False
def cors_enabled(self) -> bool:
return self.cors_domains is not None
def close(self) -> None:
self.prune_timer.stop()
def load_component(config: ConfigHelper) -> Authorization:
return Authorization(config)

File diff suppressed because it is too large Load Diff

354
moonraker/klippy_apis.py Normal file
View File

@@ -0,0 +1,354 @@
# Helper for Moonraker to Klippy API calls.
#
# Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import logging
from ..utils import Sentinel
from ..common import WebRequest, APITransport, RequestType
import os
import shutil
import json
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Union,
Optional,
Dict,
List,
TypeVar,
Mapping,
Callable,
Coroutine
)
if TYPE_CHECKING:
from ..confighelper import ConfigHelper
from ..common import UserInfo
from .klippy_connection import KlippyConnection as Klippy
from .file_manager.file_manager import FileManager
Subscription = Dict[str, Optional[List[Any]]]
SubCallback = Callable[[Dict[str, Dict[str, Any]], float], Optional[Coroutine]]
_T = TypeVar("_T")
INFO_ENDPOINT = "info"
ESTOP_ENDPOINT = "emergency_stop"
LIST_EPS_ENDPOINT = "list_endpoints"
GC_OUTPUT_ENDPOINT = "gcode/subscribe_output"
GCODE_ENDPOINT = "gcode/script"
SUBSCRIPTION_ENDPOINT = "objects/subscribe"
STATUS_ENDPOINT = "objects/query"
OBJ_LIST_ENDPOINT = "objects/list"
REG_METHOD_ENDPOINT = "register_remote_method"
class KlippyAPI(APITransport):
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.klippy: Klippy = self.server.lookup_component("klippy_connection")
self.fm: FileManager = self.server.lookup_component("file_manager")
self.eventloop = self.server.get_event_loop()
app_args = self.server.get_app_args()
self.version = app_args.get('software_version')
# Maintain a subscription for all moonraker requests, as
# we do not want to overwrite them
self.host_subscription: Subscription = {}
self.subscription_callbacks: List[SubCallback] = []
# Register GCode Aliases
self.server.register_endpoint(
"/printer/print/pause", RequestType.POST, self._gcode_pause
)
self.server.register_endpoint(
"/printer/print/resume", RequestType.POST, self._gcode_resume
)
self.server.register_endpoint(
"/printer/print/cancel", RequestType.POST, self._gcode_cancel
)
self.server.register_endpoint(
"/printer/print/start", RequestType.POST, self._gcode_start_print
)
self.server.register_endpoint(
"/printer/restart", RequestType.POST, self._gcode_restart
)
self.server.register_endpoint(
"/printer/firmware_restart", RequestType.POST, self._gcode_firmware_restart
)
self.server.register_event_handler(
"server:klippy_disconnect", self._on_klippy_disconnect
)
self.server.register_endpoint(
"/printer/list_endpoints", RequestType.GET, self.list_endpoints
)
self.server.register_endpoint(
"/printer/breakheater", RequestType.POST, self.breakheater
)
self.server.register_endpoint(
"/printer/breakmacro", RequestType.POST, self.breakmacro
)
def _on_klippy_disconnect(self) -> None:
self.host_subscription.clear()
self.subscription_callbacks.clear()
async def _gcode_pause(self, web_request: WebRequest) -> str:
return await self.pause_print()
async def _gcode_resume(self, web_request: WebRequest) -> str:
return await self.resume_print()
async def _gcode_cancel(self, web_request: WebRequest) -> str:
return await self.cancel_print()
async def _gcode_start_print(self, web_request: WebRequest) -> str:
filename: str = web_request.get_str('filename')
user = web_request.get_current_user()
return await self.start_print(filename, user=user)
async def _gcode_restart(self, web_request: WebRequest) -> str:
return await self.do_restart("RESTART")
async def _gcode_firmware_restart(self, web_request: WebRequest) -> str:
return await self.do_restart("FIRMWARE_RESTART")
async def _send_klippy_request(
self,
method: str,
params: Dict[str, Any],
default: Any = Sentinel.MISSING,
transport: Optional[APITransport] = None
) -> Any:
try:
req = WebRequest(method, params, transport=transport or self)
result = await self.klippy.request(req)
except self.server.error:
if default is Sentinel.MISSING:
raise
result = default
return result
async def run_gcode(self,
script: str,
default: Any = Sentinel.MISSING
) -> str:
params = {'script': script}
result = await self._send_klippy_request(
GCODE_ENDPOINT, params, default)
return result
async def start_print(
self,
filename: str,
wait_klippy_started: bool = False,
user: Optional[UserInfo] = None
) -> str:
# WARNING: Do not call this method from within the following
# event handlers when "wait_klippy_started" is set to True:
# klippy_identified, klippy_started, klippy_ready, klippy_disconnect
# Doing so will result in "wait_started" blocking for the specifed
# timeout (default 20s) and returning False.
# XXX - validate that file is on disk
if filename[0] == '/':
filename = filename[1:]
# Escape existing double quotes in the file name
filename = filename.replace("\"", "\\\"")
homedir = os.path.expanduser("~")
if os.path.split(filename)[0].split(os.path.sep)[0] != ".cache":
base_path = os.path.join(homedir, "printer_data/gcodes")
target = os.path.join(".cache", os.path.basename(filename))
cache_path = os.path.join(base_path, ".cache")
if not os.path.exists(cache_path):
os.makedirs(cache_path)
shutil.rmtree(cache_path)
os.makedirs(cache_path)
metadata = self.fm.gcode_metadata.metadata.get(filename, None)
self.copy_file_to_cache(os.path.join(base_path, filename), os.path.join(base_path, target))
msg = "// metadata=" + json.dumps(metadata)
self.server.send_event("server:gcode_response", msg)
filename = target
script = f'SDCARD_PRINT_FILE FILENAME="{filename}"'
if wait_klippy_started:
await self.klippy.wait_started()
logging.info(f"Requesting Job Start, filename = {filename}")
ret = await self.run_gcode(script)
self.server.send_event("klippy_apis:job_start_complete", user)
return ret
async def pause_print(
self, default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, str]:
self.server.send_event("klippy_apis:pause_requested")
logging.info("Requesting job pause...")
return await self._send_klippy_request(
"pause_resume/pause", {}, default)
async def resume_print(
self, default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, str]:
self.server.send_event("klippy_apis:resume_requested")
logging.info("Requesting job resume...")
return await self._send_klippy_request(
"pause_resume/resume", {}, default)
async def cancel_print(
self, default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, str]:
self.server.send_event("klippy_apis:cancel_requested")
logging.info("Requesting job cancel...")
await self._send_klippy_request(
"breakmacro", {}, default)
await self._send_klippy_request(
"breakheater", {}, default)
return await self._send_klippy_request(
"pause_resume/cancel", {}, default)
async def breakheater(
self, default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, str]:
return await self._send_klippy_request(
"breakheater", {}, default)
async def breakmacro(
self, default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, str]:
return await self._send_klippy_request(
"breakmacro", {}, default)
async def do_restart(
self, gc: str, wait_klippy_started: bool = False
) -> str:
# WARNING: Do not call this method from within the following
# event handlers when "wait_klippy_started" is set to True:
# klippy_identified, klippy_started, klippy_ready, klippy_disconnect
# Doing so will result in "wait_started" blocking for the specifed
# timeout (default 20s) and returning False.
if wait_klippy_started:
await self.klippy.wait_started()
try:
result = await self.run_gcode(gc)
except self.server.error as e:
if str(e) == "Klippy Disconnected":
result = "ok"
else:
raise
return result
async def list_endpoints(self,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, Dict[str, List[str]]]:
return await self._send_klippy_request(
LIST_EPS_ENDPOINT, {}, default)
async def emergency_stop(self) -> str:
return await self._send_klippy_request(ESTOP_ENDPOINT, {})
async def get_klippy_info(self,
send_id: bool = False,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, Dict[str, Any]]:
params = {}
if send_id:
ver = self.version
params = {'client_info': {'program': "Moonraker", 'version': ver}}
return await self._send_klippy_request(INFO_ENDPOINT, params, default)
async def get_object_list(self,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, List[str]]:
result = await self._send_klippy_request(
OBJ_LIST_ENDPOINT, {}, default)
if isinstance(result, dict) and 'objects' in result:
return result['objects']
if default is not Sentinel.MISSING:
return default
raise self.server.error("Invalid response received from Klippy", 500)
async def query_objects(self,
objects: Mapping[str, Optional[List[str]]],
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, Dict[str, Any]]:
params = {'objects': objects}
result = await self._send_klippy_request(
STATUS_ENDPOINT, params, default)
if isinstance(result, dict) and "status" in result:
return result["status"]
if default is not Sentinel.MISSING:
return default
raise self.server.error("Invalid response received from Klippy", 500)
async def subscribe_objects(
self,
objects: Mapping[str, Optional[List[str]]],
callback: Optional[SubCallback] = None,
default: Union[Sentinel, _T] = Sentinel.MISSING
) -> Union[_T, Dict[str, Any]]:
# The host transport shares subscriptions amongst all components
for obj, items in objects.items():
if obj in self.host_subscription:
prev = self.host_subscription[obj]
if items is None or prev is None:
self.host_subscription[obj] = None
else:
uitems = list(set(prev) | set(items))
self.host_subscription[obj] = uitems
else:
self.host_subscription[obj] = items
params = {"objects": dict(self.host_subscription)}
result = await self._send_klippy_request(SUBSCRIPTION_ENDPOINT, params, default)
if isinstance(result, dict) and "status" in result:
if callback is not None:
self.subscription_callbacks.append(callback)
return result["status"]
if default is not Sentinel.MISSING:
return default
raise self.server.error("Invalid response received from Klippy", 500)
async def subscribe_from_transport(
self,
objects: Mapping[str, Optional[List[str]]],
transport: APITransport,
default: Union[Sentinel, _T] = Sentinel.MISSING,
) -> Union[_T, Dict[str, Any]]:
params = {"objects": dict(objects)}
result = await self._send_klippy_request(
SUBSCRIPTION_ENDPOINT, params, default, transport
)
if isinstance(result, dict) and "status" in result:
return result["status"]
if default is not Sentinel.MISSING:
return default
raise self.server.error("Invalid response received from Klippy", 500)
async def subscribe_gcode_output(self) -> str:
template = {'response_template':
{'method': "process_gcode_response"}}
return await self._send_klippy_request(GC_OUTPUT_ENDPOINT, template)
async def register_method(self, method_name: str) -> str:
return await self._send_klippy_request(
REG_METHOD_ENDPOINT,
{'response_template': {"method": method_name},
'remote_method': method_name})
def send_status(
self, status: Dict[str, Any], eventtime: float
) -> None:
for cb in self.subscription_callbacks:
self.eventloop.register_callback(cb, status, eventtime)
self.server.send_event("server:status_update", status)
def copy_file_to_cache(self, origin, target):
stat = os.statvfs("/")
free_space = stat.f_frsize * stat.f_bfree
filesize = os.path.getsize(os.path.join(origin))
if (filesize < free_space):
shutil.copy(origin, target)
else:
msg = "!! Insufficient disk space, unable to read the file."
self.server.send_event("server:gcode_response", msg)
raise self.server.error("Insufficient disk space, unable to read the file.", 500)
def load_component(config: ConfigHelper) -> KlippyAPI:
return KlippyAPI(config)

2184
moonraker/machine.py Normal file

File diff suppressed because it is too large Load Diff