diff --git a/lib/livebook/runtime.ex b/lib/livebook/runtime.ex index f7a6d4ee8..d29bb093c 100644 --- a/lib/livebook/runtime.ex +++ b/lib/livebook/runtime.ex @@ -746,6 +746,23 @@ defprotocol Livebook.Runtime do """ @type transient_state :: %{atom() => term()} + @typedoc """ + An identifier representing a client process. + + A client is a connected user that interacts with the runtime outputs. + """ + @type client_id :: String.t() + + @typedoc """ + User information about a particular client. + """ + @type user_info :: %{ + id: String.t(), + name: String.t() | nil, + email: String.t() | nil, + source: atom() + } + @doc """ Returns relevant information about the runtime. @@ -1072,4 +1089,16 @@ defprotocol Livebook.Runtime do """ @spec restore_transient_state(t(), transient_state()) :: :ok def restore_transient_state(runtime, transient_state) + + @doc """ + Notifies the runtime about connected clients. + """ + @spec register_clients(t(), list({client_id(), user_info()})) :: :ok + def register_clients(runtime, clients) + + @doc """ + Notifies the runtime about clients leaving. + """ + @spec unregister_clients(t(), list(client_id())) :: :ok + def unregister_clients(runtime, client_ids) end diff --git a/lib/livebook/runtime/attached.ex b/lib/livebook/runtime/attached.ex index b7b5288b5..81af9945a 100644 --- a/lib/livebook/runtime/attached.ex +++ b/lib/livebook/runtime/attached.ex @@ -196,4 +196,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do def restore_transient_state(runtime, transient_state) do RuntimeServer.restore_transient_state(runtime.server_pid, transient_state) end + + def register_clients(runtime, clients) do + RuntimeServer.register_clients(runtime.server_pid, clients) + end + + def unregister_clients(runtime, client_ids) do + RuntimeServer.unregister_clients(runtime.server_pid, client_ids) + end end diff --git a/lib/livebook/runtime/elixir_standalone.ex b/lib/livebook/runtime/elixir_standalone.ex index f18c83933..5d5b34e58 100644 --- a/lib/livebook/runtime/elixir_standalone.ex +++ b/lib/livebook/runtime/elixir_standalone.ex @@ -194,4 +194,12 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do def restore_transient_state(runtime, transient_state) do RuntimeServer.restore_transient_state(runtime.server_pid, transient_state) end + + def register_clients(runtime, clients) do + RuntimeServer.register_clients(runtime.server_pid, clients) + end + + def unregister_clients(runtime, client_ids) do + RuntimeServer.unregister_clients(runtime.server_pid, client_ids) + end end diff --git a/lib/livebook/runtime/embedded.ex b/lib/livebook/runtime/embedded.ex index bd094d67e..06fcd6f84 100644 --- a/lib/livebook/runtime/embedded.ex +++ b/lib/livebook/runtime/embedded.ex @@ -163,6 +163,14 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do RuntimeServer.restore_transient_state(runtime.server_pid, transient_state) end + def register_clients(runtime, clients) do + RuntimeServer.register_clients(runtime.server_pid, clients) + end + + def unregister_clients(runtime, client_ids) do + RuntimeServer.unregister_clients(runtime.server_pid, client_ids) + end + defp config() do Application.get_env(:livebook, Livebook.Runtime.Embedded, []) end diff --git a/lib/livebook/runtime/erl_dist.ex b/lib/livebook/runtime/erl_dist.ex index 529ae603b..58cd2a9f7 100644 --- a/lib/livebook/runtime/erl_dist.ex +++ b/lib/livebook/runtime/erl_dist.ex @@ -27,6 +27,7 @@ defmodule Livebook.Runtime.ErlDist do Livebook.Runtime.Evaluator.IOProxy, Livebook.Runtime.Evaluator.Tracer, Livebook.Runtime.Evaluator.ObjectTracker, + Livebook.Runtime.Evaluator.ClientTracker, Livebook.Runtime.Evaluator.Formatter, Livebook.Runtime.Evaluator.Doctests, Livebook.Intellisense, diff --git a/lib/livebook/runtime/erl_dist/runtime_server.ex b/lib/livebook/runtime/erl_dist/runtime_server.ex index 0412446d4..4659c8b2a 100644 --- a/lib/livebook/runtime/erl_dist/runtime_server.ex +++ b/lib/livebook/runtime/erl_dist/runtime_server.ex @@ -301,6 +301,22 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do GenServer.cast(pid, {:restore_transient_state, transient_state}) end + @doc """ + Notifies the runtime about connected clients. + """ + @spec register_clients(pid(), list({Runtime.client_id(), Runtime.user_info()})) :: :ok + def register_clients(pid, clients) do + GenServer.cast(pid, {:register_clients, clients}) + end + + @doc """ + Notifies the runtime about clients leaving. + """ + @spec unregister_clients(pid(), list(Runtime.client_id())) :: :ok + def unregister_clients(pid, client_ids) do + GenServer.cast(pid, {:unregister_clients, client_ids}) + end + @doc """ Stops the runtime server. @@ -320,7 +336,8 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do {:ok, evaluator_supervisor} = ErlDist.EvaluatorSupervisor.start_link() {:ok, task_supervisor} = Task.Supervisor.start_link() - {:ok, object_tracker} = Livebook.Runtime.Evaluator.ObjectTracker.start_link() + {:ok, object_tracker} = Evaluator.ObjectTracker.start_link() + {:ok, client_tracker} = Evaluator.ClientTracker.start_link() {:ok, %{ @@ -330,6 +347,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do evaluator_supervisor: evaluator_supervisor, task_supervisor: task_supervisor, object_tracker: object_tracker, + client_tracker: client_tracker, smart_cell_supervisor: nil, smart_cell_gl: nil, smart_cells: %{}, @@ -656,6 +674,16 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do {:noreply, state} end + def handle_cast({:register_clients, clients}, state) do + Evaluator.ClientTracker.register_clients(state.client_tracker, clients) + {:noreply, state} + end + + def handle_cast({:unregister_clients, client_ids}, state) do + Evaluator.ClientTracker.unregister_clients(state.client_tracker, client_ids) + {:noreply, state} + end + def handle_cast({:relabel_file, file_id, new_file_id}, state) do path = file_path(state, file_id) new_path = file_path(state, new_file_id) @@ -738,6 +766,7 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do send_to: state.owner, runtime_broadcast_to: state.runtime_broadcast_to, object_tracker: state.object_tracker, + client_tracker: state.client_tracker, ebin_path: state.ebin_path, tmp_dir: evaluator_tmp_dir(state), io_proxy_registry: state.io_proxy_registry diff --git a/lib/livebook/runtime/evaluator.ex b/lib/livebook/runtime/evaluator.ex index e154b7552..3f4fefcc0 100644 --- a/lib/livebook/runtime/evaluator.ex +++ b/lib/livebook/runtime/evaluator.ex @@ -28,6 +28,7 @@ defmodule Livebook.Runtime.Evaluator do send_to: pid(), runtime_broadcast_to: pid(), object_tracker: pid(), + client_tracker: pid(), contexts: %{ref() => context()}, initial_context: context(), initial_context_version: nil | (md5 :: binary()), @@ -75,6 +76,9 @@ defmodule Livebook.Runtime.Evaluator do * `:object_tracker` - a pid of `Livebook.Runtime.Evaluator.ObjectTracker`. Required + * `:client_tracker` - a pid of `Livebook.Runtime.Evaluator.ClientTracker`. + Required + * `:runtime_broadcast_to` - the process to send runtime broadcast events to. Defaults to the value of `:send_to` @@ -266,6 +270,7 @@ defmodule Livebook.Runtime.Evaluator do send_to = Keyword.fetch!(opts, :send_to) runtime_broadcast_to = Keyword.get(opts, :runtime_broadcast_to, send_to) object_tracker = Keyword.fetch!(opts, :object_tracker) + client_tracker = Keyword.fetch!(opts, :client_tracker) ebin_path = Keyword.get(opts, :ebin_path) tmp_dir = Keyword.get(opts, :tmp_dir) io_proxy_registry = Keyword.get(opts, :io_proxy_registry) @@ -276,6 +281,7 @@ defmodule Livebook.Runtime.Evaluator do send_to, runtime_broadcast_to, object_tracker, + client_tracker, ebin_path, tmp_dir, io_proxy_registry @@ -309,6 +315,7 @@ defmodule Livebook.Runtime.Evaluator do send_to: send_to, runtime_broadcast_to: runtime_broadcast_to, object_tracker: object_tracker, + client_tracker: client_tracker, contexts: %{}, initial_context: context, initial_context_version: nil, diff --git a/lib/livebook/runtime/evaluator/client_tracker.ex b/lib/livebook/runtime/evaluator/client_tracker.ex new file mode 100644 index 000000000..90aa27401 --- /dev/null +++ b/lib/livebook/runtime/evaluator/client_tracker.ex @@ -0,0 +1,85 @@ +defmodule Livebook.Runtime.Evaluator.ClientTracker do + # Keeps track of connected clients as reported to the runtime by the + # owner. + # + # Sends events to processes monitoring clients presence. + + use GenServer + + alias Livebook.Runtime + + @doc """ + Starts a new client tracker. + """ + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(_opts \\ []) do + GenServer.start_link(__MODULE__, {}) + end + + @doc """ + Registeres new connected clients. + """ + @spec register_clients(pid(), list({Runtime.client_id(), Runtime.user_info()})) :: :ok + def register_clients(client_tracker, clients) do + GenServer.cast(client_tracker, {:register_clients, clients}) + end + + @doc """ + Unregisteres connected clients. + """ + @spec unregister_clients(pid(), list(Runtime.client_id())) :: :ok + def unregister_clients(client_tracker, client_ids) do + GenServer.cast(client_tracker, {:unregister_clients, client_ids}) + end + + @doc """ + Subscribes the given process to client presence events. + """ + @spec monitor_clients(pid(), pid) :: :ok + def monitor_clients(client_tracker, pid) do + GenServer.cast(client_tracker, {:monitor_clients, pid}) + end + + @impl true + def init({}) do + {:ok, %{clients: %{}, subscribers: MapSet.new()}} + end + + @impl true + def handle_cast({:register_clients, clients}, state) do + for {client_id, _user_info} <- clients, pid <- state.subscribers do + send(pid, {:client_join, client_id}) + end + + state = update_in(state.clients, &Enum.into(clients, &1)) + + {:noreply, state} + end + + def handle_cast({:unregister_clients, client_ids}, state) do + for client_id <- client_ids, pid <- state.subscribers do + send(pid, {:client_leave, client_id}) + end + + state = update_in(state.clients, &Map.drop(&1, client_ids)) + + {:noreply, state} + end + + def handle_cast({:monitor_clients, pid}, state) do + Process.monitor(pid) + state = update_in(state.subscribers, &MapSet.put(&1, pid)) + + for {client_id, _user_info} <- state.clients do + send(pid, {:client_join, client_id}) + end + + {:noreply, state} + end + + @impl true + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + state = update_in(state.subscribers, &MapSet.delete(&1, pid)) + {:noreply, state} + end +end diff --git a/lib/livebook/runtime/evaluator/io_proxy.ex b/lib/livebook/runtime/evaluator/io_proxy.ex index ea717f355..d835dfb42 100644 --- a/lib/livebook/runtime/evaluator/io_proxy.ex +++ b/lib/livebook/runtime/evaluator/io_proxy.ex @@ -29,6 +29,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do pid(), pid(), pid(), + pid(), String.t() | nil, String.t() | nil, atom() | nil @@ -38,13 +39,15 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do send_to, runtime_broadcast_to, object_tracker, + client_tracker, ebin_path, tmp_dir, registry ) do GenServer.start( __MODULE__, - {evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, tmp_dir, registry} + {evaluator, send_to, runtime_broadcast_to, object_tracker, client_tracker, ebin_path, + tmp_dir, registry} ) end @@ -103,7 +106,8 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do @impl true def init( - {evaluator, send_to, runtime_broadcast_to, object_tracker, ebin_path, tmp_dir, registry} + {evaluator, send_to, runtime_broadcast_to, object_tracker, client_tracker, ebin_path, + tmp_dir, registry} ) do evaluator_monitor = Process.monitor(evaluator) @@ -124,6 +128,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do send_to: send_to, runtime_broadcast_to: runtime_broadcast_to, object_tracker: object_tracker, + client_tracker: client_tracker, ebin_path: ebin_path, tmp_dir: tmp_dir, tracer_info: %Evaluator.Tracer{}, @@ -392,6 +397,11 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do {result, state} end + defp io_request({:livebook_monitor_clients, pid}, state) do + Evaluator.ClientTracker.monitor_clients(state.client_tracker, pid) + {:ok, state} + end + defp io_request(_, state) do {{:error, :request}, state} end diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index a0fda852c..818a542ad 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -2085,6 +2085,14 @@ defmodule Livebook.Session do Runtime.restore_transient_state(runtime, state.data.runtime_transient_state) end + clients = + for {client_id, user_id} <- state.data.clients_map do + user = Map.fetch!(state.data.users_map, user_id) + {client_id, user_info(user)} + end + + Runtime.register_clients(runtime, clients) + %{state | runtime_monitor_ref: runtime_monitor_ref} end @@ -2198,6 +2206,10 @@ defmodule Livebook.Session do state = put_in(state.client_id_with_assets[client_id], %{}) + if Runtime.connected?(state.data.runtime) do + Runtime.register_clients(state.data.runtime, [{client_id, user_info(user)}]) + end + app_report_client_count_change(state) schedule_auto_shutdown(state) @@ -2213,6 +2225,10 @@ defmodule Livebook.Session do state = delete_client_files(state, client_id) {_, state} = pop_in(state.client_id_with_assets[client_id]) + if Runtime.connected?(state.data.runtime) do + Runtime.unregister_clients(state.data.runtime, [client_id]) + end + app_report_client_count_change(state) schedule_auto_shutdown(state) @@ -2836,13 +2852,7 @@ defmodule Livebook.Session do info = %{type: :multi_session} if user = state.started_by do - {type, _module, _key} = Livebook.Config.identity_provider() - - started_by = - user - |> Map.take([:id, :name, :email]) - |> Map.put(:source, type) - + started_by = user_info(user) Map.put(info, :started_by, started_by) else info @@ -2996,6 +3006,14 @@ defmodule Livebook.Session do end end + defp user_info(user) do + {type, _module, _key} = Livebook.Config.identity_provider() + + user + |> Map.take([:id, :name, :email]) + |> Map.put(:source, type) + end + # Normalizes output to match the most recent specification. # # Rewrites legacy output formats and adds defaults for newly introduced diff --git a/test/livebook/runtime/erl_dist/runtime_server_test.exs b/test/livebook/runtime/erl_dist/runtime_server_test.exs index 9dfa2e93e..70b08d355 100644 --- a/test/livebook/runtime/erl_dist/runtime_server_test.exs +++ b/test/livebook/runtime/erl_dist/runtime_server_test.exs @@ -340,4 +340,31 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServerTest do RuntimeServer.evaluate_code(pid, :elixir, "1 + 1", {:c1, :e1}, []) end end + + test "clients monitoring", %{pid: pid, test: test} do + # Pretend we are the subscriber + Process.register(self(), test) + + code = + """ + pid = Process.whereis(#{inspect(test)}) + + ref = make_ref() + send(Process.group_leader(), {:io_request, self(), ref, {:livebook_monitor_clients, pid}}) + + receive do + {:io_reply, ^ref, :ok} -> :ok + end + """ + + RuntimeServer.evaluate_code(pid, :elixir, code, {:c1, :e1}, []) + assert_receive {:runtime_evaluation_response, :e1, _, %{evaluation_time_ms: _time_ms}} + + clients = [{"c1", %{id: "u1", name: "Jake Peralta", email: nil, source: :session}}] + RuntimeServer.register_clients(pid, clients) + assert_receive {:client_join, "c1"} + + RuntimeServer.unregister_clients(pid, ["c1"]) + assert_receive {:client_leave, "c1"} + end end diff --git a/test/livebook/runtime/evaluator/client_tracker_test.exs b/test/livebook/runtime/evaluator/client_tracker_test.exs new file mode 100644 index 000000000..0c8c91fd9 --- /dev/null +++ b/test/livebook/runtime/evaluator/client_tracker_test.exs @@ -0,0 +1,52 @@ +defmodule Livebook.Runtime.Evaluator.ClientTrackerTest do + use ExUnit.Case, async: true + + alias Livebook.Runtime.Evaluator.ClientTracker + + setup do + {:ok, client_tracker} = start_supervised(ClientTracker) + %{client_tracker: client_tracker} + end + + test "sends client joins to monitoring processes", %{client_tracker: client_tracker} do + ClientTracker.monitor_clients(client_tracker, self()) + + clients = [{"c1", user_info()}, {"c2", user_info()}] + ClientTracker.register_clients(client_tracker, clients) + + assert_receive {:client_join, "c1"} + assert_receive {:client_join, "c2"} + end + + test "sends client leaves to monitoring processes", %{client_tracker: client_tracker} do + ClientTracker.monitor_clients(client_tracker, self()) + + clients = [{"c1", user_info()}, {"c2", user_info()}] + ClientTracker.register_clients(client_tracker, clients) + + ClientTracker.unregister_clients(client_tracker, ["c1"]) + assert_receive {:client_leave, "c1"} + + ClientTracker.unregister_clients(client_tracker, ["c2"]) + assert_receive {:client_leave, "c2"} + end + + test "sends existing client joins when monitoring starts", %{client_tracker: client_tracker} do + clients = [{"c1", user_info()}, {"c2", user_info()}] + ClientTracker.register_clients(client_tracker, clients) + + ClientTracker.monitor_clients(client_tracker, self()) + + assert_receive {:client_join, "c1"} + assert_receive {:client_join, "c2"} + end + + defp user_info() do + %{ + id: "u1", + name: "Jake Peralta", + email: nil, + source: :session + } + end +end diff --git a/test/livebook/runtime/evaluator/io_proxy_test.exs b/test/livebook/runtime/evaluator/io_proxy_test.exs index 974b0c736..16b3e8e2d 100644 --- a/test/livebook/runtime/evaluator/io_proxy_test.exs +++ b/test/livebook/runtime/evaluator/io_proxy_test.exs @@ -8,9 +8,13 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do setup do {:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker) + {:ok, client_tracker} = start_supervised(Evaluator.ClientTracker) {:ok, _pid, evaluator} = - start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]}) + start_supervised( + {Evaluator, + [send_to: self(), object_tracker: object_tracker, client_tracker: client_tracker]} + ) io = Process.info(evaluator.pid)[:group_leader] IOProxy.before_evaluation(io, :ref, "cell") diff --git a/test/livebook/runtime/evaluator/object_tracker_test.exs b/test/livebook/runtime/evaluator/object_tracker_test.exs index 037a86bbb..53db8e04c 100644 --- a/test/livebook/runtime/evaluator/object_tracker_test.exs +++ b/test/livebook/runtime/evaluator/object_tracker_test.exs @@ -1,4 +1,4 @@ -defmodule Livebook.Runtime.Evaluator.ObjecTrackerTest do +defmodule Livebook.Runtime.Evaluator.ObjectTrackerTest do use ExUnit.Case, async: true alias Livebook.Runtime.Evaluator.ObjectTracker diff --git a/test/livebook/runtime/evaluator_test.exs b/test/livebook/runtime/evaluator_test.exs index 6a9c80db8..d9def1dba 100644 --- a/test/livebook/runtime/evaluator_test.exs +++ b/test/livebook/runtime/evaluator_test.exs @@ -17,13 +17,23 @@ defmodule Livebook.Runtime.EvaluatorTest do end {:ok, object_tracker} = start_supervised(Evaluator.ObjectTracker) + {:ok, client_tracker} = start_supervised(Evaluator.ClientTracker) - {:ok, _pid, evaluator} = - start_supervised( - {Evaluator, [send_to: self(), object_tracker: object_tracker, ebin_path: ebin_path]} - ) + opts = [ + send_to: self(), + object_tracker: object_tracker, + client_tracker: client_tracker, + ebin_path: ebin_path + ] - %{evaluator: evaluator, object_tracker: object_tracker, ebin_path: ebin_path} + {:ok, _pid, evaluator} = start_supervised({Evaluator, opts}) + + %{ + evaluator: evaluator, + object_tracker: object_tracker, + client_tracker: client_tracker, + ebin_path: ebin_path + } end defmacrop metadata do @@ -1188,11 +1198,9 @@ defmodule Livebook.Runtime.EvaluatorTest do end describe "initialize_from/3" do - setup %{object_tracker: object_tracker} do - {:ok, _pid, parent_evaluator} = - start_supervised({Evaluator, [send_to: self(), object_tracker: object_tracker]}, - id: :parent_evaluator - ) + setup %{object_tracker: object_tracker, client_tracker: client_tracker} do + opts = [send_to: self(), object_tracker: object_tracker, client_tracker: client_tracker] + {:ok, _pid, parent_evaluator} = start_supervised({Evaluator, opts}, id: :parent_evaluator) %{parent_evaluator: parent_evaluator} end diff --git a/test/support/noop_runtime.ex b/test/support/noop_runtime.ex index 48285a370..eccdcb7e9 100644 --- a/test/support/noop_runtime.ex +++ b/test/support/noop_runtime.ex @@ -71,6 +71,9 @@ defmodule Livebook.Runtime.NoopRuntime do :ok end + def register_clients(_, _), do: :ok + def unregister_clients(_, _), do: :ok + defp trace(runtime, fun, args) do if runtime.trace_to do send(runtime.trace_to, {:runtime_trace, fun, args})