Restructure remote node processes and allow for multiple connections (#434)

* Restructure remote node processes and allow for multiple connections

* Return proper error from Attached duplicate
This commit is contained in:
Jonatan Kłosko 2021-07-05 20:01:27 +02:00 committed by GitHub
parent 26954ce47c
commit 9b2f039e29
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 419 additions and 366 deletions

View file

@ -15,10 +15,6 @@ defmodule Livebook.Application do
LivebookWeb.Telemetry, LivebookWeb.Telemetry,
# Start the PubSub system # Start the PubSub system
{Phoenix.PubSub, name: Livebook.PubSub}, {Phoenix.PubSub, name: Livebook.PubSub},
# Start the our own :standard_error handler (standard error -> group leader)
# This way we can run multiple embedded runtimes without worrying
# about restoring :standard_error to a valid process when terminating
{Livebook.Runtime.ErlDist.IOForwardGL, name: :standard_error},
# Start the supervisor dynamically managing sessions # Start the supervisor dynamically managing sessions
Livebook.SessionSupervisor, Livebook.SessionSupervisor,
# Start the server responsible for associating files with sessions # Start the server responsible for associating files with sessions
@ -29,10 +25,6 @@ defmodule Livebook.Application do
LivebookWeb.Endpoint LivebookWeb.Endpoint
] ]
# Similarly as with :standard_error, we register our backend
# within the Livebook node, specifically for the embedded runtime
Logger.add_backend(Livebook.Runtime.ErlDist.LoggerGLBackend)
opts = [strategy: :one_for_one, name: Livebook.Supervisor] opts = [strategy: :one_for_one, name: Livebook.Supervisor]
with {:ok, _} = result <- Supervisor.start_link(children, opts) do with {:ok, _} = result <- Supervisor.start_link(children, opts) do

View file

