mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-01-01 12:41:43 +08:00
Improve memory tracking and address race conditions (#957)
1. Allow multiple sessions to close or disconnect at once, and then request the system resources to be updated.
2. Make sure that closing a session happens synchronously; otherwise a race condition could still list the closed session in a dashboard.
3. Ensure close and disconnect can happen more than once (for example, if two users click the same button at the same time).
This commit is contained in:
parent
b81c7f55ec
commit
d2344a5c3b
5 changed files with 92 additions and 46 deletions
|
@ -55,6 +55,8 @@ defmodule Livebook.Session do
|
|||
alias Livebook.Users.User
|
||||
alias Livebook.Notebook.{Cell, Section}
|
||||
|
||||
@timeout :infinity
|
||||
|
||||
@type t :: %__MODULE__{
|
||||
id: id(),
|
||||
pid: pid(),
|
||||
|
@ -122,7 +124,7 @@ defmodule Livebook.Session do
|
|||
"""
|
||||
@spec get_by_pid(pid()) :: Session.t()
|
||||
def get_by_pid(pid) do
|
||||
GenServer.call(pid, :describe_self)
|
||||
GenServer.call(pid, :describe_self, @timeout)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -136,7 +138,7 @@ defmodule Livebook.Session do
|
|||
"""
|
||||
@spec register_client(pid(), pid(), User.t()) :: Data.t()
|
||||
def register_client(pid, client_pid, user) do
|
||||
GenServer.call(pid, {:register_client, client_pid, user})
|
||||
GenServer.call(pid, {:register_client, client_pid, user}, @timeout)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -144,7 +146,7 @@ defmodule Livebook.Session do
|
|||
"""
|
||||
@spec get_data(pid()) :: Data.t()
|
||||
def get_data(pid) do
|
||||
GenServer.call(pid, :get_data)
|
||||
GenServer.call(pid, :get_data, @timeout)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -152,7 +154,7 @@ defmodule Livebook.Session do
|
|||
"""
|
||||
@spec get_notebook(pid()) :: Notebook.t()
|
||||
def get_notebook(pid) do
|
||||
GenServer.call(pid, :get_notebook)
|
||||
GenServer.call(pid, :get_notebook, @timeout)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -171,7 +173,7 @@ defmodule Livebook.Session do
|
|||
:ok
|
||||
else
|
||||
with {:ok, runtime, archive_path} <-
|
||||
GenServer.call(pid, {:get_runtime_and_archive_path, hash}) do
|
||||
GenServer.call(pid, {:get_runtime_and_archive_path, hash}, @timeout) do
|
||||
fun = fn ->
|
||||
# Make sure the file hasn't been fetched by this point
|
||||
unless File.exists?(local_assets_path) do
|
||||
|
@ -390,16 +392,6 @@ defmodule Livebook.Session do
|
|||
GenServer.cast(pid, {:connect_runtime, self(), runtime})
|
||||
end
|
||||
|
||||
@doc """
|
||||
Disconnects from the current runtime.
|
||||
|
||||
Note that this results in clearing the evaluation state.
|
||||
"""
|
||||
@spec disconnect_runtime(pid()) :: :ok
|
||||
def disconnect_runtime(pid) do
|
||||
GenServer.cast(pid, {:disconnect_runtime, self()})
|
||||
end
|
||||
|
||||
@doc """
|
||||
Sends file location update request to the server.
|
||||
"""
|
||||
|
@ -426,18 +418,38 @@ defmodule Livebook.Session do
|
|||
"""
|
||||
@spec save_sync(pid()) :: :ok
|
||||
def save_sync(pid) do
|
||||
GenServer.call(pid, :save_sync)
|
||||
GenServer.call(pid, :save_sync, @timeout)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Sends a close request to the server.
|
||||
Closes one or more sessions.
|
||||
|
||||
This results in saving the file and broadcasting
|
||||
a :session_closed message to the session topic.
|
||||
"""
|
||||
@spec close(pid()) :: :ok
|
||||
@spec close(pid() | [pid()]) :: :ok
|
||||
def close(pid) do
|
||||
GenServer.cast(pid, :close)
|
||||
_ = call_many(List.wrap(pid), :close)
|
||||
Livebook.SystemResources.update()
|
||||
:ok
|
||||
end
|
||||
|
||||
@doc """
|
||||
Disconnects one or more sessions from the current runtime.
|
||||
|
||||
Note that this results in clearing the evaluation state.
|
||||
"""
|
||||
@spec disconnect_runtime(pid() | [pid()]) :: :ok
|
||||
def disconnect_runtime(pid) do
|
||||
_ = call_many(List.wrap(pid), {:disconnect_runtime, self()})
|
||||
Livebook.SystemResources.update()
|
||||
:ok
|
||||
end
|
||||
|
||||
defp call_many(list, request) do
|
||||
list
|
||||
|> Enum.map(&:gen_server.send_request(&1, request))
|
||||
|> Enum.map(&:gen_server.wait_response(&1, :infinity))
|
||||
end
|
||||
|
||||
## Callbacks
|
||||
|
@ -573,6 +585,23 @@ defmodule Livebook.Session do
|
|||
{:reply, :ok, maybe_save_notebook_sync(state)}
|
||||
end
|
||||
|
||||
def handle_call(:close, _from, state) do
|
||||
maybe_save_notebook_sync(state)
|
||||
broadcast_message(state.session_id, :session_closed)
|
||||
|
||||
{:stop, :shutdown, :ok, state}
|
||||
end
|
||||
|
||||
def handle_call({:disconnect_runtime, client_pid}, _from, state) do
|
||||
if old_runtime = state.data.runtime do
|
||||
Runtime.disconnect(old_runtime)
|
||||
end
|
||||
|
||||
{:reply, :ok,
|
||||
%{state | runtime_monitor_ref: nil}
|
||||
|> handle_operation({:set_runtime, client_pid, nil})}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do
|
||||
operation = {:set_notebook_attributes, client_pid, attrs}
|
||||
|
@ -708,8 +737,8 @@ defmodule Livebook.Session do
|
|||
end
|
||||
|
||||
def handle_cast({:connect_runtime, client_pid, runtime}, state) do
|
||||
if state.data.runtime do
|
||||
Runtime.disconnect(state.data.runtime)
|
||||
if old_runtime = state.data.runtime do
|
||||
Runtime.disconnect(old_runtime)
|
||||
end
|
||||
|
||||
runtime_monitor_ref = Runtime.connect(runtime)
|
||||
|
@ -719,14 +748,6 @@ defmodule Livebook.Session do
|
|||
|> handle_operation({:set_runtime, client_pid, runtime})}
|
||||
end
|
||||
|
||||
def handle_cast({:disconnect_runtime, client_pid}, state) do
|
||||
Runtime.disconnect(state.data.runtime)
|
||||
|
||||
{:noreply,
|
||||
%{state | runtime_monitor_ref: nil}
|
||||
|> handle_operation({:set_runtime, client_pid, nil})}
|
||||
end
|
||||
|
||||
def handle_cast({:set_file, client_pid, file}, state) do
|
||||
if file do
|
||||
FileGuard.lock(file, self())
|
||||
|
@ -751,13 +772,6 @@ defmodule Livebook.Session do
|
|||
{:noreply, maybe_save_notebook_async(state)}
|
||||
end
|
||||
|
||||
def handle_cast(:close, state) do
|
||||
maybe_save_notebook_sync(state)
|
||||
broadcast_message(state.session_id, :session_closed)
|
||||
|
||||
{:stop, :normal, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info({:DOWN, ref, :process, _, _}, %{runtime_monitor_ref: ref} = state) do
|
||||
broadcast_info(state.session_id, "runtime node terminated unexpectedly")
|
||||
|
|
|
@ -6,11 +6,22 @@ defmodule Livebook.SystemResources do
|
|||
use GenServer
|
||||
@name __MODULE__
|
||||
|
||||
@doc """
|
||||
Returns system memory.
|
||||
"""
|
||||
@spec memory() :: memory()
|
||||
def memory do
|
||||
:ets.lookup_element(@name, :memory, 2)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Updates the resources kept by this process.
|
||||
"""
|
||||
@spec update() :: :ok
|
||||
def update do
|
||||
GenServer.cast(@name, :update)
|
||||
end
|
||||
|
||||
@doc false
|
||||
def start_link(_opts) do
|
||||
GenServer.start_link(__MODULE__, :ok, name: @name)
|
||||
|
@ -19,19 +30,30 @@ defmodule Livebook.SystemResources do
|
|||
@impl true
|
||||
def init(:ok) do
|
||||
:ets.new(@name, [:set, :named_table, :protected])
|
||||
measure_and_schedule()
|
||||
measure()
|
||||
schedule()
|
||||
{:ok, %{}}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:measure, state) do
|
||||
measure_and_schedule()
|
||||
measure()
|
||||
schedule()
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp measure_and_schedule() do
|
||||
@impl true
|
||||
def handle_cast(:update, state) do
|
||||
measure()
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
defp measure() do
|
||||
memory = :memsup.get_system_memory_data()
|
||||
:ets.insert(@name, {:memory, %{total: memory[:total_memory], free: memory[:free_memory]}})
|
||||
end
|
||||
|
||||
defp schedule() do
|
||||
Process.send_after(self(), :measure, 15000)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -51,7 +51,8 @@ defmodule LivebookWeb.HomeLive.EditSessionsComponent do
|
|||
@impl true
|
||||
def handle_event("close_all", %{}, socket) do
|
||||
socket.assigns.selected_sessions
|
||||
|> Enum.each(&Livebook.Session.close(&1.pid))
|
||||
|> Enum.map(& &1.pid)
|
||||
|> Livebook.Session.close()
|
||||
|
||||
{:noreply, push_patch(socket, to: socket.assigns.return_to, replace: true)}
|
||||
end
|
||||
|
@ -59,7 +60,8 @@ defmodule LivebookWeb.HomeLive.EditSessionsComponent do
|
|||
def handle_event("disconnect", %{}, socket) do
|
||||
socket.assigns.selected_sessions
|
||||
|> Enum.reject(&(&1.memory_usage.runtime == nil))
|
||||
|> Enum.each(&Livebook.Session.disconnect_runtime(&1.pid))
|
||||
|> Enum.map(& &1.pid)
|
||||
|> Livebook.Session.disconnect_runtime()
|
||||
|
||||
{:noreply, push_patch(socket, to: socket.assigns.return_to, replace: true)}
|
||||
end
|
||||
|
|
|
@ -6,7 +6,7 @@ defmodule LivebookWeb.HomeLive.SessionListComponent do
|
|||
|
||||
@impl true
|
||||
def mount(socket) do
|
||||
{:ok, assign(socket, order_by: "date")}
|
||||
{:ok, assign(socket, order_by: "date"), temporary_assigns: [memory: nil]}
|
||||
end
|
||||
|
||||
@impl true
|
||||
|
@ -24,7 +24,11 @@ defmodule LivebookWeb.HomeLive.SessionListComponent do
|
|||
socket =
|
||||
socket
|
||||
|> assign(assigns)
|
||||
|> assign(sessions: sessions, show_autosave_note?: show_autosave_note?)
|
||||
|> assign(
|
||||
sessions: sessions,
|
||||
show_autosave_note?: show_autosave_note?,
|
||||
memory: Livebook.SystemResources.memory()
|
||||
)
|
||||
|
||||
{:ok, socket}
|
||||
end
|
||||
|
@ -40,7 +44,7 @@ defmodule LivebookWeb.HomeLive.SessionListComponent do
|
|||
</h2>
|
||||
</div>
|
||||
<div class="flex flex-row">
|
||||
<.memory_info />
|
||||
<.memory_info memory={@memory} />
|
||||
<%= if @sessions != [] do %>
|
||||
<.edit_sessions sessions={@sessions} socket={@socket}/>
|
||||
<% end %>
|
||||
|
@ -170,8 +174,7 @@ defmodule LivebookWeb.HomeLive.SessionListComponent do
|
|||
"""
|
||||
end
|
||||
|
||||
defp memory_info(assigns) do
|
||||
%{free: free, total: total} = Livebook.SystemResources.memory()
|
||||
defp memory_info(%{memory: %{free: free, total: total}} = assigns) do
|
||||
used = total - free
|
||||
percentage = Float.round(used / total * 100, 2)
|
||||
assigns = assign(assigns, free: free, used: used, total: total, percentage: percentage)
|
||||
|
|
|
@ -195,7 +195,9 @@ defmodule Livebook.SessionTest do
|
|||
{:ok, runtime} = Livebook.Runtime.Embedded.init()
|
||||
Session.connect_runtime(session.pid, runtime)
|
||||
|
||||
# Calling twice can happen in a race, make sure it doesn't crash
|
||||
Session.disconnect_runtime(session.pid)
|
||||
Session.disconnect_runtime([session.pid])
|
||||
|
||||
assert_receive {:operation, {:set_runtime, ^pid, nil}}
|
||||
end
|
||||
|
@ -328,7 +330,10 @@ defmodule Livebook.SessionTest do
|
|||
assert {:ok, false} = FileSystem.File.exists?(file)
|
||||
|
||||
Process.flag(:trap_exit, true)
|
||||
|
||||
# Calling twice can happen in a race, make sure it doesn't crash
|
||||
Session.close(session.pid)
|
||||
Session.close([session.pid])
|
||||
|
||||
assert_receive :session_closed
|
||||
assert {:ok, "# My notebook\n" <> _rest} = FileSystem.File.read(file)
|
||||
|
|
Loading…
Reference in a new issue