diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index 22baff901..6b34f4aaa 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -158,6 +158,24 @@ defmodule Livebook.Session do GenServer.call(pid, :get_notebook, @timeout) end + @doc """ + Subscribes to update in sessions list. + + ## Messages + + * `:session_closed` + * `{:session_updated, session}` + * `{:hydrate_bin_entries, entries}` + * `{:operation, operation}` + * `{:error, error}` + * `{:info, info}` + + """ + @spec subscribe(id()) :: :ok | {:error, term()} + def subscribe(session_id) do + Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session_id}") + end + @doc """ Computes the file name for download. @@ -1205,7 +1223,7 @@ defmodule Livebook.Session do defp after_operation(state, prev_state, {:client_join, _client_pid, user}) do unless Map.has_key?(prev_state.data.users_map, user.id) do - Phoenix.PubSub.subscribe(Livebook.PubSub, "users:#{user.id}") + Livebook.Users.subscribe(user.id) end state @@ -1215,7 +1233,7 @@ defmodule Livebook.Session do user_id = prev_state.data.clients_map[client_pid] unless Map.has_key?(state.data.users_map, user_id) do - Phoenix.PubSub.unsubscribe(Livebook.PubSub, "users:#{user_id}") + Livebook.Users.unsubscribe(user_id) end state diff --git a/lib/livebook/sessions.ex b/lib/livebook/sessions.ex index 52d5b38b9..d71353085 100644 --- a/lib/livebook/sessions.ex +++ b/lib/livebook/sessions.ex @@ -14,8 +14,6 @@ defmodule Livebook.Sessions do Spawns a new `Session` process with the given options. Makes the session globally visible within the session tracker. - - `{:session_created, session}` gets broadcasted under the "tracker_sessions" topic. """ @spec create_session(keyword()) :: {:ok, Session.t()} | {:error, any()} def create_session(opts \\ []) do @@ -73,11 +71,24 @@ defmodule Livebook.Sessions do @doc """ Updates the given session info across the cluster. - - `{:session_updated, session}` gets broadcasted under the "tracker_sessions" topic. """ @spec update_session(Session.t()) :: :ok | {:error, any()} def update_session(session) do Livebook.Tracker.update_session(session) end + + @doc """ + Subscribes to update in sessions list. + + ## Messages + + * `{:session_created, session}` + * `{:session_updated, session}` + * `{:session_closed, session}` + + """ + @spec subscribe() :: :ok | {:error, term()} + def subscribe() do + Phoenix.PubSub.subscribe(Livebook.PubSub, "tracker_sessions") + end end diff --git a/lib/livebook/system_resources.ex b/lib/livebook/system_resources.ex index 65a4950de..c54480840 100644 --- a/lib/livebook/system_resources.ex +++ b/lib/livebook/system_resources.ex @@ -16,6 +16,19 @@ defmodule Livebook.SystemResources do :ets.lookup_element(@name, :memory, 2) end + @doc """ + Subscribes to resource usage updates. + + ## Messages + + * `{:memory_update, memory}` + + """ + @spec subscribe() :: :ok | {:error, term()} + def subscribe() do + Phoenix.PubSub.subscribe(Livebook.PubSub, "system_resources") + end + @doc """ Updates the resources kept by this process. """ diff --git a/lib/livebook/users.ex b/lib/livebook/users.ex index c7abc3cc2..604f3e6d2 100644 --- a/lib/livebook/users.ex +++ b/lib/livebook/users.ex @@ -17,4 +17,25 @@ defmodule Livebook.Users do defp broadcast_user_message(user_id, message) do Phoenix.PubSub.broadcast(Livebook.PubSub, "users:#{user_id}", message) end + + @doc """ + Subscribes to updates in user information. + + ## Messages + + * `{:user_change, user}` + + """ + @spec subscribe(User.id()) :: :ok | {:error, term()} + def subscribe(user_id) do + Phoenix.PubSub.subscribe(Livebook.PubSub, "users:#{user_id}") + end + + @doc """ + Unsubscribes from `subscribe/1`. + """ + @spec unsubscribe(User.id()) :: :ok + def unsubscribe(user_id) do + Phoenix.PubSub.unsubscribe(Livebook.PubSub, "users:#{user_id}") + end end diff --git a/lib/livebook_web/live/home_live.ex b/lib/livebook_web/live/home_live.ex index 57ef6f5e1..3d7c41f46 100644 --- a/lib/livebook_web/live/home_live.ex +++ b/lib/livebook_web/live/home_live.ex @@ -10,8 +10,8 @@ defmodule LivebookWeb.HomeLive do @impl true def mount(params, _session, socket) do if connected?(socket) do - Phoenix.PubSub.subscribe(Livebook.PubSub, "tracker_sessions") - Phoenix.PubSub.subscribe(Livebook.PubSub, "system_resources") + Livebook.Sessions.subscribe() + Livebook.SystemResources.subscribe() end sessions = Sessions.list_sessions() diff --git a/lib/livebook_web/live/hooks/user_hook.ex b/lib/livebook_web/live/hooks/user_hook.ex index dec7b815e..797333ed3 100644 --- a/lib/livebook_web/live/hooks/user_hook.ex +++ b/lib/livebook_web/live/hooks/user_hook.ex @@ -5,7 +5,7 @@ defmodule LivebookWeb.UserHook do def on_mount(:default, _params, %{"current_user_id" => current_user_id} = session, socket) do if connected?(socket) do - Phoenix.PubSub.subscribe(Livebook.PubSub, "users:#{current_user_id}") + Livebook.Users.subscribe(current_user_id) end socket = diff --git a/lib/livebook_web/live/session_live.ex b/lib/livebook_web/live/session_live.ex index c0b391952..c00c72c44 100644 --- a/lib/livebook_web/live/session_live.ex +++ b/lib/livebook_web/live/session_live.ex @@ -19,7 +19,7 @@ defmodule LivebookWeb.SessionLive do data = if connected?(socket) do data = Session.register_client(session_pid, self(), socket.assigns.current_user) - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session_id}") + Session.subscribe(session_id) data else diff --git a/lib/livebook_web/live/session_live/attached_live.ex b/lib/livebook_web/live/session_live/attached_live.ex index e008fe66c..489657807 100644 --- a/lib/livebook_web/live/session_live/attached_live.ex +++ b/lib/livebook_web/live/session_live/attached_live.ex @@ -10,7 +10,7 @@ defmodule LivebookWeb.SessionLive.AttachedLive do end if connected?(socket) do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) end {:ok, diff --git a/lib/livebook_web/live/session_live/elixir_standalone_live.ex b/lib/livebook_web/live/session_live/elixir_standalone_live.ex index 0f0e2dc6e..201aeaa82 100644 --- a/lib/livebook_web/live/session_live/elixir_standalone_live.ex +++ b/lib/livebook_web/live/session_live/elixir_standalone_live.ex @@ -10,7 +10,7 @@ defmodule LivebookWeb.SessionLive.ElixirStandaloneLive do end if connected?(socket) do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) end {:ok, assign(socket, session: session, current_runtime: current_runtime, error_message: nil)} diff --git a/lib/livebook_web/live/session_live/embedded_live.ex b/lib/livebook_web/live/session_live/embedded_live.ex index 75fbbcebd..5476e2252 100644 --- a/lib/livebook_web/live/session_live/embedded_live.ex +++ b/lib/livebook_web/live/session_live/embedded_live.ex @@ -10,7 +10,7 @@ defmodule LivebookWeb.SessionLive.EmbeddedLive do end if connected?(socket) do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) end {:ok, assign(socket, session: session, current_runtime: current_runtime)} diff --git a/lib/livebook_web/live/session_live/mix_standalone_live.ex b/lib/livebook_web/live/session_live/mix_standalone_live.ex index f326901a3..99fa2f39f 100644 --- a/lib/livebook_web/live/session_live/mix_standalone_live.ex +++ b/lib/livebook_web/live/session_live/mix_standalone_live.ex @@ -12,7 +12,7 @@ defmodule LivebookWeb.SessionLive.MixStandaloneLive do end if connected?(socket) do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) end {:ok, diff --git a/test/livebook/session_test.exs b/test/livebook/session_test.exs index 036c67f7a..bf3aa9818 100644 --- a/test/livebook/session_test.exs +++ b/test/livebook/session_test.exs @@ -41,7 +41,7 @@ defmodule Livebook.SessionTest do describe "set_notebook_attributes/2" do test "sends an attributes update to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() attrs = %{set_notebook_attributes: true} @@ -52,7 +52,7 @@ defmodule Livebook.SessionTest do describe "insert_section/2" do test "sends an insert opreation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() Session.insert_section(session.pid, 0) @@ -62,7 +62,7 @@ defmodule Livebook.SessionTest do describe "insert_cell/4" do test "sends an insert opreation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() Session.insert_section(session.pid, 0) @@ -75,7 +75,7 @@ defmodule Livebook.SessionTest do describe "delete_section/3" do test "sends a delete opreation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {section_id, _cell_id} = insert_section_and_cell(session.pid) @@ -87,7 +87,7 @@ defmodule Livebook.SessionTest do describe "delete_cell/2" do test "sends a delete opreation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {_section_id, cell_id} = insert_section_and_cell(session.pid) @@ -99,7 +99,7 @@ defmodule Livebook.SessionTest do describe "restore_cell/2" do test "sends a restore opreation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {_section_id, cell_id} = insert_section_and_cell(session.pid) @@ -118,7 +118,7 @@ defmodule Livebook.SessionTest do session = start_session(notebook: notebook) - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() Session.convert_smart_cell(session.pid, smart_cell.id) @@ -140,7 +140,7 @@ defmodule Livebook.SessionTest do runtime = connected_noop_runtime() Session.set_runtime(session.pid, runtime) - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) Session.add_dependencies(session.pid, [{:kino, "~> 0.5.0"}]) @@ -171,7 +171,7 @@ defmodule Livebook.SessionTest do runtime = connected_noop_runtime() Session.set_runtime(session.pid, runtime) - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) Session.add_dependencies(session.pid, [{:kino, "~> 0.5.0"}]) @@ -182,7 +182,7 @@ defmodule Livebook.SessionTest do describe "queue_cell_evaluation/2" do test "triggers evaluation and sends update operation once it finishes", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {_section_id, cell_id} = insert_section_and_cell(session.pid) @@ -199,7 +199,7 @@ defmodule Livebook.SessionTest do describe "cancel_cell_evaluation/2" do test "sends a cancel evaluation operation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {_section_id, cell_id} = insert_section_and_cell(session.pid) @@ -213,7 +213,7 @@ defmodule Livebook.SessionTest do describe "set_notebook_name/2" do test "sends a notebook name update operation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() Session.set_notebook_name(session.pid, "Cat's guide to life") @@ -223,7 +223,7 @@ defmodule Livebook.SessionTest do describe "set_section_name/3" do test "sends a section name update operation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {section_id, _cell_id} = insert_section_and_cell(session.pid) @@ -235,7 +235,7 @@ defmodule Livebook.SessionTest do describe "apply_cell_delta/4" do test "sends a cell delta operation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {_section_id, cell_id} = insert_section_and_cell(session.pid) @@ -252,7 +252,7 @@ defmodule Livebook.SessionTest do describe "report_cell_revision/3" do test "sends a revision report operation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {_section_id, cell_id} = insert_section_and_cell(session.pid) @@ -265,7 +265,7 @@ defmodule Livebook.SessionTest do describe "set_cell_attributes/3" do test "sends an attributes update operation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() {_section_id, cell_id} = insert_section_and_cell(session.pid) @@ -278,7 +278,7 @@ defmodule Livebook.SessionTest do describe "connect_runtime/2" do test "sends a runtime update operation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() runtime = connected_noop_runtime() @@ -290,7 +290,7 @@ defmodule Livebook.SessionTest do describe "disconnect_runtime/1" do test "sends a runtime update operation to subscribers", %{session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() runtime = connected_noop_runtime() @@ -310,7 +310,7 @@ defmodule Livebook.SessionTest do @tag :tmp_dir test "sends a file update operation to subscribers", %{session: session, tmp_dir: tmp_dir} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) pid = self() tmp_dir = FileSystem.File.local(tmp_dir <> "/") @@ -327,7 +327,7 @@ defmodule Livebook.SessionTest do file = FileSystem.File.resolve(tmp_dir, "notebook.livemd") start_session(file: file) - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) Session.set_file(session.pid, file) @@ -389,7 +389,7 @@ defmodule Livebook.SessionTest do # Perform a change, so the notebook is dirty Session.set_notebook_name(session.pid, "My notebook") - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) assert {:ok, false} = FileSystem.File.exists?(file) @@ -407,7 +407,7 @@ defmodule Livebook.SessionTest do # Perform a change, so the notebook is dirty Session.set_notebook_name(session.pid, "My notebook") - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) assert {:ok, false} = FileSystem.File.exists?(file) @@ -428,7 +428,7 @@ defmodule Livebook.SessionTest do # Perform a change, so the notebook is dirty Session.set_notebook_name(session.pid, "My notebook") - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) assert {:ok, false} = FileSystem.File.exists?(file) @@ -502,7 +502,7 @@ defmodule Livebook.SessionTest do test "starts a standalone runtime upon first evaluation if there was none set explicitly" do session = start_session() - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) {_section_id, cell_id} = insert_section_and_cell(session.pid) @@ -517,7 +517,7 @@ defmodule Livebook.SessionTest do session = start_session() {:ok, runtime} = Runtime.ElixirStandalone.new() |> Runtime.connect() - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) # Wait for the runtime to be set Session.set_runtime(session.pid, runtime) @@ -535,7 +535,7 @@ defmodule Livebook.SessionTest do user = Livebook.Users.User.new() Session.register_client(session.pid, self(), user) - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) updated_user = %{user | name: "Jake Peralta"} Livebook.Users.broadcast_change(updated_user) @@ -581,7 +581,7 @@ defmodule Livebook.SessionTest do cell_id = code_cell.id - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) Session.queue_cell_evaluation(session.pid, cell_id) assert_receive {:operation, @@ -605,7 +605,7 @@ defmodule Livebook.SessionTest do cell_id = code_cell.id - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) Session.queue_cell_evaluation(session.pid, cell_id) assert_receive {:operation, @@ -630,7 +630,7 @@ defmodule Livebook.SessionTest do {:runtime_smart_cell_definitions, [%{kind: "text", name: "Text", requirement: nil}]} ) - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) send( session.pid, @@ -770,7 +770,7 @@ defmodule Livebook.SessionTest do notebook_glob = Path.join(tmp_dir, "**/*.livemd") - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) Session.save(session.pid) assert_receive {:operation, {:mark_as_not_dirty, _}} diff --git a/test/livebook/sessions_test.exs b/test/livebook/sessions_test.exs index ac8777f84..66f48b941 100644 --- a/test/livebook/sessions_test.exs +++ b/test/livebook/sessions_test.exs @@ -10,7 +10,7 @@ defmodule Livebook.SessionsTest do end test "broadcasts a message to subscribers" do - Phoenix.PubSub.subscribe(Livebook.PubSub, "tracker_sessions") + Sessions.subscribe() {:ok, %{id: id}} = Sessions.create_session() assert_receive {:session_created, %{id: ^id}} end @@ -38,7 +38,7 @@ defmodule Livebook.SessionsTest do describe "update_session/1" do test "broadcasts a message to subscribers" do {:ok, %{id: id} = session} = Sessions.create_session() - Phoenix.PubSub.subscribe(Livebook.PubSub, "tracker_sessions") + Sessions.subscribe() updated_session = %{session | notebook_name: "New name"} Livebook.Sessions.update_session(updated_session) assert_receive {:session_updated, %{id: ^id, notebook_name: "New name"}} diff --git a/test/livebook/users_test.exs b/test/livebook/users_test.exs index 92807472f..5300dd2a3 100644 --- a/test/livebook/users_test.exs +++ b/test/livebook/users_test.exs @@ -7,7 +7,7 @@ defmodule Livebook.UsersTest do describe "broadcast_change/1" do test "notifies subscribers of user change" do user = User.new() - Phoenix.PubSub.subscribe(Livebook.PubSub, "users:#{user.id}") + Users.subscribe(user.id) Users.broadcast_change(user) diff --git a/test/livebook_web/live/home_live_test.exs b/test/livebook_web/live/home_live_test.exs index 666c00c85..5ce53d0bf 100644 --- a/test/livebook_web/live/home_live_test.exs +++ b/test/livebook_web/live/home_live_test.exs @@ -129,7 +129,7 @@ defmodule LivebookWeb.HomeLiveTest do end test "updates UI whenever a session is added or deleted", %{conn: conn} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "tracker_sessions") + Sessions.subscribe() {:ok, view, _} = live(conn, "/") diff --git a/test/livebook_web/live/session_live_test.exs b/test/livebook_web/live/session_live_test.exs index 5b5aba628..39f260eb0 100644 --- a/test/livebook_web/live/session_live_test.exs +++ b/test/livebook_web/live/session_live_test.exs @@ -114,7 +114,7 @@ defmodule LivebookWeb.SessionLiveTest do end test "queueing cell evaluation", %{conn: conn, session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) evaluate_setup(session.pid) section_id = insert_section(session.pid) @@ -131,7 +131,7 @@ defmodule LivebookWeb.SessionLiveTest do end test "reevaluting the setup cell", %{conn: conn, session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) evaluate_setup(session.pid) {:ok, view, _} = live(conn, "/sessions/#{session.id}") @@ -238,7 +238,7 @@ defmodule LivebookWeb.SessionLiveTest do destination: test } - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) insert_cell_with_output(session.pid, section_id, {:input, input}) @@ -267,7 +267,7 @@ defmodule LivebookWeb.SessionLiveTest do destination: test } - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) insert_cell_with_output(session.pid, section_id, {:input, input}) @@ -305,7 +305,7 @@ defmodule LivebookWeb.SessionLiveTest do reset_on_submit: [] } - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) insert_cell_with_output(session.pid, section_id, {:control, form_control}) @@ -330,7 +330,7 @@ defmodule LivebookWeb.SessionLiveTest do describe "outputs" do test "stdout output update", %{conn: conn, session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) evaluate_setup(session.pid) section_id = insert_section(session.pid) @@ -351,7 +351,7 @@ defmodule LivebookWeb.SessionLiveTest do end test "frame output update", %{conn: conn, session: session} do - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) evaluate_setup(session.pid) section_id = insert_section(session.pid) @@ -414,7 +414,7 @@ defmodule LivebookWeb.SessionLiveTest do %{conn: conn, session: session} do {:ok, view, _} = live(conn, "/sessions/#{session.id}/settings/runtime") - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) view |> element("button", "Elixir standalone") @@ -599,7 +599,7 @@ defmodule LivebookWeb.SessionLiveTest do %{conn: conn, session: session} do {:ok, view, _} = live(conn, "/sessions/#{session.id}") - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) user1 = create_user_with_name("Jake Peralta") @@ -635,7 +635,7 @@ defmodule LivebookWeb.SessionLiveTest do {:ok, view, _} = live(conn, "/sessions/#{session.id}") - Phoenix.PubSub.subscribe(Livebook.PubSub, "sessions:#{session.id}") + Session.subscribe(session.id) assert render(view) =~ "Jake Peralta"