-
Notifications
You must be signed in to change notification settings - Fork 2k
listenbrainz: Add pagination, play count aggregation, and recording_mbid fix #6484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,6 +3,7 @@ | |
| from __future__ import annotations | ||
|
|
||
| import datetime | ||
| from collections import Counter | ||
| from typing import TYPE_CHECKING, ClassVar | ||
|
|
||
| import requests | ||
|
|
@@ -48,21 +49,48 @@ def func(lib, opts, args): | |
| return [lbupdate_cmd] | ||
|
|
||
| def _lbupdate(self, lib, log): | ||
| """Obtain view count from Listenbrainz.""" | ||
| found_total = 0 | ||
| unknown_total = 0 | ||
| ls = self.get_listens() | ||
| tracks = self.get_tracks_from_listens(ls) | ||
| log.info("Found {} listens", len(ls)) | ||
| if tracks: | ||
| found, unknown = update_play_counts( | ||
| lib, tracks, log, "listenbrainz" | ||
| ) | ||
| found_total += found | ||
| unknown_total += unknown | ||
| """Obtain play counts from ListenBrainz.""" | ||
| listens = self.get_listens() | ||
| if listens is None: | ||
| log.error("Failed to fetch listens from ListenBrainz.") | ||
| return | ||
| if not listens: | ||
| log.info("No listens found.") | ||
| return | ||
| log.info("Found {} listens", len(listens)) | ||
| tracks = self._aggregate_listens(self.get_tracks_from_listens(listens)) | ||
| log.info("Aggregated into {} unique tracks", len(tracks)) | ||
| found, unknown = update_play_counts(lib, tracks, log, "listenbrainz") | ||
| log.info("... done!") | ||
| log.info("{} unknown play-counts", unknown_total) | ||
| log.info("{} play-counts imported", found_total) | ||
| log.info("{} unknown play-counts", unknown) | ||
| log.info("{} play-counts imported", found) | ||
|
|
||
| @staticmethod | ||
| def _aggregate_listens(tracks: list[Track]) -> list[Track]: | ||
| """Aggregate individual listen events into per-track play counts. | ||
|
|
||
| ListenBrainz returns individual listen events (each with playcount=1). | ||
| We aggregate them by track identity so each unique track gets its total | ||
| count, making the import idempotent. | ||
| """ | ||
| _agg_key = str | tuple[str, str, str] | ||
| play_counts: Counter[_agg_key] = Counter() | ||
| track_info: dict[_agg_key, Track] = {} | ||
| for t in tracks: | ||
| mbid = t.get("mbid") or "" | ||
| artist = t["artist"] | ||
| name = t["name"] | ||
| album = t.get("album") or "" | ||
|
|
||
| key: _agg_key = mbid if mbid else (artist, name, album) | ||
| play_counts[key] += 1 | ||
| if key not in track_info: | ||
| track_info[key] = t | ||
|
|
||
| return [ | ||
| {**info, "playcount": play_counts[key]} | ||
| for key, info in track_info.items() | ||
| ] | ||
|
|
||
| def _make_request(self, url, params=None): | ||
| """Makes a request to the ListenBrainz API.""" | ||
|
|
@@ -80,41 +108,59 @@ def _make_request(self, url, params=None): | |
| return None | ||
|
|
||
| def get_listens(self, min_ts=None, max_ts=None, count=None): | ||
| """Gets the listen history of a given user. | ||
| """Gets the full listen history of a given user. | ||
|
|
||
| Paginates through all available listens using the max_ts parameter. | ||
|
|
||
| Args: | ||
| username: User to get listen history of. | ||
| min_ts: History before this timestamp will not be returned. | ||
| DO NOT USE WITH max_ts. | ||
| max_ts: History after this timestamp will not be returned. | ||
| DO NOT USE WITH min_ts. | ||
| count: How many listens to return. If not specified, | ||
| uses a default from the server. | ||
| count: How many listens to return per page (max 1000). | ||
|
|
||
| Returns: | ||
| A list of listen info dictionaries if there's an OK status. | ||
|
|
||
| Raises: | ||
| An HTTPError if there's a failure. | ||
| A ValueError if the JSON in the response is invalid. | ||
| An IndexError if the JSON is not structured as expected. | ||
| A list of listen info dictionaries, or None on API failure. | ||
| """ | ||
| if min_ts is not None and max_ts is not None: | ||
| raise ValueError("min_ts and max_ts are mutually exclusive.") | ||
|
|
||
| per_page = min(count or 1000, 1000) | ||
| url = f"{self.ROOT}/user/{self.username}/listens" | ||
| params = { | ||
| k: v | ||
| for k, v in { | ||
| "min_ts": min_ts, | ||
| "max_ts": max_ts, | ||
| "count": count, | ||
| }.items() | ||
| if v is not None | ||
| } | ||
| response = self._make_request(url, params) | ||
|
|
||
| if response is not None: | ||
| return response["payload"]["listens"] | ||
| else: | ||
| return None | ||
| all_listens = [] | ||
|
|
||
| while True: | ||
| params = {"count": per_page} | ||
| if max_ts is not None: | ||
| params["max_ts"] = max_ts | ||
| if min_ts is not None: | ||
|
Comment on lines
+130
to
+136
|
||
| params["min_ts"] = min_ts | ||
|
|
||
| response = self._make_request(url, params) | ||
| if response is None: | ||
|
Comment on lines
+134
to
+140
|
||
| if not all_listens: | ||
| return None | ||
| break | ||
|
|
||
| listens = response["payload"]["listens"] | ||
| if not listens: | ||
| break | ||
|
|
||
| all_listens.extend(listens) | ||
| self._log.info("Fetched {} listens so far...", len(all_listens)) | ||
|
|
||
| # If we got fewer than requested, we've reached the end | ||
| if len(listens) < per_page: | ||
| break | ||
|
|
||
|
Comment on lines
+151
to
+155
|
||
| # Paginate using the oldest listen's timestamp. | ||
| # Subtract 1 to avoid re-fetching listens at the boundary. | ||
| new_max_ts = listens[-1]["listened_at"] - 1 | ||
| if max_ts is not None and new_max_ts >= max_ts: | ||
| break | ||
| max_ts = new_max_ts | ||
|
|
||
| return all_listens | ||
|
|
||
| def get_tracks_from_listens(self, listens) -> list[Track]: | ||
| """Returns a list of tracks from a list of listens.""" | ||
|
|
@@ -123,8 +169,8 @@ def get_tracks_from_listens(self, listens) -> list[Track]: | |
| if track["track_metadata"].get("release_name") is None: | ||
| continue | ||
| mbid_mapping = track["track_metadata"].get("mbid_mapping", {}) | ||
| mbid = None | ||
| if mbid_mapping.get("recording_mbid") is None: | ||
| mbid = mbid_mapping.get("recording_mbid") | ||
| if mbid is None: | ||
| # search for the track using title and release | ||
| mbid = self.get_mb_recording_id(track) | ||
| tracks.append( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| from unittest.mock import patch | ||
|
|
||
| import pytest | ||
|
|
||
| from beets.test.helper import ConfigMixin | ||
|
|
@@ -45,3 +47,181 @@ def test_get_track_info(self, plugin, requests_mock): | |
| "year": "2023", | ||
| } | ||
| ] | ||
|
|
||
| def test_aggregate_listens_counts_by_mbid(self, plugin): | ||
| tracks = [ | ||
| { | ||
| "mbid": "m1", | ||
| "artist": "A", | ||
| "name": "S", | ||
| "album": "Al", | ||
| "playcount": 1, | ||
| }, | ||
| { | ||
| "mbid": "m1", | ||
| "artist": "A", | ||
| "name": "S", | ||
| "album": "Al", | ||
| "playcount": 1, | ||
| }, | ||
| { | ||
| "mbid": "m1", | ||
| "artist": "A", | ||
| "name": "S", | ||
| "album": "Al", | ||
| "playcount": 1, | ||
| }, | ||
| { | ||
| "mbid": "m2", | ||
| "artist": "B", | ||
|
Comment on lines
+73
to
+76
|
||
| "name": "T", | ||
| "album": "Bl", | ||
| "playcount": 1, | ||
| }, | ||
|
Comment on lines
+77
to
+80
|
||
| ] | ||
| result = plugin._aggregate_listens(tracks) | ||
| by_mbid = {t["mbid"]: t["playcount"] for t in result} | ||
| assert by_mbid == {"m1": 3, "m2": 1} | ||
|
|
||
| def test_aggregate_listens_falls_back_to_artist_title_album(self, plugin): | ||
| tracks = [ | ||
| { | ||
| "mbid": None, | ||
| "artist": "A", | ||
| "name": "S", | ||
| "album": "Al", | ||
| "playcount": 1, | ||
| }, | ||
| { | ||
| "mbid": "", | ||
| "artist": "A", | ||
| "name": "S", | ||
| "album": "Al", | ||
| "playcount": 1, | ||
| }, | ||
| { | ||
| "mbid": None, | ||
| "artist": "A", | ||
| "name": "S", | ||
| "album": "Al", | ||
| "playcount": 1, | ||
| }, | ||
| ] | ||
| result = plugin._aggregate_listens(tracks) | ||
| assert len(result) == 1 | ||
| assert result[0]["playcount"] == 3 | ||
|
|
||
| def test_get_listens_paginates(self, plugin): | ||
| page1 = [ | ||
| { | ||
| "listened_at": 100 - i, | ||
| "track_metadata": { | ||
| "track_name": f"T{i}", | ||
| "artist_name": "A", | ||
| "release_name": "R", | ||
| }, | ||
| } | ||
| for i in range(5) | ||
| ] | ||
| page2 = [ | ||
| { | ||
| "listened_at": 50 - i, | ||
| "track_metadata": { | ||
| "track_name": f"T{5 + i}", | ||
| "artist_name": "A", | ||
| "release_name": "R", | ||
| }, | ||
| } | ||
| for i in range(3) | ||
| ] | ||
|
|
||
| call_count = 0 | ||
|
|
||
| def mock_request(url, params=None): | ||
| nonlocal call_count | ||
| call_count += 1 | ||
| if call_count == 1: | ||
| return {"payload": {"listens": page1}} | ||
| elif call_count == 2: | ||
| return {"payload": {"listens": page2}} | ||
| return {"payload": {"listens": []}} | ||
|
|
||
| with patch.object(plugin, "_make_request", side_effect=mock_request): | ||
| result = plugin.get_listens(count=5) | ||
|
|
||
| assert len(result) == 8 | ||
| assert call_count == 2 | ||
|
|
||
| def test_get_listens_stops_on_empty_page(self, plugin): | ||
| def mock_request(url, params=None): | ||
| return {"payload": {"listens": []}} | ||
|
|
||
| with patch.object(plugin, "_make_request", side_effect=mock_request): | ||
| result = plugin.get_listens() | ||
|
|
||
| assert result == [] | ||
|
|
||
| def test_get_listens_returns_none_on_api_error(self, plugin): | ||
| def mock_request(url, params=None): | ||
| return None | ||
|
|
||
| with patch.object(plugin, "_make_request", side_effect=mock_request): | ||
| result = plugin.get_listens() | ||
|
|
||
| assert result is None | ||
|
|
||
| def test_get_listens_rejects_both_min_and_max_ts(self, plugin): | ||
| with pytest.raises(ValueError, match="mutually exclusive"): | ||
| plugin.get_listens(min_ts=1, max_ts=2) | ||
|
|
||
| def test_get_listens_clamps_count_to_1000(self, plugin): | ||
| calls = [] | ||
|
|
||
| def mock_request(url, params=None): | ||
| calls.append(params) | ||
| return {"payload": {"listens": []}} | ||
|
|
||
| with patch.object(plugin, "_make_request", side_effect=mock_request): | ||
| plugin.get_listens(count=5000) | ||
|
|
||
| assert calls[0]["count"] == 1000 | ||
|
|
||
| def test_get_tracks_from_listens_uses_recording_mbid(self, plugin): | ||
| listens = [ | ||
| { | ||
| "listened_at": 1000, | ||
| "track_metadata": { | ||
| "track_name": "Song", | ||
| "artist_name": "Artist", | ||
| "release_name": "Album", | ||
| "mbid_mapping": { | ||
| "recording_mbid": "rec-mbid-123", | ||
| "release_mbid": "rel-mbid", | ||
| }, | ||
| }, | ||
| } | ||
| ] | ||
| with patch.object(plugin, "get_mb_recording_id") as mock_mb: | ||
| tracks = plugin.get_tracks_from_listens(listens) | ||
| mock_mb.assert_not_called() | ||
|
|
||
| assert tracks[0]["mbid"] == "rec-mbid-123" | ||
| assert tracks[0]["playcount"] == 1 | ||
|
|
||
| def test_get_tracks_from_listens_flat_structure(self, plugin): | ||
| listens = [ | ||
| { | ||
| "listened_at": 1000, | ||
| "track_metadata": { | ||
| "track_name": "Song", | ||
| "artist_name": "Artist", | ||
| "release_name": "Album", | ||
| "mbid_mapping": {"recording_mbid": "m1"}, | ||
| }, | ||
| } | ||
| ] | ||
| tracks = plugin.get_tracks_from_listens(listens) | ||
| t = tracks[0] | ||
| assert isinstance(t["artist"], str) | ||
| assert isinstance(t["name"], str) | ||
| assert t["album"] == "Album" | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
grug see when request fail, get_listens break and return empty list. then _lbupdate log "No listens found" like user have none, but really API error. grug want error path distinct: log warning/error and bail, or return None/raise so caller can tell.