Implement Teams API file system management (#2188)

This commit is contained in:
Alexandre de Souza 2023-09-05 12:14:50 -03:00 committed by GitHub
parent c77d63b2a1
commit 51a3ab1895
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 361 additions and 54 deletions

View file

@ -241,4 +241,10 @@ defprotocol Livebook.FileSystem do
"""
@spec dump(t()) :: map()
def dump(file_system)
@doc """
Returns file system metadata for external storages.
"""
@spec external_metadata(t()) :: %{name: String.t(), error_field: String.t()}
def external_metadata(file_system)
end

View file

@ -302,4 +302,6 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.Local do
def load(_file_system, _fields), do: raise("not implemented")
def dump(_file_system), do: raise("not implemented")
def external_metadata(_file_system), do: raise("not implemented")
end

View file

@ -1,18 +1,27 @@
defmodule Livebook.FileSystem.S3 do
@moduledoc false
use Ecto.Schema
import Ecto.Changeset
# File system backed by an S3 bucket.
defstruct [:id, :bucket_url, :region, :access_key_id, :secret_access_key]
@type t :: %__MODULE__{
id: String.t(),
bucket_url: String.t(),
external_id: String.t(),
region: String.t(),
access_key_id: String.t(),
secret_access_key: String.t()
}
embedded_schema do
field :bucket_url, :string
field :external_id, :string
field :region, :string
field :access_key_id, :string
field :secret_access_key, :string
end
@doc """
Returns a new file system struct.
@ -22,10 +31,12 @@ defmodule Livebook.FileSystem.S3 do
to have the format `*.[region].[rootdomain].com` and the region
is inferred from that URL
* `:external_id` - the external id from Teams.
"""
@spec new(String.t(), String.t(), String.t(), keyword()) :: t()
def new(bucket_url, access_key_id, secret_access_key, opts \\ []) do
opts = Keyword.validate!(opts, [:region])
opts = Keyword.validate!(opts, [:region, :external_id])
bucket_url = String.trim_trailing(bucket_url, "/")
region = opts[:region] || region_from_uri(bucket_url)
@ -36,6 +47,7 @@ defmodule Livebook.FileSystem.S3 do
%__MODULE__{
id: id,
bucket_url: bucket_url,
external_id: opts[:external_id],
region: region,
access_key_id: access_key_id,
secret_access_key: secret_access_key
@ -59,7 +71,12 @@ defmodule Livebook.FileSystem.S3 do
access_key_id: access_key_id,
secret_access_key: secret_access_key
} ->
file_system = new(bucket_url, access_key_id, secret_access_key, region: config[:region])
file_system =
new(bucket_url, access_key_id, secret_access_key,
region: config[:region],
external_id: config[:external_id]
)
{:ok, file_system}
_config ->
@ -72,6 +89,27 @@ defmodule Livebook.FileSystem.S3 do
def to_config(%__MODULE__{} = s3) do
Map.take(s3, [:bucket_url, :region, :access_key_id, :secret_access_key])
end
@doc """
Returns an `%Ecto.Changeset{}` for tracking file system changes.
"""
@spec change_file_system(t(), map()) :: Ecto.Changeset.t()
def change_file_system(s3, attrs \\ %{}) do
changeset(s3, attrs)
end
defp changeset(s3, attrs) do
s3
|> cast(attrs, [
:bucket_url,
:external_id,
:region,
:access_key_id,
:secret_access_key
])
|> validate_required([:bucket_url, :access_key_id, :secret_access_key])
|> Livebook.Utils.validate_url(:bucket_url)
end
end
defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
@ -327,15 +365,20 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
S3.Client.multipart_get_object(file_system, key, collectable)
end
def load(_file_system, %{"bucket_url" => _} = fields) do
S3.new(fields["bucket_url"], fields["access_key_id"], fields["secret_access_key"],
region: fields["region"]
)
def load(file_system, %{"bucket_url" => _} = fields) do
load(file_system, %{
bucket_url: fields["bucket_url"],
external_id: fields["external_id"],
region: fields["region"],
access_key_id: fields["access_key_id"],
secret_access_key: fields["secret_access_key"]
})
end
def load(_file_system, fields) do
S3.new(fields.bucket_url, fields.access_key_id, fields.secret_access_key,
region: fields[:region]
region: fields[:region],
external_id: fields[:external_id]
)
end
@ -344,4 +387,8 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
|> Map.from_struct()
|> Map.take([:bucket_url, :region, :access_key_id, :secret_access_key])
end
def external_metadata(file_system) do
%{name: file_system.bucket_url, error_field: "bucket_url"}
end
end

View file

@ -0,0 +1,19 @@
defmodule Livebook.FileSystems do
@moduledoc false
alias Livebook.FileSystem
@doc """
Returns the type identifier for the given file system.
"""
@spec type(FileSystem.t()) :: String.t()
def type(%FileSystem.S3{}), do: "s3"
@doc """
Loads the file system from given type and dumped data.
"""
@spec load(String.t(), map()) :: FileSystem.t()
def load("s3", dumped_data) do
FileSystem.load(%FileSystem.S3{}, dumped_data)
end
end

View file

@ -1,7 +1,7 @@
defmodule Livebook.Teams do
@moduledoc false
alias Livebook.Hubs
alias Livebook.{FileSystem, Hubs}
alias Livebook.Hubs.Team
alias Livebook.Secrets.Secret
alias Livebook.Teams.{Requests, Org}
@ -101,9 +101,6 @@ defmodule Livebook.Teams do
@doc """
Creates a Secret.
With success, returns the response from Livebook Teams API.
Otherwise, it will return an error tuple with changeset.
"""
@spec create_secret(Team.t(), Secret.t()) ::
:ok
@ -112,16 +109,13 @@ defmodule Livebook.Teams do
def create_secret(%Team{} = team, %Secret{} = secret) do
case Requests.create_secret(team, secret) do
{:ok, %{"id" => _}} -> :ok
{:error, %{"errors" => errors_map}} -> {:error, add_secret_errors(secret, errors_map)}
{:error, %{"errors" => errors}} -> {:error, add_secret_errors(secret, errors)}
any -> any
end
end
@doc """
Updates a Secret.
With success, returns the response from Livebook Teams API.
Otherwise, it will return an error tuple with changeset.
"""
@spec update_secret(Team.t(), Secret.t()) ::
:ok
@ -137,9 +131,6 @@ defmodule Livebook.Teams do
@doc """
Deletes a Secret.
With success, returns the response from Livebook Teams API.
Otherwise, it will return an error tuple with changeset.
"""
@spec delete_secret(Team.t(), Secret.t()) ::
:ok
@ -153,6 +144,51 @@ defmodule Livebook.Teams do
end
end
@doc """
Creates a File System.
"""
@spec create_file_system(Team.t(), FileSystem.t()) ::
:ok
| {:error, Ecto.Changeset.t()}
| {:transport_error, String.t()}
def create_file_system(%Team{} = team, file_system) do
case Requests.create_file_system(team, file_system) do
{:ok, %{"id" => _}} -> :ok
{:error, %{"errors" => errors}} -> {:error, add_file_system_errors(file_system, errors)}
any -> any
end
end
@doc """
Updates a File System.
"""
@spec update_file_system(Team.t(), FileSystem.t()) ::
:ok
| {:error, Ecto.Changeset.t()}
| {:transport_error, String.t()}
def update_file_system(%Team{} = team, file_system) do
case Requests.update_file_system(team, file_system) do
{:ok, %{"id" => _}} -> :ok
{:error, %{"errors" => errors}} -> {:error, add_file_system_errors(file_system, errors)}
any -> any
end
end
@doc """
Deletes a File System.
"""
@spec delete_file_system(Team.t(), FileSystem.t()) ::
:ok
| {:error, Ecto.Changeset.t()}
| {:transport_error, String.t()}
def delete_file_system(%Team{} = team, file_system) do
case Requests.delete_file_system(team, file_system) do
{:ok, _} -> :ok
{:error, %{"errors" => errors}} -> {:error, add_file_system_errors(file_system, errors)}
any -> any
end
end
@doc """
Creates a Hub.
@ -189,7 +225,9 @@ defmodule Livebook.Teams do
@doc """
Encrypts the given value with Teams key derived keys.
"""
@spec encrypt(String.t(), bitstring(), bitstring()) :: String.t()
@spec encrypt(String.t() | nil, bitstring(), bitstring()) :: String.t()
def encrypt(value, _secret, _sign_secret) when value in ["", nil], do: value
def encrypt(value, secret, sign_secret) do
Plug.Crypto.MessageEncryptor.encrypt(value, secret, sign_secret)
end
@ -197,7 +235,9 @@ defmodule Livebook.Teams do
@doc """
Decrypts the given encrypted value with Teams key derived keys.
"""
@spec decrypt(String.t(), bitstring(), bitstring()) :: {:ok, String.t()} | :error
@spec decrypt(String.t() | nil, bitstring(), bitstring()) :: {:ok, String.t()} | :error
def decrypt(value, _secret, _sign_secret) when value in ["", nil], do: value
def decrypt(encrypted_value, secret, sign_secret) do
Plug.Crypto.MessageEncryptor.decrypt(encrypted_value, secret, sign_secret)
end
@ -223,6 +263,13 @@ defmodule Livebook.Teams do
add_errors(change(secret), Secret.__schema__(:fields), errors_map)
end
defp add_file_system_errors(%struct{} = file_system, errors_map) do
%{error_field: field} = FileSystem.external_metadata(file_system)
errors_map = Map.new(errors_map, fn {_key, values} -> {field, values} end)
add_errors(change(file_system), struct.__schema__(:fields), errors_map)
end
defp add_errors(%Ecto.Changeset{} = changeset, fields, errors_map) do
for {key, errors} <- errors_map,
field = String.to_atom(key),

View file

@ -1,6 +1,8 @@
defmodule Livebook.Teams.Requests do
@moduledoc false
alias Livebook.FileSystem
alias Livebook.FileSystems
alias Livebook.Hubs.Team
alias Livebook.Secrets.Secret
alias Livebook.Teams
@ -86,6 +88,65 @@ defmodule Livebook.Teams.Requests do
delete("/api/v1/org/secrets", params, headers)
end
@doc """
Send a request to Livebook Team API to create a file system.
"""
@spec create_file_system(Team.t(), FileSystem.t()) ::
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def create_file_system(team, file_system) do
{secret_key, sign_secret} = Teams.derive_keys(team.teams_key)
headers = auth_headers(team)
type = FileSystems.type(file_system)
%{name: name} = FileSystem.external_metadata(file_system)
attrs = FileSystem.dump(file_system)
json = Jason.encode!(attrs)
params = %{
name: name,
type: to_string(type),
value: Teams.encrypt(json, secret_key, sign_secret)
}
post("/api/v1/org/file-systems", params, headers)
end
@doc """
Send a request to Livebook Team API to update a file system.
"""
@spec update_file_system(Team.t(), FileSystem.t()) ::
{:ok, map()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def update_file_system(team, file_system) do
{secret_key, sign_secret} = Teams.derive_keys(team.teams_key)
headers = auth_headers(team)
type = FileSystems.type(file_system)
%{name: name} = FileSystem.external_metadata(file_system)
attrs = FileSystem.dump(file_system)
json = Jason.encode!(attrs)
params = %{
id: file_system.external_id,
name: name,
type: to_string(type),
value: Teams.encrypt(json, secret_key, sign_secret)
}
put("/api/v1/org/file-systems", params, headers)
end
@doc """
Send a request to Livebook Team API to delete a file system.
"""
@spec delete_file_system(Team.t(), FileSystem.t()) ::
{:ok, String.t()} | {:error, map() | String.t()} | {:transport_error, String.t()}
def delete_file_system(team, file_system) do
headers = auth_headers(team)
params = %{id: file_system.external_id}
delete("/api/v1/org/file-systems", params, headers)
end
defp auth_headers(team) do
token = "#{team.user_id}:#{team.org_id}:#{team.org_key_id}:#{team.session_token}"

View file

@ -988,10 +988,11 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.load/2" do
test "loads the region from fields map" do
test "loads region and external id from fields map" do
fields = %{
bucket_url: "https://mybucket.s3.amazonaws.com",
region: "us-east-1",
external_id: "123456789",
access_key_id: "key",
secret_access_key: "secret"
}
@ -1002,13 +1003,14 @@ defmodule Livebook.FileSystem.S3Test do
assert FileSystem.load(%S3{}, fields) == %S3{
id: id,
bucket_url: fields.bucket_url,
external_id: fields.external_id,
region: fields.region,
access_key_id: fields.access_key_id,
secret_access_key: fields.secret_access_key
}
end
test "loads the region from bucket url" do
test "loads region from bucket url" do
fields = %{
bucket_url: "https://mybucket.s3.us-east-1.amazonaws.com",
access_key_id: "key",
@ -1016,20 +1018,22 @@ defmodule Livebook.FileSystem.S3Test do
}
hash = :crypto.hash(:sha256, fields.bucket_url)
id = "s3-#{Base.url_encode64(hash, padding: false)}"
assert FileSystem.load(%S3{}, fields) == %S3{
id: id,
id: "s3-#{Base.url_encode64(hash, padding: false)}",
bucket_url: fields.bucket_url,
external_id: nil,
region: "us-east-1",
access_key_id: fields.access_key_id,
secret_access_key: fields.secret_access_key
}
end
test "loads the file system with string keys" do
test "loads from string keys" do
fields = %{
"bucket_url" => "https://mybucket.s3.us-east-1.amazonaws.com",
"bucket_url" => "https://mybucket.s3.amazonaws.com",
"region" => "us-east-1",
"external_id" => "123456789",
"access_key_id" => "key",
"secret_access_key" => "secret"
}
@ -1040,7 +1044,8 @@ defmodule Livebook.FileSystem.S3Test do
assert FileSystem.load(%S3{}, fields) == %S3{
id: id,
bucket_url: fields["bucket_url"],
region: "us-east-1",
external_id: fields["external_id"],
region: fields["region"],
access_key_id: fields["access_key_id"],
secret_access_key: fields["secret_access_key"]
}

View file

@ -151,18 +151,7 @@ defmodule Livebook.TeamsTest do
describe "create_secret/2" do
test "creates a new secret", %{user: user, node: node} do
org = :erpc.call(node, Hub.Integration, :create_org, [])
org_key = :erpc.call(node, Hub.Integration, :create_org_key, [[org: org]])
token = :erpc.call(node, Hub.Integration, :associate_user_with_org, [user, org])
hub =
build(:team,
user_id: user.id,
org_id: org.id,
org_key_id: org_key.id,
session_token: token
)
hub = create_team_hub(user, node)
secret = build(:secret, name: "FOO", value: "BAR")
assert Teams.create_secret(hub, secret) == :ok
@ -173,22 +162,151 @@ defmodule Livebook.TeamsTest do
end
test "returns changeset errors when data is invalid", %{user: user, node: node} do
org = :erpc.call(node, Hub.Integration, :create_org, [])
org_key = :erpc.call(node, Hub.Integration, :create_org_key, [[org: org]])
token = :erpc.call(node, Hub.Integration, :associate_user_with_org, [user, org])
hub =
build(:team,
user_id: user.id,
org_id: org.id,
org_key_id: org_key.id,
session_token: token
)
hub = create_team_hub(user, node)
secret = build(:secret, name: "LB_FOO", value: "BAR")
assert {:error, changeset} = Teams.create_secret(hub, secret)
assert "cannot start with the LB_ prefix" in errors_on(changeset).name
end
end
describe "update_secret/2" do
test "updates a secret", %{user: user, node: node} do
hub = create_team_hub(user, node)
secret = build(:secret, name: "UPDATE_ME", value: "BAR")
assert Teams.create_secret(hub, secret) == :ok
update_secret = Map.replace!(secret, :value, "BAZ")
assert Teams.update_secret(hub, update_secret) == :ok
end
test "returns changeset errors when data is invalid", %{user: user, node: node} do
hub = create_team_hub(user, node)
secret = build(:secret, name: "FIX_ME", value: "BAR")
assert Teams.create_secret(hub, secret) == :ok
update_secret = Map.replace!(secret, :value, "")
assert {:error, changeset} = Teams.update_secret(hub, update_secret)
assert "can't be blank" in errors_on(changeset).value
end
end
describe "delete_secret/2" do
test "deletes a secret", %{user: user, node: node} do
hub = create_team_hub(user, node)
secret = build(:secret, name: "DELETE_ME", value: "BAR")
assert Teams.create_secret(hub, secret) == :ok
assert Teams.delete_secret(hub, secret) == :ok
# Guarantee it's been removed and will return HTTP status 404
assert Teams.delete_secret(hub, secret) ==
{:transport_error,
"Something went wrong, try again later or please file a bug if it persists"}
end
test "returns transport errors when secret doesn't exists", %{user: user, node: node} do
hub = create_team_hub(user, node)
secret = build(:secret, name: "I_CANT_EXIST", value: "BAR")
# Guarantee it doesn't exists and will return HTTP status 404
assert Teams.delete_secret(hub, secret) ==
{:transport_error,
"Something went wrong, try again later or please file a bug if it persists"}
end
end
describe "create_file_system/2" do
test "creates a new file system", %{user: user, node: node} do
hub = create_team_hub(user, node)
file_system = build(:fs_s3, bucket_url: "https://file_system_created.s3.amazonaws.com")
assert Teams.create_file_system(hub, file_system) == :ok
# Guarantee uniqueness
assert {:error, changeset} = Teams.create_file_system(hub, file_system)
assert "has already been taken" in errors_on(changeset).bucket_url
end
test "returns changeset errors when data is invalid", %{user: user, node: node} do
hub = create_team_hub(user, node)
file_system = build(:fs_s3, bucket_url: nil)
assert {:error, changeset} = Teams.create_file_system(hub, file_system)
assert "can't be blank" in errors_on(changeset).bucket_url
end
end
describe "update_file_system/2" do
test "updates a file system", %{user: user, node: node} do
hub = create_team_hub(user, node)
teams_file_system = create_teams_file_system(hub, node)
file_system =
build(:fs_s3,
bucket_url: teams_file_system.name,
region: "us-east-1",
external_id: to_string(teams_file_system.id)
)
update_file_system = Map.replace!(file_system, :region, "eu-central-1")
assert Teams.update_file_system(hub, update_file_system) == :ok
end
test "returns changeset errors when data is invalid", %{user: user, node: node} do
hub = create_team_hub(user, node)
teams_file_system = create_teams_file_system(hub, node)
file_system =
build(:fs_s3,
bucket_url: "https://fix_me.s3.amazonaws.com",
external_id: to_string(teams_file_system.id)
)
update_file_system = Map.replace!(file_system, :bucket_url, "")
assert {:error, changeset} = Teams.update_file_system(hub, update_file_system)
assert "can't be blank" in errors_on(changeset).bucket_url
end
end
describe "delete_file_system/2" do
test "deletes a file system", %{user: user, node: node} do
hub = create_team_hub(user, node)
teams_file_system = create_teams_file_system(hub, node)
file_system =
build(:fs_s3,
bucket_url: teams_file_system.name,
region: "us-east-1",
external_id: to_string(teams_file_system.id)
)
assert Teams.delete_file_system(hub, file_system) == :ok
# Guarantee it's been removed and will return HTTP status 404
assert Teams.delete_file_system(hub, file_system) ==
{:transport_error,
"Something went wrong, try again later or please file a bug if it persists"}
end
test "returns transport errors when file system doesn't exists", %{user: user, node: node} do
hub = create_team_hub(user, node)
file_system = build(:fs_s3, bucket_url: "https://i_cant_exist.s3.amazonaws.com")
# Guarantee it doesn't exists and will return HTTP status 404
assert Teams.delete_file_system(hub, file_system) ==
{:transport_error,
"Something went wrong, try again later or please file a bug if it persists"}
end
end
defp create_teams_file_system(hub, node) do
org_key = :erpc.call(node, Hub.Integration, :get_org_key!, [hub.org_key_id])
:erpc.call(node, Hub.Integration, :create_file_system, [[org_key: org_key]])
end
end

View file

@ -70,10 +70,12 @@ defmodule Livebook.Factory do
def build(:fs_s3) do
bucket_url = "https://mybucket.s3.amazonaws.com"
hash = :crypto.hash(:sha256, bucket_url)
id = "s3-#{Base.url_encode64(hash, padding: false)}"
%Livebook.FileSystem.S3{
id: "s3-#{Base.url_encode64(hash, padding: false)}",
id: id,
bucket_url: bucket_url,
external_id: id,
region: "us-east-1",
access_key_id: "key",
secret_access_key: "secret"