From f61ef2fa53df9449845f4523f469d3a3a696f45f Mon Sep 17 00:00:00 2001 From: Feuerfuchs Date: Tue, 20 Nov 2018 23:11:58 +0100 Subject: Cleanup --- mpris_to_text.py | 412 +++++++++++++++++++++++++++---------------------------- 1 file changed, 205 insertions(+), 207 deletions(-) diff --git a/mpris_to_text.py b/mpris_to_text.py index 16ee1e0..24baabf 100755 --- a/mpris_to_text.py +++ b/mpris_to_text.py @@ -16,230 +16,227 @@ from blessed import Terminal dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) -name_regex = re.compile("^org\.mpris\.MediaPlayer2\.") -bus = dbus.SessionBus() -term = Terminal() - -refresh_cond = threading.Condition() -refresh_flag = True -exit_flag = False - -players = [] -player_id_to_index = {} -active_player = "" -current_output = "" -playing_song_changed_signal_receiver = None - -filename = "" -meta_format = "" -meta_format_artist = "" -meta_format_title = "" -meta_format_album = "" - - -def track_string(metadata): - artist = metadata["xesam:artist"][0] if "xesam:artist" in metadata else "" - title = metadata["xesam:title"] if "xesam:title" in metadata else "" - album = metadata["xesam:album"] if "xesam:album" in metadata else "" - - return meta_format.format( - artist = meta_format_artist.format(artist) if artist != "" else "", - title = meta_format_title.format(title) if title != "" else "", - album = meta_format_album.format(album) if album != "" else "" - ) - - -def write_track(track): - global filename - global current_output +class MetaWriter: + filename = "" + output_format = "" + output_format_artist = "" + output_format_title = "" + output_format_album = "" + last_output = "" + + def __init__(self, filename, output_format, output_format_artist, output_format_title, output_format_album): + self.filename = filename + self.output_format = output_format + self.output_format_album = output_format_album + self.output_format_artist = output_format_artist + self.output_format_title = output_format_title + + def write_meta(self, metadata): + artist = metadata["xesam:artist"][0] if "xesam:artist" in metadata else "" + title = metadata["xesam:title"] if "xesam:title" in metadata else "" + album = metadata["xesam:album"] if "xesam:album" in metadata else "" + + self.write(self.output_format.format( + artist = self.output_format_artist.format(artist) if artist != "" else "", + title = self.output_format_title.format(title) if title != "" else "", + album = self.output_format_album.format(album) if album != "" else "" + )) + + def write(self, text): + self.last_output = text + f = open(self.filename, "w") + f.write(text) + f.close() + + +class PlayerSelector(threading.Thread): + service_regex = re.compile("^org\.mpris\.MediaPlayer2\.") + bus = None + loop = None + signal_receiver = None + players = {} + players_indexes = [] + active_player = "" + menu = None + meta_writer = None + + def __init__(self, meta_writer, menu=None): + super().__init__() + + self.bus = dbus.SessionBus() + self.meta_writer = meta_writer + self.menu = menu + + def set_menu(self, menu): + self.menu = menu - current_output = track - f = open(filename, "w") - f.write(track) - f.close() - - -def playing_song_changed(player, changed_properties, invalidated_properties): - global refresh_cond - global refresh_flag - - if "Metadata" in changed_properties: - write_track(track_string(changed_properties["Metadata"])) - with refresh_cond: - refresh_flag = True - refresh_cond.notify() - - -def dbus_name_owner_changed(name, old_owner, new_owner): - global refresh_cond - global refresh_flag - - if name_regex.match(name): - get_players() - with refresh_cond: - refresh_flag = True - refresh_cond.notify() - - -def set_active_player(player_id): - global bus - global players - global player_id_to_index - global active_player - global playing_song_changed_signal_receiver - - if player_id in player_id_to_index: - active_player = player_id - elif len(player_id_to_index) != 0: - active_player = next(iter(player_id_to_index)) - else: - active_player = "" - - if playing_song_changed_signal_receiver is not None: - playing_song_changed_signal_receiver.remove() - - if active_player != "": - playing_song_changed_signal_receiver = bus.add_signal_receiver( - handler_function = playing_song_changed, - bus_name = active_player, - dbus_interface = "org.freedesktop.DBus.Properties", - signal_name = "PropertiesChanged", - path = "/org/mpris/MediaPlayer2" - ) - - player_path = bus.get_object(active_player, "/org/mpris/MediaPlayer2") - player_props = dbus.Interface(player_path, "org.freedesktop.DBus.Properties") - - write_track(track_string(player_props.Get("org.mpris.MediaPlayer2.Player", "Metadata"))) - else: - write_track("") - - -def get_players(): - global players - global player_id_to_index - global active_player - - players = [] - player_id_to_index = {} - bus_path = bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus") - bus_proxy = dbus.Interface(bus_path, "org.freedesktop.DBus") - names = bus_proxy.ListNames() - - for name in names: - if name_regex.match(name): - split_name = name.split(".") - id_start_index = len(split_name[0]) + len(split_name[1]) + len(split_name[2]) + 3 - - player_path = bus.get_object(name, "/org/mpris/MediaPlayer2") + def get_players(self): + self.players = {} + self.players_indexes = [] + + bus_path = self.bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus") + bus_proxy = dbus.Interface(bus_path, "org.freedesktop.DBus") + + for service in bus_proxy.ListNames(): + if self.service_regex.match(service): + split_service = service.split(".") + name_start_index = len(split_service[0]) + len(split_service[1]) + len(split_service[2]) + 3 + + player_path = self.bus.get_object(service, "/org/mpris/MediaPlayer2") + player_props = dbus.Interface(player_path, "org.freedesktop.DBus.Properties") + player_name = service[name_start_index:] + try: + player_name = player_props.Get("org.mpris.MediaPlayer2.Player", "Identity") + except dbus.exceptions.DBusException: + pass + + self.players[service] = player_name + self.players_indexes.append(service) + + self.set_active_player(self.active_player) + + def set_active_player_index(self, player_index): + if player_index < len(self.players_indexes): + self.set_active_player(self.players_indexes[player_index]) + + def set_active_player(self, player_id): + if player_id in self.players: + self.active_player = player_id + elif len(self.players) != 0: + self.active_player = next(iter(self.players.keys())) + else: + self.active_player = "" + + if self.signal_receiver is not None: + self.signal_receiver.remove() + + if self.active_player != "": + self.signal_receiver = self.bus.add_signal_receiver( + handler_function = self.playing_song_changed, + bus_name = self.active_player, + dbus_interface = "org.freedesktop.DBus.Properties", + signal_name = "PropertiesChanged", + path = "/org/mpris/MediaPlayer2" + ) + + player_path = self.bus.get_object(self.active_player, "/org/mpris/MediaPlayer2") player_props = dbus.Interface(player_path, "org.freedesktop.DBus.Properties") - player_id = name[id_start_index:] - try: - player_id = player_props.Get("org.mpris.MediaPlayer2.Player", "Identity") - except dbus.exceptions.DBusException: - pass - players.append((name, player_id)) - player_id_to_index[name] = len(players) - 1 + self.meta_writer.write_meta(player_props.Get("org.mpris.MediaPlayer2.Player", "Metadata")) + else: + self.meta_writer.write("") - set_active_player(active_player) + def run(self): + bus_path = self.bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus") + bus_proxy = dbus.Interface(bus_path, "org.freedesktop.DBus") + bus_proxy.connect_to_signal("NameOwnerChanged", self.dbus_name_owner_changed) -def draw_menu(): - global term - global players - global refresh_cond - global refresh_flag - global exit_flag - global current_output - global filename + self.get_players() + self.menu.refresh() - while not exit_flag: - with refresh_cond: - while not refresh_flag and not exit_flag: - refresh_cond.wait() + self.loop = GLib.MainLoop() + self.loop.run() + + def quit(self): + self.loop.quit() + + def dbus_name_owner_changed(self, name, old_owner, new_owner): + if self.service_regex.match(name): + self.get_players() + self.menu.refresh() - refresh_flag = False + def playing_song_changed(self, player, changed_properties, invalidated_properties): + if "Metadata" in changed_properties: + self.meta_writer.write_meta(changed_properties["Metadata"]) + self.menu.refresh() - with term.fullscreen(): - print(term.move(0, 0) + term.bold_bright_white_on_bright_black(("{0:<{width}}").format("MPRIS To Text", width=term.width)) + "\n") - print(term.move_x(2) + term.bold("Player: ") + term.move_up()) - - for i in range(len(players)): - player = players[i] - output = "%d: %s" % (i, player[1]) - if players[player_id_to_index[active_player]][0] == player[0]: - print(term.move_x(10) + term.standout(output)) - else: - print(term.move_x(10) + output) +class Menu(threading.Thread): + refresh_cond = threading.Condition() + refresh_flag = True + exit_flag = False + player_selector = None + meta_writer = None - print(term.move_x(2) + term.bold("File: ") + filename) - print(term.move_x(2) + term.bold("Output: ") + "\n".join(term.wrap(current_output, width=term.width - 10, subsequent_indent=" " * 10))) + def __init__(self, term, player_selector, meta_writer): + super().__init__() - print(term.move_x(0) + "\nEnter number to select player or q to exit." + term.move_up()) + self.player_selector = player_selector + self.meta_writer = meta_writer - with term.fullscreen(): - print(term.move(0, 0) + "Exiting...") + def refresh(self, exit_flag=False): + with self.refresh_cond: + self.refresh_flag = True + self.exit_flag = self.exit_flag or exit_flag + self.refresh_cond.notify() + def run(self): + while not self.exit_flag: + with self.refresh_cond: + while not self.refresh_flag and not self.exit_flag: + self.refresh_cond.wait() -def on_resize(*args): - global refresh_cond - global refresh_flag + self.refresh_flag = False + + with term.fullscreen(): + print(term.move(0, 0) + term.bold_bright_white_on_bright_black(("{0:<{width}}").format("MPRIS To Text", width=term.width)) + "\n") + print(term.move_x(2) + term.bold("Player: ") + term.move_up()) + + for i, (id, name) in enumerate(self.player_selector.players.items()): + output = "%d: %s" % (i, name) - with refresh_cond: - refresh_flag = True - refresh_cond.notify() + if id == self.player_selector.active_player: + print(term.move_x(10) + term.standout(output)) + else: + print(term.move_x(10) + output) + print(term.move_x(2) + term.bold("File: ") + self.meta_writer.filename) + print(term.move_x(2) + term.bold("Output: ") + "\n".join(term.wrap(self.meta_writer.last_output, width=term.width - 10, subsequent_indent=" " * 10))) -def init_dbus(): - global bus + print(term.move_x(0) + "\nEnter number to select player or q to exit." + term.move_up()) - bus_path = bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus") - bus_proxy = dbus.Interface(bus_path, "org.freedesktop.DBus") + with term.fullscreen(): + print(term.move(0, 0) + "Exiting...") - bus_proxy.connect_to_signal("NameOwnerChanged", dbus_name_owner_changed) - get_players() +class Input(threading.Thread): + player_selector = None + meta_writer = None + menu = None + def __init__(self, term, player_selector, meta_writer, menu): + super().__init__() -def init_blessed(): - global bus - global term - global refresh_cond - global refresh_flag - global exit_flag - global loop + self.player_selector = player_selector + self.meta_writer = meta_writer + self.menu = menu - with term.cbreak(): - val = None + def run(self): + with term.cbreak(): + val = None - while val not in (u'q', u'Q',): - val = term.inkey(timeout=5) + while val not in (u'q', u'Q',): + val = term.inkey(timeout=5) - if not val or val.is_sequence or not val.isnumeric(): - continue + if not val or val.is_sequence or not val.isnumeric(): + continue - if int(val) < len(players): - set_active_player(players[int(val)][0]) - with refresh_cond: - refresh_flag = True - refresh_cond.notify() + if int(val) < len(self.player_selector.players): + self.player_selector.set_active_player_index(int(val)) + self.menu.refresh() - with refresh_cond: - exit_flag = True - refresh_cond.notify() + self.menu.refresh(True) + self.player_selector.quit() - loop.quit() +def on_resize(*args): + global menu_thread + + menu_thread.refresh() -def read_args(): - global filename - global meta_format - global meta_format_artist - global meta_format_title - global meta_format_album +def create_writer(): parser = argparse.ArgumentParser(description="Write metadata from MPRIS-compliant media players into a text file.") parser.add_argument("--file", type = str, @@ -272,26 +269,27 @@ def read_args(): help = "Full format string (default: \"{artist}{title}{album} \")" ) - args = parser.parse_args() - filename = args.filename - meta_format = args.format - meta_format_artist = args.format_artist - meta_format_title = args.format_title - meta_format_album = args.format_album + args = parser.parse_args() + return MetaWriter( + filename = args.filename, + output_format = args.format, + output_format_artist = args.format_artist, + output_format_title = args.format_title, + output_format_album = args.format_album + ) -read_args() +term = Terminal() +meta_writer = create_writer() signal.signal(signal.SIGWINCH, on_resize) -init_dbus() +player_selector = PlayerSelector(meta_writer) +menu_thread = Menu(term, player_selector, meta_writer) +player_selector.set_menu(menu_thread) +input_thread = Input(term, player_selector, meta_writer, menu_thread) -blessed_thread = threading.Thread(target=init_blessed) -blessed_thread.start() - -menu_thread = threading.Thread(target=draw_menu) +player_selector.start() menu_thread.start() - -loop = GLib.MainLoop() -loop.run() +input_thread.start() -- cgit v1.2.3-70-g09d2