diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr index 6add0237..d72468f1 100644 --- a/src/invidious/helpers/helpers.cr +++ b/src/invidious/helpers/helpers.cr @@ -119,11 +119,11 @@ def create_notification_stream(env, topics, connection_channel) end end - spawn do - begin - loop do - event = connection.receive - + begin + heartbeat_timer = 0.seconds + loop do + select + when event = connection.receive notification = JSON.parse(event.payload) topic = notification["topic"].as_s video_id = notification["videoId"].as_s @@ -139,27 +139,20 @@ def create_notification_stream(env, topics, connection_channel) env.response.puts "id: #{id}" env.response.puts "data: #{response.to_json}" - env.response.puts - env.response.flush id += 1 + when timeout heartbeat_timer + # Send heartbeat on every timeout + env.response.puts ":keepalive #{Time.utc.to_unix}" end - rescue ex - ensure - connection_channel.send({false, connection}) - end - end - begin - # Send heartbeat - loop do - env.response.puts ":keepalive #{Time.utc.to_unix}" + heartbeat_timer = ((20 + rand(11)).seconds) env.response.puts env.response.flush - sleep (20 + rand(11)).seconds end rescue ex ensure + connection.close connection_channel.send({false, connection}) end end diff --git a/src/invidious/jobs/notification_job.cr b/src/invidious/jobs/notification_job.cr index 968ee47f..72b526d7 100644 --- a/src/invidious/jobs/notification_job.cr +++ b/src/invidious/jobs/notification_job.cr @@ -32,7 +32,13 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob def begin connections = [] of ::Channel(PQ::Notification) - PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) } + PG.connect_listen(pg_url, "notifications") do |event| + connections.each do |channel| + channel.send(event) + rescue ::Channel::ClosedError + # Notification stream was closed. + end + end # hash of channels to their videos (id+published) that need notifying to_notify = Hash(String, Set(VideoNotification)).new(