Implements load/2 and dump/1 to FileSystem protocol and some improvements (#2171)

This commit is contained in:
Alexandre de Souza 2023-08-22 16:20:35 -03:00 committed by GitHub
parent 7ff6103475
commit 928181cefe
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 575 additions and 450 deletions

View file

@ -229,4 +229,16 @@ defprotocol Livebook.FileSystem do
@spec read_stream_into(t(), path(), Collectable.t()) ::
{:ok, Collectable.t()} | {:error, error()}
def read_stream_into(file_system, path, collectable)
@doc """
Loads fields into given file system.
"""
@spec load(t(), map()) :: struct()
def load(file_system, fields)
@doc """
Transforms file system to the attributes map.
"""
@spec dump(t()) :: map()
def dump(file_system)
end

View file

@ -298,4 +298,8 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.Local do
error in File.Error -> FileSystem.Utils.posix_error(error.reason)
end
end
# Attribute (de)serialization is intentionally unsupported for the
# local file system — the protocol's load/2 and dump/1 raise here.
def load(_file_system, _fields), do: raise("not implemented")
def dump(_file_system), do: raise("not implemented")
end

View file

@ -76,8 +76,7 @@ end
defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
alias Livebook.FileSystem
alias Livebook.Utils.HTTP
alias Livebook.FileSystem.S3.XML
alias Livebook.FileSystem.S3
def resource_identifier(file_system) do
{:s3, file_system.bucket_url}
@ -94,10 +93,10 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
def list(file_system, path, recursive) do
FileSystem.Utils.assert_dir_path!(path)
"/" <> dir_key = path
delimiter = if recursive, do: nil, else: "/"
opts = [prefix: dir_key, delimiter: delimiter]
with {:ok, %{keys: keys}} <- list_objects(file_system, prefix: dir_key, delimiter: delimiter) do
with {:ok, %{keys: keys}} <- S3.Client.list_objects(file_system, opts) do
if keys == [] and dir_key != "" do
FileSystem.Utils.posix_error(:enoent)
else
@ -110,13 +109,15 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
def read(file_system, path) do
FileSystem.Utils.assert_regular_path!(path)
"/" <> key = path
get_object(file_system, key)
S3.Client.get_object(file_system, key)
end
def write(file_system, path, content) do
FileSystem.Utils.assert_regular_path!(path)
"/" <> key = path
put_object(file_system, key, content)
S3.Client.put_object(file_system, key, content)
end
def access(_file_system, _path) do
@ -126,25 +127,26 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
def create_dir(file_system, path) do
FileSystem.Utils.assert_dir_path!(path)
"/" <> key = path
# S3 has no concept of directories, but keys with trailing
# slash are interpreted as such, so we create an empty
# object for the given key
put_object(file_system, key, nil)
S3.Client.put_object(file_system, key, nil)
end
def remove(file_system, path) do
"/" <> key = path
if FileSystem.Utils.dir_path?(path) do
with {:ok, %{keys: keys}} <- list_objects(file_system, prefix: key) do
with {:ok, %{keys: keys}} <- S3.Client.list_objects(file_system, prefix: key) do
if keys == [] do
FileSystem.Utils.posix_error(:enoent)
else
delete_objects(file_system, keys)
S3.Client.delete_objects(file_system, keys)
end
end
else
delete_object(file_system, key)
S3.Client.delete_object(file_system, key)
end
end
@ -154,7 +156,8 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
"/" <> destination_key = destination_path
if FileSystem.Utils.dir_path?(source_path) do
with {:ok, %{bucket: bucket, keys: keys}} <- list_objects(file_system, prefix: source_key) do
with {:ok, %{bucket: bucket, keys: keys}} <-
S3.Client.list_objects(file_system, prefix: source_key) do
if keys == [] do
FileSystem.Utils.posix_error(:enoent)
else
@ -163,7 +166,7 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
renamed_key = String.replace_prefix(key, source_key, destination_key)
Task.async(fn ->
copy_object(file_system, bucket, key, renamed_key)
S3.Client.copy_object(file_system, bucket, key, renamed_key)
end)
end)
|> Task.await_many(:infinity)
@ -175,8 +178,8 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
end
end
else
with {:ok, bucket} <- get_bucket_name(file_system) do
copy_object(file_system, bucket, source_key, destination_key)
with {:ok, bucket} <- S3.Client.get_bucket_name(file_system) do
S3.Client.copy_object(file_system, bucket, source_key, destination_key)
end
end
end
@ -185,7 +188,7 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
FileSystem.Utils.assert_same_type!(source_path, destination_path)
"/" <> destination_key = destination_path
with {:ok, destination_exists?} <- object_exists(file_system, destination_key) do
with {:ok, destination_exists?} <- S3.Client.object_exists(file_system, destination_key) do
if destination_exists? do
FileSystem.Utils.posix_error(:eexist)
else
@ -202,14 +205,15 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
FileSystem.Utils.assert_regular_path!(path)
"/" <> key = path
with {:ok, %{etag: etag}} <- head_object(file_system, key) do
with {:ok, %{etag: etag}} <- S3.Client.head_object(file_system, key) do
{:ok, etag}
end
end
def exists?(file_system, path) do
"/" <> key = path
object_exists(file_system, key)
S3.Client.object_exists(file_system, key)
end
def resolve_path(_file_system, dir_path, subject) do
@ -245,7 +249,7 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
if state.upload_id do
{:ok, state}
else
with {:ok, upload_id} <- create_multipart_upload(file_system, state.key) do
with {:ok, upload_id} <- S3.Client.create_multipart_upload(file_system, state.key) do
{:ok, %{state | upload_id: upload_id}}
end
end
@ -266,7 +270,8 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
parts = state.parts + 1
with {:ok, %{etag: etag}} <- upload_part(file_system, state.key, state.upload_id, parts, part) do
with {:ok, %{etag: etag}} <-
S3.Client.upload_part(file_system, state.key, state.upload_id, parts, part) do
{:ok,
%{
state
@ -289,7 +294,7 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
with {:ok, state} <- maybe_state,
:ok <-
complete_multipart_upload(
S3.Client.complete_multipart_upload(
file_system,
state.key,
state.upload_id,
@ -298,18 +303,18 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
:ok
else
{:error, error} ->
abort_multipart_upload(file_system, state.key, state.upload_id)
S3.Client.abort_multipart_upload(file_system, state.key, state.upload_id)
{:error, error}
end
else
content = state.current_chunks |> Enum.reverse() |> IO.iodata_to_binary()
put_object(file_system, state.key, content)
S3.Client.put_object(file_system, state.key, content)
end
end
def write_stream_halt(file_system, state) do
if state.upload_id do
abort_multipart_upload(file_system, state.key, state.upload_id)
S3.Client.abort_multipart_upload(file_system, state.key, state.upload_id)
else
:ok
end
@ -318,313 +323,25 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
def read_stream_into(file_system, path, collectable) do
FileSystem.Utils.assert_regular_path!(path)
"/" <> key = path
multipart_get_object(file_system, key, collectable)
S3.Client.multipart_get_object(file_system, key, collectable)
end
# Requests
defp list_objects(file_system, opts) do
prefix = opts[:prefix]
delimiter = opts[:delimiter]
query = %{"list-type" => "2", "prefix" => prefix, "delimiter" => delimiter}
case request(file_system, :get, "/", query: query) |> decode() do
{:ok, 200, _headers, %{"ListBucketResult" => result}} ->
bucket = result["Name"]
file_keys = result |> xml_get_list("Contents") |> Enum.map(& &1["Key"])
prefix_keys = result |> xml_get_list("CommonPrefixes") |> Enum.map(& &1["Prefix"])
keys = file_keys ++ prefix_keys
{:ok, %{bucket: bucket, keys: keys}}
other ->
request_response_to_error(other)
end
end
defp get_bucket_name(file_system) do
# We have bucket URL, but it's not straightforward to extract
# bucket name from the URL, because it may be either the path
# or a part of the host.
#
# Endpoints that return bucket information doesn't include the
# name, but the listing endpoint does, so we just list keys
# with an upper limit of 0 and retrieve the bucket name.
query = %{"list-type" => "2", "max-keys" => "0"}
case request(file_system, :get, "/", query: query) |> decode() do
{:ok, 200, _headers, %{"ListBucketResult" => %{"Name" => bucket}}} ->
{:ok, bucket}
other ->
request_response_to_error(other)
end
end
defp get_object(file_system, key) do
case request(file_system, :get, "/" <> encode_key(key), long: true) do
{:ok, 200, _headers, body} -> {:ok, body}
{:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
other -> request_response_to_error(other)
end
end
defp multipart_get_object(file_system, key, collectable) do
case download(file_system, "/" <> encode_key(key), collectable) do
{:ok, collectable} -> {:ok, collectable}
{:error, _message, 404} -> FileSystem.Utils.posix_error(:enoent)
{:error, message, _status} -> {:error, message}
end
end
defp put_object(file_system, key, content) do
case request(file_system, :put, "/" <> encode_key(key), body: content, long: true)
|> decode() do
{:ok, 200, _headers, _body} -> :ok
other -> request_response_to_error(other)
end
end
defp head_object(file_system, key) do
case request(file_system, :head, "/" <> encode_key(key)) do
{:ok, 200, headers, _body} ->
{:ok, etag} = HTTP.fetch_header(headers, "etag")
{:ok, %{etag: etag}}
{:ok, 404, _headers, _body} ->
FileSystem.Utils.posix_error(:enoent)
other ->
request_response_to_error(other)
end
end
defp copy_object(file_system, bucket, source_key, destination_key) do
copy_source = bucket <> "/" <> encode_key(source_key)
headers = [{"x-amz-copy-source", copy_source}]
case request(file_system, :put, "/" <> encode_key(destination_key), headers: headers)
|> decode() do
{:ok, 200, _headers, _body} -> :ok
{:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
other -> request_response_to_error(other)
end
end
defp delete_object(file_system, key) do
case request(file_system, :delete, "/" <> encode_key(key)) |> decode() do
{:ok, 204, _headers, _body} -> :ok
{:ok, 404, _headers, _body} -> :ok
other -> request_response_to_error(other)
end
end
defp delete_objects(file_system, keys) do
objects = Enum.map(keys, fn key -> %{"Key" => key} end)
body =
%{"Delete" => %{"Object" => objects, "Quiet" => "true"}}
|> XML.encode_to_iodata!()
|> IO.iodata_to_binary()
body_md5 = :crypto.hash(:md5, body) |> Base.encode64()
headers = [{"Content-MD5", body_md5}]
case request(file_system, :post, "/", query: %{"delete" => ""}, headers: headers, body: body)
|> decode() do
{:ok, 200, _headers, %{"Error" => errors}} -> {:error, format_errors(errors)}
{:ok, 200, _headers, _body} -> :ok
other -> request_response_to_error(other)
end
end
defp object_exists(file_system, key) do
# It is possible for /dir/obj to exist without the /dir/ object,
# but we still consider it as existing. That's why we list
# objects instead of checking the key directly.
with {:ok, %{keys: keys}} <- list_objects(file_system, prefix: key, delimiter: "/") do
exists? =
if String.ends_with?(key, "/") do
keys != []
else
key in keys
end
{:ok, exists?}
end
end
defp create_multipart_upload(file_system, key) do
query = %{"uploads" => ""}
case request(file_system, :post, "/" <> encode_key(key), query: query, body: "")
|> decode() do
{:ok, 200, _headers, %{"InitiateMultipartUploadResult" => %{"UploadId" => upload_id}}} ->
{:ok, upload_id}
other ->
request_response_to_error(other)
end
end
defp upload_part(file_system, key, upload_id, part_number, content) do
query = %{"uploadId" => upload_id, "partNumber" => part_number}
case request(file_system, :put, "/" <> encode_key(key),
query: query,
body: content,
long: true
)
|> decode() do
{:ok, 200, headers, _body} ->
{:ok, etag} = HTTP.fetch_header(headers, "etag")
{:ok, %{etag: etag}}
other ->
request_response_to_error(other)
end
end
defp complete_multipart_upload(file_system, key, upload_id, etags) do
query = %{"uploadId" => upload_id}
parts =
for {etag, n} <- Enum.with_index(etags, 1) do
%{"PartNumber" => n, "ETag" => etag}
end
body =
%{"CompleteMultipartUpload" => %{"Part" => parts}}
|> XML.encode_to_iodata!()
|> IO.iodata_to_binary()
case request(file_system, :post, "/" <> encode_key(key), query: query, body: body)
|> decode() do
{:ok, 200, _headers, _body} -> :ok
other -> request_response_to_error(other)
end
end
defp abort_multipart_upload(file_system, key, upload_id) do
query = %{"uploadId" => upload_id}
case request(file_system, :delete, "/" <> encode_key(key), query: query) |> decode() do
{:ok, 204, _headers, _body} -> :ok
other -> request_response_to_error(other)
end
end
defp encode_key(key) do
key
|> String.split("/")
|> Enum.map_join("/", fn segment -> URI.encode(segment, &URI.char_unreserved?/1) end)
end
defp request_response_to_error(error)
defp request_response_to_error({:ok, 403, _headers, %{"Error" => %{"Message" => message}}}) do
{:error, "access denied, " <> Livebook.Utils.downcase_first(message)}
end
defp request_response_to_error({:ok, 403, _headers, _body}) do
{:error, "access denied"}
end
defp request_response_to_error({:ok, _status, _headers, %{"Error" => error}}) do
{:error, format_errors(error)}
end
defp request_response_to_error({:ok, _status, _headers, _body}) do
{:error, "unexpected response"}
end
defp request_response_to_error({:error, _error}) do
{:error, "failed to make an HTTP request"}
end
defp format_errors(%{"Message" => message}) do
Livebook.Utils.downcase_first(message)
end
defp format_errors([%{"Message" => message} | errors]) do
Livebook.Utils.downcase_first(message) <> ", and #{length(errors)} more errors"
end
defp request(file_system, method, path, opts \\ []) do
query = opts[:query] || %{}
headers = opts[:headers] || []
body = opts[:body]
long = Keyword.get(opts, :long, false)
timeout_opts = if(long, do: [timeout: 60_000], else: [])
url = url(file_system, path, query)
headers = headers(file_system, method, url, headers, body)
body = body && {"application/octet-stream", body}
HTTP.request(method, url, [headers: headers, body: body] ++ timeout_opts)
end
defp download(file_system, path, collectable, opts \\ []) do
query = opts[:query] || %{}
headers = opts[:headers] || []
url = url(file_system, path, query)
headers = headers(file_system, :get, url, headers)
HTTP.download(url, collectable, headers: headers)
end
defp url(file_system, path, query) do
file_system.bucket_url <> path <> "?" <> URI.encode_query(query)
end
defp headers(file_system, method, url, headers, body \\ nil) do
now = NaiveDateTime.utc_now() |> NaiveDateTime.to_erl()
%{host: host} = URI.parse(file_system.bucket_url)
headers = [{"Host", host} | headers]
:aws_signature.sign_v4(
file_system.access_key_id,
file_system.secret_access_key,
file_system.region,
"s3",
now,
Atom.to_string(method),
url,
headers,
body || "",
uri_encode_path: false
# Builds an S3 file system from a string-keyed attribute map
# (the shape produced by dump/1 after JSON round-tripping).
def load(_file_system, %{"bucket_url" => bucket_url} = fields) do
  S3.new(
    bucket_url,
    fields["access_key_id"],
    fields["secret_access_key"],
    region: fields["region"]
  )
end
defp decode({:ok, status, headers, body}) do
guess_xml? = String.starts_with?(body, "<?xml")
case HTTP.fetch_content_type(headers) do
{:ok, content_type} when content_type in ["text/xml", "application/xml"] ->
{:ok, status, headers, XML.decode!(body)}
# Apparently some requests return XML without content-type
:error when guess_xml? ->
{:ok, status, headers, XML.decode!(body)}
_ ->
{:ok, status, headers, body}
end
# Builds an S3 file system from an atom-keyed attribute map.
# Required keys raise via the dot access; :region is optional.
def load(_file_system, fields) do
  region = fields[:region]
  S3.new(fields.bucket_url, fields.access_key_id, fields.secret_access_key, region: region)
end
defp decode(other), do: other
defp xml_get_list(xml_map, key) do
case xml_map do
%{^key => item} when is_map(item) -> [item]
%{^key => items} when is_list(items) -> items
_ -> []
end
# Serializes the S3 file system struct into a plain attribute map,
# keeping only the fields load/2 needs to reconstruct it.
def dump(file_system) do
  Map.take(
    Map.from_struct(file_system),
    [:bucket_url, :region, :access_key_id, :secret_access_key]
  )
end
end

View file

@ -0,0 +1,355 @@
defmodule Livebook.FileSystem.S3.Client do
  # HTTP client for the S3 file system: signs requests against the
  # bucket URL with AWS SigV4 and normalizes responses into
  # ok/error tuples expected by the FileSystem protocol impl.
  @moduledoc false

  alias Livebook.FileSystem
  alias Livebook.FileSystem.S3

  @doc """
  Sends a request to the bucket to get list of objects.
  """
  @spec list_objects(S3.t(), keyword()) :: {:ok, map()} | {:error, String.t()}
  def list_objects(file_system, opts) do
    prefix = opts[:prefix]
    delimiter = opts[:delimiter]
    query = %{"list-type" => "2", "prefix" => prefix, "delimiter" => delimiter}

    case get(file_system, "/", query: query) do
      {:ok, 200, _headers, %{"ListBucketResult" => result}} ->
        # Regular objects are listed under "Contents", grouped
        # "directories" (common prefixes) under "CommonPrefixes"
        file_keys = xml_get_list(result, "Contents", "Key")
        prefix_keys = xml_get_list(result, "CommonPrefixes", "Prefix")

        {:ok, %{bucket: result["Name"], keys: file_keys ++ prefix_keys}}

      other ->
        request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to get the bucket name.
  """
  @spec get_bucket_name(S3.t()) :: {:ok, String.t()} | {:error, String.t()}
  def get_bucket_name(file_system) do
    # We have bucket URL, but it's not straightforward to extract
    # bucket name from the URL, because it may be either the path
    # or a part of the host.
    #
    # Endpoints that return bucket information don't include the
    # name, but the listing endpoint does, so we just list keys
    # with an upper limit of 0 and retrieve the bucket name.
    case get(file_system, "/", query: %{"list-type" => "2", "max-keys" => "0"}) do
      {:ok, 200, _headers, %{"ListBucketResult" => result}} -> {:ok, result["Name"]}
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to get an object.
  """
  @spec get_object(S3.t(), String.t()) :: {:ok, map()} | {:error, String.t()}
  def get_object(file_system, key) do
    # decode: false keeps the raw body, since object content may be
    # arbitrary binary (and XML content must not be parsed here)
    case get(file_system, "/" <> encode_key(key), long: true, decode: false) do
      {:ok, 200, _headers, body} -> {:ok, body}
      {:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to download an object with multipart.
  """
  @spec multipart_get_object(S3.t(), String.t(), Collectable.t()) ::
          {:ok, map()} | {:error, String.t()}
  def multipart_get_object(file_system, key, collectable) do
    case download(file_system, "/" <> encode_key(key), collectable) do
      {:ok, collectable} -> {:ok, collectable}
      {:error, _message, 404} -> FileSystem.Utils.posix_error(:enoent)
      {:error, message, _status} -> {:error, message}
    end
  end

  @doc """
  Sends a request to the bucket to update an object's content.
  """
  @spec put_object(S3.t(), String.t(), String.t() | nil) :: :ok | {:error, String.t()}
  def put_object(file_system, key, content) do
    case put(file_system, "/" <> encode_key(key), body: content, long: true) do
      {:ok, 200, _headers, _body} -> :ok
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to get the object's ETag from given key.
  """
  @spec head_object(S3.t(), String.t()) :: {:ok, map()} | {:error, String.t()}
  def head_object(file_system, key) do
    # NOTE(review): if the "etag" header is absent, fetch_header
    # returns :error, which falls through to the generic
    # "failed to make an http request" message below — confirm
    # this is the intended error for that edge case.
    with {:ok, 200, headers, _body} <- head(file_system, "/" <> encode_key(key)),
         {:ok, etag} <- Livebook.Utils.HTTP.fetch_header(headers, "etag") do
      {:ok, %{etag: etag}}
    else
      {:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to copy an object to given destination.
  """
  @spec copy_object(S3.t(), String.t(), String.t(), String.t()) :: :ok | {:error, String.t()}
  def copy_object(file_system, bucket, source_key, destination_key) do
    # The source object is addressed via the x-amz-copy-source header,
    # as "bucket/key"
    copy_source = bucket <> "/" <> encode_key(source_key)

    headers = [{"x-amz-copy-source", copy_source}]

    case put(file_system, "/" <> encode_key(destination_key), headers: headers) do
      {:ok, 200, _headers, _body} -> :ok
      {:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to delete an object.
  """
  @spec delete_object(S3.t(), String.t()) :: :ok | {:error, String.t()}
  def delete_object(file_system, key) do
    case delete(file_system, "/" <> encode_key(key)) do
      {:ok, 204, _headers, _body} -> :ok
      # Deleting a missing object is treated as success
      {:ok, 404, _headers, _body} -> :ok
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to delete a list of objects.
  """
  @spec delete_objects(S3.t(), list(String.t())) :: :ok | {:error, String.t()}
  def delete_objects(file_system, keys) do
    objects = Enum.map(keys, &%{"Key" => &1})

    body =
      %{"Delete" => %{"Object" => objects, "Quiet" => "true"}}
      |> S3.XML.encode_to_iodata!()
      |> IO.iodata_to_binary()

    # Include a Content-MD5 checksum of the payload alongside the
    # batch-delete request
    body_md5 = :crypto.hash(:md5, body)
    headers = [{"Content-MD5", Base.encode64(body_md5)}]

    case post(file_system, "/", query: %{"delete" => ""}, headers: headers, body: body) do
      # A 200 response may still carry per-object errors in the body
      {:ok, 200, _headers, %{"Error" => _}} = result -> request_response_to_error(result)
      {:ok, 200, _headers, _body} -> :ok
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to check if object exists.
  """
  @spec object_exists(S3.t(), String.t()) :: {:ok, boolean()} | {:error, String.t()}
  def object_exists(file_system, key) do
    # It is possible for /dir/obj to exist without the /dir/ object,
    # but we still consider it as existing. That's why we list
    # objects instead of checking the key directly.
    with {:ok, %{keys: keys}} <- list_objects(file_system, prefix: key, delimiter: "/") do
      exists? =
        if String.ends_with?(key, "/"),
          do: keys != [],
          else: key in keys

      {:ok, exists?}
    end
  end

  @doc """
  Sends a request to the bucket to multipart upload an object.
  """
  @spec create_multipart_upload(S3.t(), String.t()) :: {:ok, String.t()} | {:error, String.t()}
  def create_multipart_upload(file_system, key) do
    query = %{"uploads" => ""}

    case post(file_system, "/" <> encode_key(key), query: query, body: "") do
      {:ok, 200, _headers, %{"InitiateMultipartUploadResult" => result}} ->
        {:ok, result["UploadId"]}

      other ->
        request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to upload an object partially.
  """
  @spec upload_part(S3.t(), String.t(), String.t(), pos_integer(), String.t()) ::
          {:ok, map()} | {:error, String.t()}
  def upload_part(file_system, key, upload_id, part_number, content) do
    query = %{"uploadId" => upload_id, "partNumber" => part_number}
    opts = [query: query, body: content, long: true]

    # NOTE(review): as in head_object/2, a missing "etag" header maps
    # to the generic request-failure message — confirm intended.
    with {:ok, 200, headers, _body} <- put(file_system, "/" <> encode_key(key), opts),
         {:ok, etag} <- Livebook.Utils.HTTP.fetch_header(headers, "etag") do
      {:ok, %{etag: etag}}
    else
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to finish the multipart object upload.
  """
  @spec complete_multipart_upload(S3.t(), String.t(), String.t(), list(String.t())) ::
          :ok | {:error, String.t()}
  def complete_multipart_upload(file_system, key, upload_id, etags) do
    query = %{"uploadId" => upload_id}
    # Part numbers are 1-based and must match the upload order of etags
    parts = for {etag, n} <- Enum.with_index(etags, 1), do: %{"PartNumber" => n, "ETag" => etag}

    body =
      %{"CompleteMultipartUpload" => %{"Part" => parts}}
      |> S3.XML.encode_to_iodata!()
      |> IO.iodata_to_binary()

    case post(file_system, "/" <> encode_key(key), query: query, body: body) do
      {:ok, 200, _headers, _body} -> :ok
      other -> request_response_to_error(other)
    end
  end

  @doc """
  Sends a request to the bucket to abort the multipart object upload.
  """
  @spec abort_multipart_upload(S3.t(), String.t(), String.t()) :: :ok | {:error, String.t()}
  def abort_multipart_upload(file_system, key, upload_id) do
    query = %{"uploadId" => upload_id}

    case delete(file_system, "/" <> encode_key(key), query: query) do
      {:ok, 204, _headers, _body} -> :ok
      other -> request_response_to_error(other)
    end
  end

  # Convenient API

  defp get(file_system, path, opts), do: request(file_system, :get, path, opts)
  defp post(file_system, path, opts), do: request(file_system, :post, path, opts)
  defp put(file_system, path, opts), do: request(file_system, :put, path, opts)
  defp head(file_system, path), do: request(file_system, :head, path, [])
  defp delete(file_system, path, opts \\ []), do: request(file_system, :delete, path, opts)

  # Streams a GET response into the given collectable, signing the
  # request the same way as request/4
  defp download(file_system, path, collectable, opts \\ []) do
    query = opts[:query] || %{}
    headers = opts[:headers] || []

    url = build_url(file_system, path, query)
    headers = sign_headers(file_system, :get, url, headers)

    Livebook.Utils.HTTP.download(url, collectable, headers: headers)
  end

  # Private

  # Percent-encodes each key segment while keeping "/" separators intact
  defp encode_key(key) do
    key
    |> String.split("/")
    |> Enum.map_join("/", fn segment -> URI.encode(segment, &URI.char_unreserved?/1) end)
  end

  defp build_url(file_system, path, query) do
    query_string = URI.encode_query(query)
    query_string = if query_string != "", do: "?#{query_string}", else: ""

    file_system.bucket_url <> path <> query_string
  end

  # Returns the given headers extended with Host and the AWS SigV4
  # signature headers derived from the file system credentials
  defp sign_headers(file_system, method, url, headers, body \\ nil) do
    now = NaiveDateTime.utc_now() |> NaiveDateTime.to_erl()
    %{host: host} = URI.parse(file_system.bucket_url)
    headers = [{"Host", host} | headers]

    :aws_signature.sign_v4(
      file_system.access_key_id,
      file_system.secret_access_key,
      file_system.region,
      "s3",
      now,
      Atom.to_string(method),
      url,
      headers,
      body || "",
      # the URL path is already encoded by encode_key/1
      uri_encode_path: false
    )
  end

  # Performs a signed HTTP request.
  #
  # ## Options
  #
  #   * :query - query params map
  #   * :headers - extra request headers
  #   * :body - raw request body (sent as application/octet-stream)
  #   * :long - when true, uses a 60s timeout instead of 30s
  #   * :decode - when false, skips XML-decoding the response body
  defp request(file_system, method, path, opts) do
    long = Keyword.get(opts, :long, false)
    decode? = Keyword.get(opts, :decode, true)

    query = opts[:query] || %{}
    headers = opts[:headers] || []
    body = opts[:body]
    timeout = if long, do: 60_000, else: 30_000

    url = build_url(file_system, path, query)
    headers = sign_headers(file_system, method, url, headers, body)
    body = body && {"application/octet-stream", body}

    result =
      Livebook.Utils.HTTP.request(method, url, headers: headers, body: body, timeout: timeout)

    if decode?, do: decode(result), else: result
  end

  # Parses XML response bodies into maps, leaving other bodies as-is
  defp decode({:ok, status, headers, body}) do
    if xml?(headers, body),
      do: {:ok, status, headers, S3.XML.decode!(body)},
      else: {:ok, status, headers, body}
  end

  defp decode({:error, _} = error), do: error

  defp xml?(headers, body) do
    guess_xml? = String.starts_with?(body, "<?xml")

    case Livebook.Utils.HTTP.fetch_content_type(headers) do
      {:ok, content_type} when content_type in ["text/xml", "application/xml"] -> true
      # Apparently some requests return XML without content-type
      :error when guess_xml? -> true
      _otherwise -> false
    end
  end

  # Maps a non-successful response (or transport error) to a
  # human-readable {:error, message}
  defp request_response_to_error({:ok, 403, _headers, %{"Error" => %{"Message" => message}}}) do
    {:error, "access denied, " <> Livebook.Utils.downcase_first(message)}
  end

  defp request_response_to_error({:ok, 403, _headers, _body}) do
    {:error, "access denied"}
  end

  defp request_response_to_error({:ok, _status, _headers, %{"Error" => %{"Message" => message}}}) do
    {:error, Livebook.Utils.downcase_first(message)}
  end

  defp request_response_to_error({:ok, _status, _headers, %{"Error" => [_ | _] = errors}}) do
    [%{"Message" => message} | errors] = errors
    {:error, Livebook.Utils.downcase_first(message) <> ", and #{length(errors)} more errors"}
  end

  defp request_response_to_error({:ok, _status, _headers, _body}) do
    {:error, "unexpected response"}
  end

  defp request_response_to_error(_otherwise) do
    {:error, "failed to make an http request"}
  end

  # Extracts map_key values from an XML-decoded list, which may be a
  # single map (one element), a list of maps, or absent
  defp xml_get_list(result, xml_key, map_key) do
    items =
      case result do
        %{^xml_key => item} when is_map(item) -> [item]
        %{^xml_key => items} when is_list(items) -> items
        _ -> []
      end

    Enum.map(items, & &1[map_key])
  end
end

View file

@ -26,16 +26,6 @@ defmodule Livebook.Hubs do
end
end
@doc """
Gets a list of hubs from storage with given capabilities.
"""
@spec get_hubs(Provider.capabilities()) :: list(Provider.t())
def get_hubs(capabilities) do
for hub <- get_hubs(),
capability?(hub, capabilities),
do: hub
end
@doc """
Gets a list of metadatas from storage.
"""
@ -180,7 +170,7 @@ defmodule Livebook.Hubs do
"""
@spec connect_hubs() :: :ok
def connect_hubs do
for hub <- get_hubs([:connect]), do: connect_hub(hub)
for hub <- get_hubs(), do: connect_hub(hub)
:ok
end
@ -209,7 +199,7 @@ defmodule Livebook.Hubs do
"""
@spec get_secrets() :: list(Secret.t())
def get_secrets do
for hub <- get_hubs([:list_secrets]),
for hub <- get_hubs(),
secret <- Provider.get_secrets(hub),
do: secret
end
@ -219,13 +209,9 @@ defmodule Livebook.Hubs do
"""
@spec get_secrets(Provider.t()) :: list(Secret.t())
def get_secrets(hub) do
if capability?(hub, [:list_secrets]) do
hub
|> Provider.get_secrets()
|> Enum.sort()
else
[]
end
hub
|> Provider.get_secrets()
|> Enum.sort()
end
@doc """
@ -236,8 +222,6 @@ defmodule Livebook.Hubs do
| {:error, Ecto.Changeset.t()}
| {:transport_error, String.t()}
def create_secret(hub, %Secret{} = secret) do
true = capability?(hub, [:create_secret])
Provider.create_secret(hub, secret)
end
@ -277,12 +261,4 @@ defmodule Livebook.Hubs do
def verify_notebook_stamp(hub, notebook_source, stamp) do
Provider.verify_notebook_stamp(hub, notebook_source, stamp)
end
@doc """
Checks the hub capability for given hub.
"""
@spec capability?(Provider.t(), list(atom())) :: boolean()
def capability?(hub, capabilities) do
capabilities -- Provider.capabilities(hub) == []
end
end

View file

@ -82,8 +82,8 @@ defmodule Livebook.Hubs.Personal do
@doc """
Get the secrets list from storage.
"""
@spec get_secrets :: [Secret.t()]
def get_secrets do
@spec get_secrets() :: list(Secret.t())
def get_secrets() do
Enum.map(Storage.all(@secrets_namespace), &to_secret/1)
end
@ -163,8 +163,6 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Personal do
def disconnect(_personal), do: raise("not implemented")
def capabilities(_personal), do: ~w(list_secrets create_secret)a
def get_secrets(_personal) do
Personal.get_secrets()
end

View file

@ -3,9 +3,6 @@ defprotocol Livebook.Hubs.Provider do
alias Livebook.Secrets.Secret
@type capability :: :connect | :list_secrets | :create_secret
@type capabilities :: list(capability())
@typedoc """
An provider-specific map stored as notebook stamp.
@ -53,12 +50,6 @@ defprotocol Livebook.Hubs.Provider do
@spec disconnect(t()) :: :ok
def disconnect(hub)
@doc """
Gets the capabilities of the given hub.
"""
@spec capabilities(t()) :: capabilities()
def capabilities(hub)
@doc """
Gets the secrets of the given hub.
"""

View file

@ -131,8 +131,6 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do
def disconnect(team), do: TeamClient.stop(team.id)
def capabilities(_team), do: ~w(connect list_secrets create_secret)a
def get_secrets(team), do: TeamClient.get_secrets(team.id)
def create_secret(team, secret), do: Teams.create_secret(team, secret)

View file

@ -1,12 +1,14 @@
defmodule Livebook.FileSystem.S3Test do
use ExUnit.Case, async: true
use Livebook.DataCase, async: true
alias Livebook.FileSystem
alias Livebook.FileSystem.S3
setup do
bypass = Bypass.open()
{:ok, bypass: bypass}
file_system = build(:fs_s3, bucket_url: bucket_url(bypass.port))
{:ok, bypass: bypass, file_system: file_system}
end
describe "new/3" do
@ -30,13 +32,13 @@ defmodule Livebook.FileSystem.S3Test do
describe "FileSystem.default_path/1" do
test "returns the root path" do
file_system = S3.new("https://example.com/mybucket", "key", "secret")
file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket", region: "auto")
assert FileSystem.default_path(file_system) == "/"
end
end
describe "common request errors" do
test "authorization failure", %{bypass: bypass} do
test "authorization failure", %{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
conn
|> Plug.Conn.put_resp_content_type("application/xml")
@ -47,14 +49,13 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/dir/"
assert {:error, "access denied, reason for authorization failure"} =
FileSystem.list(file_system, dir_path, false)
end
test "an arbitrary error with message", %{bypass: bypass} do
test "an arbitrary error with message", %{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
conn
|> Plug.Conn.put_resp_content_type("application/xml")
@ -65,13 +66,12 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/dir/"
assert {:error, "error message"} = FileSystem.list(file_system, dir_path, false)
end
test "successful response with unexpected body", %{bypass: bypass} do
test "successful response with unexpected body", %{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
conn
|> Plug.Conn.put_resp_content_type("application/xml")
@ -82,7 +82,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/dir/"
assert {:error, "unexpected response"} = FileSystem.list(file_system, dir_path, false)
@ -90,7 +89,8 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.list/3" do
test "returns an error when a nonexistent directory is given", %{bypass: bypass} do
test "returns an error when a nonexistent directory is given",
%{bypass: bypass, file_system: file_system} do
# When the directory doesn't exist, we get an empty list of matches
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
@ -105,13 +105,14 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/dir/"
assert {:error, "no such file or directory"} = FileSystem.list(file_system, dir_path, false)
assert {:error, "no such file or directory"} =
FileSystem.list(file_system, dir_path, false)
end
test "does not return an error when the root directory is empty", %{bypass: bypass} do
test "does not return an error when the root directory is empty",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"delimiter" => "/"} = conn.params
@ -124,13 +125,13 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/"
assert {:ok, []} = FileSystem.list(file_system, dir_path, false)
end
test "returns a list of absolute child object paths", %{bypass: bypass} do
test "returns a list of absolute child object paths",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"delimiter" => "/"} = conn.params
@ -152,7 +153,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/"
assert {:ok, paths} = FileSystem.list(file_system, dir_path, false)
@ -164,7 +164,8 @@ defmodule Livebook.FileSystem.S3Test do
]
end
test "includes nested objects when called with recursive flag", %{bypass: bypass} do
test "includes nested objects when called with recursive flag",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"delimiter" => ""} = conn.params
@ -195,7 +196,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/"
assert {:ok, paths} = FileSystem.list(file_system, dir_path, true)
@ -212,7 +212,8 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.read/2" do
test "returns an error when a nonexistent key is given", %{bypass: bypass} do
test "returns an error when a nonexistent key is given",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket/nonexistent.txt", fn conn ->
conn
|> Plug.Conn.put_resp_content_type("application/xml")
@ -223,13 +224,13 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/nonexistent.txt"
assert {:error, "no such file or directory"} = FileSystem.read(file_system, file_path)
end
test "returns object contents under the given key", %{bypass: bypass} do
test "returns object contents under the given key",
%{bypass: bypass, file_system: file_system} do
content = """
<MyData>
<Info>this should not be parsed</Info>
@ -244,7 +245,6 @@ defmodule Livebook.FileSystem.S3Test do
|> Plug.Conn.resp(200, content)
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert {:ok, ^content} = FileSystem.read(file_system, file_path)
@ -252,7 +252,7 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.write/3" do
test "writes contents under the file key", %{bypass: bypass} do
test "writes contents under the file key", %{bypass: bypass, file_system: file_system} do
content = "content"
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
@ -261,16 +261,14 @@ defmodule Livebook.FileSystem.S3Test do
Plug.Conn.resp(conn, 200, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert :ok = FileSystem.write(file_system, file_path, content)
end
# Google Cloud Storage XML API returns this type of response.
test "returns success when the status is 200 even if the content type is text/html", %{
bypass: bypass
} do
test "returns success when the status is 200 even if the content type is text/html",
%{bypass: bypass, file_system: file_system} do
content = "content"
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
@ -281,7 +279,6 @@ defmodule Livebook.FileSystem.S3Test do
|> Plug.Conn.resp(200, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert :ok = FileSystem.write(file_system, file_path, content)
@ -289,14 +286,14 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.create_dir/2" do
test "write empty content under the directory key", %{bypass: bypass} do
test "write empty content under the directory key",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/", fn conn ->
assert {:ok, "", conn} = Plug.Conn.read_body(conn)
Plug.Conn.resp(conn, 200, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/dir/"
assert :ok = FileSystem.create_dir(file_system, dir_path)
@ -304,7 +301,8 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.remove/2" do
test "returns successful value when a nonexistent key is given", %{bypass: bypass} do
test "returns successful value when a nonexistent key is given",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "DELETE", "/mybucket/file.txt", fn conn ->
conn
|> Plug.Conn.put_resp_content_type("application/xml")
@ -315,25 +313,23 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/file.txt"
assert :ok = FileSystem.remove(file_system, file_path)
end
test "deletes object under the corresponding key", %{bypass: bypass} do
test "deletes object under the corresponding key", %{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "DELETE", "/mybucket/file.txt", fn conn ->
Plug.Conn.resp(conn, 204, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/file.txt"
assert :ok = FileSystem.remove(file_system, file_path)
end
test "when a directory is given, recursively lists and batch deletes all matching keys",
%{bypass: bypass} do
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"prefix" => "dir/", "delimiter" => ""} = conn.params
@ -391,7 +387,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
dir_path = "/dir/"
assert :ok = FileSystem.remove(file_system, dir_path)
@ -400,7 +395,7 @@ defmodule Livebook.FileSystem.S3Test do
describe "FileSystem.copy/3" do
test "raises an error if the given paths have different type" do
file_system = S3.new("https://example.com/mybucket", "key", "secret")
file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket")
src_file_path = "/src_file.txt"
dest_dir_path = "/dir/"
@ -410,7 +405,7 @@ defmodule Livebook.FileSystem.S3Test do
end
test "given file paths, returns an error if the source object does not exist",
%{bypass: bypass} do
%{bypass: bypass, file_system: file_system} do
# Request for the bucket name
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
conn
@ -434,7 +429,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
src_file_path = "/src_file.txt"
dest_file_path = "/dest_file.txt"
@ -442,7 +436,8 @@ defmodule Livebook.FileSystem.S3Test do
FileSystem.copy(file_system, src_file_path, dest_file_path)
end
test "given file paths, copies contents into the new key", %{bypass: bypass} do
test "given file paths, copies contents into the new key",
%{bypass: bypass, file_system: file_system} do
# Request for the bucket name
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
conn
@ -465,7 +460,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
src_file_path = "/src_file.txt"
dest_file_path = "/dest_file.txt"
@ -473,7 +467,7 @@ defmodule Livebook.FileSystem.S3Test do
end
test "given directory paths, returns an error if the source directory does not exist",
%{bypass: bypass} do
%{bypass: bypass, file_system: file_system} do
# Directory listing with no results
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"prefix" => "src_dir/", "delimiter" => ""} = conn.params
@ -487,7 +481,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
src_dir_path = "/src_dir/"
dest_dir_path = "/dest_dir/"
@ -496,7 +489,7 @@ defmodule Livebook.FileSystem.S3Test do
end
test "given directory paths, recursively lists all matching keys and individually copies objects",
%{bypass: bypass} do
%{bypass: bypass, file_system: file_system} do
# Directory listing
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"prefix" => "src_dir/", "delimiter" => ""} = conn.params
@ -542,7 +535,6 @@ defmodule Livebook.FileSystem.S3Test do
end)
end
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
src_dir_path = "/src_dir/"
dest_dir_path = "/dest_dir/"
@ -552,7 +544,7 @@ defmodule Livebook.FileSystem.S3Test do
describe "FileSystem.rename/3" do
test "raises an error if the given paths have different type" do
file_system = S3.new("https://example.com/mybucket", "key", "secret")
file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket")
src_file_path = "/src_file.txt"
dest_dir_path = "/dir/"
@ -561,7 +553,8 @@ defmodule Livebook.FileSystem.S3Test do
end
end
test "returns an error when the destination file exists", %{bypass: bypass} do
test "returns an error when the destination file exists",
%{bypass: bypass, file_system: file_system} do
# Existence is verified by listing
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"prefix" => "dest_file.txt", "delimiter" => "/"} = conn.params
@ -578,7 +571,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
src_file_path = "/src_file.txt"
dest_file_path = "/dest_file.txt"
@ -589,7 +581,8 @@ defmodule Livebook.FileSystem.S3Test do
# Rename is implemented as copy and delete, both of which are
# tested separately, so here's just one integration test to
# verify this behaviour
test "given file paths, copies the content and deletes the destination", %{bypass: bypass} do
test "given file paths, copies the content and deletes the destination",
%{bypass: bypass, file_system: file_system} do
# Expects two requests:
# * destination existence check by listing, we return no entries
# * bucket name request
@ -618,7 +611,6 @@ defmodule Livebook.FileSystem.S3Test do
Plug.Conn.resp(conn, 204, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
src_file_path = "/src_file.txt"
dest_file_path = "/dest_file.txt"
@ -627,25 +619,25 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.etag_for/2" do
test "returns an error when a nonexistent key is given", %{bypass: bypass} do
test "returns an error when a nonexistent key is given",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "HEAD", "/mybucket/nonexistent.txt", fn conn ->
Plug.Conn.resp(conn, 404, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/nonexistent.txt"
assert {:error, "no such file or directory"} = FileSystem.etag_for(file_system, file_path)
end
test "returns the ETag value received from the server", %{bypass: bypass} do
test "returns the ETag value received from the server",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "HEAD", "/mybucket/nonexistent.txt", fn conn ->
conn
|> Plug.Conn.put_resp_header("ETag", "value")
|> Plug.Conn.resp(200, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/nonexistent.txt"
assert {:ok, "value"} = FileSystem.etag_for(file_system, file_path)
@ -653,7 +645,8 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.exists?/2" do
test "returns false when the given object doesn't exist", %{bypass: bypass} do
test "returns false when the given object doesn't exist",
%{bypass: bypass, file_system: file_system} do
# Existence is verified by listing
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"prefix" => "file.txt", "delimiter" => "/"} = conn.params
@ -667,13 +660,12 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/file.txt"
assert {:ok, false} = FileSystem.exists?(file_system, file_path)
end
test "returns true when the given object exists", %{bypass: bypass} do
test "returns true when the given object exists", %{bypass: bypass, file_system: file_system} do
# Existence is verified by listing
Bypass.expect_once(bypass, "GET", "/mybucket", fn conn ->
assert %{"prefix" => "file.txt", "delimiter" => "/"} = conn.params
@ -690,7 +682,6 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/file.txt"
assert {:ok, true} = FileSystem.exists?(file_system, file_path)
@ -699,7 +690,7 @@ defmodule Livebook.FileSystem.S3Test do
describe "FileSystem.resolve_path/3" do
test "resolves relative paths" do
file_system = S3.new("https://example.com/mybucket/", "key", "secret")
file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket/")
assert "/dir/" = FileSystem.resolve_path(file_system, "/dir/", "")
assert "/dir/file.txt" = FileSystem.resolve_path(file_system, "/dir/", "file.txt")
@ -712,7 +703,7 @@ defmodule Livebook.FileSystem.S3Test do
end
test "resolves absolute paths" do
file_system = S3.new("https://example.com/mybucket/", "key", "secret")
file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket/")
assert "/" = FileSystem.resolve_path(file_system, "/dir/", "/")
assert "/file.txt" = FileSystem.resolve_path(file_system, "/dir/", "/file.txt")
@ -725,29 +716,31 @@ defmodule Livebook.FileSystem.S3Test do
describe "FileSystem chunked write" do
test "accumulates small chunks and sends a single request if the content is small",
%{bypass: bypass} do
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
assert {:ok, "ab", conn} = Plug.Conn.read_body(conn)
Plug.Conn.resp(conn, 200, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
assert {:ok, state} =
FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "a")
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "b")
assert :ok = FileSystem.write_stream_finish(file_system, state)
end
test "creates a multi-part upload for contents over 50MB", %{bypass: bypass} do
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
test "creates a multi-part upload for contents over 50MB",
%{bypass: bypass, file_system: file_system} do
file_path = "/dir/file.txt"
chunk_3kb = String.duplicate("a", 3_000)
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
assert {:ok, state} =
FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploads" => ""} = conn.params
@ -830,13 +823,13 @@ defmodule Livebook.FileSystem.S3Test do
assert :ok = FileSystem.write_stream_finish(file_system, state)
end
test "aborts the multi-part upload when halted", %{bypass: bypass} do
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
test "aborts the multi-part upload when halted", %{bypass: bypass, file_system: file_system} do
file_path = "/dir/file.txt"
chunk_5kb = String.duplicate("a", 5_000)
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
assert {:ok, state} =
FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploads" => ""} = conn.params
@ -870,13 +863,14 @@ defmodule Livebook.FileSystem.S3Test do
assert :ok = FileSystem.write_stream_halt(file_system, state)
end
test "aborts the multi-part upload when finish fails", %{bypass: bypass} do
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
test "aborts the multi-part upload when finish fails",
%{bypass: bypass, file_system: file_system} do
file_path = "/dir/file.txt"
chunk_5kb = String.duplicate("a", 5_000)
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
assert {:ok, state} =
FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploads" => ""} = conn.params
@ -935,7 +929,8 @@ defmodule Livebook.FileSystem.S3Test do
end
describe "FileSystem.read_stream_into/2" do
test "returns an error when a nonexistent key is given", %{bypass: bypass} do
test "returns an error when a nonexistent key is given",
%{bypass: bypass, file_system: file_system} do
Bypass.expect_once(bypass, "GET", "/mybucket/nonexistent.txt", fn conn ->
conn
|> Plug.Conn.put_resp_content_type("application/xml")
@ -946,14 +941,13 @@ defmodule Livebook.FileSystem.S3Test do
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/nonexistent.txt"
assert {:error, "no such file or directory"} =
FileSystem.read_stream_into(file_system, file_path, <<>>)
end
test "collects regular response", %{bypass: bypass} do
test "collects regular response", %{bypass: bypass, file_system: file_system} do
content = """
<MyData>
<Info>this should not be parsed</Info>
@ -968,13 +962,12 @@ defmodule Livebook.FileSystem.S3Test do
|> Plug.Conn.resp(200, content)
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert {:ok, ^content} = FileSystem.read_stream_into(file_system, file_path, <<>>)
end
test "collects chunked response", %{bypass: bypass} do
test "collects chunked response", %{bypass: bypass, file_system: file_system} do
chunk = String.duplicate("a", 2048)
Bypass.expect_once(bypass, "GET", "/mybucket/dir/file.txt", fn conn ->
@ -987,7 +980,6 @@ defmodule Livebook.FileSystem.S3Test do
end
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert {:ok, content} = FileSystem.read_stream_into(file_system, file_path, <<>>)
@ -995,6 +987,79 @@ defmodule Livebook.FileSystem.S3Test do
end
end
describe "FileSystem.load/2" do
test "loads the region from fields map" do
fields = %{
bucket_url: "https://mybucket.s3.amazonaws.com",
region: "us-east-1",
access_key_id: "key",
secret_access_key: "secret"
}
hash = :crypto.hash(:sha256, fields.bucket_url)
id = "s3-#{Base.url_encode64(hash, padding: false)}"
assert FileSystem.load(%S3{}, fields) == %S3{
id: id,
bucket_url: fields.bucket_url,
region: fields.region,
access_key_id: fields.access_key_id,
secret_access_key: fields.secret_access_key
}
end
test "loads the region from bucket url" do
fields = %{
bucket_url: "https://mybucket.s3.us-east-1.amazonaws.com",
access_key_id: "key",
secret_access_key: "secret"
}
hash = :crypto.hash(:sha256, fields.bucket_url)
id = "s3-#{Base.url_encode64(hash, padding: false)}"
assert FileSystem.load(%S3{}, fields) == %S3{
id: id,
bucket_url: fields.bucket_url,
region: "us-east-1",
access_key_id: fields.access_key_id,
secret_access_key: fields.secret_access_key
}
end
test "loads the file system with string keys" do
fields = %{
"bucket_url" => "https://mybucket.s3.us-east-1.amazonaws.com",
"access_key_id" => "key",
"secret_access_key" => "secret"
}
hash = :crypto.hash(:sha256, fields["bucket_url"])
id = "s3-#{Base.url_encode64(hash, padding: false)}"
assert FileSystem.load(%S3{}, fields) == %S3{
id: id,
bucket_url: fields["bucket_url"],
region: "us-east-1",
access_key_id: fields["access_key_id"],
secret_access_key: fields["secret_access_key"]
}
end
end
describe "FileSystem.dump/1" do
test "dumps into a map ready to be stored" do
file_system = build(:fs_s3)
assert FileSystem.dump(file_system) == %{
bucket_url: "https://mybucket.s3.amazonaws.com",
region: "us-east-1",
access_key_id: "key",
secret_access_key: "secret"
}
end
end
# Helpers
defp bucket_url(port), do: "http://localhost:#{port}/mybucket"

View file

@ -24,10 +24,6 @@ defmodule Livebook.Hubs.ProviderTest do
assert_raise RuntimeError, "not implemented", fn -> Provider.disconnect(hub) end
end
test "capabilities/1", %{hub: hub} do
assert Provider.capabilities(hub) == [:list_secrets, :create_secret]
end
test "get_secrets/1 without startup secrets", %{hub: hub} do
secret = insert_secret(name: "GET_PERSONAL_SECRET")
assert secret in Provider.get_secrets(hub)

View file

@ -67,6 +67,19 @@ defmodule Livebook.Factory do
}
end
def build(:fs_s3) do
bucket_url = "https://mybucket.s3.amazonaws.com"
hash = :crypto.hash(:sha256, bucket_url)
%Livebook.FileSystem.S3{
id: "s3-#{Base.url_encode64(hash, padding: false)}",
bucket_url: bucket_url,
region: "us-east-1",
access_key_id: "key",
secret_access_key: "secret"
}
end
def build(factory_name, attrs) do
factory_name |> build() |> struct!(attrs)
end