From ded671ea6d873e48156fc0f1ec6e1f7e6eec8dd3 Mon Sep 17 00:00:00 2001 From: syeopite Date: Sat, 31 May 2025 04:29:15 -0700 Subject: [PATCH] Fix leaked fiber in `notification_stream_handler` Of the technically four fibers spawned by `create_notification_stream` two of them are wrapped around an ensure clause to always unsubscribe itself from the notification job after an error, or when it simply finishes. The first is the heartbeat fiber, which is also actually the main fiber of the route handler. The second is a fiber that awaits for notification pushes from the notification job through the `connection` channel. When an error occurs within the main heartbeat fiber, the ensure clause is executed and the function will unsubscribe itself from receiving any pushes from the notification job. The problem however is that this could (will almost always actually) occur when the notification receiver fiber is awaiting a value from the notification job. Except the job will no longer be able to send anything to the receiver since they were unsubscribed by the heartbeat fiber just a moment ago. The notification receiver fiber will now block indefinitely. And in doing so will pretty much prevent the entire execution stack of the fiber and the `create_notification_stream` function from getting garbage collected. The IO buffers for the contents of the request and response will stay referenced, the underlying TCP/TLS sockets will become inaccessible and leaked, the parsed structures of the YT's massive JSON objects will stay allocated, etc. This PR simply merges the two into a single fiber, via a select statement ensuring that there will be no concurrency problems. --- src/invidious/helpers/helpers.cr | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/src/invidious/helpers/helpers.cr b/src/invidious/helpers/helpers.cr index 6add0237..516b0a6e 100644 --- a/src/invidious/helpers/helpers.cr +++ b/src/invidious/helpers/helpers.cr @@ -119,11 +119,11 @@ def create_notification_stream(env, topics, connection_channel) end end - spawn do - begin - loop do - event = connection.receive - + begin + heartbeat_timer = 0.seconds + loop do + select + when event = connection.receive notification = JSON.parse(event.payload) topic = notification["topic"].as_s video_id = notification["videoId"].as_s @@ -139,24 +139,16 @@ def create_notification_stream(env, topics, connection_channel) env.response.puts "id: #{id}" env.response.puts "data: #{response.to_json}" - env.response.puts - env.response.flush id += 1 + when timeout heartbeat_timer + # Send heartbeat on every timeout + env.response.puts ":keepalive #{Time.utc.to_unix}" end - rescue ex - ensure - connection_channel.send({false, connection}) - end - end - begin - # Send heartbeat - loop do - env.response.puts ":keepalive #{Time.utc.to_unix}" + heartbeat_timer = ((20 + rand(11)).seconds) env.response.puts env.response.flush - sleep (20 + rand(11)).seconds end rescue ex ensure