From d2344a5c3bbb3356ebd27a144f9c824d930e4469 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Valim?= Date: Sun, 30 Jan 2022 12:47:50 +0100 Subject: [PATCH] Improve memory tracking and address race conditions (#957) 1. Allow multiple sessions to close or disconnect at once, and then request the system resources to be updated 2. Make sure that closing a session happens synchronously, otherwise a race condition could still list the closed session in a dashboard 3. Ensure close and disconnect can happen more than once (for example if two users click the same button at the same time) --- lib/livebook/session.ex | 86 +++++++++++-------- lib/livebook/system_resources.ex | 28 +++++- .../live/home_live/edit_sessions_component.ex | 6 +- .../live/home_live/session_list_component.ex | 13 +-- test/livebook/session_test.exs | 5 ++ 5 files changed, 92 insertions(+), 46 deletions(-) diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index e23e84eac..9cac531d5 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -55,6 +55,8 @@ defmodule Livebook.Session do alias Livebook.Users.User alias Livebook.Notebook.{Cell, Section} + @timeout :infinity + @type t :: %__MODULE__{ id: id(), pid: pid(), @@ -122,7 +124,7 @@ defmodule Livebook.Session do """ @spec get_by_pid(pid()) :: Session.t() def get_by_pid(pid) do - GenServer.call(pid, :describe_self) + GenServer.call(pid, :describe_self, @timeout) end @doc """ @@ -136,7 +138,7 @@ defmodule Livebook.Session do """ @spec register_client(pid(), pid(), User.t()) :: Data.t() def register_client(pid, client_pid, user) do - GenServer.call(pid, {:register_client, client_pid, user}) + GenServer.call(pid, {:register_client, client_pid, user}, @timeout) end @doc """ @@ -144,7 +146,7 @@ defmodule Livebook.Session do """ @spec get_data(pid()) :: Data.t() def get_data(pid) do - GenServer.call(pid, :get_data) + GenServer.call(pid, :get_data, @timeout) end @doc """ @@ -152,7 +154,7 @@ defmodule Livebook.Session do """ @spec get_notebook(pid()) :: Notebook.t() def get_notebook(pid) do - GenServer.call(pid, :get_notebook) + GenServer.call(pid, :get_notebook, @timeout) end @doc """ @@ -171,7 +173,7 @@ defmodule Livebook.Session do :ok else with {:ok, runtime, archive_path} <- - GenServer.call(pid, {:get_runtime_and_archive_path, hash}) do + GenServer.call(pid, {:get_runtime_and_archive_path, hash}, @timeout) do fun = fn -> # Make sure the file hasn't been fetched by this point unless File.exists?(local_assets_path) do @@ -390,16 +392,6 @@ defmodule Livebook.Session do GenServer.cast(pid, {:connect_runtime, self(), runtime}) end - @doc """ - Disconnects from the current runtime. - - Note that this results in clearing the evaluation state. - """ - @spec disconnect_runtime(pid()) :: :ok - def disconnect_runtime(pid) do - GenServer.cast(pid, {:disconnect_runtime, self()}) - end - @doc """ Sends file location update request to the server. """ @@ -426,18 +418,38 @@ defmodule Livebook.Session do """ @spec save_sync(pid()) :: :ok def save_sync(pid) do - GenServer.call(pid, :save_sync) + GenServer.call(pid, :save_sync, @timeout) end @doc """ - Sends a close request to the server. + Closes one or more sessions. This results in saving the file and broadcasting a :closed message to the session topic. """ - @spec close(pid()) :: :ok + @spec close(pid() | [pid()]) :: :ok def close(pid) do - GenServer.cast(pid, :close) + _ = call_many(List.wrap(pid), :close) + Livebook.SystemResources.update() + :ok + end + + @doc """ + Disconnects one or more sessions from the current runtime. + + Note that this results in clearing the evaluation state. + """ + @spec disconnect_runtime(pid() | [pid()]) :: :ok + def disconnect_runtime(pid) do + _ = call_many(List.wrap(pid), {:disconnect_runtime, self()}) + Livebook.SystemResources.update() + :ok + end + + defp call_many(list, request) do + list + |> Enum.map(&:gen_server.send_request(&1, request)) + |> Enum.map(&:gen_server.wait_response(&1, :infinity)) end ## Callbacks @@ -573,6 +585,23 @@ defmodule Livebook.Session do {:reply, :ok, maybe_save_notebook_sync(state)} end + def handle_call(:close, _from, state) do + maybe_save_notebook_sync(state) + broadcast_message(state.session_id, :session_closed) + + {:stop, :shutdown, :ok, state} + end + + def handle_call({:disconnect_runtime, client_pid}, _from, state) do + if old_runtime = state.data.runtime do + Runtime.disconnect(old_runtime) + end + + {:reply, :ok, + %{state | runtime_monitor_ref: nil} + |> handle_operation({:set_runtime, client_pid, nil})} + end + @impl true def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do operation = {:set_notebook_attributes, client_pid, attrs} @@ -708,8 +737,8 @@ defmodule Livebook.Session do end def handle_cast({:connect_runtime, client_pid, runtime}, state) do - if state.data.runtime do - Runtime.disconnect(state.data.runtime) + if old_runtime = state.data.runtime do + Runtime.disconnect(old_runtime) end runtime_monitor_ref = Runtime.connect(runtime) @@ -719,14 +748,6 @@ defmodule Livebook.Session do |> handle_operation({:set_runtime, client_pid, runtime})} end - def handle_cast({:disconnect_runtime, client_pid}, state) do - Runtime.disconnect(state.data.runtime) - - {:noreply, - %{state | runtime_monitor_ref: nil} - |> handle_operation({:set_runtime, client_pid, nil})} - end - def handle_cast({:set_file, client_pid, file}, state) do if file do FileGuard.lock(file, self()) @@ -751,13 +772,6 @@ defmodule Livebook.Session do {:noreply, maybe_save_notebook_async(state)} end - def handle_cast(:close, state) do - maybe_save_notebook_sync(state) - broadcast_message(state.session_id, :session_closed) - - {:stop, :normal, state} - end - @impl true def handle_info({:DOWN, ref, :process, _, _}, %{runtime_monitor_ref: ref} = state) do broadcast_info(state.session_id, "runtime node terminated unexpectedly") diff --git a/lib/livebook/system_resources.ex b/lib/livebook/system_resources.ex index 6a7678dd9..1f0e00727 100644 --- a/lib/livebook/system_resources.ex +++ b/lib/livebook/system_resources.ex @@ -6,11 +6,22 @@ defmodule Livebook.SystemResources do use GenServer @name __MODULE__ + @doc """ + Returns system memory. + """ @spec memory() :: memory() def memory do :ets.lookup_element(@name, :memory, 2) end + @doc """ + Updates the resources kept by this process. + """ + @spec update() :: :ok + def update do + GenServer.cast(@name, :update) + end + @doc false def start_link(_opts) do GenServer.start_link(__MODULE__, :ok, name: @name) @@ -19,19 +30,30 @@ defmodule Livebook.SystemResources do @impl true def init(:ok) do :ets.new(@name, [:set, :named_table, :protected]) - measure_and_schedule() + measure() + schedule() {:ok, %{}} end @impl true def handle_info(:measure, state) do - measure_and_schedule() + measure() + schedule() {:noreply, state} end - defp measure_and_schedule() do + @impl true + def handle_cast(:update, state) do + measure() + {:noreply, state} + end + + defp measure() do memory = :memsup.get_system_memory_data() :ets.insert(@name, {:memory, %{total: memory[:total_memory], free: memory[:free_memory]}}) + end + + defp schedule() do Process.send_after(self(), :measure, 15000) end end diff --git a/lib/livebook_web/live/home_live/edit_sessions_component.ex b/lib/livebook_web/live/home_live/edit_sessions_component.ex index 43e45d7fe..c461c92ea 100644 --- a/lib/livebook_web/live/home_live/edit_sessions_component.ex +++ b/lib/livebook_web/live/home_live/edit_sessions_component.ex @@ -51,7 +51,8 @@ defmodule LivebookWeb.HomeLive.EditSessionsComponent do @impl true def handle_event("close_all", %{}, socket) do socket.assigns.selected_sessions - |> Enum.each(&Livebook.Session.close(&1.pid)) + |> Enum.map(& &1.pid) + |> Livebook.Session.close() {:noreply, push_patch(socket, to: socket.assigns.return_to, replace: true)} end @@ -59,7 +60,8 @@ defmodule LivebookWeb.HomeLive.EditSessionsComponent do def handle_event("disconnect", %{}, socket) do socket.assigns.selected_sessions |> Enum.reject(&(&1.memory_usage.runtime == nil)) - |> Enum.each(&Livebook.Session.disconnect_runtime(&1.pid)) + |> Enum.map(& &1.pid) + |> Livebook.Session.disconnect_runtime() {:noreply, push_patch(socket, to: socket.assigns.return_to, replace: true)} end diff --git a/lib/livebook_web/live/home_live/session_list_component.ex b/lib/livebook_web/live/home_live/session_list_component.ex index 68b1d8869..fe6f7d372 100644 --- a/lib/livebook_web/live/home_live/session_list_component.ex +++ b/lib/livebook_web/live/home_live/session_list_component.ex @@ -6,7 +6,7 @@ defmodule LivebookWeb.HomeLive.SessionListComponent do @impl true def mount(socket) do - {:ok, assign(socket, order_by: "date")} + {:ok, assign(socket, order_by: "date"), temporary_assigns: [memory: nil]} end @impl true @@ -24,7 +24,11 @@ defmodule LivebookWeb.HomeLive.SessionListComponent do socket = socket |> assign(assigns) - |> assign(sessions: sessions, show_autosave_note?: show_autosave_note?) + |> assign( + sessions: sessions, + show_autosave_note?: show_autosave_note?, + memory: Livebook.SystemResources.memory() + ) {:ok, socket} end @@ -40,7 +44,7 @@ defmodule LivebookWeb.HomeLive.SessionListComponent do
- <.memory_info /> + <.memory_info memory={@memory} /> <%= if @sessions != [] do %> <.edit_sessions sessions={@sessions} socket={@socket}/> <% end %> @@ -170,8 +174,7 @@ defmodule LivebookWeb.HomeLive.SessionListComponent do """ end - defp memory_info(assigns) do - %{free: free, total: total} = Livebook.SystemResources.memory() + defp memory_info(%{memory: %{free: free, total: total}} = assigns) do used = total - free percentage = Float.round(used / total * 100, 2) assigns = assign(assigns, free: free, used: used, total: total, percentage: percentage) diff --git a/test/livebook/session_test.exs b/test/livebook/session_test.exs index 9409c1494..5facc7b7b 100644 --- a/test/livebook/session_test.exs +++ b/test/livebook/session_test.exs @@ -195,7 +195,9 @@ defmodule Livebook.SessionTest do {:ok, runtime} = Livebook.Runtime.Embedded.init() Session.connect_runtime(session.pid, runtime) + # Calling twice can happen in a race, make sure it doesn't crash Session.disconnect_runtime(session.pid) + Session.disconnect_runtime([session.pid]) assert_receive {:operation, {:set_runtime, ^pid, nil}} end @@ -328,7 +330,10 @@ defmodule Livebook.SessionTest do assert {:ok, false} = FileSystem.File.exists?(file) Process.flag(:trap_exit, true) + + # Calling twice can happen in a race, make sure it doesn't crash Session.close(session.pid) + Session.close([session.pid]) assert_receive :session_closed assert {:ok, "# My notebook\n" <> _rest} = FileSystem.File.read(file)