From d5d0a5173cdc29b88cc9f9c6a3b06987155cc153 Mon Sep 17 00:00:00 2001 From: zehkira <9485872-zehkira@users.noreply.gitlab.com> Date: Tue, 25 Nov 2025 11:50:25 +0100 Subject: [PATCH 1/3] Replace logging with logboth --- source/bin/monophony.py | 55 +++++-- source/data/licenses.json | 6 + source/data/manifest.json | 15 ++ source/monophony/data.py | 4 +- source/monophony/debug.py | 12 +- source/monophony/downloads.py | 64 +++++---- source/monophony/logging.py | 134 ------------------ source/monophony/mpris.py | 7 +- source/monophony/player.py | 73 +++++----- source/monophony/playlists.py | 55 +++---- source/monophony/recents.py | 14 +- source/monophony/recommendations.py | 9 +- source/monophony/settings.py | 12 +- .../monophony/ui/rows/editable_group_row.py | 9 +- .../ui/rows/synchronized_group_row.py | 9 +- source/monophony/ui/windows/main_window.py | 48 ++++--- source/monophony/yt.py | 132 ++++++++--------- 17 files changed, 293 insertions(+), 365 deletions(-) delete mode 100644 source/monophony/logging.py diff --git a/source/bin/monophony.py b/source/bin/monophony.py index f6fbd07..31b76eb 100755 --- a/source/bin/monophony.py +++ b/source/bin/monophony.py @@ -3,6 +3,8 @@ import gettext import os +import pathlib +import platform import sys import threading import traceback @@ -15,18 +17,19 @@ gi.require_versions({ }) -from monophony import NAME, logging +from monophony import NAME, __version__ from monophony.app import Application +import logboth from gi.repository import Gio, GLib -sys.excepthook = lambda exception, value, trace: logging.error( +sys.excepthook = lambda exception, value, trace: logboth.error( __name__, 'Unhandled exception', ''.join(traceback.format_exception(exception, value, trace)) ) -threading.excepthook = lambda args: logging.error( +threading.excepthook = lambda args: logboth.error( f'{__name__} (thread "{args.thread.name}")', 'Unhandled exception in thread', ''.join( @@ -34,43 +37,69 @@ threading.excepthook = lambda args: logging.error( ) ) +wanted_levels_names = os.getenv( + 'MONOPHONY_LOG_LEVELS', 'INFO,WARN,ERRO,SUCC' +).split(',') +levels = [level for level in logboth.config.levels if level.name in wanted_levels_names] +logboth.config.levels = levels +logboth.config.directory /= pathlib.Path(NAME) +logboth.config.file = 'log.txt' +logboth.basic_info() + +os_release = {} +try: + os_release = platform.freedesktop_os_release() +except OSError: + logboth.warning(__name__, 'Could not read OS release file') + +os_info_string = '' +for key in ('PRETTY_NAME', 'NAME', 'ID', 'ID_LIKE', 'VERSION', 'VERSION_ID'): + if value := os_release.get(key): + os_info_string += f'{key}: {value}\n' + container = os.getenv('container', 'unknown') # noqa: SIM112 - Container is lowercase if container != 'flatpak': - logging.warning( + logboth.warning( __name__, f'App was installed from unofficial source. Container type: {container}' ) -logging.info(__name__, 'Loading GResources...') +logboth.info( + __name__, + f'{NAME} {__version__} on {platform.platform(aliased=True)}', + os_info_string.strip('\n') +) + +logboth.info(__name__, 'Loading GResources...') resources_file = 'resources.gresource' for path in os.getenv('XDG_DATA_DIRS', '/usr/share/').split(':'): data_path = f'{path}{NAME}' if path.endswith('/') else f'{path}/{NAME}' - logging.info(__name__, f'Trying to load GResources from "{data_path}"...') + logboth.info(__name__, f'Trying to load GResources from "{data_path}"...') try: resource = Gio.Resource.load(data_path + '/' + resources_file) except GLib.GError: continue Gio.resources_register(resource) - logging.info(__name__, f'Loaded GResources from "{data_path}/{resources_file}"') + logboth.info(__name__, f'Loaded GResources from "{data_path}/{resources_file}"') break else: - logging.error(__name__, 'Failed to load GResources: not found') + logboth.error(__name__, 'Failed to load GResources: not found') sys.exit(1) -logging.info(__name__, 'Installing translation...') +logboth.info(__name__, 'Installing translation...') for path in os.getenv('XDG_DATA_DIRS', '/usr/share/').split(':'): locale_path = f'{path}locale' if path.endswith('/') else f'{path}/locale' - logging.info(__name__, f'Trying to install translation from "{locale_path}"...') + logboth.info(__name__, f'Trying to install translation from "{locale_path}"...') if 'share' in path and os.path.isdir(locale_path): gettext.translation(NAME, locale_path, fallback=True).install() - logging.info( + logboth.info( __name__, f'Installed translation from "{locale_path}/{NAME}/"' ) break else: - logging.error(__name__, 'Failed to install translation: not found') + logboth.error(__name__, 'Failed to install translation: not found') sys.exit(1) Application().run() -logging.info(__name__, 'Exited') +logboth.info(__name__, 'Exited') diff --git a/source/data/licenses.json b/source/data/licenses.json index 4ea6ea6..9b760c3 100644 --- a/source/data/licenses.json +++ b/source/data/licenses.json @@ -71,6 +71,12 @@ "license": "LGPL-2.1-or-later", "text": "" }, + { + "name": "logboth", + "copyright": "Copyright © Zehkira and contributors", + "license": "0BSD", + "text": "" + }, { "name": "Mopidy-MPRIS", "copyright": "", diff --git a/source/data/manifest.json b/source/data/manifest.json index 655f6ba..9657db8 100644 --- a/source/data/manifest.json +++ b/source/data/manifest.json @@ -88,6 +88,21 @@ } ] }, + { + "name": "logboth", + "buildsystem": "simple", + "build-commands": [ + "pip3 install . --exists-action=i --no-index --find-links=\"file://${PWD}\" --prefix=${FLATPAK_DEST} --no-build-isolation" + ], + "sources": [ + { + "type": "git", + "url": "https://gitlab.com/zehkira/logboth.git", + "tag": "v0.1.0", + "commit": "1c637f0ed7134ce54adab1d9bf1b5ed6987cf254" + } + ] + }, { "name": "adwaita-icon-theme", "buildsystem": "meson", diff --git a/source/monophony/data.py b/source/monophony/data.py index 6f6f92b..b33a77a 100644 --- a/source/monophony/data.py +++ b/source/monophony/data.py @@ -2,7 +2,7 @@ import contextlib import datetime from typing import Any -from monophony import logging +import logboth class YTItem: @@ -123,7 +123,7 @@ class TimeString: with contextlib.suppress(ValueError): seconds += int(parts[-3]) * 60 * 60 if len(parts) > TimeString.HOURS_POS + 1: - logging.warning( + logboth.warning( __name__, f'TimeString "{self._string}" has too many parts' ) diff --git a/source/monophony/debug.py b/source/monophony/debug.py index 4f4d7b9..e388716 100644 --- a/source/monophony/debug.py +++ b/source/monophony/debug.py @@ -1,8 +1,7 @@ import gc import os -from monophony import logging - +import logboth from gi.repository import GLib, GObject @@ -19,7 +18,7 @@ def log_memory_status() -> bool: else: other_count += 1 - logging.info( + logboth.info( __name__, f'{gobject_count} GObjects and {other_count} other objects in memory' ) @@ -29,19 +28,18 @@ def log_memory_status() -> bool: # Use with multiple inheriance: class Class(MemoryDebugger, ...) class MemoryDebugger: def __del__(self): - logging.info(__name__, f'Collected {self.__class__.__name__}') + logboth.info(__name__, f'Collected {self.__class__.__name__}') def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) - logging.info(__name__, f'Initialized {self.__class__.__name__}') + logboth.info(__name__, f'Initialized {self.__class__.__name__}') debug_active = os.getenv(_DEBUG_VARIABLE) if debug_active: - logging.warning(__name__, 'Debug mode enabled, expect low performance') + logboth.warning(__name__, 'Debug mode enabled, expect low performance') GLib.timeout_add_seconds(2, log_memory_status) else: del MemoryDebugger.__del__ del MemoryDebugger.__init__ - diff --git a/source/monophony/downloads.py b/source/monophony/downloads.py index 5b01b88..081bea9 100644 --- a/source/monophony/downloads.py +++ b/source/monophony/downloads.py @@ -5,10 +5,11 @@ import os import subprocess import traceback -from monophony import NAME, logging +from monophony import NAME from monophony.asynchronous import Task from monophony.data import Artist, Group, Song +import logboth from gi.repository import GLib @@ -30,7 +31,7 @@ def get_file(song: Song) -> str | None: if files: if len(files) > 1: - logging.warning( + logboth.warning( __name__, f'Multiple song files match id "{song.yt_id}"', '\n'.join(files) ) @@ -52,20 +53,20 @@ def is_downloaded(song: Song) -> bool: class DownloadTask(Task): def _function(self, downloader: '_Downloader', group: Group) -> bool: downloader.lock.lock() - logging.info(__name__, f'Downloading {len(group.songs)} songs...') + logboth.info(__name__, f'Downloading {len(group.songs)} songs...') path = get_directory() needed_ids = [] new_group = Group() for song in group.songs: if not song.yt_id: - logging.error( + logboth.error( __name__, f'Failed to download song "{song.title}" - no id' ) continue if is_downloaded(song) or is_being_downloaded(song): - logging.info( + logboth.info( __name__, f'Skipped download of song "{song.yt_id}" - already taken care of' ) @@ -80,7 +81,7 @@ class DownloadTask(Task): downloader.lock.unlock() if not needed_ids: - logging.info( + logboth.info( __name__, 'Canceled download as there are no songs to download' ) return True @@ -116,17 +117,17 @@ class DownloadTask(Task): downloader.delete_lock_file(yt_id) if return_code != 0: - logging.error(__name__, 'Failed to download songs', ytdlp.stdout.read()) + logboth.error(__name__, 'Failed to download songs', ytdlp.stdout.read()) downloader.lock.unlock() return False new_group.songs = [song for song in new_group.songs if is_downloaded(song)] - logging.info( + logboth.info( __name__, f'Downloaded {len(new_group.songs)}/{len(group.songs)} songs' ) - logging.info(__name__, 'Saving data about newly downloaded songs...') + logboth.info(__name__, 'Saving data about newly downloaded songs...') downloader.write(Group(songs=new_group.songs + downloader.read().songs)) - logging.info(__name__, 'Saved newly downloaded song data') + logboth.info(__name__, 'Saved newly downloaded song data') downloader.lock.unlock() return True @@ -136,15 +137,16 @@ class _Downloader: def __init__(self): self.lock = GLib.Mutex() + def clean_up(self): self.lock.lock() - logging.info(__name__, 'Cleaning up downloads...') + logboth.info(__name__, 'Cleaning up downloads...') downloads_group = self.read() path = get_directory() os.makedirs(path, exist_ok=True) for file in os.listdir(path): if file.endswith(NAME): os.remove(path + file) - logging.info(__name__, f'Removed abandoned temp file "{file}"') + logboth.info(__name__, f'Removed abandoned temp file "{file}"') continue yt_id = 'null' @@ -152,17 +154,17 @@ class _Downloader: yt_id = file.split('.')[-2][-11:] if Song(yt_id=yt_id) not in downloads_group.songs: os.remove(path + file) - logging.warning(__name__, f'Removed unexpected file "{file}"') + logboth.warning(__name__, f'Removed unexpected file "{file}"') - logging.info(__name__, 'Cleaned up downloads') + logboth.info(__name__, 'Cleaned up downloads') self.lock.unlock() def create_lock_file(self, name: str): - logging.info(__name__, f'Creating lock file "{name}.{NAME}"...') + logboth.info(__name__, f'Creating lock file "{name}.{NAME}"...') # Always lock self.lock before calling this to prevent race conditions if self.lock.trylock(): - logging.warning( + logboth.warning( __name__, 'Creating lock file while self.lock is unlocked' ) self.lock.unlock() @@ -170,19 +172,19 @@ class _Downloader: try: open(f'{get_directory()}{name}.{NAME}', 'w').close() except OSError: - logging.error( + logboth.error( __name__, 'Failed to create lock file', traceback.format_exc() ) return - logging.info(__name__, 'Created lock file') + logboth.info(__name__, 'Created lock file') def delete_lock_file(self, name: str): - logging.info(__name__, f'Deleting lock file "{name}.{NAME}"...') + logboth.info(__name__, f'Deleting lock file "{name}.{NAME}"...') # Always lock self.lock before calling this to prevent race conditions if self.lock.trylock(): - logging.warning( + logboth.warning( __name__, 'Deleting lock file while self.lock is unlocked' ) self.lock.unlock() @@ -190,18 +192,18 @@ class _Downloader: try: os.remove(f'{get_directory()}{name}.{NAME}') except (OSError, FileNotFoundError): - logging.error( + logboth.error( __name__, f'Failed to remove lock file "{name}.{NAME}"', traceback.format_exc() ) - logging.info(__name__, 'Deleted lock file') + logboth.info(__name__, 'Deleted lock file') def read(self) -> Group: # Always lock self.lock before calling this to prevent race conditions if self.lock.trylock(): - logging.warning(__name__, 'Reading downloads while self.lock is unlocked') + logboth.warning(__name__, 'Reading downloads while self.lock is unlocked') self.lock.unlock() songs_path = os.getenv( @@ -228,11 +230,11 @@ class _Downloader: return Group() def write(self, group: Group): - logging.info(__name__, f'Writing {len(group.songs)} songs to downloads...') + logboth.info(__name__, f'Writing {len(group.songs)} songs to downloads...') # Always lock self.lock before calling this to prevent race conditions if self.lock.trylock(): - logging.warning(__name__, 'Writing downloads while self.lock is unlocked') + logboth.warning(__name__, 'Writing downloads while self.lock is unlocked') self.lock.unlock() dir_path = os.getenv( @@ -244,7 +246,7 @@ class _Downloader: with open(downloads_path, 'w') as downloads_file: json.dump(group.serialize()['contents'], downloads_file, indent='\t') - logging.info(__name__, 'Done writing to downloads') + logboth.info(__name__, 'Done writing to downloads') def get_downloads(self) -> Group: self.lock.lock() @@ -254,18 +256,18 @@ class _Downloader: def remove(self, song: Song): self.lock.lock() - logging.info(__name__, f'Removing song "{song.yt_id}" from downloads...') + logboth.info(__name__, f'Removing song "{song.yt_id}" from downloads...') # No race conditions here as long as lock files are only created and # deleted while holding the lock if not is_downloaded(song): - logging.error( + logboth.error( __name__, 'Failed remove song from downloads - not downloaded' ) self.lock.unlock() return if is_being_downloaded(song): - logging.error( + logboth.error( __name__, 'Failed remove song from downloads - download in progress' ) self.lock.unlock() @@ -277,12 +279,12 @@ class _Downloader: file = get_file(song) if not file: - logging.error( + logboth.error( __name__, 'Failed remove song from downloads - file not found' ) os.remove(file) - logging.info(__name__, 'Removed song from downloads') + logboth.info(__name__, 'Removed song from downloads') self.lock.unlock() diff --git a/source/monophony/logging.py b/source/monophony/logging.py deleted file mode 100644 index 8b3acdf..0000000 --- a/source/monophony/logging.py +++ /dev/null @@ -1,134 +0,0 @@ -import os -import platform -import sys -import threading -import time -import traceback -from typing import Any - -from monophony import NAME, __version__ - - -_LOG_LEVELS_VARIABLE = 'MONOPHONY_LOG_LEVELS' -_DEFAULT_LOG_LEVELS = 'INFO,WARN,ERRO' -_lock = threading.Lock() - - -class LogLevel: - name = '' - color = '' - - -class InfoLevel(LogLevel): - name = 'INFO' - color = '\033[0m' - - -class WarningLevel(LogLevel): - name = 'WARN' - color = '\033[1;33m' - - -class ErrorLevel(LogLevel): - name = 'ERRO' - color = '\033[1;31m' - - -def get_log() -> str: - if threading.current_thread() is not threading.main_thread(): - warning(__name__, 'Reading log from non-main thread') - - try: - with open(_get_directory() + '/log') as log_file: - return log_file.read() - except OSError: - error(__name__, 'Failed to read log file', traceback.format_exc()) - - return '' - - -# For status updates -def info(source: str, text: Any, details: Any=''): - _log(InfoLevel, source, str(text), str(details)) - - -# For unusual but possibly acceptable things -def warning(source: str, text: Any, details: Any=''): - _log(WarningLevel, source, str(text), str(details)) - - -# For failures of all sorts -def error(source: str, text: Any, details: Any=''): - _log(ErrorLevel, source, str(text), str(details)) - - -def _log(level: type, source: str, text: str, details: str): - if level.name not in log_levels: - return - - _lock.acquire() - - text = text.strip() - details = details.strip() - line = f'{time.strftime("%H:%M")} [{level.name}] {source}: {text}' - sys.stdout.write(f'{level.color}{line}\n') - if details: - sys.stdout.write(details + '\n') - sys.stdout.write('\033[0m') - - log_directory = _get_directory() - os.makedirs(log_directory, exist_ok=True) - try: - with open(f'{log_directory}/log', 'a+') as log: - log.write(line + '\n') - if details: - log.write(details + '\n') - except (OSError, ValueError): - sys.stdout.write( - f'{ErrorLevel.color}[{ErrorLevel.name}] {__name__}: ' - 'Failed to write to log file\033[0m\n' - f'{traceback.format_exc()}\n' - ) - - _lock.release() - - -def _get_directory() -> str: - return os.getenv('XDG_RUNTIME_DIR', '/var/tmp') + '/' + NAME - - -log_levels = os.getenv( - _LOG_LEVELS_VARIABLE, _DEFAULT_LOG_LEVELS -).split(',') - -log_directory = _get_directory() -os.makedirs(log_directory, exist_ok=True) -open(f'{log_directory}/log', 'a+').close() - -max_log_lines = 1000 -with open(f'{log_directory}/log', 'r+') as log_file: - lines = log_file.readlines() - if len(lines) > max_log_lines: - lines = lines[-max_log_lines:] - log_file.seek(0) - log_file.truncate() - log_file.writelines(lines) - -info(__name__, f'Logging initiated. Logs will be written to {log_directory}/log') - -os_release = {} -try: - os_release = platform.freedesktop_os_release() -except OSError: - warning(__name__, 'Could not read OS release file') - -os_info_string = '' -for key in ('PRETTY_NAME', 'NAME', 'ID', 'ID_LIKE', 'VERSION', 'VERSION_ID'): - if value := os_release.get(key): - os_info_string += f'{key}: {value}\n' - -info( - __name__, - f'{NAME} {__version__} on {platform.platform(aliased=True)}', - os_info_string.strip('\n') -) diff --git a/source/monophony/mpris.py b/source/monophony/mpris.py index a92ad84..d67dfbf 100644 --- a/source/monophony/mpris.py +++ b/source/monophony/mpris.py @@ -1,6 +1,7 @@ -from monophony import ID, logging +from monophony import ID from monophony.data import PlaybackMode, PlaybackState +import logboth from mprisify.adapters import MprisAdapter from mprisify.adapters import PlayState as MprisPlayState from mprisify.events import PlayerEventAdapter as MprisPlayerEventAdapter @@ -43,7 +44,7 @@ class EventHandler(MprisAdapter): elif self._player.state == PlaybackState.NONE: playstate = MprisPlayState.STOPPED - logging.info(__name__, f'Reported playstate as "{playstate}"') + logboth.info(__name__, f'Reported playstate as "{playstate}"') return playstate def is_repeating(self) -> bool: @@ -98,7 +99,7 @@ class EventHandler(MprisAdapter): 'xesam:artist': [song.author.name] } - logging.info(__name__, 'Reported metadata for current song', metadata) + logboth.info(__name__, 'Reported metadata for current song', metadata) return metadata diff --git a/source/monophony/player.py b/source/monophony/player.py index 857cce5..1bff3f0 100644 --- a/source/monophony/player.py +++ b/source/monophony/player.py @@ -2,22 +2,23 @@ import copy import random import time -from monophony import DISPLAY_NAME, ID, downloads, logging, recents, settings, yt +from monophony import DISPLAY_NAME, ID, downloads, recents, settings, yt from monophony.asynchronous import Task from monophony.data import Group, PlaybackMode, PlaybackState, Song from monophony.mpris import EventHandler, EventSender, Server +import logboth from gi.repository import GObject, Gst class ReportProgressTask(Task): def _function(self): - logging.info(__name__, 'Progress reporting started') + logboth.info(__name__, 'Progress reporting started') while not self.is_canceled(): self._update_progress() time.sleep(1) - logging.info(__name__, 'Progress reporting stopped') + logboth.info(__name__, 'Progress reporting stopped') class FindRadioSongsTask(Task): @@ -27,28 +28,28 @@ class FindRadioSongsTask(Task): class FindURITask(Task): def _function(self, song: Song, known_uris: dict) -> str | None: - logging.info( + logboth.info( __name__, f'Looking for "{song.yt_id}" song URI locally and online...' ) if uri := known_uris.get(song.yt_id): - logging.info(__name__, 'Found already known song URI') + logboth.info(__name__, 'Found already known song URI') return uri if downloads.is_downloaded(song): song_path = downloads.get_file(song) if song_path: - logging.info(__name__, 'Found local song URI') + logboth.info(__name__, 'Found local song URI') return 'file://' + song_path if self.is_canceled(): - logging.info(__name__, 'Canceled URI lookup') + logboth.info(__name__, 'Canceled URI lookup') return None if uri := yt.get_song_uri(song): - logging.info(__name__, 'Found online song URI') + logboth.info(__name__, 'Found online song URI') return uri - logging.error(__name__, 'Failed to find song URI') + logboth.error(__name__, 'Failed to find song URI') return None @@ -156,27 +157,27 @@ class Player(GObject.Object): self._find_uri_task.start() return - logging.info(__name__, 'Found all song URIs for current queue') + logboth.info(__name__, 'Found all song URIs for current queue') def _on_buffering(self, _bus: Gst.Bus, message: Gst.Message): percentage = message.parse_buffering() self.emit('buffering-changed', percentage / 100.0) if percentage < 100: # noqa: PLR2004 - 100% if not self.buffering: - logging.info(__name__, 'Buffering...') + logboth.info(__name__, 'Buffering...') self._playbin.set_state(Gst.State.PAUSED) self.buffering = True return - logging.info(__name__, 'Done buffering') + logboth.info(__name__, 'Done buffering') self.buffering = False if self.state != PlaybackState.NONE: self._playbin.set_state(Gst.State.PLAYING) self.state = PlaybackState.PLAYING self.emit('state-changed', self.state) if self._start_position > 0: - logging.info(__name__, f'Seeking to {self._start_position}ns') + logboth.info(__name__, f'Seeking to {self._start_position}ns') self._playbin.seek_simple( Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE, @@ -185,12 +186,12 @@ class Player(GObject.Object): self._start_position = 0 return - logging.info( + logboth.info( __name__, 'Ignoring end of buffering as state is already NONE' ) def _on_bus_error(self, _bus: Gst.Bus, message: Gst.Message): - logging.error(__name__, 'Bus error', message.parse_error().gerror.message) + logboth.error(__name__, 'Bus error', message.parse_error().gerror.message) self.pop_uri(self._queue.songs[self._queue_index].yt_id) self.play( self._queue.songs[self._queue_index], @@ -200,16 +201,16 @@ class Player(GObject.Object): def _on_latency(self, _bus: Gst.Bus, _message: Gst.Message): if self._playbin.recalculate_latency(): - logging.info(__name__, 'Recalculated latency') + logboth.info(__name__, 'Recalculated latency') else: - logging.error(__name__, 'Failed to recalculate latency') + logboth.error(__name__, 'Failed to recalculate latency') def _on_radio_songs_found(self, task: FindRadioSongsTask): if task.is_canceled() or self._radio_task is not task: return if not task.result: - logging.warning(__name__, 'No radio songs found') + logboth.warning(__name__, 'No radio songs found') self.play(self._queue.songs[self._queue_index], self._queue) return @@ -220,17 +221,17 @@ class Player(GObject.Object): if success != Gst.StateChangeReturn.SUCCESS: return if state == Gst.State.PLAYING and self.paused: - logging.info(__name__, 'Adjusted state to paused after change') + logboth.info(__name__, 'Adjusted state to paused after change') self._playbin.set_state(Gst.State.PAUSED) def _on_stream_start(self, _bus: Gst.Bus, _message: Gst.Message): if self.state == PlaybackState.LOADING and not self.buffering: - logging.info(__name__, 'Stream started') + logboth.info(__name__, 'Stream started') self._playbin.set_state(Gst.State.PLAYING) self.state = PlaybackState.PLAYING self.emit('state-changed', self.state) if self._start_position: - logging.info(__name__, f'Seeking to {self._start_position}ns') + logboth.info(__name__, f'Seeking to {self._start_position}ns') self._playbin.seek_simple( Gst.Format.TIME, Gst.SeekFlags.FLUSH | Gst.SeekFlags.ACCURATE, @@ -239,7 +240,7 @@ class Player(GObject.Object): self._start_position = 0 def _on_stream_end(self, _bus: Gst.Bus, _message): - logging.info(__name__, 'Stream has ended') + logboth.info(__name__, 'Stream has ended') self.next() def _report_progress(self, _task: ReportProgressTask): @@ -255,17 +256,17 @@ class Player(GObject.Object): self._song_uris.pop(next(iter(self._song_uris.keys()))) self._song_uris[yt_id] = uri - logging.info(__name__, f'Added URI for song "{yt_id}" to known') + logboth.info(__name__, f'Added URI for song "{yt_id}" to known') def _start_playback(self, task: FindURITask, position: int=0): if task.is_canceled() or self._find_uri_task is not task: - logging.info(__name__, 'Ignoring callback from canceled URI lookup task') + logboth.info(__name__, 'Ignoring callback from canceled URI lookup task') return song = self._queue.songs[self._queue_index] uri = task.result if not uri: - logging.error(__name__, f'Failed to find URI for song "{song.yt_id}"') + logboth.error(__name__, f'Failed to find URI for song "{song.yt_id}"') self.play(song, self._queue, position) return @@ -285,7 +286,7 @@ class Player(GObject.Object): # Don't actually start yet - wait for messages on the bus self._playbin.props.uri = uri self._playbin.set_state(Gst.State.PAUSED) - logging.info(__name__, 'Started playback') + logboth.info(__name__, 'Started playback') def add_to_queue(self, group: Group): if self._queue.songs: @@ -314,7 +315,7 @@ class Player(GObject.Object): return self._playbin.props.volume def move_song(self, song: Song, target: Song): - logging.info( + logboth.info( __name__, f'Moving song "{song.yt_id}" to "{target.yt_id}" in queue...' ) current_song = self._queue.songs[self._queue_index] @@ -330,7 +331,7 @@ class Player(GObject.Object): self._queue_index = self._queue.songs.index(current_song) self.emit('queue-changed', self._queue, self._queue_index) - logging.info(__name__, 'Moved song in queue') + logboth.info(__name__, 'Moved song in queue') def next(self, from_user: bool=False): if self.mode == PlaybackMode.RADIO: @@ -374,7 +375,7 @@ class Player(GObject.Object): self.stop() def play(self, song: Song, group: Group, position: int=0): - logging.info( + logboth.info( __name__, f'Playback of song "{song.yt_id}" at {position}ns requested' ) @@ -412,7 +413,7 @@ class Player(GObject.Object): def pop_uri(self, yt_id: str) -> str | None: if yt_id in self._song_uris: - logging.info(__name__, f'Popped known URI for song "{yt_id}"') + logboth.info(__name__, f'Popped known URI for song "{yt_id}"') return self._song_uris.pop(yt_id) return None @@ -442,7 +443,7 @@ class Player(GObject.Object): self._mpris_event_sender.on_seek(value) def set_pause(self, pause: bool): - logging.info(__name__, f'Setting pause to "{pause}"...') + logboth.info(__name__, f'Setting pause to "{pause}"...') self.paused = pause if not self.buffering and self.state != PlaybackState.LOADING: @@ -452,7 +453,7 @@ class Player(GObject.Object): self.emit('pause-changed', pause) self._mpris_event_sender.on_playpause() - logging.info(__name__, f'Set pause to "{pause}"') + logboth.info(__name__, f'Set pause to "{pause}"') def set_volume( self, @@ -478,7 +479,7 @@ class Player(GObject.Object): self.emit('mode-changed', self.mode) def shuffle(self): - logging.info(__name__, 'Shuffling songs...') + logboth.info(__name__, 'Shuffling songs...') back_part = self._queue.songs[:self._queue_index] front_part = self._queue.songs[self._queue_index + 1:] @@ -495,10 +496,10 @@ class Player(GObject.Object): self.emit('queue-changed', self._queue, self._queue_index) break - logging.info(__name__, 'Shuffled songs') + logboth.info(__name__, 'Shuffled songs') def stop(self): - logging.info(__name__, 'Stopping playback...') + logboth.info(__name__, 'Stopping playback...') self._find_uri_task.cancel() self._progress_task.cancel() self._radio_task.cancel() @@ -512,4 +513,4 @@ class Player(GObject.Object): self._queue_index = 0 self.emit('queue-changed', self._queue, self._queue_index) self._mpris_event_sender.emit_all() - logging.info(__name__, 'Stopped playback') + logboth.info(__name__, 'Stopped playback') diff --git a/source/monophony/playlists.py b/source/monophony/playlists.py index 310c076..97b314c 100644 --- a/source/monophony/playlists.py +++ b/source/monophony/playlists.py @@ -2,10 +2,11 @@ import json import os import time -from monophony import NAME, logging, yt +from monophony import NAME, yt from monophony.asynchronous import Task from monophony.data import Artist, Group, Song +import logboth from gi.repository import GLib @@ -24,7 +25,7 @@ def _get_external_file_path() -> str: def add(playlist: Group) -> str: - logging.info(__name__, f'Adding playlist "{playlist.title}"...') + logboth.info(__name__, f'Adding playlist "{playlist.title}"...') new_lists = read() old_title = playlist.title playlist.title = make_unique_name(playlist.title) @@ -39,12 +40,12 @@ def add(playlist: Group) -> str: playlist.songs = unique_songs new_lists.append(playlist) _write(playlists=new_lists) - logging.info(__name__, f'Added playlist "{old_title}" as "{playlist.title}"') + logboth.info(__name__, f'Added playlist "{old_title}" as "{playlist.title}"') return playlist.title def add_external(playlist: Group): - logging.info(__name__, f'Adding external playlist "{playlist.yt_id}"...') + logboth.info(__name__, f'Adding external playlist "{playlist.yt_id}"...') lists = read_external() song_ids = [] @@ -57,45 +58,45 @@ def add_external(playlist: Group): playlist.songs = unique_songs lists.append(playlist) _write(ext_playlists=lists) - logging.info(__name__, 'Added external playlist') + logboth.info(__name__, 'Added external playlist') def rename(name: str, new_name: str) -> str: new_name = make_unique_name(new_name) - logging.info(__name__, f'Renaming playlist "{name}" to "{new_name}"...') + logboth.info(__name__, f'Renaming playlist "{name}" to "{new_name}"...') new_lists = read() for playlist in new_lists: if playlist.title == name: playlist.title = new_name _write(playlists=new_lists) - logging.info(__name__, 'Renamed playlist') + logboth.info(__name__, 'Renamed playlist') return new_name - logging.error(__name__, 'Failed to rename: playlist not found') + logboth.error(__name__, 'Failed to rename: playlist not found') return name def delete(playlist_name: str): - logging.info(__name__, f'Deleting playlist "{playlist_name}"...') + logboth.info(__name__, f'Deleting playlist "{playlist_name}"...') _write( playlists=[playlist for playlist in read() if playlist.title != playlist_name] ) - logging.info(__name__, 'Deleted playlist') + logboth.info(__name__, 'Deleted playlist') def delete_external(playlist_name: str): - logging.info(__name__, f'Deleting external playlist "{playlist_name}"...') + logboth.info(__name__, f'Deleting external playlist "{playlist_name}"...') _write( ext_playlists=[ playlist for playlist in read_external() if playlist.title != playlist_name ] ) - logging.info(__name__, 'Deleted external playlist') + logboth.info(__name__, 'Deleted external playlist') def add_songs(songs: Group, playlist_name: str): - logging.info( + logboth.info( __name__, f'Adding {len(songs.songs)} songs to playlist "{playlist_name}"...' ) new_lists = read() @@ -109,11 +110,11 @@ def add_songs(songs: Group, playlist_name: str): break _write(playlists=new_lists) - logging.info(__name__, 'Added songs to playlist') + logboth.info(__name__, 'Added songs to playlist') def swap_songs(playlist_name: str, i: int, j: int): - logging.info( + logboth.info( __name__, f'Swapping songs #{i} and #{j} in playlist "{playlist_name}"...' ) new_lists = read() @@ -125,11 +126,11 @@ def swap_songs(playlist_name: str, i: int, j: int): break _write(playlists=new_lists) - logging.info(__name__, 'Swapped songs') + logboth.info(__name__, 'Swapped songs') def move_song(playlist_name: str, from_i: int, to_i: int): - logging.info( + logboth.info( __name__, f'Moving song from #{from_i} to #{to_i} in playlist "{playlist_name}"...' ) @@ -142,11 +143,11 @@ def move_song(playlist_name: str, from_i: int, to_i: int): break _write(playlists=new_lists) - logging.info(__name__, 'Moved song') + logboth.info(__name__, 'Moved song') def remove_song(song: Song, playlist_name: str): - logging.info( + logboth.info( __name__, f'Removing song "{song.yt_id}" from playlist "{playlist_name}"...' ) new_lists = read() @@ -156,7 +157,7 @@ def remove_song(song: Song, playlist_name: str): break _write(playlists=new_lists) - logging.info(__name__, 'Removed song') + logboth.info(__name__, 'Removed song') def make_unique_name(name: str) -> str: @@ -176,7 +177,7 @@ def make_unique_name(name: str) -> str: def _write(playlists: list[Group] | None=None, ext_playlists: list[Group] | None=None): lock.lock() - logging.info( + logboth.info( __name__, f'Writing {len(playlists) if playlists else "no"} playlists and ' f'{len(ext_playlists) if ext_playlists else "no"} external playlists...' @@ -200,7 +201,7 @@ def _write(playlists: list[Group] | None=None, ext_playlists: list[Group] | None indent='\t' ) - logging.info(__name__, 'Done writing playlist and external playlists') + logboth.info(__name__, 'Done writing playlist and external playlists') lock.unlock() @@ -265,7 +266,7 @@ class ImportTask(Task): def _function( self, name: str, url: str, local: bool, overwrite: bool=False ) -> bool: - logging.info(__name__, f'Importing playlist "{url}"...') + logboth.info(__name__, f'Importing playlist "{url}"...') new_lists = [playlist for playlist in read() if playlist.title != name] new_ext_lists = [ playlist for playlist in read_external() if playlist.title != name @@ -274,7 +275,7 @@ class ImportTask(Task): if not ( playlist := yt.get_album_or_playlist(url.split('list=')[-1].split('&')[0]) ): - logging.error(__name__, 'Failed to import playlist') + logboth.error(__name__, 'Failed to import playlist') return False playlist.title = make_unique_name( @@ -288,13 +289,13 @@ class ImportTask(Task): new_ext_lists.append(playlist) _write(ext_playlists=new_ext_lists) - logging.info(__name__, 'Imported playlist') + logboth.info(__name__, 'Imported playlist') return True class UpdateExternalTask(Task): def _function(self): - logging.info(__name__, 'Updating external playlists...') + logboth.info(__name__, 'Updating external playlists...') tasks = [] external = read_external() for i, playlist in enumerate(external): @@ -313,7 +314,7 @@ class UpdateExternalTask(Task): self._update_progress((i / len(tasks)) / 2 + 0.5) time.sleep(0.5) - logging.info(__name__, 'Updated external playlists') + logboth.info(__name__, 'Updated external playlists') # Signleton diff --git a/source/monophony/recents.py b/source/monophony/recents.py index b16b721..116a772 100644 --- a/source/monophony/recents.py +++ b/source/monophony/recents.py @@ -2,9 +2,11 @@ import json import os import traceback -from monophony import NAME, logging +from monophony import NAME from monophony.data import Artist, Group, Song +import logboth + MAX_SONGS = 15 @@ -23,24 +25,24 @@ def _write(group: Group): if len(group.songs) > MAX_SONGS: group.songs = group.songs[:MAX_SONGS] - logging.info(__name__, f'Writing {len(group.songs)} songs to recents...') + logboth.info(__name__, f'Writing {len(group.songs)} songs to recents...') os.makedirs(_get_directory(), exist_ok=True) try: with open(_get_file_path(), 'w') as recents_file: json.dump(group.serialize()['contents'], recents_file, indent='\t') except OSError: - logging.error(__name__, 'Failed to write to recents', traceback.format_exc()) + logboth.error(__name__, 'Failed to write to recents', traceback.format_exc()) return - logging.info(__name__, 'Done writing to recents') + logboth.info(__name__, 'Done writing to recents') def add(song: Song): - logging.info(__name__, f'Adding song "{song.yt_id}" to recents...') + logboth.info(__name__, f'Adding song "{song.yt_id}" to recents...') group = read() group.songs = [song, *group.songs] _write(group) - logging.info(__name__, 'Added song to recents') + logboth.info(__name__, 'Added song to recents') def clear(): diff --git a/source/monophony/recommendations.py b/source/monophony/recommendations.py index b63146a..1a00a8b 100644 --- a/source/monophony/recommendations.py +++ b/source/monophony/recommendations.py @@ -1,9 +1,11 @@ import json import os -from monophony import NAME, logging +from monophony import NAME from monophony.data import Artist, Group, Song +import logboth + def _get_directory() -> str: return os.getenv( @@ -16,7 +18,7 @@ def _get_file_path() -> str: def write(recommendations: list[Group]): - logging.info(__name__, f'Writing {len(recommendations)} recommendations...') + logboth.info(__name__, f'Writing {len(recommendations)} recommendations...') recommendations_path = _get_file_path() os.makedirs(_get_directory(), exist_ok=True) @@ -26,7 +28,7 @@ def write(recommendations: list[Group]): with open(recommendations_path, 'w') as recommendations_file: json.dump(serialized_recommendations, recommendations_file, indent='\t') - logging.info(__name__, 'Done writing recommendations') + logboth.info(__name__, 'Done writing recommendations') def read() -> list[Group]: @@ -51,4 +53,3 @@ def read() -> list[Group]: ] except (OSError, json.decoder.JSONDecodeError): return [] - diff --git a/source/monophony/settings.py b/source/monophony/settings.py index ab7e26d..e2f014d 100644 --- a/source/monophony/settings.py +++ b/source/monophony/settings.py @@ -2,18 +2,20 @@ import json import os from typing import Any -from monophony import NAME, logging +from monophony import NAME + +import logboth def save(values: dict): - logging.info(__name__, f'Saving settings "{values}"...') + logboth.info(__name__, f'Saving settings "{values}"...') settings = _read() for key, value in values.items(): settings[key] = value _write(settings) - logging.info(__name__, 'Saved settings') + logboth.info(__name__, 'Saved settings') def load(key: str, default: Any=None) -> Any: @@ -21,7 +23,7 @@ def load(key: str, default: Any=None) -> Any: def _write(settings: dict): - logging.info(__name__, 'Writing settings...') + logboth.info(__name__, 'Writing settings...') directory = os.getenv( 'XDG_CONFIG_HOME', os.path.expanduser('~/.config') ) + '/' + NAME @@ -31,7 +33,7 @@ def _write(settings: dict): with open(settings_path, 'w') as settings_file: json.dump(settings, settings_file, indent='\t') - logging.info(__name__, 'Done writing settings') + logboth.info(__name__, 'Done writing settings') def _read() -> dict: diff --git a/source/monophony/ui/rows/editable_group_row.py b/source/monophony/ui/rows/editable_group_row.py index 484faf1..ecfc017 100644 --- a/source/monophony/ui/rows/editable_group_row.py +++ b/source/monophony/ui/rows/editable_group_row.py @@ -1,12 +1,13 @@ import weakref -from monophony import logging, playlists +from monophony import playlists from monophony.data import Group, Song from monophony.ui.popovers.editable_group_row_popover import EditableGroupRowPopover from monophony.ui.rows.editable_song_row import EditableSongRow from monophony.ui.rows.group_row import GroupRow from monophony.ui.windows.rename_window import RenameWindow +import logboth from gi.repository import GLib, GObject, Gtk @@ -70,7 +71,7 @@ class EditableGroupRow(GroupRow): ) def update_contents(self): - logging.info( + logboth.info( __name__, f'Updating row contents for playlist "{self.group.title}"...' ) @@ -79,7 +80,7 @@ class EditableGroupRow(GroupRow): self.group = playlist break else: - logging.error( + logboth.error( __name__, 'Failed to update row contents - ' f'playlist "{self.group.title}" does not exist' @@ -87,7 +88,7 @@ class EditableGroupRow(GroupRow): return super().update_contents() - logging.info(__name__, 'Updated row contents') + logboth.info(__name__, 'Updated row contents') def _on_move_song(self, from_song: Song, to_song: Song): i = self.group.songs.index(from_song) diff --git a/source/monophony/ui/rows/synchronized_group_row.py b/source/monophony/ui/rows/synchronized_group_row.py index 7566a20..c1e9b5a 100644 --- a/source/monophony/ui/rows/synchronized_group_row.py +++ b/source/monophony/ui/rows/synchronized_group_row.py @@ -1,6 +1,6 @@ import weakref -from monophony import logging, playlists +from monophony import playlists from monophony.data import Group from monophony.ui.popovers.synchronized_group_row_popover import ( SynchronizedGroupRowPopover, @@ -8,6 +8,7 @@ from monophony.ui.popovers.synchronized_group_row_popover import ( from monophony.ui.rows.group_row import GroupRow from monophony.ui.rows.song_row import SongRow +import logboth from gi.repository import GObject, Gtk @@ -52,7 +53,7 @@ class SynchronizedGroupRow(GroupRow): button.set_popover(self._popover) def update_contents(self): - logging.info( + logboth.info( __name__, f'Updating row contents for external playlist "{self.group.title}"...' ) @@ -62,7 +63,7 @@ class SynchronizedGroupRow(GroupRow): self.group = playlist break else: - logging.error( + logboth.error( __name__, 'Failed to update row contents - ' f'external playlist "{self.group.title}" does not exist' @@ -70,4 +71,4 @@ class SynchronizedGroupRow(GroupRow): return super().update_contents() - logging.info(__name__, 'Updated row contents') + logboth.info(__name__, 'Updated row contents') diff --git a/source/monophony/ui/windows/main_window.py b/source/monophony/ui/windows/main_window.py index 70abbde..dbb291a 100644 --- a/source/monophony/ui/windows/main_window.py +++ b/source/monophony/ui/windows/main_window.py @@ -12,7 +12,6 @@ from monophony import ( NAME, __version__, downloads, - logging, recommendations, settings, ) @@ -33,6 +32,7 @@ from monophony.ui.windows.import_window import ImportWindow from monophony.ui.windows.message_window import MessageWindow from monophony.yt import GetArtistTask, GetRecommendationsTask, SearchTask +import logboth from gi.repository import Adw, Gio, GLib, GObject, Gtk @@ -74,6 +74,8 @@ class MainWindow(Adw.ApplicationWindow): super().__init__(**kwargs) self._downloader = downloads.downloader + self._downloader.clean_up() + self._inhibit_suspend_cookie = 0 self._last_search_query = None self._last_artist = None @@ -435,9 +437,9 @@ class MainWindow(Adw.ApplicationWindow): self._player.stop() def _on_close(self) -> bool: - logging.info(__name__, 'Close requested') + logboth.info(__name__, 'Close requested') if self._player.get_queue().songs: - logging.info(__name__, 'Still playing - hiding instead of closing') + logboth.info(__name__, 'Still playing - hiding instead of closing') self.props.visible = False return True @@ -523,14 +525,14 @@ class MainWindow(Adw.ApplicationWindow): def _on_loading_progress(self, task: Task, progress: float): if task.is_canceled() or task is not self._current_browsing_task: - logging.info( + logboth.info( __name__, f'Ignoring progress update "{progress}" from canceled task' ) return page = self._navigation_view.props.visible_page if not isinstance(page, LoadingPage): - logging.info( + logboth.info( __name__, f'Ignoring progress update "{progress}" as loading is done' ) return @@ -598,7 +600,7 @@ class MainWindow(Adw.ApplicationWindow): ) self._navigation_view.push(loading_page) - logging.info(__name__, f'Searching for "{query}" with filter "{filter_}"...') + logboth.info(__name__, f'Searching for "{query}" with filter "{filter_}"...') self._current_browsing_task.cancel() self._current_browsing_task = SearchTask( @@ -613,7 +615,7 @@ class MainWindow(Adw.ApplicationWindow): def _on_search_finished(self, task: SearchTask): if task.is_canceled() or self._current_browsing_task is not task: - logging.info(__name__, 'Ignoring callback from canceled search task') + logboth.info(__name__, 'Ignoring callback from canceled search task') return filter_ = task.extra_data @@ -623,21 +625,21 @@ class MainWindow(Adw.ApplicationWindow): page = None if results is None: - logging.error(__name__, 'Failed to search') + logboth.error(__name__, 'Failed to search') page = StatusPage( _('Failed to Search'), _('Check your internet connection and try again'), 'dialog-error-symbolic' ) elif results == []: - logging.warning(__name__, 'Done searching, no results') + logboth.warning(__name__, 'Done searching, no results') page = StatusPage( _('No Results'), _('Try searching for something else'), 'dialog-information-symbolic' ) else: - logging.info(__name__, 'Done searching') + logboth.info(__name__, 'Done searching') page = ResultsPage(results, filter_) page.connect( 'play', @@ -714,7 +716,7 @@ class MainWindow(Adw.ApplicationWindow): about_dialog = Adw.AboutDialog.new_from_appdata( GRESOURCES_PATH + '/metainfo.xml', __version__ ) - about_dialog.props.debug_info = logging.get_log() + about_dialog.props.debug_info = logboth.read() about_dialog.props.debug_info_filename = 'log.txt' about_dialog.props.translator_credits = _('translator-credits') about_dialog.props.copyright = 'Copyright © Zehkira and contributors' @@ -744,26 +746,26 @@ class MainWindow(Adw.ApplicationWindow): } licenses_data = [] - logging.info(__name__, 'Loading licenses...') + logboth.info(__name__, 'Loading licenses...') for path in os.getenv('XDG_DATA_DIRS', '/usr/share/').split(':'): file_path = f'{path}{"" if path.endswith("/") else "/"}{NAME}/licenses.json' - logging.info(__name__, f'Trying to load licenses from "{file_path}"...') + logboth.info(__name__, f'Trying to load licenses from "{file_path}"...') try: with open(file_path) as licenses_file: licenses_data = json.load(licenses_file) - logging.info( + logboth.info( __name__, f'Loaded licenses from "{file_path}"' ) break except OSError: continue except json.decoder.JSONDecodeError: - logging.error( + logboth.error( __name__, 'Failed to load licenses', traceback.format_exc() ) break else: - logging.error(__name__, 'Failed to load licenses file: not found') + logboth.error(__name__, 'Failed to load licenses file: not found') for data in licenses_data: about_dialog.add_legal_section( @@ -787,7 +789,7 @@ class MainWindow(Adw.ApplicationWindow): self._toolbar_view.props.reveal_bottom_bars = False self._uninhibit_suspend() if not self.props.visible: - logging.info(__name__, 'Playback ended while window hidden') + logboth.info(__name__, 'Playback ended while window hidden') self.close() return @@ -817,7 +819,7 @@ class MainWindow(Adw.ApplicationWindow): ) self._navigation_view.push(loading_page) - logging.info(__name__, f'Showing artist "{artist.yt_id}"...') + logboth.info(__name__, f'Showing artist "{artist.yt_id}"...') self._current_browsing_task.cancel() self._current_browsing_task = GetArtistTask( progress_callback=self._on_loading_progress, @@ -831,7 +833,7 @@ class MainWindow(Adw.ApplicationWindow): def _on_view_artist_finished(self, task: GetArtistTask): if task.is_canceled() or self._current_browsing_task is not task: - logging.info(__name__, 'Ignoring callback from canceled view artist task') + logboth.info(__name__, 'Ignoring callback from canceled view artist task') return results = task.result @@ -841,21 +843,21 @@ class MainWindow(Adw.ApplicationWindow): page = None if results is None: - logging.error(__name__, 'Failed to load artist page') + logboth.error(__name__, 'Failed to load artist page') page = StatusPage( _('Failed to Load Artist Page'), _('Check your internet connection and try again'), 'dialog-error-symbolic' ) elif results == []: - logging.warning(__name__, 'Loaded empty artist page') + logboth.warning(__name__, 'Loaded empty artist page') page = StatusPage( _('Empty Artist Page'), _('No content found from this artist'), 'dialog-information-symbolic' ) else: - logging.info(__name__, 'Loaded artist page') + logboth.info(__name__, 'Loaded artist page') page = ArtistPage(results, filter_) page.connect( 'play', @@ -932,5 +934,5 @@ class MainWindow(Adw.ApplicationWindow): self._player.set_volume(volume, notify_frontend=False) def present(self): - logging.info(__name__, 'Presenting window') + logboth.info(__name__, 'Presenting window') super().present() diff --git a/source/monophony/yt.py b/source/monophony/yt.py index ccd306f..12640b2 100644 --- a/source/monophony/yt.py +++ b/source/monophony/yt.py @@ -3,10 +3,10 @@ import subprocess import time import traceback -from monophony import logging from monophony.asynchronous import Task from monophony.data import Artist, Group, Song, TimeString, YTItem +import logboth import requests import ytmusicapi @@ -39,7 +39,7 @@ def _get_artist_name(name_data: list[dict] | str) -> str: if isinstance(name_data, str): return name_data if not isinstance(name_data, list): - logging.warning(__name__, 'Got unusual artist name data', name_data or None) + logboth.warning(__name__, 'Got unusual artist name data', name_data or None) return '' return ', '.join( @@ -51,7 +51,7 @@ def _get_artist_id(artists: list[dict] | str) -> str: if isinstance(artists, str): return '' if not isinstance(artists, list): - logging.warning(__name__, 'Got unusual artist data', artists or None) + logboth.warning(__name__, 'Got unusual artist data', artists or None) return '' a_id = '' @@ -68,13 +68,13 @@ def _parse_single_result(yt: ytmusicapi.YTMusic, data: dict) -> SearchResult | N type_ = data.get('resultType', '') if type_ in ('podcast', 'episode'): - logging.info(__name__, 'Discarded podcast result') + logboth.info(__name__, 'Discarded podcast result') return None try: result = SearchResult(type_, category == 'Top result') except KeyError: - logging.error( + logboth.error( __name__, f'Failed to parse result of unexpected type "{type_}"', data ) return None @@ -122,7 +122,7 @@ def _parse_single_result(yt: ytmusicapi.YTMusic, data: dict) -> SearchResult | N result_name = ( result.item.name if isinstance(result.item, Artist) else result.item.title ) - logging.warning( + logboth.warning( __name__, f'Discarded id-less result "{result_name}" of type "{result.type}"' ) @@ -139,7 +139,7 @@ def _parse_single_result(yt: ytmusicapi.YTMusic, data: dict) -> SearchResult | N ytmusicapi.exceptions.YTMusicUserError, # Invalid ID requests.exceptions.ConnectionError ): - logging.error( + logboth.error( __name__, 'Failed to parse a result', traceback.format_exc() ) return None @@ -157,7 +157,7 @@ def _parse_single_result(yt: ytmusicapi.YTMusic, data: dict) -> SearchResult | N def get_song_uri(song: Song) -> str | None: - logging.info(__name__, f'Getting URI for song "{song.yt_id}"...') + logboth.info(__name__, f'Getting URI for song "{song.yt_id}"...') out, err = subprocess.Popen( [ 'yt-dlp', @@ -172,15 +172,15 @@ def get_song_uri(song: Song) -> str | None: stderr=subprocess.PIPE, ).communicate() if err: - logging.error(__name__, 'Failed to get song URI', err) + logboth.error(__name__, 'Failed to get song URI', err) return None - logging.info(__name__, 'Got song URI') + logboth.info(__name__, 'Got song URI') return out.split('\n')[0] def get_similar_songs(song: Song, ignore: Group | None=None) -> Group | None: - logging.info( + logboth.info( __name__, f'Getting similar song to "{song.yt_id}" ignoring ' f'{len(ignore.songs) if ignore else 0} songs...' @@ -191,7 +191,7 @@ def get_similar_songs(song: Song, ignore: Group | None=None) -> Group | None: try: data = yt.get_watch_playlist(song.yt_id, radio=True)['tracks'] except (*YTMUSICAPI_PARSING_EXCEPTIONS, requests.exceptions.ConnectionError): - logging.error( + logboth.error( __name__, 'Failed to get similar song', traceback.format_exc() ) return None @@ -211,36 +211,36 @@ def get_similar_songs(song: Song, ignore: Group | None=None) -> Group | None: available_songs.append(song) if available_songs: - logging.info(__name__, f'Got {len(available_songs)} similar songs') + logboth.info(__name__, f'Got {len(available_songs)} similar songs') return Group(songs=available_songs) - logging.error(__name__, 'Failed to get similar song - no songs available') + logboth.error(__name__, 'Failed to get similar song - no songs available') return Group() def get_song(id_: str) -> Song | None: - logging.info(__name__, f'Getting song "{id_}"...') + logboth.info(__name__, f'Getting song "{id_}"...') yt = ytmusicapi.YTMusic() try: result = yt.get_song(id_)['videoDetails'] except (*YTMUSICAPI_PARSING_EXCEPTIONS, requests.exceptions.ConnectionError): - logging.error( + logboth.error( __name__, 'Failed to get song', traceback.format_exc() ) return None result['resultType'] = 'song' if parsed := _parse_single_result(yt, result): - logging.info(__name__, 'Got song') + logboth.info(__name__, 'Got song') return parsed.item - logging.error(__name__, 'Failed to get song') + logboth.error(__name__, 'Failed to get song') return None def get_album_or_playlist(yt_id: str) -> Group | None: - logging.info(__name__, f'Getting album/playlist "{yt_id}"...') + logboth.info(__name__, f'Getting album/playlist "{yt_id}"...') if result := _parse_single_result( ytmusicapi.YTMusic(), @@ -249,10 +249,10 @@ def get_album_or_playlist(yt_id: str) -> Group | None: 'playlistId': yt_id } ): - logging.info(__name__, 'Got album/playlist') + logboth.info(__name__, 'Got album/playlist') return result.item - logging.error(__name__, 'Failed to get album/playlist') + logboth.error(__name__, 'Failed to get album/playlist') return None @@ -262,14 +262,14 @@ class ParseResultsTask(Task): ) -> list[SearchResult] | None: count_per_type = {} - logging.info( + logboth.info( __name__, f'Parsing {len(data)} results with limit of {limit} per type...' ) results = [] got_top_result = False for i, item in enumerate(data): if self.is_canceled(): - logging.info(__name__, 'Parsing canceled') + logboth.info(__name__, 'Parsing canceled') return None with contextlib.suppress(KeyError): @@ -282,17 +282,17 @@ class ParseResultsTask(Task): if parsed: if parsed.top: if got_top_result: - logging.warning(__name__, 'Multiple top results') + logboth.warning(__name__, 'Multiple top results') parsed.top = False else: - logging.info(__name__, f'Top result type is "{parsed.type}"') + logboth.info(__name__, f'Top result type is "{parsed.type}"') got_top_result = True count_per_type[parsed.type] = count_per_type.get(parsed.type, 0) + 1 results.append(parsed) # Internal results (in albums, playlists) not counted - logging.info(__name__, f'Done parsing results, kept {len(results)}/{len(data)}') + logboth.info(__name__, f'Done parsing results, kept {len(results)}/{len(data)}') return results @@ -304,24 +304,24 @@ class GetArtistTask(Task): def _function( self, browse_id: str, filter_: str, limit: int | None=None ) -> list[SearchResult] | None: - logging.info( + logboth.info( __name__, f'Getting artist "{browse_id}" with filter "{filter_}" and limit of ' f'{limit} per type...' ) yt = ytmusicapi.YTMusic() - logging.info(__name__, 'Fetching artist...') + logboth.info(__name__, 'Fetching artist...') try: try: data = yt.get_artist(browse_id) - logging.info(__name__, 'Fetched artist') + logboth.info(__name__, 'Fetched artist') except YTMUSICAPI_PARSING_EXCEPTIONS: - logging.info(__name__, 'No such artist, fetching as user instead...') + logboth.info(__name__, 'No such artist, fetching as user instead...') data = yt.get_user(browse_id) - logging.info(__name__, 'Fetched artist as user') + logboth.info(__name__, 'Fetched artist as user') except (*YTMUSICAPI_PARSING_EXCEPTIONS, requests.exceptions.ConnectionError): - logging.error( + logboth.error( __name__, 'Failed to get artist - could not fetch', traceback.format_exc() @@ -329,7 +329,7 @@ class GetArtistTask(Task): return None if self.is_canceled(): - logging.info(__name__, 'Canceled getting artist') + logboth.info(__name__, 'Canceled getting artist') return None self._update_progress(0.1) @@ -340,42 +340,42 @@ class GetArtistTask(Task): continue if not (group := data.get(type_)): - logging.info(__name__, f'Artist has no {type_} list') + logboth.info(__name__, f'Artist has no {type_} list') continue tracks = [] - logging.info( + logboth.info( __name__, f'Trying to get {type_} from artist with get_playlist...' ) try: try: tracks = yt.get_playlist(group.get('browseId', ''))['tracks'] except YTMUSICAPI_PARSING_EXCEPTIONS: - logging.info( + logboth.info( __name__, f'Got no {type_}, trying with get_user_videos instead...' ) # Does not raise YTMUSICAPI_PARSING_EXCEPTIONS, ever tracks = yt.get_user_videos(browse_id, group.get('params', '')) except requests.exceptions.ConnectionError: - logging.error(__name__, 'Failed to get artist', traceback.format_exc()) + logboth.error(__name__, 'Failed to get artist', traceback.format_exc()) return None if not tracks: - logging.info( + logboth.info( __name__, f'Got no {type_}, trying from "results" list instead...' ) if not (tracks := group.get('results')): - logging.info(__name__, f'Got no {type_} from artist') + logboth.info(__name__, f'Got no {type_} from artist') continue - logging.info(__name__, f'Got {len(tracks)} {type_} from artist') + logboth.info(__name__, f'Got {len(tracks)} {type_} from artist') for track in tracks: track['resultType'] = type_[:-1] to_parse.append(track) if self.is_canceled(): - logging.info(__name__, 'Canceled getting artist') + logboth.info(__name__, 'Canceled getting artist') return None self._update_progress(0.2) @@ -384,11 +384,11 @@ class GetArtistTask(Task): continue if not (group := data.get(type_)): - logging.info(__name__, f'Artist has no {type_} list') + logboth.info(__name__, f'Artist has no {type_} list') continue lists = [] - logging.info( + logboth.info( __name__, f'Trying to get {type_} from artist with get_artist_albums...' ) try: @@ -397,31 +397,31 @@ class GetArtistTask(Task): group.get('browseId', ''), group.get('params', '') ) except YTMUSICAPI_PARSING_EXCEPTIONS: - logging.info( + logboth.info( __name__, f'Got no {type_}, trying with get_user_playlists instead...' ) # Does not raise YTMUSICAPI_PARSING_EXCEPTIONS, ever lists = yt.get_user_playlists(browse_id, group.get('params', '')) except requests.exceptions.ConnectionError: - logging.error(__name__, 'Failed to get artist', traceback.format_exc()) + logboth.error(__name__, 'Failed to get artist', traceback.format_exc()) return None if not lists: - logging.info( + logboth.info( __name__, f'Got no {type_}, trying from "results" list instead...' ) if not (lists := group.get('results')): - logging.info(__name__, f'Got no {type_} from artist') + logboth.info(__name__, f'Got no {type_} from artist') continue - logging.info(__name__, f'Got {len(lists)} {type_} from artist') + logboth.info(__name__, f'Got {len(lists)} {type_} from artist') for list_ in lists: list_['resultType'] = type_[:-1] to_parse.append(list_) if self.is_canceled(): - logging.info(__name__, 'Canceled getting artist') + logboth.info(__name__, 'Canceled getting artist') return None self._update_progress(0.5) @@ -434,7 +434,7 @@ class GetArtistTask(Task): time.sleep(0.1) if self.is_canceled(): parse_task.cancel() - logging.info(__name__, 'Canceled getting artist') + logboth.info(__name__, 'Canceled getting artist') return None if results := parse_task.result: @@ -443,22 +443,22 @@ class GetArtistTask(Task): result.item.author.name = data.get('name', '') time.sleep(0.1) - logging.info(__name__, 'Got artist') + logboth.info(__name__, 'Got artist') return results - logging.error(__name__, 'Failed to get artist') + logboth.error(__name__, 'Failed to get artist') return None class GetRecommendationsTask(Task): def _function(self) -> list[Group] | None: - logging.info(__name__, 'Getting recommendations...') + logboth.info(__name__, 'Getting recommendations...') yt = ytmusicapi.YTMusic() try: data = yt.get_home() except (*YTMUSICAPI_PARSING_EXCEPTIONS, requests.exceptions.ConnectionError): - logging.error( + logboth.error( __name__, 'Failed to get recommendations', traceback.format_exc() ) return None @@ -468,7 +468,7 @@ class GetRecommendationsTask(Task): playlist = Group(title=grouping.get('title', '')) for item in grouping.get('contents', []): if not isinstance(item, dict): - logging.warning(__name__, 'Unexpected recommendation item', item) + logboth.warning(__name__, 'Unexpected recommendation item', item) continue if 'videoId' not in item: @@ -482,7 +482,7 @@ class GetRecommendationsTask(Task): recommendations.append(playlist) self._update_progress(i / len(data)) - logging.info(__name__, f'Got {len(recommendations)} groups of recommendations') + logboth.info(__name__, f'Got {len(recommendations)} groups of recommendations') return recommendations @@ -494,7 +494,7 @@ class SearchTask(Task): def _function( self, query: str, filter_: str='', limit: int | None=None ) -> list[SearchResult] | None: - logging.info(__name__, f'Searching for "{query}" with filter "{filter_}"...') + logboth.info(__name__, f'Searching for "{query}" with filter "{filter_}"...') yt = ytmusicapi.YTMusic() self._update_progress(0.1) @@ -502,18 +502,18 @@ class SearchTask(Task): if '?v=' in query and '/' in query: song = get_song(query.split('?v=')[-1].split('&')[0]) if song: - logging.info(__name__, 'Done searching - got song from URL') + logboth.info(__name__, 'Done searching - got song from URL') return [SearchResult('song', True, song)] - logging.error( + logboth.error( __name__, 'Failed to search - failed to get song from URL' ) return None if 'youtu.be/' in query: song = get_song(query.split('youtu.be/')[-1].split('?')[0]) if song: - logging.info(__name__, 'Done searching - got song from URL') + logboth.info(__name__, 'Done searching - got song from URL') return [SearchResult('song', True, song)] - logging.error( + logboth.error( __name__, 'Failed to search - failed to get song from URL' ) return None @@ -524,11 +524,11 @@ class SearchTask(Task): else yt.search(query) ) except (*YTMUSICAPI_PARSING_EXCEPTIONS, requests.exceptions.ConnectionError): - logging.error(__name__, 'Failed to search', traceback.format_exc()) + logboth.error(__name__, 'Failed to search', traceback.format_exc()) return None if self.is_canceled(): - logging.info(__name__, 'Canceled search') + logboth.info(__name__, 'Canceled search') return None self._update_progress(0.5) @@ -541,13 +541,13 @@ class SearchTask(Task): time.sleep(0.1) if self.is_canceled(): parse_task.cancel() - logging.info(__name__, 'Canceled search') + logboth.info(__name__, 'Canceled search') return None if parse_task.result is not None: time.sleep(0.1) - logging.info(__name__, 'Done searching') + logboth.info(__name__, 'Done searching') return parse_task.result - logging.error(__name__, 'Failed to search') + logboth.error(__name__, 'Failed to search') return None -- GitLab From 73ff6f0679cab58208df232a251f263863e3c918 Mon Sep 17 00:00:00 2001 From: zehkira <9485872-zehkira@users.noreply.gitlab.com> Date: Tue, 25 Nov 2025 11:58:12 +0100 Subject: [PATCH 2/3] Move logboth init/config to __init__.py --- source/bin/monophony.py | 9 --------- source/monophony/__init__.py | 15 +++++++++++++++ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/source/bin/monophony.py b/source/bin/monophony.py index 31b76eb..6778c0b 100755 --- a/source/bin/monophony.py +++ b/source/bin/monophony.py @@ -37,15 +37,6 @@ threading.excepthook = lambda args: logboth.error( ) ) -wanted_levels_names = os.getenv( - 'MONOPHONY_LOG_LEVELS', 'INFO,WARN,ERRO,SUCC' -).split(',') -levels = [level for level in logboth.config.levels if level.name in wanted_levels_names] -logboth.config.levels = levels -logboth.config.directory /= pathlib.Path(NAME) -logboth.config.file = 'log.txt' -logboth.basic_info() - os_release = {} try: os_release = platform.freedesktop_os_release() diff --git a/source/monophony/__init__.py b/source/monophony/__init__.py index 5e8e4cd..b085167 100644 --- a/source/monophony/__init__.py +++ b/source/monophony/__init__.py @@ -1,6 +1,21 @@ +import os +import pathlib + +import logboth + + __version__ = '4.1.2' ID = 'io.gitlab.zehkira.Monophony' # Full ID per Freedesktop standards NAME = 'monophony' # Use for app executable, directories and so on DISPLAY_NAME = 'Monophony' # For window titles and such - do not use in logic GRESOURCES_PATH = '/io/gitlab/zehkira/Monophony' MIN_WIDTH = 360 # Same as in metainfo.xml + +wanted_levels_names = os.getenv( + 'MONOPHONY_LOG_LEVELS', 'INFO,WARN,ERRO,SUCC' +).split(',') +levels = [level for level in logboth.config.levels if level.name in wanted_levels_names] +logboth.config.levels = levels +logboth.config.directory /= pathlib.Path(NAME) +logboth.config.file = 'log.txt' +logboth.basic_info() -- GitLab From 213f7ad30b7722672c93e1f9dd84249a3c940c1a Mon Sep 17 00:00:00 2001 From: zehkira <9485872-zehkira@users.noreply.gitlab.com> Date: Tue, 25 Nov 2025 11:59:52 +0100 Subject: [PATCH 3/3] Remove unused pathlib --- source/bin/monophony.py | 1 - 1 file changed, 1 deletion(-) diff --git a/source/bin/monophony.py b/source/bin/monophony.py index 6778c0b..809155a 100755 --- a/source/bin/monophony.py +++ b/source/bin/monophony.py @@ -3,7 +3,6 @@ import gettext import os -import pathlib import platform import sys import threading -- GitLab