From 8f71ad700b664803f94bdd993b374a95fe2e025d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Thu, 1 Jun 2023 00:32:47 +0200 Subject: [PATCH] Use registry to identify IO proxy in log handler (#1952) --- .../runtime/erl_dist/logger_gl_backend.ex | 11 +---- .../runtime/erl_dist/logger_gl_handler.ex | 11 +---- lib/livebook/runtime/erl_dist/node_manager.ex | 45 +++++++++---------- .../runtime/erl_dist/runtime_server.ex | 7 ++- lib/livebook/runtime/evaluator.ex | 13 +++++- lib/livebook/runtime/evaluator/io_proxy.ex | 19 +++++--- .../runtime/erl_dist/node_manager_test.exs | 18 +++++--- .../runtime/erl_dist/runtime_server_test.exs | 12 ++--- test/livebook/session/file_guard_test.exs | 2 +- 9 files changed, 68 insertions(+), 70 deletions(-) diff --git a/lib/livebook/runtime/erl_dist/logger_gl_backend.ex b/lib/livebook/runtime/erl_dist/logger_gl_backend.ex index a74a4642b..b0040f5bc 100644 --- a/lib/livebook/runtime/erl_dist/logger_gl_backend.ex +++ b/lib/livebook/runtime/erl_dist/logger_gl_backend.ex @@ -96,22 +96,13 @@ defmodule Livebook.Runtime.ErlDist.LoggerGLBackend do defp log_event(level, msg, ts, md, gl, state) do output = format_event(level, msg, ts, md, state) - if io_proxy?(gl) do + if Livebook.Runtime.ErlDist.NodeManager.known_io_proxy?(gl) do async_io(gl, output) else send(Livebook.Runtime.ErlDist.NodeManager, {:orphan_log, output}) end end - defp io_proxy?(pid) do - try do - info = Process.info(pid, [:dictionary]) - info[:dictionary][:"$initial_call"] == {Livebook.Runtime.Evaluator.IOProxy, :init, 1} - rescue - _ -> false - end - end - def async_io(device, output) when is_pid(device) do send(device, {:io_request, self(), make_ref(), {:put_chars, :unicode, output}}) end diff --git a/lib/livebook/runtime/erl_dist/logger_gl_handler.ex b/lib/livebook/runtime/erl_dist/logger_gl_handler.ex index c20332e0f..8a4e350ba 100644 --- a/lib/livebook/runtime/erl_dist/logger_gl_handler.ex +++ b/lib/livebook/runtime/erl_dist/logger_gl_handler.ex @@ -5,22 +5,13 @@ defmodule Livebook.Runtime.ErlDist.LoggerGLHandler do def log(%{meta: meta} = event, %{formatter: {formatter_module, formatter_config}}) do message = apply(formatter_module, :format, [event, formatter_config]) - if io_proxy?(meta.gl) do + if Livebook.Runtime.ErlDist.NodeManager.known_io_proxy?(meta.gl) do async_io(meta.gl, message) else send(Livebook.Runtime.ErlDist.NodeManager, {:orphan_log, message}) end end - defp io_proxy?(pid) do - try do - info = Process.info(pid, [:dictionary]) - info[:dictionary][:"$initial_call"] == {Livebook.Runtime.Evaluator.IOProxy, :init, 1} - rescue - _ -> false - end - end - def async_io(device, output) when is_pid(device) do send(device, {:io_request, self(), make_ref(), {:put_chars, :unicode, output}}) end diff --git a/lib/livebook/runtime/erl_dist/node_manager.ex b/lib/livebook/runtime/erl_dist/node_manager.ex index 8289b7e5d..6a536ed52 100644 --- a/lib/livebook/runtime/erl_dist/node_manager.ex +++ b/lib/livebook/runtime/erl_dist/node_manager.ex @@ -19,6 +19,7 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do alias Livebook.Runtime.ErlDist @name __MODULE__ + @io_proxy_registry_name __MODULE__.IOProxyRegistry @doc """ Starts the node manager. @@ -29,12 +30,6 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do Livebook related modules from the node on termination. Defaults to `true` - * `:anonymous` - configures whether manager should - be registered under a global name or not. - In most cases we enforce a single manager per node - and identify it by a name, but this can be opted-out - from by using this option. Defaults to `false` - * `:auto_termination` - whether to terminate the manager when the last runtime server terminates. Defaults to `true` @@ -48,8 +43,7 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do """ def start(opts \\ []) do - {opts, gen_opts} = split_opts(opts) - GenServer.start(__MODULE__, opts, gen_opts) + GenServer.start(__MODULE__, opts, name: @name) end @doc """ @@ -58,29 +52,25 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do See `start/1` for available options. """ def start_link(opts \\ []) do - {opts, gen_opts} = split_opts(opts) - GenServer.start_link(__MODULE__, opts, gen_opts) - end - - defp split_opts(opts) do - {anonymous?, opts} = Keyword.pop(opts, :anonymous, false) - - gen_opts = [ - name: if(anonymous?, do: nil, else: @name) - ] - - {opts, gen_opts} + GenServer.start_link(__MODULE__, opts, name: @name) end @doc """ Starts a new `Livebook.Runtime.ErlDist.RuntimeServer` for evaluation. """ - @spec start_runtime_server(node() | pid(), keyword()) :: pid() - def start_runtime_server(node_or_pid, opts \\ []) do - GenServer.call(server(node_or_pid), {:start_runtime_server, opts}) + @spec start_runtime_server(node(), keyword()) :: pid() + def start_runtime_server(node, opts \\ []) do + GenServer.call(server(node), {:start_runtime_server, opts}) + end + + @doc false + def known_io_proxy?(pid) do + case Registry.keys(@io_proxy_registry_name, pid) do + [_] -> true + [] -> false + end end - defp server(pid) when is_pid(pid), do: pid defp server(node) when is_atom(node), do: {@name, node} @impl true @@ -96,6 +86,9 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do {:ok, server_supervisor} = DynamicSupervisor.start_link(strategy: :one_for_one) + {:ok, io_proxy_registry} = + Registry.start_link(name: @io_proxy_registry_name, keys: :duplicate) + # Register our own standard error IO device that proxies # to sender's group leader. original_standard_error = Process.whereis(:standard_error) @@ -142,7 +135,8 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do original_standard_error: original_standard_error, parent_node: parent_node, capture_orphan_logs: capture_orphan_logs, - tmp_dir: tmp_dir + tmp_dir: tmp_dir, + io_proxy_registry: io_proxy_registry }} end @@ -217,6 +211,7 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do |> Keyword.put_new(:ebin_path, ebin_path(state.tmp_dir)) |> Keyword.put_new(:tmp_dir, child_tmp_dir(state.tmp_dir)) |> Keyword.put_new(:base_path_env, System.get_env("PATH", "")) + |> Keyword.put_new(:io_proxy_registry, @io_proxy_registry_name) {:ok, server_pid} = DynamicSupervisor.start_child(state.server_supervisor, {ErlDist.RuntimeServer, opts}) diff --git a/lib/livebook/runtime/erl_dist/runtime_server.ex b/lib/livebook/runtime/erl_dist/runtime_server.ex index e7e474bd5..5c0208a2f 100644 --- a/lib/livebook/runtime/erl_dist/runtime_server.ex +++ b/lib/livebook/runtime/erl_dist/runtime_server.ex @@ -51,6 +51,9 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do to merge new values into when setting environment variables. Defaults to `System.get_env("PATH", "")` + * `:io_proxy_registry` - the registry to register IO proxy + processes in + """ def start_link(opts \\ []) do GenServer.start_link(__MODULE__, opts) @@ -316,6 +319,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do base_env_path: Keyword.get_lazy(opts, :base_env_path, fn -> System.get_env("PATH", "") end), ebin_path: Keyword.get(opts, :ebin_path), + io_proxy_registry: Keyword.get(opts, :io_proxy_registry), tmp_dir: Keyword.get(opts, :tmp_dir) }} end @@ -675,7 +679,8 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do send_to: state.owner, runtime_broadcast_to: state.runtime_broadcast_to, object_tracker: state.object_tracker, - ebin_path: state.ebin_path + ebin_path: state.ebin_path, + io_proxy_registry: state.io_proxy_registry ) Process.monitor(evaluator.pid) diff --git a/lib/livebook/runtime/evaluator.ex b/lib/livebook/runtime/evaluator.ex index e87f42c6d..edeb8e43a 100644 --- a/lib/livebook/runtime/evaluator.ex +++ b/lib/livebook/runtime/evaluator.ex @@ -89,6 +89,9 @@ defmodule Livebook.Runtime.Evaluator do * `:ebin_path` - a directory to write modules bytecode into. When not specified, modules are not written to disk + * `:io_proxy_registry` - the registry to register IO proxy + processes in + """ @spec start_link(keyword()) :: {:ok, pid(), t()} | {:error, term()} def start_link(opts \\ []) do @@ -271,9 +274,17 @@ defmodule Livebook.Runtime.Evaluator do object_tracker = Keyword.fetch!(opts, :object_tracker) formatter = Keyword.get(opts, :formatter, Evaluator.IdentityFormatter) ebin_path = Keyword.get(opts, :ebin_path) + io_proxy_registry = Keyword.get(opts, :io_proxy_registry) {:ok, io_proxy} = - Evaluator.IOProxy.start(self(), send_to, runtime_broadcast_to, object_tracker, ebin_path) + Evaluator.IOProxy.start( + self(), + send_to, + runtime_broadcast_to, + object_tracker, + ebin_path, + io_proxy_registry + ) io_proxy_monitor = Process.monitor(io_proxy) diff --git a/lib/livebook/runtime/evaluator/io_proxy.ex b/lib/livebook/runtime/evaluator/io_proxy.ex index 4174549c6..2b4bf9a5f 100644 --- a/lib/livebook/runtime/evaluator/io_proxy.ex +++ b/lib/livebook/runtime/evaluator/io_proxy.ex @@ -26,22 +26,23 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do For all supported requests a message is sent to the configured `:send_to` process, so this device serves as a proxy. """ - @spec start(pid(), pid(), pid(), pid(), String.t() | nil) :: GenServer.on_start() - def start(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path) do + @spec start(pid(), pid(), pid(), pid(), String.t() | nil, atom() | nil) :: GenServer.on_start() + def start(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry) do GenServer.start( __MODULE__, - {evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path} + {evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry} ) end @doc """ Linking version of `start/4`. """ - @spec start_link(pid(), pid(), pid(), pid(), String.t() | nil) :: GenServer.on_start() - def start_link(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path) do + @spec start_link(pid(), pid(), pid(), pid(), String.t() | nil, atom() | nil) :: + GenServer.on_start() + def start_link(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry) do GenServer.start_link( __MODULE__, - {evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path} + {evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry} ) end @@ -72,9 +73,13 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do end @impl true - def init({evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path}) do + def init({evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, registry}) do evaluator_monitor = Process.monitor(evaluator) + if registry do + Registry.register(registry, nil, nil) + end + {:ok, %{ evaluator_monitor: evaluator_monitor, diff --git a/test/livebook/runtime/erl_dist/node_manager_test.exs b/test/livebook/runtime/erl_dist/node_manager_test.exs index b4f5a8188..85d285c85 100644 --- a/test/livebook/runtime/erl_dist/node_manager_test.exs +++ b/test/livebook/runtime/erl_dist/node_manager_test.exs @@ -1,21 +1,25 @@ defmodule Livebook.Runtime.ErlDist.NodeManagerTest do - use ExUnit.Case, async: false + use ExUnit.Case, async: true + alias Livebook.Runtime alias Livebook.Runtime.ErlDist.{NodeManager, RuntimeServer} test "terminates when the last runtime server terminates" do - {:ok, manager_pid} = - start_supervised({NodeManager, [unload_modules_on_termination: false, anonymous: true]}) + # We use a standalone runtime, so that we have an isolated node + # with its own node manager + assert {:ok, %{node: node, server_pid: server1} = runtime} = + Runtime.ElixirStandalone.new() |> Runtime.connect() - server1 = NodeManager.start_runtime_server(manager_pid) - server2 = NodeManager.start_runtime_server(manager_pid) + Runtime.take_ownership(runtime) + manager_pid = :erpc.call(node, Process, :whereis, [Livebook.Runtime.ErlDist.NodeManager]) ref = Process.monitor(manager_pid) - RuntimeServer.stop(server1) - assert Process.alive?(manager_pid) + server2 = NodeManager.start_runtime_server(node) + RuntimeServer.stop(server1) RuntimeServer.stop(server2) + assert_receive {:DOWN, ^ref, :process, _, _} end end diff --git a/test/livebook/runtime/erl_dist/runtime_server_test.exs b/test/livebook/runtime/erl_dist/runtime_server_test.exs index 3552fcd09..c576aa5bd 100644 --- a/test/livebook/runtime/erl_dist/runtime_server_test.exs +++ b/test/livebook/runtime/erl_dist/runtime_server_test.exs @@ -4,17 +4,13 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do alias Livebook.Runtime.ErlDist.{NodeManager, RuntimeServer} setup ctx do - {:ok, manager_pid} = - start_supervised({NodeManager, [unload_modules_on_termination: false, anonymous: true]}) - - runtime_server_pid = NodeManager.start_runtime_server(manager_pid, ctx[:opts] || []) + runtime_server_pid = NodeManager.start_runtime_server(node(), ctx[:opts] || []) RuntimeServer.attach(runtime_server_pid, self()) - {:ok, %{pid: runtime_server_pid, manager_pid: manager_pid}} + {:ok, %{pid: runtime_server_pid}} end describe "attach/2" do - test "starts watching the given process and terminates as soon as it terminates", - %{manager_pid: manager_pid} do + test "starts watching the given process and terminates as soon as it terminates" do owner = spawn(fn -> receive do @@ -22,7 +18,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do end end) - pid = NodeManager.start_runtime_server(manager_pid) + pid = NodeManager.start_runtime_server(node()) RuntimeServer.attach(pid, owner) # Make sure the node is running. diff --git a/test/livebook/session/file_guard_test.exs b/test/livebook/session/file_guard_test.exs index a12b32b1a..090a16bb5 100644 --- a/test/livebook/session/file_guard_test.exs +++ b/test/livebook/session/file_guard_test.exs @@ -1,5 +1,5 @@ defmodule Livebook.Session.FileGuardTest do - use ExUnit.Case, async: false + use ExUnit.Case, async: true import Livebook.TestHelpers