From eeadb0811bb90103d7bee36a8d44b680a2abecea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Fri, 2 Sep 2022 23:45:58 +0200 Subject: [PATCH] Wait for processes to be garbage collected before evaluation (#1385) --- lib/livebook/runtime/evaluator.ex | 4 +- lib/livebook/runtime/evaluator/io_proxy.ex | 9 ++- .../runtime/evaluator/object_tracker.ex | 65 +++++++++++++++---- .../runtime/evaluator/object_tracker_test.exs | 38 ++++++++--- 4 files changed, 91 insertions(+), 25 deletions(-) diff --git a/lib/livebook/runtime/evaluator.ex b/lib/livebook/runtime/evaluator.ex index a308e876b..4369a22a5 100644 --- a/lib/livebook/runtime/evaluator.ex +++ b/lib/livebook/runtime/evaluator.ex @@ -308,7 +308,7 @@ defmodule Livebook.Runtime.Evaluator do end defp handle_cast({:evaluate_code, code, ref, base_ref, opts}, state) do - Evaluator.ObjectTracker.remove_reference(state.object_tracker, {self(), ref}) + Evaluator.ObjectTracker.remove_reference_sync(state.object_tracker, {self(), ref}) context = get_context(state, base_ref) file = Keyword.get(opts, :file, "nofile") @@ -357,7 +357,7 @@ defmodule Livebook.Runtime.Evaluator do defp handle_cast({:forget_evaluation, ref}, state) do state = delete_context(state, ref) - Evaluator.ObjectTracker.remove_reference(state.object_tracker, {self(), ref}) + Evaluator.ObjectTracker.remove_reference_sync(state.object_tracker, {self(), ref}) :erlang.garbage_collect(self()) {:noreply, state} diff --git a/lib/livebook/runtime/evaluator/io_proxy.ex b/lib/livebook/runtime/evaluator/io_proxy.ex index f82b7e4de..bdc3e1556 100644 --- a/lib/livebook/runtime/evaluator/io_proxy.ex +++ b/lib/livebook/runtime/evaluator/io_proxy.ex @@ -221,8 +221,15 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do {:ok, state} end + # Used until Kino v0.7 defp io_request({:livebook_monitor_object, object, destination, payload}, state) do - reply = Evaluator.ObjectTracker.monitor(state.object_tracker, object, destination, payload) + io_request({:livebook_monitor_object, object, destination, payload, false}, state) + end + + defp io_request({:livebook_monitor_object, object, destination, payload, ack?}, state) do + reply = + Evaluator.ObjectTracker.monitor(state.object_tracker, object, destination, payload, ack?) + {reply, state} end diff --git a/lib/livebook/runtime/evaluator/object_tracker.ex b/lib/livebook/runtime/evaluator/object_tracker.ex index 22c4c0773..98bb524df 100644 --- a/lib/livebook/runtime/evaluator/object_tracker.ex +++ b/lib/livebook/runtime/evaluator/object_tracker.ex @@ -18,6 +18,8 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do use GenServer + require Logger + @type state :: %{ objects: %{ object() => %{ @@ -60,19 +62,22 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do @doc """ Removes the given reference from all objects it is attached to. + + This function synchronously awaits all monitoring acknowledgements + in case the object is released. """ - @spec remove_reference(pid(), object_reference()) :: :ok - def remove_reference(object_tracker, reference) do - GenServer.cast(object_tracker, {:remove_reference, reference}) + @spec remove_reference_sync(pid(), object_reference()) :: :ok + def remove_reference_sync(object_tracker, reference) do + GenServer.call(object_tracker, {:remove_reference_sync, reference}) end @doc """ Schedules `payload` to be send to `destination` when the object is released. """ - @spec monitor(pid(), object(), Process.dest(), term()) :: :ok | {:error, :bad_object} - def monitor(object_tracker, object, destination, payload) do - GenServer.call(object_tracker, {:monitor, object, destination, payload}) + @spec monitor(pid(), object(), Process.dest(), term(), boolean()) :: :ok | {:error, :bad_object} + def monitor(object_tracker, object, destination, payload, ack?) do + GenServer.call(object_tracker, {:monitor, object, destination, payload, ack?}) end @impl true @@ -97,15 +102,15 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do {:noreply, state} end - def handle_cast({:remove_reference, reference}, state) do + @impl true + def handle_call({:remove_reference_sync, reference}, _from, state) do state = update_references(state, fn references -> List.delete(references, reference) end) - {:noreply, garbage_collect(state)} + {:reply, :ok, garbage_collect(state, true)} end - @impl true - def handle_call({:monitor, object, destination, payload}, _from, state) do - monitor = {destination, payload} + def handle_call({:monitor, object, destination, payload, ack?}, _from, state) do + monitor = {destination, payload, ack?} if state.objects[object] do state = @@ -129,6 +134,10 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do {:noreply, garbage_collect(state)} end + def handle_info({:monitor_ack, :ignored}, state) do + {:noreply, state} + end + # Updates references for every object with the given function defp update_references(state, fun) do update_in(state.objects, fn objects -> @@ -138,11 +147,39 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do end) end - defp garbage_collect(state) do + defp garbage_collect(state, sync \\ false) do {to_release, objects} = Enum.split_with(state.objects, &match?({_, %{references: []}}, &1)) - for {_, %{monitors: monitors}} <- to_release, {dest, payload} <- monitors do - send(dest, payload) + monitors = for {_, %{monitors: monitors}} <- to_release, monitor <- monitors, do: monitor + + for {dest, payload, _ack? = false} <- monitors, do: send(dest, payload) + + if sync do + ack_refs = + for {dest, payload, _ack? = true} <- monitors do + ack_ref = Process.monitor(dest) + send(dest, {payload, self(), {:monitor_ack, ack_ref}}) + ack_ref + end + + for ack_ref <- ack_refs do + receive do + {:monitor_ack, ^ack_ref} -> + Process.demonitor(ack_ref, [:flush]) + + {:DOWN, ^ack_ref, :process, _pid, _reason} -> + :ok + after + 5_000 -> + Logger.warning( + "expected a monitoring acknowledgement, but none was received within 5 seconds" + ) + end + end + else + for {dest, payload, true} <- monitors do + send(dest, {payload, self(), {:monitor_ack, :ignored}}) + end end %{state | objects: Map.new(objects)} diff --git a/test/livebook/runtime/evaluator/object_tracker_test.exs b/test/livebook/runtime/evaluator/object_tracker_test.exs index 12c542b20..037a86bbb 100644 --- a/test/livebook/runtime/evaluator/object_tracker_test.exs +++ b/test/livebook/runtime/evaluator/object_tracker_test.exs @@ -8,10 +8,10 @@ defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do %{object_tracker: object_tracker} end - test "monitor/4 returns an error when the given object doesn't exist", + test "monitor/5 returns an error when the given object doesn't exist", %{object_tracker: object_tracker} do assert {:error, :bad_object} = - ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released) + ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released, false) end test "sends scheduled monitor messages when all object references are released", @@ -19,10 +19,10 @@ defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref1}) ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref2}) - ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released) + ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released, false) - ObjectTracker.remove_reference(object_tracker, {self(), :ref1}) - ObjectTracker.remove_reference(object_tracker, {self(), :ref2}) + ObjectTracker.remove_reference_sync(object_tracker, {self(), :ref1}) + ObjectTracker.remove_reference_sync(object_tracker, {self(), :ref2}) assert_receive :object1_released end @@ -32,13 +32,35 @@ defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref1}) ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref2}) - ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released) + ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released, false) - ObjectTracker.remove_reference(object_tracker, {self(), :ref1}) + ObjectTracker.remove_reference_sync(object_tracker, {self(), :ref1}) refute_receive :object1_released end + test "remove_reference_sync/2 awaits for monitor acknowledgements", + %{object_tracker: object_tracker} do + ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref1}) + + ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released1, true) + ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released2, false) + + myself = self() + + spawn_link(fn -> + ObjectTracker.remove_reference_sync(object_tracker, {myself, :ref1}) + send(myself, :removed) + end) + + assert_receive {:object1_released1, ^object_tracker, reply_as} + assert_receive :object1_released2 + + refute_receive :removed + send(object_tracker, reply_as) + assert_receive :removed + end + test "removes a reference if its process terminates", %{object_tracker: object_tracker} do reference_pid = spawn(fn -> @@ -49,7 +71,7 @@ defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do ObjectTracker.add_reference(object_tracker, :object1, {reference_pid, :ref1}) - ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released) + ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released, false) send(reference_pid, :stop) assert_receive :object1_released