This commit is contained in:
syeopite 2025-06-01 20:09:01 +00:00 committed by GitHub
commit 38aaade5b2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 17 additions and 18 deletions

View File

@ -119,11 +119,11 @@ def create_notification_stream(env, topics, connection_channel)
end end
end end
spawn do begin
begin heartbeat_timer = 0.seconds
loop do loop do
event = connection.receive select
when event = connection.receive
notification = JSON.parse(event.payload) notification = JSON.parse(event.payload)
topic = notification["topic"].as_s topic = notification["topic"].as_s
video_id = notification["videoId"].as_s video_id = notification["videoId"].as_s
@ -139,27 +139,20 @@ def create_notification_stream(env, topics, connection_channel)
env.response.puts "id: #{id}" env.response.puts "id: #{id}"
env.response.puts "data: #{response.to_json}" env.response.puts "data: #{response.to_json}"
env.response.puts
env.response.flush
id += 1 id += 1
when timeout heartbeat_timer
# Send heartbeat on every timeout
env.response.puts ":keepalive #{Time.utc.to_unix}"
end end
rescue ex
ensure
connection_channel.send({false, connection})
end
end
begin heartbeat_timer = ((20 + rand(11)).seconds)
# Send heartbeat
loop do
env.response.puts ":keepalive #{Time.utc.to_unix}"
env.response.puts env.response.puts
env.response.flush env.response.flush
sleep (20 + rand(11)).seconds
end end
rescue ex rescue ex
ensure ensure
connection.close
connection_channel.send({false, connection}) connection_channel.send({false, connection})
end end
end end

View File

@ -32,7 +32,13 @@ class Invidious::Jobs::NotificationJob < Invidious::Jobs::BaseJob
def begin def begin
connections = [] of ::Channel(PQ::Notification) connections = [] of ::Channel(PQ::Notification)
PG.connect_listen(pg_url, "notifications") { |event| connections.each(&.send(event)) } PG.connect_listen(pg_url, "notifications") do |event|
connections.each do |channel|
channel.send(event)
rescue ::Channel::ClosedError
# Notification stream was closed.
end
end
# hash of channels to their videos (id+published) that need notifying # hash of channels to their videos (id+published) that need notifying
to_notify = Hash(String, Set(VideoNotification)).new( to_notify = Hash(String, Set(VideoNotification)).new(