Handle runtime event broadcast in a separate worker process (#992)

* Handle runtime event broadcast in a separate worker process

* Improve wording
This commit is contained in:
Jonatan Kłosko 2022-02-09 19:47:26 +01:00 committed by GitHub
parent ea54214359
commit 19b777eb4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 218 additions and 157 deletions

View file

@ -23,9 +23,11 @@ defmodule Livebook.Evaluator do
@type t :: %{pid: pid(), ref: reference()} @type t :: %{pid: pid(), ref: reference()}
@type state :: %{ @type state :: %{
ref: reference(), evaluator_ref: reference(),
formatter: module(), formatter: module(),
io_proxy: pid(), io_proxy: pid(),
send_to: pid(),
runtime_broadcast_to: pid(),
object_tracker: pid(), object_tracker: pid(),
contexts: %{ref() => context()}, contexts: %{ref() => context()},
initial_context: context() initial_context: context()
@ -58,11 +60,16 @@ defmodule Livebook.Evaluator do
@doc """ @doc """
Starts the evaluator. Starts the evaluator.
Options: ## Options
* `object_tracker` - a PID of `Livebook.Evaluator.ObjectTracker`, required * `:send_to` - the process to send evaluation messages to, required
* `formatter` - a module implementing the `Livebook.Evaluator.Formatter` behaviour, * `:runtime_broadcast_to` - the process to send runtime broadcast
messages to. Defaults to the value of `:send_to`
* `:object_tracker` - a pid of `Livebook.Evaluator.ObjectTracker`, required
* `:formatter` - a module implementing the `Livebook.Evaluator.Formatter` behaviour,
used for transforming evaluation response before it's sent to the client used for transforming evaluation response before it's sent to the client
""" """
@spec start_link(keyword()) :: {:ok, pid(), t()} | {:error, term()} @spec start_link(keyword()) :: {:ok, pid(), t()} | {:error, term()}
@ -107,8 +114,9 @@ defmodule Livebook.Evaluator do
Any subsequent calls may specify `prev_ref` pointing to a previous evaluation, Any subsequent calls may specify `prev_ref` pointing to a previous evaluation,
in which case the corresponding binding and environment are used during evaluation. in which case the corresponding binding and environment are used during evaluation.
Evaluation response is sent to the process identified by `send_to` as `{:evaluation_response, ref, response, metadata}`. Evaluation response is sent to the process configured via `:send_to` as
Note that response is transformed with the configured formatter (identity by default). `{:evaluation_response, ref, response, metadata}`. Note that response is
transformed with the configured formatter (identity by default).
## Options ## Options
@ -116,9 +124,9 @@ defmodule Livebook.Evaluator do
this has an impact on the value of `__DIR__`. this has an impact on the value of `__DIR__`.
""" """
@spec evaluate_code(t(), pid(), String.t(), ref(), ref() | nil, keyword()) :: :ok @spec evaluate_code(t(), String.t(), ref(), ref() | nil, keyword()) :: :ok
def evaluate_code(evaluator, send_to, code, ref, prev_ref \\ nil, opts \\ []) when ref != nil do def evaluate_code(evaluator, code, ref, prev_ref \\ nil, opts \\ []) when ref != nil do
cast(evaluator, {:evaluate_code, send_to, code, ref, prev_ref, opts}) cast(evaluator, {:evaluate_code, code, ref, prev_ref, opts})
end end
@doc """ @doc """
@ -227,10 +235,13 @@ defmodule Livebook.Evaluator do
end end
def init(opts) do def init(opts) do
send_to = Keyword.fetch!(opts, :send_to)
runtime_broadcast_to = Keyword.get(opts, :runtime_broadcast_to, send_to)
object_tracker = Keyword.fetch!(opts, :object_tracker) object_tracker = Keyword.fetch!(opts, :object_tracker)
formatter = Keyword.get(opts, :formatter, Evaluator.IdentityFormatter) formatter = Keyword.get(opts, :formatter, Evaluator.IdentityFormatter)
{:ok, io_proxy} = Evaluator.IOProxy.start_link(self(), object_tracker) {:ok, io_proxy} =
Evaluator.IOProxy.start_link(self(), send_to, runtime_broadcast_to, object_tracker)
# Use the dedicated IO device as the group leader, so that # Use the dedicated IO device as the group leader, so that
# intercepts all :stdio requests and also handles Livebook # intercepts all :stdio requests and also handles Livebook
@ -238,26 +249,25 @@ defmodule Livebook.Evaluator do
Process.group_leader(self(), io_proxy) Process.group_leader(self(), io_proxy)
evaluator_ref = make_ref() evaluator_ref = make_ref()
state = initial_state(evaluator_ref, formatter, io_proxy, object_tracker)
evaluator = %{pid: self(), ref: evaluator_ref} evaluator = %{pid: self(), ref: evaluator_ref}
:proc_lib.init_ack(evaluator)
loop(state)
end
defp initial_state(evaluator_ref, formatter, io_proxy, object_tracker) do
context = initial_context() context = initial_context()
Process.put(@initial_env_key, context.env) Process.put(@initial_env_key, context.env)
%{ state = %{
evaluator_ref: evaluator_ref, evaluator_ref: evaluator_ref,
formatter: formatter, formatter: formatter,
io_proxy: io_proxy, io_proxy: io_proxy,
send_to: send_to,
runtime_broadcast_to: runtime_broadcast_to,
object_tracker: object_tracker, object_tracker: object_tracker,
contexts: %{}, contexts: %{},
initial_context: context initial_context: context
} }
:proc_lib.init_ack(evaluator)
loop(state)
end end
defp loop(%{evaluator_ref: evaluator_ref} = state) do defp loop(%{evaluator_ref: evaluator_ref} = state) do
@ -279,8 +289,8 @@ defmodule Livebook.Evaluator do
%{binding: [], env: env, id: random_id()} %{binding: [], env: env, id: random_id()}
end end
defp handle_cast({:evaluate_code, send_to, code, ref, prev_ref, opts}, state) do defp handle_cast({:evaluate_code, code, ref, prev_ref, opts}, state) do
Evaluator.IOProxy.configure(state.io_proxy, send_to, ref) Evaluator.IOProxy.configure(state.io_proxy, ref)
Evaluator.ObjectTracker.remove_reference(state.object_tracker, {self(), ref}) Evaluator.ObjectTracker.remove_reference(state.object_tracker, {self(), ref})
@ -316,7 +326,7 @@ defmodule Livebook.Evaluator do
code_error: code_error code_error: code_error
} }
send(send_to, {:evaluation_response, ref, output, metadata}) send(state.send_to, {:evaluation_response, ref, output, metadata})
:erlang.garbage_collect(self()) :erlang.garbage_collect(self())
{:noreply, state} {:noreply, state}

View file

@ -7,7 +7,7 @@ defmodule Livebook.Evaluator.IOProxy do
# and can be thought of as a *virtual* IO device. # and can be thought of as a *virtual* IO device.
# #
# Upon receiving an IO requests, the process sends a message # Upon receiving an IO requests, the process sends a message
# the `target` process specified during initialization. # the `:send_to` process specified during initialization.
# Currently only output requests are supported. # Currently only output requests are supported.
# #
# The implementation is based on the built-in `StringIO`, # The implementation is based on the built-in `StringIO`,
@ -22,18 +22,18 @@ defmodule Livebook.Evaluator.IOProxy do
@doc """ @doc """
Starts the IO device process. Starts the IO device process.
Make sure to use `configure/3` to actually proxy the requests. Make sure to use `configure/3` to correctly proxy the requests.
""" """
@spec start_link(pid(), pid()) :: GenServer.on_start() @spec start_link(pid(), pid(), pid(), pid()) :: GenServer.on_start()
def start_link(evaluator, object_tracker) do def start_link(evaluator, send_to, runtime_broadcast_to, object_tracker) do
GenServer.start_link(__MODULE__, evaluator: evaluator, object_tracker: object_tracker) GenServer.start_link(__MODULE__, {evaluator, send_to, runtime_broadcast_to, object_tracker})
end end
@doc """ @doc """
Sets IO proxy destination and the reference to be attached Sets IO proxy destination and the reference to be attached
to all messages. to all messages.
For all supported requests a message is sent to `target`, For all supported requests a message is sent to `:send_to`,
so this device serves as a proxy. The given evaluation so this device serves as a proxy. The given evaluation
reference (`ref`) is also sent in all messages. reference (`ref`) is also sent in all messages.
@ -46,14 +46,14 @@ defmodule Livebook.Evaluator.IOProxy do
As described by the `Livebook.Runtime` protocol. The `ref` As described by the `Livebook.Runtime` protocol. The `ref`
is always the given evaluation reference. is always the given evaluation reference.
""" """
@spec configure(pid(), pid(), Evaluator.ref()) :: :ok @spec configure(pid(), Evaluator.ref()) :: :ok
def configure(pid, target, ref) do def configure(pid, ref) do
GenServer.cast(pid, {:configure, target, ref}) GenServer.cast(pid, {:configure, ref})
end end
@doc """ @doc """
Synchronously sends all buffer contents to the configured Synchronously sends all buffer contents to the configured
target process. `:send_to` process.
""" """
@spec flush(pid()) :: :ok @spec flush(pid()) :: :ok
def flush(pid) do def flush(pid) do
@ -80,26 +80,24 @@ defmodule Livebook.Evaluator.IOProxy do
## Callbacks ## Callbacks
@impl true @impl true
def init(opts) do def init({evaluator, send_to, runtime_broadcast_to, object_tracker}) do
evaluator = Keyword.fetch!(opts, :evaluator)
object_tracker = Keyword.fetch!(opts, :object_tracker)
{:ok, {:ok,
%{ %{
encoding: :unicode, encoding: :unicode,
target: nil,
ref: nil, ref: nil,
buffer: [], buffer: [],
input_cache: %{}, input_cache: %{},
token_count: 0, token_count: 0,
evaluator: evaluator, evaluator: evaluator,
send_to: send_to,
runtime_broadcast_to: runtime_broadcast_to,
object_tracker: object_tracker object_tracker: object_tracker
}} }}
end end
@impl true @impl true
def handle_cast({:configure, target, ref}, state) do def handle_cast({:configure, ref}, state) do
{:noreply, %{state | target: target, ref: ref, token_count: 0}} {:noreply, %{state | ref: ref, token_count: 0}}
end end
def handle_cast(:clear_input_cache, state) do def handle_cast(:clear_input_cache, state) do
@ -197,7 +195,7 @@ defmodule Livebook.Evaluator.IOProxy do
defp io_request({:livebook_put_output, output}, state) do defp io_request({:livebook_put_output, output}, state) do
state = flush_buffer(state) state = flush_buffer(state)
send(state.target, {:evaluation_output, state.ref, output}) send(state.send_to, {:evaluation_output, state.ref, output})
{:ok, state} {:ok, state}
end end
@ -239,7 +237,7 @@ defmodule Livebook.Evaluator.IOProxy do
end end
defp io_request(:livebook_get_broadcast_target, state) do defp io_request(:livebook_get_broadcast_target, state) do
{{:ok, state.target}, state} {{:ok, state.runtime_broadcast_to}, state}
end end
defp io_request(_, state) do defp io_request(_, state) do
@ -271,9 +269,9 @@ defmodule Livebook.Evaluator.IOProxy do
end end
defp request_input_value(input_id, state) do defp request_input_value(input_id, state) do
send(state.target, {:evaluation_input, state.ref, self(), input_id}) send(state.send_to, {:evaluation_input, state.ref, self(), input_id})
ref = Process.monitor(state.target) ref = Process.monitor(state.send_to)
receive do receive do
{:evaluation_input_reply, {:ok, value}} -> {:evaluation_input_reply, {:ok, value}} ->
@ -296,8 +294,8 @@ defmodule Livebook.Evaluator.IOProxy do
defp flush_buffer(state) do defp flush_buffer(state) do
string = state.buffer |> Enum.reverse() |> Enum.join() string = state.buffer |> Enum.reverse() |> Enum.join()
if state.target != nil and string != "" do if state.send_to != nil and string != "" do
send(state.target, {:evaluation_output, state.ref, {:stdout, string}}) send(state.send_to, {:evaluation_output, state.ref, {:stdout, string}})
end end
%{state | buffer: []} %{state | buffer: []}

View file

@ -142,9 +142,14 @@ defprotocol Livebook.Runtime do
monitoring that process and return the monitor reference. monitoring that process and return the monitor reference.
This way the caller is notified when the runtime goes down This way the caller is notified when the runtime goes down
by listening to the :DOWN message. by listening to the :DOWN message.
## Options
* `:runtime_broadcast_to` - the process to which broadcast
messages should be sent. Defaults to the owner
""" """
@spec connect(t()) :: reference() @spec connect(t(), keyword()) :: reference()
def connect(runtime) def connect(runtime, opts \\ [])
@doc """ @doc """
Disconnects the current owner from runtime. Disconnects the current owner from runtime.

View file

@ -41,8 +41,8 @@ end
defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
def connect(runtime) do def connect(runtime, opts \\ []) do
ErlDist.RuntimeServer.set_owner(runtime.server_pid, self()) ErlDist.RuntimeServer.attach(runtime.server_pid, self(), opts)
Process.monitor(runtime.server_pid) Process.monitor(runtime.server_pid)
end end

View file

@ -69,8 +69,8 @@ end
defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
def connect(runtime) do def connect(runtime, opts \\ []) do
ErlDist.RuntimeServer.set_owner(runtime.server_pid, self()) ErlDist.RuntimeServer.attach(runtime.server_pid, self(), opts)
Process.monitor(runtime.server_pid) Process.monitor(runtime.server_pid)
end end

View file

@ -42,8 +42,8 @@ end
defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
def connect(runtime) do def connect(runtime, opts \\ []) do
ErlDist.RuntimeServer.set_owner(runtime.server_pid, self()) ErlDist.RuntimeServer.attach(runtime.server_pid, self(), opts)
Process.monitor(runtime.server_pid) Process.monitor(runtime.server_pid)
end end

View file

@ -20,12 +20,11 @@ defmodule Livebook.Runtime.ErlDist.EvaluatorSupervisor do
@doc """ @doc """
Spawns a new evaluator. Spawns a new evaluator.
""" """
@spec start_evaluator(pid(), pid()) :: {:ok, Evaluator.t()} | {:error, any()} @spec start_evaluator(pid(), keyword()) :: {:ok, Evaluator.t()} | {:error, any()}
def start_evaluator(supervisor, object_tracker) do def start_evaluator(supervisor, opts) do
case DynamicSupervisor.start_child( opts = Keyword.put_new(opts, :formatter, Evaluator.DefaultFormatter)
supervisor,
{Evaluator, [formatter: Evaluator.DefaultFormatter, object_tracker: object_tracker]} case DynamicSupervisor.start_child(supervisor, {Evaluator, opts}) do
) do
{:ok, _pid, evaluator} -> {:ok, evaluator} {:ok, _pid, evaluator} -> {:ok, evaluator}
{:error, reason} -> {:error, reason} {:error, reason} -> {:error, reason}
end end

View file

@ -25,7 +25,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
@doc """ @doc """
Starts the manager. Starts the manager.
Note: make sure to call `set_owner` within #{@await_owner_timeout}ms Note: make sure to call `attach` within #{@await_owner_timeout}ms
or the runtime server assumes it's not needed and terminates. or the runtime server assumes it's not needed and terminates.
""" """
def start_link(opts \\ []) do def start_link(opts \\ []) do
@ -38,10 +38,15 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
The owner process is monitored and as soon as it terminates, The owner process is monitored and as soon as it terminates,
the server also terminates. All the evaluation results are the server also terminates. All the evaluation results are
send directly to the owner. send directly to the owner.
## Options
See `Livebook.Runtime.connect/2` for the list of available
options.
""" """
@spec set_owner(pid(), pid()) :: :ok @spec attach(pid(), pid(), keyword()) :: :ok
def set_owner(pid, owner) do def attach(pid, owner, opts \\ []) do
GenServer.cast(pid, {:set_owner, owner}) GenServer.cast(pid, {:attach, owner, opts})
end end
@doc """ @doc """
@ -141,6 +146,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:ok, {:ok,
%{ %{
owner: nil, owner: nil,
runtime_broadcast_to: nil,
evaluators: %{}, evaluators: %{},
evaluator_supervisor: evaluator_supervisor, evaluator_supervisor: evaluator_supervisor,
task_supervisor: task_supervisor, task_supervisor: task_supervisor,
@ -189,9 +195,14 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
def handle_info(_message, state), do: {:noreply, state} def handle_info(_message, state), do: {:noreply, state}
@impl true @impl true
def handle_cast({:set_owner, owner}, state) do def handle_cast({:attach, owner, opts}, state) do
if state.owner do
raise "runtime owner has already been configured"
end
Process.monitor(owner) Process.monitor(owner)
state = %{state | owner: owner}
state = %{state | owner: owner, runtime_broadcast_to: opts[:runtime_broadcast_to]}
report_memory_usage(state) report_memory_usage(state)
{:noreply, state} {:noreply, state}
end end
@ -219,7 +230,6 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
Evaluator.evaluate_code( Evaluator.evaluate_code(
state.evaluators[container_ref], state.evaluators[container_ref],
state.owner,
code, code,
evaluation_ref, evaluation_ref,
prev_evaluation_ref, prev_evaluation_ref,
@ -289,7 +299,9 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:ok, evaluator} = {:ok, evaluator} =
ErlDist.EvaluatorSupervisor.start_evaluator( ErlDist.EvaluatorSupervisor.start_evaluator(
state.evaluator_supervisor, state.evaluator_supervisor,
state.object_tracker send_to: state.owner,
runtime_broadcast_to: state.runtime_broadcast_to,
object_tracker: state.object_tracker
) )
Process.monitor(evaluator.pid) Process.monitor(evaluator.pid)

View file

@ -136,8 +136,8 @@ end
defimpl Livebook.Runtime, for: Livebook.Runtime.MixStandalone do defimpl Livebook.Runtime, for: Livebook.Runtime.MixStandalone do
alias Livebook.Runtime.ErlDist alias Livebook.Runtime.ErlDist
def connect(runtime) do def connect(runtime, opts \\ []) do
ErlDist.RuntimeServer.set_owner(runtime.server_pid, self()) ErlDist.RuntimeServer.attach(runtime.server_pid, self(), opts)
Process.monitor(runtime.server_pid) Process.monitor(runtime.server_pid)
end end

View file

@ -478,7 +478,11 @@ defmodule Livebook.Session do
@impl true @impl true
def init(opts) do def init(opts) do
with {:ok, state} <- init_state(opts), id = Keyword.fetch!(opts, :id)
{:ok, worker_pid} = Livebook.Session.Worker.start_link(id)
with {:ok, state} <- init_state(id, worker_pid, opts),
:ok <- :ok <-
if(copy_images_from = opts[:copy_images_from], if(copy_images_from = opts[:copy_images_from],
do: copy_images(state, copy_images_from), do: copy_images(state, copy_images_from),
@ -497,9 +501,7 @@ defmodule Livebook.Session do
end end
end end
defp init_state(opts) do defp init_state(id, worker_pid, opts) do
id = Keyword.fetch!(opts, :id)
with {:ok, data} <- init_data(opts) do with {:ok, data} <- init_data(opts) do
state = %{ state = %{
session_id: id, session_id: id,
@ -510,7 +512,8 @@ defmodule Livebook.Session do
autosave_path: opts[:autosave_path], autosave_path: opts[:autosave_path],
save_task_pid: nil, save_task_pid: nil,
saved_default_file: nil, saved_default_file: nil,
memory_usage: %{runtime: nil, system: Livebook.SystemResources.memory()} memory_usage: %{runtime: nil, system: Livebook.SystemResources.memory()},
worker_pid: worker_pid
} }
{:ok, state} {:ok, state}
@ -763,11 +766,9 @@ defmodule Livebook.Session do
Runtime.disconnect(old_runtime) Runtime.disconnect(old_runtime)
end end
runtime_monitor_ref = Runtime.connect(runtime) state = do_connect_runtime(runtime, state)
{:noreply, {:noreply, handle_operation(state, {:set_runtime, client_pid, runtime})}
%{state | runtime_monitor_ref: runtime_monitor_ref}
|> handle_operation({:set_runtime, client_pid, runtime})}
end end
def handle_cast({:set_file, client_pid, file}, state) do def handle_cast({:set_file, client_pid, file}, state) do
@ -845,12 +846,6 @@ defmodule Livebook.Session do
{:noreply, state} {:noreply, state}
end end
def handle_info({:runtime_broadcast, topic, subtopic, message}, state) do
full_topic = runtime_messages_topic(state.session_id, topic, subtopic)
Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message)
{:noreply, state}
end
def handle_info({:container_down, container_ref, message}, state) do def handle_info({:container_down, container_ref, message}, state) do
broadcast_error(state.session_id, "evaluation process terminated - #{message}") broadcast_error(state.session_id, "evaluation process terminated - #{message}")
@ -1019,6 +1014,11 @@ defmodule Livebook.Session do
end) end)
end end
defp do_connect_runtime(runtime, state) do
runtime_monitor_ref = Runtime.connect(runtime, runtime_broadcast_to: state.worker_pid)
%{state | runtime_monitor_ref: runtime_monitor_ref}
end
# Given any operation on `Livebook.Session.Data`, the process # Given any operation on `Livebook.Session.Data`, the process
# does the following: # does the following:
# #
@ -1134,10 +1134,8 @@ defmodule Livebook.Session do
case apply(runtime_module, :init, args) do case apply(runtime_module, :init, args) do
{:ok, runtime} -> {:ok, runtime} ->
runtime_monitor_ref = Runtime.connect(runtime) state = do_connect_runtime(runtime, state)
handle_operation(state, {:set_runtime, self(), runtime})
%{state | runtime_monitor_ref: runtime_monitor_ref}
|> handle_operation({:set_runtime, self(), runtime})
{:error, error} -> {:error, error} ->
broadcast_error(state.session_id, "failed to setup runtime - #{error}") broadcast_error(state.session_id, "failed to setup runtime - #{error}")
@ -1332,7 +1330,8 @@ defmodule Livebook.Session do
) )
end end
defp runtime_messages_topic(session_id, topic, subtopic) do @doc false
def runtime_messages_topic(session_id, topic, subtopic) do
"sessions:#{session_id}:runtime_messages:#{topic}:#{subtopic}" "sessions:#{session_id}:runtime_messages:#{topic}:#{subtopic}"
end end

View file

@ -0,0 +1,28 @@
defmodule Livebook.Session.Worker do
@moduledoc false
# A dedicated process for offloading the session process,
# when the session state is not necessary.
#
# In particular, this process handles broadcast messages
# sent from within the runtime and distributes them to the
# actual subscribers via pubsub.
use GenServer
def start_link(session_id) do
GenServer.start_link(__MODULE__, {session_id})
end
@impl true
def init({session_id}) do
{:ok, %{session_id: session_id}}
end
@impl true
def handle_info({:runtime_broadcast, topic, subtopic, message}, state) do
full_topic = Livebook.Session.runtime_messages_topic(state.session_id, topic, subtopic)
Phoenix.PubSub.broadcast(Livebook.PubSub, full_topic, message)
{:noreply, state}
end
end

View file

@ -5,12 +5,13 @@ defmodule Livebook.Evaluator.IOProxyTest do
alias Livebook.Evaluator.IOProxy alias Livebook.Evaluator.IOProxy
setup do setup do
# {:ok, io} = IOProxy.start_link()
{:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker) {:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker)
{:ok, _pid, evaluator} = start_supervised({Evaluator, [object_tracker: object_tracker]})
{:ok, _pid, evaluator} =
start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]})
io = Process.info(evaluator.pid)[:group_leader] io = Process.info(evaluator.pid)[:group_leader]
IOProxy.configure(io, self(), :ref) IOProxy.configure(io, :ref)
%{io: io} %{io: io}
end end
@ -41,30 +42,41 @@ defmodule Livebook.Evaluator.IOProxyTest do
describe "input" do describe "input" do
test "responds to Livebook input request", %{io: io} do test "responds to Livebook input request", %{io: io} do
configure_owner_with_input(io, "input1", :value) pid =
spawn(fn ->
assert livebook_get_input_value(io, "input1") == {:ok, :value}
end)
assert livebook_get_input_value(io, "input1") == {:ok, :value} reply_to_input_request(:ref, "input1", {:ok, :value}, 1)
await_termination(pid)
end end
test "responds to subsequent requests with the same value", %{io: io} do test "responds to subsequent requests with the same value", %{io: io} do
configure_owner_with_input(io, "input1", :value) pid =
spawn(fn ->
assert livebook_get_input_value(io, "input1") == {:ok, :value}
assert livebook_get_input_value(io, "input1") == {:ok, :value}
end)
assert livebook_get_input_value(io, "input1") == {:ok, :value} reply_to_input_request(:ref, "input1", {:ok, :value}, 1)
assert livebook_get_input_value(io, "input1") == {:ok, :value}
await_termination(pid)
end end
test "clear_input_cache/1 clears all cached input information", %{io: io} do test "clear_input_cache/1 clears all cached input information", %{io: io} do
pid = pid =
spawn_link(fn -> spawn_link(fn ->
reply_to_input_request(:ref, "input1", {:ok, :value1}, 1) IOProxy.configure(io, :ref)
reply_to_input_request(:ref, "input1", {:ok, :value2}, 1) assert livebook_get_input_value(io, "input1") == {:ok, :value1}
IOProxy.clear_input_cache(io)
assert livebook_get_input_value(io, "input1") == {:ok, :value2}
end) end)
IOProxy.configure(io, pid, :ref) reply_to_input_request(:ref, "input1", {:ok, :value1}, 1)
reply_to_input_request(:ref, "input1", {:ok, :value2}, 1)
assert livebook_get_input_value(io, "input1") == {:ok, :value1} await_termination(pid)
IOProxy.clear_input_cache(io)
assert livebook_get_input_value(io, "input1") == {:ok, :value2}
end end
end end
@ -94,25 +106,25 @@ defmodule Livebook.Evaluator.IOProxyTest do
describe "token requests" do describe "token requests" do
test "returns different tokens for subsequent calls", %{io: io} do test "returns different tokens for subsequent calls", %{io: io} do
IOProxy.configure(io, self(), :ref1) IOProxy.configure(io, :ref1)
token1 = livebook_generate_token(io) token1 = livebook_generate_token(io)
token2 = livebook_generate_token(io) token2 = livebook_generate_token(io)
assert token1 != token2 assert token1 != token2
end end
test "returns different tokens for different refs", %{io: io} do test "returns different tokens for different refs", %{io: io} do
IOProxy.configure(io, self(), :ref1) IOProxy.configure(io, :ref1)
token1 = livebook_generate_token(io) token1 = livebook_generate_token(io)
IOProxy.configure(io, self(), :ref2) IOProxy.configure(io, :ref2)
token2 = livebook_generate_token(io) token2 = livebook_generate_token(io)
assert token1 != token2 assert token1 != token2
end end
test "returns same tokens for the same ref", %{io: io} do test "returns same tokens for the same ref", %{io: io} do
IOProxy.configure(io, self(), :ref) IOProxy.configure(io, :ref)
token1 = livebook_generate_token(io) token1 = livebook_generate_token(io)
token2 = livebook_generate_token(io) token2 = livebook_generate_token(io)
IOProxy.configure(io, self(), :ref) IOProxy.configure(io, :ref)
token3 = livebook_generate_token(io) token3 = livebook_generate_token(io)
token4 = livebook_generate_token(io) token4 = livebook_generate_token(io)
assert token1 == token3 assert token1 == token3
@ -122,15 +134,6 @@ defmodule Livebook.Evaluator.IOProxyTest do
# Helpers # Helpers
defp configure_owner_with_input(io, input_id, value) do
pid =
spawn_link(fn ->
reply_to_input_request(:ref, input_id, {:ok, value}, 1)
end)
IOProxy.configure(io, pid, :ref)
end
defp reply_to_input_request(_ref, _input_id, _reply, 0), do: :ok defp reply_to_input_request(_ref, _input_id, _reply, 0), do: :ok
defp reply_to_input_request(ref, input_id, reply, times) do defp reply_to_input_request(ref, input_id, reply, times) do
@ -159,4 +162,9 @@ defmodule Livebook.Evaluator.IOProxyTest do
assert_receive {:io_reply, ^ref, reply} assert_receive {:io_reply, ^ref, reply}
reply reply
end end
defp await_termination(pid) do
ref = Process.monitor(pid)
assert_receive {:DOWN, ^ref, :process, _, _}
end
end end

