From 7f1feec7d99bd11b29fb24c5779c6b6e62504358 Mon Sep 17 00:00:00 2001 From: Alexandre de Souza Date: Wed, 13 Sep 2023 10:32:33 -0300 Subject: [PATCH] Implement File Systems management directly from Hub and handle their events (#2201) --- lib/livebook/file_system/s3.ex | 16 +- lib/livebook/hubs.ex | 42 ++ lib/livebook/hubs/broadcasts.ex | 28 ++ lib/livebook/hubs/personal.ex | 69 ++++ lib/livebook/hubs/provider.ex | 31 ++ lib/livebook/hubs/team.ex | 10 + lib/livebook/hubs/team_client.ex | 206 +++++++--- test/livebook_teams/hubs/team_client_test.exs | 362 +++++++++++++++++- test/livebook_teams/teams/connection_test.exs | 22 ++ 9 files changed, 738 insertions(+), 48 deletions(-) diff --git a/lib/livebook/file_system/s3.ex b/lib/livebook/file_system/s3.ex index bc2bf6482..14504fdd9 100644 --- a/lib/livebook/file_system/s3.ex +++ b/lib/livebook/file_system/s3.ex @@ -33,16 +33,22 @@ defmodule Livebook.FileSystem.S3 do * `:external_id` - the external id from Teams. + * `:prefix` - the id prefix. + """ @spec new(String.t(), String.t(), String.t(), keyword()) :: t() def new(bucket_url, access_key_id, secret_access_key, opts \\ []) do - opts = Keyword.validate!(opts, [:region, :external_id]) + opts = Keyword.validate!(opts, [:region, :external_id, :prefix]) bucket_url = String.trim_trailing(bucket_url, "/") region = opts[:region] || region_from_uri(bucket_url) hash = :crypto.hash(:sha256, bucket_url) |> Base.url_encode64(padding: false) - id = "s3-#{hash}" + + id = + if prefix = opts[:prefix], + do: "#{prefix}-s3-#{hash}", + else: "s3-#{hash}" %__MODULE__{ id: id, @@ -371,14 +377,16 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do external_id: fields["external_id"], region: fields["region"], access_key_id: fields["access_key_id"], - secret_access_key: fields["secret_access_key"] + secret_access_key: fields["secret_access_key"], + prefix: fields["prefix"] }) end def load(_file_system, fields) do S3.new(fields.bucket_url, fields.access_key_id, fields.secret_access_key, region: fields[:region], - external_id: fields[:external_id] + external_id: fields[:external_id], + prefix: fields[:prefix] ) end diff --git a/lib/livebook/hubs.ex b/lib/livebook/hubs.ex index 9281f3f7a..f5ac3a112 100644 --- a/lib/livebook/hubs.ex +++ b/lib/livebook/hubs.ex @@ -1,6 +1,7 @@ defmodule Livebook.Hubs do @moduledoc false + alias Livebook.FileSystem alias Livebook.Storage alias Livebook.Hubs.{Broadcasts, Metadata, Personal, Provider, Team} alias Livebook.Secrets.Secret @@ -294,4 +295,45 @@ defmodule Livebook.Hubs do def verify_notebook_stamp(hub, notebook_source, stamp) do Provider.verify_notebook_stamp(hub, notebook_source, stamp) end + + @doc """ + Gets a list of file systems for given hub. + """ + @spec get_file_systems(Provider.t()) :: list(FileSystem.t()) + def get_file_systems(hub) do + hub_file_systems = Provider.get_file_systems(hub) + local_file_system = Livebook.Config.local_file_system() + + [local_file_system | Enum.sort_by(hub_file_systems, & &1.id)] + end + + @doc """ + Creates a file system for given hub. + """ + @spec create_file_system(Provider.t(), FileSystem.t()) :: + :ok + | {:error, Ecto.Changeset.t()} + | {:transport_error, String.t()} + def create_file_system(hub, file_system) do + Provider.create_file_system(hub, file_system) + end + + @doc """ + Updates a file system for given hub. + """ + @spec update_file_system(Provider.t(), FileSystem.t()) :: + :ok + | {:error, Ecto.Changeset.t()} + | {:transport_error, String.t()} + def update_file_system(hub, file_system) do + Provider.update_file_system(hub, file_system) + end + + @doc """ + Deletes a file system for given hub. + """ + @spec delete_file_system(Provider.t(), FileSystem.t()) :: :ok | {:transport_error, String.t()} + def delete_file_system(hub, file_system) do + Provider.delete_file_system(hub, file_system) + end end diff --git a/lib/livebook/hubs/broadcasts.ex b/lib/livebook/hubs/broadcasts.ex index 57b06e637..78b62e8fc 100644 --- a/lib/livebook/hubs/broadcasts.ex +++ b/lib/livebook/hubs/broadcasts.ex @@ -1,6 +1,7 @@ defmodule Livebook.Hubs.Broadcasts do @moduledoc false + alias Livebook.FileSystem alias Livebook.Secrets.Secret @type broadcast :: :ok | {:error, term()} @@ -8,6 +9,7 @@ defmodule Livebook.Hubs.Broadcasts do @crud_topic "hubs:crud" @connection_topic "hubs:connection" @secrets_topic "hubs:secrets" + @file_systems_topic "hubs:file_systems" @doc """ Broadcasts under `#{@crud_topic}` topic when hubs changed. @@ -65,6 +67,32 @@ defmodule Livebook.Hubs.Broadcasts do broadcast(@secrets_topic, {:secret_deleted, secret}) end + @allowed_file_systems [FileSystem.S3] + + @doc """ + Broadcasts under `#{@file_systems_topic}` topic when hub received a new file system. + """ + @spec file_system_created(FileSystem.t()) :: broadcast() + def file_system_created(%struct{} = file_system) when struct in @allowed_file_systems do + broadcast(@file_systems_topic, {:file_system_created, file_system}) + end + + @doc """ + Broadcasts under `#{@file_systems_topic}` topic when hub received an updated file system. + """ + @spec file_system_updated(FileSystem.t()) :: broadcast() + def file_system_updated(%struct{} = file_system) when struct in @allowed_file_systems do + broadcast(@file_systems_topic, {:file_system_updated, file_system}) + end + + @doc """ + Broadcasts under `#{@file_systems_topic}` topic when hub received a deleted file system. + """ + @spec file_system_deleted(FileSystem.t()) :: broadcast() + def file_system_deleted(%struct{} = file_system) when struct in @allowed_file_systems do + broadcast(@file_systems_topic, {:file_system_deleted, file_system}) + end + defp broadcast(topic, message) do Phoenix.PubSub.broadcast(Livebook.PubSub, topic, message) end diff --git a/lib/livebook/hubs/personal.ex b/lib/livebook/hubs/personal.ex index 4eb9256cf..11c03d25d 100644 --- a/lib/livebook/hubs/personal.ex +++ b/lib/livebook/hubs/personal.ex @@ -4,6 +4,8 @@ defmodule Livebook.Hubs.Personal do use Ecto.Schema import Ecto.Changeset + alias Livebook.FileSystem + alias Livebook.FileSystems alias Livebook.Hubs alias Livebook.Storage alias Livebook.Secrets.Secret @@ -11,6 +13,8 @@ defmodule Livebook.Hubs.Personal do @secrets_namespace :hub_secrets @secret_key_size 64 + @file_systems_namespace :file_systems + @type t :: %__MODULE__{ id: String.t() | nil, hub_name: String.t() | nil, @@ -131,6 +135,52 @@ defmodule Livebook.Hubs.Personal do def generate_secret_key() do Base.url_encode64(:crypto.strong_rand_bytes(@secret_key_size), padding: false) end + + @doc """ + Get the file systems list from storage. + """ + @spec get_file_systems() :: list(FileSystem.t()) + def get_file_systems() do + Storage.all(@file_systems_namespace) + |> Enum.sort_by(& &1.bucket_url) + |> Enum.map(&to_file_system/1) + end + + @doc """ + Gets a file system from storage. + + Raises `RuntimeError` if the secret doesn't exist. + """ + @spec fetch_file_system!(String.t()) :: FileSystem.t() + def fetch_file_system!(id) do + Storage.fetch!(@file_systems_namespace, id) |> to_file_system() + end + + @doc """ + Saves a new file system to the configured ones. + """ + @spec save_file_system(FileSystem.t()) :: FileSystem.t() + def save_file_system(file_system) do + attributes = FileSystem.dump(file_system) + type = FileSystems.type(file_system) + storage_attributes = Map.put(attributes, :type, type) + + :ok = Storage.insert(@file_systems_namespace, file_system.id, Map.to_list(storage_attributes)) + + file_system + end + + @doc """ + Removes the given file system from the configured ones. + """ + @spec remove_file_system(FileSystem.id()) :: :ok + def remove_file_system(id) do + Storage.delete(@file_systems_namespace, id) + end + + defp to_file_system(fields) do + FileSystems.load(fields.type, fields) + end end defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Personal do @@ -205,4 +255,23 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Personal do def dump(personal) do Map.from_struct(personal) end + + def get_file_systems(_personal) do + Personal.get_file_systems() + end + + def create_file_system(_personal, file_system) do + Personal.save_file_system(file_system) + :ok = Broadcasts.file_system_created(file_system) + end + + def update_file_system(_personal, file_system) do + Personal.save_file_system(file_system) + :ok = Broadcasts.file_system_updated(file_system) + end + + def delete_file_system(_personal, file_system) do + :ok = Personal.remove_file_system(file_system.id) + :ok = Broadcasts.file_system_deleted(file_system) + end end diff --git a/lib/livebook/hubs/provider.ex b/lib/livebook/hubs/provider.ex index b7cf27000..5f56de4f1 100644 --- a/lib/livebook/hubs/provider.ex +++ b/lib/livebook/hubs/provider.ex @@ -1,6 +1,7 @@ defprotocol Livebook.Hubs.Provider do @moduledoc false + alias Livebook.FileSystem alias Livebook.Secrets.Secret @typedoc """ @@ -109,4 +110,34 @@ defprotocol Livebook.Hubs.Provider do """ @spec dump(t()) :: map() def dump(hub) + + @doc """ + Gets the file systems of given hub. + """ + @spec get_file_systems(t()) :: list(FileSystem.t()) + def get_file_systems(hub) + + @doc """ + Creates a file system of the given hub. + """ + @spec create_file_system(t(), FileSystem.t()) :: + :ok + | {:error, Ecto.Changeset.t()} + | {:transport_error, String.t()} + def create_file_system(hub, file_system) + + @doc """ + Updates a file system of the given hub. + """ + @spec update_file_system(t(), FileSystem.t()) :: + :ok + | {:error, Ecto.Changeset.t()} + | {:transport_error, String.t()} + def update_file_system(hub, file_system) + + @doc """ + Deletes a file system of the given hub. + """ + @spec delete_file_system(t(), FileSystem.t()) :: :ok | {:transport_error, String.t()} + def delete_file_system(hub, file_system) end diff --git a/lib/livebook/hubs/team.ex b/lib/livebook/hubs/team.ex index 0e99ecbf6..46a936e90 100644 --- a/lib/livebook/hubs/team.ex +++ b/lib/livebook/hubs/team.ex @@ -185,4 +185,14 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do |> Map.from_struct() |> Map.delete(:offline) end + + def get_file_systems(team) do + TeamClient.get_file_systems(team.id) + end + + def create_file_system(team, file_system), do: Teams.create_file_system(team, file_system) + + def update_file_system(team, file_system), do: Teams.update_file_system(team, file_system) + + def delete_file_system(team, file_system), do: Teams.delete_file_system(team, file_system) end diff --git a/lib/livebook/hubs/team_client.ex b/lib/livebook/hubs/team_client.ex index 03250e446..526262a3e 100644 --- a/lib/livebook/hubs/team_client.ex +++ b/lib/livebook/hubs/team_client.ex @@ -3,23 +3,31 @@ defmodule Livebook.Hubs.TeamClient do use GenServer require Logger - alias Livebook.Hubs.Broadcasts - alias Livebook.Hubs.Team + alias Livebook.FileSystem + alias Livebook.FileSystems + alias Livebook.Hubs + alias Livebook.Secrets alias Livebook.Teams - alias Livebook.Teams.Connection @registry Livebook.HubsRegistry @supervisor Livebook.HubsSupervisor - defstruct [:hub, :connection_error, :derived_keys, connected?: false, secrets: []] + defstruct [ + :hub, + :connection_error, + :derived_keys, + connected?: false, + secrets: [], + file_systems: [] + ] @type registry_name :: {:via, Registry, {Livebook.HubsRegistry, String.t()}} @doc """ Connects the Team client with WebSocket server. """ - @spec start_link(Team.t()) :: GenServer.on_start() - def start_link(%Team{} = team) do + @spec start_link(Hubs.Team.t()) :: GenServer.on_start() + def start_link(%Hubs.Team{} = team) do GenServer.start_link(__MODULE__, team, name: registry_name(team.id)) end @@ -38,11 +46,19 @@ defmodule Livebook.Hubs.TeamClient do @doc """ Returns a list of cached secrets. """ - @spec get_secrets(String.t()) :: list(Secret.t()) + @spec get_secrets(String.t()) :: list(Secrets.Secret.t()) def get_secrets(id) do GenServer.call(registry_name(id), :get_secrets) end + @doc """ + Returns a list of cached file systems. + """ + @spec get_file_systems(String.t()) :: list(FileSystem.t()) + def get_file_systems(id) do + GenServer.call(registry_name(id), :get_file_systems) + end + @doc """ Returns the latest error from connection. """ @@ -66,7 +82,7 @@ defmodule Livebook.Hubs.TeamClient do ## GenServer callbacks @impl true - def init(%Team{offline: nil} = team) do + def init(%Hubs.Team{offline: nil} = team) do derived_keys = Teams.derive_keys(team.teams_key) headers = [ @@ -77,11 +93,11 @@ defmodule Livebook.Hubs.TeamClient do {"x-session-token", team.session_token} ] - {:ok, _pid} = Connection.start_link(self(), headers) + {:ok, _pid} = Teams.Connection.start_link(self(), headers) {:ok, %__MODULE__{hub: team, derived_keys: derived_keys}} end - def init(%Team{} = team) do + def init(%Hubs.Team{} = team) do derived_keys = Teams.derive_keys(team.teams_key) {:ok, %__MODULE__{hub: team, secrets: team.offline.secrets, derived_keys: derived_keys}} @@ -100,20 +116,24 @@ defmodule Livebook.Hubs.TeamClient do {:reply, state.secrets, state} end + def handle_call(:get_file_systems, _caller, state) do + {:reply, state.file_systems, state} + end + @impl true def handle_info(:connected, state) do - Broadcasts.hub_connected(state.hub.id) + Hubs.Broadcasts.hub_connected(state.hub.id) {:noreply, %{state | connected?: true, connection_error: nil}} end def handle_info({:connection_error, reason}, state) do - Broadcasts.hub_connection_failed(state.hub.id, reason) + Hubs.Broadcasts.hub_connection_failed(state.hub.id, reason) {:noreply, %{state | connected?: false, connection_error: reason}} end def handle_info({:server_error, reason}, state) do - Broadcasts.hub_server_error(state.hub.id, "#{state.hub.hub_name}: #{reason}") - :ok = Livebook.Hubs.delete_hub(state.hub.id) + Hubs.Broadcasts.hub_server_error(state.hub.id, "#{state.hub.hub_name}: #{reason}") + :ok = Hubs.delete_hub(state.hub.id) {:noreply, %{state | connected?: false}} end @@ -143,61 +163,161 @@ defmodule Livebook.Hubs.TeamClient do {secret_key, sign_secret} = state.derived_keys {:ok, decrypted_value} = Teams.decrypt(value, secret_key, sign_secret) - %Livebook.Secrets.Secret{ + %Secrets.Secret{ name: name, value: decrypted_value, hub_id: state.hub.id } end + defp put_file_system(state, file_system) do + state = remove_file_system(state, file_system) + %{state | file_systems: [file_system | state.file_systems]} + end + + defp remove_file_system(state, file_system) do + %{ + state + | file_systems: + Enum.reject(state.file_systems, &(&1.external_id == file_system.external_id)) + } + end + + defp build_file_system(state, file_system) do + {secret_key, sign_secret} = state.derived_keys + {:ok, decrypted_value} = Teams.decrypt(file_system.value, secret_key, sign_secret) + + dumped_data = + Map.merge(Jason.decode!(decrypted_value), %{ + "external_id" => file_system.id, + "prefix" => state.hub.id + }) + + FileSystems.load(file_system.type, dumped_data) + end + + defp handle_event(:secret_created, %Secrets.Secret{} = secret, state) do + Hubs.Broadcasts.secret_created(secret) + + put_secret(state, secret) + end + defp handle_event(:secret_created, secret_created, state) do - secret = build_secret(state, secret_created) - Broadcasts.secret_created(secret) + handle_event(:secret_created, build_secret(state, secret_created), state) + end + + defp handle_event(:secret_updated, %Secrets.Secret{} = secret, state) do + Hubs.Broadcasts.secret_updated(secret) put_secret(state, secret) end defp handle_event(:secret_updated, secret_updated, state) do - secret = build_secret(state, secret_updated) - Broadcasts.secret_updated(secret) - - put_secret(state, secret) + handle_event(:secret_updated, build_secret(state, secret_updated), state) end defp handle_event(:secret_deleted, secret_deleted, state) do if secret = Enum.find(state.secrets, &(&1.name == secret_deleted.name)) do - Broadcasts.secret_deleted(secret) + Hubs.Broadcasts.secret_deleted(secret) remove_secret(state, secret) else state end end - defp handle_event(:user_connected, %{secrets: secrets}, state) do - created_secrets = - Enum.reject(secrets, fn secret -> - Enum.find(state.secrets, &(&1.name == secret.name and &1.value == secret.value)) - end) + defp handle_event(:file_system_created, %{external_id: _} = file_system, state) do + Hubs.Broadcasts.file_system_created(file_system) - deleted_secrets = - Enum.reject(state.secrets, fn secret -> - Enum.find(secrets, &(&1.name == secret.name)) - end) + put_file_system(state, file_system) + end - updated_secrets = - Enum.filter(secrets, fn secret -> - Enum.find(state.secrets, &(&1.name == secret.name and &1.value != secret.value)) - end) + defp handle_event(:file_system_created, file_system_created, state) do + handle_event(:file_system_created, build_file_system(state, file_system_created), state) + end - secrets_by_topic = [ - secret_deleted: deleted_secrets, - secret_created: created_secrets, - secret_updated: updated_secrets - ] + defp handle_event(:file_system_updated, %{external_id: _} = file_system, state) do + Hubs.Broadcasts.file_system_updated(file_system) - for {topic, secrets} <- secrets_by_topic, - secret <- secrets, + put_file_system(state, file_system) + end + + defp handle_event(:file_system_updated, file_system_updated, state) do + handle_event(:file_system_updated, build_file_system(state, file_system_updated), state) + end + + defp handle_event(:file_system_deleted, %{external_id: _} = file_system, state) do + Hubs.Broadcasts.file_system_deleted(file_system) + + remove_file_system(state, file_system) + end + + defp handle_event(:file_system_deleted, %{id: id}, state) do + if file_system = Enum.find(state.file_systems, &(&1.external_id == id)) do + handle_event(:file_system_deleted, file_system, state) + else + state + end + end + + defp handle_event(:user_connected, user_connected, state) do + state + |> dispatch_secrets(user_connected) + |> dispatch_file_systems(user_connected) + end + + defp dispatch_secrets(state, %{secrets: secrets}) do + decrypted_secrets = Enum.map(secrets, &build_secret(state, &1)) + + {created, deleted, updated} = + diff( + state.secrets, + decrypted_secrets, + &(&1.name == &2.name and &1.value == &2.value), + &(&1.name == &2.name), + &(&1.name == &2.name and &1.value != &2.value) + ) + + dispatch_events(state, + secret_deleted: deleted, + secret_created: created, + secret_updated: updated + ) + end + + defp dispatch_file_systems(state, %{file_systems: file_systems}) do + decrypted_file_systems = Enum.map(file_systems, &build_file_system(state, &1)) + + {created, deleted, updated} = + diff( + state.file_systems, + decrypted_file_systems, + &(&1.external_id == &2.external_id) + ) + + dispatch_events(state, + file_system_deleted: deleted, + file_system_created: created, + file_system_updated: updated + ) + end + + defp dispatch_file_systems(state, _), do: state + + defp diff(old_list, new_list, fun, deleted_fun \\ nil, updated_fun \\ nil) do + deleted_fun = unless deleted_fun, do: fun, else: deleted_fun + updated_fun = unless updated_fun, do: fun, else: updated_fun + + created = Enum.reject(new_list, fn item -> Enum.find(old_list, &fun.(&1, item)) end) + deleted = Enum.reject(old_list, fn item -> Enum.find(new_list, &deleted_fun.(&1, item)) end) + updated = Enum.filter(new_list, fn item -> Enum.find(old_list, &updated_fun.(&1, item)) end) + + {created, deleted, updated} + end + + defp dispatch_events(state, events_by_topic) do + for {topic, events} <- events_by_topic, + event <- events, reduce: state, - do: (acc -> handle_event(topic, secret, acc)) + do: (acc -> handle_event(topic, event, acc)) end end diff --git a/test/livebook_teams/hubs/team_client_test.exs b/test/livebook_teams/hubs/team_client_test.exs index 130fe01e7..6f4464558 100644 --- a/test/livebook_teams/hubs/team_client_test.exs +++ b/test/livebook_teams/hubs/team_client_test.exs @@ -6,7 +6,7 @@ defmodule Livebook.Hubs.TeamClientTest do @moduletag :capture_log setup do - Livebook.Hubs.subscribe([:connection, :secrets]) + Livebook.Hubs.subscribe([:connection, :file_systems, :secrets]) :ok end @@ -109,5 +109,365 @@ defmodule Livebook.Hubs.TeamClientTest do # receives `{:secret_deleted, secret_deleted}` event assert_receive {:secret_deleted, %{name: ^name, value: ^value}} end + + test "receives the file_system_created event", %{user: user, node: node} do + team = create_team_hub(user, node) + id = team.id + + assert_receive {:hub_connected, ^id} + + file_system = build(:fs_s3, bucket_url: "https://file_system_created.s3.amazonaws.com") + assert Livebook.Teams.create_file_system(team, file_system) == :ok + + bucket_url = file_system.bucket_url + + # receives `{:event, :file_system_created, file_system_created}` event + assert_receive {:file_system_created, %{bucket_url: ^bucket_url}} + end + + test "receives the file_system_updated event", %{user: user, node: node} do + team = create_team_hub(user, node) + id = team.id + + assert_receive {:hub_connected, ^id} + + file_system = + build(:fs_s3, + bucket_url: "https://file_system_updated.s3.amazonaws.com", + region: "us-east-1" + ) + + assert Livebook.Teams.create_file_system(team, file_system) == :ok + + bucket_url = file_system.bucket_url + region = file_system.region + + # receives `{:file_system_created, file_system_created}` event + assert_receive {:file_system_created, + %{external_id: id, bucket_url: ^bucket_url, region: ^region}} + + # updates the file system + update_file_system = %{file_system | region: "eu-central-1", external_id: id} + assert Livebook.Teams.update_file_system(team, update_file_system) == :ok + + new_region = update_file_system.region + + # receives `{:file_system_updated, file_system_updated}` event + assert_receive {:file_system_updated, + %{external_id: ^id, bucket_url: ^bucket_url, region: ^new_region}} + end + + test "receives the file_system_deleted event", %{user: user, node: node} do + team = create_team_hub(user, node) + id = team.id + + assert_receive {:hub_connected, ^id} + + file_system = build(:fs_s3, bucket_url: "https://file_system_deleted.s3.amazonaws.com") + assert Livebook.Teams.create_file_system(team, file_system) == :ok + + bucket_url = file_system.bucket_url + + # receives `{:file_system_created, file_system_created}` event + assert_receive {:file_system_created, %{external_id: id, bucket_url: ^bucket_url}} + + # deletes the file system + delete_file_system = %{file_system | external_id: id} + assert Livebook.Teams.delete_file_system(team, delete_file_system) == :ok + + # receives `{:file_system_deleted, file_system_deleted}` event + assert_receive {:file_system_deleted, %{external_id: ^id, bucket_url: ^bucket_url}} + end + end + + describe "user connected event" do + test "fills the secrets list", %{user: user, node: node} do + team = build_team_hub(user, node) + id = team.id + + secret = + build(:secret, + name: "SECRET_CREATED", + value: "an encrypted value", + hub_id: id + ) + + {secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key) + secret_value = Livebook.Teams.encrypt(secret.value, secret_key, sign_secret) + livebook_proto_secret = LivebookProto.Secret.new!(name: secret.name, value: secret_value) + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [livebook_proto_secret], + file_systems: [] + ) + + {:ok, pid} = TeamClient.start_link(team) + assert_receive {:hub_connected, ^id} + refute_receive {:secret_created, ^secret} + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:secret_created, ^secret} + assert secret in TeamClient.get_secrets(team.id) + end + + test "replaces the secret with updated value", %{user: user, node: node} do + team = build_team_hub(user, node) + id = team.id + + secret = + build(:secret, + name: "SECRET_UPDATED", + value: "an encrypted value", + hub_id: id + ) + + {secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key) + secret_value = Livebook.Teams.encrypt(secret.value, secret_key, sign_secret) + livebook_proto_secret = LivebookProto.Secret.new!(name: secret.name, value: secret_value) + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [livebook_proto_secret], + file_systems: [] + ) + + {:ok, pid} = TeamClient.start_link(team) + assert_receive {:hub_connected, ^id} + refute_receive {:secret_created, ^secret} + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:secret_created, ^secret} + + updated_secret = %{secret | value: "an updated value"} + secret_value = Livebook.Teams.encrypt(updated_secret.value, secret_key, sign_secret) + + updated_livebook_proto_secret = + LivebookProto.Secret.new!(name: updated_secret.name, value: secret_value) + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [updated_livebook_proto_secret], + file_systems: [] + ) + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:secret_updated, ^updated_secret} + + refute secret in TeamClient.get_secrets(team.id) + assert updated_secret in TeamClient.get_secrets(team.id) + end + + test "deletes the secret", %{user: user, node: node} do + team = build_team_hub(user, node) + id = team.id + + secret = + build(:secret, + name: "SECRET_UPDATED", + value: "an encrypted value", + hub_id: id + ) + + {secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key) + secret_value = Livebook.Teams.encrypt(secret.value, secret_key, sign_secret) + livebook_proto_secret = LivebookProto.Secret.new!(name: secret.name, value: secret_value) + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [livebook_proto_secret], + file_systems: [] + ) + + {:ok, pid} = TeamClient.start_link(team) + assert_receive {:hub_connected, ^id} + refute_receive {:secret_created, ^secret} + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:secret_created, ^secret} + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [], + file_systems: [] + ) + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:secret_deleted, ^secret} + + refute secret in TeamClient.get_secrets(team.id) + end + + test "fills the file systems list", %{user: user, node: node} do + team = build_team_hub(user, node) + id = team.id + + bucket_url = "https://mybucket.s3.amazonaws.com" + hash = :crypto.hash(:sha256, bucket_url) + fs_id = "#{id}-s3-#{Base.url_encode64(hash, padding: false)}" + + file_system = build(:fs_s3, id: fs_id, bucket_url: bucket_url, external_id: "123456") + + type = Livebook.FileSystems.type(file_system) + %{name: name} = Livebook.FileSystem.external_metadata(file_system) + attrs = Livebook.FileSystem.dump(file_system) + credentials = Jason.encode!(attrs) + + {secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key) + value = Livebook.Teams.encrypt(credentials, secret_key, sign_secret) + + livebook_proto_file_system = + LivebookProto.FileSystem.new!( + id: file_system.external_id, + name: name, + type: to_string(type), + value: value + ) + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [], + file_systems: [livebook_proto_file_system] + ) + + {:ok, pid} = TeamClient.start_link(team) + assert_receive {:hub_connected, ^id} + refute_receive {:file_system_created, ^file_system} + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:file_system_created, ^file_system} + assert file_system in TeamClient.get_file_systems(team.id) + end + + test "replaces the file system with updated value", %{user: user, node: node} do + team = build_team_hub(user, node) + id = team.id + + bucket_url = "https://update_fs_994641.s3.amazonaws.com" + hash = :crypto.hash(:sha256, bucket_url) + fs_id = "#{id}-s3-#{Base.url_encode64(hash, padding: false)}" + + file_system = build(:fs_s3, id: fs_id, bucket_url: bucket_url, external_id: "994641") + + type = Livebook.FileSystems.type(file_system) + %{name: name} = Livebook.FileSystem.external_metadata(file_system) + attrs = Livebook.FileSystem.dump(file_system) + credentials = Jason.encode!(attrs) + + {secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key) + value = Livebook.Teams.encrypt(credentials, secret_key, sign_secret) + + livebook_proto_file_system = + LivebookProto.FileSystem.new!( + id: file_system.external_id, + name: name, + type: to_string(type), + value: value + ) + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [], + file_systems: [livebook_proto_file_system] + ) + + {:ok, pid} = TeamClient.start_link(team) + assert_receive {:hub_connected, ^id} + refute_receive {:file_system_created, ^file_system} + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:file_system_created, ^file_system} + + updated_file_system = %{ + file_system + | id: "#{id}-s3-ATC52Lo7d-bS21OLjPQ8KFBPJN8ku4hCn2nic2jTGeI", + bucket_url: "https://updated_name.s3.amazonaws.com" + } + + updated_attrs = Livebook.FileSystem.dump(updated_file_system) + updated_credentials = Jason.encode!(updated_attrs) + updated_value = Livebook.Teams.encrypt(updated_credentials, secret_key, sign_secret) + + updated_livebook_proto_file_system = + LivebookProto.FileSystem.new!( + id: updated_file_system.external_id, + name: updated_file_system.bucket_url, + type: to_string(type), + value: updated_value + ) + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [], + file_systems: [updated_livebook_proto_file_system] + ) + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:file_system_updated, ^updated_file_system} + + refute file_system in TeamClient.get_file_systems(team.id) + assert updated_file_system in TeamClient.get_file_systems(team.id) + end + + test "deletes the file system", %{user: user, node: node} do + team = build_team_hub(user, node) + id = team.id + + bucket_url = "https://delete_fs_45465641.s3.amazonaws.com" + hash = :crypto.hash(:sha256, bucket_url) + fs_id = "#{id}-s3-#{Base.url_encode64(hash, padding: false)}" + + file_system = build(:fs_s3, id: fs_id, bucket_url: bucket_url, external_id: "45465641") + + type = Livebook.FileSystems.type(file_system) + %{name: name} = Livebook.FileSystem.external_metadata(file_system) + attrs = Livebook.FileSystem.dump(file_system) + credentials = Jason.encode!(attrs) + + {secret_key, sign_secret} = Livebook.Teams.derive_keys(team.teams_key) + value = Livebook.Teams.encrypt(credentials, secret_key, sign_secret) + + livebook_proto_file_system = + LivebookProto.FileSystem.new!( + id: file_system.external_id, + name: name, + type: to_string(type), + value: value + ) + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [], + file_systems: [livebook_proto_file_system] + ) + + {:ok, pid} = TeamClient.start_link(team) + assert_receive {:hub_connected, ^id} + refute_receive {:file_system_created, ^file_system} + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:file_system_created, ^file_system} + + user_connected = + LivebookProto.UserConnected.new!( + name: team.hub_name, + secrets: [], + file_systems: [] + ) + + send(pid, {:event, :user_connected, user_connected}) + assert_receive {:file_system_deleted, ^file_system} + + refute file_system in TeamClient.get_file_systems(team.id) + end end end diff --git a/test/livebook_teams/teams/connection_test.exs b/test/livebook_teams/teams/connection_test.exs index a5856be08..33fb5157a 100644 --- a/test/livebook_teams/teams/connection_test.exs +++ b/test/livebook_teams/teams/connection_test.exs @@ -1,4 +1,5 @@ defmodule Livebook.Teams.ConnectionTest do + alias Livebook.FileSystem use Livebook.TeamsIntegrationCase, async: true @moduletag :capture_log @@ -50,5 +51,26 @@ defmodule Livebook.Teams.ConnectionTest do assert secret_created.name == secret.name refute secret_created.value == secret.value end + + test "receives the file_system_created event", %{user: user, node: node} do + {hub, headers} = build_team_headers(user, node) + + assert {:ok, _conn} = Connection.start_link(self(), headers) + assert_receive :connected + + # creates a new file system + file_system = build(:fs_s3, bucket_url: "https://file_system_created.s3.amazonaws.com") + assert Livebook.Teams.create_file_system(hub, file_system) == :ok + type = Livebook.FileSystems.type(file_system) + %{name: name} = FileSystem.external_metadata(file_system) + + # receives `{:event, :file_system_created, file_system_created}` event + # without decrypting the value + assert_receive {:event, :file_system_created, file_system_created} + assert file_system_created.name == name + assert file_system_created.type == to_string(type) + refute file_system_created.value == FileSystem.dump(file_system) + assert is_binary(file_system_created.value) + end end end