Track and garbage collect widgets (#344)

This commit is contained in:
Jonatan Kłosko 2021-06-11 15:42:41 +02:00 committed by GitHub
parent 2acb4020ca
commit d3b4a8a9e6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 182 additions and 19 deletions

View file

@ -18,8 +18,13 @@ defmodule Livebook.Evaluator do
@type t :: GenServer.server()
@type state :: %{
formatter: module(),
io_proxy: pid(),
contexts: %{ref() => context()}
contexts: %{ref() => context()},
# We track the widgets rendered by every evaluation,
# so that we can kill those no longer needed
widget_pids: %{ref() => MapSet.t(pid())},
widget_counts: %{pid() => non_neg_integer()}
}
@typedoc """
@ -114,7 +119,9 @@ defmodule Livebook.Evaluator do
%{
formatter: formatter,
io_proxy: io_proxy,
contexts: %{}
contexts: %{},
widget_pids: %{},
widget_counts: %{}
}
end
@ -143,18 +150,27 @@ defmodule Livebook.Evaluator do
{context, response}
end
state = put_in(state.contexts[ref], result_context)
Evaluator.IOProxy.flush(state.io_proxy)
Evaluator.IOProxy.clear_input_buffers(state.io_proxy)
send_evaluation_response(send_to, ref, response, state.formatter)
output = state.formatter.format_response(response)
send(send_to, {:evaluation_response, ref, output})
new_state = put_in(state.contexts[ref], result_context)
{:noreply, new_state}
widget_pids = Evaluator.IOProxy.flush_widgets(state.io_proxy)
state = track_evaluation_widgets(state, ref, widget_pids, output)
{:noreply, state}
end
def handle_cast({:forget_evaluation, ref}, state) do
new_state = %{state | contexts: Map.delete(state.contexts, ref)}
{:noreply, new_state}
state =
state
|> Map.update!(:contexts, &Map.delete(&1, ref))
|> garbage_collect_widgets(ref, [])
{:noreply, state}
end
def handle_cast({:request_completion_items, send_to, ref, hint, evaluation_ref}, state) do
@ -165,11 +181,6 @@ defmodule Livebook.Evaluator do
{:noreply, state}
end
defp send_evaluation_response(send_to, ref, evaluation_response, formatter) do
response = formatter.format_response(evaluation_response)
send(send_to, {:evaluation_response, ref, response})
end
defp eval(code, binding, env) do
try do
quoted = Code.string_to_quoted!(code)
@ -210,4 +221,54 @@ defmodule Livebook.Evaluator do
|> Enum.reverse()
|> Enum.reject(&(elem(&1, 0) in @elixir_internals))
end
# Widgets
defp track_evaluation_widgets(state, ref, widget_pids, output) do
widget_pids =
case widget_pid_from_output(output) do
{:ok, pid} -> MapSet.put(widget_pids, pid)
:error -> widget_pids
end
garbage_collect_widgets(state, ref, widget_pids)
end
defp garbage_collect_widgets(state, ref, widget_pids) do
prev_widget_pids = state.widget_pids[ref] || []
state = put_in(state.widget_pids[ref], widget_pids)
update_in(state.widget_counts, fn counts ->
counts =
Enum.reduce(prev_widget_pids, counts, fn pid, counts ->
Map.update!(counts, pid, &(&1 - 1))
end)
counts =
Enum.reduce(widget_pids, counts, fn pid, counts ->
Map.update(counts, pid, 1, &(&1 + 1))
end)
{to_remove, to_keep} = Enum.split_with(counts, fn {_pid, count} -> count == 0 end)
for {pid, 0} <- to_remove do
Process.exit(pid, :shutdown)
end
Map.new(to_keep)
end)
end
@doc """
Checks the given output value for widget pid to track.
"""
@spec widget_pid_from_output(term()) :: {:ok, pid()} | :error
def widget_pid_from_output(output)
def widget_pid_from_output({_type, pid}) when is_pid(pid) do
{:ok, pid}
end
def widget_pid_from_output(_output), do: :error
end

View file

@ -63,11 +63,27 @@ defmodule Livebook.Evaluator.IOProxy do
GenServer.cast(pid, :clear_input_buffers)
end
@doc """
Returns the accumulated widget pids and clears the accumulator.
"""
@spec flush_widgets(pid()) :: MapSet.t(pid())
def flush_widgets(pid) do
GenServer.call(pid, :flush_widgets)
end
## Callbacks
@impl true
def init(_opts) do
{:ok, %{encoding: :unicode, target: nil, ref: nil, buffer: [], input_buffers: %{}}}
{:ok,
%{
encoding: :unicode,
target: nil,
ref: nil,
buffer: [],
input_buffers: %{},
widget_pids: MapSet.new()
}}
end
@impl true
@ -84,6 +100,10 @@ defmodule Livebook.Evaluator.IOProxy do
{:reply, :ok, flush_buffer(state)}
end
def handle_call(:flush_widgets, _from, state) do
{:reply, state.widget_pids, %{state | widget_pids: MapSet.new()}}
end
@impl true
def handle_info({:io_request, from, reply_as, req}, state) do
{reply, state} = io_request(req, state)
@ -169,6 +189,13 @@ defmodule Livebook.Evaluator.IOProxy do
defp io_request({:livebook_put_output, output}, state) do
state = flush_buffer(state)
send(state.target, {:evaluation_output, state.ref, output})
state =
case Evaluator.widget_pid_from_output(output) do
{:ok, pid} -> update_in(state.widget_pids, &MapSet.put(&1, pid))
:error -> state
end
{:ok, state}
end
@ -266,7 +293,7 @@ defmodule Livebook.Evaluator.IOProxy do
send(from, {:io_reply, reply_as, reply})
end
def flush_buffer(state) do
defp flush_buffer(state) do
string = state.buffer |> Enum.reverse() |> Enum.join()
if state.target != nil and string != "" do

View file

@ -104,13 +104,23 @@ defmodule Livebook.Evaluator.IOProxyTest do
end
test "supports direct livebook output forwarding", %{io: io} do
ref = make_ref()
send(io, {:io_request, self(), ref, {:livebook_put_output, {:text, "[1, 2, 3]"}}})
assert_receive {:io_reply, ^ref, :ok}
put_livebook_output(io, {:text, "[1, 2, 3]"})
assert_received {:evaluation_output, :ref, {:text, "[1, 2, 3]"}}
end
test "flush_widgets/1 returns new widget pids", %{io: io} do
widget1_pid = IEx.Helpers.pid(0, 0, 0)
widget2_pid = IEx.Helpers.pid(0, 0, 1)
put_livebook_output(io, {:vega_lite_dynamic, widget1_pid})
put_livebook_output(io, {:vega_lite_dynamic, widget2_pid})
put_livebook_output(io, {:vega_lite_dynamic, widget1_pid})
assert IOProxy.flush_widgets(io) == MapSet.new([widget1_pid, widget2_pid])
assert IOProxy.flush_widgets(io) == MapSet.new()
end
# Helpers
defp reply_to_input_request(_ref, _prompt, _reply, 0), do: :ok
@ -122,4 +132,10 @@ defmodule Livebook.Evaluator.IOProxyTest do
reply_to_input_request(ref, prompt, reply, times - 1)
end
end
defp put_livebook_output(io, output) do
ref = make_ref()
send(io, {:io_request, self(), ref, {:livebook_put_output, output}})
assert_receive {:io_reply, ^ref, :ok}
end
end

View file

@ -146,6 +146,36 @@ defmodule Livebook.EvaluatorTest do
assert_receive {:evaluation_response, :code_1, {:ok, "/path/dir"}}
end
test "kills widgets that that no evaluation points to", %{evaluator: evaluator} do
# Evaluate the code twice, which spawns two widget processes
# First of them should be eventually killed
Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}}
Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, widget_pid2}}
ref = Process.monitor(widget_pid1)
assert_receive {:DOWN, ^ref, :process, ^widget_pid1, :shutdown}
assert Process.alive?(widget_pid2)
end
test "does not kill a widget if another evaluation points to it", %{evaluator: evaluator} do
Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}}
Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_2)
assert_receive {:evaluation_response, :code_2, {:ok, widget_pid2}}
ref = Process.monitor(widget_pid1)
refute_receive {:DOWN, ^ref, :process, ^widget_pid1, :shutdown}
assert Process.alive?(widget_pid1)
assert Process.alive?(widget_pid2)
end
end
describe "forget_evaluation/2" do
@ -163,6 +193,16 @@ defmodule Livebook.EvaluatorTest do
_stacktrace}}
end)
end
test "kills widgets that no evaluation points to", %{evaluator: evaluator} do
Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1)
assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}}
Evaluator.forget_evaluation(evaluator, :code_1)
ref = Process.monitor(widget_pid1)
assert_receive {:DOWN, ^ref, :process, ^widget_pid1, :shutdown}
end
end
describe "request_completion_items/5" do
@ -172,12 +212,12 @@ defmodule Livebook.EvaluatorTest do
end
test "given evaluation reference uses its bindings and env", %{evaluator: evaluator} do
code1 = """
code = """
alias IO.ANSI
number = 10
"""
Evaluator.evaluate_code(evaluator, self(), code1, :code_1)
Evaluator.evaluate_code(evaluator, self(), code, :code_1)
assert_receive {:evaluation_response, :code_1, _}
Evaluator.request_completion_items(evaluator, self(), :comp_ref, "num", :code_1)
@ -197,4 +237,23 @@ defmodule Livebook.EvaluatorTest do
ExUnit.CaptureIO.capture_io(:stderr, fun)
:ok
end
# Returns a code that spawns and renders a widget process
# and returns its pid from the evaluation
defp spawn_widget_code() do
"""
widget_pid = spawn(fn ->
Process.sleep(:infinity)
end)
ref = make_ref()
send(Process.group_leader(), {:io_request, self(), ref, {:livebook_put_output, {:vega_lite_dynamic, widget_pid}}})
receive do
{:io_reply, ^ref, :ok} -> :ok
end
widget_pid
"""
end
end