Simplify communication with evaluation group leader (#1816)

This commit is contained in:
Jonatan Kłosko 2023-03-21 20:31:55 +01:00 committed by GitHub
parent d69cfc3d6d
commit eba8bcac59
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 37 additions and 57 deletions

View file

@ -414,7 +414,7 @@ defmodule Livebook.Runtime.Evaluator do
file = Keyword.get(opts, :file, "nofile")
context = put_in(context.env.file, file)
Evaluator.IOProxy.configure(state.io_proxy, ref, file)
Evaluator.IOProxy.before_evaluation(state.io_proxy, ref, file)
set_pdict(context, state.ignored_pdict_keys)
@ -422,7 +422,7 @@ defmodule Livebook.Runtime.Evaluator do
eval_result = eval(code, context.binding, context.env)
evaluation_time_ms = time_diff_ms(start_time)
tracer_info = Evaluator.IOProxy.get_tracer_info(state.io_proxy)
%{tracer_info: tracer_info} = Evaluator.IOProxy.after_evaluation(state.io_proxy)
{new_context, result, code_error, identifiers_used, identifiers_defined} =
case eval_result do
@ -461,9 +461,6 @@ defmodule Livebook.Runtime.Evaluator do
state = put_context(state, ref, new_context)
Evaluator.IOProxy.flush(state.io_proxy)
Evaluator.IOProxy.clear_input_cache(state.io_proxy)
output = state.formatter.format_result(result)
metadata = %{

View file

@ -24,8 +24,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
Starts an IO device process.
For all supported requests a message is sent to the configured
`:send_to` process, so this device serves as a proxy. Make sure
to also call configure/3` before every evaluation.
`:send_to` process, so this device serves as a proxy.
"""
@spec start(pid(), pid(), pid(), pid(), String.t() | nil) :: GenServer.on_start()
def start(evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path) do
@ -51,27 +50,17 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
The given reference is attached to all the proxied messages.
"""
@spec configure(pid(), Evaluator.ref(), String.t()) :: :ok
def configure(pid, ref, file) do
GenServer.cast(pid, {:configure, ref, file})
@spec before_evaluation(pid(), Evaluator.ref(), String.t()) :: :ok
def before_evaluation(pid, ref, file) do
GenServer.cast(pid, {:before_evaluation, ref, file})
end
@doc """
Synchronously clears the buffered output and sends it to the
configured `:send_to` process.
Flushes any buffered output and returns gathered metadata.
"""
@spec flush(pid()) :: :ok
def flush(pid) do
GenServer.call(pid, :flush)
end
@doc """
Asynchronously clears all cached inputs, so on next read they
are requested again.
"""
@spec clear_input_cache(pid()) :: :ok
def clear_input_cache(pid) do
GenServer.cast(pid, :clear_input_cache)
@spec after_evaluation(pid()) :: %{tracer_info: %Evaluator.Tracer{}}
def after_evaluation(pid) do
GenServer.call(pid, :after_evaluation)
end
@doc """
@ -82,14 +71,6 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
GenServer.cast(pid, {:tracer_updates, updates})
end
@doc """
Returns the accumulated tracer info.
"""
@spec get_tracer_info(pid()) :: %Evaluator.Tracer{}
def get_tracer_info(pid) do
GenServer.call(pid, :get_tracer_info)
end
@impl true
def init({evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path}) do
evaluator_monitor = Process.monitor(evaluator)
@ -114,12 +95,16 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
end
@impl true
def handle_cast({:configure, ref, file}, state) do
{:noreply, %{state | ref: ref, file: file, token_count: 0, tracer_info: %Evaluator.Tracer{}}}
end
def handle_cast(:clear_input_cache, state) do
{:noreply, %{state | input_cache: %{}}}
def handle_cast({:before_evaluation, ref, file}, state) do
{:noreply,
%{
state
| ref: ref,
file: file,
token_count: 0,
input_cache: %{},
tracer_info: %Evaluator.Tracer{}
}}
end
def handle_cast({:tracer_updates, updates}, state) do
@ -134,12 +119,10 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
end
@impl true
def handle_call(:flush, _from, state) do
{:reply, :ok, flush_buffer(state)}
end
def handle_call(:get_tracer_info, _from, state) do
{:reply, state.tracer_info, state}
def handle_call(:after_evaluation, _from, state) do
state = flush_buffer(state)
info = %{tracer_info: state.tracer_info}
{:reply, info, state}
end
@impl true

View file

@ -11,7 +11,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]})
io = Process.info(evaluator.pid)[:group_leader]
IOProxy.configure(io, :ref, "cell")
IOProxy.before_evaluation(io, :ref, "cell")
%{io: io}
end
@ -64,12 +64,12 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
await_termination(pid)
end
test "clear_input_cache/1 clears all cached input information", %{io: io} do
test "before_evaluation/3 clears all cached input information", %{io: io} do
pid =
spawn_link(fn ->
IOProxy.configure(io, :ref, "cell")
IOProxy.before_evaluation(io, :ref, "cell")
assert livebook_get_input_value(io, "input1") == {:ok, :value1}
IOProxy.clear_input_cache(io)
IOProxy.before_evaluation(io, :ref, "cell")
assert livebook_get_input_value(io, "input1") == {:ok, :value2}
end)
@ -92,9 +92,9 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
assert_receive {:runtime_evaluation_output, :ref, {:stdout, "\roverride\r"}}
end
test "flush/1 synchronously sends buffer contents", %{io: io} do
test "after_evaluation/1 synchronously sends buffer contents", %{io: io} do
IO.puts(io, "hey")
IOProxy.flush(io)
IOProxy.after_evaluation(io)
assert_received {:runtime_evaluation_output, :ref, {:stdout, "hey\n"}}
end
@ -112,25 +112,25 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
describe "token requests" do
test "returns different tokens for subsequent calls", %{io: io} do
IOProxy.configure(io, :ref1, "cell1")
IOProxy.before_evaluation(io, :ref1, "cell1")
token1 = livebook_generate_token(io)
token2 = livebook_generate_token(io)
assert token1 != token2
end
test "returns different tokens for different refs", %{io: io} do
IOProxy.configure(io, :ref1, "cell1")
IOProxy.before_evaluation(io, :ref1, "cell1")
token1 = livebook_generate_token(io)
IOProxy.configure(io, :ref2, "cell2")
IOProxy.before_evaluation(io, :ref2, "cell2")
token2 = livebook_generate_token(io)
assert token1 != token2
end
test "returns same tokens for the same ref", %{io: io} do
IOProxy.configure(io, :ref, "cell")
IOProxy.before_evaluation(io, :ref, "cell")
token1 = livebook_generate_token(io)
token2 = livebook_generate_token(io)
IOProxy.configure(io, :ref, "cell")
IOProxy.before_evaluation(io, :ref, "cell")
token3 = livebook_generate_token(io)
token4 = livebook_generate_token(io)
assert token1 == token3
@ -139,8 +139,8 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
end
describe "evaluation file requests" do
test "returns the configured file", %{io: io} do
IOProxy.configure(io, :ref1, "cell1")
test "returns the before_evaluationd file", %{io: io} do
IOProxy.before_evaluation(io, :ref1, "cell1")
assert livebook_get_evaluation_file(io) == "cell1"
end
end