From d3b4a8a9e6d2f34d2c309e6cc59774dcf12ba0af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Fri, 11 Jun 2021 15:42:41 +0200 Subject: [PATCH] Track and garbage collect widgets (#344) --- lib/livebook/evaluator.ex | 85 +++++++++++++++++++---- lib/livebook/evaluator/io_proxy.ex | 31 ++++++++- test/livebook/evaluator/io_proxy_test.exs | 22 +++++- test/livebook/evaluator_test.exs | 63 ++++++++++++++++- 4 files changed, 182 insertions(+), 19 deletions(-) diff --git a/lib/livebook/evaluator.ex b/lib/livebook/evaluator.ex index 1a7099973..adbc5601d 100644 --- a/lib/livebook/evaluator.ex +++ b/lib/livebook/evaluator.ex @@ -18,8 +18,13 @@ defmodule Livebook.Evaluator do @type t :: GenServer.server() @type state :: %{ + formatter: module(), io_proxy: pid(), - contexts: %{ref() => context()} + contexts: %{ref() => context()}, + # We track the widgets rendered by every evaluation, + # so that we can kill those no longer needed + widget_pids: %{ref() => MapSet.t(pid())}, + widget_counts: %{pid() => non_neg_integer()} } @typedoc """ @@ -114,7 +119,9 @@ defmodule Livebook.Evaluator do %{ formatter: formatter, io_proxy: io_proxy, - contexts: %{} + contexts: %{}, + widget_pids: %{}, + widget_counts: %{} } end @@ -143,18 +150,27 @@ defmodule Livebook.Evaluator do {context, response} end + state = put_in(state.contexts[ref], result_context) + Evaluator.IOProxy.flush(state.io_proxy) Evaluator.IOProxy.clear_input_buffers(state.io_proxy) - send_evaluation_response(send_to, ref, response, state.formatter) + output = state.formatter.format_response(response) + send(send_to, {:evaluation_response, ref, output}) - new_state = put_in(state.contexts[ref], result_context) - {:noreply, new_state} + widget_pids = Evaluator.IOProxy.flush_widgets(state.io_proxy) + state = track_evaluation_widgets(state, ref, widget_pids, output) + + {:noreply, state} end def handle_cast({:forget_evaluation, ref}, state) do - new_state = %{state | contexts: Map.delete(state.contexts, ref)} - {:noreply, new_state} + state = + state + |> Map.update!(:contexts, &Map.delete(&1, ref)) + |> garbage_collect_widgets(ref, []) + + {:noreply, state} end def handle_cast({:request_completion_items, send_to, ref, hint, evaluation_ref}, state) do @@ -165,11 +181,6 @@ defmodule Livebook.Evaluator do {:noreply, state} end - defp send_evaluation_response(send_to, ref, evaluation_response, formatter) do - response = formatter.format_response(evaluation_response) - send(send_to, {:evaluation_response, ref, response}) - end - defp eval(code, binding, env) do try do quoted = Code.string_to_quoted!(code) @@ -210,4 +221,54 @@ defmodule Livebook.Evaluator do |> Enum.reverse() |> Enum.reject(&(elem(&1, 0) in @elixir_internals)) end + + # Widgets + + defp track_evaluation_widgets(state, ref, widget_pids, output) do + widget_pids = + case widget_pid_from_output(output) do + {:ok, pid} -> MapSet.put(widget_pids, pid) + :error -> widget_pids + end + + garbage_collect_widgets(state, ref, widget_pids) + end + + defp garbage_collect_widgets(state, ref, widget_pids) do + prev_widget_pids = state.widget_pids[ref] || [] + + state = put_in(state.widget_pids[ref], widget_pids) + + update_in(state.widget_counts, fn counts -> + counts = + Enum.reduce(prev_widget_pids, counts, fn pid, counts -> + Map.update!(counts, pid, &(&1 - 1)) + end) + + counts = + Enum.reduce(widget_pids, counts, fn pid, counts -> + Map.update(counts, pid, 1, &(&1 + 1)) + end) + + {to_remove, to_keep} = Enum.split_with(counts, fn {_pid, count} -> count == 0 end) + + for {pid, 0} <- to_remove do + Process.exit(pid, :shutdown) + end + + Map.new(to_keep) + end) + end + + @doc """ + Checks the given output value for widget pid to track. + """ + @spec widget_pid_from_output(term()) :: {:ok, pid()} | :error + def widget_pid_from_output(output) + + def widget_pid_from_output({_type, pid}) when is_pid(pid) do + {:ok, pid} + end + + def widget_pid_from_output(_output), do: :error end diff --git a/lib/livebook/evaluator/io_proxy.ex b/lib/livebook/evaluator/io_proxy.ex index bd5789d11..082775979 100644 --- a/lib/livebook/evaluator/io_proxy.ex +++ b/lib/livebook/evaluator/io_proxy.ex @@ -63,11 +63,27 @@ defmodule Livebook.Evaluator.IOProxy do GenServer.cast(pid, :clear_input_buffers) end + @doc """ + Returns the accumulated widget pids and clears the accumulator. + """ + @spec flush_widgets(pid()) :: MapSet.t(pid()) + def flush_widgets(pid) do + GenServer.call(pid, :flush_widgets) + end + ## Callbacks @impl true def init(_opts) do - {:ok, %{encoding: :unicode, target: nil, ref: nil, buffer: [], input_buffers: %{}}} + {:ok, + %{ + encoding: :unicode, + target: nil, + ref: nil, + buffer: [], + input_buffers: %{}, + widget_pids: MapSet.new() + }} end @impl true @@ -84,6 +100,10 @@ defmodule Livebook.Evaluator.IOProxy do {:reply, :ok, flush_buffer(state)} end + def handle_call(:flush_widgets, _from, state) do + {:reply, state.widget_pids, %{state | widget_pids: MapSet.new()}} + end + @impl true def handle_info({:io_request, from, reply_as, req}, state) do {reply, state} = io_request(req, state) @@ -169,6 +189,13 @@ defmodule Livebook.Evaluator.IOProxy do defp io_request({:livebook_put_output, output}, state) do state = flush_buffer(state) send(state.target, {:evaluation_output, state.ref, output}) + + state = + case Evaluator.widget_pid_from_output(output) do + {:ok, pid} -> update_in(state.widget_pids, &MapSet.put(&1, pid)) + :error -> state + end + {:ok, state} end @@ -266,7 +293,7 @@ defmodule Livebook.Evaluator.IOProxy do send(from, {:io_reply, reply_as, reply}) end - def flush_buffer(state) do + defp flush_buffer(state) do string = state.buffer |> Enum.reverse() |> Enum.join() if state.target != nil and string != "" do diff --git a/test/livebook/evaluator/io_proxy_test.exs b/test/livebook/evaluator/io_proxy_test.exs index 183d02611..714b54a74 100644 --- a/test/livebook/evaluator/io_proxy_test.exs +++ b/test/livebook/evaluator/io_proxy_test.exs @@ -104,13 +104,23 @@ defmodule Livebook.Evaluator.IOProxyTest do end test "supports direct livebook output forwarding", %{io: io} do - ref = make_ref() - send(io, {:io_request, self(), ref, {:livebook_put_output, {:text, "[1, 2, 3]"}}}) - assert_receive {:io_reply, ^ref, :ok} + put_livebook_output(io, {:text, "[1, 2, 3]"}) assert_received {:evaluation_output, :ref, {:text, "[1, 2, 3]"}} end + test "flush_widgets/1 returns new widget pids", %{io: io} do + widget1_pid = IEx.Helpers.pid(0, 0, 0) + widget2_pid = IEx.Helpers.pid(0, 0, 1) + + put_livebook_output(io, {:vega_lite_dynamic, widget1_pid}) + put_livebook_output(io, {:vega_lite_dynamic, widget2_pid}) + put_livebook_output(io, {:vega_lite_dynamic, widget1_pid}) + + assert IOProxy.flush_widgets(io) == MapSet.new([widget1_pid, widget2_pid]) + assert IOProxy.flush_widgets(io) == MapSet.new() + end + # Helpers defp reply_to_input_request(_ref, _prompt, _reply, 0), do: :ok @@ -122,4 +132,10 @@ defmodule Livebook.Evaluator.IOProxyTest do reply_to_input_request(ref, prompt, reply, times - 1) end end + + defp put_livebook_output(io, output) do + ref = make_ref() + send(io, {:io_request, self(), ref, {:livebook_put_output, output}}) + assert_receive {:io_reply, ^ref, :ok} + end end diff --git a/test/livebook/evaluator_test.exs b/test/livebook/evaluator_test.exs index 1233bf369..a96eefcd3 100644 --- a/test/livebook/evaluator_test.exs +++ b/test/livebook/evaluator_test.exs @@ -146,6 +146,36 @@ defmodule Livebook.EvaluatorTest do assert_receive {:evaluation_response, :code_1, {:ok, "/path/dir"}} end + + test "kills widgets that that no evaluation points to", %{evaluator: evaluator} do + # Evaluate the code twice, which spawns two widget processes + # First of them should be eventually killed + + Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) + assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}} + + Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) + assert_receive {:evaluation_response, :code_1, {:ok, widget_pid2}} + + ref = Process.monitor(widget_pid1) + assert_receive {:DOWN, ^ref, :process, ^widget_pid1, :shutdown} + + assert Process.alive?(widget_pid2) + end + + test "does not kill a widget if another evaluation points to it", %{evaluator: evaluator} do + Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) + assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}} + + Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_2) + assert_receive {:evaluation_response, :code_2, {:ok, widget_pid2}} + + ref = Process.monitor(widget_pid1) + refute_receive {:DOWN, ^ref, :process, ^widget_pid1, :shutdown} + + assert Process.alive?(widget_pid1) + assert Process.alive?(widget_pid2) + end end describe "forget_evaluation/2" do @@ -163,6 +193,16 @@ defmodule Livebook.EvaluatorTest do _stacktrace}} end) end + + test "kills widgets that no evaluation points to", %{evaluator: evaluator} do + Evaluator.evaluate_code(evaluator, self(), spawn_widget_code(), :code_1) + assert_receive {:evaluation_response, :code_1, {:ok, widget_pid1}} + + Evaluator.forget_evaluation(evaluator, :code_1) + + ref = Process.monitor(widget_pid1) + assert_receive {:DOWN, ^ref, :process, ^widget_pid1, :shutdown} + end end describe "request_completion_items/5" do @@ -172,12 +212,12 @@ defmodule Livebook.EvaluatorTest do end test "given evaluation reference uses its bindings and env", %{evaluator: evaluator} do - code1 = """ + code = """ alias IO.ANSI number = 10 """ - Evaluator.evaluate_code(evaluator, self(), code1, :code_1) + Evaluator.evaluate_code(evaluator, self(), code, :code_1) assert_receive {:evaluation_response, :code_1, _} Evaluator.request_completion_items(evaluator, self(), :comp_ref, "num", :code_1) @@ -197,4 +237,23 @@ defmodule Livebook.EvaluatorTest do ExUnit.CaptureIO.capture_io(:stderr, fun) :ok end + + # Returns a code that spawns and renders a widget process + # and returns its pid from the evaluation + defp spawn_widget_code() do + """ + widget_pid = spawn(fn -> + Process.sleep(:infinity) + end) + + ref = make_ref() + send(Process.group_leader(), {:io_request, self(), ref, {:livebook_put_output, {:vega_lite_dynamic, widget_pid}}}) + + receive do + {:io_reply, ^ref, :ok} -> :ok + end + + widget_pid + """ + end end