Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions beets/ui/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,9 @@ def _field_diff(field, old, old_fmt, new, new_fmt):
return f"{oldstr} -> {newstr}"


def show_model_changes(new, old=None, fields=None, always=False):
def show_model_changes(
new, old=None, fields=None, always=False, print_obj: bool = True
):
"""Given a Model object, print a list of changes from its pristine
version stored in the database. Return a boolean indicating whether
any changes were found.
Expand Down Expand Up @@ -1117,7 +1119,7 @@ def show_model_changes(new, old=None, fields=None, always=False):
)

# Print changes.
if changes or always:
if print_obj and (changes or always):
print_(format(old))
if changes:
print_("\n".join(changes))
Expand Down
140 changes: 47 additions & 93 deletions beetsplug/lastgenre/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@
https://gist.github.com/1241307
"""

from __future__ import annotations

import os
import traceback
from functools import singledispatchmethod
from pathlib import Path
from typing import Union
from typing import TYPE_CHECKING, Union

import pylast
import yaml
Expand All @@ -34,6 +37,9 @@
from beets.library import Album, Item
from beets.util import plurality, unique_list

if TYPE_CHECKING:
from beets.library import LibModel

LASTFM = pylast.LastFMNetwork(api_key=plugins.LASTFM_KEY)

PYLAST_EXCEPTIONS = (
Expand Down Expand Up @@ -101,6 +107,7 @@ def __init__(self):
"prefer_specific": False,
"title_case": True,
"extended_debug": False,
"pretend": False,
}
)
self.setup()
Expand Down Expand Up @@ -321,7 +328,7 @@ def _format_and_stringify(self, tags: list[str]) -> str:

return self.config["separator"].as_str().join(formatted)

def _get_existing_genres(self, obj: Union[Album, Item]) -> list[str]:
def _get_existing_genres(self, obj: LibModel) -> list[str]:
"""Return a list of genres for this Item or Album. Empty string genres
are removed."""
separator = self.config["separator"].get()
Expand All @@ -342,9 +349,7 @@ def _combine_resolve_and_log(
combined = old + new
return self._resolve_genres(combined)

def _get_genre(
self, obj: Union[Album, Item]
) -> tuple[Union[str, None], ...]:
def _get_genre(self, obj: LibModel) -> tuple[Union[str, None], ...]:
"""Get the final genre string for an Album or Item object.

`self.sources` specifies allowed genre sources. Starting with the first
Expand Down Expand Up @@ -459,6 +464,39 @@ def _try_resolve_stage(stage_label: str, keep_genres, new_genres):

# Beets plugin hooks and CLI.

def _fetch_and_log_genre(self, obj: LibModel) -> None:
"""Fetch genre and log it."""
self._log.info(str(obj))
obj.genre, label = self._get_genre(obj)
self._log.debug("Resolved ({}): {}", label, obj.genre)

ui.show_model_changes(obj, fields=["genre"], print_obj=False)

@singledispatchmethod
def _process(self, obj: LibModel, write: bool) -> None:
"""Process an object, dispatching to the appropriate method."""
raise NotImplementedError

@_process.register
def _process_track(self, obj: Item, write: bool) -> None:
"""Process a single track/item."""
self._fetch_and_log_genre(obj)
if not self.config["pretend"]:
obj.try_sync(write=write, move=False)

@_process.register
def _process_album(self, obj: Album, write: bool) -> None:
"""Process an entire album."""
self._fetch_and_log_genre(obj)
if "track" in self.sources:
for item in obj.items():
self._process(item, write)

if not self.config["pretend"]:
obj.try_sync(
write=write, move=False, inherit="track" not in self.sources
)

def commands(self):
lastgenre_cmd = ui.Subcommand("lastgenre", help="fetch genres")
lastgenre_cmd.parser.add_option(
Expand Down Expand Up @@ -526,101 +564,17 @@ def commands(self):
lastgenre_cmd.parser.set_defaults(album=True)

def lastgenre_func(lib, opts, args):
write = ui.should_write()
pretend = getattr(opts, "pretend", False)
self.config.set_args(opts)

if opts.album:
# Fetch genres for whole albums
for album in lib.albums(args):
album_genre, src = self._get_genre(album)
prefix = "Pretend: " if pretend else ""
self._log.info(
'{}genre for album "{.album}" ({}): {}',
prefix,
album,
src,
album_genre,
)
if not pretend:
album.genre = album_genre
if "track" in self.sources:
album.store(inherit=False)
else:
album.store()

for item in album.items():
# If we're using track-level sources, also look up each
# track on the album.
if "track" in self.sources:
item_genre, src = self._get_genre(item)
self._log.info(
'{}genre for track "{.title}" ({}): {}',
prefix,
item,
src,
item_genre,
)
if not pretend:
item.genre = item_genre
item.store()

if write and not pretend:
item.try_write()
else:
# Just query singletons, i.e. items that are not part of
# an album
for item in lib.items(args):
item_genre, src = self._get_genre(item)
prefix = "Pretend: " if pretend else ""
self._log.info(
'{}genre for track "{0.title}" ({1}): {}',
prefix,
item,
src,
item_genre,
)
if not pretend:
item.genre = item_genre
item.store()
if write and not pretend:
item.try_write()
method = lib.albums if opts.album else lib.items
for obj in method(args):
self._process(obj, write=ui.should_write())

lastgenre_cmd.func = lastgenre_func
return [lastgenre_cmd]

def imported(self, session, task):
"""Event hook called when an import task finishes."""
if task.is_album:
album = task.album
album.genre, src = self._get_genre(album)
self._log.debug(
'genre for album "{0.album}" ({1}): {0.genre}', album, src
)

# If we're using track-level sources, store the album genre only,
# then also look up individual track genres.
if "track" in self.sources:
album.store(inherit=False)
for item in album.items():
item.genre, src = self._get_genre(item)
self._log.debug(
'genre for track "{0.title}" ({1}): {0.genre}',
item,
src,
)
item.store()
# Store the album genre and inherit to tracks.
else:
album.store()

else:
item = task.item
item.genre, src = self._get_genre(item)
self._log.debug(
'genre for track "{0.title}" ({1}): {0.genre}', item, src
)
item.store()
self._process(task.album if task.is_album else task.item, write=False)

def _tags_for(self, obj, min_weight=None):
"""Core genre identification routine.
Expand Down
42 changes: 17 additions & 25 deletions test/plugins/test_lastgenre.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import pytest

from beets.test import _common
from beets.test.helper import BeetsTestCase
from beets.test.helper import PluginTestCase
from beetsplug import lastgenre


class LastGenrePluginTest(BeetsTestCase):
class LastGenrePluginTest(PluginTestCase):
plugin = "lastgenre"

def setUp(self):
super().setUp()
self.plugin = lastgenre.LastGenrePlugin()
Expand Down Expand Up @@ -131,6 +133,11 @@ def test_prefer_specific_without_canonical(self):
"math rock",
]

@patch("beets.ui.should_write", Mock(return_value=True))
@patch(
"beetsplug.lastgenre.LastGenrePlugin._get_genre",
Mock(return_value=("Mock Genre", "mock stage")),
)
def test_pretend_option_skips_library_updates(self):
item = self.create_item(
album="Pretend Album",
Expand All @@ -141,32 +148,17 @@ def test_pretend_option_skips_library_updates(self):
)
album = self.lib.add_album([item])

command = self.plugin.commands()[0]
opts, args = command.parser.parse_args(["--pretend"])

with patch.object(lastgenre.ui, "should_write", return_value=True):
with patch.object(
self.plugin,
"_get_genre",
return_value=("Mock Genre", "mock stage"),
) as mock_get_genre:
with patch.object(self.plugin._log, "info") as log_info:
# Mock try_write to verify it's never called in pretend mode
with patch.object(item, "try_write") as mock_try_write:
command.func(self.lib, opts, args)

mock_get_genre.assert_called_once()

assert any(
call.args[1] == "Pretend: " for call in log_info.call_args_list
)
def unexpected_store(*_, **__):
raise AssertionError("Unexpected store call")

# Verify that try_write was never called (file operations skipped)
mock_try_write.assert_not_called()
with patch("beetsplug.lastgenre.Item.store", unexpected_store):
output = self.run_with_output("lastgenre", "--pretend")

stored_album = self.lib.get_album(album.id)
assert stored_album.genre == "Original Genre"
assert stored_album.items()[0].genre == "Original Genre"
assert "Mock Genre" in output
album.load()
assert album.genre == "Original Genre"
assert album.items()[0].genre == "Original Genre"

def test_no_duplicate(self):
"""Remove duplicated genres."""
Expand Down