From 0853be35c6e913dda5a86413dd3c6776b292387d Mon Sep 17 00:00:00 2001 From: Matthew McGarvey Date: Fri, 16 Oct 2020 16:58:45 -0500 Subject: [PATCH] Migrate refresh and subscribe to feeds jobs --- src/invidious.cr | 4 +- src/invidious/helpers/jobs.cr | 74 ------------------- src/invidious/jobs/refresh_feeds_job.cr | 77 ++++++++++++++++++++ src/invidious/jobs/subscribe_to_feeds_job.cr | 52 +++++++++++++ 4 files changed, 131 insertions(+), 76 deletions(-) create mode 100644 src/invidious/jobs/refresh_feeds_job.cr create mode 100644 src/invidious/jobs/subscribe_to_feeds_job.cr diff --git a/src/invidious.cr b/src/invidious.cr index 29a412a1..6a9d4970 100644 --- a/src/invidious.cr +++ b/src/invidious.cr @@ -160,8 +160,8 @@ end # Start jobs Invidious::Jobs.register Invidious::Jobs::RefreshChannelsJob.new(PG_DB, logger, config) -refresh_feeds(PG_DB, logger, config) -subscribe_to_feeds(PG_DB, logger, HMAC_KEY, config) +Invidious::Jobs.register Invidious::Jobs::RefreshFeedsJob.new(PG_DB, logger, config) +Invidious::Jobs.register Invidious::Jobs::SubscribeToFeedsJob.new(PG_DB, logger, config, HMAC_KEY) statistics = { "error" => "Statistics are not availabile.", diff --git a/src/invidious/helpers/jobs.cr b/src/invidious/helpers/jobs.cr index 11eb7def..8ce6690e 100644 --- a/src/invidious/helpers/jobs.cr +++ b/src/invidious/helpers/jobs.cr @@ -1,77 +1,3 @@ -def refresh_feeds(db, logger, config) - max_channel = Channel(Int32).new - spawn do - max_threads = max_channel.receive - active_threads = 0 - active_channel = Channel(Bool).new - - loop do - db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs| - rs.each do - email = rs.read(String) - view_name = "subscriptions_#{sha256(email)}" - - if active_threads >= max_threads - if active_channel.receive - active_threads -= 1 - end - end - - active_threads += 1 - spawn do - begin - # Drop outdated views - column_array = get_column_array(db, view_name) - ChannelVideo.type_array.each_with_index do |name, i| - if name != column_array[i]? - logger.puts("DROP MATERIALIZED VIEW #{view_name}") - db.exec("DROP MATERIALIZED VIEW #{view_name}") - raise "view does not exist" - end - end - - if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))" - logger.puts("Materialized view #{view_name} is out-of-date, recreating...") - db.exec("DROP MATERIALIZED VIEW #{view_name}") - end - - db.exec("REFRESH MATERIALIZED VIEW #{view_name}") - db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) - rescue ex - # Rename old views - begin - legacy_view_name = "subscriptions_#{sha256(email)[0..7]}" - - db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0") - logger.puts("RENAME MATERIALIZED VIEW #{legacy_view_name}") - db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}") - rescue ex - begin - # While iterating through, we may have an email stored from a deleted account - if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool) - logger.puts("CREATE #{view_name}") - db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}") - db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) - end - rescue ex - logger.puts("REFRESH #{email} : #{ex.message}") - end - end - end - - active_channel.send(true) - end - end - end - - sleep 5.seconds - Fiber.yield - end - end - - max_channel.send(config.feed_threads) -end - def subscribe_to_feeds(db, logger, key, config) if config.use_pubsub_feeds case config.use_pubsub_feeds diff --git a/src/invidious/jobs/refresh_feeds_job.cr b/src/invidious/jobs/refresh_feeds_job.cr new file mode 100644 index 00000000..eebdf0f3 --- /dev/null +++ b/src/invidious/jobs/refresh_feeds_job.cr @@ -0,0 +1,77 @@ +class Invidious::Jobs::RefreshFeedsJob < Invidious::Jobs::BaseJob + private getter db : DB::Database + private getter logger : Invidious::LogHandler + private getter config : Config + + def initialize(@db, @logger, @config) + end + + def begin + max_threads = config.feed_threads + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + db.query("SELECT email FROM users WHERE feed_needs_update = true OR feed_needs_update IS NULL") do |rs| + rs.each do + email = rs.read(String) + view_name = "subscriptions_#{sha256(email)}" + + if active_threads >= max_threads + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + spawn do + begin + # Drop outdated views + column_array = get_column_array(db, view_name) + ChannelVideo.type_array.each_with_index do |name, i| + if name != column_array[i]? + logger.puts("DROP MATERIALIZED VIEW #{view_name}") + db.exec("DROP MATERIALIZED VIEW #{view_name}") + raise "view does not exist" + end + end + + if !db.query_one("SELECT pg_get_viewdef('#{view_name}')", as: String).includes? "WHERE ((cv.ucid = ANY (u.subscriptions))" + logger.puts("Materialized view #{view_name} is out-of-date, recreating...") + db.exec("DROP MATERIALIZED VIEW #{view_name}") + end + + db.exec("REFRESH MATERIALIZED VIEW #{view_name}") + db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) + rescue ex + # Rename old views + begin + legacy_view_name = "subscriptions_#{sha256(email)[0..7]}" + + db.exec("SELECT * FROM #{legacy_view_name} LIMIT 0") + logger.puts("RENAME MATERIALIZED VIEW #{legacy_view_name}") + db.exec("ALTER MATERIALIZED VIEW #{legacy_view_name} RENAME TO #{view_name}") + rescue ex + begin + # While iterating through, we may have an email stored from a deleted account + if db.query_one?("SELECT true FROM users WHERE email = $1", email, as: Bool) + logger.puts("CREATE #{view_name}") + db.exec("CREATE MATERIALIZED VIEW #{view_name} AS #{MATERIALIZED_VIEW_SQL.call(email)}") + db.exec("UPDATE users SET feed_needs_update = false WHERE email = $1", email) + end + rescue ex + logger.puts("REFRESH #{email} : #{ex.message}") + end + end + end + + active_channel.send(true) + end + end + end + + sleep 5.seconds + Fiber.yield + end + end +end diff --git a/src/invidious/jobs/subscribe_to_feeds_job.cr b/src/invidious/jobs/subscribe_to_feeds_job.cr new file mode 100644 index 00000000..3d3b2218 --- /dev/null +++ b/src/invidious/jobs/subscribe_to_feeds_job.cr @@ -0,0 +1,52 @@ +class Invidious::Jobs::SubscribeToFeedsJob < Invidious::Jobs::BaseJob + private getter db : DB::Database + private getter logger : Invidious::LogHandler + private getter hmac_key : String + private getter config : Config + + def initialize(@db, @logger, @config, @hmac_key) + end + + def begin + max_threads = 1 + if config.use_pubsub_feeds.is_a?(Int32) + max_threads = config.use_pubsub_feeds.as(Int32) + end + + active_threads = 0 + active_channel = Channel(Bool).new + + loop do + db.query_all("SELECT id FROM channels WHERE CURRENT_TIMESTAMP - subscribed > interval '4 days' OR subscribed IS NULL") do |rs| + rs.each do + ucid = rs.read(String) + + if active_threads >= max_threads.as(Int32) + if active_channel.receive + active_threads -= 1 + end + end + + active_threads += 1 + + spawn do + begin + response = subscribe_pubsub(ucid, hmac_key, config) + + if response.status_code >= 400 + logger.puts("#{ucid} : #{response.body}") + end + rescue ex + logger.puts("#{ucid} : #{ex.message}") + end + + active_channel.send(true) + end + end + end + + sleep 1.minute + Fiber.yield + end + end +end