Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 87 additions & 41 deletions beetsplug/listenbrainz.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import datetime
from collections import Counter
from typing import TYPE_CHECKING, ClassVar

import requests
Expand Down Expand Up @@ -48,21 +49,48 @@ def func(lib, opts, args):
return [lbupdate_cmd]

def _lbupdate(self, lib, log):
    """Import play counts from ListenBrainz into the library.

    Fetches the user's full listen history, aggregates individual listen
    events into per-track play counts, and writes them to matching items.

    Args:
        lib: The beets library to update.
        log: Logger used for progress and result reporting.
    """
    listens = self.get_listens()
    # None signals an API failure; distinguish it from a user who
    # genuinely has no listens (empty list).
    if listens is None:
        log.error("Failed to fetch listens from ListenBrainz.")
        return
    if not listens:
        log.info("No listens found.")
        return
    log.info("Found {} listens", len(listens))
    # Each listen arrives with playcount=1; aggregate so repeated plays
    # of the same track become a single entry with the total count.
    tracks = self._aggregate_listens(self.get_tracks_from_listens(listens))
    log.info("Aggregated into {} unique tracks", len(tracks))
    found, unknown = update_play_counts(lib, tracks, log, "listenbrainz")
    log.info("... done!")
    log.info("{} unknown play-counts", unknown)
    log.info("{} play-counts imported", found)

@staticmethod
def _aggregate_listens(tracks: list[Track]) -> list[Track]:
"""Aggregate individual listen events into per-track play counts.

ListenBrainz returns individual listen events (each with playcount=1).
We aggregate them by track identity so each unique track gets its total
count, making the import idempotent.
"""
_agg_key = str | tuple[str, str, str]
play_counts: Counter[_agg_key] = Counter()
track_info: dict[_agg_key, Track] = {}
for t in tracks:
mbid = t.get("mbid") or ""
artist = t["artist"]
name = t["name"]
album = t.get("album") or ""

key: _agg_key = mbid if mbid else (artist, name, album)
play_counts[key] += 1
if key not in track_info:
track_info[key] = t

return [
{**info, "playcount": play_counts[key]}
for key, info in track_info.items()
]

def _make_request(self, url, params=None):
"""Makes a request to the ListenBrainz API."""
Expand All @@ -80,41 +108,59 @@ def _make_request(self, url, params=None):
return None

def get_listens(self, min_ts=None, max_ts=None, count=None):
    """Get the full listen history of a given user.

    Paginates through all available listens using the ``max_ts``
    parameter until the server runs out of results.

    Args:
        min_ts: History before this timestamp will not be returned.
            Mutually exclusive with ``max_ts``.
        max_ts: History after this timestamp will not be returned.
            Mutually exclusive with ``min_ts``.
        count: How many listens to request per page. Clamped to the
            API maximum of 1000; defaults to 1000 when omitted.

    Returns:
        A list of listen info dictionaries, or None when the very first
        request fails (so callers can distinguish an API error from a
        user with no listens).

    Raises:
        ValueError: If both ``min_ts`` and ``max_ts`` are given, or if
            ``count`` is not positive.
    """
    if min_ts is not None and max_ts is not None:
        raise ValueError("min_ts and max_ts are mutually exclusive.")
    if count is not None and count <= 0:
        raise ValueError("count must be a positive integer.")

    # The ListenBrainz API caps the page size at 1000.
    per_page = min(count or 1000, 1000)
    url = f"{self.ROOT}/user/{self.username}/listens"
    all_listens = []

    while True:
        params = {"count": per_page}
        if max_ts is not None:
            params["max_ts"] = max_ts
        if min_ts is not None:
            params["min_ts"] = min_ts

        response = self._make_request(url, params)
        if response is None:
            # A failure on the first page means we got nothing at all;
            # a mid-pagination failure returns what was fetched so far.
            if not all_listens:
                return None
            break

        listens = response["payload"]["listens"]
        if not listens:
            break

        all_listens.extend(listens)
        self._log.info("Fetched {} listens so far...", len(all_listens))

        # A short page means we've reached the end of the history.
        if len(listens) < per_page:
            break

        # Paginate using the oldest listen's timestamp. Subtract 1 to
        # avoid re-fetching listens at the page boundary.
        new_max_ts = listens[-1]["listened_at"] - 1
        if max_ts is not None and new_max_ts >= max_ts:
            break
        max_ts = new_max_ts

    return all_listens

def get_tracks_from_listens(self, listens) -> list[Track]:
"""Returns a list of tracks from a list of listens."""
Expand All @@ -123,8 +169,8 @@ def get_tracks_from_listens(self, listens) -> list[Track]:
if track["track_metadata"].get("release_name") is None:
continue
mbid_mapping = track["track_metadata"].get("mbid_mapping", {})
mbid = None
if mbid_mapping.get("recording_mbid") is None:
mbid = mbid_mapping.get("recording_mbid")
if mbid is None:
# search for the track using title and release
mbid = self.get_mb_recording_id(track)
tracks.append(
Expand Down
4 changes: 4 additions & 0 deletions docs/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ New features
Bug fixes
~~~~~~~~~

- :doc:`plugins/listenbrainz`: Paginate through all ListenBrainz listens instead
of fetching only 25, aggregate individual listen events into correct play
counts, and use ``recording_mbid`` from the API response when available.
:bug:`6469`
- Correctly handle semicolon-delimited genre values from externally-tagged
files. :bug:`6450`
- :doc:`plugins/listenbrainz`: Fix ``lbimport`` crashing when ListenBrainz
Expand Down
180 changes: 180 additions & 0 deletions test/plugins/test_listenbrainz.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from unittest.mock import patch

import pytest

from beets.test.helper import ConfigMixin
Expand Down Expand Up @@ -45,3 +47,181 @@ def test_get_track_info(self, plugin, requests_mock):
"year": "2023",
}
]

