Use registry to identify IO proxy in log handler (#1952)

This commit is contained in:
Jonatan Kłosko 2023-06-01 00:32:47 +02:00 committed by GitHub
parent fac942a617
commit 8f71ad700b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 68 additions and 70 deletions

View file

@ -96,22 +96,13 @@ defmodule Livebook.Runtime.ErlDist.LoggerGLBackend do
defp log_event(level, msg, ts, md, gl, state) do
output = format_event(level, msg, ts, md, state)
if io_proxy?(gl) do
if Livebook.Runtime.ErlDist.NodeManager.known_io_proxy?(gl) do
async_io(gl, output)
else
send(Livebook.Runtime.ErlDist.NodeManager, {:orphan_log, output})
end
end
defp io_proxy?(pid) do
try do
info = Process.info(pid, [:dictionary])
info[:dictionary][:"$initial_call"] == {Livebook.Runtime.Evaluator.IOProxy, :init, 1}
rescue
_ -> false
end
end
def async_io(device, output) when is_pid(device) do
send(device, {:io_request, self(), make_ref(), {:put_chars, :unicode, output}})
end

View file

@ -5,22 +5,13 @@ defmodule Livebook.Runtime.ErlDist.LoggerGLHandler do
def log(%{meta: meta} = event, %{formatter: {formatter_module, formatter_config}}) do
message = apply(formatter_module, :format, [event, formatter_config])
if io_proxy?(meta.gl) do
if Livebook.Runtime.ErlDist.NodeManager.known_io_proxy?(meta.gl) do
async_io(meta.gl, message)
else
send(Livebook.Runtime.ErlDist.NodeManager, {:orphan_log, message})
end
end
defp io_proxy?(pid) do
try do
info = Process.info(pid, [:dictionary])
info[:dictionary][:"$initial_call"] == {Livebook.Runtime.Evaluator.IOProxy, :init, 1}
rescue
_ -> false
end
end
def async_io(device, output) when is_pid(device) do
send(device, {:io_request, self(), make_ref(), {:put_chars, :unicode, output}})
end

View file

@ -19,6 +19,7 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
alias Livebook.Runtime.ErlDist
@name __MODULE__
@io_proxy_registry_name __MODULE__.IOProxyRegistry
@doc """
Starts the node manager.
@ -29,12 +30,6 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
Livebook related modules from the node on termination.
Defaults to `true`
* `:anonymous` - configures whether manager should
be registered under a global name or not.
In most cases we enforce a single manager per node
and identify it by a name, but this can be opted-out
from by using this option. Defaults to `false`
* `:auto_termination` - whether to terminate the manager
when the last runtime server terminates. Defaults to `true`
@ -48,8 +43,7 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
"""
def start(opts \\ []) do
{opts, gen_opts} = split_opts(opts)
GenServer.start(__MODULE__, opts, gen_opts)
GenServer.start(__MODULE__, opts, name: @name)
end
@doc """
@ -58,29 +52,25 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
See `start/1` for available options.
"""
def start_link(opts \\ []) do
{opts, gen_opts} = split_opts(opts)
GenServer.start_link(__MODULE__, opts, gen_opts)
end
defp split_opts(opts) do
{anonymous?, opts} = Keyword.pop(opts, :anonymous, false)
gen_opts = [
name: if(anonymous?, do: nil, else: @name)
]
{opts, gen_opts}
GenServer.start_link(__MODULE__, opts, name: @name)
end
@doc """
Starts a new `Livebook.Runtime.ErlDist.RuntimeServer` for evaluation.
"""
@spec start_runtime_server(node() | pid(), keyword()) :: pid()
def start_runtime_server(node_or_pid, opts \\ []) do
GenServer.call(server(node_or_pid), {:start_runtime_server, opts})
@spec start_runtime_server(node(), keyword()) :: pid()
def start_runtime_server(node, opts \\ []) do
GenServer.call(server(node), {:start_runtime_server, opts})
end
@doc false
def known_io_proxy?(pid) do
case Registry.keys(@io_proxy_registry_name, pid) do
[_] -> true
[] -> false
end
end
defp server(pid) when is_pid(pid), do: pid
defp server(node) when is_atom(node), do: {@name, node}
@impl true
@ -96,6 +86,9 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
{:ok, server_supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one)
{:ok, io_proxy_registry} =
Registry.start_link(name: @io_proxy_registry_name, keys: :duplicate)
# Register our own standard error IO device that proxies
# to sender's group leader.
original_standard_error = Process.whereis(:standard_error)
@ -142,7 +135,8 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
original_standard_error: original_standard_error,
parent_node: parent_node,
capture_orphan_logs: capture_orphan_logs,
tmp_dir: tmp_dir
tmp_dir: tmp_dir,
io_proxy_registry: io_proxy_registry
}}
end
@ -217,6 +211,7 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
|> Keyword.put_new(:ebin_path, ebin_path(state.tmp_dir))
|> Keyword.put_new(:tmp_dir, child_tmp_dir(state.tmp_dir))
|> Keyword.put_new(:base_path_env, System.get_env("PATH", ""))
|> Keyword.put_new(:io_proxy_registry, @io_proxy_registry_name)
{:ok, server_pid} =
DynamicSupervisor.start_child(state.server_supervisor, {ErlDist.RuntimeServer, opts})

View file

