mirror of
https://github.com/livebook-dev/livebook.git
synced 2024-09-20 10:05:57 +08:00
Implement File Systems management directly from Hub and handle their events (#2201)
This commit is contained in:
parent
4564af3fb3
commit
7f1feec7d9
|
@ -33,16 +33,22 @@ defmodule Livebook.FileSystem.S3 do
|
|||
|
||||
* `:external_id` - the external id from Teams.
|
||||
|
||||
* `:prefix` - the id prefix.
|
||||
|
||||
"""
|
||||
@spec new(String.t(), String.t(), String.t(), keyword()) :: t()
|
||||
def new(bucket_url, access_key_id, secret_access_key, opts \\ []) do
|
||||
opts = Keyword.validate!(opts, [:region, :external_id])
|
||||
opts = Keyword.validate!(opts, [:region, :external_id, :prefix])
|
||||
|
||||
bucket_url = String.trim_trailing(bucket_url, "/")
|
||||
region = opts[:region] || region_from_uri(bucket_url)
|
||||
|
||||
hash = :crypto.hash(:sha256, bucket_url) |> Base.url_encode64(padding: false)
|
||||
id = "s3-#{hash}"
|
||||
|
||||
id =
|
||||
if prefix = opts[:prefix],
|
||||
do: "#{prefix}-s3-#{hash}",
|
||||
else: "s3-#{hash}"
|
||||
|
||||
%__MODULE__{
|
||||
id: id,
|
||||
|
@ -371,14 +377,16 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
|
|||
external_id: fields["external_id"],
|
||||
region: fields["region"],
|
||||
access_key_id: fields["access_key_id"],
|
||||
secret_access_key: fields["secret_access_key"]
|
||||
secret_access_key: fields["secret_access_key"],
|
||||
prefix: fields["prefix"]
|
||||
})
|
||||
end
|
||||
|
||||
def load(_file_system, fields) do
|
||||
S3.new(fields.bucket_url, fields.access_key_id, fields.secret_access_key,
|
||||
region: fields[:region],
|
||||
external_id: fields[:external_id]
|
||||
external_id: fields[:external_id],
|
||||
prefix: fields[:prefix]
|
||||
)
|
||||
end
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
defmodule Livebook.Hubs do
|
||||
@moduledoc false
|
||||
|
||||
alias Livebook.FileSystem
|
||||
alias Livebook.Storage
|
||||
alias Livebook.Hubs.{Broadcasts, Metadata, Personal, Provider, Team}
|
||||
alias Livebook.Secrets.Secret
|
||||
|
@ -294,4 +295,45 @@ defmodule Livebook.Hubs do
|
|||
def verify_notebook_stamp(hub, notebook_source, stamp) do
|
||||
Provider.verify_notebook_stamp(hub, notebook_source, stamp)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets a list of file systems for given hub.
|
||||
"""
|
||||
@spec get_file_systems(Provider.t()) :: list(FileSystem.t())
|
||||
def get_file_systems(hub) do
|
||||
hub_file_systems = Provider.get_file_systems(hub)
|
||||
local_file_system = Livebook.Config.local_file_system()
|
||||
|
||||
[local_file_system | Enum.sort_by(hub_file_systems, & &1.id)]
|
||||
end
|
||||
|
||||
@doc """
|
||||
Creates a file system for given hub.
|
||||
"""
|
||||
@spec create_file_system(Provider.t(), FileSystem.t()) ::
|
||||
:ok
|
||||
| {:error, Ecto.Changeset.t()}
|
||||
| {:transport_error, String.t()}
|
||||
def create_file_system(hub, file_system) do
|
||||
Provider.create_file_system(hub, file_system)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Updates a file system for given hub.
|
||||
"""
|
||||
@spec update_file_system(Provider.t(), FileSystem.t()) ::
|
||||
:ok
|
||||
| {:error, Ecto.Changeset.t()}
|
||||
| {:transport_error, String.t()}
|
||||
def update_file_system(hub, file_system) do
|
||||
Provider.update_file_system(hub, file_system)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Deletes a file system for given hub.
|
||||
"""
|
||||
@spec delete_file_system(Provider.t(), FileSystem.t()) :: :ok | {:transport_error, String.t()}
|
||||
def delete_file_system(hub, file_system) do
|
||||
Provider.delete_file_system(hub, file_system)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
defmodule Livebook.Hubs.Broadcasts do
|
||||
@moduledoc false
|
||||
|
||||
alias Livebook.FileSystem
|
||||
alias Livebook.Secrets.Secret
|
||||
|
||||
@type broadcast :: :ok | {:error, term()}
|
||||
|
@ -8,6 +9,7 @@ defmodule Livebook.Hubs.Broadcasts do
|
|||
@crud_topic "hubs:crud"
|
||||
@connection_topic "hubs:connection"
|
||||
@secrets_topic "hubs:secrets"
|
||||
@file_systems_topic "hubs:file_systems"
|
||||
|
||||
@doc """
|
||||
Broadcasts under `#{@crud_topic}` topic when hubs changed.
|
||||
|
@ -65,6 +67,32 @@ defmodule Livebook.Hubs.Broadcasts do
|
|||
broadcast(@secrets_topic, {:secret_deleted, secret})
|
||||
end
|
||||
|
||||
@allowed_file_systems [FileSystem.S3]
|
||||
|
||||
@doc """
|
||||
Broadcasts under `#{@file_systems_topic}` topic when hub received a new file system.
|
||||
"""
|
||||
@spec file_system_created(FileSystem.t()) :: broadcast()
|
||||
def file_system_created(%struct{} = file_system) when struct in @allowed_file_systems do
|
||||
broadcast(@file_systems_topic, {:file_system_created, file_system})
|
||||
end
|
||||
|
||||
@doc """
|
||||
Broadcasts under `#{@file_systems_topic}` topic when hub received an updated file system.
|
||||
"""
|
||||
@spec file_system_updated(FileSystem.t()) :: broadcast()
|
||||
def file_system_updated(%struct{} = file_system) when struct in @allowed_file_systems do
|
||||
broadcast(@file_systems_topic, {:file_system_updated, file_system})
|
||||
end
|
||||
|
||||
@doc """
|
||||
Broadcasts under `#{@file_systems_topic}` topic when hub received a deleted file system.
|
||||
"""
|
||||
@spec file_system_deleted(FileSystem.t()) :: broadcast()
|
||||
def file_system_deleted(%struct{} = file_system) when struct in @allowed_file_systems do
|
||||
broadcast(@file_systems_topic, {:file_system_deleted, file_system})
|
||||
end
|
||||
|
||||
defp broadcast(topic, message) do
|
||||
Phoenix.PubSub.broadcast(Livebook.PubSub, topic, message)
|
||||
end
|
||||
|
|
|
@ -4,6 +4,8 @@ defmodule Livebook.Hubs.Personal do
|
|||
use Ecto.Schema
|
||||
import Ecto.Changeset
|
||||
|
||||
alias Livebook.FileSystem
|
||||
alias Livebook.FileSystems
|
||||
alias Livebook.Hubs
|
||||
alias Livebook.Storage
|
||||
alias Livebook.Secrets.Secret
|
||||
|
@ -11,6 +13,8 @@ defmodule Livebook.Hubs.Personal do
|
|||
@secrets_namespace :hub_secrets
|
||||
@secret_key_size 64
|
||||
|
||||
@file_systems_namespace :file_systems
|
||||
|
||||
@type t :: %__MODULE__{
|
||||
id: String.t() | nil,
|
||||
hub_name: String.t() | nil,
|
||||
|
@ -131,6 +135,52 @@ defmodule Livebook.Hubs.Personal do
|
|||
def generate_secret_key() do
|
||||
Base.url_encode64(:crypto.strong_rand_bytes(@secret_key_size), padding: false)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Get the file systems list from storage.
|
||||
"""
|
||||
@spec get_file_systems() :: list(FileSystem.t())
|
||||
def get_file_systems() do
|
||||
Storage.all(@file_systems_namespace)
|
||||
|> Enum.sort_by(& &1.bucket_url)
|
||||
|> Enum.map(&to_file_system/1)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Gets a file system from storage.
|
||||
|
||||
Raises `RuntimeError` if the secret doesn't exist.
|
||||
"""
|
||||
@spec fetch_file_system!(String.t()) :: FileSystem.t()
|
||||
def fetch_file_system!(id) do
|
||||
Storage.fetch!(@file_systems_namespace, id) |> to_file_system()
|
||||
end
|
||||
|
||||
@doc """
|
||||
Saves a new file system to the configured ones.
|
||||
"""
|
||||
@spec save_file_system(FileSystem.t()) :: FileSystem.t()
|
||||
def save_file_system(file_system) do
|
||||
attributes = FileSystem.dump(file_system)
|
||||
type = FileSystems.type(file_system)
|
||||
storage_attributes = Map.put(attributes, :type, type)
|
||||
|
||||
:ok = Storage.insert(@file_systems_namespace, file_system.id, Map.to_list(storage_attributes))
|
||||
|
||||
file_system
|
||||
end
|
||||
|
||||
@doc """
|
||||
Removes the given file system from the configured ones.
|
||||
"""
|
||||
@spec remove_file_system(FileSystem.id()) :: :ok
|
||||
def remove_file_system(id) do
|
||||
Storage.delete(@file_systems_namespace, id)
|
||||
end
|
||||
|
||||
defp to_file_system(fields) do
|
||||
FileSystems.load(fields.type, fields)
|
||||
end
|
||||
end
|
||||
|
||||
defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Personal do
|
||||
|
@ -205,4 +255,23 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Personal do
|
|||
def dump(personal) do
|
||||
Map.from_struct(personal)
|
||||
end
|
||||
|
||||
def get_file_systems(_personal) do
|
||||
Personal.get_file_systems()
|
||||
end
|
||||
|
||||
def create_file_system(_personal, file_system) do
|
||||
Personal.save_file_system(file_system)
|
||||
:ok = Broadcasts.file_system_created(file_system)
|
||||
end
|
||||
|
||||
def update_file_system(_personal, file_system) do
|
||||
Personal.save_file_system(file_system)
|
||||
:ok = Broadcasts.file_system_updated(file_system)
|
||||
end
|
||||
|
||||
def delete_file_system(_personal, file_system) do
|
||||
:ok = Personal.remove_file_system(file_system.id)
|
||||
:ok = Broadcasts.file_system_deleted(file_system)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
defprotocol Livebook.Hubs.Provider do
|
||||
@moduledoc false
|
||||
|
||||
alias Livebook.FileSystem
|
||||
alias Livebook.Secrets.Secret
|
||||
|
||||
@typedoc """
|
||||
|
@ -109,4 +110,34 @@ defprotocol Livebook.Hubs.Provider do
|
|||
"""
|
||||
@spec dump(t()) :: map()
|
||||
def dump(hub)
|
||||
|
||||
@doc """
|
||||
Gets the file systems of given hub.
|
||||
"""
|
||||
@spec get_file_systems(t()) :: list(FileSystem.t())
|
||||
def get_file_systems(hub)
|
||||
|
||||
@doc """
|
||||
Creates a file system of the given hub.
|
||||
"""
|
||||
@spec create_file_system(t(), FileSystem.t()) ::
|
||||
:ok
|
||||
| {:error, Ecto.Changeset.t()}
|
||||
| {:transport_error, String.t()}
|
||||
def create_file_system(hub, file_system)
|
||||
|
||||
@doc """
|
||||
Updates a file system of the given hub.
|
||||
"""
|
||||
@spec update_file_system(t(), FileSystem.t()) ::
|
||||
:ok
|
||||
| {:error, Ecto.Changeset.t()}
|
||||
| {:transport_error, String.t()}
|
||||
def update_file_system(hub, file_system)
|
||||
|
||||
@doc """
|
||||
Deletes a file system of the given hub.
|
||||
"""
|
||||
@spec delete_file_system(t(), FileSystem.t()) :: :ok | {:transport_error, String.t()}
|
||||
def delete_file_system(hub, file_system)
|
||||
end
|
||||
|
|
|
@ -185,4 +185,14 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do
|
|||
|> Map.from_struct()
|
||||
|> Map.delete(:offline)
|
||||
end
|
||||
|
||||
def get_file_systems(team) do
|
||||
TeamClient.get_file_systems(team.id)
|
||||
end
|
||||
|
||||
def create_file_system(team, file_system), do: Teams.create_file_system(team, file_system)
|
||||
|
||||
def update_file_system(team, file_system), do: Teams.update_file_system(team, file_system)
|
||||
|
||||
def delete_file_system(team, file_system), do: Teams.delete_file_system(team, file_system)
|
||||
end
|
||||
|
|
|
@ -3,23 +3,31 @@ defmodule Livebook.Hubs.TeamClient do
|
|||
use GenServer
|
||||
require Logger
|
||||
|
||||
alias Livebook.Hubs.Broadcasts
|
||||
alias Livebook.Hubs.Team
|
||||
alias Livebook.FileSystem
|
||||
alias Livebook.FileSystems
|
||||
alias Livebook.Hubs
|
||||
alias Livebook.Secrets
|
||||
alias Livebook.Teams
|
||||
alias Livebook.Teams.Connection
|
||||
|
||||
@registry Livebook.HubsRegistry
|
||||
@supervisor Livebook.HubsSupervisor
|
||||
|
||||
defstruct [:hub, :connection_error, :derived_keys, connected?: false, secrets: []]
|
||||
defstruct [
|
||||
:hub,
|
||||
:connection_error,
|
||||
:derived_keys,
|
||||
connected?: false,
|
||||
secrets: [],
|
||||
file_systems: []
|
||||
]
|
||||
|
||||
@type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}}
|
||||
|
||||
@doc """
|
||||
Connects the Team client with WebSocket server.
|
||||
"""
|
||||
@spec start_link(Team.t()) :: GenServer.on_start()
|
||||
def start_link(%Team{} = team) do
|
||||
@spec start_link(Hubs.Team.t()) :: GenServer.on_start()
|
||||
def start_link(%Hubs.Team{} = team) do
|
||||
GenServer.start_link(__MODULE__, team, name: registry_name(team.id))
|
||||
end
|
||||
|
||||
|
@ -38,11 +46,19 @@ defmodule Livebook.Hubs.TeamClient do
|
|||
@doc """
|
||||
Returns a list of cached secrets.
|
||||
"""
|
||||
@spec get_secrets(String.t()) :: list(Secret.t())
|
||||
@spec get_secrets(String.t()) :: list(Secrets.Secret.t())
|
||||
def get_secrets(id) do
|
||||
GenServer.call(registry_name(id), :get_secrets)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Returns a list of cached file systems.
|
||||
"""
|
||||
@spec get_file_systems(String.t()) :: list(FileSystem.t())
|
||||
def get_file_systems(id) do
|
||||
GenServer.call(registry_name(id), :get_file_systems)
|
||||
end
|
||||
|
||||
@doc """
|
||||
Returns the latest error from connection.
|
||||
"""
|
||||
|
@ -66,7 +82,7 @@ defmodule Livebook.Hubs.TeamClient do
|
|||
## GenServer callbacks
|
||||
|
||||
@impl true
|
||||
def init(%Team{offline: nil} = team) do
|
||||
def init(%Hubs.Team{offline: nil} = team) do
|
||||
derived_keys = Teams.derive_keys(team.teams_key)
|
||||
|
||||
headers = [
|
||||
|
@ -77,11 +93,11 @@ defmodule Livebook.Hubs.TeamClient do
|
|||
{"x-session-token", team.session_token}
|
||||
]
|
||||
|
||||
{:ok, _pid} = Connection.start_link(self(), headers)
|
||||
{:ok, _pid} = Teams.Connection.start_link(self(), headers)
|
||||
{:ok, %__MODULE__{hub: team, derived_keys: derived_keys}}
|
||||
end
|
||||
|
||||
def init(%Team{} = team) do
|
||||
def init(%Hubs.Team{} = team) do
|
||||
derived_keys = Teams.derive_keys(team.teams_key)
|
||||
|
||||
{:ok, %__MODULE__{hub: team, secrets: team.offline.secrets, derived_keys: derived_keys}}
|
||||
|
@ -100,20 +116,24 @@ defmodule Livebook.Hubs.TeamClient do
|
|||
{:reply, state.secrets, state}
|
||||
end
|
||||
|
||||
def handle_call(:get_file_systems, _caller, state) do
|
||||
{:reply, state.file_systems, state}
|
||||
end
|
||||
|
||||
@impl true
|
||||
def handle_info(:connected, state) do
|
||||
Broadcasts.hub_connected(state.hub.id)
|
||||
Hubs.Broadcasts.hub_connected(state.hub.id)
|
||||
{:noreply, %{state | connected?: true, connection_error: nil}}
|
||||
end
|
||||
|
||||
def handle_info({:connection_error, reason}, state) do
|
||||
Broadcasts.hub_connection_failed(state.hub.id, reason)
|
||||
Hubs.Broadcasts.hub_connection_failed(state.hub.id, reason)
|
||||
{:noreply, %{state | connected?: false, connection_error: reason}}
|
||||
end
|
||||
|
||||
def handle_info({:server_error, reason}, state) do
|
||||
Broadcasts.hub_server_error(state.hub.id, "#{state.hub.hub_name}: #{reason}")
|
||||
:ok = Livebook.Hubs.delete_hub(state.hub.id)
|
||||
Hubs.Broadcasts.hub_server_error(state.hub.id, "#{state.hub.hub_name}: #{reason}")
|
||||
:ok = Hubs.delete_hub(state.hub.id)
|
||||
|
||||
{:noreply, %{state | connected?: false}}
|
||||
end
|
||||
|
@ -143,61 +163,161 @@ defmodule Livebook.Hubs.TeamClient do
|
|||
{secret_key, sign_secret} = state.derived_keys
|
||||
{:ok, decrypted_value} = Teams.decrypt(value, secret_key, sign_secret)
|
||||
|
||||
%Livebook.Secrets.Secret{
|
||||
%Secrets.Secret{
|
||||
name: name,
|
||||
value: decrypted_value,
|
||||
hub_id: state.hub.id
|
||||
}
|
||||
end
|
||||
|
||||
defp put_file_system(state, file_system) do
|
||||
state = remove_file_system(state, file_system)
|
||||
%{state | file_systems: [file_system | state.file_systems]}
|
||||
end
|
||||
|
||||
defp remove_file_system(state, file_system) do
|
||||
%{
|
||||
state
|
||||
| file_systems:
|
||||
Enum.reject(state.file_systems, &(&1.external_id == file_system.external_id))
|
||||
}
|
||||
end
|
||||
|
||||
defp build_file_system(state, file_system) do
|
||||
{secret_key, sign_secret} = state.derived_keys
|
||||
{:ok, decrypted_value} = Teams.decrypt(file_system.value, secret_key, sign_secret)
|
||||
|
||||
dumped_data =
|
||||
Map.merge(Jason.decode!(decrypted_value), %{
|
||||
"external_id" => file_system.id,
|
||||
"prefix" => state.hub.id
|
||||
})
|
||||
|
||||
FileSystems.load(file_system.type, dumped_data)
|
||||
end
|
||||
|
||||
defp handle_event(:secret_created, %Secrets.Secret{} = secret, state) do
|
||||
Hubs.Broadcasts.secret_created(secret)
|
||||
|
||||
put_secret(state, secret)
|
||||
end
|
||||
|
||||
defp handle_event(:secret_created, secret_created, state) do
|
||||
secret = build_secret(state, secret_created)
|
||||
Broadcasts.secret_created(secret)
|
||||
handle_event(:secret_created, build_secret(state, secret_created), state)
|
||||
end
|
||||
|
||||
defp handle_event(:secret_updated, %Secrets.Secret{} = secret, state) do
|
||||
Hubs.Broadcasts.secret_updated(secret)
|
||||
|
||||
put_secret(state, secret)
|
||||
end
|
||||
|
||||
defp handle_event(:secret_updated, secret_updated, state) do
|
||||
secret = build_secret(state, secret_updated)
|
||||
Broadcasts.secret_updated(secret)
|
||||
|
||||
put_secret(state, secret)
|
||||
handle_event(:secret_updated, build_secret(state, secret_updated), state)
|
||||
end
|
||||
|
||||
defp handle_event(:secret_deleted, secret_deleted, state) do
|
||||
if secret = Enum.find(state.secrets, &(&1.name == secret_deleted.name)) do
|
||||
Broadcasts.secret_deleted(secret)
|
||||
Hubs.Broadcasts.secret_deleted(secret)
|
||||
remove_secret(state, secret)
|
||||
else
|
||||
state
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_event(:user_connected, %{secrets: secrets}, state) do
|
||||
created_secrets =
|
||||
Enum.reject(secrets, fn secret ->
|
||||
Enum.find(state.secrets, &(&1.name == secret.name and &1.value == secret.value))
|
||||
end)
|
||||
defp handle_event(:file_system_created, %{external_id: _} = file_system, state) do
|
||||
Hubs.Broadcasts.file_system_created(file_system)
|
||||
|
||||
deleted_secrets =
|
||||
Enum.reject(state.secrets, fn secret ->
|
||||
Enum.find(secrets, &(&1.name == secret.name))
|
||||
end)
|
||||
put_file_system(state, file_system)
|
||||
end
|
||||
|
||||
updated_secrets =
|
||||
Enum.filter(secrets, fn secret ->
|
||||
Enum.find(state.secrets, &(&1.name == secret.name and &1.value != secret.value))
|
||||
end)
|
||||
defp handle_event(:file_system_created, file_system_created, state) do
|
||||
handle_event(:file_system_created, build_file_system(state, file_system_created), state)
|
||||
end
|
||||
|
||||
secrets_by_topic = [
|
||||
secret_deleted: deleted_secrets,
|
||||
secret_created: created_secrets,
|
||||
secret_updated: updated_secrets
|
||||
]
|
||||
defp handle_event(:file_system_updated, %{external_id: _} = file_system, state) do
|
||||
Hubs.Broadcasts.file_system_updated(file_system)
|
||||
|
||||
for {topic, secrets} <- secrets_by_topic,
|
||||
secret <- secrets,
|
||||
put_file_system(state, file_system)
|
||||
end
|
||||
|
||||
defp handle_event(:file_system_updated, file_system_updated, state) do
|
||||
handle_event(:file_system_updated, build_file_system(state, file_system_updated), state)
|
||||
end
|
||||
|
||||
defp handle_event(:file_system_deleted, %{external_id: _} = file_system, state) do
|
||||
Hubs.Broadcasts.file_system_deleted(file_system)
|
||||
|
||||
remove_file_system(state, file_system)
|
||||
end
|
||||
|
||||
defp handle_event(:file_system_deleted, %{id: id}, state) do
|
||||
if file_system = Enum.find(state.file_systems, &(&1.external_id == id)) do
|
||||
handle_event(:file_system_deleted, file_system, state)
|
||||
else
|
||||
state
|
||||
end
|
||||
end
|
||||
|
||||
defp handle_event(:user_connected, user_connected, state) do
|
||||
state
|
||||
|> dispatch_secrets(user_connected)
|
||||
|> dispatch_file_systems(user_connected)
|
||||
end
|
||||
|
||||
defp dispatch_secrets(state, %{secrets: secrets}) do
|
||||
decrypted_secrets = Enum.map(secrets, &build_secret(state, &1))
|
||||
|
||||
{created, deleted, updated} =
|
||||
diff(
|
||||
state.secrets,
|
||||
decrypted_secrets,
|
||||
&(&1.name == &2.name and &1.value == &2.value),
|
||||
&(&1.name == &2.name),
|
||||
&(&1.name == &2.name and &1.value != &2.value)
|
||||
)
|
||||
|
||||
dispatch_events(state,
|
||||
secret_deleted: deleted,
|
||||
secret_created: created,
|
||||
secret_updated: updated
|
||||
)
|
||||
end
|
||||
|
||||
defp dispatch_file_systems(state, %{file_systems: file_systems}) do
|
||||
decrypted_file_systems = Enum.map(file_systems, &build_file_system(state, &1))
|
||||
|
||||
{created, deleted, updated} =
|
||||
diff(
|
||||
state.file_systems,
|
||||
decrypted_file_systems,
|
||||
&(&1.external_id == &2.external_id)
|
||||
)
|
||||
|
||||
dispatch_events(state,
|
||||
file_system_deleted: deleted,
|
||||
file_system_created: created,
|
||||
file_system_updated: updated
|
||||
)
|
||||
end
|
||||
|
||||
defp dispatch_file_systems(state, _), do: state
|
||||
|
||||
defp diff(old_list, new_list, fun, deleted_fun \\ nil, updated_fun \\ nil) do
|
||||
deleted_fun = unless deleted_fun, do: fun, else: deleted_fun
|
||||
updated_fun = unless updated_fun, do: fun, else: updated_fun
|
||||
|
||||
created = Enum.reject(new_list, fn item -> Enum.find(old_list, &fun.(&1, item)) end)
|
||||
deleted = Enum.reject(old_list, fn item -> Enum.find(new_list, &deleted_fun.(&1, item)) end)
|
||||
updated = Enum.filter(new_list, fn item -> Enum.find(old_list, &updated_fun.(&1, item)) end)
|
||||
|
||||
{created, deleted, updated}
|
||||
end
|
||||
|
||||
defp dispatch_events(state, events_by_topic) do
|
||||
for {topic, events} <- events_by_topic,
|
||||
event <- events,
|
||||
reduce: state,
|
||||
do: (acc -> handle_event(topic, secret, acc))
|
||||
do: (acc -> handle_event(topic, event, acc))
|
||||
end
|
||||
end
|
||||
|
|
|
@ -6,7 +6,7 @@ defmodule Livebook.Hubs.TeamClientTest do
|
|||
@moduletag :capture_log
|
||||
|
||||
setup do
|
||||
Livebook.Hubs.subscribe([:connection, :secrets])
|
||||
Livebook.Hubs.subscribe([:connection, :file_systems, :secrets])
|
||||
:ok
|
||||
end
|
||||
|
||||
|
@ -109,5 +109,365 @@ defmodule Livebook.Hubs.TeamClientTest do
|
|||
# receives `{:secret_deleted, secret_deleted}` event
|
||||
assert_receive {:secret_deleted, %{name: ^name, value: ^value}}
|
||||
end
|
||||
|
||||
test "receives the file_system_created event", %{user: user, node: node} do
|
||||
team = create_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
assert_receive {:hub_connected, ^id}
|
||||
|
||||
file_system = build(:fs_s3, bucket_url: "https://file_system_created.s3.amazonaws.com")
|
||||
assert Livebook.Teams.create_file_system(team, file_system) == :ok
|
||||
|
||||
bucket_url = file_system.bucket_url
|
||||
|
||||
# receives `{:event, :file_system_created, file_system_created}` event
|
||||
assert_receive {:file_system_created, %{bucket_url: ^bucket_url}}
|
||||
end
|
||||
|
||||
test "receives the file_system_updated event", %{user: user, node: node} do
|
||||
team = create_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
assert_receive {:hub_connected, ^id}
|
||||
|
||||
file_system =
|
||||
build(:fs_s3,
|
||||
bucket_url: "https://file_system_updated.s3.amazonaws.com",
|
||||
region: "us-east-1"
|
||||
)
|
||||
|
||||
assert Livebook.Teams.create_file_system(team, file_system) == :ok
|
||||
|
||||
bucket_url = file_system.bucket_url
|
||||
region = file_system.region
|
||||
|
||||
# receives `{:file_system_created, file_system_created}` event
|
||||
assert_receive {:file_system_created,
|
||||
%{external_id: id, bucket_url: ^bucket_url, region: ^region}}
|
||||
|
||||
# updates the file system
|
||||
update_file_system = %{file_system | region: "eu-central-1", external_id: id}
|
||||
assert Livebook.Teams.update_file_system(team, update_file_system) == :ok
|
||||
|
||||
new_region = update_file_system.region
|
||||
|
||||
# receives `{:file_system_updated, file_system_updated}` event
|
||||
assert_receive {:file_system_updated,
|
||||
%{external_id: ^id, bucket_url: ^bucket_url, region: ^new_region}}
|
||||
end
|
||||
|
||||
test "receives the file_system_deleted event", %{user: user, node: node} do
|
||||
team = create_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
assert_receive {:hub_connected, ^id}
|
||||
|
||||
file_system = build(:fs_s3, bucket_url: "https://file_system_deleted.s3.amazonaws.com")
|
||||
assert Livebook.Teams.create_file_system(team, file_system) == :ok
|
||||
|
||||
bucket_url = file_system.bucket_url
|
||||
|
||||
# receives `{:file_system_created, file_system_created}` event
|
||||
assert_receive {:file_system_created, %{external_id: id, bucket_url: ^bucket_url}}
|
||||
|
||||
# deletes the file system
|
||||
delete_file_system = %{file_system | external_id: id}
|
||||
assert Livebook.Teams.delete_file_system(team, delete_file_system) == :ok
|
||||
|
||||
# receives `{:file_system_deleted, file_system_deleted}` event
|
||||
assert_receive {:file_system_deleted, %{external_id: ^id, bucket_url: ^bucket_url}}
|
||||
end
|
||||
end
|
||||
|
||||
describe "user connected event" do
|
||||
test "fills the secrets list", %{user: user, node: node} do
|
||||
team = build_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
secret =
|
||||
build(:secret,
|
||||
name: "SECRET_CREATED",
|
||||
value: "an encrypted value",
|
||||
hub_id: id
|
||||
)
|
||||
|
||||
{secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key)
|
||||
secret_value = Livebook.Teams.encrypt(secret.value, secret_key, sign_secret)
|
||||
livebook_proto_secret = LivebookProto.Secret.new!(name: secret.name, value: secret_value)
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [livebook_proto_secret],
|
||||
file_systems: []
|
||||
)
|
||||
|
||||
{:ok, pid} = TeamClient.start_link(team)
|
||||
assert_receive {:hub_connected, ^id}
|
||||
refute_receive {:secret_created, ^secret}
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:secret_created, ^secret}
|
||||
assert secret in TeamClient.get_secrets(team.id)
|
||||
end
|
||||
|
||||
test "replaces the secret with updated value", %{user: user, node: node} do
|
||||
team = build_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
secret =
|
||||
build(:secret,
|
||||
name: "SECRET_UPDATED",
|
||||
value: "an encrypted value",
|
||||
hub_id: id
|
||||
)
|
||||
|
||||
{secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key)
|
||||
secret_value = Livebook.Teams.encrypt(secret.value, secret_key, sign_secret)
|
||||
livebook_proto_secret = LivebookProto.Secret.new!(name: secret.name, value: secret_value)
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [livebook_proto_secret],
|
||||
file_systems: []
|
||||
)
|
||||
|
||||
{:ok, pid} = TeamClient.start_link(team)
|
||||
assert_receive {:hub_connected, ^id}
|
||||
refute_receive {:secret_created, ^secret}
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:secret_created, ^secret}
|
||||
|
||||
updated_secret = %{secret | value: "an updated value"}
|
||||
secret_value = Livebook.Teams.encrypt(updated_secret.value, secret_key, sign_secret)
|
||||
|
||||
updated_livebook_proto_secret =
|
||||
LivebookProto.Secret.new!(name: updated_secret.name, value: secret_value)
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [updated_livebook_proto_secret],
|
||||
file_systems: []
|
||||
)
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:secret_updated, ^updated_secret}
|
||||
|
||||
refute secret in TeamClient.get_secrets(team.id)
|
||||
assert updated_secret in TeamClient.get_secrets(team.id)
|
||||
end
|
||||
|
||||
test "deletes the secret", %{user: user, node: node} do
|
||||
team = build_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
secret =
|
||||
build(:secret,
|
||||
name: "SECRET_UPDATED",
|
||||
value: "an encrypted value",
|
||||
hub_id: id
|
||||
)
|
||||
|
||||
{secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key)
|
||||
secret_value = Livebook.Teams.encrypt(secret.value, secret_key, sign_secret)
|
||||
livebook_proto_secret = LivebookProto.Secret.new!(name: secret.name, value: secret_value)
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [livebook_proto_secret],
|
||||
file_systems: []
|
||||
)
|
||||
|
||||
{:ok, pid} = TeamClient.start_link(team)
|
||||
assert_receive {:hub_connected, ^id}
|
||||
refute_receive {:secret_created, ^secret}
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:secret_created, ^secret}
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [],
|
||||
file_systems: []
|
||||
)
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:secret_deleted, ^secret}
|
||||
|
||||
refute secret in TeamClient.get_secrets(team.id)
|
||||
end
|
||||
|
||||
test "fills the file systems list", %{user: user, node: node} do
|
||||
team = build_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
bucket_url = "https://mybucket.s3.amazonaws.com"
|
||||
hash = :crypto.hash(:sha256, bucket_url)
|
||||
fs_id = "#{id}-s3-#{Base.url_encode64(hash, padding: false)}"
|
||||
|
||||
file_system = build(:fs_s3, id: fs_id, bucket_url: bucket_url, external_id: "123456")
|
||||
|
||||
type = Livebook.FileSystems.type(file_system)
|
||||
%{name: name} = Livebook.FileSystem.external_metadata(file_system)
|
||||
attrs = Livebook.FileSystem.dump(file_system)
|
||||
credentials = Jason.encode!(attrs)
|
||||
|
||||
{secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key)
|
||||
value = Livebook.Teams.encrypt(credentials, secret_key, sign_secret)
|
||||
|
||||
livebook_proto_file_system =
|
||||
LivebookProto.FileSystem.new!(
|
||||
id: file_system.external_id,
|
||||
name: name,
|
||||
type: to_string(type),
|
||||
value: value
|
||||
)
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [],
|
||||
file_systems: [livebook_proto_file_system]
|
||||
)
|
||||
|
||||
{:ok, pid} = TeamClient.start_link(team)
|
||||
assert_receive {:hub_connected, ^id}
|
||||
refute_receive {:file_system_created, ^file_system}
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:file_system_created, ^file_system}
|
||||
assert file_system in TeamClient.get_file_systems(team.id)
|
||||
end
|
||||
|
||||
test "replaces the file system with updated value", %{user: user, node: node} do
|
||||
team = build_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
bucket_url = "https://update_fs_994641.s3.amazonaws.com"
|
||||
hash = :crypto.hash(:sha256, bucket_url)
|
||||
fs_id = "#{id}-s3-#{Base.url_encode64(hash, padding: false)}"
|
||||
|
||||
file_system = build(:fs_s3, id: fs_id, bucket_url: bucket_url, external_id: "994641")
|
||||
|
||||
type = Livebook.FileSystems.type(file_system)
|
||||
%{name: name} = Livebook.FileSystem.external_metadata(file_system)
|
||||
attrs = Livebook.FileSystem.dump(file_system)
|
||||
credentials = Jason.encode!(attrs)
|
||||
|
||||
{secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key)
|
||||
value = Livebook.Teams.encrypt(credentials, secret_key, sign_secret)
|
||||
|
||||
livebook_proto_file_system =
|
||||
LivebookProto.FileSystem.new!(
|
||||
id: file_system.external_id,
|
||||
name: name,
|
||||
type: to_string(type),
|
||||
value: value
|
||||
)
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [],
|
||||
file_systems: [livebook_proto_file_system]
|
||||
)
|
||||
|
||||
{:ok, pid} = TeamClient.start_link(team)
|
||||
assert_receive {:hub_connected, ^id}
|
||||
refute_receive {:file_system_created, ^file_system}
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:file_system_created, ^file_system}
|
||||
|
||||
updated_file_system = %{
|
||||
file_system
|
||||
| id: "#{id}-s3-ATC52Lo7d-bS21OLjPQ8KFBPJN8ku4hCn2nic2jTGeI",
|
||||
bucket_url: "https://updated_name.s3.amazonaws.com"
|
||||
}
|
||||
|
||||
updated_attrs = Livebook.FileSystem.dump(updated_file_system)
|
||||
updated_credentials = Jason.encode!(updated_attrs)
|
||||
updated_value = Livebook.Teams.encrypt(updated_credentials, secret_key, sign_secret)
|
||||
|
||||
updated_livebook_proto_file_system =
|
||||
LivebookProto.FileSystem.new!(
|
||||
id: updated_file_system.external_id,
|
||||
name: updated_file_system.bucket_url,
|
||||
type: to_string(type),
|
||||
value: updated_value
|
||||
)
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [],
|
||||
file_systems: [updated_livebook_proto_file_system]
|
||||
)
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:file_system_updated, ^updated_file_system}
|
||||
|
||||
refute file_system in TeamClient.get_file_systems(team.id)
|
||||
assert updated_file_system in TeamClient.get_file_systems(team.id)
|
||||
end
|
||||
|
||||
test "deletes the file system", %{user: user, node: node} do
|
||||
team = build_team_hub(user, node)
|
||||
id = team.id
|
||||
|
||||
bucket_url = "https://delete_fs_45465641.s3.amazonaws.com"
|
||||
hash = :crypto.hash(:sha256, bucket_url)
|
||||
fs_id = "#{id}-s3-#{Base.url_encode64(hash, padding: false)}"
|
||||
|
||||
file_system = build(:fs_s3, id: fs_id, bucket_url: bucket_url, external_id: "45465641")
|
||||
|
||||
type = Livebook.FileSystems.type(file_system)
|
||||
%{name: name} = Livebook.FileSystem.external_metadata(file_system)
|
||||
attrs = Livebook.FileSystem.dump(file_system)
|
||||
credentials = Jason.encode!(attrs)
|
||||
|
||||
{secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key)
|
||||
value = Livebook.Teams.encrypt(credentials, secret_key, sign_secret)
|
||||
|
||||
livebook_proto_file_system =
|
||||
LivebookProto.FileSystem.new!(
|
||||
id: file_system.external_id,
|
||||
name: name,
|
||||
type: to_string(type),
|
||||
value: value
|
||||
)
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [],
|
||||
file_systems: [livebook_proto_file_system]
|
||||
)
|
||||
|
||||
{:ok, pid} = TeamClient.start_link(team)
|
||||
assert_receive {:hub_connected, ^id}
|
||||
refute_receive {:file_system_created, ^file_system}
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:file_system_created, ^file_system}
|
||||
|
||||
user_connected =
|
||||
LivebookProto.UserConnected.new!(
|
||||
name: team.hub_name,
|
||||
secrets: [],
|
||||
file_systems: []
|
||||
)
|
||||
|
||||
send(pid, {:event, :user_connected, user_connected})
|
||||
assert_receive {:file_system_deleted, ^file_system}
|
||||
|
||||
refute file_system in TeamClient.get_file_systems(team.id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
defmodule Livebook.Teams.ConnectionTest do
|
||||
alias Livebook.FileSystem
|
||||
use Livebook.TeamsIntegrationCase, async: true
|
||||
|
||||
@moduletag :capture_log
|
||||
|
@ -50,5 +51,26 @@ defmodule Livebook.Teams.ConnectionTest do
|
|||
assert secret_created.name == secret.name
|
||||
refute secret_created.value == secret.value
|
||||
end
|
||||
|
||||
test "receives the file_system_created event", %{user: user, node: node} do
|
||||
{hub, headers} = build_team_headers(user, node)
|
||||
|
||||
assert {:ok, _conn} = Connection.start_link(self(), headers)
|
||||
assert_receive :connected
|
||||
|
||||
# creates a new file system
|
||||
file_system = build(:fs_s3, bucket_url: "https://file_system_created.s3.amazonaws.com")
|
||||
assert Livebook.Teams.create_file_system(hub, file_system) == :ok
|
||||
type = Livebook.FileSystems.type(file_system)
|
||||
%{name: name} = FileSystem.external_metadata(file_system)
|
||||
|
||||
# receives `{:event, :file_system_created, file_system_created}` event
|
||||
# without decrypting the value
|
||||
assert_receive {:event, :file_system_created, file_system_created}
|
||||
assert file_system_created.name == name
|
||||
assert file_system_created.type == to_string(type)
|
||||
refute file_system_created.value == FileSystem.dump(file_system)
|
||||
assert is_binary(file_system_created.value)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue