diff --git a/lib/livebook/file_system.ex b/lib/livebook/file_system.ex
index 108bfd8b9..dfbdac88a 100644
--- a/lib/livebook/file_system.ex
+++ b/lib/livebook/file_system.ex
@@ -229,4 +229,16 @@ defprotocol Livebook.FileSystem do
   @spec read_stream_into(t(), path(), Collectable.t()) ::
           {:ok, Collectable.t()} | {:error, error()}
   def read_stream_into(file_system, path, collectable)
+
+  @doc """
+  Loads the given fields into the file system.
+  """
+  @spec load(t(), map()) :: struct()
+  def load(file_system, fields)
+
+  @doc """
+  Dumps the file system to a map of attributes.
+  """
+  @spec dump(t()) :: map()
+  def dump(file_system)
 end
diff --git a/lib/livebook/file_system/local.ex b/lib/livebook/file_system/local.ex
index 574726b1b..0625efb1d 100644
--- a/lib/livebook/file_system/local.ex
+++ b/lib/livebook/file_system/local.ex
@@ -298,4 +298,8 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.Local do
       error in File.Error -> FileSystem.Utils.posix_error(error.reason)
     end
   end
+
+  def load(_file_system, _fields), do: raise("not implemented")
+
+  def dump(_file_system), do: raise("not implemented")
 end
diff --git a/lib/livebook/file_system/s3.ex b/lib/livebook/file_system/s3.ex
index fac6a8625..64f2cd997 100644
--- a/lib/livebook/file_system/s3.ex
+++ b/lib/livebook/file_system/s3.ex
@@ -76,8 +76,7 @@ end
 
 defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
   alias Livebook.FileSystem
-  alias Livebook.Utils.HTTP
-  alias Livebook.FileSystem.S3.XML
+  alias Livebook.FileSystem.S3
 
   def resource_identifier(file_system) do
     {:s3, file_system.bucket_url}
@@ -94,10 +93,11 @@
   def list(file_system, path, recursive) do
     FileSystem.Utils.assert_dir_path!(path)
     "/" <> dir_key = path
     delimiter = if recursive, do: nil, else: "/"
+    opts = [prefix: dir_key, delimiter: delimiter]
 
-    with {:ok, %{keys: keys}} <- list_objects(file_system, prefix: dir_key, delimiter: delimiter) do
+    with {:ok, %{keys: keys}} <- S3.Client.list_objects(file_system, opts) do
       if keys == [] and dir_key != "" do
         FileSystem.Utils.posix_error(:enoent)
       else
@@ -110,13 +109,15 @@
   def read(file_system, path) do
     FileSystem.Utils.assert_regular_path!(path)
     "/" <> key = path
-    get_object(file_system, key)
+
+    S3.Client.get_object(file_system, key)
   end
 
   def write(file_system, path, content) do
     FileSystem.Utils.assert_regular_path!(path)
     "/" <> key = path
-    put_object(file_system, key, content)
+
+    S3.Client.put_object(file_system, key, content)
   end
 
   def access(_file_system, _path) do
@@ -126,25 +127,26 @@
   def create_dir(file_system, path) do
     FileSystem.Utils.assert_dir_path!(path)
     "/" <> key = path
+
     # S3 has no concept of directories, but keys with trailing
     # slash are interpreted as such, so we create an empty
     # object for the given key
-    put_object(file_system, key, nil)
+    S3.Client.put_object(file_system, key, nil)
   end
 
   def remove(file_system, path) do
     "/" <> key = path
 
     if FileSystem.Utils.dir_path?(path) do
-      with {:ok, %{keys: keys}} <- list_objects(file_system, prefix: key) do
+      with {:ok, %{keys: keys}} <- S3.Client.list_objects(file_system, prefix: key) do
        if keys == [] do
           FileSystem.Utils.posix_error(:enoent)
         else
-          delete_objects(file_system, keys)
+          S3.Client.delete_objects(file_system, keys)
         end
       end
     else
-      delete_object(file_system, key)
+      S3.Client.delete_object(file_system, key)
     end
   end
 
@@ -154,7 +156,8 @@ defimpl Livebook.FileSystem, for: 
Livebook.FileSystem.S3 do "/" <> destination_key = destination_path if FileSystem.Utils.dir_path?(source_path) do - with {:ok, %{bucket: bucket, keys: keys}} <- list_objects(file_system, prefix: source_key) do + with {:ok, %{bucket: bucket, keys: keys}} <- + S3.Client.list_objects(file_system, prefix: source_key) do if keys == [] do FileSystem.Utils.posix_error(:enoent) else @@ -163,7 +166,7 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do renamed_key = String.replace_prefix(key, source_key, destination_key) Task.async(fn -> - copy_object(file_system, bucket, key, renamed_key) + S3.Client.copy_object(file_system, bucket, key, renamed_key) end) end) |> Task.await_many(:infinity) @@ -175,8 +178,8 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do end end else - with {:ok, bucket} <- get_bucket_name(file_system) do - copy_object(file_system, bucket, source_key, destination_key) + with {:ok, bucket} <- S3.Client.get_bucket_name(file_system) do + S3.Client.copy_object(file_system, bucket, source_key, destination_key) end end end @@ -185,7 +188,7 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do FileSystem.Utils.assert_same_type!(source_path, destination_path) "/" <> destination_key = destination_path - with {:ok, destination_exists?} <- object_exists(file_system, destination_key) do + with {:ok, destination_exists?} <- S3.Client.object_exists(file_system, destination_key) do if destination_exists? do FileSystem.Utils.posix_error(:eexist) else @@ -202,14 +205,15 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do FileSystem.Utils.assert_regular_path!(path) "/" <> key = path - with {:ok, %{etag: etag}} <- head_object(file_system, key) do + with {:ok, %{etag: etag}} <- S3.Client.head_object(file_system, key) do {:ok, etag} end end def exists?(file_system, path) do "/" <> key = path - object_exists(file_system, key) + + S3.Client.object_exists(file_system, key) end def resolve_path(_file_system, dir_path, subject) do @@ -245,7 +249,7 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do if state.upload_id do {:ok, state} else - with {:ok, upload_id} <- create_multipart_upload(file_system, state.key) do + with {:ok, upload_id} <- S3.Client.create_multipart_upload(file_system, state.key) do {:ok, %{state | upload_id: upload_id}} end end @@ -266,7 +270,8 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do parts = state.parts + 1 - with {:ok, %{etag: etag}} <- upload_part(file_system, state.key, state.upload_id, parts, part) do + with {:ok, %{etag: etag}} <- + S3.Client.upload_part(file_system, state.key, state.upload_id, parts, part) do {:ok, %{ state @@ -289,7 +294,7 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do with {:ok, state} <- maybe_state, :ok <- - complete_multipart_upload( + S3.Client.complete_multipart_upload( file_system, state.key, state.upload_id, @@ -298,18 +303,18 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do :ok else {:error, error} -> - abort_multipart_upload(file_system, state.key, state.upload_id) + S3.Client.abort_multipart_upload(file_system, state.key, state.upload_id) {:error, error} end else content = state.current_chunks |> Enum.reverse() |> IO.iodata_to_binary() - put_object(file_system, state.key, content) + S3.Client.put_object(file_system, state.key, content) end end def write_stream_halt(file_system, state) do if state.upload_id do - abort_multipart_upload(file_system, state.key, state.upload_id) + S3.Client.abort_multipart_upload(file_system, state.key, 
state.upload_id) else :ok end @@ -318,313 +323,25 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do def read_stream_into(file_system, path, collectable) do FileSystem.Utils.assert_regular_path!(path) "/" <> key = path - multipart_get_object(file_system, key, collectable) + + S3.Client.multipart_get_object(file_system, key, collectable) end - # Requests - - defp list_objects(file_system, opts) do - prefix = opts[:prefix] - delimiter = opts[:delimiter] - - query = %{"list-type" => "2", "prefix" => prefix, "delimiter" => delimiter} - - case request(file_system, :get, "/", query: query) |> decode() do - {:ok, 200, _headers, %{"ListBucketResult" => result}} -> - bucket = result["Name"] - file_keys = result |> xml_get_list("Contents") |> Enum.map(& &1["Key"]) - prefix_keys = result |> xml_get_list("CommonPrefixes") |> Enum.map(& &1["Prefix"]) - keys = file_keys ++ prefix_keys - {:ok, %{bucket: bucket, keys: keys}} - - other -> - request_response_to_error(other) - end - end - - defp get_bucket_name(file_system) do - # We have bucket URL, but it's not straightforward to extract - # bucket name from the URL, because it may be either the path - # or a part of the host. - # - # Endpoints that return bucket information doesn't include the - # name, but the listing endpoint does, so we just list keys - # with an upper limit of 0 and retrieve the bucket name. - - query = %{"list-type" => "2", "max-keys" => "0"} - - case request(file_system, :get, "/", query: query) |> decode() do - {:ok, 200, _headers, %{"ListBucketResult" => %{"Name" => bucket}}} -> - {:ok, bucket} - - other -> - request_response_to_error(other) - end - end - - defp get_object(file_system, key) do - case request(file_system, :get, "/" <> encode_key(key), long: true) do - {:ok, 200, _headers, body} -> {:ok, body} - {:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent) - other -> request_response_to_error(other) - end - end - - defp multipart_get_object(file_system, key, collectable) do - case download(file_system, "/" <> encode_key(key), collectable) do - {:ok, collectable} -> {:ok, collectable} - {:error, _message, 404} -> FileSystem.Utils.posix_error(:enoent) - {:error, message, _status} -> {:error, message} - end - end - - defp put_object(file_system, key, content) do - case request(file_system, :put, "/" <> encode_key(key), body: content, long: true) - |> decode() do - {:ok, 200, _headers, _body} -> :ok - other -> request_response_to_error(other) - end - end - - defp head_object(file_system, key) do - case request(file_system, :head, "/" <> encode_key(key)) do - {:ok, 200, headers, _body} -> - {:ok, etag} = HTTP.fetch_header(headers, "etag") - {:ok, %{etag: etag}} - - {:ok, 404, _headers, _body} -> - FileSystem.Utils.posix_error(:enoent) - - other -> - request_response_to_error(other) - end - end - - defp copy_object(file_system, bucket, source_key, destination_key) do - copy_source = bucket <> "/" <> encode_key(source_key) - - headers = [{"x-amz-copy-source", copy_source}] - - case request(file_system, :put, "/" <> encode_key(destination_key), headers: headers) - |> decode() do - {:ok, 200, _headers, _body} -> :ok - {:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent) - other -> request_response_to_error(other) - end - end - - defp delete_object(file_system, key) do - case request(file_system, :delete, "/" <> encode_key(key)) |> decode() do - {:ok, 204, _headers, _body} -> :ok - {:ok, 404, _headers, _body} -> :ok - other -> request_response_to_error(other) - end - end - - defp 
delete_objects(file_system, keys) do - objects = Enum.map(keys, fn key -> %{"Key" => key} end) - - body = - %{"Delete" => %{"Object" => objects, "Quiet" => "true"}} - |> XML.encode_to_iodata!() - |> IO.iodata_to_binary() - - body_md5 = :crypto.hash(:md5, body) |> Base.encode64() - - headers = [{"Content-MD5", body_md5}] - - case request(file_system, :post, "/", query: %{"delete" => ""}, headers: headers, body: body) - |> decode() do - {:ok, 200, _headers, %{"Error" => errors}} -> {:error, format_errors(errors)} - {:ok, 200, _headers, _body} -> :ok - other -> request_response_to_error(other) - end - end - - defp object_exists(file_system, key) do - # It is possible for /dir/obj to exist without the /dir/ object, - # but we still consider it as existing. That's why we list - # objects instead of checking the key directly. - - with {:ok, %{keys: keys}} <- list_objects(file_system, prefix: key, delimiter: "/") do - exists? = - if String.ends_with?(key, "/") do - keys != [] - else - key in keys - end - - {:ok, exists?} - end - end - - defp create_multipart_upload(file_system, key) do - query = %{"uploads" => ""} - - case request(file_system, :post, "/" <> encode_key(key), query: query, body: "") - |> decode() do - {:ok, 200, _headers, %{"InitiateMultipartUploadResult" => %{"UploadId" => upload_id}}} -> - {:ok, upload_id} - - other -> - request_response_to_error(other) - end - end - - defp upload_part(file_system, key, upload_id, part_number, content) do - query = %{"uploadId" => upload_id, "partNumber" => part_number} - - case request(file_system, :put, "/" <> encode_key(key), - query: query, - body: content, - long: true - ) - |> decode() do - {:ok, 200, headers, _body} -> - {:ok, etag} = HTTP.fetch_header(headers, "etag") - {:ok, %{etag: etag}} - - other -> - request_response_to_error(other) - end - end - - defp complete_multipart_upload(file_system, key, upload_id, etags) do - query = %{"uploadId" => upload_id} - - parts = - for {etag, n} <- Enum.with_index(etags, 1) do - %{"PartNumber" => n, "ETag" => etag} - end - - body = - %{"CompleteMultipartUpload" => %{"Part" => parts}} - |> XML.encode_to_iodata!() - |> IO.iodata_to_binary() - - case request(file_system, :post, "/" <> encode_key(key), query: query, body: body) - |> decode() do - {:ok, 200, _headers, _body} -> :ok - other -> request_response_to_error(other) - end - end - - defp abort_multipart_upload(file_system, key, upload_id) do - query = %{"uploadId" => upload_id} - - case request(file_system, :delete, "/" <> encode_key(key), query: query) |> decode() do - {:ok, 204, _headers, _body} -> :ok - other -> request_response_to_error(other) - end - end - - defp encode_key(key) do - key - |> String.split("/") - |> Enum.map_join("/", fn segment -> URI.encode(segment, &URI.char_unreserved?/1) end) - end - - defp request_response_to_error(error) - - defp request_response_to_error({:ok, 403, _headers, %{"Error" => %{"Message" => message}}}) do - {:error, "access denied, " <> Livebook.Utils.downcase_first(message)} - end - - defp request_response_to_error({:ok, 403, _headers, _body}) do - {:error, "access denied"} - end - - defp request_response_to_error({:ok, _status, _headers, %{"Error" => error}}) do - {:error, format_errors(error)} - end - - defp request_response_to_error({:ok, _status, _headers, _body}) do - {:error, "unexpected response"} - end - - defp request_response_to_error({:error, _error}) do - {:error, "failed to make an HTTP request"} - end - - defp format_errors(%{"Message" => message}) do - 
Livebook.Utils.downcase_first(message)
-  end
-
-  defp format_errors([%{"Message" => message} | errors]) do
-    Livebook.Utils.downcase_first(message) <> ", and #{length(errors)} more errors"
-  end
-
-  defp request(file_system, method, path, opts \\ []) do
-    query = opts[:query] || %{}
-    headers = opts[:headers] || []
-    body = opts[:body]
-    long = Keyword.get(opts, :long, false)
-
-    timeout_opts = if(long, do: [timeout: 60_000], else: [])
-
-    url = url(file_system, path, query)
-    headers = headers(file_system, method, url, headers, body)
-    body = body && {"application/octet-stream", body}
-
-    HTTP.request(method, url, [headers: headers, body: body] ++ timeout_opts)
-  end
-
-  defp download(file_system, path, collectable, opts \\ []) do
-    query = opts[:query] || %{}
-    headers = opts[:headers] || []
-
-    url = url(file_system, path, query)
-    headers = headers(file_system, :get, url, headers)
-
-    HTTP.download(url, collectable, headers: headers)
-  end
-
-  defp url(file_system, path, query) do
-    file_system.bucket_url <> path <> "?" <> URI.encode_query(query)
-  end
-
-  defp headers(file_system, method, url, headers, body \\ nil) do
-    now = NaiveDateTime.utc_now() |> NaiveDateTime.to_erl()
-    %{host: host} = URI.parse(file_system.bucket_url)
-    headers = [{"Host", host} | headers]
-
-    :aws_signature.sign_v4(
-      file_system.access_key_id,
-      file_system.secret_access_key,
-      file_system.region,
-      "s3",
-      now,
-      Atom.to_string(method),
-      url,
-      headers,
-      body || "",
-      uri_encode_path: false
+  def load(_file_system, %{"bucket_url" => _} = fields) do
+    S3.new(fields["bucket_url"], fields["access_key_id"], fields["secret_access_key"],
+      region: fields["region"]
     )
   end
 
-  defp decode({:ok, status, headers, body}) do
-    guess_xml? = String.starts_with?(body, "<?xml")
-
-    case HTTP.fetch_content_type(headers) do
-      {:ok, content_type} when content_type in ["text/xml", "application/xml"] ->
-        {:ok, status, headers, XML.decode!(body)}
-
-      # Apparently some requests return XML without content-type
-      :error when guess_xml? ->
-        {:ok, status, headers, XML.decode!(body)}
-
-      _ ->
-        {:ok, status, headers, body}
-    end
+  def load(_file_system, fields) do
+    S3.new(fields.bucket_url, fields.access_key_id, fields.secret_access_key,
+      region: fields[:region]
+    )
   end
 
-  defp decode(other), do: other
-
-  defp xml_get_list(xml_map, key) do
-    case xml_map do
-      %{^key => item} when is_map(item) -> [item]
-      %{^key => items} when is_list(items) -> items
-      _ -> []
-    end
+  def dump(file_system) do
+    file_system
+    |> Map.from_struct()
+    |> Map.take([:bucket_url, :region, :access_key_id, :secret_access_key])
   end
 end
diff --git a/lib/livebook/file_system/s3/client.ex b/lib/livebook/file_system/s3/client.ex
new file mode 100644
index 000000000..37c512ccb
--- /dev/null
+++ b/lib/livebook/file_system/s3/client.ex
@@ -0,0 +1,355 @@
+defmodule Livebook.FileSystem.S3.Client do
+  @moduledoc false
+
+  alias Livebook.FileSystem
+  alias Livebook.FileSystem.S3
+
+  @doc """
+  Sends a request to the bucket to get the list of objects.
+  """
+  @spec list_objects(S3.t(), keyword()) :: {:ok, map()} | {:error, String.t()}
+  def list_objects(file_system, opts) do
+    prefix = opts[:prefix]
+    delimiter = opts[:delimiter]
+    query = %{"list-type" => "2", "prefix" => prefix, "delimiter" => delimiter}
+
+    case get(file_system, "/", query: query) do
+      {:ok, 200, _headers, %{"ListBucketResult" => result}} ->
+        file_keys = xml_get_list(result, "Contents", "Key")
+        prefix_keys = xml_get_list(result, "CommonPrefixes", "Prefix")
+
+        {:ok, %{bucket: result["Name"], keys: file_keys ++ prefix_keys}}
+
+      other ->
+        request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to get the bucket name.
+  """
+  @spec get_bucket_name(S3.t()) :: {:ok, String.t()} | {:error, String.t()}
+  def get_bucket_name(file_system) do
+    # We have bucket URL, but it's not straightforward to extract
+    # bucket name from the URL, because it may be either the path
+    # or a part of the host.
+    #
+    # Endpoints that return bucket information don't include the
+    # name, but the listing endpoint does, so we just list keys
+    # with an upper limit of 0 and retrieve the bucket name.
+
+    case get(file_system, "/", query: %{"list-type" => "2", "max-keys" => "0"}) do
+      {:ok, 200, _headers, %{"ListBucketResult" => result}} -> {:ok, result["Name"]}
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to get an object.
+  """
+  @spec get_object(S3.t(), String.t()) :: {:ok, binary()} | {:error, String.t()}
+  def get_object(file_system, key) do
+    case get(file_system, "/" <> encode_key(key), long: true, decode: false) do
+      {:ok, 200, _headers, body} -> {:ok, body}
+      {:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to download an object in multiple parts.
+  """
+  @spec multipart_get_object(S3.t(), String.t(), Collectable.t()) ::
+          {:ok, Collectable.t()} | {:error, String.t()}
+  def multipart_get_object(file_system, key, collectable) do
+    case download(file_system, "/" <> encode_key(key), collectable) do
+      {:ok, collectable} -> {:ok, collectable}
+      {:error, _message, 404} -> FileSystem.Utils.posix_error(:enoent)
+      {:error, message, _status} -> {:error, message}
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to create or update an object's content.
+  """
+  @spec put_object(S3.t(), String.t(), String.t() | nil) :: :ok | {:error, String.t()}
+  def put_object(file_system, key, content) do
+    case put(file_system, "/" <> encode_key(key), body: content, long: true) do
+      {:ok, 200, _headers, _body} -> :ok
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to get the ETag of the object under the given key.
+  """
+  @spec head_object(S3.t(), String.t()) :: {:ok, map()} | {:error, String.t()}
+  def head_object(file_system, key) do
+    with {:ok, 200, headers, _body} <- head(file_system, "/" <> encode_key(key)),
+         {:ok, etag} <- Livebook.Utils.HTTP.fetch_header(headers, "etag") do
+      {:ok, %{etag: etag}}
+    else
+      {:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to copy an object to the given destination.
+  """
+  @spec copy_object(S3.t(), String.t(), String.t(), String.t()) :: :ok | {:error, String.t()}
+  def copy_object(file_system, bucket, source_key, destination_key) do
+    copy_source = bucket <> "/" <> encode_key(source_key)
+    headers = [{"x-amz-copy-source", copy_source}]
+
+    case put(file_system, "/" <> encode_key(destination_key), headers: headers) do
+      {:ok, 200, _headers, _body} -> :ok
+      {:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to delete an object.
+  """
+  @spec delete_object(S3.t(), String.t()) :: :ok | {:error, String.t()}
+  def delete_object(file_system, key) do
+    case delete(file_system, "/" <> encode_key(key)) do
+      {:ok, 204, _headers, _body} -> :ok
+      {:ok, 404, _headers, _body} -> :ok
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to delete a list of objects.
+  """
+  @spec delete_objects(S3.t(), list(String.t())) :: :ok | {:error, String.t()}
+  def delete_objects(file_system, keys) do
+    objects = Enum.map(keys, &%{"Key" => &1})
+
+    body =
+      %{"Delete" => %{"Object" => objects, "Quiet" => "true"}}
+      |> S3.XML.encode_to_iodata!()
+      |> IO.iodata_to_binary()
+
+    body_md5 = :crypto.hash(:md5, body)
+    headers = [{"Content-MD5", Base.encode64(body_md5)}]
+
+    case post(file_system, "/", query: %{"delete" => ""}, headers: headers, body: body) do
+      {:ok, 200, _headers, %{"Error" => _}} = result -> request_response_to_error(result)
+      {:ok, 200, _headers, _body} -> :ok
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to check whether an object exists.
+  """
+  @spec object_exists(S3.t(), String.t()) :: {:ok, boolean()} | {:error, String.t()}
+  def object_exists(file_system, key) do
+    # It is possible for /dir/obj to exist without the /dir/ object,
+    # but we still consider it as existing. That's why we list
+    # objects instead of checking the key directly.
+
+    with {:ok, %{keys: keys}} <- list_objects(file_system, prefix: key, delimiter: "/") do
+      exists? =
+        if String.ends_with?(key, "/"),
+          do: keys != [],
+          else: key in keys
+
+      {:ok, exists?}
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to initiate a multipart upload for an object.
+  """
+  @spec create_multipart_upload(S3.t(), String.t()) :: {:ok, String.t()} | {:error, String.t()}
+  def create_multipart_upload(file_system, key) do
+    query = %{"uploads" => ""}
+
+    case post(file_system, "/" <> encode_key(key), query: query, body: "") do
+      {:ok, 200, _headers, %{"InitiateMultipartUploadResult" => result}} ->
+        {:ok, result["UploadId"]}
+
+      other ->
+        request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to upload a single part of a multipart upload.
+  """
+  @spec upload_part(S3.t(), String.t(), String.t(), pos_integer(), String.t()) ::
+          {:ok, map()} | {:error, String.t()}
+  def upload_part(file_system, key, upload_id, part_number, content) do
+    query = %{"uploadId" => upload_id, "partNumber" => part_number}
+    opts = [query: query, body: content, long: true]
+
+    with {:ok, 200, headers, _body} <- put(file_system, "/" <> encode_key(key), opts),
+         {:ok, etag} <- Livebook.Utils.HTTP.fetch_header(headers, "etag") do
+      {:ok, %{etag: etag}}
+    else
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to complete a multipart upload.
+  """
+  @spec complete_multipart_upload(S3.t(), String.t(), String.t(), list(String.t())) ::
+          :ok | {:error, String.t()}
+  def complete_multipart_upload(file_system, key, upload_id, etags) do
+    query = %{"uploadId" => upload_id}
+    parts = for {etag, n} <- Enum.with_index(etags, 1), do: %{"PartNumber" => n, "ETag" => etag}
+
+    body =
+      %{"CompleteMultipartUpload" => %{"Part" => parts}}
+      |> S3.XML.encode_to_iodata!()
+      |> IO.iodata_to_binary()
+
+    case post(file_system, "/" <> encode_key(key), query: query, body: body) do
+      {:ok, 200, _headers, _body} -> :ok
+      other -> request_response_to_error(other)
+    end
+  end
+
+  @doc """
+  Sends a request to the bucket to abort a multipart upload.
+  """
+  @spec abort_multipart_upload(S3.t(), String.t(), String.t()) :: :ok | {:error, String.t()}
+  def abort_multipart_upload(file_system, key, upload_id) do
+    query = %{"uploadId" => upload_id}
+
+    case delete(file_system, "/" <> encode_key(key), query: query) do
+      {:ok, 204, _headers, _body} -> :ok
+      other -> request_response_to_error(other)
+    end
+  end
+
+  # Convenience API
+
+  defp get(file_system, path, opts), do: request(file_system, :get, path, opts)
+  defp post(file_system, path, opts), do: request(file_system, :post, path, opts)
+  defp put(file_system, path, opts), do: request(file_system, :put, path, opts)
+  defp head(file_system, path), do: request(file_system, :head, path, [])
+  defp delete(file_system, path, opts \\ []), do: request(file_system, :delete, path, opts)
+
+  defp download(file_system, path, collectable, opts \\ []) do
+    query = opts[:query] || %{}
+    headers = opts[:headers] || []
+    url = build_url(file_system, path, query)
+    headers = sign_headers(file_system, :get, url, headers)
+
+    Livebook.Utils.HTTP.download(url, collectable, headers: headers)
+  end
+
+  # Private
+
+  defp encode_key(key) do
+    key
+    |> String.split("/")
+    |> Enum.map_join("/", fn segment -> URI.encode(segment, &URI.char_unreserved?/1) end)
+  end
+
+  defp build_url(file_system, path, query) do
+    query_string = URI.encode_query(query)
+    query_string = if query_string != "", do: "?#{query_string}", else: ""
+
+    file_system.bucket_url <> path <> query_string
+  end
+
+  defp sign_headers(file_system, method, url, headers, body \\ nil) do
+    now = NaiveDateTime.utc_now() |> NaiveDateTime.to_erl()
+    %{host: host} = URI.parse(file_system.bucket_url)
+    headers = [{"Host", host} | headers]
+
+    :aws_signature.sign_v4(
+      file_system.access_key_id,
+      file_system.secret_access_key,
+      file_system.region,
+      "s3",
+      now,
+      Atom.to_string(method),
+      url,
+      headers,
+      body || "",
+      uri_encode_path: false
+    )
+  end
+
+  defp request(file_system, method, path, opts) do
+    long = Keyword.get(opts, :long, false)
+    decode? = Keyword.get(opts, :decode, true)
+
+    query = opts[:query] || %{}
+    headers = opts[:headers] || []
+    body = opts[:body]
+    timeout = if long, do: 60_000, else: 30_000
+
+    url = build_url(file_system, path, query)
+    headers = sign_headers(file_system, method, url, headers, body)
+    body = body && {"application/octet-stream", body}
+
+    result =
+      Livebook.Utils.HTTP.request(method, url, headers: headers, body: body, timeout: timeout)
+
+    if decode?, do: decode(result), else: result
+  end
+
+  defp decode({:ok, status, headers, body}) do
+    if xml?(headers, body),
+      do: {:ok, status, headers, S3.XML.decode!(body)},
+      else: {:ok, status, headers, body}
+  end
+
+  defp decode({:error, _} = error), do: error
+
+  defp xml?(headers, body) do
+    guess_xml? = String.starts_with?(body, "<?xml")
+
+    case Livebook.Utils.HTTP.fetch_content_type(headers) do
+      {:ok, content_type} when content_type in ["text/xml", "application/xml"] -> true
+      # Apparently some requests return XML without content-type
+      :error when guess_xml? -> true
+      _otherwise -> false
+    end
+  end
+
+  defp request_response_to_error({:ok, 403, _headers, %{"Error" => %{"Message" => message}}}) do
+    {:error, "access denied, " <> Livebook.Utils.downcase_first(message)}
+  end
+
+  defp request_response_to_error({:ok, 403, _headers, _body}) do
+    {:error, "access denied"}
+  end
+
+  defp request_response_to_error({:ok, _status, _headers, %{"Error" => %{"Message" => message}}}) do
+    {:error, Livebook.Utils.downcase_first(message)}
+  end
+
+  defp request_response_to_error({:ok, _status, _headers, %{"Error" => [_ | _] = errors}}) do
+    [%{"Message" => message} | errors] = errors
+    {:error, Livebook.Utils.downcase_first(message) <> ", and #{length(errors)} more errors"}
+  end
+
+  defp request_response_to_error({:ok, _status, _headers, _body}) do
+    {:error, "unexpected response"}
+  end
+
+  defp request_response_to_error(_otherwise) do
+    {:error, "failed to make an HTTP request"}
+  end
+
+  defp xml_get_list(result, xml_key, map_key) do
+    items =
+      case result do
+        %{^xml_key => item} when is_map(item) -> [item]
+        %{^xml_key => items} when is_list(items) -> items
+        _ -> []
+      end
+
+    Enum.map(items, & &1[map_key])
+  end
+end
diff --git a/lib/livebook/hubs.ex b/lib/livebook/hubs.ex
index 302cde6c3..bac126662 100644
--- a/lib/livebook/hubs.ex
+++ b/lib/livebook/hubs.ex
@@ -26,16 +26,6 @@ defmodule Livebook.Hubs do
     end
   end
 
-  @doc """
-  Gets a list of hubs from storage with given capabilities.
-  """
-  @spec get_hubs(Provider.capabilities()) :: list(Provider.t())
-  def get_hubs(capabilities) do
-    for hub <- get_hubs(),
-        capability?(hub, capabilities),
-        do: hub
-  end
-
   @doc """
   Gets a list of metadatas from storage.
   """
@@ -180,7 +170,7 @@ defmodule Livebook.Hubs do
   """
   @spec connect_hubs() :: :ok
   def connect_hubs do
-    for hub <- get_hubs([:connect]), do: connect_hub(hub)
+    for hub <- get_hubs(), do: connect_hub(hub)
 
     :ok
   end
@@ -209,7 +199,7 @@ defmodule Livebook.Hubs do
   """
   @spec get_secrets() :: list(Secret.t())
   def get_secrets do
-    for hub <- get_hubs([:list_secrets]),
+    for hub <- get_hubs(),
        secret <- Provider.get_secrets(hub),
        do: secret
   end
@@ -219,13 +209,9 @@ defmodule Livebook.Hubs do
   """
   @spec get_secrets(Provider.t()) :: list(Secret.t())
   def get_secrets(hub) do
-    if capability?(hub, [:list_secrets]) do
-      hub
-      |> Provider.get_secrets()
-      |> Enum.sort()
-    else
-      []
-    end
+    hub
+    |> Provider.get_secrets()
+    |> Enum.sort()
   end
 
   @doc """
@@ -236,8 +222,6 @@ defmodule Livebook.Hubs do
        | {:error, Ecto.Changeset.t()}
        | {:transport_error, String.t()}
   def create_secret(hub, %Secret{} = secret) do
-    true = capability?(hub, [:create_secret])
-
     Provider.create_secret(hub, secret)
   end
 
@@ -277,12 +261,4 @@ defmodule Livebook.Hubs do
   def verify_notebook_stamp(hub, notebook_source, stamp) do
     Provider.verify_notebook_stamp(hub, notebook_source, stamp)
   end
-
-  @doc """
-  Checks the hub capability for given hub.
-  """
-  @spec capability?(Provider.t(), list(atom())) :: boolean()
-  def capability?(hub, capabilities) do
-    capabilities -- Provider.capabilities(hub) == []
-  end
 end
diff --git a/lib/livebook/hubs/personal.ex b/lib/livebook/hubs/personal.ex
index 5644d606e..0240c741f 100644
--- a/lib/livebook/hubs/personal.ex
+++ b/lib/livebook/hubs/personal.ex
@@ -82,8 +82,8 @@ defmodule Livebook.Hubs.Personal do
   @doc """
   Get the secrets list from storage.
""" - @spec get_secrets :: [Secret.t()] - def get_secrets do + @spec get_secrets() :: list(Secret.t()) + def get_secrets() do Enum.map(Storage.all(@secrets_namespace), &to_secret/1) end @@ -163,8 +163,6 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Personal do def disconnect(_personal), do: raise("not implemented") - def capabilities(_personal), do: ~w(list_secrets create_secret)a - def get_secrets(_personal) do Personal.get_secrets() end diff --git a/lib/livebook/hubs/provider.ex b/lib/livebook/hubs/provider.ex index 8d986884f..b7cf27000 100644 --- a/lib/livebook/hubs/provider.ex +++ b/lib/livebook/hubs/provider.ex @@ -3,9 +3,6 @@ defprotocol Livebook.Hubs.Provider do alias Livebook.Secrets.Secret - @type capability :: :connect | :list_secrets | :create_secret - @type capabilities :: list(capability()) - @typedoc """ An provider-specific map stored as notebook stamp. @@ -53,12 +50,6 @@ defprotocol Livebook.Hubs.Provider do @spec disconnect(t()) :: :ok def disconnect(hub) - @doc """ - Gets the capabilities of the given hub. - """ - @spec capabilities(t()) :: capabilities() - def capabilities(hub) - @doc """ Gets the secrets of the given hub. """ diff --git a/lib/livebook/hubs/team.ex b/lib/livebook/hubs/team.ex index 44af0899e..0e99ecbf6 100644 --- a/lib/livebook/hubs/team.ex +++ b/lib/livebook/hubs/team.ex @@ -131,8 +131,6 @@ defimpl Livebook.Hubs.Provider, for: Livebook.Hubs.Team do def disconnect(team), do: TeamClient.stop(team.id) - def capabilities(_team), do: ~w(connect list_secrets create_secret)a - def get_secrets(team), do: TeamClient.get_secrets(team.id) def create_secret(team, secret), do: Teams.create_secret(team, secret) diff --git a/test/livebook/file_system/s3_test.exs b/test/livebook/file_system/s3_test.exs index 01b2d155c..280bf98ea 100644 --- a/test/livebook/file_system/s3_test.exs +++ b/test/livebook/file_system/s3_test.exs @@ -1,12 +1,14 @@ defmodule Livebook.FileSystem.S3Test do - use ExUnit.Case, async: true + use Livebook.DataCase, async: true alias Livebook.FileSystem alias Livebook.FileSystem.S3 setup do bypass = Bypass.open() - {:ok, bypass: bypass} + file_system = build(:fs_s3, bucket_url: bucket_url(bypass.port)) + + {:ok, bypass: bypass, file_system: file_system} end describe "new/3" do @@ -30,13 +32,13 @@ defmodule Livebook.FileSystem.S3Test do describe "FileSystem.default_path/1" do test "returns the root path" do - file_system = S3.new("https://example.com/mybucket", "key", "secret") + file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket", region: "auto") assert FileSystem.default_path(file_system) == "/" end end describe "common request errors" do - test "authorization failure", %{bypass: bypass} do + test "authorization failure", %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> conn |> Plug.Conn.put_resp_content_type("application/xml") @@ -47,14 +49,13 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/dir/" assert {:error, "access denied, reason for authorization failure"} = FileSystem.list(file_system, dir_path, false) end - test "an arbitrary error with message", %{bypass: bypass} do + test "an arbitrary error with message", %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> conn |> Plug.Conn.put_resp_content_type("application/xml") @@ -65,13 +66,12 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = 
S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/dir/" assert {:error, "error message"} = FileSystem.list(file_system, dir_path, false) end - test "successful response with unexpected body", %{bypass: bypass} do + test "successful response with unexpected body", %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> conn |> Plug.Conn.put_resp_content_type("application/xml") @@ -82,7 +82,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/dir/" assert {:error, "unexpected response"} = FileSystem.list(file_system, dir_path, false) @@ -90,7 +89,8 @@ defmodule Livebook.FileSystem.S3Test do end describe "FileSystem.list/3" do - test "returns an error when a nonexistent directory is given", %{bypass: bypass} do + test "returns an error when a nonexistent directory is given", + %{bypass: bypass, file_system: file_system} do # When the directory doesn't exist, we get an empty list of matches Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> @@ -105,13 +105,14 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/dir/" - assert {:error, "no such file or directory"} = FileSystem.list(file_system, dir_path, false) + assert {:error, "no such file or directory"} = + FileSystem.list(file_system, dir_path, false) end - test "does not return an error when the root directory is empty", %{bypass: bypass} do + test "does not return an error when the root directory is empty", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"delimiter" => "/"} = conn.params @@ -124,13 +125,13 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/" assert {:ok, []} = FileSystem.list(file_system, dir_path, false) end - test "returns a list of absolute child object paths", %{bypass: bypass} do + test "returns a list of absolute child object paths", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"delimiter" => "/"} = conn.params @@ -152,7 +153,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/" assert {:ok, paths} = FileSystem.list(file_system, dir_path, false) @@ -164,7 +164,8 @@ defmodule Livebook.FileSystem.S3Test do ] end - test "includes nested objects when called with recursive flag", %{bypass: bypass} do + test "includes nested objects when called with recursive flag", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"delimiter" => ""} = conn.params @@ -195,7 +196,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/" assert {:ok, paths} = FileSystem.list(file_system, dir_path, true) @@ -212,7 +212,8 @@ defmodule Livebook.FileSystem.S3Test do end describe "FileSystem.read/2" do - test "returns an error when a nonexistent key is given", %{bypass: bypass} do + test "returns an error when a nonexistent key is given", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket/nonexistent.txt", fn conn -> conn |> Plug.Conn.put_resp_content_type("application/xml") @@ -223,13 +224,13 @@ defmodule Livebook.FileSystem.S3Test do """) end) 
- file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/nonexistent.txt" assert {:error, "no such file or directory"} = FileSystem.read(file_system, file_path) end - test "returns object contents under the given key", %{bypass: bypass} do + test "returns object contents under the given key", + %{bypass: bypass, file_system: file_system} do content = """ this should not be parsed @@ -244,7 +245,6 @@ defmodule Livebook.FileSystem.S3Test do |> Plug.Conn.resp(200, content) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/dir/file.txt" assert {:ok, ^content} = FileSystem.read(file_system, file_path) @@ -252,7 +252,7 @@ defmodule Livebook.FileSystem.S3Test do end describe "FileSystem.write/3" do - test "writes contents under the file key", %{bypass: bypass} do + test "writes contents under the file key", %{bypass: bypass, file_system: file_system} do content = "content" Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn -> @@ -261,16 +261,14 @@ defmodule Livebook.FileSystem.S3Test do Plug.Conn.resp(conn, 200, "") end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/dir/file.txt" assert :ok = FileSystem.write(file_system, file_path, content) end # Google Cloud Storage XML API returns this type of response. - test "returns success when the status is 200 even if the content type is text/html", %{ - bypass: bypass - } do + test "returns success when the status is 200 even if the content type is text/html", + %{bypass: bypass, file_system: file_system} do content = "content" Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn -> @@ -281,7 +279,6 @@ defmodule Livebook.FileSystem.S3Test do |> Plug.Conn.resp(200, "") end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/dir/file.txt" assert :ok = FileSystem.write(file_system, file_path, content) @@ -289,14 +286,14 @@ defmodule Livebook.FileSystem.S3Test do end describe "FileSystem.create_dir/2" do - test "write empty content under the directory key", %{bypass: bypass} do + test "write empty content under the directory key", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "PUT", "/mybucket/dir/", fn conn -> assert {:ok, "", conn} = Plug.Conn.read_body(conn) Plug.Conn.resp(conn, 200, "") end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/dir/" assert :ok = FileSystem.create_dir(file_system, dir_path) @@ -304,7 +301,8 @@ defmodule Livebook.FileSystem.S3Test do end describe "FileSystem.remove/2" do - test "returns successful value when a nonexistent key is given", %{bypass: bypass} do + test "returns successful value when a nonexistent key is given", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "DELETE", "/mybucket/file.txt", fn conn -> conn |> Plug.Conn.put_resp_content_type("application/xml") @@ -315,25 +313,23 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/file.txt" assert :ok = FileSystem.remove(file_system, file_path) end - test "deletes object under the corresponding key", %{bypass: bypass} do + test "deletes object under the corresponding key", %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "DELETE", "/mybucket/file.txt", fn conn -> Plug.Conn.resp(conn, 204, "") end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/file.txt" assert :ok = FileSystem.remove(file_system, 
file_path) end test "when a directory is given, recursively lists and batch deletes all matching keys", - %{bypass: bypass} do + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"prefix" => "dir/", "delimiter" => ""} = conn.params @@ -391,7 +387,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") dir_path = "/dir/" assert :ok = FileSystem.remove(file_system, dir_path) @@ -400,7 +395,7 @@ defmodule Livebook.FileSystem.S3Test do describe "FileSystem.copy/3" do test "raises an error if the given paths have different type" do - file_system = S3.new("https://example.com/mybucket", "key", "secret") + file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket") src_file_path = "/src_file.txt" dest_dir_path = "/dir/" @@ -410,7 +405,7 @@ defmodule Livebook.FileSystem.S3Test do end test "given file paths, returns an error if the source object does not exist", - %{bypass: bypass} do + %{bypass: bypass, file_system: file_system} do # Request for the bucket name Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> conn @@ -434,7 +429,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") src_file_path = "/src_file.txt" dest_file_path = "/dest_file.txt" @@ -442,7 +436,8 @@ defmodule Livebook.FileSystem.S3Test do FileSystem.copy(file_system, src_file_path, dest_file_path) end - test "given file paths, copies contents into the new key", %{bypass: bypass} do + test "given file paths, copies contents into the new key", + %{bypass: bypass, file_system: file_system} do # Request for the bucket name Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> conn @@ -465,7 +460,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") src_file_path = "/src_file.txt" dest_file_path = "/dest_file.txt" @@ -473,7 +467,7 @@ defmodule Livebook.FileSystem.S3Test do end test "given directory paths, returns an error if the source directory does not exist", - %{bypass: bypass} do + %{bypass: bypass, file_system: file_system} do # Directory listing with no results Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"prefix" => "src_dir/", "delimiter" => ""} = conn.params @@ -487,7 +481,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") src_dir_path = "/src_dir/" dest_dir_path = "/dest_dir/" @@ -496,7 +489,7 @@ defmodule Livebook.FileSystem.S3Test do end test "given directory paths, recursively lists all matching keys and individually copies objects", - %{bypass: bypass} do + %{bypass: bypass, file_system: file_system} do # Directory listing Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"prefix" => "src_dir/", "delimiter" => ""} = conn.params @@ -542,7 +535,6 @@ defmodule Livebook.FileSystem.S3Test do end) end - file_system = S3.new(bucket_url(bypass.port), "key", "secret") src_dir_path = "/src_dir/" dest_dir_path = "/dest_dir/" @@ -552,7 +544,7 @@ defmodule Livebook.FileSystem.S3Test do describe "FileSystem.rename/3" do test "raises an error if the given paths have different type" do - file_system = S3.new("https://example.com/mybucket", "key", "secret") + file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket") src_file_path = "/src_file.txt" dest_dir_path = "/dir/" @@ -561,7 +553,8 @@ defmodule Livebook.FileSystem.S3Test do end end 
- test "returns an error when the destination file exists", %{bypass: bypass} do + test "returns an error when the destination file exists", + %{bypass: bypass, file_system: file_system} do # Existence is verified by listing Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"prefix" => "dest_file.txt", "delimiter" => "/"} = conn.params @@ -578,7 +571,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") src_file_path = "/src_file.txt" dest_file_path = "/dest_file.txt" @@ -589,7 +581,8 @@ defmodule Livebook.FileSystem.S3Test do # Rename is implemented as copy and delete, both of which are # tested separately, so here's just one integration test to # verify this behaviour - test "given file paths, copies the content and deletes the destination", %{bypass: bypass} do + test "given file paths, copies the content and deletes the destination", + %{bypass: bypass, file_system: file_system} do # Expects two requests: # * destination existence check by listing, we return no entries # * bucket name request @@ -618,7 +611,6 @@ defmodule Livebook.FileSystem.S3Test do Plug.Conn.resp(conn, 204, "") end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") src_file_path = "/src_file.txt" dest_file_path = "/dest_file.txt" @@ -627,25 +619,25 @@ defmodule Livebook.FileSystem.S3Test do end describe "FileSystem.etag_for/2" do - test "returns an error when a nonexistent key is given", %{bypass: bypass} do + test "returns an error when a nonexistent key is given", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "HEAD", "/mybucket/nonexistent.txt", fn conn -> Plug.Conn.resp(conn, 404, "") end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/nonexistent.txt" assert {:error, "no such file or directory"} = FileSystem.etag_for(file_system, file_path) end - test "returns the ETag value received from the server", %{bypass: bypass} do + test "returns the ETag value received from the server", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "HEAD", "/mybucket/nonexistent.txt", fn conn -> conn |> Plug.Conn.put_resp_header("ETag", "value") |> Plug.Conn.resp(200, "") end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/nonexistent.txt" assert {:ok, "value"} = FileSystem.etag_for(file_system, file_path) @@ -653,7 +645,8 @@ defmodule Livebook.FileSystem.S3Test do end describe "FileSystem.exists?/2" do - test "returns false when the given object doesn't exist", %{bypass: bypass} do + test "returns false when the given object doesn't exist", + %{bypass: bypass, file_system: file_system} do # Existence is verified by listing Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"prefix" => "file.txt", "delimiter" => "/"} = conn.params @@ -667,13 +660,12 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/file.txt" assert {:ok, false} = FileSystem.exists?(file_system, file_path) end - test "returns true when the given object exists", %{bypass: bypass} do + test "returns true when the given object exists", %{bypass: bypass, file_system: file_system} do # Existence is verified by listing Bypass.expect_once(bypass, "GET", "/mybucket", fn conn -> assert %{"prefix" => "file.txt", "delimiter" => "/"} = conn.params @@ -690,7 +682,6 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), 
"key", "secret") file_path = "/file.txt" assert {:ok, true} = FileSystem.exists?(file_system, file_path) @@ -699,7 +690,7 @@ defmodule Livebook.FileSystem.S3Test do describe "FileSystem.resolve_path/3" do test "resolves relative paths" do - file_system = S3.new("https://example.com/mybucket/", "key", "secret") + file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket/") assert "/dir/" = FileSystem.resolve_path(file_system, "/dir/", "") assert "/dir/file.txt" = FileSystem.resolve_path(file_system, "/dir/", "file.txt") @@ -712,7 +703,7 @@ defmodule Livebook.FileSystem.S3Test do end test "resolves absolute paths" do - file_system = S3.new("https://example.com/mybucket/", "key", "secret") + file_system = build(:fs_s3, bucket_url: "https://example.com/mybucket/") assert "/" = FileSystem.resolve_path(file_system, "/dir/", "/") assert "/file.txt" = FileSystem.resolve_path(file_system, "/dir/", "/file.txt") @@ -725,29 +716,31 @@ defmodule Livebook.FileSystem.S3Test do describe "FileSystem chunked write" do test "accumulates small chunks and sends a single request if the content is small", - %{bypass: bypass} do + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn -> assert {:ok, "ab", conn} = Plug.Conn.read_body(conn) Plug.Conn.resp(conn, 200, "") end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/dir/file.txt" - assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000) + assert {:ok, state} = + FileSystem.write_stream_init(file_system, file_path, part_size: 5_000) + assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "a") assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "b") assert :ok = FileSystem.write_stream_finish(file_system, state) end - test "creates a multi-part upload for contents over 50MB", %{bypass: bypass} do - file_system = S3.new(bucket_url(bypass.port), "key", "secret") + test "creates a multi-part upload for contents over 50MB", + %{bypass: bypass, file_system: file_system} do file_path = "/dir/file.txt" chunk_3kb = String.duplicate("a", 3_000) - assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000) + assert {:ok, state} = + FileSystem.write_stream_init(file_system, file_path, part_size: 5_000) Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn -> assert %{"uploads" => ""} = conn.params @@ -830,13 +823,13 @@ defmodule Livebook.FileSystem.S3Test do assert :ok = FileSystem.write_stream_finish(file_system, state) end - test "aborts the multi-part upload when halted", %{bypass: bypass} do - file_system = S3.new(bucket_url(bypass.port), "key", "secret") + test "aborts the multi-part upload when halted", %{bypass: bypass, file_system: file_system} do file_path = "/dir/file.txt" chunk_5kb = String.duplicate("a", 5_000) - assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000) + assert {:ok, state} = + FileSystem.write_stream_init(file_system, file_path, part_size: 5_000) Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn -> assert %{"uploads" => ""} = conn.params @@ -870,13 +863,14 @@ defmodule Livebook.FileSystem.S3Test do assert :ok = FileSystem.write_stream_halt(file_system, state) end - test "aborts the multi-part upload when finish fails", %{bypass: bypass} do - file_system = S3.new(bucket_url(bypass.port), "key", "secret") + test "aborts the multi-part upload when finish fails", + 
%{bypass: bypass, file_system: file_system} do file_path = "/dir/file.txt" chunk_5kb = String.duplicate("a", 5_000) - assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000) + assert {:ok, state} = + FileSystem.write_stream_init(file_system, file_path, part_size: 5_000) Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn -> assert %{"uploads" => ""} = conn.params @@ -935,7 +929,8 @@ defmodule Livebook.FileSystem.S3Test do end describe "FileSystem.read_stream_into/2" do - test "returns an error when a nonexistent key is given", %{bypass: bypass} do + test "returns an error when a nonexistent key is given", + %{bypass: bypass, file_system: file_system} do Bypass.expect_once(bypass, "GET", "/mybucket/nonexistent.txt", fn conn -> conn |> Plug.Conn.put_resp_content_type("application/xml") @@ -946,14 +941,13 @@ defmodule Livebook.FileSystem.S3Test do """) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/nonexistent.txt" assert {:error, "no such file or directory"} = FileSystem.read_stream_into(file_system, file_path, <<>>) end - test "collects regular response", %{bypass: bypass} do + test "collects regular response", %{bypass: bypass, file_system: file_system} do content = """ this should not be parsed @@ -968,13 +962,12 @@ defmodule Livebook.FileSystem.S3Test do |> Plug.Conn.resp(200, content) end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/dir/file.txt" assert {:ok, ^content} = FileSystem.read_stream_into(file_system, file_path, <<>>) end - test "collects chunked response", %{bypass: bypass} do + test "collects chunked response", %{bypass: bypass, file_system: file_system} do chunk = String.duplicate("a", 2048) Bypass.expect_once(bypass, "GET", "/mybucket/dir/file.txt", fn conn -> @@ -987,7 +980,6 @@ defmodule Livebook.FileSystem.S3Test do end end) - file_system = S3.new(bucket_url(bypass.port), "key", "secret") file_path = "/dir/file.txt" assert {:ok, content} = FileSystem.read_stream_into(file_system, file_path, <<>>) @@ -995,6 +987,79 @@ defmodule Livebook.FileSystem.S3Test do end end + describe "FileSystem.load/2" do + test "loads the region from fields map" do + fields = %{ + bucket_url: "https://mybucket.s3.amazonaws.com", + region: "us-east-1", + access_key_id: "key", + secret_access_key: "secret" + } + + hash = :crypto.hash(:sha256, fields.bucket_url) + id = "s3-#{Base.url_encode64(hash, padding: false)}" + + assert FileSystem.load(%S3{}, fields) == %S3{ + id: id, + bucket_url: fields.bucket_url, + region: fields.region, + access_key_id: fields.access_key_id, + secret_access_key: fields.secret_access_key + } + end + + test "loads the region from bucket url" do + fields = %{ + bucket_url: "https://mybucket.s3.us-east-1.amazonaws.com", + access_key_id: "key", + secret_access_key: "secret" + } + + hash = :crypto.hash(:sha256, fields.bucket_url) + id = "s3-#{Base.url_encode64(hash, padding: false)}" + + assert FileSystem.load(%S3{}, fields) == %S3{ + id: id, + bucket_url: fields.bucket_url, + region: "us-east-1", + access_key_id: fields.access_key_id, + secret_access_key: fields.secret_access_key + } + end + + test "loads the file system with string keys" do + fields = %{ + "bucket_url" => "https://mybucket.s3.us-east-1.amazonaws.com", + "access_key_id" => "key", + "secret_access_key" => "secret" + } + + hash = :crypto.hash(:sha256, fields["bucket_url"]) + id = "s3-#{Base.url_encode64(hash, padding: false)}" + + assert FileSystem.load(%S3{}, fields) == %S3{ + id: 
id, + bucket_url: fields["bucket_url"], + region: "us-east-1", + access_key_id: fields["access_key_id"], + secret_access_key: fields["secret_access_key"] + } + end + end + + describe "FileSystem.dump/1" do + test "dumps into a map ready to be stored" do + file_system = build(:fs_s3) + + assert FileSystem.dump(file_system) == %{ + bucket_url: "https://mybucket.s3.amazonaws.com", + region: "us-east-1", + access_key_id: "key", + secret_access_key: "secret" + } + end + end + # Helpers defp bucket_url(port), do: "http://localhost:#{port}/mybucket" diff --git a/test/livebook/hubs/provider_test.exs b/test/livebook/hubs/provider_test.exs index df83ce191..7f9056bf8 100644 --- a/test/livebook/hubs/provider_test.exs +++ b/test/livebook/hubs/provider_test.exs @@ -24,10 +24,6 @@ defmodule Livebook.Hubs.ProviderTest do assert_raise RuntimeError, "not implemented", fn -> Provider.disconnect(hub) end end - test "capabilities/1", %{hub: hub} do - assert Provider.capabilities(hub) == [:list_secrets, :create_secret] - end - test "get_secrets/1 without startup secrets", %{hub: hub} do secret = insert_secret(name: "GET_PERSONAL_SECRET") assert secret in Provider.get_secrets(hub) diff --git a/test/support/factory.ex b/test/support/factory.ex index c44cb06f5..1243905e0 100644 --- a/test/support/factory.ex +++ b/test/support/factory.ex @@ -67,6 +67,19 @@ defmodule Livebook.Factory do } end + def build(:fs_s3) do + bucket_url = "https://mybucket.s3.amazonaws.com" + hash = :crypto.hash(:sha256, bucket_url) + + %Livebook.FileSystem.S3{ + id: "s3-#{Base.url_encode64(hash, padding: false)}", + bucket_url: bucket_url, + region: "us-east-1", + access_key_id: "key", + secret_access_key: "secret" + } + end + def build(factory_name, attrs) do factory_name |> build() |> struct!(attrs) end
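For reference, the new `load/2`/`dump/1` pair composes into a storage round trip. A minimal sketch of how the two fit together for S3, assuming only the behaviour pinned down by the diff and its tests (the bucket URL and credentials are placeholder values):

```elixir
alias Livebook.FileSystem
alias Livebook.FileSystem.S3

# The region is inferred from the bucket URL when not passed explicitly,
# as covered by the "loads the region from bucket url" test above
file_system = S3.new("https://mybucket.s3.us-east-1.amazonaws.com", "key", "secret")

# dump/1 keeps only the four persisted attributes
fields = FileSystem.dump(file_system)
#=> %{
#     bucket_url: "https://mybucket.s3.us-east-1.amazonaws.com",
#     region: "us-east-1",
#     access_key_id: "key",
#     secret_access_key: "secret"
#   }

# load/2 rebuilds an equivalent struct: string keys (as read back from
# storage) hit the first clause, atom keys the second. The id is derived
# from a hash of the bucket URL, so the round trip pins back exactly.
^file_system = FileSystem.load(%S3{}, fields)
```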
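Since the request helpers are now public functions on `S3.Client`, callers outside the `defimpl` can exercise them directly, which is all the protocol implementation does. A hedged sketch (the `notebooks/` prefix is hypothetical; errors surface as the `{:error, message}` tuples built by `request_response_to_error/1`):

```elixir
alias Livebook.FileSystem.S3

file_system = S3.new("https://mybucket.s3.us-east-1.amazonaws.com", "key", "secret")

# Non-recursive listing: the "/" delimiter groups keys under common
# prefixes, which is how FileSystem.list/3 models directories
case S3.Client.list_objects(file_system, prefix: "notebooks/", delimiter: "/") do
  {:ok, %{bucket: bucket, keys: keys}} ->
    IO.puts("#{bucket}: #{length(keys)} entries under notebooks/")

  {:error, message} ->
    IO.puts("request failed: #{message}")
end
```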