@ -51,6 +51,9 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
to merge new values into when setting environment variables.
Defaults to `System.get_env("PATH", "")`
* `:io_proxy_registry` - the registry to register IO proxy
processes in
"""
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts)
@ -316,6 +319,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
base_env_path:
Keyword.get_lazy(opts, :base_env_path, fn -> System.get_env("PATH", "") end),
ebin_path: Keyword.get(opts, :ebin_path),
io_proxy_registry: Keyword.get(opts, :io_proxy_registry),
tmp_dir: Keyword.get(opts, :tmp_dir)
}}
end
@ -675,7 +679,8 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
send_to: state.owner,
runtime_broadcast_to: state.runtime_broadcast_to,
object_tracker: state.object_tracker,
ebin_path: state.ebin_path
ebin_path: state.ebin_path,
io_proxy_registry: state.io_proxy_registry
)
Process.monitor(evaluator.pid)

View file

@ -89,6 +89,9 @@ defmodule Livebook.Runtime.Evaluator do
* `:ebin_path` - a directory to write modules bytecode into. When
not specified, modules are not written to disk
* `:io_proxy_registry` - the registry to register IO proxy
processes in
"""
@spec start_link(keyword()) :: {:ok, pid(), t()} | {:error, term()}
def start_link(opts \\ []) do
@ -271,9 +274,17 @@ defmodule Livebook.Runtime.Evaluator do
object_tracker = Keyword.fetch!(opts, :object_tracker)
formatter = Keyword.get(opts, :formatter, Evaluator.IdentityFormatter)
ebin_path = Keyword.get(opts, :ebin_path)
io_proxy_registry = Keyword.get(opts, :io_proxy_registry)
{:ok, io_proxy} =
Evaluator.IOProxy.start(self(), send_to, runtime_broadcast_to, object_tracker, ebin_path)
Evaluator.IOProxy.start(
self(),
send_to,
runtime_broadcast_to,
object_tracker,
ebin_path,
io_proxy_registry
)
io_proxy_monitor = Process.monitor(io_proxy)

View file

@ -26,22 +26,23 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
For all supported requests a message is sent to the configured
`:send_to` process, so this device serves as a proxy.
"""
@spec start(pid(), pid(), pid(), pid(), String.t() | nil) :: GenServer.on_start()
def start(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path) do
@spec start(pid(), pid(), pid(), pid(), String.t() | nil, atom() | nil) :: GenServer.on_start()
def start(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry) do
GenServer.start(
__MODULE__,
{evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path}
{evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry}
)
end
@doc """
Linking version of `start/4`.
"""
@spec start_link(pid(), pid(), pid(), pid(), String.t() | nil) :: GenServer.on_start()
def start_link(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path) do
@spec start_link(pid(), pid(), pid(), pid(), String.t() | nil, atom() | nil) ::
GenServer.on_start()
def start_link(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry) do
GenServer.start_link(
__MODULE__,
{evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path}
{evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry}
)
end
@ -72,9 +73,13 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
end
@impl true
def init({evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path}) do
def init({evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry}) do
evaluator_monitor = Process.monitor(evaluator)
if registry do
Registry.register(registry, nil, nil)
end
{:ok,
%{
evaluator_monitor: evaluator_monitor,

View file

@ -1,21 +1,25 @@
defmodule Livebook.Runtime.ErlDist.NodeManagerTest do
use ExUnit.Case, async: false
use ExUnit.Case, async: true
alias Livebook.Runtime
alias Livebook.Runtime.ErlDist.{NodeManager, RuntimeServer}
test "terminates when the last runtime server terminates" do
{:ok, manager_pid} =
start_supervised({NodeManager, [unload_modules_on_termination: false, anonymous: true]})
# We use a standalone runtime, so that we have an isolated node
# with its own node manager
assert {:ok, %{node: node, server_pid: server1} = runtime} =
Runtime.ElixirStandalone.new() |> Runtime.connect()
server1 = NodeManager.start_runtime_server(manager_pid)
server2 = NodeManager.start_runtime_server(manager_pid)
Runtime.take_ownership(runtime)
manager_pid = :erpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.NodeManager])
ref = Process.monitor(manager_pid)
RuntimeServer.stop(server1)
assert Process.alive?(manager_pid)
server2 = NodeManager.start_runtime_server(node)
RuntimeServer.stop(server1)
RuntimeServer.stop(server2)
assert_receive {:DOWN, ^ref, :process, _, _}
end
end

View file

@ -4,17 +4,13 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
alias Livebook.Runtime.ErlDist.{NodeManager, RuntimeServer}
setup ctx do
{:ok, manager_pid} =
start_supervised({NodeManager, [unload_modules_on_termination: false, anonymous: true]})
runtime_server_pid = NodeManager.start_runtime_server(manager_pid, ctx[:opts] || [])
runtime_server_pid = NodeManager.start_runtime_server(node(), ctx[:opts] || [])
RuntimeServer.attach(runtime_server_pid, self())
{:ok, %{pid: runtime_server_pid, manager_pid: manager_pid}}
{:ok, %{pid: runtime_server_pid}}
end
describe "attach/2" do
test "starts watching the given process and terminates as soon as it terminates",
%{manager_pid: manager_pid} do
test "starts watching the given process and terminates as soon as it terminates" do
owner =
spawn(fn ->
receive do
@ -22,7 +18,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
end
end)
pid = NodeManager.start_runtime_server(manager_pid)
pid = NodeManager.start_runtime_server(node())
RuntimeServer.attach(pid, owner)
# Make sure the node is running.

View file

@ -1,5 +1,5 @@
defmodule Livebook.Session.FileGuardTest do
use ExUnit.Case, async: false
use ExUnit.Case, async: true
import Livebook.TestHelpers