feat: import listens from spotify extended streaming history (#305)

This commit is contained in:
Julian Tölle 2023-10-01 03:35:02 +02:00 committed by GitHub
parent 23d7ea0995
commit 7140cb0679
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
50 changed files with 1051 additions and 215 deletions

View file

@ -1,5 +1,5 @@
import { Test, TestingModule } from "@nestjs/testing";
import type { Response } from "express";
import type { Response as ExpressResponse } from "express";
import { User } from "../users/user.entity";
import { AuthSession } from "./auth-session.entity";
import { AuthController } from "./auth.controller";
@ -27,7 +27,7 @@ describe("AuthController", () => {
describe("spotifyCallback", () => {
let user: User;
let res: Response;
let res: ExpressResponse;
let refreshToken: string;
beforeEach(() => {
@ -36,7 +36,7 @@ describe("AuthController", () => {
statusCode: 200,
cookie: jest.fn(),
redirect: jest.fn(),
} as unknown as Response;
} as unknown as ExpressResponse;
refreshToken = "REFRESH_TOKEN";
authService.createSession = jest.fn().mockResolvedValue({ refreshToken });

View file

@ -1,5 +1,5 @@
import {
Body,
Body as NestBody,
Controller,
Delete,
Get,
@ -10,7 +10,7 @@ import {
UseGuards,
} from "@nestjs/common";
import { ApiBody, ApiTags } from "@nestjs/swagger";
import type { Response } from "express";
import type { Response as ExpressResponse } from "express";
import { User } from "../users/user.entity";
import { AuthSession } from "./auth-session.entity";
import { AuthService } from "./auth.service";
@ -42,7 +42,7 @@ export class AuthController {
@Get("spotify/callback")
@UseFilters(SpotifyAuthFilter)
@UseGuards(SpotifyAuthGuard)
async spotifyCallback(@ReqUser() user: User, @Res() res: Response) {
async spotifyCallback(@ReqUser() user: User, @Res() res: ExpressResponse) {
const { refreshToken } = await this.authService.createSession(user);
// Refresh token should not be accessible to frontend to reduce risk
@ -69,7 +69,7 @@ export class AuthController {
@AuthAccessToken()
async createApiToken(
@ReqUser() user: User,
@Body("description") description: string,
@NestBody("description") description: string,
): Promise<NewApiTokenDto> {
const apiToken = await this.authService.createApiToken(user, description);

View file

@ -5,14 +5,14 @@ import {
ForbiddenException,
Logger,
} from "@nestjs/common";
import type { Response } from "express";
import type { Response as ExpressResponse } from "express";
@Catch()
export class SpotifyAuthFilter implements ExceptionFilter {
private readonly logger = new Logger(this.constructor.name);
catch(exception: Error, host: ArgumentsHost) {
const response = host.switchToHttp().getResponse<Response>();
const response = host.switchToHttp().getResponse<ExpressResponse>();
let reason = "unknown";

View file

@ -27,7 +27,7 @@ export const DatabaseModule = TypeOrmModule.forRootAsync({
// Debug/Development Options
//
// logging: true,
//logging: true,
//
// synchronize: true,
// migrationsRun: false,

View file

@ -0,0 +1,68 @@
import {
MigrationInterface,
QueryRunner,
Table,
TableIndex,
TableForeignKey,
} from "typeorm";
import { TableColumnOptions } from "typeorm/schema-builder/options/TableColumnOptions";
const primaryUUIDColumn: TableColumnOptions = {
name: "id",
type: "uuid",
isPrimary: true,
isGenerated: true,
generationStrategy: "uuid",
};
export class CreateSpotifyImportTables0000000000009
implements MigrationInterface
{
async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.createTable(
new Table({
name: "spotify_extended_streaming_history_listen",
columns: [
primaryUUIDColumn,
{ name: "userId", type: "uuid" },
{ name: "playedAt", type: "timestamp" },
{ name: "spotifyTrackUri", type: "varchar" },
{ name: "trackId", type: "uuid", isNullable: true },
{ name: "listenId", type: "uuid", isNullable: true },
],
indices: [
new TableIndex({
name: "IDX_SPOTIFY_EXTENDED_STREAMING_HISTORY_LISTEN_USER_PLAYED_AT",
columnNames: ["userId", "playedAt", "spotifyTrackUri"],
isUnique: true,
}),
],
foreignKeys: [
new TableForeignKey({
name: "FK_SPOTIFY_EXTENDED_STREAMING_HISTORY_LISTEN_USER_ID",
columnNames: ["userId"],
referencedColumnNames: ["id"],
referencedTableName: "user",
}),
new TableForeignKey({
name: "FK_SPOTIFY_EXTENDED_STREAMING_HISTORY_LISTEN_TRACK_ID",
columnNames: ["trackId"],
referencedColumnNames: ["id"],
referencedTableName: "track",
}),
new TableForeignKey({
name: "FK_SPOTIFY_EXTENDED_STREAMING_HISTORY_LISTEN_LISTEN_ID",
columnNames: ["listenId"],
referencedColumnNames: ["id"],
referencedTableName: "listen",
}),
],
}),
true,
);
}
async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.dropTable("spotify_extended_streaming_history_listen");
}
}

View file

@ -3,10 +3,6 @@ import { Repository, SelectQueryBuilder } from "typeorm";
import { EntityRepository } from "../database/entity-repository";
import { Interval } from "../reports/interval";
import { User } from "../users/user.entity";
import {
CreateListenRequestDto,
CreateListenResponseDto,
} from "./dto/create-listen.dto";
import { Listen } from "./listen.entity";
export class ListenScopes extends SelectQueryBuilder<Listen> {
@ -37,52 +33,4 @@ export class ListenRepository extends Repository<Listen> {
get scoped(): ListenScopes {
return new ListenScopes(this.createQueryBuilder("listen"));
}
async insertNoConflict({
user,
track,
playedAt,
}: CreateListenRequestDto): Promise<CreateListenResponseDto> {
const result = await this.createQueryBuilder()
.insert()
.values({
user,
track,
playedAt,
})
.onConflict('("playedAt", "trackId", "userId") DO NOTHING')
.execute();
const [insertedRowIdentifier] = result.identifiers;
if (!insertedRowIdentifier) {
// We did not insert a new listen, it already existed
return {
listen: await this.findOneBy({ user, track, playedAt }),
isDuplicate: true,
};
}
return {
listen: await this.findOneBy({ id: insertedRowIdentifier.id }),
isDuplicate: false,
};
}
/**
*
* @param rows
* @returns A list of all new (non-duplicate) listens
*/
async insertsNoConflict(rows: CreateListenRequestDto[]): Promise<Listen[]> {
const result = await this.createQueryBuilder()
.insert()
.values(rows)
.orIgnore()
.execute();
return this.findBy(
result.identifiers.filter(Boolean).map(({ id }) => ({ id })),
);
}
}

View file

@ -4,9 +4,7 @@ import {
paginate,
PaginationTypeEnum,
} from "nestjs-typeorm-paginate";
import { Track } from "../music-library/track.entity";
import { User } from "../users/user.entity";
import { CreateListenResponseDto } from "./dto/create-listen.dto";
import { GetListensDto } from "./dto/get-listens.dto";
import { Listen } from "./listen.entity";
import { ListenRepository, ListenScopes } from "./listen.repository";
@ -35,39 +33,6 @@ describe("ListensService", () => {
expect(listenRepository).toBeDefined();
});
describe("createListen", () => {
let user: User;
let track: Track;
let playedAt: Date;
let response: CreateListenResponseDto;
beforeEach(() => {
user = { id: "USER" } as User;
track = { id: "TRACK" } as Track;
playedAt = new Date("2021-01-01T00:00:00Z");
response = {
listen: {
id: "LISTEN",
} as Listen,
isDuplicate: true,
};
listenRepository.insertNoConflict = jest.fn().mockResolvedValue(response);
});
it("creates the listen", async () => {
await expect(
service.createListen({ user, track, playedAt }),
).resolves.toEqual(response);
expect(listenRepository.insertNoConflict).toHaveBeenCalledTimes(1);
expect(listenRepository.insertNoConflict).toHaveBeenLastCalledWith({
user,
track,
playedAt,
});
});
});
describe("getListens", () => {
let options: GetListensDto & IPaginationOptions;
let user: User;

View file

@ -1,14 +1,12 @@
import { Injectable } from "@nestjs/common";
import { Span } from "nestjs-otel";
import {
IPaginationOptions,
paginate,
Pagination,
PaginationTypeEnum,
} from "nestjs-typeorm-paginate";
import {
CreateListenRequestDto,
CreateListenResponseDto,
} from "./dto/create-listen.dto";
import { CreateListenRequestDto } from "./dto/create-listen.dto";
import { GetListensDto } from "./dto/get-listens.dto";
import { Listen } from "./listen.entity";
import { ListenRepository, ListenScopes } from "./listen.repository";
@ -17,20 +15,7 @@ import { ListenRepository, ListenScopes } from "./listen.repository";
export class ListensService {
constructor(private readonly listenRepository: ListenRepository) {}
async createListen({
user,
track,
playedAt,
}: CreateListenRequestDto): Promise<CreateListenResponseDto> {
const response = await this.listenRepository.insertNoConflict({
user,
track,
playedAt,
});
return response;
}
@Span()
async createListens(
listensData: CreateListenRequestDto[],
): Promise<Listen[]> {
@ -46,9 +31,11 @@ export class ListensService {
),
);
return this.listenRepository.save(
const newListens = await this.listenRepository.save(
missingListens.map((entry) => this.listenRepository.create(entry)),
);
return [...existingListens, ...newListens];
}
async getListens(

View file

@ -43,6 +43,7 @@ async function bootstrap() {
const app = await NestFactory.create<NestExpressApplication>(AppModule, {
bufferLogs: true,
rawBody: true,
});
app.useLogger(app.get(Logger));
app.useGlobalPipes(
@ -51,6 +52,10 @@ async function bootstrap() {
transformOptions: { enableImplicitConversion: true },
}),
);
app.useBodyParser("json", {
limit:
"10mb" /* Need large bodies for Spotify Extended Streaming History */,
});
app.enableShutdownHooks();
const configService = app.get<ConfigService>(ConfigService);

View file

@ -1,5 +1,6 @@
export class FindTrackDto {
spotify: {
id: string;
id?: string;
uri?: string;
};
}

View file

@ -175,9 +175,7 @@ export class MusicLibraryService {
}
async findTrack(query: FindTrackDto): Promise<Track | undefined> {
return this.trackRepository.findOneBy({
spotify: { id: query.spotify.id },
});
return this.trackRepository.findOneBy(query);
}
async findTracks(query: FindTrackDto[]): Promise<Track[]> {

5
src/override.d.ts vendored
View file

@ -1,5 +0,0 @@
// Issue with opentelemetry-js: https://github.com/open-telemetry/opentelemetry-js/issues/3580#issuecomment-1701157270
export {};
declare global {
type BlobPropertyBag = unknown;
}

View file

@ -33,11 +33,11 @@ export class SchedulerService implements OnApplicationBootstrap {
}
private async setupSpotifyCrawlerSupervisor(): Promise<void> {
await this.superviseImportJobsJobService.schedule("*/1 * * * *", {}, {});
// await this.superviseImportJobsJobService.schedule("*/1 * * * *", {}, {});
}
@Span()
@CrawlerSupervisorJob.Handle()
// @CrawlerSupervisorJob.Handle()
async superviseImportJobs(): Promise<void> {
this.logger.log("Starting crawler jobs");
const userInfo = await this.spotifyService.getCrawlableUserInfo();

View file

@ -0,0 +1,13 @@
import { ApiProperty } from "@nestjs/swagger";
export class ExtendedStreamingHistoryStatusDto {
@ApiProperty({
type: Number,
})
total: number;
@ApiProperty({
type: Number,
})
imported: number;
}

View file

@ -0,0 +1,13 @@
import { ApiProperty } from "@nestjs/swagger";
import { ArrayMaxSize } from "class-validator";
import { SpotifyExtendedStreamingHistoryItemDto } from "./spotify-extended-streaming-history-item.dto";
export class ImportExtendedStreamingHistoryDto {
@ApiProperty({
type: SpotifyExtendedStreamingHistoryItemDto,
isArray: true,
maxItems: 50_000,
})
@ArrayMaxSize(50_000) // File size is ~16k by default, might need refactoring if Spotify starts exporting larger files
listens: SpotifyExtendedStreamingHistoryItemDto[];
}

View file

@ -0,0 +1,9 @@
import { ApiProperty } from "@nestjs/swagger";
export class SpotifyExtendedStreamingHistoryItemDto {
@ApiProperty({ format: "iso8601", example: "2018-11-30T08:33:33Z" })
ts: string;
@ApiProperty({ example: "spotify:track:6askbS4pEVWbbDnUGEXh3G" })
spotify_track_uri: string;
}

View file

@ -0,0 +1,32 @@
import { Body as NestBody, Controller, Get, Post } from "@nestjs/common";
import { ApiBody, ApiTags } from "@nestjs/swagger";
import { AuthAccessToken } from "../../../auth/decorators/auth-access-token.decorator";
import { ReqUser } from "../../../auth/decorators/req-user.decorator";
import { User } from "../../../users/user.entity";
import { ExtendedStreamingHistoryStatusDto } from "./dto/extended-streaming-history-status.dto";
import { ImportExtendedStreamingHistoryDto } from "./dto/import-extended-streaming-history.dto";
import { ImportService } from "./import.service";
@ApiTags("import")
@Controller("api/v1/import")
export class ImportController {
constructor(private readonly importService: ImportService) {}
@Post("extended-streaming-history")
@ApiBody({ type: () => ImportExtendedStreamingHistoryDto })
@AuthAccessToken()
async importExtendedStreamingHistory(
@ReqUser() user: User,
@NestBody() data: ImportExtendedStreamingHistoryDto,
): Promise<void> {
return this.importService.importExtendedStreamingHistory(user, data);
}
@Get("extended-streaming-history/status")
@AuthAccessToken()
async getExtendedStreamingHistoryStatus(
@ReqUser() user: User,
): Promise<ExtendedStreamingHistoryStatusDto> {
return this.importService.getExtendedStreamingHistoryStatus(user);
}
}

View file

@ -0,0 +1,177 @@
import { JobService } from "@apricote/nest-pg-boss";
import { Injectable, Logger } from "@nestjs/common";
import { uniq } from "lodash";
import { Span } from "nestjs-otel";
import type { Job } from "pg-boss";
import { ListensService } from "../../../listens/listens.service";
import { User } from "../../../users/user.entity";
import { SpotifyService } from "../spotify.service";
import { ExtendedStreamingHistoryStatusDto } from "./dto/extended-streaming-history-status.dto";
import { ImportExtendedStreamingHistoryDto } from "./dto/import-extended-streaming-history.dto";
import {
IProcessSpotifyExtendedStreamingHistoryListenJob,
ProcessSpotifyExtendedStreamingHistoryListenJob,
} from "./jobs";
import { SpotifyExtendedStreamingHistoryListenRepository } from "./listen.repository";
@Injectable()
export class ImportService {
private readonly logger = new Logger(this.constructor.name);
constructor(
private readonly importListenRepository: SpotifyExtendedStreamingHistoryListenRepository,
@ProcessSpotifyExtendedStreamingHistoryListenJob.Inject()
private readonly processListenJobService: JobService<IProcessSpotifyExtendedStreamingHistoryListenJob>,
private readonly spotifyService: SpotifyService,
private readonly listensService: ListensService,
) {}
@Span()
async importExtendedStreamingHistory(
user: User,
{ listens: importListens }: ImportExtendedStreamingHistoryDto,
): Promise<void> {
// IDK what's happening, but my personal data set has entries with duplicate
// listens? might be related to offline mode.
// Anyway, this cleans it up:
const uniqEntries = new Set();
const uniqueListens = importListens.filter((listen) => {
const key = `${listen.spotify_track_uri}-${listen.ts}`;
if (!uniqEntries.has(key)) {
// New entry
uniqEntries.add(key);
return true;
}
return false;
});
let listens = uniqueListens.map((listenData) =>
this.importListenRepository.create({
user,
playedAt: new Date(listenData.ts),
spotifyTrackUri: listenData.spotify_track_uri,
}),
);
// Save listens to import table
const insertResult = await this.importListenRepository.upsert(listens, [
"user",
"playedAt",
"spotifyTrackUri",
]);
const processJobs = insertResult.identifiers.map((listen) => ({
data: {
id: listen.id,
},
singletonKey: listen.id,
retryLimit: 10,
retryDelay: 5,
retryBackoff: true,
}));
// Schedule jobs to process imports
await this.processListenJobService.insert(processJobs);
}
@ProcessSpotifyExtendedStreamingHistoryListenJob.Handle({
// Spotify API "Get Several XY" allows max 50 IDs
batchSize: 50,
newJobCheckInterval: 500,
})
@Span()
async processListens(
jobs: Job<IProcessSpotifyExtendedStreamingHistoryListenJob>[],
): Promise<void> {
this.logger.debug(
{ jobs: jobs.length },
"processing extended streaming history listens",
);
const importListens = await this.importListenRepository.findBy(
jobs.map((job) => ({ id: job.data.id })),
);
const listensWithoutTracks = importListens.filter(
(importListen) => !importListen.track,
);
if (listensWithoutTracks.length > 0) {
const missingTrackIDs = uniq(
listensWithoutTracks.map((importListen) =>
importListen.spotifyTrackUri.replace("spotify:track:", ""),
),
);
const tracks = await this.spotifyService.importTracks(missingTrackIDs);
listensWithoutTracks.forEach((listen) => {
listen.track = tracks.find(
(track) => listen.spotifyTrackUri === track.spotify.uri,
);
if (!listen.track) {
this.logger.warn(
{ listen },
"could not find track for extended streaming history listen",
);
throw new Error(
`could not find track for extended streaming history listen`,
);
}
});
// Using upsert instead of save to only do a single query
await this.importListenRepository.upsert(listensWithoutTracks, ["id"]);
}
const listensWithoutListen = importListens.filter(
(importListen) => !importListen.listen,
);
if (listensWithoutListen.length > 0) {
const listens = await this.listensService.createListens(
listensWithoutListen.map((listen) => ({
user: listen.user,
track: listen.track,
playedAt: listen.playedAt,
})),
);
listensWithoutListen.forEach((importListen) => {
importListen.listen = listens.find(
(listen) =>
importListen.user.id === listen.user.id &&
importListen.track.id === listen.track.id &&
importListen.playedAt.getTime() === listen.playedAt.getTime(),
);
if (!importListen.listen) {
this.logger.warn(
{ listen: importListen, listens: listens },
"could not find listen for extended streaming history listen",
);
throw new Error(
`could not find listen for extended streaming history listen`,
);
}
});
// Using upsert instead of save to only do a single query
await this.importListenRepository.upsert(listensWithoutListen, ["id"]);
}
}
@Span()
async getExtendedStreamingHistoryStatus(
user: User,
): Promise<ExtendedStreamingHistoryStatusDto> {
const qb = this.importListenRepository
.createQueryBuilder("listen")
.where("listen.userId = :user", { user: user.id });
const [total, imported] = await Promise.all([
qb.clone().getCount(),
qb.clone().andWhere("listen.listenId IS NOT NULL").getCount(),
]);
return { total, imported };
}
}

View file

@ -0,0 +1,4 @@
export { ImportController } from "./import.controller";
export { ImportService } from "./import.service";
export { ProcessSpotifyExtendedStreamingHistoryListenJob } from "./jobs";
export { SpotifyExtendedStreamingHistoryListenRepository } from "./listen.repository";

View file

@ -0,0 +1,7 @@
import { createJob } from "@apricote/nest-pg-boss";
export type IProcessSpotifyExtendedStreamingHistoryListenJob = { id: string };
export const ProcessSpotifyExtendedStreamingHistoryListenJob =
createJob<IProcessSpotifyExtendedStreamingHistoryListenJob>(
"process-spotify-extended-streaming-history-listen",
);

View file

@ -0,0 +1,25 @@
import { Column, Entity, ManyToOne, PrimaryGeneratedColumn } from "typeorm";
import { Track } from "../../../music-library/track.entity";
import { User } from "../../../users/user.entity";
import { Listen } from "../../../listens/listen.entity";
@Entity({ name: "spotify_extended_streaming_history_listen" })
export class SpotifyExtendedStreamingHistoryListen {
@PrimaryGeneratedColumn("uuid")
id: string;
@ManyToOne(() => User, { eager: true })
user: User;
@Column({ type: "timestamp" })
playedAt: Date;
@Column()
spotifyTrackUri: string;
@ManyToOne(() => Track, { nullable: true, eager: true })
track?: Track;
@ManyToOne(() => Listen, { nullable: true, eager: true })
listen?: Listen;
}

View file

@ -0,0 +1,6 @@
import { Repository } from "typeorm";
import { EntityRepository } from "../../../database/entity-repository";
import { SpotifyExtendedStreamingHistoryListen } from "./listen.entity";
@EntityRepository(SpotifyExtendedStreamingHistoryListen)
export class SpotifyExtendedStreamingHistoryListenRepository extends Repository<SpotifyExtendedStreamingHistoryListen> {}

View file

@ -1,25 +1,37 @@
import { PGBossModule } from "@apricote/nest-pg-boss";
import { Module } from "@nestjs/common";
import { TypeOrmRepositoryModule } from "../../database/entity-repository/typeorm-repository.module";
import { ListensModule } from "../../listens/listens.module";
import { MusicLibraryModule } from "../../music-library/music-library.module";
import { UsersModule } from "../../users/users.module";
import { ImportSpotifyJob } from "../jobs";
import {
ImportController,
ImportService,
ProcessSpotifyExtendedStreamingHistoryListenJob,
SpotifyExtendedStreamingHistoryListenRepository,
} from "./import-extended-streaming-history";
import { SpotifyApiModule } from "./spotify-api/spotify-api.module";
import { SpotifyAuthModule } from "./spotify-auth/spotify-auth.module";
import { SpotifyService } from "./spotify.service";
@Module({
imports: [
PGBossModule.forJobs([ImportSpotifyJob]),
PGBossModule.forJobs([
ImportSpotifyJob,
ProcessSpotifyExtendedStreamingHistoryListenJob,
]),
TypeOrmRepositoryModule.for([
SpotifyExtendedStreamingHistoryListenRepository,
]),
UsersModule,
ListensModule,
MusicLibraryModule,
SpotifyApiModule,
SpotifyAuthModule,
],
providers: [SpotifyService],
providers: [SpotifyService, ImportService],
controllers: [ImportController],
exports: [SpotifyService],
})
export class SpotifyModule {
constructor(private readonly spotifyService: SpotifyService) {}
}
export class SpotifyModule {}