mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-09-04 12:04:20 +08:00
Move subscribe function to Broadcasts module
This commit is contained in:
parent
4c7259688d
commit
a9b818c374
11 changed files with 65 additions and 65 deletions
|
@ -139,59 +139,6 @@ defmodule Livebook.Hubs do
|
|||
:ok
|
||||
end
|
||||
|
||||
@doc """
|
||||
Subscribes to one or more subtopics in `"hubs"`.
|
||||
|
||||
## Messages
|
||||
|
||||
Topic `hubs:crud`:
|
||||
|
||||
* `{:hub_changed, hub_id}`
|
||||
|
||||
Topic `hubs:connection`:
|
||||
|
||||
* `{:hub_connected, hub_id}`
|
||||
* `{:hub_connection_failed, hub_id, reason}`
|
||||
* `{:hub_server_error, hub_id, reason}`
|
||||
|
||||
Topic `hubs:secrets`:
|
||||
|
||||
* `{:secret_created, %Secret{}}`
|
||||
* `{:secret_updated, %Secret{}}`
|
||||
* `{:secret_deleted, %Secret{}}`
|
||||
|
||||
Topic `hubs:file_systems`:
|
||||
|
||||
* `{:file_system_created, FileSystem.t()}`
|
||||
* `{:file_system_updated, FileSystem.t()}`
|
||||
* `{:file_system_deleted, FileSystem.t()}`
|
||||
|
||||
"""
|
||||
@spec subscribe(atom() | list(atom())) :: :ok | {:error, term()}
|
||||
def subscribe(topics) when is_list(topics) do
|
||||
for topic <- topics, do: subscribe(topic)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
def subscribe(topic) do
|
||||
Phoenix.PubSub.subscribe(Livebook.PubSub, "hubs:#{topic}")
|
||||
end
|
||||
|
||||
@doc """
|
||||
Unsubscribes from `subscribe/0`.
|
||||
"""
|
||||
@spec unsubscribe(atom() | list(atom())) :: :ok
|
||||
def unsubscribe(topics) when is_list(topics) do
|
||||
for topic <- topics, do: unsubscribe(topic)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
def unsubscribe(topic) do
|
||||
Phoenix.PubSub.unsubscribe(Livebook.PubSub, "hubs:#{topic}")
|
||||
end
|
||||
|
||||
defp to_struct(%{id: "personal-" <> _} = fields) do
|
||||
Provider.load(%Personal{}, fields)
|
||||
end
|
||||
|
|
|
@ -9,6 +9,59 @@ defmodule Livebook.Hubs.Broadcasts do
|
|||
@secrets_topic "hubs:secrets"
|
||||
@file_systems_topic "hubs:file_systems"
|
||||
|
||||
@doc """
|
||||
Subscribes to one or more subtopics in `"hubs"`.
|
||||
|
||||
## Messages
|
||||
|
||||
Topic `hubs:crud`:
|
||||
|
||||
* `{:hub_changed, hub_id}`
|
||||
|
||||
Topic `hubs:connection`:
|
||||
|
||||
* `{:hub_connected, hub_id}`
|
||||
* `{:hub_connection_failed, hub_id, reason}`
|
||||
* `{:hub_server_error, hub_id, reason}`
|
||||
|
||||
Topic `hubs:secrets`:
|
||||
|
||||
* `{:secret_created, %Secret{}}`
|
||||
* `{:secret_updated, %Secret{}}`
|
||||
* `{:secret_deleted, %Secret{}}`
|
||||
|
||||
Topic `hubs:file_systems`:
|
||||
|
||||
* `{:file_system_created, FileSystem.t()}`
|
||||
* `{:file_system_updated, FileSystem.t()}`
|
||||
* `{:file_system_deleted, FileSystem.t()}`
|
||||
|
||||
"""
|
||||
@spec subscribe(atom() | list(atom())) :: :ok | {:error, term()}
|
||||
def subscribe(topics) when is_list(topics) do
|
||||
for topic <- topics, do: subscribe(topic)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
def subscribe(topic) do
|
||||
Phoenix.PubSub.subscribe(Livebook.PubSub, "hubs:#{topic}")
|
||||
end
|
||||
|
||||
@doc """
|
||||
Unsubscribes from `subscribe/0`.
|
||||
"""
|
||||
@spec unsubscribe(atom() | list(atom())) :: :ok
|
||||
def unsubscribe(topics) when is_list(topics) do
|
||||
for topic <- topics, do: unsubscribe(topic)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
def unsubscribe(topic) do
|
||||
Phoenix.PubSub.unsubscribe(Livebook.PubSub, "hubs:#{topic}")
|
||||
end
|
||||
|
||||
@doc """
|
||||
Broadcasts under `#{@crud_topic}` topic when hubs changed.
|
||||
"""
|
||||
|
|
|
@ -113,7 +113,7 @@ defmodule Livebook.NotebookManager do
|
|||
|
||||
@impl true
|
||||
def init(_opts) do
|
||||
Livebook.Hubs.subscribe([:file_systems])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:file_systems])
|
||||
|
||||
{:ok, nil, {:continue, :load_state}}
|
||||
end
|
||||
|
|
|
@ -807,7 +807,7 @@ defmodule Livebook.Session do
|
|||
@impl true
|
||||
def init(opts) do
|
||||
Livebook.Settings.subscribe()
|
||||
Livebook.Hubs.subscribe([:secrets])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:secrets])
|
||||
id = Keyword.fetch!(opts, :id)
|
||||
|
||||
{:ok, worker_pid} = Livebook.Session.Worker.start_link(id)
|
||||
|
|
|
@ -7,7 +7,7 @@ defmodule LivebookWeb.SidebarHook do
|
|||
|
||||
def on_mount(:default, _params, _session, socket) do
|
||||
if connected?(socket) do
|
||||
Livebook.Hubs.subscribe([:crud, :connection])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:crud, :connection])
|
||||
Phoenix.PubSub.subscribe(Livebook.PubSub, "sidebar")
|
||||
end
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ defmodule LivebookWeb.Hub.EditLive do
|
|||
@impl true
|
||||
def mount(_params, _session, socket) do
|
||||
if connected?(socket) do
|
||||
Hubs.subscribe([:connection])
|
||||
Hubs.Broadcasts.subscribe([:connection])
|
||||
end
|
||||
|
||||
{:ok, assign(socket, hub: nil, type: nil, page_title: "Hub - Livebook", params: %{})}
|
||||
|
|
|
@ -4,7 +4,7 @@ defmodule Livebook.Integration.AppsTest do
|
|||
describe "deploy_apps_in_dir/1" do
|
||||
@tag :tmp_dir
|
||||
test "deploys apps with hub secrets", %{user: user, node: node, tmp_dir: tmp_dir} do
|
||||
Livebook.Hubs.subscribe([:secrets])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:secrets])
|
||||
|
||||
hub = create_team_hub(user, node)
|
||||
hub_id = hub.id
|
||||
|
|
|
@ -6,7 +6,7 @@ defmodule Livebook.Hubs.TeamClientTest do
|
|||
@moduletag :capture_log
|
||||
|
||||
setup do
|
||||
Livebook.Hubs.subscribe([:connection, :file_systems, :secrets])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:connection, :file_systems, :secrets])
|
||||
:ok
|
||||
end
|
||||
|
||||
|
|
|
@ -7,7 +7,7 @@ defmodule LivebookWeb.Integration.Hub.EditLiveTest do
|
|||
alias Livebook.Hubs
|
||||
|
||||
setup %{user: user, node: node} do
|
||||
Livebook.Hubs.subscribe([:crud, :connection, :secrets, :file_systems])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:crud, :connection, :secrets, :file_systems])
|
||||
hub = create_team_hub(user, node)
|
||||
id = hub.id
|
||||
|
||||
|
@ -322,7 +322,7 @@ defmodule LivebookWeb.Integration.Hub.EditLiveTest do
|
|||
use Livebook.TeamsIntegrationCase, async: false
|
||||
|
||||
setup %{user: user, node: node} do
|
||||
Livebook.Hubs.subscribe([:crud, :connection, :secrets, :file_systems])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:crud, :connection, :secrets, :file_systems])
|
||||
hub = create_team_hub(user, node)
|
||||
id = hub.id
|
||||
|
||||
|
|
|
@ -90,7 +90,7 @@ defmodule LivebookWeb.Integration.SessionLiveTest do
|
|||
|
||||
test "redirects the user to update or delete a secret",
|
||||
%{conn: conn, user: user, node: node, session: session} do
|
||||
Livebook.Hubs.subscribe([:secrets, :connection])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:secrets, :connection])
|
||||
team = create_team_hub(user, node)
|
||||
id = team.id
|
||||
assert_receive {:hub_connected, ^id}
|
||||
|
@ -278,7 +278,7 @@ defmodule LivebookWeb.Integration.SessionLiveTest do
|
|||
test "shows only hub's file systems",
|
||||
%{conn: conn, user: user, node: node, session: session} do
|
||||
Session.subscribe(session.id)
|
||||
Livebook.Hubs.subscribe([:file_systems])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:file_systems])
|
||||
|
||||
personal_id = Livebook.Hubs.Personal.id()
|
||||
personal_file_system = build(:fs_s3)
|
||||
|
@ -326,7 +326,7 @@ defmodule LivebookWeb.Integration.SessionLiveTest do
|
|||
|
||||
test "shows file system from offline hub", %{conn: conn, session: session} do
|
||||
Session.subscribe(session.id)
|
||||
Livebook.Hubs.subscribe([:file_systems])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:file_systems])
|
||||
|
||||
hub = offline_hub()
|
||||
hub_id = hub.id
|
||||
|
|
|
@ -9,7 +9,7 @@ defmodule LivebookWeb.Hub.EditLiveTest do
|
|||
|
||||
describe "personal" do
|
||||
setup do
|
||||
Livebook.Hubs.subscribe([:crud, :secrets, :file_systems])
|
||||
Livebook.Hubs.Broadcasts.subscribe([:crud, :secrets, :file_systems])
|
||||
{:ok, hub: Hubs.fetch_hub!(Hubs.Personal.id())}
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue