mirror of
https://github.com/livebook-dev/livebook.git
synced 2024-12-26 09:22:00 +08:00
Use a node pool for node names (#256)
This commit is contained in:
parent
a32b415707
commit
ec07e4749a
4 changed files with 185 additions and 2 deletions
|
@ -18,6 +18,8 @@ defmodule Livebook.Application do
|
|||
Livebook.SessionSupervisor,
|
||||
# Start the server responsible for associating files with sessions
|
||||
Livebook.Session.FileGuard,
|
||||
# Start the Node Pool for managing node names
|
||||
Livebook.Runtime.NodePool,
|
||||
# Start the Endpoint (http/https)
|
||||
LivebookWeb.Endpoint
|
||||
]
|
||||
|
|
105
lib/livebook/runtime/node_pool.ex
Normal file
105
lib/livebook/runtime/node_pool.ex
Normal file
|
@ -0,0 +1,105 @@
|
|||
defmodule Livebook.Runtime.NodePool do
|
||||
use GenServer
|
||||
|
||||
@moduledoc false
|
||||
|
||||
# A pool for reusing child node names.
|
||||
#
|
||||
# `free_name` refers to the list of unused names.
|
||||
# `generated_names` refers to the list of names ever generated.
|
||||
#
|
||||
# `buffer_time` refers to time the pool waits before
|
||||
# adding a name to `pool`, which by default is 1 minute.
|
||||
|
||||
@default_time 60_000
|
||||
|
||||
# Client interface
|
||||
|
||||
@doc """
|
||||
Starts the GenServer from a Supervision tree
|
||||
|
||||
## Options
|
||||
|
||||
- `:name` - The name the NodePool is locally registered as. By default, it is `Livebook.Runtime.NodePool`
|
||||
- `:buffer_time` - The time that is spent before a disconnected node's name is added to pool. The default is 1 minute.
|
||||
"""
|
||||
def start_link(opts) do
|
||||
name = opts[:name] || __MODULE__
|
||||
buffer_time = opts[:buffer_time] || @default_time
|
||||
|
||||
GenServer.start_link(
|
||||
__MODULE__,
|
||||
%{buffer_time: buffer_time},
|
||||
name: name
|
||||
)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Retuns a node name.
|
||||
|
||||
Generates a new name if pool is empty, or takes one from pool.
|
||||
"""
|
||||
def get_name(server \\ __MODULE__, basename) do
|
||||
GenServer.call(server, {:get_name, basename})
|
||||
end
|
||||
|
||||
# Server side code
|
||||
|
||||
@impl GenServer
|
||||
def init(opts) do
|
||||
:net_kernel.monitor_nodes(true, node_type: :all)
|
||||
{:ok, %{buffer_time: opts.buffer_time, generated_names: MapSet.new(), free_names: []}}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_call({:get_name, basename}, _, state) do
|
||||
{name, new_state} = name(state, basename)
|
||||
{:reply, name, new_state}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_info({:nodedown, node, _info}, state) do
|
||||
_ = Process.send_after(self(), {:add_node, node}, state.buffer_time)
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_info({:nodeup, _node, _info}, state) do
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@impl GenServer
|
||||
def handle_info({:add_node, node}, state) do
|
||||
{:noreply, add_node(state, node)}
|
||||
end
|
||||
|
||||
# Helper functions
|
||||
|
||||
defp name(state, basename) do
|
||||
if Enum.empty?(state.free_names) do
|
||||
generate_name(state, basename)
|
||||
else
|
||||
get_existing_name(state)
|
||||
end
|
||||
end
|
||||
|
||||
defp generate_name(state, basename) do
|
||||
new_name = :"#{Livebook.Utils.random_short_id()}-#{basename}"
|
||||
generated_names = MapSet.put(state.generated_names, new_name)
|
||||
{new_name, %{state | generated_names: generated_names}}
|
||||
end
|
||||
|
||||
defp get_existing_name(state) do
|
||||
[name | free_names] = state.free_names
|
||||
{name, %{state | free_names: free_names}}
|
||||
end
|
||||
|
||||
defp add_node(state, node) do
|
||||
if MapSet.member?(state.generated_names, node) do
|
||||
free_names = [node | state.free_names]
|
||||
%{state | free_names: free_names}
|
||||
else
|
||||
state
|
||||
end
|
||||
end
|
||||
end
|
|
@ -5,15 +5,15 @@ defmodule Livebook.Runtime.StandaloneInit do
|
|||
# a new Elixir system process. It's used by both ElixirStandalone
|
||||
# and MixStandalone runtimes.
|
||||
|
||||
alias Livebook.Utils
|
||||
alias Livebook.Utils.Emitter
|
||||
alias Livebook.Runtime.NodePool
|
||||
|
||||
@doc """
|
||||
Returns a random name for a dynamically spawned node.
|
||||
"""
|
||||
@spec child_node_name(atom()) :: atom()
|
||||
def child_node_name(parent) do
|
||||
:"#{Utils.random_short_id()}-#{parent}"
|
||||
NodePool.get_name(parent)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
|
76
test/livebook/runtime/node_pool_test.exs
Normal file
76
test/livebook/runtime/node_pool_test.exs
Normal file
|
@ -0,0 +1,76 @@
|
|||
defmodule Livebook.Runtime.NodePoolTest do
|
||||
use ExUnit.Case, async: true
|
||||
|
||||
alias Livebook.Runtime.NodePool
|
||||
|
||||
# Tests for Livebook.Runtime.NodePool
|
||||
#
|
||||
# Note:
|
||||
#
|
||||
# We do not spawn actual nodes as it can be time
|
||||
# intensive (on low spec machines) and is generally
|
||||
# complicated.
|
||||
|
||||
describe "start_link" do
|
||||
test "correctly starts a registered GenServer", config do
|
||||
start_supervised!({NodePool, name: config.test})
|
||||
|
||||
# Verify Process is running
|
||||
assert Process.whereis(config.test)
|
||||
end
|
||||
end
|
||||
|
||||
describe "get_name/2" do
|
||||
test "creates a new node name if pool is empty", config do
|
||||
start_supervised!({NodePool, name: config.test})
|
||||
|
||||
# Assert that we get a result and that it is an atom
|
||||
result = NodePool.get_name(config.test, node())
|
||||
assert result
|
||||
assert is_atom(result)
|
||||
end
|
||||
|
||||
test "returns an existing name if pool is not empty", config do
|
||||
start_supervised!({NodePool, name: config.test, buffer_time: 0})
|
||||
|
||||
name = NodePool.get_name(config.test, node())
|
||||
send(config.test, {:nodedown, name, {}})
|
||||
|
||||
# Since we want the `:add_node` message processed first
|
||||
# before we call `get_name`, we wait
|
||||
Process.sleep(1)
|
||||
|
||||
assert NodePool.get_name(config.test, node()) == name
|
||||
end
|
||||
|
||||
test "removes an existing name when used", config do
|
||||
start_supervised!({NodePool, name: config.test, buffer_time: 0})
|
||||
|
||||
name = NodePool.get_name(config.test, node())
|
||||
send(config.test, {:nodedown, name, {}})
|
||||
|
||||
# Since we want the `:add_node` message processed first
|
||||
# before we call `get_name`, we wait
|
||||
Process.sleep(1)
|
||||
|
||||
name = NodePool.get_name(config.test, node())
|
||||
assert NodePool.get_name(config.test, node()) != name
|
||||
end
|
||||
end
|
||||
|
||||
describe "on nodedown" do
|
||||
test "does not add node name to pool if not in generated_names", config do
|
||||
start_supervised!({NodePool, name: config.test, buffer_time: 0})
|
||||
|
||||
# Mock a nodedown
|
||||
send(config.test, {:nodedown, :some_foo, {}})
|
||||
|
||||
# Since we want the `:add_node` message processed first
|
||||
# before we call `get_name`, we wait
|
||||
Process.sleep(1)
|
||||
|
||||
# Verify that name is not in pool, by calling get_name/2
|
||||
assert NodePool.get_name(config.test, node()) != :some_foo
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue