From 19b777eb4e5eafee2ae22270bfbf18f970ead276 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Wed, 9 Feb 2022 19:47:26 +0100 Subject: [PATCH] Handle runtime event broadcast in a separate worker process (#992) * Handle runtime event broadcast in a separate worker process * Improve wording --- lib/livebook/evaluator.ex | 52 ++++++++------ lib/livebook/evaluator/io_proxy.ex | 44 ++++++------ lib/livebook/runtime.ex | 9 ++- lib/livebook/runtime/attached.ex | 4 +- lib/livebook/runtime/elixir_standalone.ex | 4 +- lib/livebook/runtime/embedded.ex | 4 +- .../runtime/erl_dist/evaluator_supervisor.ex | 11 ++- .../runtime/erl_dist/runtime_server.ex | 28 +++++--- lib/livebook/runtime/mix_standalone.ex | 4 +- lib/livebook/session.ex | 39 ++++++----- lib/livebook/session/worker.ex | 28 ++++++++ test/livebook/evaluator/io_proxy_test.exs | 66 ++++++++++-------- test/livebook/evaluator_test.exs | 68 +++++++++---------- .../runtime/erl_dist/runtime_server_test.exs | 12 ++-- test/support/noop_runtime.ex | 2 +- 15 files changed, 218 insertions(+), 157 deletions(-) create mode 100644 lib/livebook/session/worker.ex diff --git a/lib/livebook/evaluator.ex b/lib/livebook/evaluator.ex index 93f5af5bc..e5d5ac5ad 100644 --- a/lib/livebook/evaluator.ex +++ b/lib/livebook/evaluator.ex @@ -23,9 +23,11 @@ defmodule Livebook.Evaluator do @type t :: %{pid: pid(), ref: reference()} @type state :: %{ - ref: reference(), + evaluator_ref: reference(), formatter: module(), io_proxy: pid(), + send_to: pid(), + runtime_broadcast_to: pid(), object_tracker: pid(), contexts: %{ref() => context()}, initial_context: context() @@ -58,11 +60,16 @@ defmodule Livebook.Evaluator do @doc """ Starts the evaluator. - Options: + ## Options - * `object_tracker` - a PID of `Livebook.Evaluator.ObjectTracker`, required + * `:send_to` - the process to send evaluation messages to, required - * `formatter` - a module implementing the `Livebook.Evaluator.Formatter` behaviour, + * `:runtime_broadcast_to` - the process to send runtime broadcast + messages to. Defaults to the value of `:send_to` + + * `:object_tracker` - a pid of `Livebook.Evaluator.ObjectTracker`, required + + * `:formatter` - a module implementing the `Livebook.Evaluator.Formatter` behaviour, used for transforming evaluation response before it's sent to the client """ @spec start_link(keyword()) :: {:ok, pid(), t()} | {:error, term()} @@ -107,8 +114,9 @@ defmodule Livebook.Evaluator do Any subsequent calls may specify `prev_ref` pointing to a previous evaluation, in which case the corresponding binding and environment are used during evaluation. - Evaluation response is sent to the process identified by `send_to` as `{:evaluation_response, ref, response, metadata}`. - Note that response is transformed with the configured formatter (identity by default). + Evaluation response is sent to the process configured via `:send_to` as + `{:evaluation_response, ref, response, metadata}`. Note that response is + transformed with the configured formatter (identity by default). ## Options @@ -116,9 +124,9 @@ defmodule Livebook.Evaluator do this has an impact on the value of `__DIR__`. """ - @spec evaluate_code(t(), pid(), String.t(), ref(), ref() | nil, keyword()) :: :ok - def evaluate_code(evaluator, send_to, code, ref, prev_ref \\ nil, opts \\ []) when ref != nil do - cast(evaluator, {:evaluate_code, send_to, code, ref, prev_ref, opts}) + @spec evaluate_code(t(), String.t(), ref(), ref() | nil, keyword()) :: :ok + def evaluate_code(evaluator, code, ref, prev_ref \\ nil, opts \\ []) when ref != nil do + cast(evaluator, {:evaluate_code, code, ref, prev_ref, opts}) end @doc """ @@ -227,10 +235,13 @@ defmodule Livebook.Evaluator do end def init(opts) do + send_to = Keyword.fetch!(opts, :send_to) + runtime_broadcast_to = Keyword.get(opts, :runtime_broadcast_to, send_to) object_tracker = Keyword.fetch!(opts, :object_tracker) formatter = Keyword.get(opts, :formatter, Evaluator.IdentityFormatter) - {:ok, io_proxy} = Evaluator.IOProxy.start_link(self(), object_tracker) + {:ok, io_proxy} = + Evaluator.IOProxy.start_link(self(), send_to, runtime_broadcast_to, object_tracker) # Use the dedicated IO device as the group leader, so that # intercepts all :stdio requests and also handles Livebook @@ -238,26 +249,25 @@ defmodule Livebook.Evaluator do Process.group_leader(self(), io_proxy) evaluator_ref = make_ref() - state = initial_state(evaluator_ref, formatter, io_proxy, object_tracker) evaluator = %{pid: self(), ref: evaluator_ref} - :proc_lib.init_ack(evaluator) - - loop(state) - end - - defp initial_state(evaluator_ref, formatter, io_proxy, object_tracker) do context = initial_context() Process.put(@initial_env_key, context.env) - %{ + state = %{ evaluator_ref: evaluator_ref, formatter: formatter, io_proxy: io_proxy, + send_to: send_to, + runtime_broadcast_to: runtime_broadcast_to, object_tracker: object_tracker, contexts: %{}, initial_context: context } + + :proc_lib.init_ack(evaluator) + + loop(state) end defp loop(%{evaluator_ref: evaluator_ref} = state) do @@ -279,8 +289,8 @@ defmodule Livebook.Evaluator do %{binding: [], env: env, id: random_id()} end - defp handle_cast({:evaluate_code, send_to, code, ref, prev_ref, opts}, state) do - Evaluator.IOProxy.configure(state.io_proxy, send_to, ref) + defp handle_cast({:evaluate_code, code, ref, prev_ref, opts}, state) do + Evaluator.IOProxy.configure(state.io_proxy, ref) Evaluator.ObjectTracker.remove_reference(state.object_tracker, {self(), ref}) @@ -316,7 +326,7 @@ defmodule Livebook.Evaluator do code_error: code_error } - send(send_to, {:evaluation_response, ref, output, metadata}) + send(state.send_to, {:evaluation_response, ref, output, metadata}) :erlang.garbage_collect(self()) {:noreply, state} diff --git a/lib/livebook/evaluator/io_proxy.ex b/lib/livebook/evaluator/io_proxy.ex index 0c66dfc58..1e8462902 100644 --- a/lib/livebook/evaluator/io_proxy.ex +++ b/lib/livebook/evaluator/io_proxy.ex @@ -7,7 +7,7 @@ defmodule Livebook.Evaluator.IOProxy do # and can be thought of as a *virtual* IO device. # # Upon receiving an IO requests, the process sends a message - # the `target` process specified during initialization. + # the `:send_to` process specified during initialization. # Currently only output requests are supported. # # The implementation is based on the built-in `StringIO`, @@ -22,18 +22,18 @@ defmodule Livebook.Evaluator.IOProxy do @doc """ Starts the IO device process. - Make sure to use `configure/3` to actually proxy the requests. + Make sure to use `configure/3` to correctly proxy the requests. """ - @spec start_link(pid(), pid()) :: GenServer.on_start() - def start_link(evaluator, object_tracker) do - GenServer.start_link(__MODULE__, evaluator: evaluator, object_tracker: object_tracker) + @spec start_link(pid(), pid(), pid(), pid()) :: GenServer.on_start() + def start_link(evaluator, send_to, runtime_broadcast_to, object_tracker) do + GenServer.start_link(__MODULE__, {evaluator, send_to, runtime_broadcast_to, object_tracker}) end @doc """ Sets IO proxy destination and the reference to be attached to all messages. - For all supported requests a message is sent to `target`, + For all supported requests a message is sent to `:send_to`, so this device serves as a proxy. The given evaluation reference (`ref`) is also sent in all messages. @@ -46,14 +46,14 @@ defmodule Livebook.Evaluator.IOProxy do As described by the `Livebook.Runtime` protocol. The `ref` is always the given evaluation reference. """ - @spec configure(pid(), pid(), Evaluator.ref()) :: :ok - def configure(pid, target, ref) do - GenServer.cast(pid, {:configure, target, ref}) + @spec configure(pid(), Evaluator.ref()) :: :ok + def configure(pid, ref) do + GenServer.cast(pid, {:configure, ref}) end @doc """ Synchronously sends all buffer contents to the configured - target process. + `:send_to` process. """ @spec flush(pid()) :: :ok def flush(pid) do @@ -80,26 +80,24 @@ defmodule Livebook.Evaluator.IOProxy do ## Callbacks @impl true - def init(opts) do - evaluator = Keyword.fetch!(opts, :evaluator) - object_tracker = Keyword.fetch!(opts, :object_tracker) - + def init({evaluator, send_to, runtime_broadcast_to, object_tracker}) do {:ok, %{ encoding: :unicode, - target: nil, ref: nil, buffer: [], input_cache: %{}, token_count: 0, evaluator: evaluator, + send_to: send_to, + runtime_broadcast_to: runtime_broadcast_to, object_tracker: object_tracker }} end @impl true - def handle_cast({:configure, target, ref}, state) do - {:noreply, %{state | target: target, ref: ref, token_count: 0}} + def handle_cast({:configure, ref}, state) do + {:noreply, %{state | ref: ref, token_count: 0}} end def handle_cast(:clear_input_cache, state) do @@ -197,7 +195,7 @@ defmodule Livebook.Evaluator.IOProxy do defp io_request({:livebook_put_output, output}, state) do state = flush_buffer(state) - send(state.target, {:evaluation_output, state.ref, output}) + send(state.send_to, {:evaluation_output, state.ref, output}) {:ok, state} end @@ -239,7 +237,7 @@ defmodule Livebook.Evaluator.IOProxy do end defp io_request(:livebook_get_broadcast_target, state) do - {{:ok, state.target}, state} + {{:ok, state.runtime_broadcast_to}, state} end defp io_request(_, state) do @@ -271,9 +269,9 @@ defmodule Livebook.Evaluator.IOProxy do end defp request_input_value(input_id, state) do - send(state.target, {:evaluation_input, state.ref, self(), input_id}) + send(state.send_to, {:evaluation_input, state.ref, self(), input_id}) - ref = Process.monitor(state.target) + ref = Process.monitor(state.send_to) receive do {:evaluation_input_reply, {:ok, value}} -> @@ -296,8 +294,8 @@ defmodule Livebook.Evaluator.IOProxy do defp flush_buffer(state) do string = state.buffer |> Enum.reverse() |> Enum.join() - if state.target != nil and string != "" do - send(state.target, {:evaluation_output, state.ref, {:stdout, string}}) + if state.send_to != nil and string != "" do + send(state.send_to, {:evaluation_output, state.ref, {:stdout, string}}) end %{state | buffer: []} diff --git a/lib/livebook/runtime.ex b/lib/livebook/runtime.ex index 6fd93c7e6..74c674c40 100644 --- a/lib/livebook/runtime.ex +++ b/lib/livebook/runtime.ex @@ -142,9 +142,14 @@ defprotocol Livebook.Runtime do monitoring that process and return the monitor reference. This way the caller is notified when the runtime goes down by listening to the :DOWN message. + + ## Options + + * `:runtime_broadcast_to` - the process to which broadcast + messages should be sent. Defaults to the owner """ - @spec connect(t()) :: reference() - def connect(runtime) + @spec connect(t(), keyword()) :: reference() + def connect(runtime, opts \\ []) @doc """ Disconnects the current owner from runtime. diff --git a/lib/livebook/runtime/attached.ex b/lib/livebook/runtime/attached.ex index f51e2f474..9ae6bceab 100644 --- a/lib/livebook/runtime/attached.ex +++ b/lib/livebook/runtime/attached.ex @@ -41,8 +41,8 @@ end defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do alias Livebook.Runtime.ErlDist - def connect(runtime) do - ErlDist.RuntimeServer.set_owner(runtime.server_pid, self()) + def connect(runtime, opts \\ []) do + ErlDist.RuntimeServer.attach(runtime.server_pid, self(), opts) Process.monitor(runtime.server_pid) end diff --git a/lib/livebook/runtime/elixir_standalone.ex b/lib/livebook/runtime/elixir_standalone.ex index 6a659d504..6244d1e79 100644 --- a/lib/livebook/runtime/elixir_standalone.ex +++ b/lib/livebook/runtime/elixir_standalone.ex @@ -69,8 +69,8 @@ end defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do alias Livebook.Runtime.ErlDist - def connect(runtime) do - ErlDist.RuntimeServer.set_owner(runtime.server_pid, self()) + def connect(runtime, opts \\ []) do + ErlDist.RuntimeServer.attach(runtime.server_pid, self(), opts) Process.monitor(runtime.server_pid) end diff --git a/lib/livebook/runtime/embedded.ex b/lib/livebook/runtime/embedded.ex index 5a8463f66..3d920726e 100644 --- a/lib/livebook/runtime/embedded.ex +++ b/lib/livebook/runtime/embedded.ex @@ -42,8 +42,8 @@ end defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do alias Livebook.Runtime.ErlDist - def connect(runtime) do - ErlDist.RuntimeServer.set_owner(runtime.server_pid, self()) + def connect(runtime, opts \\ []) do + ErlDist.RuntimeServer.attach(runtime.server_pid, self(), opts) Process.monitor(runtime.server_pid) end diff --git a/lib/livebook/runtime/erl_dist/evaluator_supervisor.ex b/lib/livebook/runtime/erl_dist/evaluator_supervisor.ex index c1233a58e..b17ca4ec5 100644 --- a/lib/livebook/runtime/erl_dist/evaluator_supervisor.ex +++ b/lib/livebook/runtime/erl_dist/evaluator_supervisor.ex @@ -20,12 +20,11 @@ defmodule Livebook.Runtime.ErlDist.EvaluatorSupervisor do @doc """ Spawns a new evaluator. """ - @spec start_evaluator(pid(), pid()) :: {:ok, Evaluator.t()} | {:error, any()} - def start_evaluator(supervisor, object_tracker) do - case DynamicSupervisor.start_child( - supervisor, - {Evaluator, [formatter: Evaluator.DefaultFormatter, object_tracker: object_tracker]} - ) do + @spec start_evaluator(pid(), keyword()) :: {:ok, Evaluator.t()} | {:error, any()} + def start_evaluator(supervisor, opts) do + opts = Keyword.put_new(opts, :formatter, Evaluator.DefaultFormatter) + + case DynamicSupervisor.start_child(supervisor, {Evaluator, opts}) do {:ok, _pid, evaluator} -> {:ok, evaluator} {:error, reason} -> {:error, reason} end diff --git a/lib/livebook/runtime/erl_dist/runtime_server.ex b/lib/livebook/runtime/erl_dist/runtime_server.ex index c58be7f23..40ee9edd3 100644 --- a/lib/livebook/runtime/erl_dist/runtime_server.ex +++ b/lib/livebook/runtime/erl_dist/runtime_server.ex @@ -25,7 +25,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do @doc """ Starts the manager. - Note: make sure to call `set_owner` within #{@await_owner_timeout}ms + Note: make sure to call `attach` within #{@await_owner_timeout}ms or the runtime server assumes it's not needed and terminates. """ def start_link(opts \\ []) do @@ -38,10 +38,15 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do The owner process is monitored and as soon as it terminates, the server also terminates. All the evaluation results are send directly to the owner. + + ## Options + + See `Livebook.Runtime.connect/2` for the list of available + options. """ - @spec set_owner(pid(), pid()) :: :ok - def set_owner(pid, owner) do - GenServer.cast(pid, {:set_owner, owner}) + @spec attach(pid(), pid(), keyword()) :: :ok + def attach(pid, owner, opts \\ []) do + GenServer.cast(pid, {:attach, owner, opts}) end @doc """ @@ -141,6 +146,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do {:ok, %{ owner: nil, + runtime_broadcast_to: nil, evaluators: %{}, evaluator_supervisor: evaluator_supervisor, task_supervisor: task_supervisor, @@ -189,9 +195,14 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do def handle_info(_message, state), do: {:noreply, state} @impl true - def handle_cast({:set_owner, owner}, state) do + def handle_cast({:attach, owner, opts}, state) do + if state.owner do + raise "runtime owner has already been configured" + end + Process.monitor(owner) - state = %{state | owner: owner} + + state = %{state | owner: owner, runtime_broadcast_to: opts[:runtime_broadcast_to]} report_memory_usage(state) {:noreply, state} end @@ -219,7 +230,6 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do Evaluator.evaluate_code( state.evaluators[container_ref], - state.owner, code, evaluation_ref, prev_evaluation_ref, @@ -289,7 +299,9 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do {:ok, evaluator} = ErlDist.EvaluatorSupervisor.start_evaluator( state.evaluator_supervisor, - state.object_tracker + send_to: state.owner, + runtime_broadcast_to: state.runtime_broadcast_to, + object_tracker: state.object_tracker ) Process.monitor(evaluator.pid) diff --git a/lib/livebook/runtime/mix_standalone.ex b/lib/livebook/runtime/mix_standalone.ex index 866e0f7e9..effe35429 100644 --- a/lib/livebook/runtime/mix_standalone.ex +++ b/lib/livebook/runtime/mix_standalone.ex @@ -136,8 +136,8 @@ end defimpl Livebook.Runtime, for: Livebook.Runtime.MixStandalone do alias Livebook.Runtime.ErlDist - def connect(runtime) do - ErlDist.RuntimeServer.set_owner(runtime.server_pid, self()) + def connect(runtime, opts \\ []) do + ErlDist.RuntimeServer.attach(runtime.server_pid, self(), opts) Process.monitor(runtime.server_pid) end diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index 541c149c0..ba06906a0 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -478,7 +478,11 @@ defmodule Livebook.Session do @impl true def init(opts) do - with {:ok, state} <- init_state(opts), + id = Keyword.fetch!(opts, :id) + + {:ok, worker_pid} = Livebook.Session.Worker.start_link(id) + + with {:ok, state} <- init_state(id, worker_pid, opts), :ok <- if(copy_images_from = opts[:copy_images_from], do: copy_images(state, copy_images_from), @@ -497,9 +501,7 @@ defmodule Livebook.Session do end end - defp init_state(opts) do - id = Keyword.fetch!(opts, :id) - + defp init_state(id, worker_pid, opts) do with {:ok, data} <- init_data(opts) do state = %{ session_id: id, @@ -510,7 +512,8 @@ defmodule Livebook.Session do autosave_path: opts[:autosave_path], save_task_pid: nil, saved_default_file: nil, - memory_usage: %{runtime: nil, system: Livebook.SystemResources.memory()} + memory_usage: %{runtime: nil, system: Livebook.SystemResources.memory()}, + worker_pid: worker_pid } {:ok, state} @@ -763,11 +766,9 @@ defmodule Livebook.Session do Runtime.disconnect(old_runtime) end - runtime_monitor_ref = Runtime.connect(runtime) + state = do_connect_runtime(runtime, state) - {:noreply, - %{state | runtime_monitor_ref: runtime_monitor_ref} - |> handle_operation({:set_runtime, client_pid, runtime})} + {:noreply, handle_operation(state, {:set_runtime, client_pid, runtime})} end def handle_cast({:set_file, client_pid, file}, state) do @@ -845,12 +846,6 @@ defmodule Livebook.Session do {:noreply, state} end - def handle_info({:runtime_broadcast, topic, subtopic, message}, state) do - full_topic = runtime_messages_topic(state.session_id, topic, subtopic) - Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message) - {:noreply, state} - end - def handle_info({:container_down, container_ref, message}, state) do broadcast_error(state.session_id, "evaluation process terminated - #{message}") @@ -1019,6 +1014,11 @@ defmodule Livebook.Session do end) end + defp do_connect_runtime(runtime, state) do + runtime_monitor_ref = Runtime.connect(runtime, runtime_broadcast_to: state.worker_pid) + %{state | runtime_monitor_ref: runtime_monitor_ref} + end + # Given any operation on `Livebook.Session.Data`, the process # does the following: # @@ -1134,10 +1134,8 @@ defmodule Livebook.Session do case apply(runtime_module, :init, args) do {:ok, runtime} -> - runtime_monitor_ref = Runtime.connect(runtime) - - %{state | runtime_monitor_ref: runtime_monitor_ref} - |> handle_operation({:set_runtime, self(), runtime}) + state = do_connect_runtime(runtime, state) + handle_operation(state, {:set_runtime, self(), runtime}) {:error, error} -> broadcast_error(state.session_id, "failed to setup runtime - #{error}") @@ -1332,7 +1330,8 @@ defmodule Livebook.Session do ) end - defp runtime_messages_topic(session_id, topic, subtopic) do + @doc false + def runtime_messages_topic(session_id, topic, subtopic) do "sessions:#{session_id}:runtime_messages:#{topic}:#{subtopic}" end diff --git a/lib/livebook/session/worker.ex b/lib/livebook/session/worker.ex new file mode 100644 index 000000000..f7dcc1748 --- /dev/null +++ b/lib/livebook/session/worker.ex @@ -0,0 +1,28 @@ +defmodule Livebook.Session.Worker do + @moduledoc false + + # A dedicated process for offloading the session process, + # when the session state is not necessary. + # + # In particular, this process handles broadcast messages + # sent from within the runtime and distributes them to the + # actual subscribers via pubsub. + + use GenServer + + def start_link(session_id) do + GenServer.start_link(__MODULE__, {session_id}) + end + + @impl true + def init({session_id}) do + {:ok, %{session_id: session_id}} + end + + @impl true + def handle_info({:runtime_broadcast, topic, subtopic, message}, state) do + full_topic = Livebook.Session.runtime_messages_topic(state.session_id, topic, subtopic) + Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message) + {:noreply, state} + end +end diff --git a/test/livebook/evaluator/io_proxy_test.exs b/test/livebook/evaluator/io_proxy_test.exs index 464e5f486..e159148c6 100644 --- a/test/livebook/evaluator/io_proxy_test.exs +++ b/test/livebook/evaluator/io_proxy_test.exs @@ -5,12 +5,13 @@ defmodule Livebook.Evaluator.IOProxyTest do alias Livebook.Evaluator.IOProxy setup do - # {:ok, io} = IOProxy.start_link() - {:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker) - {:ok, _pid, evaluator} = start_supervised({Evaluator, [object_tracker: object_tracker]}) + + {:ok, _pid, evaluator} = + start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]}) + io = Process.info(evaluator.pid)[:group_leader] - IOProxy.configure(io, self(), :ref) + IOProxy.configure(io, :ref) %{io: io} end @@ -41,30 +42,41 @@ defmodule Livebook.Evaluator.IOProxyTest do describe "input" do test "responds to Livebook input request", %{io: io} do - configure_owner_with_input(io, "input1", :value) + pid = + spawn(fn -> + assert livebook_get_input_value(io, "input1") == {:ok, :value} + end) - assert livebook_get_input_value(io, "input1") == {:ok, :value} + reply_to_input_request(:ref, "input1", {:ok, :value}, 1) + + await_termination(pid) end test "responds to subsequent requests with the same value", %{io: io} do - configure_owner_with_input(io, "input1", :value) + pid = + spawn(fn -> + assert livebook_get_input_value(io, "input1") == {:ok, :value} + assert livebook_get_input_value(io, "input1") == {:ok, :value} + end) - assert livebook_get_input_value(io, "input1") == {:ok, :value} - assert livebook_get_input_value(io, "input1") == {:ok, :value} + reply_to_input_request(:ref, "input1", {:ok, :value}, 1) + + await_termination(pid) end test "clear_input_cache/1 clears all cached input information", %{io: io} do pid = spawn_link(fn -> - reply_to_input_request(:ref, "input1", {:ok, :value1}, 1) - reply_to_input_request(:ref, "input1", {:ok, :value2}, 1) + IOProxy.configure(io, :ref) + assert livebook_get_input_value(io, "input1") == {:ok, :value1} + IOProxy.clear_input_cache(io) + assert livebook_get_input_value(io, "input1") == {:ok, :value2} end) - IOProxy.configure(io, pid, :ref) + reply_to_input_request(:ref, "input1", {:ok, :value1}, 1) + reply_to_input_request(:ref, "input1", {:ok, :value2}, 1) - assert livebook_get_input_value(io, "input1") == {:ok, :value1} - IOProxy.clear_input_cache(io) - assert livebook_get_input_value(io, "input1") == {:ok, :value2} + await_termination(pid) end end @@ -94,25 +106,25 @@ defmodule Livebook.Evaluator.IOProxyTest do describe "token requests" do test "returns different tokens for subsequent calls", %{io: io} do - IOProxy.configure(io, self(), :ref1) + IOProxy.configure(io, :ref1) token1 = livebook_generate_token(io) token2 = livebook_generate_token(io) assert token1 != token2 end test "returns different tokens for different refs", %{io: io} do - IOProxy.configure(io, self(), :ref1) + IOProxy.configure(io, :ref1) token1 = livebook_generate_token(io) - IOProxy.configure(io, self(), :ref2) + IOProxy.configure(io, :ref2) token2 = livebook_generate_token(io) assert token1 != token2 end test "returns same tokens for the same ref", %{io: io} do - IOProxy.configure(io, self(), :ref) + IOProxy.configure(io, :ref) token1 = livebook_generate_token(io) token2 = livebook_generate_token(io) - IOProxy.configure(io, self(), :ref) + IOProxy.configure(io, :ref) token3 = livebook_generate_token(io) token4 = livebook_generate_token(io) assert token1 == token3 @@ -122,15 +134,6 @@ defmodule Livebook.Evaluator.IOProxyTest do # Helpers - defp configure_owner_with_input(io, input_id, value) do - pid = - spawn_link(fn -> - reply_to_input_request(:ref, input_id, {:ok, value}, 1) - end) - - IOProxy.configure(io, pid, :ref) - end - defp reply_to_input_request(_ref, _input_id, _reply, 0), do: :ok defp reply_to_input_request(ref, input_id, reply, times) do @@ -159,4 +162,9 @@ defmodule Livebook.Evaluator.IOProxyTest do assert_receive {:io_reply, ^ref, reply} reply end + + defp await_termination(pid) do + ref = Process.monitor(pid) + assert_receive {:DOWN, ^ref, :process, _, _} + end end diff --git a/test/livebook/evaluator_test.exs b/test/livebook/evaluator_test.exs index 63892dddb..0f40784aa 100644 --- a/test/livebook/evaluator_test.exs +++ b/test/livebook/evaluator_test.exs @@ -5,7 +5,10 @@ defmodule Livebook.EvaluatorTest do setup do {:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker) - {:ok, _pid, evaluator} = start_supervised({Evaluator, [object_tracker: object_tracker]}) + + {:ok, _pid, evaluator} = + start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]}) + %{evaluator: evaluator, object_tracker: object_tracker} end @@ -23,7 +26,7 @@ defmodule Livebook.EvaluatorTest do x + y """ - Evaluator.evaluate_code(evaluator, self(), code, :code_1) + Evaluator.evaluate_code(evaluator, code, :code_1) assert_receive {:evaluation_response, :code_1, {:ok, 3}, metadata() = metadata} assert metadata.evaluation_time_ms >= 0 @@ -33,11 +36,11 @@ defmodule Livebook.EvaluatorTest do end test "given no prev_ref does not see previous evaluation context", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) + Evaluator.evaluate_code(evaluator, "x = 1", :code_1) assert_receive {:evaluation_response, :code_1, _, metadata()} ignore_warnings(fn -> - Evaluator.evaluate_code(evaluator, self(), "x", :code_2) + Evaluator.evaluate_code(evaluator, "x", :code_2) assert_receive {:evaluation_response, :code_2, {:error, _kind, @@ -48,22 +51,22 @@ defmodule Livebook.EvaluatorTest do end test "given prev_ref sees previous evaluation context", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) + Evaluator.evaluate_code(evaluator, "x = 1", :code_1) assert_receive {:evaluation_response, :code_1, _, metadata()} - Evaluator.evaluate_code(evaluator, self(), "x", :code_2, :code_1) + Evaluator.evaluate_code(evaluator, "x", :code_2, :code_1) assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()} end test "given invalid prev_ref just uses default context", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, self(), ":hey", :code_1, :code_nonexistent) + Evaluator.evaluate_code(evaluator, ":hey", :code_1, :code_nonexistent) assert_receive {:evaluation_response, :code_1, {:ok, :hey}, metadata()} end test "captures standard output and sends it to the caller", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, self(), ~s{IO.puts("hey")}, :code_1) + Evaluator.evaluate_code(evaluator, ~s{IO.puts("hey")}, :code_1) assert_receive {:evaluation_output, :code_1, {:stdout, "hey\n"}} end @@ -78,7 +81,7 @@ defmodule Livebook.EvaluatorTest do end """ - Evaluator.evaluate_code(evaluator, self(), code, :code_1) + Evaluator.evaluate_code(evaluator, code, :code_1) assert_receive {:evaluation_input, :code_1, reply_to, "input1"} send(reply_to, {:evaluation_input_reply, {:ok, :value}}) @@ -91,7 +94,7 @@ defmodule Livebook.EvaluatorTest do List.first(%{}) """ - Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, file: "file.ex") + Evaluator.evaluate_code(evaluator, code, :code_1, nil, file: "file.ex") assert_receive {:evaluation_response, :code_1, {:error, :error, :function_clause, [{List, :first, _arity, _location}]}, @@ -101,7 +104,7 @@ defmodule Livebook.EvaluatorTest do test "returns additional metadata when there is a syntax error", %{evaluator: evaluator} do code = "1+" - Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, file: "file.ex") + Evaluator.evaluate_code(evaluator, code, :code_1, nil, file: "file.ex") assert_receive {:evaluation_response, :code_1, {:error, :error, %TokenMissingError{}, []}, %{ @@ -115,7 +118,7 @@ defmodule Livebook.EvaluatorTest do test "returns additional metadata when there is a compilation error", %{evaluator: evaluator} do code = "x" - Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, file: "file.ex") + Evaluator.evaluate_code(evaluator, code, :code_1, nil, file: "file.ex") assert_receive {:evaluation_response, :code_1, {:error, :error, %CompileError{}, []}, %{ @@ -131,7 +134,7 @@ defmodule Livebook.EvaluatorTest do Code.eval_string("x") """ - Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, file: "file.ex") + Evaluator.evaluate_code(evaluator, code, :code_1, nil, file: "file.ex") expected_stacktrace = [{Code, :validated_eval_string, 3, [file: 'lib/code.ex', line: 404]}] @@ -160,7 +163,7 @@ defmodule Livebook.EvaluatorTest do """ ignore_warnings(fn -> - Evaluator.evaluate_code(evaluator, self(), code, :code_1) + Evaluator.evaluate_code(evaluator, code, :code_1) expected_stacktrace = [ {Livebook.EvaluatorTest.Stacktrace.Math, :bad_math, 0, [file: 'nofile', line: 3]}, @@ -188,14 +191,14 @@ defmodule Livebook.EvaluatorTest do x * x """ - Evaluator.evaluate_code(evaluator, self(), code1, :code_1) + Evaluator.evaluate_code(evaluator, code1, :code_1) assert_receive {:evaluation_response, :code_1, {:ok, _}, metadata()} - Evaluator.evaluate_code(evaluator, self(), code2, :code_2, :code_1) + Evaluator.evaluate_code(evaluator, code2, :code_2, :code_1) assert_receive {:evaluation_response, :code_2, {:error, _, _, _}, metadata()} - Evaluator.evaluate_code(evaluator, self(), code3, :code_3, :code_2) + Evaluator.evaluate_code(evaluator, code3, :code_3, :code_2) assert_receive {:evaluation_response, :code_3, {:ok, 4}, metadata()} end @@ -205,7 +208,7 @@ defmodule Livebook.EvaluatorTest do """ opts = [file: "/path/dir/file"] - Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, opts) + Evaluator.evaluate_code(evaluator, code, :code_1, nil, opts) assert_receive {:evaluation_response, :code_1, {:ok, "/path/dir"}, metadata()} end @@ -215,13 +218,13 @@ defmodule Livebook.EvaluatorTest do # The evaluation reference is the same, so the second one overrides # the first one and the first widget should eventually be kiled. - Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) + Evaluator.evaluate_code(evaluator, spawn_widget_code(), :code_1) assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()} ref = Process.monitor(widget_pid1) - Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) + Evaluator.evaluate_code(evaluator, spawn_widget_code(), :code_1) assert_receive {:evaluation_response, :code_1, {:ok, widget_pid2}, metadata()} @@ -234,12 +237,7 @@ defmodule Livebook.EvaluatorTest do # The widget is spawned from a process that terminates, # so the widget should terminate immediately as well - Evaluator.evaluate_code( - evaluator, - self(), - spawn_widget_from_terminating_process_code(), - :code_1 - ) + Evaluator.evaluate_code(evaluator, spawn_widget_from_terminating_process_code(), :code_1) assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()} @@ -249,13 +247,13 @@ defmodule Livebook.EvaluatorTest do describe "forget_evaluation/2" do test "invalidates the given reference", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) + Evaluator.evaluate_code(evaluator, "x = 1", :code_1) assert_receive {:evaluation_response, :code_1, _, metadata()} Evaluator.forget_evaluation(evaluator, :code_1) ignore_warnings(fn -> - Evaluator.evaluate_code(evaluator, self(), "x", :code_2, :code_1) + Evaluator.evaluate_code(evaluator, "x", :code_2, :code_1) assert_receive {:evaluation_response, :code_2, {:error, _kind, @@ -266,7 +264,7 @@ defmodule Livebook.EvaluatorTest do end test "kills widgets that no evaluation points to", %{evaluator: evaluator} do - Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) + Evaluator.evaluate_code(evaluator, spawn_widget_code(), :code_1) assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()} @@ -280,30 +278,32 @@ defmodule Livebook.EvaluatorTest do describe "initialize_from/3" do setup %{object_tracker: object_tracker} do {:ok, _pid, parent_evaluator} = - start_supervised({Evaluator, [object_tracker: object_tracker]}, id: :parent_evaluator) + start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]}, + id: :parent_evaluator + ) %{parent_evaluator: parent_evaluator} end test "copies the given context and sets as the initial one", %{evaluator: evaluator, parent_evaluator: parent_evaluator} do - Evaluator.evaluate_code(parent_evaluator, self(), "x = 1", :code_1) + Evaluator.evaluate_code(parent_evaluator, "x = 1", :code_1) assert_receive {:evaluation_response, :code_1, _, metadata()} Evaluator.initialize_from(evaluator, parent_evaluator, :code_1) - Evaluator.evaluate_code(evaluator, self(), "x", :code_2) + Evaluator.evaluate_code(evaluator, "x", :code_2) assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()} end test "mirrors process dictionary of the given evaluator", %{evaluator: evaluator, parent_evaluator: parent_evaluator} do - Evaluator.evaluate_code(parent_evaluator, self(), "Process.put(:data, 1)", :code_1) + Evaluator.evaluate_code(parent_evaluator, "Process.put(:data, 1)", :code_1) assert_receive {:evaluation_response, :code_1, _, metadata()} Evaluator.initialize_from(evaluator, parent_evaluator, :code_1) - Evaluator.evaluate_code(evaluator, self(), "Process.get(:data)", :code_2) + Evaluator.evaluate_code(evaluator, "Process.get(:data)", :code_2) assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()} end end diff --git a/test/livebook/runtime/erl_dist/runtime_server_test.exs b/test/livebook/runtime/erl_dist/runtime_server_test.exs index 77533452d..a4afad682 100644 --- a/test/livebook/runtime/erl_dist/runtime_server_test.exs +++ b/test/livebook/runtime/erl_dist/runtime_server_test.exs @@ -8,12 +8,13 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do start_supervised({NodeManager, [unload_modules_on_termination: false, anonymous: true]}) runtime_server_pid = NodeManager.start_runtime_server(manager_pid) - RuntimeServer.set_owner(runtime_server_pid, self()) - {:ok, %{pid: runtime_server_pid}} + RuntimeServer.attach(runtime_server_pid, self()) + {:ok, %{pid: runtime_server_pid, manager_pid: manager_pid}} end - describe "set_owner/2" do - test "starts watching the given process and terminates as soon as it terminates", %{pid: pid} do + describe "attach/2" do + test "starts watching the given process and terminates as soon as it terminates", + %{manager_pid: manager_pid} do owner = spawn(fn -> receive do @@ -21,7 +22,8 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do end end) - RuntimeServer.set_owner(pid, owner) + pid = NodeManager.start_runtime_server(manager_pid) + RuntimeServer.attach(pid, owner) # Make sure the node is running. assert Process.alive?(pid) diff --git a/test/support/noop_runtime.ex b/test/support/noop_runtime.ex index 01373b3e2..0869dd039 100644 --- a/test/support/noop_runtime.ex +++ b/test/support/noop_runtime.ex @@ -9,7 +9,7 @@ defmodule Livebook.Runtime.NoopRuntime do def new(), do: %__MODULE__{} defimpl Livebook.Runtime do - def connect(_), do: make_ref() + def connect(_, _), do: make_ref() def disconnect(_), do: :ok def evaluate_code(_, _, _, _, _ \\ []), do: :ok def forget_evaluation(_, _), do: :ok