Fix leaked fiber in notification_stream_handler

Of the technically four fibers spawned by `create_notification_stream`
two of them are wrapped around an ensure clause to always unsubscribe
itself from the notification job after an error, or when it simply
finishes.

The first is the heartbeat fiber, which is also actually the main
fiber of the route handler. The second is a fiber that awaits for
notification pushes from the notification job through the
`connection` channel. When an error occurs within the main heartbeat
fiber, the ensure clause is executed and the function will unsubscribe
itself from receiving any pushes from the notification job.

The problem however is that this could (will almost always actually)
occur when the notification receiver fiber is awaiting a value from
the notification job. Except the job will no longer be able to
send anything to the receiver since they were unsubscribed by the
heartbeat fiber just a moment ago.

The notification receiver fiber will now block indefinitely.

And in doing so will pretty much prevent the entire execution stack of
the fiber and the `create_notification_stream` function from getting
garbage collected.

The IO buffers for the contents of the request and
response will stay referenced, the underlying TCP/TLS sockets will
become inaccessible and leaked, the parsed structures of the YT's
massive JSON objects will stay allocated, etc.

This PR simply merges the two into a single fiber, via a select
statement ensuring that there will be no concurrency problems.
This commit is contained in:
syeopite 2025-05-31 04:29:15 -07:00
parent df8839d1f0
commit ded671ea6d
No known key found for this signature in database
GPG Key ID: A73C186DA3955A1A

View File

@ -119,11 +119,11 @@ def create_notification_stream(env, topics, connection_channel)
end
end
spawn do
begin
heartbeat_timer = 0.seconds
loop do
event = connection.receive
select
when event = connection.receive
notification = JSON.parse(event.payload)
topic = notification["topic"].as_s
video_id = notification["videoId"].as_s
@ -139,24 +139,16 @@ def create_notification_stream(env, topics, connection_channel)
env.response.puts "id: #{id}"
env.response.puts "data: #{response.to_json}"
env.response.puts
env.response.flush
id += 1
end
rescue ex
ensure
connection_channel.send({false, connection})
end
when timeout heartbeat_timer
# Send heartbeat on every timeout
env.response.puts ":keepalive #{Time.utc.to_unix}"
end
begin
# Send heartbeat
loop do
env.response.puts ":keepalive #{Time.utc.to_unix}"
heartbeat_timer = ((20 + rand(11)).seconds)
env.response.puts
env.response.flush
sleep (20 + rand(11)).seconds
end
rescue ex
ensure