def test_aggregate_listens_counts_by_mbid(self, plugin):
    """Listens sharing a recording MBID collapse into one play count."""

    # Helper keeps the fixture short and every line within lint limits.
    def listen(mbid, artist, name, album):
        return {
            "mbid": mbid,
            "artist": artist,
            "name": name,
            "album": album,
            "playcount": 1,
        }

    tracks = [
        listen("m1", "A", "S", "Al"),
        listen("m1", "A", "S", "Al"),
        listen("m1", "A", "S", "Al"),
        listen("m2", "B", "T", "Bl"),
    ]
    result = plugin._aggregate_listens(tracks)
    by_mbid = {t["mbid"]: t["playcount"] for t in result}
    assert by_mbid == {"m1": 3, "m2": 1}

def test_aggregate_listens_falls_back_to_artist_title_album(self, plugin):
    """Tracks without an MBID aggregate on (artist, title, album)."""
    base = {"artist": "A", "name": "S", "album": "Al", "playcount": 1}
    tracks = [
        {**base, "mbid": None},
        {**base, "mbid": ""},
        {**base, "mbid": None},
    ]
    result = plugin._aggregate_listens(tracks)
    assert len(result) == 1
    assert result[0]["playcount"] == 3

def test_get_listens_paginates(self, plugin):
    """get_listens keeps requesting pages until a short page arrives."""

    def make_listen(ts, title):
        return {
            "listened_at": ts,
            "track_metadata": {
                "track_name": title,
                "artist_name": "A",
                "release_name": "R",
            },
        }

    first_page = [make_listen(100 - i, f"T{i}") for i in range(5)]
    second_page = [make_listen(50 - i, f"T{5 + i}") for i in range(3)]
    pages = [first_page, second_page]
    requests_made = []

    def mock_request(url, params=None):
        requests_made.append(params)
        if len(requests_made) <= len(pages):
            return {"payload": {"listens": pages[len(requests_made) - 1]}}
        return {"payload": {"listens": []}}

    with patch.object(plugin, "_make_request", side_effect=mock_request):
        result = plugin.get_listens(count=5)

    assert len(result) == 8
    assert len(requests_made) == 2

def test_get_listens_stops_on_empty_page(self, plugin):
    """An immediately empty page yields an empty (non-None) result."""
    empty_page = {"payload": {"listens": []}}

    def mock_request(url, params=None):
        return empty_page

    with patch.object(plugin, "_make_request", side_effect=mock_request):
        assert plugin.get_listens() == []

def test_get_listens_returns_none_on_api_error(self, plugin):
    """A failed first request is reported as None, not an empty list."""

    def failing_request(url, params=None):
        return None

    with patch.object(plugin, "_make_request", side_effect=failing_request):
        assert plugin.get_listens() is None

def test_get_listens_rejects_both_min_and_max_ts(self, plugin):
    """get_listens raises ValueError when min_ts and max_ts are combined."""
    with pytest.raises(ValueError, match="mutually exclusive"):
        plugin.get_listens(min_ts=1, max_ts=2)

def test_get_listens_clamps_count_to_1000(self, plugin):
    """Requested page sizes above the API maximum are clamped to 1000."""
    seen_params = []

    def record_request(url, params=None):
        seen_params.append(params)
        return {"payload": {"listens": []}}

    with patch.object(plugin, "_make_request", side_effect=record_request):
        plugin.get_listens(count=5000)

    assert seen_params[0]["count"] == 1000

def test_get_tracks_from_listens_uses_recording_mbid(self, plugin):
    """The recording MBID from the API is used without a MB lookup."""
    listen = {
        "listened_at": 1000,
        "track_metadata": {
            "track_name": "Song",
            "artist_name": "Artist",
            "release_name": "Album",
            "mbid_mapping": {
                "recording_mbid": "rec-mbid-123",
                "release_mbid": "rel-mbid",
            },
        },
    }
    with patch.object(plugin, "get_mb_recording_id") as mb_lookup:
        tracks = plugin.get_tracks_from_listens([listen])
        mb_lookup.assert_not_called()

    assert tracks[0]["mbid"] == "rec-mbid-123"
    assert tracks[0]["playcount"] == 1

def test_get_tracks_from_listens_flat_structure(self, plugin):
    """Converted tracks expose flat string metadata fields."""
    listen = {
        "listened_at": 1000,
        "track_metadata": {
            "track_name": "Song",
            "artist_name": "Artist",
            "release_name": "Album",
            "mbid_mapping": {"recording_mbid": "m1"},
        },
    }
    converted = plugin.get_tracks_from_listens([listen])
    first = converted[0]
    assert isinstance(first["artist"], str)
    assert isinstance(first["name"], str)
    assert first["album"] == "Album"
Loading