mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-10-09 21:16:26 +08:00
Improve hubs provider API (#1670)
This commit is contained in:
parent
d47535ac88
commit
887d423007
11 changed files with 174 additions and 87 deletions
|
@ -3,6 +3,7 @@ defmodule Livebook.Hubs do
|
|||
|
||||
alias Livebook.Storage
|
||||
alias Livebook.Hubs.{Broadcasts, Enterprise, Fly, Local, Metadata, Provider}
|
||||
alias Livebook.Secrets.Secret
|
||||
|
||||
@namespace :hubs
|
||||
|
||||
|
@ -22,6 +23,16 @@ defmodule Livebook.Hubs do
|
|||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets a list of hubs from storage with given capabilities.
|
||||
"""
|
||||
@spec get_hubs(Provider.capabilities()) :: list(Provider.t())
|
||||
def get_hubs(capabilities) do
|
||||
for hub <- get_hubs(),
|
||||
capability?(hub, capabilities),
|
||||
do: hub
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets a list of metadatas from storage.
|
||||
"""
|
||||
|
@ -79,10 +90,7 @@ defmodule Livebook.Hubs do
|
|||
@doc false
|
||||
def delete_hub(id) do
|
||||
with {:ok, hub} <- get_hub(id) do
|
||||
if connected_hub = get_connected_hub(hub) do
|
||||
GenServer.stop(connected_hub.pid, :shutdown)
|
||||
end
|
||||
|
||||
:ok = Provider.disconnect(hub)
|
||||
:ok = Storage.delete(@namespace, id)
|
||||
:ok = Broadcasts.hubs_metadata_changed()
|
||||
end
|
||||
|
@ -167,7 +175,9 @@ defmodule Livebook.Hubs do
|
|||
"""
|
||||
@spec connect_hubs() :: :ok
|
||||
def connect_hubs do
|
||||
for hub <- get_hubs(), do: connect_hub(hub)
|
||||
for hub <- get_hubs(),
|
||||
capability?(hub, [:connect]),
|
||||
do: connect_hub(hub)
|
||||
|
||||
:ok
|
||||
end
|
||||
|
@ -181,23 +191,36 @@ defmodule Livebook.Hubs do
|
|||
end
|
||||
|
||||
@doc """
|
||||
Gets a list of connected hubs.
|
||||
|
||||
## Example
|
||||
|
||||
iex> get_connected_hubs()
|
||||
[%{pid: #PID<0.178.0>, hub: %Enterprise{}}, ...]
|
||||
Gets a list of hub secrets.
|
||||
|
||||
It gets from all hubs with secret management.
|
||||
"""
|
||||
@spec get_connected_hubs() :: connected_hubs()
|
||||
def get_connected_hubs do
|
||||
for hub <- get_hubs(), connected = get_connected_hub(hub), do: connected
|
||||
@spec get_secrets() :: list(Secret.t())
|
||||
def get_secrets do
|
||||
for hub <- get_hubs([:secrets]),
|
||||
secret <- Provider.get_secrets(hub),
|
||||
do: secret
|
||||
end
|
||||
|
||||
defp get_connected_hub(hub) do
|
||||
case Registry.lookup(Livebook.HubsRegistry, hub.id) do
|
||||
[{pid, _}] -> %{pid: pid, hub: hub}
|
||||
[] -> nil
|
||||
@doc """
|
||||
Creates a secret for given hub.
|
||||
"""
|
||||
@spec create_secret(Secret.t()) :: :ok | {:error, list({String.t(), list(String.t())})}
|
||||
def create_secret(%Secret{origin: {:hub, id}} = secret) do
|
||||
case get_hub(id) do
|
||||
{:ok, hub} ->
|
||||
if capability?(hub, [:secrets]) do
|
||||
Provider.create_secret(hub, secret)
|
||||
else
|
||||
{:error, %{errors: [{"hub_id", {"is invalid", []}}]}}
|
||||
end
|
||||
|
||||
:error ->
|
||||
{:error, %{errors: [{"hub_id", {"doest not exists", []}}]}}
|
||||
end
|
||||
end
|
||||
|
||||
defp capability?(hub, capabilities) do
|
||||
capabilities -- Provider.capabilities(hub) == []
|
||||
end
|
||||
end
|
||||
|
|
|
@ -105,7 +105,9 @@ defmodule Livebook.Hubs.Enterprise do
|
|||
end
|
||||
|
||||
defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Enterprise do
|
||||
def load(%Livebook.Hubs.Enterprise{} = enterprise, fields) do
|
||||
alias Livebook.Hubs.EnterpriseClient
|
||||
|
||||
def load(enterprise, fields) do
|
||||
%{
|
||||
enterprise
|
||||
| id: fields.id,
|
||||
|
@ -117,7 +119,7 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Enterprise do
|
|||
}
|
||||
end
|
||||
|
||||
def normalize(%Livebook.Hubs.Enterprise{} = enterprise) do
|
||||
def normalize(enterprise) do
|
||||
%Livebook.Hubs.Metadata{
|
||||
id: enterprise.id,
|
||||
name: enterprise.hub_name,
|
||||
|
@ -128,10 +130,35 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Enterprise do
|
|||
|
||||
def type(_enterprise), do: "enterprise"
|
||||
|
||||
def connect(%Livebook.Hubs.Enterprise{} = enterprise),
|
||||
do: {Livebook.Hubs.EnterpriseClient, enterprise}
|
||||
def connect(enterprise), do: {EnterpriseClient, enterprise}
|
||||
|
||||
def connected?(%Livebook.Hubs.Enterprise{id: id}) do
|
||||
Livebook.Hubs.EnterpriseClient.connected?(id)
|
||||
def connected?(enterprise) do
|
||||
EnterpriseClient.connected?(enterprise.id)
|
||||
end
|
||||
|
||||
def disconnect(enterprise) do
|
||||
EnterpriseClient.stop(enterprise.id)
|
||||
end
|
||||
|
||||
def capabilities(_enterprise), do: [:connect, :secrets]
|
||||
|
||||
def get_secrets(enterprise) do
|
||||
EnterpriseClient.get_secrets(enterprise.id)
|
||||
end
|
||||
|
||||
def create_secret(enterprise, %Livebook.Secrets.Secret{name: name, value: value}) do
|
||||
create_secret_request = LivebookProto.CreateSecretRequest.new!(name: name, value: value)
|
||||
|
||||
case EnterpriseClient.send_request(enterprise.id, create_secret_request) do
|
||||
{:create_secret, _} ->
|
||||
:ok
|
||||
|
||||
{:changeset_error, errors} ->
|
||||
errors =
|
||||
for {field, values} <- errors,
|
||||
do: {to_string(field), values}
|
||||
|
||||
{:error, %{errors: errors}}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -8,9 +8,12 @@ defmodule Livebook.Hubs.EnterpriseClient do
|
|||
alias Livebook.WebSocket.ClientConnection
|
||||
|
||||
@registry Livebook.HubsRegistry
|
||||
@supervisor Livebook.HubsSupervisor
|
||||
|
||||
defstruct [:server, :hub, connected?: false, secrets: []]
|
||||
|
||||
@type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}}
|
||||
|
||||
@doc """
|
||||
Connects the Enterprise client with WebSocket server.
|
||||
"""
|
||||
|
@ -22,16 +25,23 @@ defmodule Livebook.Hubs.EnterpriseClient do
|
|||
@doc """
|
||||
Stops the WebSocket server.
|
||||
"""
|
||||
@spec stop(pid()) :: :ok
|
||||
def stop(pid) do
|
||||
pid |> GenServer.call(:get_server) |> GenServer.stop()
|
||||
GenServer.stop(pid)
|
||||
@spec stop(String.t()) :: :ok
|
||||
def stop(id) do
|
||||
if pid = GenServer.whereis(registry_name(id)) do
|
||||
DynamicSupervisor.terminate_child(@supervisor, pid)
|
||||
end
|
||||
|
||||
:ok
|
||||
end
|
||||
|
||||
@doc """
|
||||
Sends a request to the WebSocket server.
|
||||
"""
|
||||
@spec send_request(pid(), WebSocket.proto()) :: {atom(), term()}
|
||||
@spec send_request(String.t() | registry_name() | pid(), WebSocket.proto()) :: {atom(), term()}
|
||||
def send_request(id, %_struct{} = data) when is_binary(id) do
|
||||
send_request(registry_name(id), data)
|
||||
end
|
||||
|
||||
def send_request(pid, %_struct{} = data) do
|
||||
ClientConnection.send_request(GenServer.call(pid, :get_server), data)
|
||||
end
|
||||
|
@ -39,9 +49,9 @@ defmodule Livebook.Hubs.EnterpriseClient do
|
|||
@doc """
|
||||
Returns a list of cached secrets.
|
||||
"""
|
||||
@spec list_cached_secrets(pid()) :: list(Secret.t())
|
||||
def list_cached_secrets(pid) do
|
||||
GenServer.call(pid, :list_cached_secrets)
|
||||
@spec get_secrets(String.t()) :: list(Secret.t())
|
||||
def get_secrets(id) do
|
||||
GenServer.call(registry_name(id), :get_secrets)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -51,8 +61,7 @@ defmodule Livebook.Hubs.EnterpriseClient do
|
|||
def connected?(id) do
|
||||
GenServer.call(registry_name(id), :connected?)
|
||||
catch
|
||||
:exit, _ ->
|
||||
false
|
||||
:exit, _ -> false
|
||||
end
|
||||
|
||||
## GenServer callbacks
|
||||
|
@ -70,7 +79,7 @@ defmodule Livebook.Hubs.EnterpriseClient do
|
|||
{:reply, state.server, state}
|
||||
end
|
||||
|
||||
def handle_call(:list_cached_secrets, _caller, state) do
|
||||
def handle_call(:get_secrets, _caller, state) do
|
||||
{:reply, state.secrets, state}
|
||||
end
|
||||
|
||||
|
|
|
@ -110,7 +110,7 @@ defmodule Livebook.Hubs.Fly do
|
|||
end
|
||||
|
||||
defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Fly do
|
||||
def load(%Livebook.Hubs.Fly{} = fly, fields) do
|
||||
def load(fly, fields) do
|
||||
%{
|
||||
fly
|
||||
| id: fields.id,
|
||||
|
@ -124,7 +124,7 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Fly do
|
|||
}
|
||||
end
|
||||
|
||||
def normalize(%Livebook.Hubs.Fly{} = fly) do
|
||||
def normalize(fly) do
|
||||
%Livebook.Hubs.Metadata{
|
||||
id: fly.id,
|
||||
name: fly.hub_name,
|
||||
|
@ -138,4 +138,12 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Fly do
|
|||
def connect(_fly), do: nil
|
||||
|
||||
def connected?(_fly), do: false
|
||||
|
||||
def disconnect(_fly), do: :ok
|
||||
|
||||
def capabilities(_fly), do: []
|
||||
|
||||
def get_secrets(_fly), do: []
|
||||
|
||||
def create_secret(_fly, _secret), do: :ok
|
||||
end
|
||||
|
|
|
@ -5,11 +5,11 @@ defmodule Livebook.Hubs.Local do
|
|||
end
|
||||
|
||||
defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Local do
|
||||
def load(%Livebook.Hubs.Local{} = local, fields) do
|
||||
def load(local, fields) do
|
||||
%{local | id: fields.id, hub_name: fields.hub_name, hub_emoji: fields.hub_emoji}
|
||||
end
|
||||
|
||||
def normalize(%Livebook.Hubs.Local{} = local) do
|
||||
def normalize(local) do
|
||||
%Livebook.Hubs.Metadata{
|
||||
id: local.id,
|
||||
name: local.hub_name,
|
||||
|
@ -23,4 +23,12 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Local do
|
|||
def connect(_local), do: nil
|
||||
|
||||
def connected?(_local), do: false
|
||||
|
||||
def disconnect(_local), do: :ok
|
||||
|
||||
def capabilities(_local), do: []
|
||||
|
||||
def get_secrets(_local), do: []
|
||||
|
||||
def create_secret(_local, _secret), do: :ok
|
||||
end
|
||||
|
|
|
@ -1,33 +1,63 @@
|
|||
defprotocol Livebook.Hubs.Provider do
|
||||
@moduledoc false
|
||||
|
||||
alias Livebook.Secrets.Secret
|
||||
|
||||
@type capability :: :connect | :secrets
|
||||
@type capabilities :: list(capability())
|
||||
@type changeset_errors :: %{required(:errors) => list({String.t(), {Stirng.t(), list()}})}
|
||||
|
||||
@doc """
|
||||
Normalize given struct to `Livebook.Hubs.Metadata` struct.
|
||||
Normalize given hub to `Livebook.Hubs.Metadata` struct.
|
||||
"""
|
||||
@spec normalize(struct()) :: Livebook.Hubs.Metadata.t()
|
||||
def normalize(struct)
|
||||
|
||||
@doc """
|
||||
Loads fields into given struct.
|
||||
Loads fields into given hub.
|
||||
"""
|
||||
@spec load(struct(), map() | keyword()) :: struct()
|
||||
def load(struct, fields)
|
||||
|
||||
@doc """
|
||||
Gets the type from struct.
|
||||
Gets the type from hub.
|
||||
"""
|
||||
@spec type(struct()) :: String.t()
|
||||
def type(struct)
|
||||
|
||||
@doc """
|
||||
Gets the child spec of the given struct.
|
||||
Gets the child spec of the given hub.
|
||||
"""
|
||||
@spec connect(struct()) :: Supervisor.child_spec() | module() | {module(), any()} | nil
|
||||
def connect(struct)
|
||||
|
||||
@doc """
|
||||
Gets the connection status of the given struct.
|
||||
Gets the connection status of the given hub.
|
||||
"""
|
||||
@spec connected?(struct()) :: boolean()
|
||||
def connected?(struct)
|
||||
|
||||
@doc """
|
||||
Disconnects the given hub.
|
||||
"""
|
||||
@spec disconnect(struct()) :: :ok
|
||||
def disconnect(struct)
|
||||
|
||||
@doc """
|
||||
Gets the capabilities of the given hub.
|
||||
"""
|
||||
@spec capabilities(struct()) :: capabilities()
|
||||
def capabilities(struct)
|
||||
|
||||
@doc """
|
||||
Gets the secrets of the given hub.
|
||||
"""
|
||||
@spec get_secrets(struct()) :: list(Secret.t())
|
||||
def get_secrets(struct)
|
||||
|
||||
@doc """
|
||||
Creates a secret of the given hub.
|
||||
"""
|
||||
@spec create_secret(struct(), Secret.t()) :: :ok | {:error, changeset_errors()}
|
||||
def create_secret(struct, secret)
|
||||
end
|
||||
|
|
|
@ -31,6 +31,16 @@ defmodule Livebook.Secrets do
|
|||
to_struct(fields)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets a secret from storage.
|
||||
"""
|
||||
@spec get_secret(String.t()) :: {:ok, Secret.t()} | :error
|
||||
def get_secret(id) do
|
||||
with {:ok, fields} <- Storage.fetch(:secrets, id) do
|
||||
{:ok, to_struct(fields)}
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Checks if the secret already exists.
|
||||
"""
|
||||
|
@ -66,8 +76,7 @@ defmodule Livebook.Secrets do
|
|||
"""
|
||||
@spec unset_secret(String.t()) :: :ok
|
||||
def unset_secret(id) do
|
||||
if secret_exists?(id) do
|
||||
secret = fetch_secret!(id)
|
||||
with {:ok, secret} <- get_secret(id) do
|
||||
Storage.delete(:secrets, id)
|
||||
broadcast_secrets_change({:unset_secret, secret})
|
||||
end
|
||||
|
|
|
@ -122,7 +122,7 @@ defmodule LivebookWeb.Hub.New.EnterpriseComponent do
|
|||
|
||||
receive do
|
||||
{:hub_connection_failed, reason} ->
|
||||
EnterpriseClient.stop(pid)
|
||||
EnterpriseClient.stop(base.id)
|
||||
|
||||
{:noreply,
|
||||
socket
|
||||
|
@ -141,7 +141,7 @@ defmodule LivebookWeb.Hub.New.EnterpriseComponent do
|
|||
{:noreply, assign(socket, pid: pid, changeset: changeset, base: base)}
|
||||
|
||||
{:error, reason} ->
|
||||
EnterpriseClient.stop(pid)
|
||||
EnterpriseClient.stop(base.id)
|
||||
|
||||
{:noreply,
|
||||
socket
|
||||
|
|
|
@ -9,7 +9,6 @@ defmodule LivebookWeb.SessionLive do
|
|||
alias Livebook.Notebook.{Cell, ContentLoader}
|
||||
alias Livebook.JSInterop
|
||||
alias Livebook.Hubs
|
||||
alias Livebook.Hubs.EnterpriseClient
|
||||
|
||||
on_mount LivebookWeb.SidebarHook
|
||||
|
||||
|
@ -2277,11 +2276,6 @@ defmodule LivebookWeb.SessionLive do
|
|||
end
|
||||
|
||||
defp get_saved_secrets do
|
||||
hub_secrets =
|
||||
for connected_hub <- Hubs.get_connected_hubs(),
|
||||
secret <- EnterpriseClient.list_cached_secrets(connected_hub.pid),
|
||||
do: secret
|
||||
|
||||
Enum.sort(hub_secrets ++ Secrets.get_secrets())
|
||||
Enum.sort(Hubs.get_secrets() ++ Secrets.get_secrets())
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
defmodule LivebookWeb.SessionLive.SecretsComponent do
|
||||
use LivebookWeb, :live_component
|
||||
|
||||
alias Livebook.Hubs.EnterpriseClient
|
||||
alias Livebook.Secrets.Secret
|
||||
|
||||
@impl true
|
||||
|
@ -9,7 +8,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
socket =
|
||||
socket
|
||||
|> assign(assigns)
|
||||
|> assign(connected_hubs: Livebook.Hubs.get_connected_hubs())
|
||||
|> assign(hubs: Livebook.Hubs.get_hubs([:secrets]))
|
||||
|
||||
prefill_form = prefill_secret_name(socket)
|
||||
|
||||
|
@ -127,17 +126,12 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
<%= if Livebook.Config.feature_flag_enabled?(:hub) do %>
|
||||
<%= label class: "flex items-center gap-2 text-gray-600" do %>
|
||||
<%= radio_button(f, :store, "hub",
|
||||
disabled: @connected_hubs == [],
|
||||
disabled: @hubs == [],
|
||||
checked: @data["store"] == "hub"
|
||||
) %> in the Hub
|
||||
<% end %>
|
||||
<%= if @data["store"] == "hub" do %>
|
||||
<%= select(
|
||||
f,
|
||||
:connected_hub,
|
||||
connected_hubs_options(@connected_hubs, @data["connected_hub"]),
|
||||
class: "input"
|
||||
) %>
|
||||
<%= select(f, :hub_id, hubs_options(@hubs, @data["hub_id"]), class: "input") %>
|
||||
<% end %>
|
||||
<% end %>
|
||||
</div>
|
||||
|
@ -323,7 +317,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
|
||||
defp build_origin(%{"store" => "session"}), do: :session
|
||||
defp build_origin(%{"store" => "app"}), do: :app
|
||||
defp build_origin(%{"store" => "hub", "connected_hub" => id}), do: {:hub, id}
|
||||
defp build_origin(%{"store" => "hub", "hub_id" => id}), do: {:hub, id}
|
||||
|
||||
defp build_attrs(%{"name" => name, "value" => value} = attrs) do
|
||||
%{name: name, value: value, origin: build_origin(attrs)}
|
||||
|
@ -338,21 +332,8 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
Livebook.Session.set_secret(socket.assigns.session.pid, secret)
|
||||
end
|
||||
|
||||
defp set_secret(socket, %Secret{origin: {:hub, id}} = secret) when is_binary(id) do
|
||||
if hub = Enum.find(socket.assigns.connected_hubs, &(&1.hub.id == id)) do
|
||||
create_secret_request =
|
||||
LivebookProto.CreateSecretRequest.new!(
|
||||
name: secret.name,
|
||||
value: secret.value
|
||||
)
|
||||
|
||||
case EnterpriseClient.send_request(hub.pid, create_secret_request) do
|
||||
{:create_secret, _} -> :ok
|
||||
{:error, reason} -> {:error, put_flash(socket, :error, reason)}
|
||||
end
|
||||
else
|
||||
{:error, %{errors: [{"connected_hub", {"can't be blank", []}}]}}
|
||||
end
|
||||
defp set_secret(_socket, %Secret{origin: {:hub, id}} = secret) when is_binary(id) do
|
||||
Livebook.Hubs.create_secret(secret)
|
||||
end
|
||||
|
||||
defp grant_access(secrets, secret_name, origin, socket) do
|
||||
|
@ -385,11 +366,10 @@ defmodule LivebookWeb.SessionLive.SecretsComponent do
|
|||
Enum.any?(socket.assigns.saved_secrets, &(&1.name == secret_name))
|
||||
end
|
||||
|
||||
# TODO: Livebook.Hubs.fetch_hubs_with_secrets_storage()
|
||||
defp connected_hubs_options(connected_hubs, selected_hub) do
|
||||
defp hubs_options(hubs, hub_id) do
|
||||
[[key: "Select one Hub", value: "", selected: true, disabled: true]] ++
|
||||
for %{hub: %{id: id, hub_name: name}} <- connected_hubs do
|
||||
[key: name, value: id, selected: id == selected_hub]
|
||||
for hub <- hubs do
|
||||
[key: hub.hub_name, value: hub.id, selected: hub.id == hub_id]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -8,8 +8,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do
|
|||
|
||||
describe "enterprise" do
|
||||
setup %{url: url, token: token} do
|
||||
node = EnterpriseServer.get_node()
|
||||
id = :erpc.call(node, Enterprise.Integration, :fetch_env!, ["ENTERPRISE_ID"])
|
||||
id = Livebook.Utils.random_short_id()
|
||||
hub_id = "enterprise-#{id}"
|
||||
|
||||
Livebook.Hubs.subscribe([:connection, :secrets])
|
||||
|
@ -65,7 +64,7 @@ defmodule LivebookWeb.SessionLive.SecretsComponentTest do
|
|||
name: secret.name,
|
||||
value: secret.value,
|
||||
store: "hub",
|
||||
connected_hub: enterprise.id
|
||||
hub_id: enterprise.id
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue