Wait for processes to be garbage collected before evaluation (#1385)

This commit is contained in:
Jonatan Kłosko 2022-09-02 23:45:58 +02:00 committed by GitHub
parent a0901ada01
commit eeadb0811b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 91 additions and 25 deletions

View file

@ -308,7 +308,7 @@ defmodule Livebook.Runtime.Evaluator do
end
defp handle_cast({:evaluate_code, code, ref, base_ref, opts}, state) do
Evaluator.ObjectTracker.remove_reference(state.object_tracker, {self(), ref})
Evaluator.ObjectTracker.remove_reference_sync(state.object_tracker, {self(), ref})
context = get_context(state, base_ref)
file = Keyword.get(opts, :file, "nofile")
@ -357,7 +357,7 @@ defmodule Livebook.Runtime.Evaluator do
defp handle_cast({:forget_evaluation, ref}, state) do
state = delete_context(state, ref)
Evaluator.ObjectTracker.remove_reference(state.object_tracker, {self(), ref})
Evaluator.ObjectTracker.remove_reference_sync(state.object_tracker, {self(), ref})
:erlang.garbage_collect(self())
{:noreply, state}

View file

@ -221,8 +221,15 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
{:ok, state}
end
# Used until Kino v0.7
defp io_request({:livebook_monitor_object, object, destination, payload}, state) do
reply = Evaluator.ObjectTracker.monitor(state.object_tracker, object, destination, payload)
io_request({:livebook_monitor_object, object, destination, payload, false}, state)
end
defp io_request({:livebook_monitor_object, object, destination, payload, ack?}, state) do
reply =
Evaluator.ObjectTracker.monitor(state.object_tracker, object, destination, payload, ack?)
{reply, state}
end

View file

@ -18,6 +18,8 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do
use GenServer
require Logger
@type state :: %{
objects: %{
object() => %{
@ -60,19 +62,22 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do
@doc """
Removes the given reference from all objects it is attached to.
This function synchronously awaits all monitoring acknowledgements
in case the object is released.
"""
@spec remove_reference(pid(), object_reference()) :: :ok
def remove_reference(object_tracker, reference) do
GenServer.cast(object_tracker, {:remove_reference, reference})
@spec remove_reference_sync(pid(), object_reference()) :: :ok
def remove_reference_sync(object_tracker, reference) do
GenServer.call(object_tracker, {:remove_reference_sync, reference})
end
@doc """
Schedules `payload` to be send to `destination` when the object
is released.
"""
@spec monitor(pid(), object(), Process.dest(), term()) :: :ok | {:error, :bad_object}
def monitor(object_tracker, object, destination, payload) do
GenServer.call(object_tracker, {:monitor, object, destination, payload})
@spec monitor(pid(), object(), Process.dest(), term(), boolean()) :: :ok | {:error, :bad_object}
def monitor(object_tracker, object, destination, payload, ack?) do
GenServer.call(object_tracker, {:monitor, object, destination, payload, ack?})
end
@impl true
@ -97,15 +102,15 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do
{:noreply, state}
end
def handle_cast({:remove_reference, reference}, state) do
@impl true
def handle_call({:remove_reference_sync, reference}, _from, state) do
state = update_references(state, fn references -> List.delete(references, reference) end)
{:noreply, garbage_collect(state)}
{:reply, :ok, garbage_collect(state, true)}
end
@impl true
def handle_call({:monitor, object, destination, payload}, _from, state) do
monitor = {destination, payload}
def handle_call({:monitor, object, destination, payload, ack?}, _from, state) do
monitor = {destination, payload, ack?}
if state.objects[object] do
state =
@ -129,6 +134,10 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do
{:noreply, garbage_collect(state)}
end
def handle_info({:monitor_ack, :ignored}, state) do
{:noreply, state}
end
# Updates references for every object with the given function
defp update_references(state, fun) do
update_in(state.objects, fn objects ->
@ -138,11 +147,39 @@ defmodule Livebook.Runtime.Evaluator.ObjectTracker do
end)
end
defp garbage_collect(state) do
defp garbage_collect(state, sync \\ false) do
{to_release, objects} = Enum.split_with(state.objects, &match?({_, %{references: []}}, &1))
for {_, %{monitors: monitors}} <- to_release, {dest, payload} <- monitors do
send(dest, payload)
monitors = for {_, %{monitors: monitors}} <- to_release, monitor <- monitors, do: monitor
for {dest, payload, _ack? = false} <- monitors, do: send(dest, payload)
if sync do
ack_refs =
for {dest, payload, _ack? = true} <- monitors do
ack_ref = Process.monitor(dest)
send(dest, {payload, self(), {:monitor_ack, ack_ref}})
ack_ref
end
for ack_ref <- ack_refs do
receive do
{:monitor_ack, ^ack_ref} ->
Process.demonitor(ack_ref, [:flush])
{:DOWN, ^ack_ref, :process, _pid, _reason} ->
:ok
after
5_000 ->
Logger.warning(
"expected a monitoring acknowledgement, but none was received within 5 seconds"
)
end
end
else
for {dest, payload, true} <- monitors do
send(dest, {payload, self(), {:monitor_ack, :ignored}})
end
end
%{state | objects: Map.new(objects)}

View file

@ -8,10 +8,10 @@ defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do
%{object_tracker: object_tracker}
end
test "monitor/4 returns an error when the given object doesn't exist",
test "monitor/5 returns an error when the given object doesn't exist",
%{object_tracker: object_tracker} do
assert {:error, :bad_object} =
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released)
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released, false)
end
test "sends scheduled monitor messages when all object references are released",
@ -19,10 +19,10 @@ defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do
ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref1})
ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref2})
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released)
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released, false)
ObjectTracker.remove_reference(object_tracker, {self(), :ref1})
ObjectTracker.remove_reference(object_tracker, {self(), :ref2})
ObjectTracker.remove_reference_sync(object_tracker, {self(), :ref1})
ObjectTracker.remove_reference_sync(object_tracker, {self(), :ref2})
assert_receive :object1_released
end
@ -32,13 +32,35 @@ defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do
ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref1})
ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref2})
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released)
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released, false)
ObjectTracker.remove_reference(object_tracker, {self(), :ref1})
ObjectTracker.remove_reference_sync(object_tracker, {self(), :ref1})
refute_receive :object1_released
end
test "remove_reference_sync/2 awaits for monitor acknowledgements",
%{object_tracker: object_tracker} do
ObjectTracker.add_reference(object_tracker, :object1, {self(), :ref1})
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released1, true)
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released2, false)
myself = self()
spawn_link(fn ->
ObjectTracker.remove_reference_sync(object_tracker, {myself, :ref1})
send(myself, :removed)
end)
assert_receive {:object1_released1, ^object_tracker, reply_as}
assert_receive :object1_released2
refute_receive :removed
send(object_tracker, reply_as)
assert_receive :removed
end
test "removes a reference if its process terminates", %{object_tracker: object_tracker} do
reference_pid =
spawn(fn ->
@ -49,7 +71,7 @@ defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do
ObjectTracker.add_reference(object_tracker, :object1, {reference_pid, :ref1})
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released)
ObjectTracker.monitor(object_tracker, :object1, self(), :object1_released, false)
send(reference_pid, :stop)
assert_receive :object1_released