View file

@ -5,7 +5,10 @@ defmodule Livebook.EvaluatorTest do
setup do setup do
{:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker) {:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker)
{:ok, _pid, evaluator} = start_supervised({Evaluator, [object_tracker: object_tracker]})
{:ok, _pid, evaluator} =
start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]})
%{evaluator: evaluator, object_tracker: object_tracker} %{evaluator: evaluator, object_tracker: object_tracker}
end end
@ -23,7 +26,7 @@ defmodule Livebook.EvaluatorTest do
x + y x + y
""" """
Evaluator.evaluate_code(evaluator, self(), code, :code_1) Evaluator.evaluate_code(evaluator, code, :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, 3}, metadata() = metadata} assert_receive {:evaluation_response, :code_1, {:ok, 3}, metadata() = metadata}
assert metadata.evaluation_time_ms >= 0 assert metadata.evaluation_time_ms >= 0
@ -33,11 +36,11 @@ defmodule Livebook.EvaluatorTest do
end end
test "given no prev_ref does not see previous evaluation context", %{evaluator: evaluator} do test "given no prev_ref does not see previous evaluation context", %{evaluator: evaluator} do
Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) Evaluator.evaluate_code(evaluator, "x = 1", :code_1)
assert_receive {:evaluation_response, :code_1, _, metadata()} assert_receive {:evaluation_response, :code_1, _, metadata()}
ignore_warnings(fn -> ignore_warnings(fn ->
Evaluator.evaluate_code(evaluator, self(), "x", :code_2) Evaluator.evaluate_code(evaluator, "x", :code_2)
assert_receive {:evaluation_response, :code_2, assert_receive {:evaluation_response, :code_2,
{:error, _kind, {:error, _kind,
@ -48,22 +51,22 @@ defmodule Livebook.EvaluatorTest do
end end
test "given prev_ref sees previous evaluation context", %{evaluator: evaluator} do test "given prev_ref sees previous evaluation context", %{evaluator: evaluator} do
Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) Evaluator.evaluate_code(evaluator, "x = 1", :code_1)
assert_receive {:evaluation_response, :code_1, _, metadata()} assert_receive {:evaluation_response, :code_1, _, metadata()}
Evaluator.evaluate_code(evaluator, self(), "x", :code_2, :code_1) Evaluator.evaluate_code(evaluator, "x", :code_2, :code_1)
assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()} assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()}
end end
test "given invalid prev_ref just uses default context", %{evaluator: evaluator} do test "given invalid prev_ref just uses default context", %{evaluator: evaluator} do
Evaluator.evaluate_code(evaluator, self(), ":hey", :code_1, :code_nonexistent) Evaluator.evaluate_code(evaluator, ":hey", :code_1, :code_nonexistent)
assert_receive {:evaluation_response, :code_1, {:ok, :hey}, metadata()} assert_receive {:evaluation_response, :code_1, {:ok, :hey}, metadata()}
end end
test "captures standard output and sends it to the caller", %{evaluator: evaluator} do test "captures standard output and sends it to the caller", %{evaluator: evaluator} do
Evaluator.evaluate_code(evaluator, self(), ~s{IO.puts("hey")}, :code_1) Evaluator.evaluate_code(evaluator, ~s{IO.puts("hey")}, :code_1)
assert_receive {:evaluation_output, :code_1, {:stdout, "hey\n"}} assert_receive {:evaluation_output, :code_1, {:stdout, "hey\n"}}
end end
@ -78,7 +81,7 @@ defmodule Livebook.EvaluatorTest do
end end
""" """
Evaluator.evaluate_code(evaluator, self(), code, :code_1) Evaluator.evaluate_code(evaluator, code, :code_1)
assert_receive {:evaluation_input, :code_1, reply_to, "input1"} assert_receive {:evaluation_input, :code_1, reply_to, "input1"}
send(reply_to, {:evaluation_input_reply, {:ok, :value}}) send(reply_to, {:evaluation_input_reply, {:ok, :value}})
@ -91,7 +94,7 @@ defmodule Livebook.EvaluatorTest do
List.first(%{}) List.first(%{})
""" """
Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, file: "file.ex") Evaluator.evaluate_code(evaluator, code, :code_1, nil, file: "file.ex")
assert_receive {:evaluation_response, :code_1, assert_receive {:evaluation_response, :code_1,
{:error, :error, :function_clause, [{List, :first, _arity, _location}]}, {:error, :error, :function_clause, [{List, :first, _arity, _location}]},
@ -101,7 +104,7 @@ defmodule Livebook.EvaluatorTest do
test "returns additional metadata when there is a syntax error", %{evaluator: evaluator} do test "returns additional metadata when there is a syntax error", %{evaluator: evaluator} do
code = "1+" code = "1+"
Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, file: "file.ex") Evaluator.evaluate_code(evaluator, code, :code_1, nil, file: "file.ex")
assert_receive {:evaluation_response, :code_1, {:error, :error, %TokenMissingError{}, []}, assert_receive {:evaluation_response, :code_1, {:error, :error, %TokenMissingError{}, []},
%{ %{
@ -115,7 +118,7 @@ defmodule Livebook.EvaluatorTest do
test "returns additional metadata when there is a compilation error", %{evaluator: evaluator} do test "returns additional metadata when there is a compilation error", %{evaluator: evaluator} do
code = "x" code = "x"
Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, file: "file.ex") Evaluator.evaluate_code(evaluator, code, :code_1, nil, file: "file.ex")
assert_receive {:evaluation_response, :code_1, {:error, :error, %CompileError{}, []}, assert_receive {:evaluation_response, :code_1, {:error, :error, %CompileError{}, []},
%{ %{
@ -131,7 +134,7 @@ defmodule Livebook.EvaluatorTest do
Code.eval_string("x") Code.eval_string("x")
""" """
Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, file: "file.ex") Evaluator.evaluate_code(evaluator, code, :code_1, nil, file: "file.ex")
expected_stacktrace = [{Code, :validated_eval_string, 3, [file: 'lib/code.ex', line: 404]}] expected_stacktrace = [{Code, :validated_eval_string, 3, [file: 'lib/code.ex', line: 404]}]
@ -160,7 +163,7 @@ defmodule Livebook.EvaluatorTest do
""" """
ignore_warnings(fn -> ignore_warnings(fn ->
Evaluator.evaluate_code(evaluator, self(), code, :code_1) Evaluator.evaluate_code(evaluator, code, :code_1)
expected_stacktrace = [ expected_stacktrace = [
{Livebook.EvaluatorTest.Stacktrace.Math, :bad_math, 0, [file: 'nofile', line: 3]}, {Livebook.EvaluatorTest.Stacktrace.Math, :bad_math, 0, [file: 'nofile', line: 3]},
@ -188,14 +191,14 @@ defmodule Livebook.EvaluatorTest do
x * x x * x
""" """
Evaluator.evaluate_code(evaluator, self(), code1, :code_1) Evaluator.evaluate_code(evaluator, code1, :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, _}, metadata()} assert_receive {:evaluation_response, :code_1, {:ok, _}, metadata()}
Evaluator.evaluate_code(evaluator, self(), code2, :code_2, :code_1) Evaluator.evaluate_code(evaluator, code2, :code_2, :code_1)
assert_receive {:evaluation_response, :code_2, {:error, _, _, _}, metadata()} assert_receive {:evaluation_response, :code_2, {:error, _, _, _}, metadata()}
Evaluator.evaluate_code(evaluator, self(), code3, :code_3, :code_2) Evaluator.evaluate_code(evaluator, code3, :code_3, :code_2)
assert_receive {:evaluation_response, :code_3, {:ok, 4}, metadata()} assert_receive {:evaluation_response, :code_3, {:ok, 4}, metadata()}
end end
@ -205,7 +208,7 @@ defmodule Livebook.EvaluatorTest do
""" """
opts = [file: "/path/dir/file"] opts = [file: "/path/dir/file"]
Evaluator.evaluate_code(evaluator, self(), code, :code_1, nil, opts) Evaluator.evaluate_code(evaluator, code, :code_1, nil, opts)
assert_receive {:evaluation_response, :code_1, {:ok, "/path/dir"}, metadata()} assert_receive {:evaluation_response, :code_1, {:ok, "/path/dir"}, metadata()}
end end
@ -215,13 +218,13 @@ defmodule Livebook.EvaluatorTest do
# The evaluation reference is the same, so the second one overrides # The evaluation reference is the same, so the second one overrides
# the first one and the first widget should eventually be kiled. # the first one and the first widget should eventually be kiled.
Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) Evaluator.evaluate_code(evaluator, spawn_widget_code(), :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()} assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()}
ref = Process.monitor(widget_pid1) ref = Process.monitor(widget_pid1)
Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) Evaluator.evaluate_code(evaluator, spawn_widget_code(), :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, widget_pid2}, metadata()} assert_receive {:evaluation_response, :code_1, {:ok, widget_pid2}, metadata()}
@ -234,12 +237,7 @@ defmodule Livebook.EvaluatorTest do
# The widget is spawned from a process that terminates, # The widget is spawned from a process that terminates,
# so the widget should terminate immediately as well # so the widget should terminate immediately as well
Evaluator.evaluate_code( Evaluator.evaluate_code(evaluator, spawn_widget_from_terminating_process_code(), :code_1)
evaluator,
self(),
spawn_widget_from_terminating_process_code(),
:code_1
)
assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()} assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()}
@ -249,13 +247,13 @@ defmodule Livebook.EvaluatorTest do
describe "forget_evaluation/2" do describe "forget_evaluation/2" do
test "invalidates the given reference", %{evaluator: evaluator} do test "invalidates the given reference", %{evaluator: evaluator} do
Evaluator.evaluate_code(evaluator, self(), "x = 1", :code_1) Evaluator.evaluate_code(evaluator, "x = 1", :code_1)
assert_receive {:evaluation_response, :code_1, _, metadata()} assert_receive {:evaluation_response, :code_1, _, metadata()}
Evaluator.forget_evaluation(evaluator, :code_1) Evaluator.forget_evaluation(evaluator, :code_1)
ignore_warnings(fn -> ignore_warnings(fn ->
Evaluator.evaluate_code(evaluator, self(), "x", :code_2, :code_1) Evaluator.evaluate_code(evaluator, "x", :code_2, :code_1)
assert_receive {:evaluation_response, :code_2, assert_receive {:evaluation_response, :code_2,
{:error, _kind, {:error, _kind,
@ -266,7 +264,7 @@ defmodule Livebook.EvaluatorTest do
end end
test "kills widgets that no evaluation points to", %{evaluator: evaluator} do test "kills widgets that no evaluation points to", %{evaluator: evaluator} do
Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) Evaluator.evaluate_code(evaluator, spawn_widget_code(), :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()} assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}, metadata()}
@ -280,30 +278,32 @@ defmodule Livebook.EvaluatorTest do
describe "initialize_from/3" do describe "initialize_from/3" do
setup %{object_tracker: object_tracker} do setup %{object_tracker: object_tracker} do
{:ok, _pid, parent_evaluator} = {:ok, _pid, parent_evaluator} =
start_supervised({Evaluator, [object_tracker: object_tracker]}, id: :parent_evaluator) start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]},
id: :parent_evaluator
)
%{parent_evaluator: parent_evaluator} %{parent_evaluator: parent_evaluator}
end end
test "copies the given context and sets as the initial one", test "copies the given context and sets as the initial one",
%{evaluator: evaluator, parent_evaluator: parent_evaluator} do %{evaluator: evaluator, parent_evaluator: parent_evaluator} do
Evaluator.evaluate_code(parent_evaluator, self(), "x = 1", :code_1) Evaluator.evaluate_code(parent_evaluator, "x = 1", :code_1)
assert_receive {:evaluation_response, :code_1, _, metadata()} assert_receive {:evaluation_response, :code_1, _, metadata()}
Evaluator.initialize_from(evaluator, parent_evaluator, :code_1) Evaluator.initialize_from(evaluator, parent_evaluator, :code_1)
Evaluator.evaluate_code(evaluator, self(), "x", :code_2) Evaluator.evaluate_code(evaluator, "x", :code_2)
assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()} assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()}
end end
test "mirrors process dictionary of the given evaluator", test "mirrors process dictionary of the given evaluator",
%{evaluator: evaluator, parent_evaluator: parent_evaluator} do %{evaluator: evaluator, parent_evaluator: parent_evaluator} do
Evaluator.evaluate_code(parent_evaluator, self(), "Process.put(:data, 1)", :code_1) Evaluator.evaluate_code(parent_evaluator, "Process.put(:data, 1)", :code_1)
assert_receive {:evaluation_response, :code_1, _, metadata()} assert_receive {:evaluation_response, :code_1, _, metadata()}
Evaluator.initialize_from(evaluator, parent_evaluator, :code_1) Evaluator.initialize_from(evaluator, parent_evaluator, :code_1)
Evaluator.evaluate_code(evaluator, self(), "Process.get(:data)", :code_2) Evaluator.evaluate_code(evaluator, "Process.get(:data)", :code_2)
assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()} assert_receive {:evaluation_response, :code_2, {:ok, 1}, metadata()}
end end
end end

View file

@ -8,12 +8,13 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
start_supervised({NodeManager, [unload_modules_on_termination: false, anonymous: true]}) start_supervised({NodeManager, [unload_modules_on_termination: false, anonymous: true]})
runtime_server_pid = NodeManager.start_runtime_server(manager_pid) runtime_server_pid = NodeManager.start_runtime_server(manager_pid)
RuntimeServer.set_owner(runtime_server_pid, self()) RuntimeServer.attach(runtime_server_pid, self())
{:ok, %{pid: runtime_server_pid}} {:ok, %{pid: runtime_server_pid, manager_pid: manager_pid}}
end end
describe "set_owner/2" do describe "attach/2" do
test "starts watching the given process and terminates as soon as it terminates", %{pid: pid} do test "starts watching the given process and terminates as soon as it terminates",
%{manager_pid: manager_pid} do
owner = owner =
spawn(fn -> spawn(fn ->
receive do receive do
@ -21,7 +22,8 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do
end end
end) end)
RuntimeServer.set_owner(pid, owner) pid = NodeManager.start_runtime_server(manager_pid)
RuntimeServer.attach(pid, owner)
# Make sure the node is running. # Make sure the node is running.
assert Process.alive?(pid) assert Process.alive?(pid)

View file

@ -9,7 +9,7 @@ defmodule Livebook.Runtime.NoopRuntime do
def new(), do: %__MODULE__{} def new(), do: %__MODULE__{}
defimpl Livebook.Runtime do defimpl Livebook.Runtime do
def connect(_), do: make_ref() def connect(_, _), do: make_ref()
def disconnect(_), do: :ok def disconnect(_), do: :ok
def evaluate_code(_, _, _, _, _ \\ []), do: :ok def evaluate_code(_, _, _, _, _ \\ []), do: :ok
def forget_evaluation(_, _), do: :ok def forget_evaluation(_, _), do: :ok