mirror of
https://git.nadeko.net/Fijxu/invidious.git
synced 2026-01-30 02:32:45 +00:00
Rework companion switcher
This commit is contained in:
@@ -1,110 +0,0 @@
|
||||
module BackendInfo
|
||||
extend self
|
||||
|
||||
enum Status
|
||||
Dead = 0
|
||||
Blocked = 1
|
||||
Working = 2
|
||||
end
|
||||
|
||||
struct CompanionData
|
||||
include JSON::Serializable
|
||||
property blocked : Bool = false
|
||||
@[JSON::Field(key: "blockedCount")]
|
||||
property blocked_count : Int64 = 0
|
||||
end
|
||||
|
||||
@@status : Array(Int32) = Array.new(CONFIG.invidious_companion.size, Status::Dead.to_i)
|
||||
@@csp : Array(String) = Array.new(CONFIG.invidious_companion.size, "")
|
||||
@@working_ends : Array(Int32) = Array(Int32).new(0)
|
||||
@@csp_mutex : Mutex = Mutex.new
|
||||
@@check_mutex : Mutex = Mutex.new
|
||||
|
||||
def check_backends
|
||||
check_companion()
|
||||
LOGGER.debug("Invidious companion: New working_ends \"#{@@working_ends}\"")
|
||||
LOGGER.debug("Invidious companion: New status \"#{@@status}\"")
|
||||
end
|
||||
|
||||
private def check_companion
|
||||
# Create Channels the size of CONFIG.invidious_companion
|
||||
comp_size = CONFIG.invidious_companion.size
|
||||
channels = Channel(Nil).new(comp_size)
|
||||
updated_ends = Array(Int32).new(0)
|
||||
updated_status = Array(Int32).new(CONFIG.invidious_companion.size, 0)
|
||||
|
||||
LOGGER.debug("Invidious companion: comp_size \"#{comp_size}\"")
|
||||
CONFIG.invidious_companion.each_with_index do |companion, index|
|
||||
spawn do
|
||||
begin
|
||||
client = HTTP::Client.new(companion.private_url)
|
||||
client.connect_timeout = 10.seconds
|
||||
response = client.get(CONFIG.check_backends_path)
|
||||
if response.status_code == 200
|
||||
if response.content_type == "application/json"
|
||||
body = response.body
|
||||
status_json = CompanionData.from_json(body)
|
||||
if status_json.blocked
|
||||
updated_ends, updated_status = self.set_status(index, updated_ends, updated_status, Status::Blocked, true)
|
||||
else
|
||||
updated_ends, updated_status = self.set_status(index, updated_ends, updated_status, Status::Working, true)
|
||||
end
|
||||
else
|
||||
updated_ends, updated_status = self.set_status(index, updated_ends, updated_status, Status::Working, true)
|
||||
end
|
||||
self.generate_csp([companion.public_url, companion.i2p_public_url], index)
|
||||
else
|
||||
_, updated_status = self.set_status(index, updated_ends, updated_status, Status::Dead, false)
|
||||
end
|
||||
rescue
|
||||
_, updated_status = self.set_status(index, updated_ends, updated_status, Status::Dead, false)
|
||||
ensure
|
||||
LOGGER.trace("Invidious companion: Done Index: \"#{index}\"")
|
||||
channels.send(nil)
|
||||
end
|
||||
end
|
||||
end
|
||||
# Wait until we receive a signal from them all
|
||||
LOGGER.debug("Invidious companion: Updating working_ends")
|
||||
comp_size.times { channels.receive }
|
||||
@@working_ends = updated_ends.sort!
|
||||
@@status = updated_status
|
||||
end
|
||||
|
||||
private def generate_csp(companion_url : Array(URI), index : Int32? = nil)
|
||||
@@csp_mutex.synchronize do
|
||||
@@csp[index] = ""
|
||||
companion_url.each do |url|
|
||||
fixed_url = "#{url.scheme}://#{url.host}#{url.port ? ":#{url.port}" : ""}"
|
||||
@@csp[index] += " #{fixed_url}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private def set_status(index, updated_ends, updated_status, status, push = false)
|
||||
@@check_mutex.synchronize do
|
||||
updated_status[index] = status.to_i
|
||||
updated_ends.push(index) if push
|
||||
end
|
||||
return {updated_ends, updated_status}
|
||||
end
|
||||
|
||||
def get_status
|
||||
# Shouldn't need to lock since we never edit this array, only change the pointer.
|
||||
return @@status
|
||||
end
|
||||
|
||||
def get_working_ends
|
||||
# Shouldn't need to lock since we never edit this array, only change the pointer.
|
||||
return @@working_ends
|
||||
end
|
||||
|
||||
def get_csp(index : Int32)
|
||||
# A little mutex to prevent sending a partial CSP header
|
||||
# Not sure if this is necessary. But if the @@csp[index] is being assigned
|
||||
# at the same time when it's being accessed, a data race will appear
|
||||
@@csp_mutex.synchronize do
|
||||
return @@csp[index], @@csp[index]
|
||||
end
|
||||
end
|
||||
end
|
||||
138
src/invidious/helpers/companion_status.cr
Normal file
138
src/invidious/helpers/companion_status.cr
Normal file
@@ -0,0 +1,138 @@
|
||||
require "wait_group"
|
||||
|
||||
class CompanionStatus
|
||||
enum Status
|
||||
# Color in the backend switcher: Red
|
||||
Down = 0
|
||||
# Color in the backend switcher: Yellow
|
||||
Blocked = 1
|
||||
# Color in the backend switcher: Green
|
||||
Working = 2
|
||||
end
|
||||
|
||||
struct CompanionHealthData
|
||||
include JSON::Serializable
|
||||
|
||||
property blocked : Bool = false
|
||||
@[JSON::Field(key: "blockedCount")]
|
||||
property blocked_count : Int64 = 0
|
||||
end
|
||||
|
||||
class CompanionInfo
|
||||
property companion : Config::CompanionConfig
|
||||
property status : Status
|
||||
property csp : String
|
||||
|
||||
def initialize(companion)
|
||||
@companion = companion
|
||||
@status = Status::Down
|
||||
@csp = ""
|
||||
end
|
||||
end
|
||||
|
||||
class WorkingCompanions
|
||||
property all : Array(Int32)
|
||||
property community : Array(Int32)
|
||||
|
||||
def initialize
|
||||
@all = Array(Int32).new
|
||||
@community = Array(Int32).new
|
||||
end
|
||||
end
|
||||
|
||||
getter companions : Array(CompanionInfo)
|
||||
getter working_companions : WorkingCompanions
|
||||
# Reusable TLS Context for HTTP Client
|
||||
# https://github.com/crystal-lang/crystal/issues/15419
|
||||
@tlscontext : OpenSSL::SSL::Context::Client
|
||||
|
||||
def initialize
|
||||
@companions = Array(CompanionInfo).new(CONFIG.invidious_companion.size) do |index|
|
||||
CompanionInfo.new(CONFIG.invidious_companion[index])
|
||||
end
|
||||
@working_companions = WorkingCompanions.new
|
||||
@tlscontext = OpenSSL::SSL::Context::Client.new
|
||||
end
|
||||
|
||||
def check_companions
|
||||
wg = WaitGroup.new(@companions.size)
|
||||
|
||||
@companions.each_with_index do |companion, index|
|
||||
c = companion.companion
|
||||
spawn do
|
||||
begin
|
||||
self.healthcheck(c, index)
|
||||
if @companions[index].status == Status::Working
|
||||
LOGGER.trace("Companion checker: generating CSP for #{c.private_url}")
|
||||
self.generate_csp(
|
||||
[c.public_url,
|
||||
c.i2p_public_url], index)
|
||||
end
|
||||
rescue
|
||||
@companions[index].status == Status::Down
|
||||
ensure
|
||||
wg.done
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
wg.wait
|
||||
self.generate_working_companions
|
||||
end
|
||||
|
||||
private def generate_csp(companion_urls : Array(URI), index : Int32)
|
||||
local_csp = ""
|
||||
|
||||
companion_urls.each do |url|
|
||||
host = url.host
|
||||
next if !host.presence
|
||||
scheme = url.scheme
|
||||
port = url.port ? ":#{url.port}" : ""
|
||||
|
||||
local_csp += "#{scheme}://#{host}#{port} "
|
||||
end
|
||||
|
||||
@companions[index].csp = local_csp
|
||||
end
|
||||
|
||||
private def generate_working_companions
|
||||
# Aux variable to temporarily store the alive companions
|
||||
# If we were to empty the `@working_companions`, some requests in the
|
||||
# timespan of the `@info` iteration to find the working companions could be
|
||||
# displayed as there was not working companions
|
||||
local_working_companions = WorkingCompanions.new
|
||||
|
||||
@companions.each_with_index do |companion, index|
|
||||
if companion.status == Status::Working
|
||||
local_working_companions.community << index
|
||||
if !companion.companion.community
|
||||
local_working_companions.all << index
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@working_companions = local_working_companions
|
||||
end
|
||||
|
||||
private def healthcheck(companion : Config::CompanionConfig, index : Int32)
|
||||
client = HTTP::Client.new(companion.private_url, tls: @tlscontext)
|
||||
client.connect_timeout = 10.seconds
|
||||
|
||||
response = client.get(CONFIG.check_backends_path)
|
||||
if response.status_code == 200
|
||||
if response.content_type == "application/json"
|
||||
body = response.body
|
||||
status_json = CompanionHealthData.from_json(body)
|
||||
if status_json.blocked
|
||||
@companions[index].status = Status::Blocked
|
||||
else
|
||||
@companions[index].status = Status::Working
|
||||
end
|
||||
else
|
||||
@companions[index].status = Status::Working
|
||||
end
|
||||
else
|
||||
@companions[index].status = Status::Down
|
||||
end
|
||||
end
|
||||
end
|
||||
Reference in New Issue
Block a user