diff --git a/lib/live_book/runtime/erl_dist.ex b/lib/live_book/runtime/erl_dist.ex index 7bce3c1b9..ae9a319c0 100644 --- a/lib/live_book/runtime/erl_dist.ex +++ b/lib/live_book/runtime/erl_dist.ex @@ -23,7 +23,8 @@ defmodule LiveBook.Runtime.ErlDist do LiveBook.Evaluator.StringFormatter, LiveBook.Runtime.ErlDist, LiveBook.Runtime.ErlDist.Manager, - LiveBook.Runtime.ErlDist.EvaluatorSupervisor + LiveBook.Runtime.ErlDist.EvaluatorSupervisor, + LiveBook.Runtime.ErlDist.IOForwardGL ] @doc """ diff --git a/lib/live_book/runtime/erl_dist/io_forward_gl.ex b/lib/live_book/runtime/erl_dist/io_forward_gl.ex new file mode 100644 index 000000000..7bee46e98 --- /dev/null +++ b/lib/live_book/runtime/erl_dist/io_forward_gl.ex @@ -0,0 +1,53 @@ +defmodule LiveBook.Runtime.ErlDist.IOForwardGL do + @moduledoc false + + # An IO device process forwarding all requests to sender's group leader. + # + # We use this device as `:standard_error` on connected runtime node, + # so that all evaluation warnings are treated as stdout. + # + # The process implements [The Erlang I/O Protocol](https://erlang.org/doc/apps/stdlib/io_protocol.html) + # and can be thought of as a *virtual* IO device. + + use GenServer + + @type state :: %{(reply_as :: term()) => from :: pid()} + + ## API + + @spec start_link() :: GenServer.on_start() + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts) + end + + ## Callbacks + + @impl true + def init(_opts) do + {:ok, %{}} + end + + @impl true + def handle_info({:io_request, from, reply_as, req}, state) do + case Process.info(from, :group_leader) do + {:group_leader, group_leader} -> + # Forward the request to sender's group leader + # and instruct it to get back to us. + send(group_leader, {:io_request, self(), reply_as, req}) + state = Map.put(state, reply_as, from) + + {:noreply, state} + + _ -> + {:noreply, state} + end + end + + def handle_info({:io_reply, reply_as, reply}, state) do + # Forward the reply from group leader to the original client. + {initially_from, state} = Map.pop(state, reply_as) + send(initially_from, {:io_reply, reply_as, reply}) + + {:noreply, state} + end +end diff --git a/lib/live_book/runtime/erl_dist/manager.ex b/lib/live_book/runtime/erl_dist/manager.ex index aaa2939fb..026391182 100644 --- a/lib/live_book/runtime/erl_dist/manager.ex +++ b/lib/live_book/runtime/erl_dist/manager.ex @@ -88,20 +88,31 @@ defmodule LiveBook.Runtime.ErlDist.Manager do @impl true def init(_opts) do - Process.flag(:trap_exit, true) - ErlDist.EvaluatorSupervisor.start_link() - Process.send_after(self(), :check_owner, @await_owner_timeout) + ## Initialize the node + + Process.flag(:trap_exit, true) + + {:ok, _} = ErlDist.EvaluatorSupervisor.start_link() + {:ok, io_forward_gl_pid} = LiveBook.Runtime.ErlDist.IOForwardGL.start_link() + # Set `ignore_module_conflict` only for the Manager lifetime. initial_ignore_module_conflict = Code.compiler_options()[:ignore_module_conflict] Code.compiler_options(ignore_module_conflict: true) + # Register our own standard error IO devices that proxies + # to sender's group leader. + original_standard_error = Process.whereis(:standard_error) + Process.unregister(:standard_error) + Process.register(io_forward_gl_pid, :standard_error) + {:ok, %{ owner: nil, evaluators: %{}, - initial_ignore_module_conflict: initial_ignore_module_conflict + initial_ignore_module_conflict: initial_ignore_module_conflict, + original_standard_error: original_standard_error }} end @@ -109,6 +120,9 @@ defmodule LiveBook.Runtime.ErlDist.Manager do def terminate(_reason, state) do Code.compiler_options(ignore_module_conflict: state.initial_ignore_module_conflict) + Process.unregister(:standard_error) + Process.register(state.original_standard_error, :standard_error) + ErlDist.unload_required_modules() :ok diff --git a/lib/live_book/session/data.ex b/lib/live_book/session/data.ex index 718561261..a5b863b6c 100644 --- a/lib/live_book/session/data.ex +++ b/lib/live_book/session/data.ex @@ -576,7 +576,9 @@ defmodule LiveBook.Session.Data do info = put_in(info.revision_by_client_pid[client_pid], info.revision) purge_deltas(info) end) - |> add_action({:broadcast_delta, client_pid, %{cell | source: new_source}, transformed_new_delta}) + |> add_action( + {:broadcast_delta, client_pid, %{cell | source: new_source}, transformed_new_delta} + ) end defp report_revision(data_actions, client_pid, cell, revision) do diff --git a/test/live_book/runtime/erl_dist/io_forward_gl_test.exs b/test/live_book/runtime/erl_dist/io_forward_gl_test.exs new file mode 100644 index 000000000..be032b75d --- /dev/null +++ b/test/live_book/runtime/erl_dist/io_forward_gl_test.exs @@ -0,0 +1,19 @@ +defmodule LiveBook.Runtime.ErlDist.IOForwardGLTest do + use ExUnit.Case, async: true + + alias LiveBook.Runtime.ErlDist.IOForwardGL + + test "forwards requests to sender's group leader" do + {:ok, pid} = IOForwardGL.start_link() + + group_leader_io = + ExUnit.CaptureIO.capture_io(:stdio, fn -> + # This sends an IO request to the IOForwardGL process. + # Our group leader is :stdio (by default) so we expect + # it to receive the string. + IO.puts(pid, "hey") + end) + + assert group_leader_io == "hey\n" + end +end diff --git a/test/live_book/runtime/erl_dist/manager_test.exs b/test/live_book/runtime/erl_dist/manager_test.exs index a21794647..b2c09592b 100644 --- a/test/live_book/runtime/erl_dist/manager_test.exs +++ b/test/live_book/runtime/erl_dist/manager_test.exs @@ -56,5 +56,16 @@ defmodule LiveBook.Runtime.ErlDist.ManagerTest do Manager.stop(node()) end + + test "proxies evaluation stderr to stdout" do + Manager.start() + Manager.set_owner(node(), self()) + + Manager.evaluate_code(node(), ~s{IO.puts(:stderr, "error")}, :container1, :evaluation1) + + assert_receive {:evaluation_stdout, :evaluation1, "error\n"} + + Manager.stop(node()) + end end end diff --git a/test/live_book/session/data_test.exs b/test/live_book/session/data_test.exs index 1dba76f95..c14d985fb 100644 --- a/test/live_book/session/data_test.exs +++ b/test/live_book/session/data_test.exs @@ -875,7 +875,10 @@ defmodule LiveBook.Session.DataTest do assert {:ok, %{ cell_infos: %{ - "c1" => %{deltas: [], revision_by_client_pid: %{^client1_pid => 1, ^client2_pid => 1}} + "c1" => %{ + deltas: [], + revision_by_client_pid: %{^client1_pid => 1, ^client2_pid => 1} + } } }, _} = Data.apply_operation(data, operation) end