From 6a6d22641320d94c69a09af9c38158f2f69e9e48 Mon Sep 17 00:00:00 2001 From: whb0514 <112596503+whb0514@users.noreply.github.com> Date: Tue, 22 Jul 2025 14:09:25 +0800 Subject: [PATCH] update v1.7.0 --- moonraker/authorization.py | 968 ++++++++++++ moonraker/file_manager/metadata.py | 1178 +++++++++++++++ moonraker/klippy_apis.py | 354 +++++ moonraker/machine.py | 2184 ++++++++++++++++++++++++++++ 4 files changed, 4684 insertions(+) create mode 100644 moonraker/authorization.py create mode 100644 moonraker/file_manager/metadata.py create mode 100644 moonraker/klippy_apis.py create mode 100644 moonraker/machine.py diff --git a/moonraker/authorization.py b/moonraker/authorization.py new file mode 100644 index 0000000..d9ffda0 --- /dev/null +++ b/moonraker/authorization.py @@ -0,0 +1,968 @@ +# API Key Based Authorization +# +# Copyright (C) 2020 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license + +from __future__ import annotations +import asyncio +import base64 +import uuid +import hashlib +import secrets +import os +import time +import datetime +import ipaddress +import re +import socket +import logging +from tornado.web import HTTPError +from libnacl.sign import Signer, Verifier +from ..utils import json_wrapper as jsonw +from ..common import RequestType, TransportType, SqlTableDefinition, UserInfo + +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Tuple, + Optional, + Union, + Dict, + List, +) + +if TYPE_CHECKING: + from ..confighelper import ConfigHelper + from ..common import WebRequest + from .websockets import WebsocketManager + from tornado.httputil import HTTPServerRequest + from .database import MoonrakerDatabase as DBComp + from .database import DBProviderWrapper + from .ldap import MoonrakerLDAP + IPAddr = Union[ipaddress.IPv4Address, ipaddress.IPv6Address] + IPNetwork = Union[ipaddress.IPv4Network, ipaddress.IPv6Network] + OneshotToken = Tuple[IPAddr, Optional[UserInfo], asyncio.Handle] + +# Helpers for base64url encoding and decoding +def base64url_encode(data: bytes) -> bytes: + return base64.urlsafe_b64encode(data).rstrip(b"=") + +def base64url_decode(data: str) -> bytes: + pad_cnt = len(data) % 4 + if pad_cnt: + data += "=" * (4 - pad_cnt) + return base64.urlsafe_b64decode(data) + + +ONESHOT_TIMEOUT = 5 +TRUSTED_CONNECTION_TIMEOUT = 3600 +FQDN_CACHE_TIMEOUT = 84000 +PRUNE_CHECK_TIME = 300. 
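+
+# A minimal round-trip sketch of the base64url helpers above (an
+# illustrative example, assuming only the standard library; the function
+# name below is ours, not part of the module's API). JWT header, payload
+# and signature segments use this unpadded base64url form (RFC 7515):
+# encode strips the trailing "=" padding and decode restores it before
+# calling base64.urlsafe_b64decode, so the pair round-trips raw bytes.
+def _base64url_roundtrip_example() -> None:
+    # Illustrative only; never called by this module.
+    raw = b"\x00\x01\xfe"
+    encoded = base64url_encode(raw)            # b"AAH-", padding stripped
+    assert base64url_decode(encoded.decode()) == raw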
+ +USER_TABLE = "authorized_users" +AUTH_SOURCES = ["moonraker", "ldap"] +HASH_ITER = 100000 +API_USER = "_API_KEY_USER_" +TRUSTED_USER = "_TRUSTED_USER_" +RESERVED_USERS = [API_USER, TRUSTED_USER] +JWT_EXP_TIME = datetime.timedelta(hours=1) +JWT_HEADER = { + 'alg': "EdDSA", + 'typ': "JWT" +} + +class UserSqlDefinition(SqlTableDefinition): + name = USER_TABLE + prototype = ( + f""" + {USER_TABLE} ( + username TEXT PRIMARY KEY NOT NULL, + password TEXT NOT NULL, + created_on REAL NOT NULL, + salt TEXT NOT NULL, + source TEXT NOT NULL, + jwt_secret TEXT, + jwk_id TEXT, + groups pyjson + ) + """ + ) + version = 1 + + def migrate(self, last_version: int, db_provider: DBProviderWrapper) -> None: + if last_version == 0: + users: Dict[str, Dict[str, Any]] + users = db_provider.get_namespace("authorized_users") + api_user = users.pop(API_USER, {}) + if not isinstance(api_user, dict): + api_user = {} + user_vals: List[Tuple[Any, ...]] = [ + UserInfo( + username=API_USER, + password=api_user.get("api_key", uuid.uuid4().hex), + created_on=api_user.get("created_on", time.time()) + ).as_tuple() + ] + for key, user in users.items(): + if not isinstance(user, dict): + logging.info( + f"Auth migration, skipping invalid value: {key} {user}" + ) + continue + user_vals.append(UserInfo(**user).as_tuple()) + placeholders = ",".join("?" * len(user_vals[0])) + conn = db_provider.connection + with conn: + conn.executemany( + f"INSERT OR IGNORE INTO {USER_TABLE} VALUES({placeholders})", + user_vals + ) + db_provider.wipe_local_namespace("authorized_users") + +class Authorization: + def __init__(self, config: ConfigHelper) -> None: + self.server = config.get_server() + self.login_timeout = config.getint('login_timeout', 90) + self.force_logins = config.getboolean('force_logins', False) + self.default_source = config.get('default_source', "moonraker").lower() + self.enable_api_key = config.getboolean('enable_api_key', True) + self.max_logins = config.getint("max_login_attempts", None, above=0) + self.failed_logins: Dict[IPAddr, int] = {} + self.fqdn_cache: Dict[IPAddr, Dict[str, Any]] = {} + if self.default_source not in AUTH_SOURCES: + self.server.add_warning( + "[authorization]: option 'default_source' - Invalid " + f"value '{self.default_source}', falling back to " + "'moonraker'." + ) + self.default_source = "moonraker" + self.ldap: Optional[MoonrakerLDAP] = None + if config.has_section("ldap"): + self.ldap = self.server.load_component(config, "ldap", None) + if self.default_source == "ldap" and self.ldap is None: + self.server.add_warning( + "[authorization]: Option 'default_source' set to 'ldap'," + " however [ldap] section failed to load or not configured" + ) + database: DBComp = self.server.lookup_component('database') + self.user_table = database.register_table(UserSqlDefinition()) + self.users: Dict[str, UserInfo] = {} + self.api_key = uuid.uuid4().hex + hi = self.server.get_host_info() + self.issuer = f"http://{hi['hostname']}:{hi['port']}" + self.public_jwks: Dict[str, Dict[str, Any]] = {} + self.trusted_users: Dict[IPAddr, Dict[str, Any]] = {} + self.oneshot_tokens: Dict[str, OneshotToken] = {} + + # Get allowed cors domains + self.cors_domains: List[str] = [] + for domain in config.getlist('cors_domains', []): + bad_match = re.search(r"^.+\.[^:]*\*", domain) + if bad_match is not None: + self.server.add_warning( + f"[authorization]: Unsafe domain '{domain}' in option " + f"'cors_domains'. Wildcards are not permitted in the" + " top level domain." 
+                )
+                continue
+            if domain.endswith("/"):
+                self.server.add_warning(
+                    f"[authorization]: Invalid domain '{domain}' in option "
+                    "'cors_domains'. Domains cannot contain a trailing "
+                    "slash."
+                )
+            else:
+                self.cors_domains.append(
+                    domain.replace(".", "\\.").replace("*", ".*"))
+
+        # Get Trusted Clients
+        self.trusted_ips: List[IPAddr] = []
+        self.trusted_ranges: List[IPNetwork] = []
+        self.trusted_domains: List[str] = []
+        for val in config.getlist('trusted_clients', []):
+            # Check IP address
+            try:
+                tc = ipaddress.ip_address(val)
+            except ValueError:
+                pass
+            else:
+                self.trusted_ips.append(tc)
+                continue
+            # Check ip network
+            try:
+                tn = ipaddress.ip_network(val)
+            except ValueError as e:
+                if "has host bits set" in str(e):
+                    self.server.add_warning(
+                        f"[authorization]: Invalid CIDR expression '{val}' "
+                        "in option 'trusted_clients'")
+                    continue
+                pass
+            else:
+                self.trusted_ranges.append(tn)
+                continue
+            # Check hostname
+            match = re.match(r"([a-z0-9]+(-[a-z0-9]+)*\.?)+[a-z]{2,}$", val)
+            if match is not None:
+                self.trusted_domains.append(val.lower())
+            else:
+                self.server.add_warning(
+                    f"[authorization]: Invalid domain name '{val}' "
+                    "in option 'trusted_clients'")
+
+        t_clients = "\n".join(
+            [str(ip) for ip in self.trusted_ips] +
+            [str(rng) for rng in self.trusted_ranges] +
+            self.trusted_domains)
+        c_domains = "\n".join(self.cors_domains)
+
+        logging.info(
+            f"Authorization Configuration Loaded\n"
+            f"Trusted Clients:\n{t_clients}\n"
+            f"CORS Domains:\n{c_domains}")
+
+        eventloop = self.server.get_event_loop()
+        self.prune_timer = eventloop.register_timer(
+            self._prune_conn_handler)
+
+        # Register Authorization Endpoints
+        self.server.register_endpoint(
+            "/access/login", RequestType.POST, self._handle_login,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET,
+            auth_required=False
+        )
+        self.server.register_endpoint(
+            "/access/logout", RequestType.POST, self._handle_logout,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET
+        )
+        self.server.register_endpoint(
+            "/access/refresh_jwt", RequestType.POST, self._handle_refresh_jwt,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET,
+            auth_required=False
+        )
+        self.server.register_endpoint(
+            "/access/user", RequestType.all(), self._handle_user_request,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET
+        )
+        self.server.register_endpoint(
+            "/access/users/list", RequestType.GET, self._handle_list_request,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET
+        )
+        self.server.register_endpoint(
+            "/access/user/password", RequestType.POST, self._handle_password_reset,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET
+        )
+        # Custom endpoint: look up a user by username and reset the
+        # password (only suitable for ordinary users)
+        self.server.register_endpoint(
+            "/access/user/password_by_name", RequestType.POST, self._handle_password_reset_by_name,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET
+        )
+        self.server.register_endpoint(
+            "/access/api_key", RequestType.GET | RequestType.POST,
+            self._handle_apikey_request,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET
+        )
+        self.server.register_endpoint(
+            "/access/oneshot_token", RequestType.GET, self._handle_oneshot_request,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET
+        )
+        self.server.register_endpoint(
+            "/access/info", RequestType.GET, self._handle_info_request,
+            transports=TransportType.HTTP | TransportType.WEBSOCKET,
+            auth_required=False
+        )
+        wsm: WebsocketManager = 
self.server.lookup_component("websockets") + wsm.register_notification("authorization:user_created") + wsm.register_notification( + "authorization:user_deleted", event_type="logout" + ) + wsm.register_notification( + "authorization:user_logged_out", event_type="logout" + ) + + async def component_init(self) -> None: + # Populate users from database + cursor = await self.user_table.execute(f"SELECT * FROM {USER_TABLE}") + self.users = {row[0]: UserInfo(**dict(row)) for row in await cursor.fetchall()} + need_sync = self._initialize_users() + if need_sync: + await self._sync_user_table() + self.prune_timer.start(delay=PRUNE_CHECK_TIME) + + async def _sync_user(self, username: str) -> None: + user = self.users[username] + vals = user.as_tuple() + placeholders = ",".join("?" * len(vals)) + async with self.user_table as tx: + await tx.execute( + f"REPLACE INTO {USER_TABLE} VALUES({placeholders})", vals + ) + + async def _sync_user_table(self) -> None: + async with self.user_table as tx: + await tx.execute(f"DELETE FROM {USER_TABLE}") + user_vals: List[Tuple[Any, ...]] + user_vals = [user.as_tuple() for user in self.users.values()] + if not user_vals: + return + placeholders = ",".join("?" * len(user_vals[0])) + await tx.executemany( + f"INSERT INTO {USER_TABLE} VALUES({placeholders})", user_vals + ) + + def _initialize_users(self) -> bool: + need_sync = False + api_user: Optional[UserInfo] = self.users.get(API_USER, None) + if api_user is None: + need_sync = True + self.users[API_USER] = UserInfo(username=API_USER, password=self.api_key) + else: + self.api_key = api_user.password + for username, user_info in list(self.users.items()): + if username == API_USER: + continue + # generate jwks for valid users + if user_info.jwt_secret is not None: + try: + priv_key = self._load_private_key(user_info.jwt_secret) + jwk_id = user_info.jwk_id + assert jwk_id is not None + except (self.server.error, KeyError, AssertionError): + logging.info("Invalid jwk found for user, removing") + user_info.jwt_secret = None + user_info.jwk_id = None + self.users[username] = user_info + need_sync = True + continue + self.public_jwks[jwk_id] = self._generate_public_jwk(priv_key) + return need_sync + + async def _handle_apikey_request(self, web_request: WebRequest) -> str: + if web_request.get_request_type() == RequestType.POST: + self.api_key = uuid.uuid4().hex + self.users[API_USER].password = self.api_key + await self._sync_user(API_USER) + return self.api_key + + async def _handle_oneshot_request(self, web_request: WebRequest) -> str: + ip = web_request.get_ip_address() + assert ip is not None + user_info = web_request.get_current_user() + return self.get_oneshot_token(ip, user_info) + + async def _handle_login(self, web_request: WebRequest) -> Dict[str, Any]: + ip = web_request.get_ip_address() + if ip is not None and self.check_logins_maxed(ip): + raise HTTPError( + 401, "Unauthorized, Maximum Login Attempts Reached" + ) + try: + ret = await self._login_jwt_user(web_request) + except asyncio.CancelledError: + raise + except Exception: + if ip is not None: + failed = self.failed_logins.get(ip, 0) + self.failed_logins[ip] = failed + 1 + raise + if ip is not None: + self.failed_logins.pop(ip, None) + return ret + + async def _handle_logout(self, web_request: WebRequest) -> Dict[str, str]: + user_info = web_request.get_current_user() + if user_info is None: + raise self.server.error("No user logged in") + username: str = user_info.username + if username in RESERVED_USERS: + raise self.server.error( + f"Invalid log out 
request for user {username}")
+        jwk_id: Optional[str] = self.users[username].jwk_id
+        self.users[username].jwt_secret = None
+        self.users[username].jwk_id = None
+        if jwk_id is not None:
+            self.public_jwks.pop(jwk_id, None)
+        await self._sync_user(username)
+        eventloop = self.server.get_event_loop()
+        eventloop.delay_callback(
+            .005, self.server.send_event, "authorization:user_logged_out",
+            {'username': username}
+        )
+        return {
+            "username": username,
+            "action": "user_logged_out"
+        }
+
+    async def _handle_info_request(self, web_request: WebRequest) -> Dict[str, Any]:
+        sources = ["moonraker"]
+        if self.ldap is not None:
+            sources.append("ldap")
+        login_req = self.force_logins and len(self.users) > 1
+        request_trusted: Optional[bool] = None
+        user = web_request.get_current_user()
+        req_ip = web_request.ip_addr
+        if user is not None and user.username == TRUSTED_USER:
+            request_trusted = True
+        elif req_ip is not None:
+            request_trusted = await self._check_authorized_ip(req_ip)
+        return {
+            "default_source": self.default_source,
+            "available_sources": sources,
+            "login_required": login_req,
+            "trusted": request_trusted
+        }
+
+    async def _handle_refresh_jwt(self,
+                                  web_request: WebRequest
+                                  ) -> Dict[str, str]:
+        refresh_token: str = web_request.get_str('refresh_token')
+        try:
+            user_info = self.decode_jwt(refresh_token, token_type="refresh")
+        except Exception:
+            raise self.server.error("Invalid Refresh Token", 401)
+        username: str = user_info.username
+        if user_info.jwt_secret is None or user_info.jwk_id is None:
+            raise self.server.error("User not logged in", 401)
+        private_key = self._load_private_key(user_info.jwt_secret)
+        jwk_id: str = user_info.jwk_id
+        token = self._generate_jwt(username, jwk_id, private_key)
+        return {
+            'username': username,
+            'token': token,
+            'source': user_info.source,
+            'action': 'user_jwt_refresh'
+        }
+
+    async def _handle_user_request(
+        self, web_request: WebRequest
+    ) -> Dict[str, Any]:
+        req_type = web_request.get_request_type()
+        if req_type == RequestType.GET:
+            user = web_request.get_current_user()
+            if user is None:
+                return {
+                    "username": None,
+                    "source": None,
+                    "created_on": None,
+                }
+            else:
+                return {
+                    "username": user.username,
+                    "source": user.source,
+                    "created_on": user.created_on
+                }
+        elif req_type == RequestType.POST:
+            # Create User
+            return await self._login_jwt_user(web_request, create=True)
+        elif req_type == RequestType.DELETE:
+            # Delete User
+            return await self._delete_jwt_user(web_request)
+        raise self.server.error("Invalid Request Method")
+
+    async def _handle_list_request(self,
+                                   web_request: WebRequest
+                                   ) -> Dict[str, List[Dict[str, Any]]]:
+        user_list = []
+        for user in self.users.values():
+            if user.username == API_USER:
+                continue
+            user_list.append({
+                'username': user.username,
+                'source': user.source,
+                'created_on': user.created_on
+            })
+        return {
+            'users': user_list
+        }
+
+    async def _handle_password_reset(self,
+                                     web_request: WebRequest
+                                     ) -> Dict[str, str]:
+        password: str = web_request.get_str('password')
+        new_pass: str = web_request.get_str('new_password')
+        user_info = web_request.get_current_user()
+        if user_info is None:
+            raise self.server.error("No Current User")
+        username = user_info.username
+        if user_info.source == "ldap":
+            raise self.server.error(
+                f"Can't reset password for LDAP user {username}")
+        if username in RESERVED_USERS:
+            raise self.server.error(
+                f"Invalid Reset Request for user {username}")
+        salt = bytes.fromhex(user_info.salt)
+        hashed_pass = hashlib.pbkdf2_hmac(
+            'sha256', password.encode(), salt, HASH_ITER).hex()
+        if hashed_pass != user_info.password:
+            raise self.server.error("Invalid Password")
+        new_hashed_pass = hashlib.pbkdf2_hmac(
+            'sha256', new_pass.encode(), salt, HASH_ITER).hex()
+        self.users[username].password = new_hashed_pass
+        await self._sync_user(username)
+        return {
+            'username': username,
+            'action': "user_password_reset"
+        }
+
+    async def _handle_password_reset_by_name(self,
+                                             web_request: WebRequest
+                                             ) -> Dict[str, str]:
+        username: str = web_request.get_str('username')
+        new_pass: str = web_request.get_str('new_password')
+
+        user_info = self.users[username]
+        if user_info.source == "ldap":
+            raise self.server.error(
+                f"Can't reset password for LDAP user {username}")
+        if username in RESERVED_USERS:
+            raise self.server.error(
+                f"Invalid Reset Request for user {username}")
+        salt = bytes.fromhex(user_info.salt)
+        new_hashed_pass = hashlib.pbkdf2_hmac(
+            'sha256', new_pass.encode(), salt, HASH_ITER).hex()
+        self.users[username].password = new_hashed_pass
+        await self._sync_user(username)
+        return {
+            'username': username,
+            'action': "user_password_reset_by_name"
+        }
+
+    async def _login_jwt_user(
+        self, web_request: WebRequest, create: bool = False
+    ) -> Dict[str, Any]:
+        username: str = web_request.get_str('username')
+        password: str = web_request.get_str('password')
+        source: str = web_request.get_str(
+            'source', self.default_source
+        ).lower()
+        if source not in AUTH_SOURCES:
+            raise self.server.error(f"Invalid 'source': {source}")
+        user_info: UserInfo
+        if username in RESERVED_USERS:
+            raise self.server.error(
+                f"Invalid Request for user {username}")
+        if source == "ldap":
+            if create:
+                raise self.server.error("Cannot Create LDAP User")
+            if self.ldap is None:
+                raise self.server.error(
+                    "LDAP authentication not available", 401
+                )
+            await self.ldap.authenticate_ldap_user(username, password)
+            if username not in self.users:
+                create = True
+        if create:
+            if username in self.users:
+                raise self.server.error(f"User {username} already exists")
+            salt = secrets.token_bytes(32)
+            hashed_pass = hashlib.pbkdf2_hmac(
+                'sha256', password.encode(), salt, HASH_ITER).hex()
+            user_info = UserInfo(
+                username=username,
+                password=hashed_pass,
+                salt=salt.hex(),
+                source=source,
+            )
+            self.users[username] = user_info
+            await self._sync_user(username)
+            action = "user_created"
+            if source == "ldap":
+                # Don't notify user created
+                action = "user_logged_in"
+                create = False
+        else:
+            if username not in self.users:
+                raise self.server.error(f"Unregistered User: {username}")
+            user_info = self.users[username]
+            auth_src = user_info.source
+            if auth_src != source:
+                raise self.server.error(
+                    f"Moonraker cannot authenticate user '{username}', must "
+                    f"specify source '{auth_src}'", 401
+                )
+            salt = bytes.fromhex(user_info.salt)
+            hashed_pass = hashlib.pbkdf2_hmac(
+                'sha256', password.encode(), salt, HASH_ITER).hex()
+            action = "user_logged_in"
+            if hashed_pass != user_info.password:
+                raise self.server.error("Invalid Password")
+        jwt_secret_hex: Optional[str] = user_info.jwt_secret
+        if jwt_secret_hex is None:
+            private_key = Signer()
+            jwk_id = base64url_encode(secrets.token_bytes()).decode()
+            user_info.jwt_secret = private_key.hex_seed().decode()  # type: ignore
+            user_info.jwk_id = jwk_id
+            self.users[username] = user_info
+            await self._sync_user(username)
+            self.public_jwks[jwk_id] = self._generate_public_jwk(private_key)
+        else:
+            private_key = self._load_private_key(jwt_secret_hex)
+            if user_info.jwk_id is None:
+                user_info.jwk_id = 
base64url_encode(secrets.token_bytes()).decode() + jwk_id = user_info.jwk_id + token = self._generate_jwt(username, jwk_id, private_key) + refresh_token = self._generate_jwt( + username, jwk_id, private_key, token_type="refresh", + exp_time=datetime.timedelta(days=self.login_timeout)) + conn = web_request.get_client_connection() + if create: + event_loop = self.server.get_event_loop() + event_loop.delay_callback( + .005, self.server.send_event, + "authorization:user_created", + {'username': username}) + elif conn is not None: + conn.user_info = user_info + return { + 'username': username, + 'token': token, + 'source': user_info.source, + 'refresh_token': refresh_token, + 'action': action + } + + async def _delete_jwt_user(self, web_request: WebRequest) -> Dict[str, str]: + username: str = web_request.get_str('username') + current_user = web_request.get_current_user() + if current_user is not None: + curname = current_user.username + if curname == username: + raise self.server.error(f"Cannot delete logged in user {curname}") + if username in RESERVED_USERS: + raise self.server.error( + f"Invalid Request for reserved user {username}") + user_info: Optional[UserInfo] = self.users.get(username) + if user_info is None: + raise self.server.error(f"No registered user: {username}") + if user_info.jwk_id is not None: + self.public_jwks.pop(user_info.jwk_id, None) + del self.users[username] + async with self.user_table as tx: + await tx.execute( + f"DELETE FROM {USER_TABLE} WHERE username = ?", (username,) + ) + event_loop = self.server.get_event_loop() + event_loop.delay_callback( + .005, self.server.send_event, + "authorization:user_deleted", + {'username': username}) + return { + "username": username, + "action": "user_deleted" + } + + def _generate_jwt(self, + username: str, + jwk_id: str, + private_key: Signer, + token_type: str = "access", + exp_time: datetime.timedelta = JWT_EXP_TIME + ) -> str: + curtime = int(time.time()) + payload = { + 'iss': self.issuer, + 'aud': "Moonraker", + 'iat': curtime, + 'exp': curtime + int(exp_time.total_seconds()), + 'username': username, + 'token_type': token_type + } + header = {'kid': jwk_id} + header.update(JWT_HEADER) + jwt_header = base64url_encode(jsonw.dumps(header)) + jwt_payload = base64url_encode(jsonw.dumps(payload)) + jwt_msg = b".".join([jwt_header, jwt_payload]) + sig = private_key.signature(jwt_msg) + jwt_sig = base64url_encode(sig) + return b".".join([jwt_msg, jwt_sig]).decode() + + def decode_jwt( + self, token: str, token_type: str = "access", check_exp: bool = True + ) -> UserInfo: + message, sig = token.rsplit('.', maxsplit=1) + enc_header, enc_payload = message.split('.') + header: Dict[str, Any] = jsonw.loads(base64url_decode(enc_header)) + sig_bytes = base64url_decode(sig) + + # verify header + if header.get('typ') != "JWT" or header.get('alg') != "EdDSA": + raise self.server.error("Invalid JWT header") + jwk_id: Optional[str] = header.get('kid') + if jwk_id not in self.public_jwks: + raise self.server.error("Invalid key ID") + + # validate signature + public_key = self._public_key_from_jwk(self.public_jwks[jwk_id]) + public_key.verify(sig_bytes + message.encode()) + + # validate claims + payload: Dict[str, Any] = jsonw.loads(base64url_decode(enc_payload)) + if payload['token_type'] != token_type: + raise self.server.error( + f"JWT Token type mismatch: Expected {token_type}, " + f"Recd: {payload['token_type']}", 401) + if payload['iss'] != self.issuer: + raise self.server.error("Invalid JWT Issuer", 401) + if payload['aud'] != 
"Moonraker": + raise self.server.error("Invalid JWT Audience", 401) + if check_exp and payload['exp'] < int(time.time()): + raise self.server.error("JWT Expired", 401) + + # get user + user_info: Optional[UserInfo] = self.users.get( + payload.get('username', ""), None) + if user_info is None: + raise self.server.error("Unknown user", 401) + return user_info + + def validate_jwt(self, token: str) -> UserInfo: + try: + user_info = self.decode_jwt(token) + except Exception as e: + if isinstance(e, self.server.error): + raise + raise self.server.error( + f"Failed to decode JWT: {e}", 401 + ) from e + return user_info + + def validate_api_key(self, api_key: str) -> UserInfo: + if not self.enable_api_key: + raise self.server.error("API Key authentication is disabled", 401) + if api_key and api_key == self.api_key: + return self.users[API_USER] + raise self.server.error("Invalid API Key", 401) + + def _load_private_key(self, secret: str) -> Signer: + try: + key = Signer(bytes.fromhex(secret)) + except Exception: + raise self.server.error( + "Error decoding private key, user data may" + " be corrupt", 500) from None + return key + + def _generate_public_jwk(self, private_key: Signer) -> Dict[str, Any]: + public_key = private_key.vk + return { + 'x': base64url_encode(public_key).decode(), + 'kty': "OKP", + 'crv': "Ed25519", + 'use': "sig" + } + + def _public_key_from_jwk(self, jwk: Dict[str, Any]) -> Verifier: + if jwk.get('kty') != "OKP": + raise self.server.error("Not an Octet Key Pair") + if jwk.get('crv') != "Ed25519": + raise self.server.error("Invalid Curve") + if 'x' not in jwk: + raise self.server.error("No 'x' argument in jwk") + key = base64url_decode(jwk['x']) + return Verifier(key.hex().encode()) + + def _prune_conn_handler(self, eventtime: float) -> float: + cur_time = time.time() + for ip, user_info in list(self.trusted_users.items()): + exp_time: float = user_info['expires_at'] + if cur_time >= exp_time: + self.trusted_users.pop(ip, None) + logging.info(f"Trusted Connection Expired, IP: {ip}") + for ip, fqdn_info in list(self.fqdn_cache.items()): + exp_time = fqdn_info["expires_at"] + if cur_time >= exp_time: + domain: str = fqdn_info["domain"] + self.fqdn_cache.pop(ip, None) + logging.info(f"Cached FQDN Expired, IP: {ip}, domain: {domain}") + return eventtime + PRUNE_CHECK_TIME + + def _oneshot_token_expire_handler(self, token): + self.oneshot_tokens.pop(token, None) + + def get_oneshot_token(self, ip_addr: IPAddr, user: Optional[UserInfo]) -> str: + token = base64.b32encode(os.urandom(20)).decode() + event_loop = self.server.get_event_loop() + hdl = event_loop.delay_callback( + ONESHOT_TIMEOUT, self._oneshot_token_expire_handler, token) + self.oneshot_tokens[token] = (ip_addr, user, hdl) + return token + + def _check_json_web_token( + self, request: HTTPServerRequest, required: bool = True + ) -> Optional[UserInfo]: + auth_token: Optional[str] = request.headers.get("Authorization") + if auth_token is None: + auth_token = request.headers.get("X-Access-Token") + if auth_token is None: + qtoken = request.query_arguments.get('access_token', None) + if qtoken is not None: + auth_token = qtoken[-1].decode(errors="ignore") + elif auth_token.startswith("Bearer "): + auth_token = auth_token[7:] + else: + return None + if auth_token: + try: + return self.decode_jwt(auth_token, check_exp=required) + except Exception: + logging.exception(f"JWT Decode Error {auth_token}") + raise HTTPError(401, "JWT Decode Error") + return None + + async def _check_authorized_ip(self, ip: IPAddr) -> bool: + if 
ip in self.trusted_ips: + return True + for rng in self.trusted_ranges: + if ip in rng: + return True + if self.trusted_domains: + if ip in self.fqdn_cache: + fqdn: str = self.fqdn_cache[ip]["domain"] + else: + eventloop = self.server.get_event_loop() + try: + fut = eventloop.run_in_thread(socket.getfqdn, str(ip)) + fqdn = await asyncio.wait_for(fut, 5.0) + except asyncio.TimeoutError: + logging.info("Call to socket.getfqdn() timed out") + return False + else: + fqdn = fqdn.lower() + self.fqdn_cache[ip] = { + "expires_at": time.time() + FQDN_CACHE_TIMEOUT, + "domain": fqdn + } + return fqdn in self.trusted_domains + return False + + async def _check_trusted_connection( + self, ip: Optional[IPAddr] + ) -> Optional[UserInfo]: + if ip is not None: + curtime = time.time() + exp_time = curtime + TRUSTED_CONNECTION_TIMEOUT + if ip in self.trusted_users: + self.trusted_users[ip]["expires_at"] = exp_time + return self.trusted_users[ip]["user"] + elif await self._check_authorized_ip(ip): + logging.info( + f"Trusted Connection Detected, IP: {ip}") + self.trusted_users[ip] = { + "user": UserInfo(TRUSTED_USER, "", curtime), + "expires_at": exp_time + } + return self.trusted_users[ip]["user"] + return None + + def _check_oneshot_token( + self, token: str, cur_ip: Optional[IPAddr] + ) -> Optional[UserInfo]: + if token in self.oneshot_tokens: + ip_addr, user, hdl = self.oneshot_tokens.pop(token) + hdl.cancel() + if cur_ip != ip_addr: + logging.info(f"Oneshot Token IP Mismatch: expected{ip_addr}" + f", Recd: {cur_ip}") + return None + return user + else: + return None + + def check_logins_maxed(self, ip_addr: IPAddr) -> bool: + if self.max_logins is None: + return False + return self.failed_logins.get(ip_addr, 0) >= self.max_logins + + async def authenticate_request( + self, request: HTTPServerRequest, auth_required: bool = True + ) -> Optional[UserInfo]: + if request.method == "OPTIONS": + return None + + # Allow local request + try: + # logging.info(f"request.remote_ip: {request.remote_ip}, is_loopback: {ipaddress.ip_address(request.remote_ip).is_loopback}") # type: ignore + ip = ipaddress.ip_address(request.remote_ip) # type: ignore + if ip.is_loopback: + return None + except ValueError: + logging.exception( + f"Unable to Create IP Address {request.remote_ip}") + ip = None + + # Check JSON Web Token + jwt_user = self._check_json_web_token(request, auth_required) + if jwt_user is not None: + return jwt_user + + # Check oneshot access token + ost: Optional[List[bytes]] = request.arguments.get('token', None) + if ost is not None: + ost_user = self._check_oneshot_token(ost[-1].decode(), ip) + if ost_user is not None: + return ost_user + + # Check API Key Header + if self.enable_api_key: + key: Optional[str] = request.headers.get("X-Api-Key") + if key and key == self.api_key: + return self.users[API_USER] + + # If the force_logins option is enabled and at least one user is created + # then trusted user authentication is disabled + if self.force_logins and len(self.users) > 1: + if not auth_required: + return None + raise HTTPError(401, "Unauthorized, Force Logins Enabled") + + # Check if IP is trusted. 
If this endpoint doesn't require authentication + # then it is acceptable to return None + trusted_user = await self._check_trusted_connection(ip) + if trusted_user is not None: + return trusted_user + if not auth_required: + return None + + raise HTTPError(401, "Unauthorized") + + async def check_cors(self, origin: Optional[str]) -> bool: + if origin is None or not self.cors_domains: + return False + for regex in self.cors_domains: + match = re.match(regex, origin) + if match is not None: + if match.group() == origin: + logging.debug(f"CORS Pattern Matched, origin: {origin} " + f" | pattern: {regex}") + return True + else: + logging.debug(f"Partial Cors Match: {match.group()}") + else: + # Check to see if the origin contains an IP that matches a + # current trusted connection + match = re.search(r"^https?://([^/:]+)", origin) + if match is not None: + ip = match.group(1) + try: + ipaddr = ipaddress.ip_address(ip) + except ValueError: + pass + else: + if await self._check_authorized_ip(ipaddr): + logging.debug(f"Cors request matched trusted IP: {ip}") + return True + logging.debug(f"No CORS match for origin: {origin}\n" + f"Patterns: {self.cors_domains}") + return False + + def cors_enabled(self) -> bool: + return self.cors_domains is not None + + def close(self) -> None: + self.prune_timer.stop() + + +def load_component(config: ConfigHelper) -> Authorization: + return Authorization(config) diff --git a/moonraker/file_manager/metadata.py b/moonraker/file_manager/metadata.py new file mode 100644 index 0000000..1a3dfa3 --- /dev/null +++ b/moonraker/file_manager/metadata.py @@ -0,0 +1,1178 @@ +#!/usr/bin/env python3 +# GCode metadata extraction utility +# +# Copyright (C) 2020 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations +import json +import argparse +import re +import os +import sys +import base64 +import traceback +import tempfile +import zipfile +import shutil +import uuid +import logging +from PIL import Image + +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Optional, + Dict, + List, + Tuple, + Type, +) +if TYPE_CHECKING: + pass + +UFP_MODEL_PATH = "/3D/model.gcode" +UFP_THUMB_PATH = "/Metadata/thumbnail.png" + +logging.basicConfig(stream=sys.stderr, level=logging.INFO) +logger = logging.getLogger("metadata") + +# Regex helpers. 
These methods take patterns with placeholders +# to insert the correct regex capture group for floats, ints, +# and strings: +# Float: (%F) = (\d*\.?\d+) +# Integer: (%D) = (\d+) +# String: (%S) = (.+) +def regex_find_floats(pattern: str, data: str) -> List[float]: + pattern = pattern.replace(r"(%F)", r"([0-9]*\.?[0-9]+)") + matches = re.findall(pattern, data) + if matches: + # return the maximum height value found + try: + return [float(h) for h in matches] + except Exception: + pass + return [] + +def regex_find_ints(pattern: str, data: str) -> List[int]: + pattern = pattern.replace(r"(%D)", r"([0-9]+)") + matches = re.findall(pattern, data) + if matches: + # return the maximum height value found + try: + return [int(h) for h in matches] + except Exception: + pass + return [] + +def regex_find_float(pattern: str, data: str) -> Optional[float]: + pattern = pattern.replace(r"(%F)", r"([0-9]*\.?[0-9]+)") + match = re.search(pattern, data) + val: Optional[float] = None + if match: + try: + val = float(match.group(1)) + except Exception: + return None + return val + +def regex_find_int(pattern: str, data: str) -> Optional[int]: + pattern = pattern.replace(r"(%D)", r"([0-9]+)") + match = re.search(pattern, data) + val: Optional[int] = None + if match: + try: + val = int(match.group(1)) + except Exception: + return None + return val + +def regex_find_string(pattern: str, data: str) -> Optional[str]: + pattern = pattern.replace(r"(%S)", r"(.*)") + match = re.search(pattern, data) + if match: + return match.group(1).strip('"') + return None + +def regex_find_min_float(pattern: str, data: str) -> Optional[float]: + result = regex_find_floats(pattern, data) + return min(result) if result else None + +def regex_find_max_float(pattern: str, data: str) -> Optional[float]: + result = regex_find_floats(pattern, data) + return max(result) if result else None + + +# Slicer parsing implementations +class BaseSlicer(object): + def __init__(self, file_path: str) -> None: + self.path = file_path + self.header_data: str = "" + self.footer_data: str = "" + self.layer_height: Optional[float] = None + self.has_m486_objects: bool = False + + def set_data(self, + header_data: str, + footer_data: str, + fsize: int) -> None: + self.header_data = header_data + self.footer_data = footer_data + self.size: int = fsize + + def _check_has_objects(self, + data: str, + pattern: Optional[str] = None + ) -> bool: + match = re.search( + r"\n((DEFINE_OBJECT)|(EXCLUDE_OBJECT_DEFINE)) NAME=", + data + ) + if match is not None: + # Objects already processed + fname = os.path.basename(self.path) + logger.info( + f"File '{fname}' currently supports cancellation, " + "processing aborted" + ) + if match.group(1).startswith("DEFINE_OBJECT"): + logger.info( + "Legacy object processing detected. This is not " + "compatible with official versions of Klipper." 
+ ) + return False + # Always check M486 + patterns = [r"\nM486"] + if pattern is not None: + patterns.append(pattern) + for regex in patterns: + if re.search(regex, data) is not None: + self.has_m486_objects = regex == r"\nM486" + return True + return False + + def check_identity(self, data: str) -> Optional[Dict[str, str]]: + return None + + def has_objects(self) -> bool: + return self._check_has_objects(self.header_data) + + def parse_gcode_start_byte(self) -> Optional[int]: + m = re.search(r"\n[MG]\d+\s.*\n", self.header_data) + if m is None: + return None + return m.start() + + def parse_gcode_end_byte(self) -> Optional[int]: + rev_data = self.footer_data[::-1] + m = re.search(r"\n.*\s\d+[MG]\n", rev_data) + if m is None: + return None + return self.size - m.start() + + def parse_first_layer_height(self) -> Optional[float]: + return None + + def parse_layer_height(self) -> Optional[float]: + return None + + def parse_object_height(self) -> Optional[float]: + return None + + def parse_filament_total(self) -> Optional[float]: + return None + + def parse_filament_weight_total(self) -> Optional[float]: + return None + + def parse_filament_name(self) -> Optional[str]: + return None + + def parse_filament_type(self) -> Optional[str]: + return None + + def parse_estimated_time(self) -> Optional[float]: + return None + + def parse_first_layer_bed_temp(self) -> Optional[float]: + return None + + def parse_chamber_temp(self) -> Optional[float]: + return None + + def parse_first_layer_extr_temp(self) -> Optional[float]: + return None + + def parse_thumbnails(self) -> Optional[List[Dict[str, Any]]]: + for data in [self.header_data, self.footer_data]: + thumb_matches: List[str] = re.findall( + r"; thumbnail begin[;/\+=\w\s]+?; thumbnail end", data) + if thumb_matches: + break + else: + return None + thumb_dir = os.path.join(os.path.dirname(self.path), ".thumbs") + if not os.path.exists(thumb_dir): + try: + os.mkdir(thumb_dir) + except Exception: + logger.info(f"Unable to create thumb dir: {thumb_dir}") + return None + thumb_base = os.path.splitext(os.path.basename(self.path))[0] + parsed_matches: List[Dict[str, Any]] = [] + #has_miniature: bool = False + for match in thumb_matches: + lines = re.split(r"\r?\n", match.replace('; ', '')) + info = regex_find_ints(r"(%D)", lines[0]) + data = "".join(lines[1:-1]) + if len(info) != 3: + logger.info( + f"MetadataError: Error parsing thumbnail" + f" header: {lines[0]}") + continue + if len(data) != info[2]: + logger.info( + f"MetadataError: Thumbnail Size Mismatch: " + f"detected {info[2]}, actual {len(data)}") + continue + thumb_name = f"{thumb_base}-{info[0]}x{info[1]}.png" + thumb_path = os.path.join(thumb_dir, thumb_name) + thumb_jpg_name = f"{thumb_base}-{info[0]}x{info[1]}.jpg" + thumb_jpg_path = os.path.join(thumb_dir, thumb_jpg_name) + rel_thumb_path = os.path.join(".thumbs", thumb_name) + with open(thumb_path, "wb") as f: + f.write(base64.b64decode(data.encode())) + with Image.open(thumb_path) as img: + if img.mode != "RGBA": + img = img.convert("RGBA") + new_img = Image.new("RGB", size=(info[0], info[1]), color=(255, 255, 255)) + img = img.resize((info[0], info[1])) + new_img.paste(img, (0, 0), mask=img) + new_img.save(thumb_jpg_path, "JPEG", quality=90) + parsed_matches.append({ + 'width': info[0], 'height': info[1], + 'size': os.path.getsize(thumb_path), + 'relative_path': rel_thumb_path}) + # find the smallest thumb index + smallest_match = parsed_matches[0] + max_size = min_size = smallest_match['size'] + for item in parsed_matches: + if 
item['size'] < smallest_match['size']: + smallest_match = item + if item["size"] < min_size: + min_size = item["size"] + if item["size"] > max_size: + max_size = item["size"] + # Create thumbnail for screen + thumb_full_name = smallest_match['relative_path'].split("/")[-1] + thumb_path = os.path.join(thumb_dir, f"{thumb_full_name}") + thumb_QD_full_name = f"{thumb_base}-{smallest_match['width']}x{smallest_match['height']}_QD.jpg" + thumb_QD_path = os.path.join(thumb_dir, f"{thumb_QD_full_name}") + rel_path_QD = os.path.join(".thumbs", thumb_QD_full_name) + try: + with Image.open(thumb_path) as img: + if img.mode != "RGBA": + img = img.convert("RGBA") + new_img = Image.new("RGB", size=(smallest_match['width'], smallest_match['height']), color=(255, 255, 255)) + img = img.resize((smallest_match['width'], smallest_match['height'])) + new_img.paste(img, (0, 0), mask=img) + new_img.save(thumb_QD_path, "JPEG", quality=90) + except Exception as e: + logger.info(str(e)) + parsed_matches.append({ + 'width': smallest_match['width'], 'height': smallest_match['height'], + 'size': (max_size + min_size) // 2, + 'relative_path': rel_path_QD}) + return parsed_matches + + def parse_layer_count(self) -> Optional[int]: + return None + + def parse_nozzle_diameter(self) -> Optional[float]: + return None + +class UnknownSlicer(BaseSlicer): + def check_identity(self, data: str) -> Optional[Dict[str, str]]: + return {'slicer': "Unknown"} + + def parse_first_layer_height(self) -> Optional[float]: + return regex_find_min_float(r"G1\sZ(%F)\s", self.header_data) + + def parse_object_height(self) -> Optional[float]: + return regex_find_max_float(r"G1\sZ(%F)\s", self.footer_data) + + def parse_first_layer_extr_temp(self) -> Optional[float]: + return regex_find_float(r"M109 S(%F)", self.header_data) + + def parse_first_layer_bed_temp(self) -> Optional[float]: + return regex_find_float(r"M190 S(%F)", self.header_data) + + def parse_chamber_temp(self) -> Optional[float]: + return regex_find_float(r"M191 S(%F)", self.header_data) + + def parse_thumbnails(self) -> Optional[List[Dict[str, Any]]]: + return None + +class PrusaSlicer(BaseSlicer): + def check_identity(self, data: str) -> Optional[Dict[str, str]]: + aliases = { + 'QIDIStudio': r"QIDIStudio\s(.*)", + 'QIDISlicer': r"QIDISlicer\s(.*)\son", + 'PrusaSlicer': r"PrusaSlicer\s(.*)\son", + 'SuperSlicer': r"SuperSlicer\s(.*)\son", + 'OrcaSlicer': r"OrcaSlicer\s(.*)\son", + 'MomentSlicer': r"MomentSlicer\s(.*)\son", + 'SliCR-3D': r"SliCR-3D\s(.*)\son", + 'BambuStudio': r"BambuStudio[^ ]*\s(.*)\n", + 'A3dp-Slicer': r"A3dp-Slicer\s(.*)\son", + } + for name, expr in aliases.items(): + match = re.search(expr, data) + if match: + return { + 'slicer': name, + 'slicer_version': match.group(1) + } + return None + + def has_objects(self) -> bool: + return self._check_has_objects( + self.header_data, r"\n; printing object") + + def parse_first_layer_height(self) -> Optional[float]: + # Check percentage + pct = regex_find_float(r"; first_layer_height = (%F)%", self.footer_data) + if pct is not None: + if self.layer_height is None: + # Failed to parse the original layer height, so it is not + # possible to calculate a percentage + return None + return round(pct / 100. 
* self.layer_height, 6) + return regex_find_float(r"; first_layer_height = (%F)", self.footer_data) + + def parse_layer_height(self) -> Optional[float]: + self.layer_height = regex_find_float( + r"; layer_height = (%F)", self.footer_data + ) + return self.layer_height + + def parse_object_height(self) -> Optional[float]: + matches = re.findall( + r";BEFORE_LAYER_CHANGE\n(?:.*\n)?;(\d+\.?\d*)", self.footer_data) + if matches: + try: + matches = [float(m) for m in matches] + except Exception: + pass + else: + return max(matches) + return regex_find_max_float(r"G1\sZ(%F)\sF", self.footer_data) + + def parse_filament_total(self) -> Optional[float]: + line = regex_find_string(r'filament\sused\s\[mm\]\s=\s(%S)\n', self.footer_data) + if line: + filament = regex_find_floats( + r"(%F)", line + ) + if filament: + return sum(filament) + return None + + def parse_filament_weight_total(self) -> Optional[float]: + return regex_find_float( + r"total\sfilament\sused\s\[g\]\s=\s(%F)", + self.footer_data + ) + + def parse_filament_type(self) -> Optional[str]: + return regex_find_string(r";\sfilament_type\s=\s(%S)", self.footer_data) + + def parse_filament_name(self) -> Optional[str]: + return regex_find_string( + r";\sfilament_settings_id\s=\s(%S)", self.footer_data + ) + + def parse_estimated_time(self) -> Optional[float]: + time_match = re.search( + r';\sestimated\sprinting\stime.*', self.footer_data) + if not time_match: + return None + total_time = 0 + time_group = time_match.group() + time_patterns = [(r"(\d+)d", 24*60*60), (r"(\d+)h", 60*60), + (r"(\d+)m", 60), (r"(\d+)s", 1)] + try: + for pattern, multiplier in time_patterns: + t = re.search(pattern, time_group) + if t: + total_time += int(t.group(1)) * multiplier + except Exception: + return None + return round(total_time, 2) + + def parse_first_layer_extr_temp(self) -> Optional[float]: + return regex_find_float( + r"; first_layer_temperature = (%F)", self.footer_data + ) + + def parse_first_layer_bed_temp(self) -> Optional[float]: + return regex_find_float( + r"; first_layer_bed_temperature = (%F)", self.footer_data + ) + + def parse_chamber_temp(self) -> Optional[float]: + return regex_find_float( + r"; chamber_temperature = (%F)", self.footer_data + ) + + def parse_nozzle_diameter(self) -> Optional[float]: + return regex_find_float( + r";\snozzle_diameter\s=\s(%F)", self.footer_data + ) + + def parse_layer_count(self) -> Optional[int]: + return regex_find_int(r"; total layers count = (%D)", self.footer_data) + + def parse_gimage(self) -> Optional[str]: + return regex_find_string( + r";gimage:(.*)", self.footer_data) + + def parse_simage(self) -> Optional[str]: + return regex_find_string( + r";simage:(.*)", self.footer_data) + + def parse_preset_colours(self) -> Optional[List[str]]: + colour_data = regex_find_string(r";\sfilament_colour\s=\s(%S)", self.footer_data) + if colour_data: + colours = [c.strip() for c in colour_data.split(";") if c.strip()] + return colours + return None + def parse_used_extruders(self) -> Optional[List[str]]: + extruder_str = regex_find_string(r";\sused_extruders\s=\s(%S)", self.footer_data) + if extruder_str: + try: + + extruder_list = extruder_str.split(";") + extruders = [] + for e in extruder_list: + if e.strip(): + extruders.append(int(e.strip())) + return extruders + except ValueError: + return None + return None + +class Slic3rPE(PrusaSlicer): + def check_identity(self, data: str) -> Optional[Dict[str, str]]: + match = re.search(r"Slic3r\sPrusa\sEdition\s(.*)\son", data) + if match: + return { + 'slicer': 
"Slic3r PE", + 'slicer_version': match.group(1) + } + return None + + def parse_filament_total(self) -> Optional[float]: + return regex_find_float(r"filament\sused\s=\s(%F)mm", self.footer_data) + + def parse_thumbnails(self) -> Optional[List[Dict[str, Any]]]: + return None + +class Slic3r(Slic3rPE): + def check_identity(self, data: str) -> Optional[Dict[str, str]]: + match = re.search(r"Slic3r\s(\d.*)\son", data) + if match: + return { + 'slicer': "Slic3r", + 'slicer_version': match.group(1) + } + return None + + def parse_filament_total(self) -> Optional[float]: + filament = regex_find_float( + r";\sfilament\_length\_m\s=\s(%F)", self.footer_data + ) + if filament is not None: + filament *= 1000 + return filament + + def parse_filament_weight_total(self) -> Optional[float]: + return regex_find_float(r";\sfilament\smass\_g\s=\s(%F)", self.footer_data) + + def parse_estimated_time(self) -> Optional[float]: + return None + +class Cura(BaseSlicer): + def check_identity(self, data: str) -> Optional[Dict[str, str]]: + match = re.search(r"Cura_SteamEngine\s(.*)", data) + if match: + return { + 'slicer': "Cura", + 'slicer_version': match.group(1) + } + return None + + def has_objects(self) -> bool: + return self._check_has_objects(self.header_data, r"\n;MESH:") + + def parse_first_layer_height(self) -> Optional[float]: + return regex_find_float(r";MINZ:(%F)", self.header_data) + + def parse_layer_height(self) -> Optional[float]: + self.layer_height = regex_find_float( + r";Layer\sheight:\s(%F)", self.header_data + ) + return self.layer_height + + def parse_object_height(self) -> Optional[float]: + return regex_find_float(r";MAXZ:(%F)", self.header_data) + + def parse_filament_total(self) -> Optional[float]: + filament = regex_find_float(r";Filament\sused:\s(%F)m", self.header_data) + if filament is not None: + filament *= 1000 + return filament + + def parse_filament_weight_total(self) -> Optional[float]: + return regex_find_float(r";Filament\sweight\s=\s.(%F).", self.header_data) + + def parse_filament_type(self) -> Optional[str]: + return regex_find_string(r";Filament\stype\s=\s(%S)", self.header_data) + + def parse_filament_name(self) -> Optional[str]: + return regex_find_string(r";Filament\sname\s=\s(%S)", self.header_data) + + def parse_estimated_time(self) -> Optional[float]: + return regex_find_max_float(r";TIME:(%F)", self.header_data) + + def parse_first_layer_extr_temp(self) -> Optional[float]: + return regex_find_float(r"M109 S(%F)", self.header_data) + + def parse_first_layer_bed_temp(self) -> Optional[float]: + return regex_find_float(r"M190 S(%F)", self.header_data) + + def parse_chamber_temp(self) -> Optional[float]: + return regex_find_float(r"M191 S(%F)", self.header_data) + + def parse_layer_count(self) -> Optional[int]: + return regex_find_int(r";LAYER_COUNT\:(%D)", self.header_data) + + def parse_nozzle_diameter(self) -> Optional[float]: + return regex_find_float(r";Nozzle\sdiameter\s=\s(%F)", self.header_data) + + def parse_thumbnails(self) -> Optional[List[Dict[str, Any]]]: + # Attempt to parse thumbnails from file metadata + thumbs = super().parse_thumbnails() + if thumbs is not None: + return thumbs + # Check for thumbnails extracted from the ufp + thumb_dir = os.path.join(os.path.dirname(self.path), ".thumbs") + thumb_base = os.path.splitext(os.path.basename(self.path))[0] + thumb_path = os.path.join(thumb_dir, f"{thumb_base}.png") + rel_path_full = os.path.join(".thumbs", f"{thumb_base}.png") + rel_path_small = os.path.join(".thumbs", f"{thumb_base}-32x32.png") + 
thumb_path_small = os.path.join(thumb_dir, f"{thumb_base}-32x32.png") + if not os.path.isfile(thumb_path): + return None + # read file + thumbs = [] + try: + with Image.open(thumb_path) as im: + thumbs.append({ + 'width': im.width, 'height': im.height, + 'size': os.path.getsize(thumb_path), + 'relative_path': rel_path_full + }) + # Create 32x32 thumbnail + im.thumbnail((32, 32), Image.Resampling.LANCZOS) + im.save(thumb_path_small, format="PNG") + thumbs.insert(0, { + 'width': im.width, 'height': im.height, + 'size': os.path.getsize(thumb_path_small), + 'relative_path': rel_path_small + }) + except Exception as e: + logger.info(str(e)) + return None + return thumbs + + def parse_gimage(self) -> Optional[str]: + return regex_find_string( + r";gimage:(.*)", self.header_data) + + def parse_simage(self) -> Optional[str]: + return regex_find_string( + r";simage:(.*)", self.header_data) + +class Simplify3D(BaseSlicer): + def check_identity(self, data: str) -> Optional[Dict[str, str]]: + match = re.search(r"Simplify3D\(R\)\sVersion\s(.*)", data) + if match: + self._version = match.group(1) + self._is_v5 = self._version.startswith("5") + return { + 'slicer': "Simplify3D", + 'slicer_version': match.group(1) + } + return None + + def parse_first_layer_height(self) -> Optional[float]: + return regex_find_min_float(r"G1\sZ(%F)\s", self.header_data) + + def parse_layer_height(self) -> Optional[float]: + self.layer_height = regex_find_float( + r";\s+layerHeight,(%F)", self.header_data + ) + return self.layer_height + + def parse_object_height(self) -> Optional[float]: + return regex_find_max_float(r"G1\sZ(%F)\s", self.footer_data) + + def parse_filament_total(self) -> Optional[float]: + return regex_find_float( + r";\s+(?:Filament\slength|Material\sLength):\s(%F)\smm", + self.footer_data + ) + + def parse_filament_weight_total(self) -> Optional[float]: + return regex_find_float( + r";\s+(?:Plastic\sweight|Material\sWeight):\s(%F)\sg", + self.footer_data + ) + + def parse_filament_name(self) -> Optional[str]: + return regex_find_string( + r";\s+printMaterial,(%S)", self.header_data) + + def parse_filament_type(self) -> Optional[str]: + return regex_find_string( + r";\s+makerBotModelMaterial,(%S)", self.footer_data) + + def parse_estimated_time(self) -> Optional[float]: + time_match = re.search(r';\s+Build (t|T)ime:.*', self.footer_data) + if not time_match: + return None + total_time = 0 + time_group = time_match.group() + time_patterns = [(r"(\d+)\shours?", 60*60), (r"(\d+)\smin", 60), + (r"(\d+)\ssec", 1)] + try: + for pattern, multiplier in time_patterns: + t = re.search(pattern, time_group) + if t: + total_time += int(t.group(1)) * multiplier + except Exception: + return None + return round(total_time, 2) + + def _get_temp_items(self, pattern: str) -> List[str]: + match = re.search(pattern, self.header_data) + if match is None: + return [] + return match.group().split(",")[1:] + + def _get_first_layer_temp(self, heater: str) -> Optional[float]: + heaters = self._get_temp_items(r"temperatureName.*") + temps = self._get_temp_items(r"temperatureSetpointTemperatures.*") + for h, temp in zip(heaters, temps): + if h == heater: + try: + return float(temp) + except Exception: + return None + return None + + def _get_first_layer_temp_v5(self, heater_type: str) -> Optional[float]: + pattern = ( + r";\s+temperatureController,.+?" + r";\s+temperatureType,"f"{heater_type}"r".+?" 
+ r";\s+temperatureSetpoints,\d+\|(\d+)" + ) + match = re.search(pattern, self.header_data, re.MULTILINE | re.DOTALL) + if match is not None: + try: + return float(match.group(1)) + except Exception: + return None + return None + + def parse_first_layer_extr_temp(self) -> Optional[float]: + if self._is_v5: + return self._get_first_layer_temp_v5("extruder") + else: + return self._get_first_layer_temp("Extruder 1") + + def parse_first_layer_bed_temp(self) -> Optional[float]: + if self._is_v5: + return self._get_first_layer_temp_v5("platform") + else: + return self._get_first_layer_temp("Heated Bed") + + def parse_nozzle_diameter(self) -> Optional[float]: + return regex_find_float( + r";\s+(?:extruderDiameter|nozzleDiameter),(%F)", + self.header_data + ) + +class KISSlicer(BaseSlicer): + def check_identity(self, data: str) -> Optional[Dict[str, Any]]: + match = re.search(r";\sKISSlicer", data) + if match: + ident = {'slicer': "KISSlicer"} + vmatch = re.search(r";\sversion\s(.*)", data) + if vmatch: + version = vmatch.group(1).replace(" ", "-") + ident['slicer_version'] = version + return ident + return None + + def parse_first_layer_height(self) -> Optional[float]: + return regex_find_float( + r";\s+first_layer_thickness_mm\s=\s(%F)", self.header_data) + + def parse_layer_height(self) -> Optional[float]: + self.layer_height = regex_find_float( + r";\s+max_layer_thickness_mm\s=\s(%F)", self.header_data) + return self.layer_height + + def parse_object_height(self) -> Optional[float]: + return regex_find_max_float( + r";\sEND_LAYER_OBJECT\sz=(%F)", self.footer_data) + + def parse_filament_total(self) -> Optional[float]: + filament = regex_find_floats( + r";\s+Ext #\d+\s+=\s+(%F)\s*mm", self.footer_data) + if filament: + return sum(filament) + return None + + def parse_estimated_time(self) -> Optional[float]: + time = regex_find_float( + r";\sCalculated.*Build\sTime:\s(%F)\sminutes", + self.footer_data) + if time is not None: + time *= 60 + return round(time, 2) + return None + + def parse_first_layer_extr_temp(self) -> Optional[float]: + return regex_find_float(r"; first_layer_C = (%F)", self.header_data) + + def parse_first_layer_bed_temp(self) -> Optional[float]: + return regex_find_float(r"; bed_C = (%F)", self.header_data) + + def parse_chamber_temp(self) -> Optional[float]: + return regex_find_float(r"; chamber_C = (%F)", self.header_data) + + +class IdeaMaker(BaseSlicer): + def check_identity(self, data: str) -> Optional[Dict[str, str]]: + match = re.search(r"\sideaMaker\s(.*),", data) + if match: + return { + 'slicer': "IdeaMaker", + 'slicer_version': match.group(1) + } + return None + + def has_objects(self) -> bool: + return self._check_has_objects(self.header_data, r"\n;PRINTING:") + + def parse_first_layer_height(self) -> Optional[float]: + return regex_find_float( + r";LAYER:0\s*.*\s*;HEIGHT:(%F)", self.header_data + ) + + def parse_layer_height(self) -> Optional[float]: + return regex_find_float( + r";LAYER:1\s*.*\s*;HEIGHT:(%F)", self.header_data + ) + + def parse_object_height(self) -> Optional[float]: + return regex_find_float(r";Bounding Box:(?:\s+(%F))+", self.header_data) + + def parse_filament_total(self) -> Optional[float]: + filament = regex_find_floats( + r";Material.\d\sUsed:\s+(%F)", self.footer_data + ) + if filament: + return sum(filament) + return None + + def parse_filament_type(self) -> Optional[str]: + return ( + regex_find_string(r";Filament\sType\s.\d:\s(%S)", self.header_data) or + regex_find_string(r";Filament\stype\s=\s(%S)", self.header_data) + ) + + def 
parse_filament_name(self) -> Optional[str]:
+        return (
+            regex_find_string(r";Filament\sName\s.\d:\s(%S)", self.header_data) or
+            regex_find_string(r";Filament\sname\s=\s(%S)", self.header_data)
+        )
+
+    def parse_filament_weight_total(self) -> Optional[float]:
+        pi = 3.141592653589793
+        length = regex_find_floats(
+            r";Material.\d\sUsed:\s+(%F)", self.footer_data)
+        diameter = regex_find_floats(
+            r";Filament\sDiameter\s.\d:\s+(%F)", self.header_data)
+        density = regex_find_floats(
+            r";Filament\sDensity\s.\d:\s+(%F)", self.header_data)
+        if len(length) == len(density) == len(diameter):
+            # calc individual weight for each filament with m=pi/4*d²*l*rho
+            weights = [(pi/4 * diameter[i]**2 * length[i] * density[i]/10**6)
+                       for i in range(len(length))]
+            return sum(weights)
+        return None
+
+    def parse_estimated_time(self) -> Optional[float]:
+        return regex_find_float(r";Print\sTime:\s(%F)", self.footer_data)
+
+    def parse_first_layer_extr_temp(self) -> Optional[float]:
+        return regex_find_float(r"M109 T0 S(%F)", self.header_data)
+
+    def parse_first_layer_bed_temp(self) -> Optional[float]:
+        return regex_find_float(r"M190 S(%F)", self.header_data)
+
+    def parse_chamber_temp(self) -> Optional[float]:
+        return regex_find_float(r"M191 S(%F)", self.header_data)
+
+    def parse_nozzle_diameter(self) -> Optional[float]:
+        return regex_find_float(
+            r";Dimension:(?:\s\d+\.\d+){3}\s(%F)", self.header_data)
+
+class IceSL(BaseSlicer):
+    def check_identity(self, data) -> Optional[Dict[str, Any]]:
+        match = re.search(r"<IceSL\s(.*)>", data)
+        if match:
+            version = match.group(1) if match.group(1)[0].isdigit() else "-"
+            return {
+                'slicer': "IceSL",
+                'slicer_version': version
+            }
+        return None
+
+    def parse_first_layer_height(self) -> Optional[float]:
+        return regex_find_float(
+            r";\sz_layer_height_first_layer_mm\s:\s+(%F)",
+            self.header_data)
+
+    def parse_layer_height(self) -> Optional[float]:
+        self.layer_height = regex_find_float(
+            r";\sz_layer_height_mm\s:\s+(%F)",
+            self.header_data)
+        return self.layer_height
+
+    def parse_object_height(self) -> Optional[float]:
+        return regex_find_float(
+            r";\sprint_height_mm\s:\s+(%F)", self.header_data)
+
+    def parse_first_layer_extr_temp(self) -> Optional[float]:
+        return regex_find_float(
+            r";\sextruder_temp_degree_c_0\s:\s+(%F)", self.header_data)
+
+    def parse_first_layer_bed_temp(self) -> Optional[float]:
+        return regex_find_float(
+            r";\sbed_temp_degree_c\s:\s+(%F)", self.header_data)
+
+    def parse_chamber_temp(self) -> Optional[float]:
+        return regex_find_float(
+            r";\schamber_temp_degree_c\s:\s+(%F)", self.header_data)
+
+    def parse_filament_total(self) -> Optional[float]:
+        return regex_find_float(
+            r";\sfilament_used_mm\s:\s+(%F)", self.header_data)
+
+    def parse_filament_weight_total(self) -> Optional[float]:
+        return regex_find_float(
+            r";\sfilament_used_g\s:\s+(%F)", self.header_data)
+
+    def parse_filament_name(self) -> Optional[str]:
+        return regex_find_string(
+            r";\sfilament_name\s:\s+(%S)", self.header_data)
+
+    def parse_filament_type(self) -> Optional[str]:
+        return regex_find_string(
+            r";\sfilament_type\s:\s+(%S)", self.header_data)
+
+    def parse_estimated_time(self) -> Optional[float]:
+        return regex_find_float(
+            r";\sestimated_print_time_s\s:\s+(%F)", self.header_data)
+
+    def parse_layer_count(self) -> Optional[int]:
+        return regex_find_int(
+            r";\slayer_count\s:\s+(%D)", self.header_data)
+
+    def parse_nozzle_diameter(self) -> Optional[float]:
+        return regex_find_float(
+            r";\snozzle_diameter_mm_0\s:\s+(%F)", self.header_data)
+
+class 
+class KiriMoto(BaseSlicer):
+    def check_identity(self, data) -> Optional[Dict[str, Any]]:
+        variants: Dict[str, str] = {
+            "Kiri:Moto": r"; Generated by Kiri:Moto (\d.+)",
+            "SimplyPrint": r"; Generated by Kiri:Moto \(SimplyPrint\) (.+)"
+        }
+        for name, pattern in variants.items():
+            match = re.search(pattern, data)
+            if match:
+                return {
+                    "slicer": name,
+                    "slicer_version": match.group(1)
+                }
+        return None
+
+    def parse_first_layer_height(self) -> Optional[float]:
+        return regex_find_float(
+            r"; firstSliceHeight = (%F)", self.header_data
+        )
+
+    def parse_layer_height(self) -> Optional[float]:
+        self.layer_height = regex_find_float(
+            r"; sliceHeight = (%F)", self.header_data
+        )
+        return self.layer_height
+
+    def parse_object_height(self) -> Optional[float]:
+        return regex_find_max_float(
+            r"G1 Z(%F) (?:; z-hop end|F\d+\n)", self.footer_data
+        )
+
+    def parse_layer_count(self) -> Optional[int]:
+        matches = re.findall(
+            r";; --- layer (\d+) \(.+", self.footer_data
+        )
+        if not matches:
+            return None
+        try:
+            return int(matches[-1]) + 1
+        except Exception:
+            return None
+
+    def parse_estimated_time(self) -> Optional[float]:
+        return regex_find_int(r"; --- print time: (%D)s", self.footer_data)
+
+    def parse_filament_total(self) -> Optional[float]:
+        return regex_find_float(
+            r"; --- filament used: (%F) mm", self.footer_data
+        )
+
+    def parse_first_layer_extr_temp(self) -> Optional[float]:
+        return regex_find_float(
+            r"; firstLayerNozzleTemp = (%F)", self.header_data
+        )
+
+    def parse_first_layer_bed_temp(self) -> Optional[float]:
+        return regex_find_float(
+            r"; firstLayerBedTemp = (%F)", self.header_data
+        )
+
+
+READ_SIZE = 1024 * 1024  # 1 MiB
+SUPPORTED_SLICERS: List[Type[BaseSlicer]] = [
+    PrusaSlicer, Slic3rPE, Slic3r, Cura, Simplify3D,
+    KISSlicer, IdeaMaker, IceSL, KiriMoto
+]
+SUPPORTED_DATA = [
+    'gimage',
+    'simage',
+    'gcode_start_byte',
+    'gcode_end_byte',
+    'layer_count',
+    'object_height',
+    'estimated_time',
+    'nozzle_diameter',
+    'layer_height',
+    'first_layer_height',
+    'first_layer_extr_temp',
+    'first_layer_bed_temp',
+    'chamber_temp',
+    'filament_name',
+    'filament_type',
+    'filament_total',
+    'filament_weight_total',
+    'preset_colours',
+    'used_extruders',
+    'thumbnails']
+
+def process_objects(file_path: str, slicer: BaseSlicer, name: str) -> bool:
+    try:
+        from preprocess_cancellation import (
+            preprocess_slicer,
+            preprocess_cura,
+            preprocess_ideamaker,
+            preprocess_m486
+        )
+    except ImportError:
+        logger.info("Module 'preprocess-cancellation' failed to load")
+        return False
+    fname = os.path.basename(file_path)
+    logger.info(
+        f"Performing Object Processing on file: {fname}, "
+        f"sliced by {name}"
+    )
+    with tempfile.TemporaryDirectory() as tmp_dir_name:
+        tmp_file = os.path.join(tmp_dir_name, fname)
+        with open(file_path, 'r') as in_file:
+            with open(tmp_file, 'w') as out_file:
+                try:
+                    if slicer.has_m486_objects:
+                        processor = preprocess_m486
+                    elif isinstance(slicer, PrusaSlicer):
+                        processor = preprocess_slicer
+                    elif isinstance(slicer, Cura):
+                        processor = preprocess_cura
+                    elif isinstance(slicer, IdeaMaker):
+                        processor = preprocess_ideamaker
+                    else:
+                        logger.info(
+                            f"Object Processing Failed, slicer {name} "
+                            "not supported"
+                        )
+                        return False
+                    for line in processor(in_file):
+                        out_file.write(line)
+                except Exception as e:
+                    logger.info(f"Object processing failed: {e}")
+                    return False
+        if os.path.islink(file_path):
+            file_path = os.path.realpath(file_path)
+        shutil.move(tmp_file, file_path)
+    return True
+
+def get_slicer(file_path: str) -> Tuple[BaseSlicer, Dict[str, str]]:
+    header_data = footer_data = ""
+    slicer: Optional[BaseSlicer] = None
+    size = os.path.getsize(file_path)
+    with open(file_path, 'r') as f:
+        # read the default size, which should be enough to
+        # identify the slicer
+        header_data = f.read(READ_SIZE)
+        for impl in SUPPORTED_SLICERS:
+            slicer = impl(file_path)
+            ident = slicer.check_identity(header_data)
+            if ident is not None:
+                break
+        else:
+            slicer = UnknownSlicer(file_path)
+            ident = slicer.check_identity(header_data)
+        if size > READ_SIZE * 2:
+            f.seek(size - READ_SIZE)
+            footer_data = f.read()
+        elif size > READ_SIZE:
+            remaining = size - READ_SIZE
+            footer_data = header_data[remaining - READ_SIZE:] + f.read()
+        else:
+            footer_data = header_data
+        slicer.set_data(header_data, footer_data, size)
+    if ident is None:
+        ident = {"slicer": "unknown"}
+    return slicer, ident
+
+def extract_metadata(
+        file_path: str, check_objects: bool
+) -> Dict[str, Any]:
+    metadata: Dict[str, Any] = {}
+    slicer, ident = get_slicer(file_path)
+    if check_objects and slicer.has_objects():
+        name = ident.get("slicer", "unknown")
+        if process_objects(file_path, slicer, name):
+            slicer, ident = get_slicer(file_path)
+    metadata['size'] = os.path.getsize(file_path)
+    metadata['modified'] = os.path.getmtime(file_path)
+    metadata['uuid'] = str(uuid.uuid4())
+    metadata.update(ident)
+    for key in SUPPORTED_DATA:
+        func = getattr(slicer, "parse_" + key)
+        result = func()
+        if result is not None:
+            metadata[key] = result
+    return metadata
+
+def extract_ufp(ufp_path: str, dest_path: str) -> None:
+    if not os.path.isfile(ufp_path):
+        logger.info(f"UFP file Not Found: {ufp_path}")
+        sys.exit(-1)
+    thumb_name = os.path.splitext(
+        os.path.basename(dest_path))[0] + ".png"
+    dest_thumb_dir = os.path.join(os.path.dirname(dest_path), ".thumbs")
+    dest_thumb_path = os.path.join(dest_thumb_dir, thumb_name)
+    try:
+        with tempfile.TemporaryDirectory() as tmp_dir_name:
+            tmp_thumb_path = ""
+            with zipfile.ZipFile(ufp_path) as zf:
+                tmp_model_path = zf.extract(
+                    UFP_MODEL_PATH, path=tmp_dir_name)
+                if UFP_THUMB_PATH in zf.namelist():
+                    tmp_thumb_path = zf.extract(
+                        UFP_THUMB_PATH, path=tmp_dir_name)
+            if os.path.islink(dest_path):
+                dest_path = os.path.realpath(dest_path)
+            shutil.move(tmp_model_path, dest_path)
+            if tmp_thumb_path:
+                if not os.path.exists(dest_thumb_dir):
+                    os.mkdir(dest_thumb_dir)
+                shutil.move(tmp_thumb_path, dest_thumb_path)
+    except Exception:
+        logger.info(traceback.format_exc())
+        sys.exit(-1)
+    try:
+        os.remove(ufp_path)
+    except Exception:
+        logger.info(f"Error removing ufp file: {ufp_path}")
+
+def main(path: str,
+         filename: str,
+         ufp: Optional[str],
+         check_objects: bool
+         ) -> None:
+    file_path = os.path.join(path, filename)
+    if ufp is not None:
+        extract_ufp(ufp, file_path)
+    metadata: Dict[str, Any] = {}
+    if not os.path.isfile(file_path):
+        logger.info(f"File Not Found: {file_path}")
+        sys.exit(-1)
+    try:
+        metadata = extract_metadata(file_path, check_objects)
+    except Exception:
+        logger.info(traceback.format_exc())
+        sys.exit(-1)
+    fd = sys.stdout.fileno()
+    data = json.dumps(
+        {'file': filename, 'metadata': metadata}).encode()
+    while data:
+        try:
+            ret = os.write(fd, data)
+        except OSError:
+            continue
+        data = data[ret:]
+
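+# Example invocation (hypothetical paths; the utility writes a JSON
+# document containing the parsed metadata to stdout):
+#
+#   python3 metadata.py -p ~/printer_data/gcodes -f part.gcode
+#   python3 metadata.py -p ~/printer_data/gcodes -f part.gcode \
+#       -u ~/printer_data/gcodes/part.ufp -o
+#
+# Note that the write loop in main() retries os.write() on OSError, so
+# a momentarily unavailable stdout pipe does not drop the JSON payload.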
+
+if __name__ == "__main__":
+    # Parse start arguments
+    parser = argparse.ArgumentParser(
+        description="GCode Metadata Extraction Utility")
+    parser.add_argument(
+        "-f", "--filename", metavar='<filename>',
+        help="name of gcode file to parse")
+    parser.add_argument(
+        "-p", "--path", default=os.path.abspath(os.path.dirname(__file__)),
+        metavar='<path>',
+        help="optional absolute path for file"
+    )
+    parser.add_argument(
+        "-u", "--ufp", metavar="<ufp file>", default=None,
+        help="optional path of ufp file to extract"
+    )
+    parser.add_argument(
+        "-o", "--check-objects", dest='check_objects', action='store_true',
+        help="process gcode file for exclude object functionality")
+    args = parser.parse_args()
+    check_objects = args.check_objects
+    enabled_msg = "enabled" if check_objects else "disabled"
+    logger.info(f"Object Processing is {enabled_msg}")
+    main(args.path, args.filename, args.ufp, check_objects)
diff --git a/moonraker/klippy_apis.py b/moonraker/klippy_apis.py
new file mode 100644
index 0000000..8d3855b
--- /dev/null
+++ b/moonraker/klippy_apis.py
@@ -0,0 +1,354 @@
+# Helper for Moonraker to Klippy API calls.
+#
+# Copyright (C) 2020 Eric Callahan
+#
+# This file may be distributed under the terms of the GNU GPLv3 license.
+
+from __future__ import annotations
+import logging
+from ..utils import Sentinel
+from ..common import WebRequest, APITransport, RequestType
+import os
+import shutil
+import json
+
+# Annotation imports
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Union,
+    Optional,
+    Dict,
+    List,
+    TypeVar,
+    Mapping,
+    Callable,
+    Coroutine
+)
+if TYPE_CHECKING:
+    from ..confighelper import ConfigHelper
+    from ..common import UserInfo
+    from .klippy_connection import KlippyConnection as Klippy
+    from .file_manager.file_manager import FileManager
+    Subscription = Dict[str, Optional[List[Any]]]
+    SubCallback = Callable[[Dict[str, Dict[str, Any]], float], Optional[Coroutine]]
+    _T = TypeVar("_T")
+
+INFO_ENDPOINT = "info"
+ESTOP_ENDPOINT = "emergency_stop"
+LIST_EPS_ENDPOINT = "list_endpoints"
+GC_OUTPUT_ENDPOINT = "gcode/subscribe_output"
+GCODE_ENDPOINT = "gcode/script"
+SUBSCRIPTION_ENDPOINT = "objects/subscribe"
+STATUS_ENDPOINT = "objects/query"
+OBJ_LIST_ENDPOINT = "objects/list"
+REG_METHOD_ENDPOINT = "register_remote_method"
+
+class KlippyAPI(APITransport):
+    def __init__(self, config: ConfigHelper) -> None:
+        self.server = config.get_server()
+        self.klippy: Klippy = self.server.lookup_component("klippy_connection")
+        self.fm: FileManager = self.server.lookup_component("file_manager")
+        self.eventloop = self.server.get_event_loop()
+        app_args = self.server.get_app_args()
+        self.version = app_args.get('software_version')
+        # Maintain a subscription for all moonraker requests, as
+        # we do not want to overwrite them
+        self.host_subscription: Subscription = {}
+        self.subscription_callbacks: List[SubCallback] = []
+
+        # Register GCode Aliases
+        self.server.register_endpoint(
+            "/printer/print/pause", RequestType.POST, self._gcode_pause
+        )
+        self.server.register_endpoint(
+            "/printer/print/resume", RequestType.POST, self._gcode_resume
+        )
+        self.server.register_endpoint(
+            "/printer/print/cancel", RequestType.POST, self._gcode_cancel
+        )
+        self.server.register_endpoint(
+            "/printer/print/start", RequestType.POST, self._gcode_start_print
+        )
+        self.server.register_endpoint(
+            "/printer/restart", RequestType.POST, self._gcode_restart
+        )
+        self.server.register_endpoint(
+            "/printer/firmware_restart", RequestType.POST, self._gcode_firmware_restart
+        )
+        self.server.register_event_handler(
+            "server:klippy_disconnect", self._on_klippy_disconnect
+        )
+        self.server.register_endpoint(
+            "/printer/list_endpoints", RequestType.GET, self.list_endpoints
+        )
+        self.server.register_endpoint(
+            "/printer/breakheater", RequestType.POST, self.breakheater
+        )
+        self.server.register_endpoint(
+            "/printer/breakmacro", RequestType.POST, self.breakmacro
+        )
+
+    def _on_klippy_disconnect(self) -> None:
+        self.host_subscription.clear()
+        self.subscription_callbacks.clear()
+
+    async def _gcode_pause(self, web_request: WebRequest) -> str:
+        return await self.pause_print()
+
+    async def _gcode_resume(self, web_request: WebRequest) -> str:
+        return await self.resume_print()
+
+    async def _gcode_cancel(self, web_request: WebRequest) -> str:
+        return await self.cancel_print()
+
+    async def _gcode_start_print(self, web_request: WebRequest) -> str:
+        filename: str = web_request.get_str('filename')
+        user = web_request.get_current_user()
+        return await self.start_print(filename, user=user)
+
+    async def _gcode_restart(self, web_request: WebRequest) -> str:
+        return await self.do_restart("RESTART")
+
+    async def _gcode_firmware_restart(self, web_request: WebRequest) -> str:
+        return await self.do_restart("FIRMWARE_RESTART")
+
+    async def _send_klippy_request(
+        self,
+        method: str,
+        params: Dict[str, Any],
+        default: Any = Sentinel.MISSING,
+        transport: Optional[APITransport] = None
+    ) -> Any:
+        try:
+            req = WebRequest(method, params, transport=transport or self)
+            result = await self.klippy.request(req)
+        except self.server.error:
+            if default is Sentinel.MISSING:
+                raise
+            result = default
+        return result
+
+    async def run_gcode(self,
+                        script: str,
+                        default: Any = Sentinel.MISSING
+                        ) -> str:
+        params = {'script': script}
+        result = await self._send_klippy_request(
+            GCODE_ENDPOINT, params, default)
+        return result
+
+    async def start_print(
+        self,
+        filename: str,
+        wait_klippy_started: bool = False,
+        user: Optional[UserInfo] = None
+    ) -> str:
+        # WARNING: Do not call this method from within the following
+        # event handlers when "wait_klippy_started" is set to True:
+        # klippy_identified, klippy_started, klippy_ready, klippy_disconnect
+        # Doing so will result in "wait_started" blocking for the specified
+        # timeout (default 20s) and returning False.
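+        # The block below copies the requested file into the .cache
+        # directory under ~/printer_data/gcodes (recreating the cache on
+        # each start) and prints from that cached copy, echoing the
+        # file's parsed metadata to clients as a "// metadata=..."
+        # gcode response.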
+        # XXX - validate that file is on disk
+        if filename[0] == '/':
+            filename = filename[1:]
+        # Escape existing double quotes in the file name
+        filename = filename.replace("\"", "\\\"")
+        homedir = os.path.expanduser("~")
+        if os.path.split(filename)[0].split(os.path.sep)[0] != ".cache":
+            base_path = os.path.join(homedir, "printer_data/gcodes")
+            target = os.path.join(".cache", os.path.basename(filename))
+            cache_path = os.path.join(base_path, ".cache")
+            # Ensure the cache dir exists, then wipe and recreate it so
+            # it only holds the file about to be printed
+            if not os.path.exists(cache_path):
+                os.makedirs(cache_path)
+            shutil.rmtree(cache_path)
+            os.makedirs(cache_path)
+            metadata = self.fm.gcode_metadata.metadata.get(filename, None)
+            self.copy_file_to_cache(
+                os.path.join(base_path, filename),
+                os.path.join(base_path, target)
+            )
+            msg = "// metadata=" + json.dumps(metadata)
+            self.server.send_event("server:gcode_response", msg)
+            filename = target
+        script = f'SDCARD_PRINT_FILE FILENAME="{filename}"'
+        if wait_klippy_started:
+            await self.klippy.wait_started()
+        logging.info(f"Requesting Job Start, filename = {filename}")
+        ret = await self.run_gcode(script)
+        self.server.send_event("klippy_apis:job_start_complete", user)
+        return ret
+
+    async def pause_print(
+        self, default: Union[Sentinel, _T] = Sentinel.MISSING
+    ) -> Union[_T, str]:
+        self.server.send_event("klippy_apis:pause_requested")
+        logging.info("Requesting job pause...")
+        return await self._send_klippy_request(
+            "pause_resume/pause", {}, default)
+
+    async def resume_print(
+        self, default: Union[Sentinel, _T] = Sentinel.MISSING
+    ) -> Union[_T, str]:
+        self.server.send_event("klippy_apis:resume_requested")
+        logging.info("Requesting job resume...")
+        return await self._send_klippy_request(
+            "pause_resume/resume", {}, default)
+
+    async def cancel_print(
+        self, default: Union[Sentinel, _T] = Sentinel.MISSING
+    ) -> Union[_T, str]:
+        self.server.send_event("klippy_apis:cancel_requested")
+        logging.info("Requesting job cancel...")
+        await self._send_klippy_request(
+            "breakmacro", {}, default)
+        await self._send_klippy_request(
+            "breakheater", {}, default)
+        return await self._send_klippy_request(
+            "pause_resume/cancel", {}, default)
+
+    async def breakheater(
+        self, default: Union[Sentinel, _T] = Sentinel.MISSING
+    ) -> Union[_T, str]:
+        return await self._send_klippy_request(
+            "breakheater", {}, default)
+
+    async def breakmacro(
+        self, default: Union[Sentinel, _T] = Sentinel.MISSING
+    ) -> Union[_T, str]:
+        return await self._send_klippy_request(
+            "breakmacro", {}, default)
+
+    async def do_restart(
+        self, gc: str, wait_klippy_started: bool = False
+    ) -> str:
+        # WARNING: Do not call this method from within the following
+        # event handlers when "wait_klippy_started" is set to True:
+        # klippy_identified, klippy_started, klippy_ready, klippy_disconnect
+        # Doing so will result in "wait_started" blocking for the specified
+        # timeout (default 20s) and returning False.
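+        # If Klippy drops the connection while processing RESTART or
+        # FIRMWARE_RESTART the request cannot complete normally, so the
+        # "Klippy Disconnected" error below is treated as a successful
+        # restart rather than a failure.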
+ if wait_klippy_started: + await self.klippy.wait_started() + try: + result = await self.run_gcode(gc) + except self.server.error as e: + if str(e) == "Klippy Disconnected": + result = "ok" + else: + raise + return result + + async def list_endpoints(self, + default: Union[Sentinel, _T] = Sentinel.MISSING + ) -> Union[_T, Dict[str, List[str]]]: + return await self._send_klippy_request( + LIST_EPS_ENDPOINT, {}, default) + + async def emergency_stop(self) -> str: + return await self._send_klippy_request(ESTOP_ENDPOINT, {}) + + async def get_klippy_info(self, + send_id: bool = False, + default: Union[Sentinel, _T] = Sentinel.MISSING + ) -> Union[_T, Dict[str, Any]]: + params = {} + if send_id: + ver = self.version + params = {'client_info': {'program': "Moonraker", 'version': ver}} + return await self._send_klippy_request(INFO_ENDPOINT, params, default) + + async def get_object_list(self, + default: Union[Sentinel, _T] = Sentinel.MISSING + ) -> Union[_T, List[str]]: + result = await self._send_klippy_request( + OBJ_LIST_ENDPOINT, {}, default) + if isinstance(result, dict) and 'objects' in result: + return result['objects'] + if default is not Sentinel.MISSING: + return default + raise self.server.error("Invalid response received from Klippy", 500) + + async def query_objects(self, + objects: Mapping[str, Optional[List[str]]], + default: Union[Sentinel, _T] = Sentinel.MISSING + ) -> Union[_T, Dict[str, Any]]: + params = {'objects': objects} + result = await self._send_klippy_request( + STATUS_ENDPOINT, params, default) + if isinstance(result, dict) and "status" in result: + return result["status"] + if default is not Sentinel.MISSING: + return default + raise self.server.error("Invalid response received from Klippy", 500) + + async def subscribe_objects( + self, + objects: Mapping[str, Optional[List[str]]], + callback: Optional[SubCallback] = None, + default: Union[Sentinel, _T] = Sentinel.MISSING + ) -> Union[_T, Dict[str, Any]]: + # The host transport shares subscriptions amongst all components + for obj, items in objects.items(): + if obj in self.host_subscription: + prev = self.host_subscription[obj] + if items is None or prev is None: + self.host_subscription[obj] = None + else: + uitems = list(set(prev) | set(items)) + self.host_subscription[obj] = uitems + else: + self.host_subscription[obj] = items + params = {"objects": dict(self.host_subscription)} + result = await self._send_klippy_request(SUBSCRIPTION_ENDPOINT, params, default) + if isinstance(result, dict) and "status" in result: + if callback is not None: + self.subscription_callbacks.append(callback) + return result["status"] + if default is not Sentinel.MISSING: + return default + raise self.server.error("Invalid response received from Klippy", 500) + + async def subscribe_from_transport( + self, + objects: Mapping[str, Optional[List[str]]], + transport: APITransport, + default: Union[Sentinel, _T] = Sentinel.MISSING, + ) -> Union[_T, Dict[str, Any]]: + params = {"objects": dict(objects)} + result = await self._send_klippy_request( + SUBSCRIPTION_ENDPOINT, params, default, transport + ) + if isinstance(result, dict) and "status" in result: + return result["status"] + if default is not Sentinel.MISSING: + return default + raise self.server.error("Invalid response received from Klippy", 500) + + async def subscribe_gcode_output(self) -> str: + template = {'response_template': + {'method': "process_gcode_response"}} + return await self._send_klippy_request(GC_OUTPUT_ENDPOINT, template) + + async def register_method(self, 
method_name: str) -> str:
+        return await self._send_klippy_request(
+            REG_METHOD_ENDPOINT,
+            {'response_template': {"method": method_name},
+             'remote_method': method_name})
+
+    def send_status(
+        self, status: Dict[str, Any], eventtime: float
+    ) -> None:
+        for cb in self.subscription_callbacks:
+            self.eventloop.register_callback(cb, status, eventtime)
+        self.server.send_event("server:status_update", status)
+
+    def copy_file_to_cache(self, origin, target):
+        # Refuse the copy when the file is larger than the free space
+        # reported for the root filesystem
+        stat = os.statvfs("/")
+        free_space = stat.f_frsize * stat.f_bfree
+        filesize = os.path.getsize(origin)
+        if filesize < free_space:
+            shutil.copy(origin, target)
+        else:
+            msg = "!! Insufficient disk space, unable to read the file."
+            self.server.send_event("server:gcode_response", msg)
+            raise self.server.error(
+                "Insufficient disk space, unable to read the file.", 500)
+
+def load_component(config: ConfigHelper) -> KlippyAPI:
+    return KlippyAPI(config)
diff --git a/moonraker/machine.py b/moonraker/machine.py
new file mode 100644
index 0000000..4556b4d
--- /dev/null
+++ b/moonraker/machine.py
@@ -0,0 +1,2184 @@
+# Machine manipulation request handlers
+#
+# Copyright (C) 2020 Eric Callahan
+#
+# This file may be distributed under the terms of the GNU GPLv3 license.
+
+from __future__ import annotations
+import sys
+import os
+import re
+import pathlib
+import logging
+import asyncio
+import platform
+import socket
+import ipaddress
+import time
+import shutil
+import distro
+import tempfile
+import getpass
+import configparser
+from ..confighelper import FileSourceWrapper
+from ..utils import source_info, cansocket, sysfs_devs, load_system_module
+from ..utils import json_wrapper as jsonw
+from ..common import RequestType
+
+# Annotation imports
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Awaitable,
+    Callable,
+    Dict,
+    List,
+    Optional,
+    Tuple,
+    Union,
+    cast
+)
+
+if TYPE_CHECKING:
+    from ..confighelper import ConfigHelper
+    from ..common import WebRequest
+    from .application import MoonrakerApp
+    from .klippy_connection import KlippyConnection
+    from .http_client import HttpClient
+    from .shell_command import ShellCommandFactory as SCMDComp
+    from .database import MoonrakerDatabase
+    from .file_manager.file_manager import FileManager
+    from .announcements import Announcements
+    from .proc_stats import ProcStats
+    from .dbus_manager import DbusManager
+    from dbus_next.aio.proxy_object import ProxyInterface
+    from dbus_next.signature import Variant
+    SudoReturn = Union[Awaitable[Tuple[str, bool]], Tuple[str, bool]]
+    SudoCallback = Callable[[], SudoReturn]
+
+CGROUP_PATH = "/proc/1/cgroup"
+SCHED_PATH = "/proc/1/sched"
+SYSTEMD_PATH = "/etc/systemd/system"
+SD_CID_PATH = "/sys/block/mmcblk0/device/cid"
+SD_CSD_PATH = "/sys/block/mmcblk0/device/csd"
+SD_MFGRS = {
+    '1b': "Samsung",
+    '03': "Sandisk",
+    '74': "PNY"
+}
+IP_FAMILIES = {'inet': 'ipv4', 'inet6': 'ipv6'}
+NETWORK_UPDATE_SEQUENCE = 10
+SERVICE_PROPERTIES = [
+    "Requires", "After", "SupplementaryGroups", "EnvironmentFiles",
+    "ExecStart", "WorkingDirectory", "FragmentPath", "Description",
+    "User"
+]
+USB_IDS_URL = "http://www.linux-usb.org/usb.ids"
+
+class Machine:
+    def __init__(self, config: ConfigHelper) -> None:
+        self.server = config.get_server()
+        self._allowed_services: List[str] = []
+        self._init_allowed_services()
+        dist_info: Dict[str, Any]
+        dist_info = {'name': distro.name(pretty=True)}
+        dist_info.update(distro.info())
+        dist_info['release_info'] = distro.distro_release_info()
+        dist_info['kernel_version'] = platform.release()
+        
self.inside_container = False + self.moonraker_service_info: Dict[str, Any] = {} + self.sudo_req_lock = asyncio.Lock() + self.periph_lock = asyncio.Lock() + self._sudo_password: Optional[str] = None + sudo_template = config.gettemplate("sudo_password", None) + if sudo_template is not None: + self._sudo_password = sudo_template.render() + self._public_ip = "" + self.system_info: Dict[str, Any] = { + 'python': { + "version": tuple(sys.version_info), + "version_string": sys.version.replace("\n", " ") + }, + 'cpu_info': self._get_cpu_info(), + 'sd_info': self._get_sdcard_info(), + 'distribution': dist_info, + 'virtualization': self._check_inside_container(), + 'network': {}, + 'canbus': {} + } + self._update_log_rollover(log=True) + providers: Dict[str, type] = { + "none": BaseProvider, + "systemd_cli": SystemdCliProvider, + "systemd_dbus": SystemdDbusProvider, + "supervisord_cli": SupervisordCliProvider + } + self.provider_type = config.get('provider', 'systemd_dbus') + pclass = providers.get(self.provider_type) + if pclass is None: + raise config.error(f"Invalid Provider: {self.provider_type}") + self.sys_provider: BaseProvider = pclass(config) + self.system_info["provider"] = self.provider_type + logging.info(f"Using System Provider: {self.provider_type}") + self.validator = InstallValidator(config) + self.sudo_requests: List[Tuple[SudoCallback, str]] = [] + + self.server.register_endpoint( + "/machine/reboot", RequestType.POST, self._handle_machine_request + ) + self.server.register_endpoint( + "/machine/shutdown", RequestType.POST, self._handle_machine_request + ) + self.server.register_endpoint( + "/machine/services/restart", RequestType.POST, self._handle_service_request + ) + self.server.register_endpoint( + "/machine/services/stop", RequestType.POST, self._handle_service_request + ) + self.server.register_endpoint( + "/machine/services/start", RequestType.POST, self._handle_service_request + ) + self.server.register_endpoint( + "/machine/system_info", RequestType.GET, self._handle_sysinfo_request + ) + self.server.register_endpoint( + "/machine/sudo/info", RequestType.GET, self._handle_sudo_info + ) + self.server.register_endpoint( + "/machine/sudo/password", RequestType.POST, self._set_sudo_password + ) + self.server.register_endpoint( + "/machine/peripherals/serial", RequestType.GET, self._handle_serial_request + ) + self.server.register_endpoint( + "/machine/peripherals/usb", RequestType.GET, self._handle_usb_request + ) + self.server.register_endpoint( + "/machine/peripherals/canbus", RequestType.GET, self._handle_can_query + ) + self.server.register_endpoint( + "/machine/peripherals/video", RequestType.GET, self._handle_video_request + ) + + self.server.register_notification("machine:service_state_changed") + self.server.register_notification("machine:sudo_alert") + + # Register remote methods + self.server.register_remote_method( + "shutdown_machine", self.sys_provider.shutdown) + self.server.register_remote_method( + "reboot_machine", self.sys_provider.reboot) + + # IP network shell commands + shell_cmd: SCMDComp = self.server.load_component( + config, 'shell_command') + self.addr_cmd = shell_cmd.build_shell_command("ip -json -det address") + iwgetbin = "/sbin/iwgetid" + if not pathlib.Path(iwgetbin).exists(): + iwgetbin = "iwgetid" + self.iwgetid_cmd = shell_cmd.build_shell_command(iwgetbin) + self.init_evt = asyncio.Event() + self.libcam = self._try_import_libcamera() + + def _init_allowed_services(self) -> None: + app_args = self.server.get_app_args() + data_path = 
app_args["data_path"] + fpath = pathlib.Path(data_path).joinpath("moonraker.asvc") + fm: FileManager = self.server.lookup_component("file_manager") + fm.add_reserved_path("allowed_services", fpath, False) + default_svcs = source_info.read_asset("default_allowed_services") or "" + try: + if not fpath.exists(): + fpath.write_text(default_svcs) + data = fpath.read_text() + except Exception: + logging.exception("Failed to read moonraker.asvc") + data = default_svcs + svcs = [svc.strip() for svc in data.split("\n") if svc.strip()] + for svc in svcs: + if svc.endswith(".service"): + svc = svc.rsplit(".", 1)[0] + if svc not in self._allowed_services: + self._allowed_services.append(svc) + + def _update_log_rollover(self, log: bool = False) -> None: + sys_info_msg = "\nSystem Info:" + for header, info in self.system_info.items(): + sys_info_msg += f"\n\n***{header}***" + if not isinstance(info, dict): + sys_info_msg += f"\n {repr(info)}" + else: + for key, val in info.items(): + sys_info_msg += f"\n {key}: {val}" + sys_info_msg += "\n\n***Allowed Services***" + for svc in self._allowed_services: + sys_info_msg += f"\n {svc}" + self.server.add_log_rollover_item('system_info', sys_info_msg, log=log) + + def _try_import_libcamera(self) -> Any: + try: + libcam = load_system_module("libcamera") + cmgr = libcam.CameraManager.singleton() + self.server.add_log_rollover_item( + "libcamera", + f"Found libcamera Python module, version: {cmgr.version}" + ) + return libcam + except Exception: + if self.server.is_verbose_enabled(): + logging.exception("Failed to import libcamera") + self.server.add_log_rollover_item( + "libcamera", "Module libcamera unavailble, import failed" + ) + return None + + @property + def public_ip(self) -> str: + return self._public_ip + + @property + def unit_name(self) -> str: + svc_info = self.moonraker_service_info + unit_name = svc_info.get("unit_name", "moonraker.service") + return unit_name.split(".", 1)[0] + + def is_service_allowed(self, service: str) -> bool: + return ( + service in self._allowed_services or + re.match(r"moonraker[_-]?\d*", service) is not None or + re.match(r"klipper[_-]?\d*", service) is not None + ) + + def validation_enabled(self) -> bool: + return self.validator.validation_enabled + + def get_system_provider(self): + return self.sys_provider + + def is_inside_container(self): + return self.inside_container + + def get_provider_type(self): + return self.provider_type + + def get_moonraker_service_info(self): + return dict(self.moonraker_service_info) + + async def wait_for_init( + self, timeout: Optional[float] = None + ) -> None: + try: + await asyncio.wait_for(self.init_evt.wait(), timeout) + except asyncio.TimeoutError: + pass + + async def component_init(self) -> None: + eventloop = self.server.get_event_loop() + eventloop.create_task(self.update_usb_ids()) + await self.validator.validation_init() + await self.sys_provider.initialize() + if not self.inside_container: + virt_info = await self.sys_provider.check_virt_status() + self.system_info['virtualization'] = virt_info + await self._parse_network_interfaces(0, notify=False) + pstats: ProcStats = self.server.lookup_component('proc_stats') + pstats.register_stat_callback(self._parse_network_interfaces) + available_svcs = self.sys_provider.get_available_services() + avail_list = list(available_svcs.keys()) + self.system_info['available_services'] = avail_list + self.system_info['service_state'] = available_svcs + svc_info = await self.sys_provider.extract_service_info( + "moonraker", os.getpid() + ) + 
+    async def component_init(self) -> None:
+        eventloop = self.server.get_event_loop()
+        eventloop.create_task(self.update_usb_ids())
+        await self.validator.validation_init()
+        await self.sys_provider.initialize()
+        if not self.inside_container:
+            virt_info = await self.sys_provider.check_virt_status()
+            self.system_info['virtualization'] = virt_info
+        await self._parse_network_interfaces(0, notify=False)
+        pstats: ProcStats = self.server.lookup_component('proc_stats')
+        pstats.register_stat_callback(self._parse_network_interfaces)
+        available_svcs = self.sys_provider.get_available_services()
+        avail_list = list(available_svcs.keys())
+        self.system_info['available_services'] = avail_list
+        self.system_info['service_state'] = available_svcs
+        svc_info = await self.sys_provider.extract_service_info(
+            "moonraker", os.getpid()
+        )
+        self.moonraker_service_info = svc_info
+        self.log_service_info(svc_info)
+        self.init_evt.set()
+
+    async def validate_installation(self) -> bool:
+        return await self.validator.perform_validation()
+
+    async def on_exit(self) -> None:
+        await self.validator.remove_announcement()
+
+    async def _handle_machine_request(self, web_request: WebRequest) -> str:
+        ep = web_request.get_endpoint()
+        if self.inside_container:
+            virt_id = self.system_info['virtualization'].get(
+                'virt_identifier', "none")
+            raise self.server.error(
+                f"Cannot {ep.split('/')[-1]} from within a "
+                f"{virt_id} container")
+        if ep == "/machine/shutdown":
+            await self.sys_provider.shutdown()
+        elif ep == "/machine/reboot":
+            await self.sys_provider.reboot()
+        else:
+            raise self.server.error("Unsupported machine request")
+        return "ok"
+
+    async def do_service_action(self,
+                                action: str,
+                                service_name: str
+                                ) -> None:
+        await self.sys_provider.do_service_action(action, service_name)
+
+    def restart_moonraker_service(self):
+        async def wrapper():
+            try:
+                await self.do_service_action("restart", self.unit_name)
+            except Exception:
+                pass
+        self.server.get_event_loop().create_task(wrapper())
+
+    async def _handle_service_request(self, web_request: WebRequest) -> str:
+        name: str = web_request.get_str('service')
+        action = web_request.get_endpoint().split('/')[-1]
+        if name == self.unit_name:
+            if action != "restart":
+                raise self.server.error(
+                    f"Service action '{action}' not available for moonraker")
+            self.restart_moonraker_service()
+        elif self.sys_provider.is_service_available(name):
+            await self.do_service_action(action, name)
+        else:
+            if name in self._allowed_services:
+                raise self.server.error(f"Service '{name}' not installed")
+            raise self.server.error(
+                f"Service '{name}' not allowed")
+        return "ok"
+
+    async def _handle_sysinfo_request(self,
+                                      web_request: WebRequest
+                                      ) -> Dict[str, Any]:
+        kconn: KlippyConnection
+        kconn = self.server.lookup_component("klippy_connection")
+        sys_info = self.system_info.copy()
+        sys_info["instance_ids"] = {
+            "moonraker": self.unit_name,
+            "klipper": kconn.unit_name
+        }
+        # Used by the Qidi Slicer when searching for devices: an optional
+        # 'dev_name' argument persists the machine name to /dev_info.txt,
+        # which is read back into the reported system info
+        dev_name = web_request.get_str('dev_name', default=None)
+        if dev_name is not None:
+            with open('/dev_info.txt', mode='w') as note:
+                note.write(dev_name)
+        with open('/dev_info.txt', 'r') as f:
+            content = f.read()
+        self.system_info["machine_name"] = content
+        return {"system_info": sys_info}
+
+    async def _set_sudo_password(
+        self, web_request: WebRequest
+    ) -> Dict[str, Any]:
+        async with self.sudo_req_lock:
+            self._sudo_password = web_request.get_str("password")
+            if not await self.check_sudo_access():
+                self._sudo_password = None
+                raise self.server.error(
+                    "Invalid password, sudo access was denied"
+                )
+            sudo_responses = ["Sudo password successfully set."]
+            restart: bool = False
+            failed: List[Tuple[SudoCallback, str]] = []
+            failed_msgs: List[str] = []
+            if self.sudo_requests:
+                while self.sudo_requests:
+                    cb, msg = self.sudo_requests.pop(0)
+                    try:
+                        ret = cb()
+                        if isinstance(ret, Awaitable):
+                            ret = await ret
+                        msg, need_restart = ret
+                        sudo_responses.append(msg)
+                        restart |= need_restart
+                    except asyncio.CancelledError:
+                        raise
+                    except Exception as e:
+                        failed.append((cb, msg))
+                        failed_msgs.append(str(e))
+                restart = False if len(failed) > 0 else restart
+                self.sudo_requests = failed
+                if not restart and len(sudo_responses) > 1:
+                    # at least one successful response and not restarting
+                    eventloop = self.server.get_event_loop()
+                    
eventloop.delay_callback( + .05, self.server.send_event, + "machine:sudo_alert", + { + "sudo_requested": self.sudo_requested, + "request_messages": self.sudo_request_messages + } + ) + if failed_msgs: + err_msg = "\n".join(failed_msgs) + raise self.server.error(err_msg, 500) + if restart: + self.restart_moonraker_service() + sudo_responses.append( + "Moonraker is currently in the process of restarting." + ) + return { + "sudo_responses": sudo_responses, + "is_restarting": restart + } + + async def _handle_sudo_info( + self, web_request: WebRequest + ) -> Dict[str, Any]: + check_access = web_request.get_boolean("check_access", False) + has_sudo: Optional[bool] = None + if check_access: + has_sudo = await self.check_sudo_access() + return { + "sudo_access": has_sudo, + "linux_user": self.linux_user, + "sudo_requested": self.sudo_requested, + "request_messages": self.sudo_request_messages + } + + async def _handle_serial_request(self, web_request: WebRequest) -> Dict[str, Any]: + return { + "serial_devices": await self.detect_serial_devices() + } + + async def _handle_usb_request(self, web_request: WebRequest) -> Dict[str, Any]: + return { + "usb_devices": await self.detect_usb_devices() + } + + async def _handle_can_query(self, web_request: WebRequest) -> Dict[str, Any]: + interface = web_request.get_str("interface", "can0") + return { + "can_uuids": await self.query_can_uuids(interface) + } + + async def _handle_video_request(self, web_request: WebRequest) -> Dict[str, Any]: + return await self.detect_video_devices() + + def get_system_info(self) -> Dict[str, Any]: + return self.system_info + + @property + def sudo_password(self) -> Optional[str]: + return self._sudo_password + + @sudo_password.setter + def sudo_password(self, pwd: Optional[str]) -> None: + self._sudo_password = pwd + + @property + def sudo_requested(self) -> bool: + return len(self.sudo_requests) > 0 + + @property + def linux_user(self) -> str: + return getpass.getuser() + + @property + def sudo_request_messages(self) -> List[str]: + return [req[1] for req in self.sudo_requests] + + def register_sudo_request( + self, callback: SudoCallback, message: str + ) -> None: + self.sudo_requests.append((callback, message)) + self.server.send_event( + "machine:sudo_alert", + { + "sudo_requested": True, + "request_messages": self.sudo_request_messages + } + ) + + async def check_sudo_access(self, cmds: List[str] = []) -> bool: + if not cmds: + cmds = ["systemctl --version", "ls /root"] + shell_cmd: SCMDComp = self.server.lookup_component("shell_command") + for cmd in cmds: + try: + await self.exec_sudo_command(cmd, timeout=10.) + except shell_cmd.error: + return False + return True + + async def exec_sudo_command( + self, command: str, tries: int = 1, timeout=2. 
+ ) -> str: + proc_input = None + full_cmd = f"sudo {command}" + if self._sudo_password is not None: + proc_input = self._sudo_password + full_cmd = f"sudo -S {command}" + shell_cmd: SCMDComp = self.server.lookup_component("shell_command") + return await shell_cmd.exec_cmd( + full_cmd, proc_input=proc_input, log_complete=False, attempts=tries, + timeout=timeout + ) + + def _get_sdcard_info(self) -> Dict[str, Any]: + sd_info: Dict[str, Any] = {} + cid_file = pathlib.Path(SD_CID_PATH) + if not cid_file.exists(): + # No SDCard detected at mmcblk0 + return {} + try: + cid_text = cid_file.read_text().strip().lower() + mid = cid_text[:2] + sd_info['manufacturer_id'] = mid + sd_info['manufacturer'] = SD_MFGRS.get(mid, "Unknown") + sd_info['oem_id'] = cid_text[2:6] + sd_info['product_name'] = bytes.fromhex(cid_text[6:16]).decode( + encoding="ascii", errors="ignore") + sd_info['product_revision'] = \ + f"{int(cid_text[16], 16)}.{int(cid_text[17], 16)}" + sd_info['serial_number'] = cid_text[18:26] + mfg_year = int(cid_text[27:29], 16) + 2000 + mfg_month = int(cid_text[29], 16) + sd_info['manufacturer_date'] = f"{mfg_month}/{mfg_year}" + except Exception: + logging.info("Error reading SDCard CID Register") + return {} + sd_info['capacity'] = "Unknown" + sd_info['total_bytes'] = 0 + csd_file = pathlib.Path(SD_CSD_PATH) + # Read CSD Register + try: + csd_reg = bytes.fromhex(csd_file.read_text().strip()) + csd_type = (csd_reg[0] >> 6) & 0x3 + if csd_type == 0: + # Standard Capacity (CSD Version 1.0) + max_block_len: int = 2**(csd_reg[5] & 0xF) + c_size = ((csd_reg[6] & 0x3) << 10) | (csd_reg[7] << 2) | \ + ((csd_reg[8] >> 6) & 0x3) + c_mult_reg = ((csd_reg[9] & 0x3) << 1) | (csd_reg[10] >> 7) + c_mult = 2**(c_mult_reg + 2) + total_bytes: int = (c_size + 1) * c_mult * max_block_len + sd_info['capacity'] = f"{(total_bytes / (1024.0**2)):.1f} MiB" + elif csd_type == 1: + # High Capacity (CSD Version 2.0) + c_size = ((csd_reg[7] & 0x3F) << 16) | (csd_reg[8] << 8) | \ + csd_reg[9] + total_bytes = (c_size + 1) * 512 * 1024 + sd_info['capacity'] = f"{(total_bytes / (1024.0**3)):.1f} GiB" + elif csd_type == 2: + # Ultra Capacity (CSD Version 3.0) + c_size = ((csd_reg[6]) & 0xF) << 24 | (csd_reg[7] << 16) | \ + (csd_reg[8] << 8) | csd_reg[9] + total_bytes = (c_size + 1) * 512 * 1024 + sd_info['capacity'] = f"{(total_bytes / (1024.0**4)):.1f} TiB" + else: + # Invalid CSD, skip capacity check + return sd_info + sd_info['total_bytes'] = total_bytes + except Exception: + logging.info("Error Reading SDCard CSD Register") + return sd_info + + def _get_cpu_info(self) -> Dict[str, Any]: + cpu_file = pathlib.Path("/proc/cpuinfo") + mem_file = pathlib.Path("/proc/meminfo") + cpu_info = { + 'cpu_count': os.cpu_count(), + 'bits': platform.architecture()[0], + 'processor': platform.processor() or platform.machine(), + 'cpu_desc': "", + 'serial_number': "", + 'hardware_desc': "", + 'model': "", + 'total_memory': None, + 'memory_units': "" + } + if cpu_file.exists(): + try: + cpu_text = cpu_file.read_text().strip() + cpu_items = [item.strip() for item in cpu_text.split("\n\n") + if item.strip()] + for item in cpu_items: + cpu_desc_match = re.search(r"model name\s+:\s+(.+)", item) + if cpu_desc_match is not None: + cpu_info['cpu_desc'] = cpu_desc_match.group(1).strip() + break + hw_match = re.search(r"Hardware\s+:\s+(.+)", cpu_items[-1]) + if hw_match is not None: + cpu_info['hardware_desc'] = hw_match.group(1).strip() + sn_match = re.search(r"Serial\s+:\s+0*(.+)", cpu_items[-1]) + if sn_match is not None: + 
cpu_info['serial_number'] = sn_match.group(1).strip() + model_match = re.search(r"Model\s+:\s+(.+)", cpu_items[-1]) + if model_match is not None: + cpu_info['model'] = model_match.group(1).strip() + except Exception: + logging.info("Error Reading /proc/cpuinfo") + if mem_file.exists(): + try: + mem_text = mem_file.read_text().strip() + for line in mem_text.split('\n'): + line = line.strip() + if line.startswith("MemTotal:"): + parts = line.split() + cpu_info['total_memory'] = int(parts[1]) + cpu_info['memory_units'] = parts[2] + break + except Exception: + logging.info("Error Reading /proc/meminfo") + return cpu_info + + def _check_inside_container(self) -> Dict[str, Any]: + cgroup_file = pathlib.Path(CGROUP_PATH) + virt_type = virt_id = "none" + if cgroup_file.exists(): + try: + data = cgroup_file.read_text() + container_types = ["docker", "lxc"] + for ct in container_types: + if ct in data: + self.inside_container = True + virt_type = "container" + virt_id = ct + logging.info( + f"Container detected via cgroup: {ct}" + ) + break + except Exception: + logging.exception(f"Error reading {CGROUP_PATH}") + + # Fall back to process schedule check + if not self.inside_container: + sched_file = pathlib.Path(SCHED_PATH) + if sched_file.exists(): + try: + data = sched_file.read_text().strip() + proc_name = data.split('\n')[0].split()[0] + if proc_name not in ["init", "systemd"]: + self.inside_container = True + virt_type = "container" + virt_id = "lxc" + if ( + os.path.exists("/.dockerenv") or + os.path.exists("/.dockerinit") + ): + virt_id = "docker" + logging.info( + f"Container detected via sched: {virt_id}" + ) + except Exception: + logging.exception(f"Error reading {SCHED_PATH}") + return { + 'virt_type': virt_type, + 'virt_identifier': virt_id + } + + async def _parse_network_interfaces(self, + sequence: int, + notify: bool = True + ) -> None: + if sequence % NETWORK_UPDATE_SEQUENCE: + return + network: Dict[str, Any] = {} + canbus: Dict[str, Any] = {} + try: + # get network interfaces + resp = await self.addr_cmd.run_with_response(log_complete=False) + decoded: List[Dict[str, Any]] = jsonw.loads(resp) + for interface in decoded: + if interface['operstate'] != "UP": + continue + if interface['link_type'] == "can": + infodata: dict = interface.get( + "linkinfo", {}).get("info_data", {}) + canbus[interface['ifname']] = { + 'tx_queue_len': interface['txqlen'], + 'bitrate': infodata.get("bittiming", {}).get( + "bitrate", -1 + ), + 'driver': infodata.get("bittiming_const", {}).get( + "name", "unknown" + ) + } + elif ( + interface['link_type'] == "ether" and + 'address' in interface + ): + addresses: List[Dict[str, Any]] = [ + { + 'family': IP_FAMILIES[addr['family']], + 'address': addr['local'], + 'is_link_local': addr.get('scope', "") == "link" + } + for addr in interface.get('addr_info', []) + if 'family' in addr and 'local' in addr + ] + if not addresses: + continue + network[interface['ifname']] = { + 'mac_address': interface['address'], + 'ip_addresses': addresses + } + except Exception: + logging.exception("Error processing network update") + return + prev_network = self.system_info.get('network', {}) + if network != prev_network: + self._find_public_ip() + if notify: + self.server.send_event("machine:net_state_changed", network) + self.system_info['network'] = network + self.system_info['canbus'] = canbus + + async def get_public_network(self) -> Dict[str, Any]: + wifis = await self._get_wifi_interfaces() + public_intf = self._find_public_interface() + ifname = public_intf["ifname"] + 
is_wifi = ifname in wifis + public_intf["is_wifi"] = is_wifi + if is_wifi: + public_intf["ssid"] = wifis[ifname] + # TODO: Can we detect the private top level domain? That + # would be ideal + public_intf["hostname"] = socket.gethostname() + return public_intf + + def _find_public_interface(self) -> Dict[str, Any]: + src_ip = self._find_public_ip() + networks = self.system_info.get("network", {}) + for ifname, ifinfo in networks.items(): + for addrinfo in ifinfo["ip_addresses"]: + if addrinfo["is_link_local"]: + continue + fam = addrinfo["family"] + addr = addrinfo["address"] + if fam == "ipv6" and not src_ip: + ip = ipaddress.ip_address(addr) + if ip.is_global: + return { + "ifname": ifname, + "address": addr, + "family": fam + } + elif src_ip == addr: + return { + "ifname": ifname, + "address": addr, + "family": fam + } + return { + "ifname": "", + "address": src_ip or "", + "family": "" + } + + def _find_public_ip(self) -> str: + # Check for an IPv4 Source IP + # NOTE: It should also be possible to extract this from + # the routing table, ie: ip -json route + # It would be an entry with a "gateway" with the lowest + # metric. Might also be able to get IPv6 info from this. + # However, it would be better to use NETLINK for this rather + # than run another shell command + src_ip: str = "" + # First attempt: use "broadcast" to find the local IP + addr_info = [ + ("", 0, socket.AF_INET), + ("10.255.255.255", 1, socket.AF_INET), + ("2001:db8::1234", 1, socket.AF_INET6), + ] + for (addr, port, fam) in addr_info: + s = socket.socket(fam, socket.SOCK_DGRAM | socket.SOCK_NONBLOCK) + try: + if addr == "": + s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) + s.connect((addr, port)) + src_ip = s.getsockname()[0] + except Exception: + continue + logging.info(f"Detected Local IP: {src_ip}") + break + if src_ip != self._public_ip: + self._public_ip = src_ip + self.server.send_event("machine:public_ip_changed", src_ip) + if not src_ip: + logging.info("Failed to detect local IP address") + return src_ip + + async def _get_wifi_interfaces(self) -> Dict[str, Any]: + # get wifi interfaces + shell_cmd: SCMDComp = self.server.lookup_component('shell_command') + wifi_intfs: Dict[str, Any] = {} + try: + resp = await self.iwgetid_cmd.run_with_response(log_complete=False) + except shell_cmd.error: + logging.info("Failed to run 'iwgetid' command") + return {} + if resp: + for line in resp.split("\n"): + parts = line.strip().split(maxsplit=1) + wifi_intfs[parts[0]] = parts[1].split(":")[-1].strip('"') + return wifi_intfs + + def log_service_info(self, svc_info: Dict[str, Any]) -> None: + if not svc_info: + return + name = svc_info.get("unit_name", "unknown") + manager = svc_info.get("manager", "systemd").capitalize() + msg = f"\n{manager} unit {name}:" + for key, val in svc_info.items(): + if key == "properties": + msg += "\nProperties:" + for prop_key, prop in val.items(): + msg += f"\n**{prop_key}={prop}" + else: + msg += f"\n{key}: {val}" + self.server.add_log_rollover_item(name, msg) + + async def update_usb_ids(self, force: bool = False) -> None: + async with self.periph_lock: + db: MoonrakerDatabase = self.server.lookup_component("database") + client: HttpClient = self.server.lookup_component("http_client") + dpath = pathlib.Path(self.server.get_app_arg("data_path")) + usb_ids_path = pathlib.Path(dpath).joinpath("misc/usb.ids") + if usb_ids_path.is_file() and not force: + return + usb_id_req_info: Dict[str, str] + usb_id_req_info = await db.get_item("moonraker", "usb_id_req_info", {}) + etag: 
Optional[str] = usb_id_req_info.pop("etag", None)
+            last_modified: Optional[str] = usb_id_req_info.pop("last_modified", None)
+            headers = {"Accept": "text/plain"}
+            if etag is not None and usb_ids_path.is_file():
+                headers["If-None-Match"] = etag
+            if last_modified is not None and usb_ids_path.is_file():
+                headers["If-Modified-Since"] = last_modified
+            logging.info("Fetching latest usb.ids file...")
+            resp = await client.get(
+                USB_IDS_URL, headers, enable_cache=False
+            )
+            if resp.has_error():
+                logging.info("Failed to retrieve usb.ids file")
+                return
+            if resp.status_code == 304:
+                logging.info("USB IDs file up to date")
+                return
+            # Save etag and modified headers
+            if resp.etag is not None:
+                usb_id_req_info["etag"] = resp.etag
+            if resp.last_modified is not None:
+                usb_id_req_info["last_modified"] = resp.last_modified
+            await db.insert_item("moonraker", "usb_id_req_info", usb_id_req_info)
+            # Write file
+            logging.info("Writing usb.ids file...")
+            eventloop = self.server.get_event_loop()
+            await eventloop.run_in_thread(usb_ids_path.write_bytes, resp.content)
+
+    async def detect_serial_devices(self) -> List[Dict[str, Any]]:
+        async with self.periph_lock:
+            eventloop = self.server.get_event_loop()
+            return await eventloop.run_in_thread(sysfs_devs.find_serial_devices)
+
+    async def detect_usb_devices(self) -> List[Dict[str, Any]]:
+        async with self.periph_lock:
+            eventloop = self.server.get_event_loop()
+            return await eventloop.run_in_thread(self._do_usb_detect)
+
+    def _do_usb_detect(self) -> List[Dict[str, Any]]:
+        data_path = pathlib.Path(self.server.get_app_args()["data_path"])
+        usb_id_path = data_path.joinpath("misc/usb.ids")
+        usb_id_data = sysfs_devs.UsbIdData(usb_id_path)
+        dev_list = sysfs_devs.find_usb_devices()
+        for usb_dev_info in dev_list:
+            cls_ids: List[str] = usb_dev_info.pop("class_ids", None)
+            class_info = usb_id_data.get_class_info(*cls_ids)
+            usb_dev_info.update(class_info)
+            prod_info = usb_id_data.get_product_info(
+                usb_dev_info["vendor_id"], usb_dev_info["product_id"]
+            )
+            for field, desc in prod_info.items():
+                if usb_dev_info.get(field) is None:
+                    usb_dev_info[field] = desc
+        return dev_list
+
+    async def query_can_uuids(self, interface: str) -> List[Dict[str, Any]]:
+        async with self.periph_lock:
+            cansock = cansocket.CanSocket(interface)
+            uuids = await cansocket.query_klipper_uuids(cansock)
+            cansock.close()
+            return uuids
+
+    async def detect_video_devices(self) -> Dict[str, List[Dict[str, Any]]]:
+        async with self.periph_lock:
+            eventloop = self.server.get_event_loop()
+            v4l2_devs = await eventloop.run_in_thread(sysfs_devs.find_video_devices)
+            libcam_devs = await eventloop.run_in_thread(self.get_libcamera_devices)
+            return {
+                "v4l2_devices": v4l2_devs,
+                "libcamera_devices": libcam_devs
+            }
+
+    def get_libcamera_devices(self) -> List[Dict[str, Any]]:
+        libcam = self.libcam
+        libcam_devs: List[Dict[str, Any]] = []
+        if libcam is not None:
+            cm = libcam.CameraManager.singleton()
+            for cam in cm.cameras:
+                device: Dict[str, Any] = {"libcamera_id": cam.id}
+                props_by_name = {cid.name: val for cid, val in cam.properties.items()}
+                device["model"] = props_by_name.get("Model")
+                modes: List[Dict[str, Any]] = []
+                cam_config = cam.generate_configuration([libcam.StreamRole.Raw])
+                for stream_cfg in cam_config:
+                    formats = stream_cfg.formats
+                    for pix_fmt in formats.pixel_formats:
+                        cur_mode: Dict[str, Any] = {"format": str(pix_fmt)}
+                        resolutions: List[str] = []
+                        for size in formats.sizes(pix_fmt):
+                            resolutions.append(str(size))
+                        cur_mode["resolutions"] = resolutions
+                        modes.append(cur_mode)
+                device["modes"] = modes
+                libcam_devs.append(device)
+        return libcam_devs
+
+
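+# Provider selection: Machine above delegates all service and power
+# actions to a BaseProvider implementation chosen via the [machine]
+# "provider" option ("none", "systemd_cli", "systemd_dbus" (the default)
+# or "supervisord_cli"). A rough sketch of the dispatch path, using names
+# defined in this module:
+#
+#   machine: Machine = server.lookup_component("machine")
+#   await machine.do_service_action("restart", "klipper")
+#   # -> await self.sys_provider.do_service_action("restart", "klipper")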
+class BaseProvider:
+    def __init__(self, config: ConfigHelper) -> None:
+        self.server = config.get_server()
+        self.shutdown_action = config.get("shutdown_action", "poweroff")
+        self.shutdown_action = self.shutdown_action.lower()
+        if self.shutdown_action not in ["halt", "poweroff"]:
+            raise config.error(
+                "Section [machine], Option 'shutdown_action': "
+                f"Invalid value '{self.shutdown_action}', must be "
+                "'halt' or 'poweroff'"
+            )
+        self.available_services: Dict[str, Dict[str, str]] = {}
+        self.shell_cmd: SCMDComp = self.server.load_component(
+            config, 'shell_command')
+
+    async def initialize(self) -> None:
+        pass
+
+    async def _exec_sudo_command(self, command: str):
+        machine: Machine = self.server.lookup_component("machine")
+        return await machine.exec_sudo_command(command)
+
+    async def shutdown(self) -> None:
+        await self._exec_sudo_command(f"systemctl {self.shutdown_action}")
+
+    async def reboot(self) -> None:
+        await self._exec_sudo_command("systemctl reboot")
+
+    async def do_service_action(self,
+                                action: str,
+                                service_name: str
+                                ) -> None:
+        raise self.server.error("Service Actions Not Available", 503)
+
+    async def check_virt_status(self) -> Dict[str, Any]:
+        return {
+            'virt_type': "unknown",
+            'virt_identifier': "unknown"
+        }
+
+    def is_service_available(self, service: str) -> bool:
+        return service in self.available_services
+
+    def get_available_services(self) -> Dict[str, Dict[str, str]]:
+        return self.available_services
+
+    async def extract_service_info(
+        self,
+        service_name: str,
+        pid: int,
+        properties: Optional[List[str]] = None,
+        raw: bool = False
+    ) -> Dict[str, Any]:
+        return {}
+
+class SystemdCliProvider(BaseProvider):
+    async def initialize(self) -> None:
+        await self._detect_active_services()
+        if self.available_services:
+            svcs = list(self.available_services.keys())
+            self.svc_cmd = self.shell_cmd.build_shell_command(
+                "systemctl show -p ActiveState,SubState --value "
+                f"{' '.join(svcs)}")
+            await self._update_service_status(0, notify=True)
+            pstats: ProcStats = self.server.lookup_component('proc_stats')
+            pstats.register_stat_callback(self._update_service_status)
+
+    async def do_service_action(self,
+                                action: str,
+                                service_name: str
+                                ) -> None:
+        await self._exec_sudo_command(f"systemctl {action} {service_name}")
+
+    async def check_virt_status(self) -> Dict[str, Any]:
+        # Fallback virtualization check
+        virt_id = virt_type = "none"
+
+        # Check for any form of virtualization. 
This will report the innermost + # virtualization type in the event that nested virtualization is used + try: + resp: str = await self.shell_cmd.exec_cmd("systemd-detect-virt") + except self.shell_cmd.error: + pass + else: + virt_id = resp.strip() + + if virt_id != "none": + # Check explicitly for container virtualization + try: + resp = await self.shell_cmd.exec_cmd( + "systemd-detect-virt --container") + except self.shell_cmd.error: + virt_type = "vm" + else: + if virt_id == resp.strip(): + virt_type = "container" + else: + # Moonraker is run from within a VM inside a container + virt_type = "vm" + logging.info( + f"Virtualized Environment Detected, Type: {virt_type} " + f"id: {virt_id}") + else: + logging.info("No Virtualization Detected") + return { + 'virt_type': virt_type, + 'virt_identifier': virt_id + } + + async def _detect_active_services(self) -> None: + machine: Machine = self.server.lookup_component("machine") + try: + resp: str = await self.shell_cmd.exec_cmd( + "systemctl list-units --all --type=service --plain" + " --no-legend") + lines = resp.split('\n') + services = [line.split()[0].strip() for line in lines + if ".service" in line.strip()] + except Exception: + services = [] + for svc in services: + sname = svc.rsplit('.', 1)[0] + if machine.is_service_allowed(sname): + self.available_services[sname] = { + 'active_state': "unknown", + 'sub_state': "unknown" + } + + async def _update_service_status(self, + sequence: int, + notify: bool = True + ) -> None: + if sequence % 2: + # Update every other sequence + return + svcs = list(self.available_services.keys()) + try: + resp = await self.svc_cmd.run_with_response(log_complete=False) + for svc, state in zip(svcs, resp.strip().split('\n\n')): + active_state, sub_state = state.split('\n', 1) + new_state: Dict[str, str] = { + 'active_state': active_state, + 'sub_state': sub_state + } + if self.available_services[svc] != new_state: + self.available_services[svc] = new_state + if notify: + self.server.send_event( + "machine:service_state_changed", + {svc: new_state}) + except Exception: + logging.exception("Error processing service state update") + + async def extract_service_info( + self, + service_name: str, + pid: int, + properties: Optional[List[str]] = None, + raw: bool = False + ) -> Dict[str, Any]: + service_info: Dict[str, Any] = {} + expected_name = f"{service_name}.service" + if properties is None: + properties = SERVICE_PROPERTIES + try: + resp: str = await self.shell_cmd.exec_cmd( + f"systemctl status {pid}" + ) + unit_name = resp.split(maxsplit=2)[1] + service_info["unit_name"] = unit_name + service_info["is_default"] = True + service_info["manager"] = "systemd" + if unit_name != expected_name: + service_info["is_default"] = False + logging.info( + f"Detected alternate unit name for {service_name}: " + f"{unit_name}" + ) + prop_args = ",".join(properties) + props: str = await self.shell_cmd.exec_cmd( + f"systemctl show -p {prop_args} {unit_name}", attempts=5, + timeout=10. 
+            )
+            raw_props: Dict[str, Any] = {}
+            lines = [p.strip() for p in props.split("\n") if p.strip()]
+            for line in lines:
+                parts = line.split("=", 1)
+                if len(parts) == 2:
+                    key = parts[0].strip()
+                    val = parts[1].strip()
+                    raw_props[key] = val
+            if raw:
+                service_info["properties"] = raw_props
+            else:
+                processed = self._process_raw_properties(raw_props)
+                service_info["properties"] = processed
+        except Exception:
+            logging.exception("Error extracting service info")
+            return {}
+        return service_info
+
+    def _process_raw_properties(
+        self, raw_props: Dict[str, str]
+    ) -> Dict[str, Any]:
+        processed: Dict[str, Any] = {}
+        for key, val in raw_props.items():
+            processed[key] = val
+            if key == "ExecStart":
+                # this is a struct, we need to deconstruct it
+                match = re.search(r"argv\[\]=([^;]+);", val)
+                if match is not None:
+                    processed[key] = match.group(1).strip()
+            elif key == "EnvironmentFiles":
+                if val:
+                    processed[key] = val.split()[0]
+            elif key in ["Requires", "After", "SupplementaryGroups"]:
+                vals = [v.strip() for v in val.split() if v.strip()]
+                processed[key] = vals
+        return processed
+
+class SystemdDbusProvider(BaseProvider):
+    def __init__(self, config: ConfigHelper) -> None:
+        super().__init__(config)
+        self.dbus_mgr: DbusManager = self.server.lookup_component(
+            "dbus_manager")
+        self.login_mgr: Optional[ProxyInterface] = None
+        self.props: List[Tuple[ProxyInterface, Callable]] = []
+
+    async def initialize(self) -> None:
+        if not self.dbus_mgr.is_connected():
+            self.server.add_warning(
+                "[machine]: DBus Connection Not available, systemd "
+                "service tracking and actions are disabled")
+            return
+        # Get the systemd manager interface
+        self.systemd_mgr = await self.dbus_mgr.get_interface(
+            "org.freedesktop.systemd1",
+            "/org/freedesktop/systemd1",
+            "org.freedesktop.systemd1.Manager"
+        )
+        # Check for systemd PolicyKit Permissions
+        await self.dbus_mgr.check_permission(
+            "org.freedesktop.systemd1.manage-units",
+            "System Service Management (start, stop, restart) "
+            "will be disabled")
+        if self.shutdown_action == "poweroff":
+            await self.dbus_mgr.check_permission(
+                "org.freedesktop.login1.power-off",
+                "The shutdown API will be disabled"
+            )
+            await self.dbus_mgr.check_permission(
+                "org.freedesktop.login1.power-off-multiple-sessions",
+                "The shutdown API will be disabled if multiple user "
+                "sessions are open."
+            )
+        else:
+            await self.dbus_mgr.check_permission(
+                "org.freedesktop.login1.halt",
+                "The shutdown API will be disabled"
+            )
+            await self.dbus_mgr.check_permission(
+                "org.freedesktop.login1.halt-multiple-sessions",
+                "The shutdown API will be disabled if multiple user "
+                "sessions are open."
+            )
+        try:
+            # Get the login manager interface
+            self.login_mgr = await self.dbus_mgr.get_interface(
+                "org.freedesktop.login1",
+                "/org/freedesktop/login1",
+                "org.freedesktop.login1.Manager"
+            )
+        except self.dbus_mgr.DbusError as e:
+            logging.info(
+                "Unable to acquire the systemd-logind D-Bus interface, "
+                f"falling back to CLI Reboot and Shutdown APIs. {e}")
+            self.login_mgr = None
+        else:
+            # Check for logind permissions
+            await self.dbus_mgr.check_permission(
+                "org.freedesktop.login1.reboot",
+                "The reboot API will be disabled"
+            )
+            await self.dbus_mgr.check_permission(
+                "org.freedesktop.login1.reboot-multiple-sessions",
+                "The reboot API will be disabled if multiple user "
+                "sessions are open."
+            )
+        await self._detect_active_services()
+
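+    # The reboot/shutdown overrides below prefer systemd-logind when the
+    # login manager interface was acquired above; otherwise they fall
+    # back to the BaseProvider implementations, which shell out to
+    # systemctl via sudo.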
+ )
+ await self._detect_active_services()
+
+ async def reboot(self) -> None:
+ if self.login_mgr is None:
+ await super().reboot()
+ return
+ await self.login_mgr.call_reboot(False) # type: ignore
+
+ async def shutdown(self) -> None:
+ if self.login_mgr is None:
+ await super().shutdown()
+ return
+ if self.shutdown_action == "poweroff":
+ await self.login_mgr.call_power_off(False) # type: ignore
+ else:
+ await self.login_mgr.call_halt(False) # type: ignore
+
+ async def do_service_action(self,
+ action: str,
+ service_name: str
+ ) -> None:
+ if not self.dbus_mgr.is_connected():
+ raise self.server.error("DBus Not Connected", 503)
+ mgr = self.systemd_mgr
+ if not service_name.endswith(".service"):
+ service_name += ".service"
+ if action == "start":
+ await mgr.call_start_unit(service_name, "replace") # type: ignore
+ elif action == "stop":
+ await mgr.call_stop_unit(service_name, "replace") # type: ignore
+ elif action == "restart":
+ await mgr.call_restart_unit( # type: ignore
+ service_name, "replace")
+ else:
+ raise self.server.error(f"Invalid service action: {action}")
+
+ async def check_virt_status(self) -> Dict[str, Any]:
+ if not self.dbus_mgr.is_connected():
+ return {
+ 'virt_type': "unknown",
+ 'virt_identifier': "unknown"
+ }
+ mgr = self.systemd_mgr
+ virt_id = virt_type = "none"
+ virt: str = await mgr.get_virtualization() # type: ignore
+ virt = virt.strip()
+ if virt:
+ virt_id = virt
+ container_types = [
+ "openvz", "lxc", "lxc-libvirt", "systemd-nspawn",
+ "docker", "podman", "rkt", "wsl", "proot", "pouch"]
+ if virt_id in container_types:
+ virt_type = "container"
+ else:
+ virt_type = "vm"
+ logging.info(
+ f"Virtualized Environment Detected, Type: {virt_type} "
+ f"id: {virt_id}")
+ else:
+ logging.info("No Virtualization Detected")
+ return {
+ 'virt_type': virt_type,
+ 'virt_identifier': virt_id
+ }
+
+ async def _detect_active_services(self) -> None:
+ # Get loaded services
+ mgr = self.systemd_mgr
+ machine: Machine = self.server.lookup_component("machine")
+ units: List[Any]
+ units = await mgr.call_list_units_filtered(["loaded"]) # type: ignore
+ for unit in units:
+ name: str = unit[0].split('.')[0]
+ if not machine.is_service_allowed(name):
+ continue
+ state: str = unit[3]
+ substate: str = unit[4]
+ dbus_path: str = unit[6]
+ if name in self.available_services:
+ continue
+ self.available_services[name] = {
+ 'active_state': state,
+ 'sub_state': substate
+ }
+ # setup state monitoring
+ props = await self.dbus_mgr.get_interface(
+ "org.freedesktop.systemd1", dbus_path,
+ "org.freedesktop.DBus.Properties"
+ )
+ prop_callback = self._create_properties_callback(name)
+ self.props.append((props, prop_callback))
+ props.on_properties_changed( # type: ignore
+ prop_callback)
+
+ def _create_properties_callback(self, name: str) -> Callable:
+ def prop_wrapper(dbus_obj: str,
+ changed_props: Dict[str, Variant],
+ invalid_props: Dict[str, Variant]
+ ) -> None:
+ if dbus_obj != 'org.freedesktop.systemd1.Unit':
+ return
+ self._on_service_update(name, changed_props)
+ return prop_wrapper
+
+ def _on_service_update(self,
+ service_name: str,
+ changed_props: Dict[str, Variant]
+ ) -> None:
+ if service_name not in self.available_services:
+ return
+ svc = self.available_services[service_name]
+ notify = False
+ if "ActiveState" in changed_props:
+ state: str = changed_props['ActiveState'].value
+ if state != svc['active_state']:
+ notify = True
+ svc['active_state'] = state
+ if "SubState" in changed_props:
+ state = changed_props['SubState'].value
+ if state != svc['sub_state']:
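+ # A typical PropertiesChanged payload from systemd looks roughly
+ # like the following (hypothetical values, for illustration only):
+ #   {'ActiveState': Variant('s', "inactive"),
+ #    'SubState': Variant('s', "dead")}
+ # Each entry is a D-Bus Variant, hence the .value access above.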
+ notify = True
+ svc['sub_state'] = state
+ if notify:
+ self.server.send_event("machine:service_state_changed",
+ {service_name: dict(svc)})
+
+ async def extract_service_info(
+ self,
+ service_name: str,
+ pid: int,
+ properties: Optional[List[str]] = None,
+ raw: bool = False
+ ) -> Dict[str, Any]:
+ if not hasattr(self, "systemd_mgr"):
+ return {}
+ mgr = self.systemd_mgr
+ service_info: Dict[str, Any] = {}
+ expected_name = f"{service_name}.service"
+ if properties is None:
+ properties = SERVICE_PROPERTIES
+ try:
+ dbus_path: str
+ dbus_path = await mgr.call_get_unit_by_pid(pid) # type: ignore
+ bus = "org.freedesktop.systemd1"
+ unit_intf, svc_intf = await self.dbus_mgr.get_interfaces(
+ "org.freedesktop.systemd1", dbus_path,
+ [f"{bus}.Unit", f"{bus}.Service"]
+ )
+ unit_name = await unit_intf.get_id() # type: ignore
+ service_info["unit_name"] = unit_name
+ service_info["is_default"] = True
+ service_info["manager"] = "systemd"
+ if unit_name != expected_name:
+ service_info["is_default"] = False
+ logging.info(
+ f"Detected alternate unit name for {service_name}: "
+ f"{unit_name}"
+ )
+ raw_props: Dict[str, Any] = {}
+ for key in properties:
+ snake_key = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", key).lower()
+ func = getattr(unit_intf, f"get_{snake_key}", None)
+ if func is None:
+ func = getattr(svc_intf, f"get_{snake_key}", None)
+ if func is None:
+ continue
+ val = await func()
+ raw_props[key] = val
+ if raw:
+ service_info["properties"] = raw_props
+ else:
+ processed = self._process_raw_properties(raw_props)
+ service_info["properties"] = processed
+ except Exception:
+ logging.exception("Error extracting service info")
+ return {}
+ return service_info
+
+ def _process_raw_properties(
+ self, raw_props: Dict[str, Any]
+ ) -> Dict[str, Any]:
+ processed: Dict[str, Any] = {}
+ for key, val in raw_props.items():
+ if key == "ExecStart":
+ try:
+ val = " ".join(val[0][1])
+ except Exception:
+ pass
+ elif key == "EnvironmentFiles":
+ try:
+ val = val[0][0]
+ except Exception:
+ pass
+ processed[key] = val
+ return processed
+
+# Manages multiple services for the docker klipper-moonraker image.
+# All commands inside the container are launched by an unprivileged
+# user, so sudo is not required.
+class SupervisordCliProvider(BaseProvider):
+ def __init__(self, config: ConfigHelper) -> None:
+ super().__init__(config)
+ self.spv_conf: str = config.get("supervisord_config_path", "")
+
+ async def initialize(self) -> None:
+ await self._detect_active_services()
+ keys = ' '.join(list(self.available_services.keys()))
+ if self.spv_conf:
+ cmd = f"supervisorctl -c {self.spv_conf} status {keys}"
+ else:
+ cmd = f"supervisorctl status {keys}"
+ self.svc_cmd = self.shell_cmd.build_shell_command(cmd)
+ await self._update_service_status(0, notify=True)
+ pstats: ProcStats = self.server.lookup_component('proc_stats')
+ pstats.register_stat_callback(self._update_service_status)
+
+ async def do_service_action(
+ self, action: str, service_name: str
+ ) -> None:
+ # supervisord reacts slowly, so allow a 6 second timeout
+ await self._exec_supervisorctl_command(
+ f"{action} {service_name}", timeout=6.
+ )
+
+ async def _exec_supervisorctl_command(
+ self,
+ args: str,
+ tries: int = 1,
+ timeout: float = 2.,
+ success_codes: Optional[List[int]] = None
+ ) -> str:
+ if self.spv_conf:
+ cmd = f"supervisorctl -c {self.spv_conf} {args}"
+ else:
+ cmd = f"supervisorctl {args}"
+ return await self.shell_cmd.exec_cmd(
+ cmd, proc_input=None, log_complete=False, attempts=tries,
+ timeout=timeout, success_codes=success_codes
+ )
+
+ def _get_active_state(self, sub_state: str) -> str:
+ if sub_state == "stopping":
+ return "deactivating"
+ elif sub_state == "running":
+ return "active"
+ else:
+ return "inactive"
+
+ async def _detect_active_services(self) -> None:
+ machine: Machine = self.server.lookup_component("machine")
+ units: Dict[str, Any] = await self._get_process_info()
+ for unit, info in units.items():
+ if machine.is_service_allowed(unit):
+ self.available_services[unit] = {
+ 'active_state': self._get_active_state(info["state"]),
+ 'sub_state': info["state"]
+ }
+
+ async def _get_process_info(
+ self, process_names: Optional[List[str]] = None
+ ) -> Dict[str, Any]:
+ units: Dict[str, Any] = {}
+ cmd = "status"
+ if process_names is not None:
+ cmd = f"status {' '.join(process_names)}"
+ try:
+ resp = await self._exec_supervisorctl_command(
+ cmd, timeout=6., success_codes=[0, 3]
+ )
+ lines = [line.strip() for line in resp.split("\n") if line.strip()]
+ except Exception:
+ return {}
+ for line in lines:
+ parts = line.split()
+ name: str = parts[0]
+ state: str = parts[1].lower()
+ if state == "running" and len(parts) >= 6:
+ units[name] = {
+ "state": state,
+ "pid": int(parts[3].rstrip(",")),
+ "uptime": parts[5]
+ }
+ else:
+ units[name] = {
+ "state": state
+ }
+ return units
+
+ async def _update_service_status(self,
+ sequence: int,
+ notify: bool = True
+ ) -> None:
+ if sequence % 2:
+ # Update every other sequence
+ return
+ svcs = list(self.available_services.keys())
+ try:
+ # supervisord reacts slowly, so allow a 6 second timeout
+ resp = await self.svc_cmd.run_with_response(
+ log_complete=False, timeout=6., success_codes=[0, 3]
+ )
+ resp_l = resp.strip().split("\n") # drop the legend
+ for svc, state in zip(svcs, resp_l):
+ sub_state = state.split()[1].lower()
+ new_state: Dict[str, str] = {
+ 'active_state': self._get_active_state(sub_state),
+ 'sub_state': sub_state
+ }
+ if self.available_services[svc] != new_state:
+ self.available_services[svc] = new_state
+ if notify:
+ self.server.send_event(
+ "machine:service_state_changed",
+ {svc: new_state})
+ except Exception:
+ logging.exception("Error processing service state update")
+
+ async def _find_service_by_pid(
+ self, expected_name: str, pid: int
+ ) -> Dict[str, Any]:
+ service_info: Dict[str, Any] = {}
+ for _ in range(5):
+ proc_info = await self._get_process_info(
+ list(self.available_services.keys())
+ )
+ service_info["unit_name"] = expected_name
+ service_info["is_default"] = True
+ service_info["manager"] = "supervisord"
+ need_retry = False
+ for name, info in proc_info.items():
+ if "pid" not in info:
+ need_retry |= info["state"] == "starting"
+ elif info["pid"] == pid:
+ if name != expected_name:
+ service_info["unit_name"] = name
+ service_info["is_default"] = False
+ logging.info(
+ "Detected alternate unit name for "
+ f"{expected_name}: {name}"
+ )
+ return service_info
+ if need_retry:
+ await asyncio.sleep(1.)
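+ # A "starting" process has no PID yet; sleep and poll
+ # supervisord again, bounded by the five attempts of the
+ # enclosing loop.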
+ else:
+ break
+ return {}
+
+ async def extract_service_info(
+ self,
+ service_name: str,
+ pid: int,
+ properties: Optional[List[str]] = None,
+ raw: bool = False
+ ) -> Dict[str, Any]:
+ service_info = await self._find_service_by_pid(service_name, pid)
+ if not service_info:
+ logging.info(
+ f"Unable to locate service info for {service_name}, pid: {pid}"
+ )
+ return {}
+ # locate supervisord.conf
+ if self.spv_conf:
+ spv_path = pathlib.Path(self.spv_conf)
+ if not spv_path.is_file():
+ logging.info(
+ f"Invalid supervisord configuration file: {self.spv_conf}"
+ )
+ return service_info
+ else:
+ default_config_locations = [
+ "/etc/supervisord.conf",
+ "/etc/supervisor/supervisord.conf"
+ ]
+ for conf_path in default_config_locations:
+ spv_path = pathlib.Path(conf_path)
+ if spv_path.is_file():
+ break
+ else:
+ logging.info("Failed to locate supervisord.conf")
+ return service_info
+ spv_config = configparser.ConfigParser(interpolation=None)
+ spv_config.read_string(spv_path.read_text())
+ unit = service_info["unit_name"]
+ section_name = f"program:{unit}"
+ if not spv_config.has_section(section_name):
+ logging.info(
+ f"Unable to locate supervisor section {section_name}"
+ )
+ return service_info
+ service_info["properties"] = dict(spv_config[section_name])
+ return service_info
+
+
+# Install validation
+INSTALL_VERSION = 1
+SERVICE_VERSION = 1
+
+SYSTEMD_UNIT = \
+"""
+# systemd service file for moonraker
+[Unit]
+Description=API Server for Klipper SV%d
+Requires=network-online.target
+After=network-online.target
+
+[Install]
+WantedBy=multi-user.target
+
+[Service]
+Type=simple
+User=%s
+SupplementaryGroups=moonraker-admin
+RemainAfterExit=yes
+EnvironmentFile=%s
+ExecStart=%s $MOONRAKER_ARGS
+Restart=always
+RestartSec=10
+""" # noqa: E122
+
+TEMPLATE_NAME = "password_request.html"
+
+class ValidationError(Exception):
+ pass
+
+class InstallValidator:
+ def __init__(self, config: ConfigHelper) -> None:
+ self.server = config.get_server()
+ self.config = config
+ self.server.load_component(config, "template")
+ self.force_validation = config.getboolean("force_validation", False)
+ self.sc_enabled = config.getboolean("validate_service", True)
+ self.cc_enabled = config.getboolean("validate_config", True)
+ app_args = self.server.get_app_args()
+ self.data_path = pathlib.Path(app_args["data_path"])
+ self._update_backup_path()
+ self.data_path_valid = True
+ self._sudo_requested = False
+ self.announcement_id = ""
+ self.validation_enabled = False
+
+ def _update_backup_path(self) -> None:
+ str_time = time.strftime("%Y%m%dT%H%M%SZ", time.gmtime())
+ if not hasattr(self, "backup_path") or not self.backup_path.exists():
+ self.backup_path = self.data_path.joinpath(f"backup/{str_time}")
+
+ async def validation_init(self) -> None:
+ db: MoonrakerDatabase = self.server.lookup_component("database")
+ install_ver: Optional[int] = await db.get_item(
+ "moonraker", "validate_install.install_version", None
+ )
+ if install_ver is None:
+ # skip validation for new installs
+ await db.insert_item(
+ "moonraker", "validate_install.install_version", INSTALL_VERSION
+ )
+ install_ver = INSTALL_VERSION
+ if install_ver < INSTALL_VERSION:
+ logging.info("Validation version in database out of date")
+ self.validation_enabled = True
+ else:
+ msg = "Installation version in database up to date"
+ if self.force_validation:
+ msg += ", force is enabled"
+ logging.info(msg)
+ self.validation_enabled = self.force_validation
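+ # For reference, the database record gating validation looks
+ # roughly like this (illustrative only):
+ #   {"validate_install": {"install_version": 1}}
+ # Validation proceeds only when the stored version is stale or
+ # force_validation is set.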
+ is_bkp_cfg = self.server.get_app_args().get("is_backup_config", False)
+ if self.validation_enabled and is_bkp_cfg:
+ self.server.add_warning(
+ "Backup configuration loaded, aborting install validation. "
+ "Please correct the configuration issue and restart moonraker."
+ )
+ self.validation_enabled = False
+ return
+
+ async def perform_validation(self) -> bool:
+ db: MoonrakerDatabase = self.server.lookup_component("database")
+ if not self.validation_enabled:
+ return False
+ fm: FileManager = self.server.lookup_component("file_manager")
+ need_restart: bool = False
+ has_error: bool = False
+ name = "service"
+ try:
+ need_restart = await self._check_service_file()
+ name = "config"
+ need_restart |= await self._check_configuration()
+ except asyncio.CancelledError:
+ raise
+ except ValidationError as ve:
+ has_error = True
+ self.server.add_warning(str(ve))
+ fm.disable_write_access()
+ except Exception as e:
+ has_error = True
+ msg = f"Failed to validate {name}: {e}"
+ logging.exception(msg)
+ self.server.add_warning(msg, log=False)
+ fm.disable_write_access()
+ else:
+ self.validation_enabled = False
+ await db.insert_item(
+ "moonraker", "validate_install.install_version", INSTALL_VERSION
+ )
+ if not has_error and need_restart:
+ machine: Machine = self.server.lookup_component("machine")
+ machine.restart_moonraker_service()
+ return True
+ return False
+
+ async def _check_service_file(self) -> bool:
+ if not self.sc_enabled:
+ return False
+ machine: Machine = self.server.lookup_component("machine")
+ if machine.is_inside_container():
+ raise ValidationError(
+ "Moonraker instance running inside a container, "
+ "cannot validate service file."
+ )
+ if machine.get_provider_type() == "none":
+ raise ValidationError(
+ "No machine provider configured, cannot validate service file."
+ )
+ logging.info("Performing Service Validation...")
+ app_args = self.server.get_app_args()
+ svc_info = machine.get_moonraker_service_info()
+ if not svc_info:
+ raise ValidationError(
+ "Unable to retrieve Moonraker service info. Service file "
+ "must be updated manually."
+ )
+ props: Dict[str, str] = svc_info.get("properties", {})
+ if "FragmentPath" not in props:
+ raise ValidationError(
+ "Unable to locate path to Moonraker's service unit. Service "
+ "file must be updated manually."
+ )
+ desc = props.get("Description", "")
+ ver_match = re.match(r"API Server for Klipper SV(\d+)", desc)
+ if ver_match is not None and int(ver_match.group(1)) == SERVICE_VERSION:
+ logging.info("Service file validated and up to date")
+ return False
+ unit: str = svc_info.get("unit_name", "").split(".", 1)[0]
+ if not unit:
+ raise ValidationError(
+ "Unable to retrieve service unit name. Service file "
+ "must be updated manually."
+ )
+ if unit != "moonraker":
+ logging.info(f"Custom service file detected: {unit}")
+ # Not using the default unit name
+ if app_args["is_default_data_path"] and self.data_path_valid:
+ # No datapath set, create a new, unique data path
+ df = f"~/{unit}_data"
+ match = re.match(r"moonraker[-_]?(\d+)", unit)
+ if match is not None:
+ df = f"~/printer_{match.group(1)}_data"
+ new_dp = pathlib.Path(df).expanduser().resolve()
+ if new_dp.exists() and not self._check_path_bare(new_dp):
+ raise ValidationError(
+ f"Cannot resolve data path for custom unit '{unit}', "
+ f"data path '{new_dp}' already exists. Service file "
+ "must be updated manually."
+ )
+
+ # If the current path is bare we can remove it
+ if (
+ self.data_path.exists() and
+ self._check_path_bare(self.data_path)
+ ):
+ shutil.rmtree(self.data_path)
+ self.data_path = new_dp
+ if not self.data_path.exists():
+ logging.info(f"New data path created at {self.data_path}")
+ self.data_path.mkdir()
+ # A non-default datapath requires successful update of the
+ # service
+ self.data_path_valid = False
+ user: str = props["User"]
+ has_sudo = False
+ if await machine.check_sudo_access():
+ has_sudo = True
+ logging.info("Moonraker has sudo access")
+ elif user == "pi" and machine.sudo_password is None:
+ machine.sudo_password = "raspberry"
+ has_sudo = await machine.check_sudo_access()
+ if not has_sudo:
+ self._request_sudo_access()
+ raise ValidationError(
+ "Moonraker requires sudo permission to update the system "
+ "service. Please check your notifications for further "
+ "instructions."
+ )
+ self._sudo_requested = False
+ svc_dest = pathlib.Path(props["FragmentPath"])
+ tmp_svc = pathlib.Path(
+ tempfile.gettempdir()
+ ).joinpath(f"{unit}-tmp.svc")
+ # Create local environment file
+ sysd_data = self.data_path.joinpath("systemd")
+ if not sysd_data.exists():
+ sysd_data.mkdir()
+ env_file = sysd_data.joinpath("moonraker.env")
+ env_vars: Dict[str, str] = {
+ "MOONRAKER_DATA_PATH": str(self.data_path)
+ }
+ cfg_file = pathlib.Path(app_args["config_file"])
+ fm: FileManager = self.server.lookup_component("file_manager")
+ cfg_path = fm.get_directory("config")
+ log_path = fm.get_directory("logs")
+ if not cfg_path or not cfg_file.parent.samefile(cfg_path):
+ env_vars["MOONRAKER_CONFIG_PATH"] = str(cfg_file)
+ elif cfg_file.name != "moonraker.conf":
+ cfg_file = self.data_path.joinpath(f"config/{cfg_file.name}")
+ env_vars["MOONRAKER_CONFIG_PATH"] = str(cfg_file)
+ if not app_args["log_file"]:
+ # No log file configured
+ env_vars["MOONRAKER_DISABLE_FILE_LOG"] = "y"
+ else:
+ # A log file is explicitly configured
+ log_file = pathlib.Path(app_args["log_file"])
+ if not log_path or not log_file.parent.samefile(log_path):
+ env_vars["MOONRAKER_LOG_PATH"] = str(log_file)
+ elif log_file.name != "moonraker.log":
+ log_file = self.data_path.joinpath(f"logs/{log_file.name}")
+ env_vars["MOONRAKER_LOG_PATH"] = str(log_file)
+ # backup existing service files
+ self._update_backup_path()
+ svc_bkp_path = self.backup_path.joinpath("service")
+ os.makedirs(str(svc_bkp_path), exist_ok=True)
+ if env_file.exists():
+ env_bkp = svc_bkp_path.joinpath(env_file.name)
+ shutil.copy2(str(env_file), str(env_bkp))
+ service_bkp = svc_bkp_path.joinpath(svc_dest.name)
+ shutil.copy2(str(svc_dest), str(service_bkp))
+ # write temporary service file
+ src_path = source_info.source_path()
+ exec_path = pathlib.Path(sys.executable)
+ py_exec = exec_path.parent.joinpath("python")
+ if exec_path.name == "python" or py_exec.is_file():
+ # Default to loading via the python executable. This
+ # makes it possible to switch between git repos, pip
+ # releases and git releases without reinstalling the
+ # service.
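+ # For example, with hypothetical paths the rendered unit would
+ # then contain:
+ #   EnvironmentFile=/home/pi/printer_data/systemd/moonraker.env
+ #   ExecStart=/home/pi/moonraker-env/bin/python $MOONRAKER_ARGS
+ # where the env file written below supplies
+ # MOONRAKER_ARGS="-m moonraker".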
+ exec_path = py_exec + env_vars["MOONRAKER_ARGS"] = "-m moonraker" + if not source_info.is_dist_package(): + # This module isn't in site/dist packages, + # add PYTHONPATH env variable + env_vars["PYTHONPATH"] = str(src_path) + tmp_svc.write_text( + SYSTEMD_UNIT + % (SERVICE_VERSION, user, env_file, exec_path) + ) + try: + # write new environment + envout = "\n".join(f"{key}=\"{val}\"" for key, val in env_vars.items()) + env_file.write_text(envout) + await machine.exec_sudo_command( + f"cp -f {tmp_svc} {svc_dest}", tries=5, timeout=60.) + await machine.exec_sudo_command( + "systemctl daemon-reload", tries=5, timeout=60. + ) + except asyncio.CancelledError: + raise + except Exception: + logging.exception("Failed to update moonraker service unit") + raise ValidationError( + f"Failed to update service unit file '{svc_dest}'. Update must " + f"be performed manually." + ) from None + finally: + tmp_svc.unlink() + self.data_path_valid = True + self.sc_enabled = False + return True + + def _check_path_bare(self, path: pathlib.Path) -> bool: + empty: bool = True + if not path.exists(): + return True + for item in path.iterdir(): + if ( + item.is_file() or + item.is_symlink() or + item.name not in ["gcodes", "config", "logs", "certs"] + ): + empty = False + break + if item.is_dir() and next(item.iterdir(), None) is not None: + empty = False + break + return empty + + def _link_data_subfolder( + self, + folder_name: str, + source_dir: Union[str, pathlib.Path], + exist_ok: bool = False + ) -> None: + if isinstance(source_dir, str): + source_dir = pathlib.Path(source_dir).expanduser().resolve() + subfolder = self.data_path.joinpath(folder_name) + if not source_dir.exists(): + logging.info( + f"Source path '{source_dir}' does not exist. Falling " + f"back to default folder {subfolder}" + ) + return + if not source_dir.is_dir(): + raise ValidationError( + f"Failed to link subfolder '{folder_name}' to source path " + f"'{source_dir}'. The requested path is not a valid directory." + ) + if subfolder.is_symlink(): + if not subfolder.samefile(source_dir): + if exist_ok: + logging.info( + f"Folder {subfolder} already linked, aborting link " + f"to {source_dir}" + ) + return + raise ValidationError( + f"Failed to link subfolder '{folder_name}' to " + f"'{source_dir}'. '{folder_name}' already exists and is " + f"linked to {subfolder}. This conflict requires " + "manual resolution." + ) + return + if not subfolder.exists(): + subfolder.symlink_to(source_dir) + return + if subfolder.is_dir() and next(subfolder.iterdir(), None) is None: + subfolder.rmdir() + subfolder.symlink_to(source_dir) + return + if exist_ok: + logging.info( + f"Path at {subfolder} exists, aborting link to {source_dir}" + ) + return + raise ValidationError( + f"Failed to link subfolder '{folder_name}' to '{source_dir}'. " + f"Folder '{folder_name}' already exists. This conflict requires " + "manual resolution." + ) + + def _link_data_file( + self, + data_file: Union[str, pathlib.Path], + target: Union[str, pathlib.Path] + ) -> None: + if isinstance(data_file, str): + data_file = pathlib.Path(data_file) + if isinstance(target, str): + target = pathlib.Path(target) + target = target.expanduser().resolve() + if not target.exists(): + logging.info( + f"Target file {target} does not exist. Aborting symbolic " + f"link to {data_file.name}." + ) + return + if not target.is_file(): + raise ValidationError( + f"Failed to link data file {data_file.name}. Target " + f"{target} is not a valid file." 
+ ) + if data_file.is_symlink(): + if not data_file.samefile(target): + raise ValidationError( + f"Failed to link data file {data_file.name}. Link " + f"to {data_file.resolve()} already exists. This conflict " + "must be resolved manually." + ) + return + if not data_file.exists(): + data_file.symlink_to(target) + return + raise ValidationError( + f"Failed to link data file {data_file.name}. File already exists. " + f"This conflict must be resolved manually." + ) + + async def _check_configuration(self) -> bool: + if not self.cc_enabled or not self.data_path_valid: + return False + db: MoonrakerDatabase = self.server.lookup_component("database") + cfg_source = cast(FileSourceWrapper, self.config.get_source()) + cfg_source.backup_source() + try: + # write current configuration to backup path + self._update_backup_path() + cfg_bkp_path = self.backup_path.joinpath("config") + os.makedirs(str(cfg_bkp_path), exist_ok=True) + await cfg_source.write_config(cfg_bkp_path) + # Create symbolic links for configured folders + server_cfg = self.config["server"] + + db_cfg = self.config["database"] + # symlink database path first + db_path = db_cfg.get("database_path", None) + default_db = pathlib.Path("~/.moonraker_database").expanduser() + if db_path is None and default_db.exists(): + self._link_data_subfolder( + "database", default_db, exist_ok=True + ) + elif db_path is not None: + self._link_data_subfolder("database", db_path) + cfg_source.remove_option("database", "database_path") + + fm_cfg = self.config["file_manager"] + cfg_path = fm_cfg.get("config_path", None) + if cfg_path is None: + cfg_path = server_cfg.get("config_path", None) + if cfg_path is not None: + self._link_data_subfolder("config", cfg_path) + cfg_source.remove_option("server", "config_path") + cfg_source.remove_option("file_manager", "config_path") + + log_path = fm_cfg.get("log_path", None) + if log_path is None: + log_path = server_cfg.get("log_path", None) + if log_path is not None: + self._link_data_subfolder("logs", log_path) + cfg_source.remove_option("server", "log_path") + cfg_source.remove_option("file_manager", "log_path") + + gc_path: Optional[str] = await db.get_item( + "moonraker", "file_manager.gcode_path", None + ) + if gc_path is not None: + self._link_data_subfolder("gcodes", gc_path) + db.delete_item("moonraker", "file_manager.gcode_path") + + # Link individual files + secrets_path = self.config["secrets"].get("secrets_path", None) + if secrets_path is not None: + secrets_dest = self.data_path.joinpath("moonraker.secrets") + self._link_data_file(secrets_dest, secrets_path) + cfg_source.remove_option("secrets", "secrets_path") + certs_path = self.data_path.joinpath("certs") + if not certs_path.exists(): + certs_path.mkdir() + ssl_cert = server_cfg.get("ssl_certificate_path", None) + if ssl_cert is not None: + cert_dest = certs_path.joinpath("moonraker.cert") + self._link_data_file(cert_dest, ssl_cert) + cfg_source.remove_option("server", "ssl_certificate_path") + ssl_key = server_cfg.get("ssl_key_path", None) + if ssl_key is not None: + key_dest = certs_path.joinpath("moonraker.key") + self._link_data_file(key_dest, ssl_key) + cfg_source.remove_option("server", "ssl_key_path") + + # Remove deprecated debug options + if server_cfg.has_option("enable_debug_logging"): + cfg_source.remove_option("server", "enable_debug_logging") + um_cfg = server_cfg["update_manager"] + if um_cfg.has_option("enable_repo_debug"): + cfg_source.remove_option("update_manager", "enable_repo_debug") + except Exception: + 
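+ # Roll back pending edits on failure; the source was backed up
+ # via backup_source() above, so a partially rewritten
+ # moonraker.conf is never saved.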
cfg_source.cancel() + raise + finally: + self.cc_enabled = False + return await cfg_source.save() + + def _request_sudo_access(self) -> None: + if self._sudo_requested: + return + self._sudo_requested = True + machine: Machine = self.server.lookup_component("machine") + machine.register_sudo_request( + self._on_password_received, + "Sudo password required to update Moonraker's systemd service." + ) + if not machine.public_ip: + async def wrapper(pub_ip): + if not pub_ip: + return + await self.remove_announcement() + self._announce_sudo_request() + self.server.register_event_handler( + "machine:public_ip_changed", wrapper + ) + self._announce_sudo_request() + + def _announce_sudo_request(self) -> None: + machine: Machine = self.server.lookup_component("machine") + host_info = self.server.get_host_info() + host_addr: str = host_info["address"] + if host_addr.lower() not in ["all", "0.0.0.0", "::"]: + address = host_addr + else: + address = machine.public_ip + if not address: + address = f"{host_info['hostname']}.local" + elif ":" in address: + # ipv6 address + address = f"[{address}]" + app: MoonrakerApp = self.server.lookup_component("application") + scheme = "https" if app.https_enabled() else "http" + host_info = self.server.get_host_info() + port = host_info["port"] + url = f"{scheme}://{address}:{port}/" + ancmp: Announcements = self.server.lookup_component("announcements") + entry = ancmp.add_internal_announcement( + "Sudo Password Required", + "Moonraker requires sudo access to finish updating. " + "Please click on the attached link and follow the " + "instructions.", + url, "high", "machine" + ) + self.announcement_id = entry.get("entry_id", "") + gc_announcement = ( + "!! ATTENTION: Moonraker requires sudo access to complete " + "the update. Go to the following URL and provide your linux " + f"password: {url}" + ) + self.server.send_event("server:gcode_response", gc_announcement) + + async def remove_announcement(self) -> None: + if not self.announcement_id: + return + ancmp: Announcements = self.server.lookup_component("announcements") + # remove stale announcement + try: + await ancmp.remove_announcement(self.announcement_id) + except self.server.error: + pass + self.announcement_id = "" + + async def _on_password_received(self) -> Tuple[str, bool]: + name = "Service" + try: + await self._check_service_file() + name = "Config" + await self._check_configuration() + except asyncio.CancelledError: + raise + except Exception: + logging.exception(f"{name} validation failed") + raise self.server.error( + f"{name} validation failed", 500 + ) from None + await self.remove_announcement() + db: MoonrakerDatabase = self.server.lookup_component("database") + await db.insert_item( + "moonraker", "validate_install.install_version", INSTALL_VERSION + ) + self.validation_enabled = False + return "System update complete.", True + +def load_component(config: ConfigHelper) -> Machine: + return Machine(config)
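+
+# Usage sketch (illustrative only): extracting info for the running
+# Moonraker service. The "sys_provider" attribute is an assumption
+# here; any of the providers above would behave the same way.
+#
+#   machine: Machine = server.lookup_component("machine")
+#   info = await machine.sys_provider.extract_service_info(
+#       "moonraker", os.getpid()
+#   )
+#   # -> {"unit_name": "moonraker.service", "is_default": True,
+#   #     "manager": "systemd", "properties": {...}}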