feat(api): use nest-pg-boss for spotify interactions

This commit is contained in:
Julian Tölle 2022-06-29 21:29:00 +02:00
parent cd672a408e
commit b9f92bbdfa
9 changed files with 314 additions and 72 deletions

View file

@ -1,5 +1,4 @@
import { Module } from "@nestjs/common";
import { ScheduleModule } from "@nestjs/schedule";
import { ServeStaticModule } from "@nestjs/serve-static";
import { RavenModule } from "nest-raven";
import { join } from "path";
@ -14,13 +13,14 @@ import { ReportsModule } from "./reports/reports.module";
import { SourcesModule } from "./sources/sources.module";
import { UsersModule } from "./users/users.module";
import { OpenTelemetryModule } from "./open-telemetry/open-telemetry.module";
import { JobQueueModule } from "./job-queue/job-queue.module";
@Module({
imports: [
LoggerModule,
ConfigModule,
DatabaseModule,
ScheduleModule.forRoot(),
JobQueueModule,
ServeStaticModule.forRoot({
rootPath: join(__dirname, "..", "static"),
exclude: ["/api*"],

View file

@ -0,0 +1,23 @@
import { Module } from "@nestjs/common";
import { PGBossModule } from "@apricote/nest-pg-boss";
import { ConfigService } from "@nestjs/config";
@Module({
imports: [
PGBossModule.forRootAsync({
application_name: "listory",
useFactory: (config: ConfigService) => ({
// Connection details
host: config.get<string>("DB_HOST"),
user: config.get<string>("DB_USERNAME"),
password: config.get<string>("DB_PASSWORD"),
database: config.get<string>("DB_DATABASE"),
schema: "public",
max: config.get<number>("DB_POOL_MAX"),
}),
inject: [ConfigService],
}),
],
exports: [PGBossModule],
})
export class JobQueueModule {}

14
src/sources/jobs.ts Normal file
View file

@ -0,0 +1,14 @@
import { createJob } from "@apricote/nest-pg-boss";
export type ICrawlerSupervisorJob = {};
export const CrawlerSupervisorJob = createJob<ICrawlerSupervisorJob>(
"spotify-crawler-supervisor"
);
export type IUpdateSpotifyLibraryJob = {};
export const UpdateSpotifyLibraryJob = createJob<IUpdateSpotifyLibraryJob>(
"update-spotify-library"
);
export type IImportSpotifyJob = { userID: string };
export const ImportSpotifyJob = createJob<IImportSpotifyJob>("import-spotify");

View file

@ -1,8 +1,15 @@
import { Injectable, Logger, OnApplicationBootstrap } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import { SchedulerRegistry } from "@nestjs/schedule";
import { captureException } from "@sentry/node";
import { SpotifyService } from "./spotify/spotify.service";
import {
CrawlerSupervisorJob,
ICrawlerSupervisorJob,
IImportSpotifyJob,
ImportSpotifyJob,
IUpdateSpotifyLibraryJob,
UpdateSpotifyLibraryJob,
} from "./jobs";
import { JobService } from "@apricote/nest-pg-boss";
@Injectable()
export class SchedulerService implements OnApplicationBootstrap {
@ -10,38 +17,42 @@ export class SchedulerService implements OnApplicationBootstrap {
constructor(
private readonly config: ConfigService,
private readonly registry: SchedulerRegistry,
private readonly spotifyService: SpotifyService
private readonly spotifyService: SpotifyService,
@CrawlerSupervisorJob.Inject()
private readonly superviseImportJobsJobService: JobService<ICrawlerSupervisorJob>,
@ImportSpotifyJob.Inject()
private readonly importSpotifyJobService: JobService<IImportSpotifyJob>,
@UpdateSpotifyLibraryJob.Inject()
private readonly updateSpotifyLibraryJobService: JobService<IUpdateSpotifyLibraryJob>
) {}
onApplicationBootstrap() {
this.setupSpotifyCrawler();
this.setupSpotifyMusicLibraryUpdater();
async onApplicationBootstrap() {
await this.setupSpotifyCrawlerSupervisor();
await this.setupSpotifyMusicLibraryUpdater();
}
private setupSpotifyCrawler() {
const callback = () =>
this.spotifyService.runCrawlerForAllUsers().catch((err) => {
captureException(err);
this.logger.error(`Spotify crawler loop crashed! ${err.stack}`);
});
const timeoutMs =
this.config.get<number>("SPOTIFY_FETCH_INTERVAL_SEC") * 1000;
const interval = setInterval(callback, timeoutMs);
this.registry.addInterval("crawler_spotify", interval);
private async setupSpotifyCrawlerSupervisor(): Promise<void> {
await this.superviseImportJobsJobService.schedule("*/1 * * * *", {}, {});
}
private setupSpotifyMusicLibraryUpdater() {
const callback = () => {
this.spotifyService.runUpdaterForAllEntities();
};
const timeoutMs =
this.config.get<number>("SPOTIFY_UPDATE_INTERVAL_SEC") * 1000;
@CrawlerSupervisorJob.Handle()
async superviseImportJobs(): Promise<void> {
this.logger.log("Starting crawler jobs");
const users = await this.spotifyService.getCrawlableUserInfo();
const interval = setInterval(callback, timeoutMs);
await Promise.all(
users.map((user) =>
this.importSpotifyJobService.sendOnce({ userID: user.id }, {}, user.id)
)
);
}
this.registry.addInterval("updater_spotify", interval);
private async setupSpotifyMusicLibraryUpdater() {
await this.updateSpotifyLibraryJobService.schedule("*/1 * * * *", {}, {});
}
@UpdateSpotifyLibraryJob.Handle()
async updateSpotifyLibrary() {
this.spotifyService.runUpdaterForAllEntities();
}
}

View file

@ -1,9 +1,22 @@
import { Module } from "@nestjs/common";
import { PGBossModule } from "@apricote/nest-pg-boss";
import {
CrawlerSupervisorJob,
ImportSpotifyJob,
UpdateSpotifyLibraryJob,
} from "./jobs";
import { SchedulerService } from "./scheduler.service";
import { SpotifyModule } from "./spotify/spotify.module";
@Module({
imports: [SpotifyModule],
imports: [
SpotifyModule,
PGBossModule.forJobs([
CrawlerSupervisorJob,
ImportSpotifyJob,
UpdateSpotifyLibraryJob,
]),
],
providers: [SchedulerService],
})
export class SourcesModule {}

View file

@ -1,13 +1,16 @@
import { PGBossModule } from "@apricote/nest-pg-boss";
import { Module } from "@nestjs/common";
import { ListensModule } from "../../listens/listens.module";
import { MusicLibraryModule } from "../../music-library/music-library.module";
import { UsersModule } from "../../users/users.module";
import { ImportSpotifyJob } from "../jobs";
import { SpotifyApiModule } from "./spotify-api/spotify-api.module";
import { SpotifyAuthModule } from "./spotify-auth/spotify-auth.module";
import { SpotifyService } from "./spotify.service";
@Module({
imports: [
PGBossModule.forJobs([ImportSpotifyJob]),
UsersModule,
ListensModule,
MusicLibraryModule,
@ -17,4 +20,6 @@ import { SpotifyService } from "./spotify.service";
providers: [SpotifyService],
exports: [SpotifyService],
})
export class SpotifyModule {}
export class SpotifyModule {
constructor(private readonly spotifyService: SpotifyService) {}
}

View file

@ -8,6 +8,11 @@ import { MusicLibraryService } from "../../music-library/music-library.service";
import { Track } from "../../music-library/track.entity";
import { User } from "../../users/user.entity";
import { UsersService } from "../../users/users.service";
import {
IImportSpotifyJob,
ImportSpotifyJob,
UpdateSpotifyLibraryJob,
} from "../jobs";
import { AlbumObject } from "./spotify-api/entities/album-object";
import { ArtistObject } from "./spotify-api/entities/artist-object";
import { PlayHistoryObject } from "./spotify-api/entities/play-history-object";
@ -31,15 +36,19 @@ export class SpotifyService {
) {}
@Span()
async runCrawlerForAllUsers(): Promise<void> {
this.logger.debug("Starting Spotify crawler loop");
const users = await this.usersService.findAll();
async getCrawlableUserInfo(): Promise<User[]> {
return this.usersService.findAll();
}
for (const user of users) {
// We want to run this sequentially to avoid rate limits
// eslint-disable-next-line no-await-in-loop
await this.crawlListensForUser(user);
@ImportSpotifyJob.Handle()
async importSpotifyJobHandler({ userID }: IImportSpotifyJob): Promise<void> {
const user = await this.usersService.findById(userID);
if (!user) {
this.logger.warn("User for import job not found", { userID });
return;
}
await this.crawlListensForUser(user);
}
@Span()
@ -130,6 +139,7 @@ export class SpotifyService {
}
@Span()
@UpdateSpotifyLibraryJob.Handle()
async runUpdaterForAllEntities(): Promise<void> {
this.logger.debug("Starting Spotify updater loop");