@ -9,31 +9,27 @@ defmodule Livebook.Runtime.Attached do
# The node can be an ordinary Elixir runtime, # The node can be an ordinary Elixir runtime,
# a Mix project shell, a running release or anything else. # a Mix project shell, a running release or anything else.
defstruct [:node, :cookie] defstruct [:node, :cookie, :server_pid]
@type t :: %__MODULE__{ @type t :: %__MODULE__{
node: node(), node: node(),
cookie: atom() cookie: atom(),
server_pid: pid()
} }
@doc """ @doc """
Checks if the given node is available for use and initializes Checks if the given node is available for use and initializes
it with Livebook-specific modules and processes. it with Livebook-specific modules and processes.
""" """
@spec init(node(), atom()) :: {:ok, t()} | {:error, :unreachable | :already_in_use} @spec init(node(), atom()) :: {:ok, t()} | {:error, :unreachable}
def init(node, cookie \\ Node.get_cookie()) do def init(node, cookie \\ Node.get_cookie()) do
# Set cookie for connecting to this specific node # Set cookie for connecting to this specific node
Node.set_cookie(node, cookie) Node.set_cookie(node, cookie)
case Node.ping(node) do case Node.ping(node) do
:pong -> :pong ->
case Livebook.Runtime.ErlDist.initialize(node) do server_pid = Livebook.Runtime.ErlDist.initialize(node)
:ok -> {:ok, %__MODULE__{node: node, cookie: cookie, server_pid: server_pid}}
{:ok, %__MODULE__{node: node, cookie: cookie}}
{:error, :already_in_use} ->
{:error, :already_in_use}
end
:pang -> :pang ->
{:error, :unreachable} {:error, :unreachable}
@ -45,12 +41,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
def connect(runtime) do def connect(runtime) do
ErlDist.Manager.set_owner(runtime.node, self()) ErlDist.RuntimeServer.set_owner(runtime.server_pid, self())
Process.monitor({ErlDist.Manager, runtime.node}) Process.monitor(runtime.server_pid)
end end
def disconnect(runtime) do def disconnect(runtime) do
ErlDist.Manager.stop(runtime.node) ErlDist.RuntimeServer.stop(runtime.server_pid)
end end
def evaluate_code( def evaluate_code(
@ -61,8 +57,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
prev_evaluation_ref, prev_evaluation_ref,
opts \\ [] opts \\ []
) do ) do
ErlDist.Manager.evaluate_code( ErlDist.RuntimeServer.evaluate_code(
runtime.node, runtime.server_pid,
code, code,
container_ref, container_ref,
evaluation_ref, evaluation_ref,
@ -72,16 +68,16 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
end end
def forget_evaluation(runtime, container_ref, evaluation_ref) do def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.Manager.forget_evaluation(runtime.node, container_ref, evaluation_ref) ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
end end
def drop_container(runtime, container_ref) do def drop_container(runtime, container_ref) do
ErlDist.Manager.drop_container(runtime.node, container_ref) ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end end
def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
ErlDist.Manager.request_completion_items( ErlDist.RuntimeServer.request_completion_items(
runtime.node, runtime.server_pid,
send_to, send_to,
ref, ref,
hint, hint,
@ -90,7 +86,10 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
) )
end end
def duplicate(_runtime) do def duplicate(runtime) do
{:error, "attached runtime is connected to a specific VM and cannot be duplicated"} case Livebook.Runtime.Attached.init(runtime.node, runtime.cookie) do
{:ok, runtime} -> {:ok, runtime}
{:error, :unreachable} -> {:error, "node #{inspect(runtime.node)} is unreachable"}
end
end end
end end

View file

@ -1,5 +1,5 @@
defmodule Livebook.Runtime.ElixirStandalone do defmodule Livebook.Runtime.ElixirStandalone do
defstruct [:node, :primary_pid] defstruct [:node, :server_pid]
# A runtime backed by a standalone Elixir node managed by Livebook. # A runtime backed by a standalone Elixir node managed by Livebook.
# #
@ -13,7 +13,7 @@ defmodule Livebook.Runtime.ElixirStandalone do
@type t :: %__MODULE__{ @type t :: %__MODULE__{
node: node(), node: node(),
primary_pid: pid() server_pid: pid()
} }
@doc """ @doc """
@ -38,10 +38,10 @@ defmodule Livebook.Runtime.ElixirStandalone do
with {:ok, elixir_path} <- find_elixir_executable(), with {:ok, elixir_path} <- find_elixir_executable(),
port = start_elixir_node(elixir_path, child_node, child_node_eval_string(), argv), port = start_elixir_node(elixir_path, child_node, child_node_eval_string(), argv),
{:ok, primary_pid} <- parent_init_sequence(child_node, port) do {:ok, server_pid} <- parent_init_sequence(child_node, port) do
runtime = %__MODULE__{ runtime = %__MODULE__{
node: child_node, node: child_node,
primary_pid: primary_pid server_pid: server_pid
} }
{:ok, runtime} {:ok, runtime}
@ -69,12 +69,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
def connect(runtime) do def connect(runtime) do
ErlDist.Manager.set_owner(runtime.node, self()) ErlDist.RuntimeServer.set_owner(runtime.server_pid, self())
Process.monitor({ErlDist.Manager, runtime.node}) Process.monitor(runtime.server_pid)
end end
def disconnect(runtime) do def disconnect(runtime) do
ErlDist.Manager.stop(runtime.node) ErlDist.RuntimeServer.stop(runtime.server_pid)
end end
def evaluate_code( def evaluate_code(
@ -85,8 +85,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
prev_evaluation_ref, prev_evaluation_ref,
opts \\ [] opts \\ []
) do ) do
ErlDist.Manager.evaluate_code( ErlDist.RuntimeServer.evaluate_code(
runtime.node, runtime.server_pid,
code, code,
container_ref, container_ref,
evaluation_ref, evaluation_ref,
@ -96,16 +96,16 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
end end
def forget_evaluation(runtime, container_ref, evaluation_ref) do def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.Manager.forget_evaluation(runtime.node, container_ref, evaluation_ref) ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
end end
def drop_container(runtime, container_ref) do def drop_container(runtime, container_ref) do
ErlDist.Manager.drop_container(runtime.node, container_ref) ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end end
def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
ErlDist.Manager.request_completion_items( ErlDist.RuntimeServer.request_completion_items(
runtime.node, runtime.server_pid,
send_to, send_to,
ref, ref,
hint, hint,

View file

@ -7,11 +7,11 @@ defmodule Livebook.Runtime.Embedded do
# where there is no option of starting a separate # where there is no option of starting a separate
# Elixir runtime. # Elixir runtime.
defstruct [:node, :manager_pid] defstruct [:node, :server_pid]
@type t :: %__MODULE__{ @type t :: %__MODULE__{
node: node(), node: node(),
manager_pid: pid() server_pid: pid()
} }
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
@ -20,7 +20,7 @@ defmodule Livebook.Runtime.Embedded do
Initializes new runtime by starting the necessary Initializes new runtime by starting the necessary
processes within the current node. processes within the current node.
""" """
@spec init() :: {:ok, t()} | {:error, :failure} @spec init() :: {:ok, t()}
def init() do def init() do
# As we run in the Livebook node, all the necessary modules # As we run in the Livebook node, all the necessary modules
# are in place, so we just start the manager process. # are in place, so we just start the manager process.
@ -33,17 +33,9 @@ defmodule Livebook.Runtime.Embedded do
# We tell manager to not override :standard_error, # We tell manager to not override :standard_error,
# as we already do it for the Livebook application globally # as we already do it for the Livebook application globally
# (see Livebook.Application.start/2). # (see Livebook.Application.start/2).
case ErlDist.Manager.start(
anonymous: true,
cleanup_on_termination: false,
register_standard_error_proxy: false
) do
{:ok, pid} ->
{:ok, %__MODULE__{node: node(), manager_pid: pid}}
_ -> server_pid = ErlDist.initialize(node(), unload_modules_on_termination: false)
{:error, :failure} {:ok, %__MODULE__{node: node(), server_pid: server_pid}}
end
end end
end end
@ -51,12 +43,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
def connect(runtime) do def connect(runtime) do
ErlDist.Manager.set_owner(runtime.manager_pid, self()) ErlDist.RuntimeServer.set_owner(runtime.server_pid, self())
Process.monitor(runtime.manager_pid) Process.monitor(runtime.server_pid)
end end
def disconnect(runtime) do def disconnect(runtime) do
ErlDist.Manager.stop(runtime.manager_pid) ErlDist.RuntimeServer.stop(runtime.server_pid)
end end
def evaluate_code( def evaluate_code(
@ -67,8 +59,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
prev_evaluation_ref, prev_evaluation_ref,
opts \\ [] opts \\ []
) do ) do
ErlDist.Manager.evaluate_code( ErlDist.RuntimeServer.evaluate_code(
runtime.manager_pid, runtime.server_pid,
code, code,
container_ref, container_ref,
evaluation_ref, evaluation_ref,
@ -78,16 +70,16 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
end end
def forget_evaluation(runtime, container_ref, evaluation_ref) do def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.Manager.forget_evaluation(runtime.manager_pid, container_ref, evaluation_ref) ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
end end
def drop_container(runtime, container_ref) do def drop_container(runtime, container_ref) do
ErlDist.Manager.drop_container(runtime.manager_pid, container_ref) ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end end
def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
ErlDist.Manager.request_completion_items( ErlDist.RuntimeServer.request_completion_items(
runtime.manager_pid, runtime.server_pid,
send_to, send_to,
ref, ref,
hint, hint,
@ -97,7 +89,6 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
end end
def duplicate(_runtime) do def duplicate(_runtime) do
{:error, Livebook.Runtime.Embedded.init()
"embedded runtime is connected to the Livebook application VM and cannot be duplicated"}
end end
end end

View file

@ -8,13 +8,16 @@ defmodule Livebook.Runtime.ErlDist do
# code evaluation may take place in a separate Elixir runtime, # code evaluation may take place in a separate Elixir runtime,
# which also makes it easy to terminate the whole # which also makes it easy to terminate the whole
# evaluation environment without stopping Livebook. # evaluation environment without stopping Livebook.
# This is what both `Runtime.ElixirStandalone` and `Runtime.Attached` do # This is what `Runtime.ElixirStandalone`, `Runtime.MixStandalone`
# and this module contains the shared functionality they need. # and `Runtime.Attached` do, so this module contains the shared
# functionality they need.
# #
# To work with a separate node, we have to inject the necessary # To work with a separate node, we have to inject the necessary
# Livebook modules there and also start the relevant processes # Livebook modules there and also start the relevant processes
# related to evaluation. Fortunately Erlang allows us to send modules # related to evaluation. Fortunately Erlang allows us to send modules
# binary representation to the other node and load them dynamically. # binary representation to the other node and load them dynamically.
#
# For further details see `Livebook.Runtime.ErlDist.NodeManager`.
# Modules to load into the connected node. # Modules to load into the connected node.
@required_modules [ @required_modules [
@ -23,29 +26,31 @@ defmodule Livebook.Runtime.ErlDist do
Livebook.Evaluator.DefaultFormatter, Livebook.Evaluator.DefaultFormatter,
Livebook.Completion, Livebook.Completion,
Livebook.Runtime.ErlDist, Livebook.Runtime.ErlDist,
Livebook.Runtime.ErlDist.Manager, Livebook.Runtime.ErlDist.NodeManager,
Livebook.Runtime.ErlDist.RuntimeServer,
Livebook.Runtime.ErlDist.EvaluatorSupervisor, Livebook.Runtime.ErlDist.EvaluatorSupervisor,
Livebook.Runtime.ErlDist.IOForwardGL, Livebook.Runtime.ErlDist.IOForwardGL,
Livebook.Runtime.ErlDist.LoggerGLBackend Livebook.Runtime.ErlDist.LoggerGLBackend
] ]
@doc """ @doc """
Loads the necessary modules into the given node Starts a runtime server on the given node.
and starts the primary Livebook remote process.
The initialization may be invoked only once on the given If necessary, the required modules are loaded
node until its disconnected. into the given node and the node manager process
is started with `node_manager_opts`.
""" """
@spec initialize(node()) :: :ok | {:error, :already_in_use} @spec initialize(node(), keyword()) :: pid()
def initialize(node) do def initialize(node, node_manager_opts \\ []) do
if initialized?(node) do unless modules_loaded?(node) do
{:error, :already_in_use}
else
load_required_modules(node) load_required_modules(node)
start_manager(node)
:ok
end end
unless node_manager_started?(node) do
start_node_manager(node, node_manager_opts)
end
start_runtime_server(node)
end end
defp load_required_modules(node) do defp load_required_modules(node) do
@ -55,12 +60,20 @@ defmodule Livebook.Runtime.ErlDist do
end end
end end
defp start_manager(node) do defp start_node_manager(node, opts) do
:rpc.call(node, Livebook.Runtime.ErlDist.Manager, :start, []) :rpc.call(node, Livebook.Runtime.ErlDist.NodeManager, :start, [opts])
end end
defp initialized?(node) do defp start_runtime_server(node) do
case :rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.Manager]) do Livebook.Runtime.ErlDist.NodeManager.start_runtime_server(node)
end
defp modules_loaded?(node) do
:rpc.call(node, Code, :ensure_loaded?, [Livebook.Runtime.ErlDist.NodeManager])
end
defp node_manager_started?(node) do
case :rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.NodeManager]) do
nil -> false nil -> false
_pid -> true _pid -> true
end end

View file

@ -0,0 +1,146 @@
defmodule Livebook.Runtime.ErlDist.NodeManager do
@moduledoc false
# The primary Livebook process started on a remote node.
#
# This process is responsible for initializing the node
# with necessary runtime configuration and then starting
# runtime server processes, one per runtime.
# This approach allows for multiple runtimes connected
# to the same node, while preserving the necessary
# cleanup semantics.
#
# The manager process terminates as soon as the last runtime
# server terminates. Upon termination the manager reverts the
# runtime configuration back to the initial state.
use GenServer
alias Livebook.Runtime.ErlDist
@name __MODULE__
@doc """
Starts the node manager.
## Options
* `:unload_modules_on_termination` - whether to unload all
Livebook related modules from the node on termination.
Defaults to `true`.
* `:anonymous` - configures whether manager should
be registered under a global name or not.
In most cases we enforce a single manager per node
and identify it by a name, but this can be opted-out
from by using this option. Defaults to `false`.
"""
def start(opts \\ []) do
{opts, gen_opts} = split_opts(opts)
GenServer.start(__MODULE__, opts, gen_opts)
end
@doc """
Starts the node manager with link.
See `start/1` for available options.
"""
def start_link(opts \\ []) do
{opts, gen_opts} = split_opts(opts)
GenServer.start_link(__MODULE__, opts, gen_opts)
end
defp split_opts(opts) do
{anonymous?, opts} = Keyword.pop(opts, :anonymous, false)
gen_opts = [
name: if(anonymous?, do: nil, else: @name)
]
{opts, gen_opts}
end
@doc """
Starts a new `Livebook.Runtime.ErlDist.RuntimeServer` for evaluation.
"""
@spec start_runtime_server(node() | pid()) :: pid()
def start_runtime_server(node_or_pid) do
GenServer.call(server(node_or_pid), :start_runtime_server)
end
defp server(pid) when is_pid(pid), do: pid
defp server(node) when is_atom(node), do: {@name, node}
@impl true
def init(opts) do
unload_modules_on_termination = Keyword.get(opts, :unload_modules_on_termination, true)
## Initialize the node
Process.flag(:trap_exit, true)
{:ok, server_supevisor} = DynamicSupervisor.start_link(strategy: :one_for_one)
# Register our own standard error IO device that proxies
# to sender's group leader.
original_standard_error = Process.whereis(:standard_error)
{:ok, io_forward_gl_pid} = ErlDist.IOForwardGL.start_link()
Process.unregister(:standard_error)
Process.register(io_forward_gl_pid, :standard_error)
Logger.add_backend(Livebook.Runtime.ErlDist.LoggerGLBackend)
# Set `ignore_module_conflict` only for the NodeManager lifetime.
initial_ignore_module_conflict = Code.compiler_options()[:ignore_module_conflict]
Code.compiler_options(ignore_module_conflict: true)
{:ok,
%{
unload_modules_on_termination: unload_modules_on_termination,
server_supevisor: server_supevisor,
runtime_servers: [],
initial_ignore_module_conflict: initial_ignore_module_conflict,
original_standard_error: original_standard_error
}}
end
@impl true
def terminate(_reason, state) do
Code.compiler_options(ignore_module_conflict: state.initial_ignore_module_conflict)
Process.unregister(:standard_error)
Process.register(state.original_standard_error, :standard_error)
Logger.remove_backend(Livebook.Runtime.ErlDist.LoggerGLBackend)
if state.unload_modules_on_termination do
ErlDist.unload_required_modules()
end
:ok
end
@impl true
def handle_info({:DOWN, _, :process, pid, _}, state) do
if pid in state.runtime_servers do
case update_in(state.runtime_servers, &List.delete(&1, pid)) do
%{runtime_servers: []} = state -> {:stop, :normal, state}
state -> {:noreply, state}
end
else
{:noreply, state}
end
end
def handle_info(_message, state), do: {:noreply, state}
@impl true
def handle_call(:start_runtime_server, _from, state) do
{:ok, server_pid} =
DynamicSupervisor.start_child(state.server_supevisor, ErlDist.RuntimeServer)
Process.monitor(server_pid)
state = update_in(state.runtime_servers, &[server_pid | &1])
{:reply, server_pid, state}
end
end

View file

@ -1,65 +1,42 @@
defmodule Livebook.Runtime.ErlDist.Manager do defmodule Livebook.Runtime.ErlDist.RuntimeServer do
@moduledoc false @moduledoc false
# The primary Livebook process started on a remote node. # A server process backing a specific runtime.
# #
# This process is responsible for monitoring the owner # This process handles `Livebook.Runtime` operations,
# process on the main node and cleaning up if it terminates. # like evaluation and completion. It spawns/terminates
# Also, this process keeps track of the evaluators # individual evaluators as necessary.
# and spawns/terminates them whenever necessary for the evaluation. #
# Every runtime server must have an owner process,
# to which the server lifetime is bound.
use GenServer use GenServer, restart: :temporary
alias Livebook.Evaluator alias Livebook.Evaluator
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
@name __MODULE__
@await_owner_timeout 5_000 @await_owner_timeout 5_000
@doc """ @doc """
Starts the manager. Starts the manager.
Note: make sure to call `set_owner` within `@await_owner_timeout` Note: make sure to call `set_owner` within `@await_owner_timeout`
or the manager assumes it's not needed and terminates. or the runtime server assumes it's not needed and terminates.
## Options
* `:anonymous` - configures whether manager should
be registered under a global name or not.
In most cases we enforce a single manager per node
and identify it by a name, but this can be opted-out
from using this option. Defaults to `false`.
* `:cleanup_on_termination` - configures whether
manager should cleanup any global configuration
it altered and unload Livebook-specific modules
from the node. Defaults to `true`.
* `:register_standard_error_proxy` - configures whether
manager should start an IOForwardGL process and register
it as `:standard_error`. Defaults to `true`.
""" """
def start(opts \\ []) do def start_link(opts \\ []) do
{anonymous?, opts} = Keyword.pop(opts, :anonymous, false) GenServer.start_link(__MODULE__, opts)
gen_opts = [
name: if(anonymous?, do: nil, else: @name)
]
GenServer.start(__MODULE__, opts, gen_opts)
end end
@doc """ @doc """
Sets the owner process. Sets the owner process.
The owner process is watched and as soon as it terminates, The owner process is monitored and as soon as it terminates,
the manager also terminates. All the evaluation results are the server also terminates. All the evaluation results are
send directly to the owner. send directly to the owner.
""" """
@spec set_owner(node() | pid(), pid()) :: :ok @spec set_owner(pid(), pid()) :: :ok
def set_owner(node_or_pid, owner) do def set_owner(pid, owner) do
GenServer.cast(server(node_or_pid), {:set_owner, owner}) GenServer.cast(pid, {:set_owner, owner})
end end
@doc """ @doc """
@ -73,23 +50,16 @@ defmodule Livebook.Runtime.ErlDist.Manager do
See `Evaluator` for more details. See `Evaluator` for more details.
""" """
@spec evaluate_code( @spec evaluate_code(
node() | pid(), pid(),
String.t(), String.t(),
Evaluator.ref(), Evaluator.ref(),
Evaluator.ref(), Evaluator.ref(),
Evaluator.ref() | nil, Evaluator.ref() | nil,
keyword() keyword()
) :: :ok ) :: :ok
def evaluate_code( def evaluate_code(pid, code, container_ref, evaluation_ref, prev_evaluation_ref, opts \\ []) do
node_or_pid,
code,
container_ref,
evaluation_ref,
prev_evaluation_ref,
opts \\ []
) do
GenServer.cast( GenServer.cast(
server(node_or_pid), pid,
{:evaluate_code, code, container_ref, evaluation_ref, prev_evaluation_ref, opts} {:evaluate_code, code, container_ref, evaluation_ref, prev_evaluation_ref, opts}
) )
end end
@ -99,17 +69,17 @@ defmodule Livebook.Runtime.ErlDist.Manager do
See `Evaluator` for more details. See `Evaluator` for more details.
""" """
@spec forget_evaluation(node() | pid(), Evaluator.ref(), Evaluator.ref()) :: :ok @spec forget_evaluation(pid(), Evaluator.ref(), Evaluator.ref()) :: :ok
def forget_evaluation(node_or_pid, container_ref, evaluation_ref) do def forget_evaluation(pid, container_ref, evaluation_ref) do
GenServer.cast(server(node_or_pid), {:forget_evaluation, container_ref, evaluation_ref}) GenServer.cast(pid, {:forget_evaluation, container_ref, evaluation_ref})
end end
@doc """ @doc """
Terminates the `Evaluator` process belonging to the given container. Terminates the `Evaluator` process belonging to the given container.
""" """
@spec drop_container(node() | pid(), Evaluator.ref()) :: :ok @spec drop_container(pid(), Evaluator.ref()) :: :ok
def drop_container(node_or_pid, container_ref) do def drop_container(pid, container_ref) do
GenServer.cast(server(node_or_pid), {:drop_container, container_ref}) GenServer.cast(pid, {:drop_container, container_ref})
end end
@doc """ @doc """
@ -123,16 +93,16 @@ defmodule Livebook.Runtime.ErlDist.Manager do
See `Livebook.Runtime` for more details. See `Livebook.Runtime` for more details.
""" """
@spec request_completion_items( @spec request_completion_items(
node() | pid(), pid(),
pid(), pid(),
term(), term(),
String.t(), String.t(),
Evaluator.ref(), Evaluator.ref(),
Evaluator.ref() Evaluator.ref()
) :: :ok ) :: :ok
def request_completion_items(node_or_pid, send_to, ref, hint, container_ref, evaluation_ref) do def request_completion_items(pid, send_to, ref, hint, container_ref, evaluation_ref) do
GenServer.cast( GenServer.cast(
server(node_or_pid), pid,
{:request_completion_items, send_to, ref, hint, container_ref, evaluation_ref} {:request_completion_items, send_to, ref, hint, container_ref, evaluation_ref}
) )
end end
@ -142,77 +112,27 @@ defmodule Livebook.Runtime.ErlDist.Manager do
This results in all Livebook-related modules being unloaded from this node. This results in all Livebook-related modules being unloaded from this node.
""" """
@spec stop(node() | pid()) :: :ok @spec stop(pid()) :: :ok
def stop(node_or_pid) do def stop(pid) do
GenServer.stop(server(node_or_pid)) GenServer.stop(pid)
end end
defp server(pid) when is_pid(pid), do: pid
defp server(node) when is_atom(node), do: {@name, node}
@impl true @impl true
def init(opts) do def init(_opts) do
cleanup_on_termination = Keyword.get(opts, :cleanup_on_termination, true)
register_standard_error_proxy = Keyword.get(opts, :register_standard_error_proxy, true)
Process.send_after(self(), :check_owner, @await_owner_timeout) Process.send_after(self(), :check_owner, @await_owner_timeout)
## Initialize the node
Process.flag(:trap_exit, true)
{:ok, evaluator_supervisor} = ErlDist.EvaluatorSupervisor.start_link() {:ok, evaluator_supervisor} = ErlDist.EvaluatorSupervisor.start_link()
{:ok, completion_supervisor} = Task.Supervisor.start_link() {:ok, completion_supervisor} = Task.Supervisor.start_link()
# Register our own standard error IO device that proxies
# to sender's group leader.
original_standard_error = Process.whereis(:standard_error)
if register_standard_error_proxy do
{:ok, io_forward_gl_pid} = ErlDist.IOForwardGL.start_link()
Process.unregister(:standard_error)
Process.register(io_forward_gl_pid, :standard_error)
end
Logger.add_backend(Livebook.Runtime.ErlDist.LoggerGLBackend)
# Set `ignore_module_conflict` only for the Manager lifetime.
initial_ignore_module_conflict = Code.compiler_options()[:ignore_module_conflict]
Code.compiler_options(ignore_module_conflict: true)
{:ok, {:ok,
%{ %{
cleanup_on_termination: cleanup_on_termination,
register_standard_error_proxy: register_standard_error_proxy,
owner: nil, owner: nil,
evaluators: %{}, evaluators: %{},
evaluator_supervisor: evaluator_supervisor, evaluator_supervisor: evaluator_supervisor,
completion_supervisor: completion_supervisor, completion_supervisor: completion_supervisor
initial_ignore_module_conflict: initial_ignore_module_conflict,
original_standard_error: original_standard_error
}} }}
end end
@impl true
def terminate(_reason, state) do
if state.cleanup_on_termination do
Code.compiler_options(ignore_module_conflict: state.initial_ignore_module_conflict)
if state.register_standard_error_proxy do
Process.unregister(:standard_error)
Process.register(state.original_standard_error, :standard_error)
end
Logger.remove_backend(Livebook.Runtime.ErlDist.LoggerGLBackend)
ErlDist.unload_required_modules()
end
:ok
end
@impl true @impl true
def handle_info(:check_owner, state) do def handle_info(:check_owner, state) do
# If not owner has been set within @await_owner_timeout # If not owner has been set within @await_owner_timeout
@ -244,10 +164,6 @@ defmodule Livebook.Runtime.ErlDist.Manager do
end end
end end
def handle_info({:EXIT, _from, _reason}, state) do
{:stop, :shutdown, state}
end
def handle_info(_message, state), do: {:noreply, state} def handle_info(_message, state), do: {:noreply, state}
@impl true @impl true

View file

@ -1,5 +1,5 @@
defmodule Livebook.Runtime.MixStandalone do defmodule Livebook.Runtime.MixStandalone do
defstruct [:node, :primary_pid, :project_path] defstruct [:node, :server_pid, :project_path]
# A runtime backed by a standalone Elixir node managed by Livebook. # A runtime backed by a standalone Elixir node managed by Livebook.
# #
@ -13,7 +13,7 @@ defmodule Livebook.Runtime.MixStandalone do
@type t :: %__MODULE__{ @type t :: %__MODULE__{
node: node(), node: node(),
primary_pid: pid(), server_pid: pid(),
project_path: String.t() project_path: String.t()
} }
@ -56,10 +56,10 @@ defmodule Livebook.Runtime.MixStandalone do
:ok <- run_mix_task("compile", project_path, output_emitter), :ok <- run_mix_task("compile", project_path, output_emitter),
eval = child_node_eval_string(), eval = child_node_eval_string(),
port = start_elixir_mix_node(elixir_path, child_node, eval, argv, project_path), port = start_elixir_mix_node(elixir_path, child_node, eval, argv, project_path),
{:ok, primary_pid} <- parent_init_sequence(child_node, port, output_emitter) do {:ok, server_pid} <- parent_init_sequence(child_node, port, output_emitter) do
runtime = %__MODULE__{ runtime = %__MODULE__{
node: child_node, node: child_node,
primary_pid: primary_pid, server_pid: server_pid,
project_path: project_path project_path: project_path
} }
@ -122,12 +122,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.MixStandalone do
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
def connect(runtime) do def connect(runtime) do
ErlDist.Manager.set_owner(runtime.node, self()) ErlDist.RuntimeServer.set_owner(runtime.server_pid, self())
Process.monitor({ErlDist.Manager, runtime.node}) Process.monitor(runtime.server_pid)
end end
def disconnect(runtime) do def disconnect(runtime) do
ErlDist.Manager.stop(runtime.node) ErlDist.RuntimeServer.stop(runtime.server_pid)
end end
def evaluate_code( def evaluate_code(
@ -138,8 +138,8 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.MixStandalone do
prev_evaluation_ref, prev_evaluation_ref,
opts \\ [] opts \\ []
) do ) do
ErlDist.Manager.evaluate_code( ErlDist.RuntimeServer.evaluate_code(
runtime.node, runtime.server_pid,
code, code,
container_ref, container_ref,
evaluation_ref, evaluation_ref,
@ -149,16 +149,16 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.MixStandalone do
end end
def forget_evaluation(runtime, container_ref, evaluation_ref) do def forget_evaluation(runtime, container_ref, evaluation_ref) do
ErlDist.Manager.forget_evaluation(runtime.node, container_ref, evaluation_ref) ErlDist.RuntimeServer.forget_evaluation(runtime.server_pid, container_ref, evaluation_ref)
end end
def drop_container(runtime, container_ref) do def drop_container(runtime, container_ref) do
ErlDist.Manager.drop_container(runtime.node, container_ref) ErlDist.RuntimeServer.drop_container(runtime.server_pid, container_ref)
end end
def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do def request_completion_items(runtime, send_to, ref, hint, container_ref, evaluation_ref) do
ErlDist.Manager.request_completion_items( ErlDist.RuntimeServer.request_completion_items(
runtime.node, runtime.server_pid,
send_to, send_to,
ref, ref,
hint, hint,

View file

@ -60,14 +60,14 @@ defmodule Livebook.Runtime.StandaloneInit do
# 1. The child sends {:node_initialized, ref} message to the parent # 1. The child sends {:node_initialized, ref} message to the parent
# to communicate it's ready for initialization. # to communicate it's ready for initialization.
# #
# 2. The parent initializes the child node - loads necessary modules # 2. The parent initializes the child node - loads necessary modules,
# and starts the Manager process. # starts the NodeManager process and a single RuntimeServer process.
# #
# 3. The parent sends {:node_initialized, ref} message back to the child, # 3. The parent sends {:node_initialized, ref} message back to the child,
# to communicate successful initialization. # to communicate successful initialization.
# #
# 4. The child starts monitoring the Manager process and freezes # 4. The child starts monitoring the NodeManager process and freezes
# until the Manager process terminates. The Manager process # until the NodeManager process terminates. The NodeManager process
# serves as the leading remote process and represents the node from now on. # serves as the leading remote process and represents the node from now on.
# #
# The nodes either successfully go through this flow or return an error, # The nodes either successfully go through this flow or return an error,
@ -90,12 +90,11 @@ defmodule Livebook.Runtime.StandaloneInit do
{:node_started, init_ref, ^child_node, primary_pid} -> {:node_started, init_ref, ^child_node, primary_pid} ->
Port.demonitor(port_ref) Port.demonitor(port_ref)
# We've just created the node, so it is surely not in use server_pid = Livebook.Runtime.ErlDist.initialize(child_node)
:ok = Livebook.Runtime.ErlDist.initialize(child_node)
send(primary_pid, {:node_initialized, init_ref}) send(primary_pid, {:node_initialized, init_ref})
{:ok, primary_pid} {:ok, server_pid}
{^port, {:data, output}} -> {^port, {:data, output}} ->
# Pass all the outputs through the given emitter. # Pass all the outputs through the given emitter.
@ -122,7 +121,7 @@ defmodule Livebook.Runtime.StandaloneInit do
parent_process = {node(), String.to_atom(parent_node)};\ parent_process = {node(), String.to_atom(parent_node)};\
send(parent_process, {:node_started, init_ref, node(), self()});\ send(parent_process, {:node_started, init_ref, node(), self()});\
receive do {:node_initialized, ^init_ref} ->\ receive do {:node_initialized, ^init_ref} ->\
manager_ref = Process.monitor(Livebook.Runtime.ErlDist.Manager);\ manager_ref = Process.monitor(Livebook.Runtime.ErlDist.NodeManager);\
receive do {:DOWN, ^manager_ref, :process, _object, _reason} -> :ok end;\ receive do {:DOWN, ^manager_ref, :process, _object, _reason} -> :ok end;\
after 10_000 ->\ after 10_000 ->\
:timeout;\ :timeout;\

View file

@ -39,7 +39,7 @@ defmodule LivebookWeb.SessionLive.AttachedLive do
<p class="text-gray-700"> <p class="text-gray-700">
Then enter the connection information below: Then enter the connection information below:
</p> </p>
<%= f = form_for :data, "#", phx_submit: "init", phx_change: "validate" %> <%= f = form_for :data, "#", phx_submit: "init", phx_change: "validate", autocomplete: "off", spellcheck: "false" %>
<div class="flex flex-col space-y-4"> <div class="flex flex-col space-y-4">
<div> <div>
<div class="input-label">Name</div> <div class="input-label">Name</div>
@ -94,7 +94,4 @@ defmodule LivebookWeb.SessionLive.AttachedLive do
end end
defp runtime_error_to_message(:unreachable), do: "Node unreachable" defp runtime_error_to_message(:unreachable), do: "Node unreachable"
defp runtime_error_to_message(:already_in_use),
do: "Another session is already connected to this node"
end end

View file

@ -4,7 +4,7 @@ defmodule Livebook.Runtime.ElixirStandaloneTest do
alias Livebook.Runtime alias Livebook.Runtime
describe "init/1" do describe "init/1" do
test "starts a new Elixir runtime in distribution mode and ties its lifetime to the Manager process" do test "starts a new Elixir runtime in distribution mode and ties its lifetime to the NodeManager process" do
assert {:ok, %{node: node} = runtime} = Runtime.ElixirStandalone.init() assert {:ok, %{node: node} = runtime} = Runtime.ElixirStandalone.init()
Runtime.connect(runtime) Runtime.connect(runtime)
@ -12,10 +12,11 @@ defmodule Livebook.Runtime.ElixirStandaloneTest do
Node.monitor(node, true) Node.monitor(node, true)
assert :pong = Node.ping(node) assert :pong = Node.ping(node)
# Tell the owner process to stop. # Kill the manager process.
Livebook.Runtime.ErlDist.Manager.stop(node) pid = :rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.NodeManager])
Process.exit(pid, :kill)
# Once Manager terminates, the node should terminate as well. # Once NodeManager terminates, the node should terminate as well.
assert_receive {:nodedown, ^node} assert_receive {:nodedown, ^node}
end end
@ -46,6 +47,6 @@ defmodule Livebook.Runtime.ElixirStandaloneTest do
end end
defp manager_started?(node) do defp manager_started?(node) do
:rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.Manager]) != nil :rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.NodeManager]) != nil
end end
end end

View file

@ -1,132 +0,0 @@
defmodule Livebook.Runtime.ErlDist.ManagerTest do
use ExUnit.Case, async: false
alias Livebook.Runtime.ErlDist.Manager
describe "set_owner/2" do
test "starts watching the given process and terminates as soon as it terminates" do
Manager.start()
owner =
spawn(fn ->
receive do
:stop -> :ok
end
end)
Manager.set_owner(node(), owner)
# Make sure the node is running.
assert Process.whereis(Livebook.Runtime.ErlDist.Manager) != nil
ref = Process.monitor(Livebook.Runtime.ErlDist.Manager)
# Tell the owner process to stop.
send(owner, :stop)
# Once the owner process terminates, the node should terminate as well.
assert_receive {:DOWN, ^ref, :process, _, _}
end
end
describe "evaluate_code/6" do
test "spawns a new evaluator when necessary" do
Manager.start()
Manager.set_owner(node(), self())
Manager.evaluate_code(node(), "1 + 1", :container1, :evaluation1, nil)
assert_receive {:evaluation_response, :evaluation1, _, %{evaluation_time_ms: _time_ms}}
Manager.stop(node())
end
test "prevents from module redefinition warning being printed to standard error" do
Manager.start()
Manager.set_owner(node(), self())
stderr =
ExUnit.CaptureIO.capture_io(:stderr, fn ->
Manager.evaluate_code(node(), "defmodule Foo do end", :container1, :evaluation1, nil)
Manager.evaluate_code(node(), "defmodule Foo do end", :container1, :evaluation2, nil)
assert_receive {:evaluation_response, :evaluation1, _, %{evaluation_time_ms: _time_ms}}
assert_receive {:evaluation_response, :evaluation2, _, %{evaluation_time_ms: _time_ms}}
end)
assert stderr == ""
Manager.stop(node())
end
test "proxies evaluation stderr to evaluation stdout" do
Manager.start()
Manager.set_owner(node(), self())
Manager.evaluate_code(node(), ~s{IO.puts(:stderr, "error")}, :container1, :evaluation1, nil)
assert_receive {:evaluation_output, :evaluation1, "error\n"}
Manager.stop(node())
end
@tag capture_log: true
test "proxies logger messages to evaluation stdout" do
Manager.start()
Manager.set_owner(node(), self())
code = """
require Logger
Logger.error("hey")
"""
Manager.evaluate_code(node(), code, :container1, :evaluation1, nil)
assert_receive {:evaluation_output, :evaluation1, log_message}
assert log_message =~ "[error] hey"
Manager.stop(node())
end
end
describe "request_completion_items/6" do
test "provides basic completion when no evaluation reference is given" do
Manager.start()
Manager.set_owner(node(), self())
Manager.request_completion_items(node(), self(), :comp_ref, "System.ver", nil, nil)
assert_receive {:completion_response, :comp_ref, [%{label: "version/0"}]}
Manager.stop(node())
end
test "provides extended completion when previous evaluation reference is given" do
Manager.start()
Manager.set_owner(node(), self())
Manager.evaluate_code(node(), "number = 10", :c1, :e1, nil)
assert_receive {:evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
Manager.request_completion_items(node(), self(), :comp_ref, "num", :c1, :e1)
assert_receive {:completion_response, :comp_ref, [%{label: "number"}]}
Manager.stop(node())
end
end
@tag capture_log: true
test "notifies the owner when an evaluator goes down" do
Manager.start()
Manager.set_owner(node(), self())
code = """
spawn_link(fn -> raise "sad cat" end)
"""
Manager.evaluate_code(node(), code, :container1, :evaluation1, nil)
assert_receive {:container_down, :container1, message}
assert message =~ "sad cat"
Manager.stop(node())
end
end

View file

@ -0,0 +1,21 @@
defmodule Livebook.Runtime.ErlDist.NodeManagerTest do
use ExUnit.Case, async: false
alias Livebook.Runtime.ErlDist.{NodeManager, RuntimeServer}
test "terminates when the last runtime server terminates" do
{:ok, manager_pid} =
NodeManager.start_link(unload_modules_on_termination: false, anonymous: true)
server1 = NodeManager.start_runtime_server(manager_pid)
server2 = NodeManager.start_runtime_server(manager_pid)
ref = Process.monitor(manager_pid)
RuntimeServer.stop(server1)
assert Process.alive?(manager_pid)
RuntimeServer.stop(server2)
assert_receive {:DOWN, ^ref, :process, _, _}
end
end

View file

@ -0,0 +1,110 @@
defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
use ExUnit.Case, async: false
alias Livebook.Runtime.ErlDist.{NodeManager, RuntimeServer}
setup do
{:ok, manager_pid} =
NodeManager.start_link(unload_modules_on_termination: false, anonymous: true)
runtime_server_pid = NodeManager.start_runtime_server(manager_pid)
RuntimeServer.set_owner(runtime_server_pid, self())
{:ok, %{pid: runtime_server_pid}}
end
describe "set_owner/2" do
test "starts watching the given process and terminates as soon as it terminates", %{pid: pid} do
owner =
spawn(fn ->
receive do
:stop -> :ok
end
end)
RuntimeServer.set_owner(pid, owner)
# Make sure the node is running.
assert Process.alive?(pid)
ref = Process.monitor(pid)
# Tell the owner process to stop.
send(owner, :stop)
# Once the owner process terminates, the node should terminate as well.
assert_receive {:DOWN, ^ref, :process, _, _}
end
end
describe "evaluate_code/6" do
test "spawns a new evaluator when necessary", %{pid: pid} do
RuntimeServer.evaluate_code(pid, "1 + 1", :container1, :evaluation1, nil)
assert_receive {:evaluation_response, :evaluation1, _, %{evaluation_time_ms: _time_ms}}
end
test "prevents from module redefinition warning being printed to standard error", %{pid: pid} do
stderr =
ExUnit.CaptureIO.capture_io(:stderr, fn ->
RuntimeServer.evaluate_code(pid, "defmodule Foo do end", :container1, :evaluation1, nil)
RuntimeServer.evaluate_code(pid, "defmodule Foo do end", :container1, :evaluation2, nil)
assert_receive {:evaluation_response, :evaluation1, _, %{evaluation_time_ms: _time_ms}}
assert_receive {:evaluation_response, :evaluation2, _, %{evaluation_time_ms: _time_ms}}
end)
assert stderr == ""
end
test "proxies evaluation stderr to evaluation stdout", %{pid: pid} do
RuntimeServer.evaluate_code(
pid,
~s{IO.puts(:stderr, "error")},
:container1,
:evaluation1,
nil
)
assert_receive {:evaluation_output, :evaluation1, "error\n"}
end
@tag capture_log: true
test "proxies logger messages to evaluation stdout", %{pid: pid} do
code = """
require Logger
Logger.error("hey")
"""
RuntimeServer.evaluate_code(pid, code, :container1, :evaluation1, nil)
assert_receive {:evaluation_output, :evaluation1, log_message}
assert log_message =~ "[error] hey"
end
end
describe "request_completion_items/6" do
test "provides basic completion when no evaluation reference is given", %{pid: pid} do
RuntimeServer.request_completion_items(pid, self(), :comp_ref, "System.ver", nil, nil)
assert_receive {:completion_response, :comp_ref, [%{label: "version/0"}]}
end
test "provides extended completion when previous evaluation reference is given", %{pid: pid} do
RuntimeServer.evaluate_code(pid, "number = 10", :c1, :e1, nil)
assert_receive {:evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}}
RuntimeServer.request_completion_items(pid, self(), :comp_ref, "num", :c1, :e1)
assert_receive {:completion_response, :comp_ref, [%{label: "number"}]}
end
end
test "notifies the owner when an evaluator goes down", %{pid: pid} do
code = """
spawn_link(fn -> Process.exit(self(), :kill) end)
"""
RuntimeServer.evaluate_code(pid, code, :container1, :evaluation1, nil)
assert_receive {:container_down, :container1, message}
assert message =~ "killed"
end
end

View file

@ -38,6 +38,6 @@ defmodule Livebook.Runtime.MixStandaloneTest do
end end
defp manager_started?(node) do defp manager_started?(node) do
:rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.Manager]) != nil :rpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.NodeManager]) != nil
end end
end end