Implement streaming for downloads and file systems (#2072)

Co-authored-by: José Valim <jose.valim@dashbit.co>
Jonatan Kłosko 2023-07-14 22:16:52 +02:00 committed by GitHub
parent 9e7e2f1707
commit 19a5124d1a
11 changed files with 917 additions and 52 deletions


@@ -98,6 +98,15 @@ defmodule Livebook.Config do
Application.get_env(:livebook, :data_path) || :filename.basedir(:user_data, "livebook")
end
@doc """
Returns the path to the Livebook temporary directory.
"""
@spec tmp_path() :: String.t()
def tmp_path() do
tmp_dir = System.tmp_dir!() |> Path.expand()
Path.join(tmp_dir, "livebook")
end
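This centralizes the temporary root that Livebook.Session previously computed in a private livebook_tmp_path/0 helper (removed later in this diff), so session files and the new streaming downloads share one location.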
@doc """
Returns the apps path.
"""


@@ -174,4 +174,60 @@ defprotocol Livebook.FileSystem do
"""
@spec resolve_path(t(), path(), String.t()) :: path()
def resolve_path(file_system, dir_path, subject)
@doc """
Initializes a chunked write to the given file.
Should return the initial state, which is then reduced over in
`write_stream_chunk/3`.
"""
@spec write_stream_init(t(), path(), keyword()) :: {:ok, state} | {:error, error()}
when state: term()
def write_stream_init(file_system, path, opts)
@doc """
Writes a file chunk.
There is no assumption on the chunk size; you can accumulate chunks
in `state` and perform the actual write once the desired chunk
size is reached.
"""
@spec write_stream_chunk(t(), state, binary()) :: {:ok, state} | {:error, error()}
when state: term()
def write_stream_chunk(file_system, state, chunk)
@doc """
Finalizes a chunked write operation.
This function is called when all chunks have been successfully
written.
Note that if the finish operation fails, `write_stream_halt/2`
is **not** expected to be called, so you should do the necessary
cleanup here in case of failure as well.
"""
@spec write_stream_finish(t(), state) :: :ok | {:error, error()} when state: term()
def write_stream_finish(file_system, state)
@doc """
Halts a chunked write operation.
This function is called when writing any of the chunks fails or the
writing is aborted by the caller.
"""
@spec write_stream_halt(t(), state) :: :ok | {:error, error()} when state: term()
def write_stream_halt(file_system, state)
@doc """
Similar to `read/2`, but streams file contents into `collectable`
chunk by chunk.
The `Collectable` protocol does not make room for gracefully
signalling an error, so implementations generally raise an
exception. `read_stream_into/3` is not expected to raise, so make
sure to convert collectable exceptions into an error tuple.
"""
@spec read_stream_into(t(), path(), Collectable.t()) ::
{:ok, Collectable.t()} | {:error, error()}
def read_stream_into(file_system, path, collectable)
end
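Taken together, these callbacks form a small state machine: init, zero or more chunk writes, then either finish or halt. A minimal sketch of driving the protocol by hand, assuming `file_system` is any implementation and `chunks` is an enumerable of binaries (both names hypothetical):

alias Livebook.FileSystem

{:ok, state} = FileSystem.write_stream_init(file_system, "/notes.txt", [])

chunks
|> Enum.reduce_while({:ok, state}, fn chunk, {:ok, state} ->
  case FileSystem.write_stream_chunk(file_system, state, chunk) do
    {:ok, state} -> {:cont, {:ok, state}}
    {:error, _} = error -> {:halt, {error, state}}
  end
end)
|> case do
  # All chunks written, finalize the write
  {:ok, state} ->
    FileSystem.write_stream_finish(file_system, state)

  # A chunk failed, let the implementation clean up partial state
  {{:error, _} = error, state} ->
    FileSystem.write_stream_halt(file_system, state)
    error
end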


@@ -227,8 +227,8 @@ defmodule Livebook.FileSystem.File do
end
defp copy_regular_file(source, destination) do
with {:ok, content} <- read(source) do
write(destination, content)
with {:ok, _} <- read_stream_into(source, destination) do
:ok
end
end
@@ -290,4 +290,45 @@ defmodule Livebook.FileSystem.File do
end
end)
end
@doc """
Similar to `read/1`, but streams file contents into `collectable`
chunk by chunk.
"""
@spec read_stream_into(t(), Collectable.t()) ::
{:ok, Collectable.t()} | {:error, FileSystem.error()}
def read_stream_into(file, collectable) do
FileSystem.read_stream_into(file.file_system, file.path, collectable)
end
end
defimpl Collectable, for: Livebook.FileSystem.File do
def into(%Livebook.FileSystem.File{file_system: file_system, path: path} = file) do
state = file_system |> Livebook.FileSystem.write_stream_init(path, []) |> unwrap!()
collector = fn
state, {:cont, chunk} when is_binary(chunk) ->
file_system
|> Livebook.FileSystem.write_stream_chunk(state, chunk)
|> unwrap!()
state, :done ->
file_system
|> Livebook.FileSystem.write_stream_finish(state)
|> unwrap!()
file
state, :halt ->
file_system
|> Livebook.FileSystem.write_stream_halt(state)
|> unwrap!()
end
{state, collector}
end
defp unwrap!(:ok), do: :ok
defp unwrap!({:ok, result}), do: result
defp unwrap!({:error, error}), do: raise(error)
end
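With this Collectable implementation any enumerable of binaries can be collected straight into a file, with errors surfacing as raised exceptions via unwrap!/1. A hypothetical usage sketch (paths illustrative):

file = Livebook.FileSystem.File.local("/tmp/livebook/example.txt")

# Eager collection of in-memory chunks
Enum.into(["hello", " ", "world"], file)

# Lazy collection, each chunk is written as it is produced
File.stream!("/tmp/input.bin", [], 65_536)
|> Stream.into(file)
|> Stream.run()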


@@ -40,6 +40,8 @@ end
defimpl Livebook.FileSystem, for: Livebook.FileSystem.Local do
alias Livebook.FileSystem
@stream_chunk_size_in_bytes 16384
def resource_identifier(file_system) do
{:local_file_system, node(file_system.origin_pid)}
end
@@ -241,4 +243,59 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.Local do
{:error, "this local file system belongs to a different host"}
end
end
def write_stream_init(_file_system, path, _opts) do
FileSystem.Utils.assert_regular_path!(path)
downloads_dir = Path.join(Livebook.Config.tmp_path(), "downloads")
download_path = Path.join(downloads_dir, Livebook.Utils.random_id())
with :ok <- File.mkdir_p(downloads_dir),
{:ok, device} <- File.open(download_path, [:write]) do
{:ok, %{path: path, download_path: download_path, device: device}}
else
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
def write_stream_chunk(_file_system, state, chunk) when is_binary(chunk) do
case IO.binwrite(state.device, chunk) do
:ok -> {:ok, state}
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
def write_stream_finish(_file_system, state) do
File.close(state.device)
with :ok <- File.mkdir_p(Path.dirname(state.path)),
:ok <- File.rename(state.download_path, state.path) do
:ok
else
{:error, error} ->
File.rm(state.download_path)
FileSystem.Utils.posix_error(error)
end
end
def write_stream_halt(_file_system, state) do
File.close(state.device)
File.rm(state.download_path)
:ok
end
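Writing to a random path under the temporary downloads directory and renaming only in write_stream_finish/2 means the destination only ever sees complete content: a halted or failed write removes the temporary file and never leaves a partially written file at the target path.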
def read_stream_into(_file_system, path, collectable) do
FileSystem.Utils.assert_regular_path!(path)
try do
result =
path
|> File.stream!([], @stream_chunk_size_in_bytes)
|> Enum.into(collectable)
{:ok, result}
rescue
error in File.Error -> FileSystem.Utils.posix_error(error.reason)
end
end
end
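On the read side the local implementation is File.stream!/3 with a fixed 16 KiB chunk size, so collecting into an empty binary reassembles the file contents chunk by chunk. A small usage sketch (path illustrative):

file_system = Livebook.FileSystem.Local.new()

{:ok, content} =
  Livebook.FileSystem.read_stream_into(file_system, "/tmp/livebook/example.txt", <<>>)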


@@ -216,6 +216,111 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
FileSystem.Utils.resolve_unix_like_path(dir_path, subject)
end
def write_stream_init(_file_system, path, opts) do
opts = Keyword.validate!(opts, part_size: 50_000_000)
FileSystem.Utils.assert_regular_path!(path)
"/" <> key = path
{:ok,
%{
key: key,
parts: 0,
etags: [],
current_chunks: [],
current_size: 0,
part_size: opts[:part_size],
upload_id: nil
}}
end
def write_stream_chunk(file_system, state, chunk) when is_binary(chunk) do
chunk_size = byte_size(chunk)
state = update_in(state.current_size, &(&1 + chunk_size))
state = update_in(state.current_chunks, &[chunk | &1])
if state.current_size >= state.part_size do
maybe_state =
if state.upload_id do
{:ok, state}
else
with {:ok, upload_id} <- create_multipart_upload(file_system, state.key) do
{:ok, %{state | upload_id: upload_id}}
end
end
with {:ok, state} <- maybe_state do
upload_part_from_state(file_system, state, state.part_size)
end
else
{:ok, state}
end
end
defp upload_part_from_state(file_system, state, part_size) do
<<part::binary-size(part_size), rest::binary>> =
state.current_chunks
|> Enum.reverse()
|> IO.iodata_to_binary()
parts = state.parts + 1
with {:ok, %{etag: etag}} <- upload_part(file_system, state.key, state.upload_id, parts, part) do
{:ok,
%{
state
| current_chunks: [rest],
current_size: byte_size(rest),
etags: [etag | state.etags],
parts: parts
}}
end
end
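To make the buffering concrete, here is how the state evolves for a hypothetical part_size of 5_000 bytes fed 3_000-byte chunks (s3_fs and chunk_3kb are illustrative; compare the multipart test further down):

{:ok, state} = FileSystem.write_stream_init(s3_fs, "/dir/file.txt", part_size: 5_000)

# 3_000 buffered bytes < 5_000, nothing is sent yet
{:ok, state} = FileSystem.write_stream_chunk(s3_fs, state, chunk_3kb)

# 6_000 >= 5_000: the multipart upload is created lazily, the first
# 5_000 bytes go out as part 1, and 1_000 bytes remain buffered
{:ok, state} = FileSystem.write_stream_chunk(s3_fs, state, chunk_3kb)

# the remaining buffer is uploaded as the final part and the
# multipart upload is completed
:ok = FileSystem.write_stream_finish(s3_fs, state)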
def write_stream_finish(file_system, state) do
if state.upload_id do
maybe_state =
if state.current_size > 0 do
upload_part_from_state(file_system, state, state.current_size)
else
{:ok, state}
end
with {:ok, state} <- maybe_state,
:ok <-
complete_multipart_upload(
file_system,
state.key,
state.upload_id,
Enum.reverse(state.etags)
) do
:ok
else
{:error, error} ->
abort_multipart_upload(file_system, state.key, state.upload_id)
{:error, error}
end
else
content = state.current_chunks |> Enum.reverse() |> IO.iodata_to_binary()
put_object(file_system, state.key, content)
end
end
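Note the fallback branch: if the total content never reaches part_size, no multipart upload is created at all and the buffered chunks are flushed with a single put_object request.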
def write_stream_halt(file_system, state) do
if state.upload_id do
abort_multipart_upload(file_system, state.key, state.upload_id)
else
:ok
end
end
def read_stream_into(file_system, path, collectable) do
FileSystem.Utils.assert_regular_path!(path)
"/" <> key = path
multipart_get_object(file_system, key, collectable)
end
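Because the destination is any Collectable, an S3 object can be streamed straight into a local file without holding the whole body in memory. A hypothetical sketch (bucket URL, credentials and paths illustrative):

s3_fs = Livebook.FileSystem.S3.new("https://s3.amazonaws.com/mybucket", access_key_id, secret_access_key)
local_file = Livebook.FileSystem.File.local("/tmp/livebook/object.bin")

{:ok, _} = Livebook.FileSystem.read_stream_into(s3_fs, "/object.bin", local_file)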
# Requests
defp list_objects(file_system, opts) do
@@ -258,15 +363,24 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
end
defp get_object(file_system, key) do
case request(file_system, :get, "/" <> encode_key(key)) do
case request(file_system, :get, "/" <> encode_key(key), long: true) do
{:ok, 200, _headers, body} -> {:ok, body}
{:ok, 404, _headers, _body} -> FileSystem.Utils.posix_error(:enoent)
other -> request_response_to_error(other)
end
end
defp multipart_get_object(file_system, key, collectable) do
case download(file_system, "/" <> encode_key(key), collectable) do
{:ok, collectable} -> {:ok, collectable}
{:error, _message, 404} -> FileSystem.Utils.posix_error(:enoent)
{:error, message, _status} -> {:error, message}
end
end
defp put_object(file_system, key, content) do
case request(file_system, :put, "/" <> encode_key(key), body: content) |> decode() do
case request(file_system, :put, "/" <> encode_key(key), body: content, long: true)
|> decode() do
{:ok, 200, _headers, _body} -> :ok
other -> request_response_to_error(other)
end
@@ -344,6 +458,66 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
end
end
defp create_multipart_upload(file_system, key) do
query = %{"uploads" => ""}
case request(file_system, :post, "/" <> encode_key(key), query: query, body: "")
|> decode() do
{:ok, 200, _headers, %{"InitiateMultipartUploadResult" => %{"UploadId" => upload_id}}} ->
{:ok, upload_id}
other ->
request_response_to_error(other)
end
end
defp upload_part(file_system, key, upload_id, part_number, content) do
query = %{"uploadId" => upload_id, "partNumber" => part_number}
case request(file_system, :put, "/" <> encode_key(key),
query: query,
body: content,
long: true
)
|> decode() do
{:ok, 200, headers, _body} ->
{:ok, etag} = HTTP.fetch_header(headers, "etag")
{:ok, %{etag: etag}}
other ->
request_response_to_error(other)
end
end
defp complete_multipart_upload(file_system, key, upload_id, etags) do
query = %{"uploadId" => upload_id}
parts =
for {etag, n} <- Enum.with_index(etags, 1) do
%{"PartNumber" => n, "ETag" => etag}
end
body =
%{"CompleteMultipartUpload" => %{"Part" => parts}}
|> XML.encode_to_iodata!()
|> IO.iodata_to_binary()
case request(file_system, :post, "/" <> encode_key(key), query: query, body: body)
|> decode() do
{:ok, 200, _headers, _body} -> :ok
other -> request_response_to_error(other)
end
end
defp abort_multipart_upload(file_system, key, upload_id) do
query = %{"uploadId" => upload_id}
case request(file_system, :delete, "/" <> encode_key(key), query: query) |> decode() do
{:ok, 204, _headers, _body} -> :ok
other -> request_response_to_error(other)
end
end
defp encode_key(key) do
key
|> String.split("/")
@@ -384,38 +558,61 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
query = opts[:query] || %{}
headers = opts[:headers] || []
body = opts[:body]
long = Keyword.get(opts, :long, false)
%{host: host} = URI.parse(file_system.bucket_url)
timeout_opts = if(long, do: [timeout: 60_000], else: [])
url = file_system.bucket_url <> path <> "?" <> URI.encode_query(query)
url = url(file_system, path, query)
headers = headers(file_system, method, url, headers, body)
body = body && {"application/octet-stream", body}
HTTP.request(method, url, [headers: headers, body: body] ++ timeout_opts)
end
defp download(file_system, path, collectable, opts \\ []) do
query = opts[:query] || %{}
headers = opts[:headers] || []
url = url(file_system, path, query)
headers = headers(file_system, :get, url, headers)
HTTP.download(url, collectable, headers: headers)
end
defp url(file_system, path, query) do
file_system.bucket_url <> path <> "?" <> URI.encode_query(query)
end
defp headers(file_system, method, url, headers, body \\ nil) do
now = NaiveDateTime.utc_now() |> NaiveDateTime.to_erl()
%{host: host} = URI.parse(file_system.bucket_url)
headers = [{"Host", host} | headers]
headers =
:aws_signature.sign_v4(
file_system.access_key_id,
file_system.secret_access_key,
file_system.region,
"s3",
now,
Atom.to_string(method),
url,
headers,
body || "",
uri_encode_path: false
)
body = body && {"application/octet-stream", body}
HTTP.request(method, url, headers: headers, body: body)
:aws_signature.sign_v4(
file_system.access_key_id,
file_system.secret_access_key,
file_system.region,
"s3",
now,
Atom.to_string(method),
url,
headers,
body || "",
uri_encode_path: false
)
end
defp decode({:ok, status, headers, body}) do
guess_xml? = String.starts_with?(body, "<?xml")
case HTTP.fetch_content_type(headers) do
{:ok, content_type} when content_type in ["text/xml", "application/xml"] ->
{:ok, status, headers, XML.decode!(body)}
# Apparently some requests return XML without content-type
:error when guess_xml? ->
{:ok, status, headers, XML.decode!(body)}
_ ->
{:ok, status, headers, body}
end


@@ -1719,7 +1719,7 @@ defmodule Livebook.Session do
end
defp session_tmp_dir(session_id) do
livebook_tmp_path()
Livebook.Config.tmp_path()
|> Path.join("sessions/#{session_id}")
|> FileSystem.Utils.ensure_dir_path()
|> FileSystem.File.local()
@@ -1735,7 +1735,7 @@ defmodule Livebook.Session do
"""
@spec local_assets_path(String.t()) :: String.t()
def local_assets_path(hash) do
Path.join([livebook_tmp_path(), "assets", encode_path_component(hash)])
Path.join([Livebook.Config.tmp_path(), "assets", encode_path_component(hash)])
end
@doc """
@@ -1765,11 +1765,6 @@ defmodule Livebook.Session do
String.replace(component, [".", "/", "\\", ":"], "_")
end
defp livebook_tmp_path() do
tmp_dir = System.tmp_dir!() |> Path.expand()
Path.join(tmp_dir, "livebook")
end
defp initialize_files_from(state, {:inline, contents_map}) do
write_attachment_file_entries(state, fn destination_file, file_entry ->
case Map.fetch(contents_map, file_entry.name) do
@@ -1786,8 +1781,8 @@ defmodule Livebook.Session do
|> Livebook.Utils.expand_url(file_entry.name)
|> Livebook.Notebook.ContentLoader.rewrite_url()
case fetch_content(source_url) do
{:ok, content} -> FileSystem.File.write(destination_file, content)
case download_content(source_url, destination_file) do
:ok -> :ok
{:error, _message, 404} -> :ok
{:error, message, _status} -> {:error, message}
end
@@ -2491,8 +2486,8 @@ defmodule Livebook.Session do
defp file_entry_path_from_url(state, name, url, callback) do
fetcher = fn cache_file ->
case fetch_content(url) do
{:ok, content} -> FileSystem.File.write(cache_file, content)
case download_content(url, cache_file) do
:ok -> :ok
{:error, message, _} -> {:error, message}
end
end
@@ -2731,11 +2726,9 @@ defmodule Livebook.Session do
def to_attachment_file_entry(session, %{type: :url} = file_entry) do
destination = FileSystem.File.resolve(session.files_dir, file_entry.name)
case fetch_content(file_entry.url) do
{:ok, content} ->
with :ok <- FileSystem.File.write(destination, content) do
{:ok, %{name: file_entry.name, type: :attachment}}
end
case download_content(file_entry.url, destination) do
:ok ->
{:ok, %{name: file_entry.name, type: :attachment}}
{:error, message, _status} ->
{:error, message}
@@ -2746,16 +2739,13 @@ defmodule Livebook.Session do
{:ok, file_entry}
end
defp fetch_content(url) do
case Livebook.Utils.HTTP.request(:get, url) do
{:ok, 200, _headers, body} ->
{:ok, body}
defp download_content(url, file) do
case Livebook.Utils.HTTP.download(url, file) do
{:ok, _file} ->
:ok
{:ok, status, _headers, _body} ->
{:error, "failed to download file from the given URL", status}
_ ->
{:error, "failed to download file from the given URL", nil}
{:error, message, status} ->
{:error, "download failed, " <> message, status}
end
end
end


@@ -83,6 +83,125 @@ defmodule Livebook.Utils.HTTP do
end)
end
@doc """
Downloads the resource at the given URL into `collectable`.
If the collectable raises an error, it is rescued and an error tuple
is returned.
## Options
* `:headers` - request headers
"""
@spec download(String.t(), Collectable.t(), keyword()) ::
{:ok, Collectable.t()} | {:error, String.t(), status()}
def download(url, collectable, opts \\ []) do
headers = build_headers(opts[:headers] || [])
request = {url, headers}
http_opts = [ssl: http_ssl_opts()]
caller = self()
receiver = fn reply_info ->
request_id = elem(reply_info, 0)
# Cancel the request if the caller terminates
if Process.alive?(caller) do
send(caller, {:http, reply_info})
else
:httpc.cancel_request(request_id)
end
end
opts = [stream: :self, sync: false, receiver: receiver]
{:ok, request_id} = :httpc.request(:get, request, http_opts, opts)
try do
{acc, collector} = Collectable.into(collectable)
try do
download_loop(%{
request_id: request_id,
total_size: nil,
size: nil,
acc: acc,
collector: collector
})
catch
kind, reason ->
collector.(acc, :halt)
:httpc.cancel_request(request_id)
exception = Exception.normalize(kind, reason, __STACKTRACE__)
{:error, Exception.message(exception), nil}
else
{:ok, state} ->
acc = state.collector.(state.acc, :done)
{:ok, acc}
{:error, message, status} ->
collector.(acc, :halt)
:httpc.cancel_request(request_id)
{:error, message, status}
end
catch
kind, reason ->
:httpc.cancel_request(request_id)
exception = Exception.normalize(kind, reason, __STACKTRACE__)
{:error, Exception.message(exception), nil}
end
end
defp download_loop(state) do
receive do
{:http, reply_info} when elem(reply_info, 0) == state.request_id ->
download_receive(state, reply_info)
end
end
defp download_receive(_state, {_, {:error, error}}) do
{:error, "reason: #{inspect(error)}", nil}
end
defp download_receive(state, {_, {{_, 200, _}, _headers, body}}) do
acc = state.collector.(state.acc, {:cont, body})
{:ok, %{state | acc: acc}}
end
defp download_receive(_state, {_, {{_, status, _}, _headers, _body}}) do
{:error, "got HTTP status: #{status}", status}
end
defp download_receive(state, {_, :stream_start, headers}) do
total_size = total_size(headers)
download_loop(%{state | total_size: total_size, size: 0})
end
defp download_receive(state, {_, :stream, body_part}) do
acc = state.collector.(state.acc, {:cont, body_part})
state = %{state | acc: acc}
part_size = byte_size(body_part)
state = update_in(state.size, &(&1 + part_size))
download_loop(state)
end
defp download_receive(state, {_, :stream_end, _headers}) do
{:ok, state}
end
defp total_size(headers) do
case List.keyfind(headers, ~c"content-length", 0) do
{_, content_length} ->
List.to_integer(content_length)
_ ->
nil
end
end
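A usage sketch of the new function (URL hypothetical): collecting into an empty binary yields the whole body, while collecting into a Livebook.FileSystem.File streams the response to storage chunk by chunk.

case Livebook.Utils.HTTP.download("https://example.com/data.bin", <<>>) do
  {:ok, binary} -> {:ok, byte_size(binary)}
  {:error, message, _status} -> {:error, message}
end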
# Load SSL certificates
crt_file = CAStore.file_path()


@@ -268,7 +268,7 @@ defmodule Livebook.FileSystem.FileTest do
describe "copy/2" do
@tag :tmp_dir
test "supports regular files from different file systems via explicit read and write",
test "supports regular files from different file systems via stream read and write",
%{tmp_dir: tmp_dir} do
bypass = Bypass.open()
s3_fs = FileSystem.S3.new("http://localhost:#{bypass.port}/mybucket", "key", "secret")
@@ -281,6 +281,7 @@ defmodule Livebook.FileSystem.FileTest do
src_file = FileSystem.File.new(local_fs, Path.join(tmp_dir, "src_file.txt"))
dest_file = FileSystem.File.new(s3_fs, "/dest_file.txt")
# Note: the content is small, so the write is a single request
Bypass.expect_once(bypass, "PUT", "/mybucket/dest_file.txt", fn conn ->
assert {:ok, "content", conn} = Plug.Conn.read_body(conn)
@@ -291,7 +292,7 @@ defmodule Livebook.FileSystem.FileTest do
end
@tag :tmp_dir
test "supports directories from different file systems via explicit read and write",
test "supports directories from different file systems via stream read and write",
%{tmp_dir: tmp_dir} do
bypass = Bypass.open()
s3_fs = FileSystem.S3.new("http://localhost:#{bypass.port}/mybucket", "key", "secret")
@@ -309,6 +310,7 @@ defmodule Livebook.FileSystem.FileTest do
src_dir = FileSystem.File.new(local_fs, Path.join(tmp_dir, "src_dir") <> "/")
dest_dir = FileSystem.File.new(s3_fs, "/dest_dir/")
# Note: the content is small, so the write is a single request
Bypass.expect_once(bypass, "PUT", "/mybucket/dest_dir/nested/file.txt", fn conn ->
assert {:ok, "content", conn} = Plug.Conn.read_body(conn)
Plug.Conn.resp(conn, 200, "")
@@ -442,4 +444,34 @@ defmodule Livebook.FileSystem.FileTest do
assert %{path: p("/dir/.txt")} = FileSystem.File.ensure_extension(dir, ".txt")
end
end
describe "Collectable into" do
@tag :tmp_dir
test "uses chunked write to file", %{tmp_dir: tmp_dir} do
path = Path.join(tmp_dir, "dir/file.txt")
file = FileSystem.File.local(path)
chunk = String.duplicate("a", 2048)
chunk |> List.duplicate(10) |> Enum.into(file)
assert FileSystem.File.read(file) == {:ok, String.duplicate(chunk, 10)}
end
end
describe "read_stream_into/2" do
@tag :tmp_dir
test "collects file contents", %{tmp_dir: tmp_dir} do
create_tree!(tmp_dir,
dir: [
"file.txt": "content"
]
)
path = Path.join(tmp_dir, "dir/file.txt")
file = FileSystem.File.local(path)
assert {:ok, "content"} = FileSystem.File.read_stream_into(file, <<>>)
end
end
end


@@ -516,4 +516,96 @@ defmodule Livebook.FileSystem.LocalTest do
)
end
end
describe "FileSystem chunked write" do
@tag :tmp_dir
test "writes chunks to file", %{tmp_dir: tmp_dir} do
file_system = Local.new()
file_path = Path.join(tmp_dir, "file.txt")
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, [])
chunk = String.duplicate("a", 2048)
state =
for _ <- 1..10, reduce: state do
state ->
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, chunk)
state
end
assert :ok = FileSystem.write_stream_finish(file_system, state)
assert File.read!(file_path) == String.duplicate(chunk, 10)
end
@tag :tmp_dir
test "creates nonexistent directories", %{tmp_dir: tmp_dir} do
file_system = Local.new()
file_path = Path.join(tmp_dir, "dir/nested/file.txt")
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, [])
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "a")
assert :ok = FileSystem.write_stream_finish(file_system, state)
assert File.read!(file_path) == "a"
end
@tag :tmp_dir
test "overrides existing files on finish", %{tmp_dir: tmp_dir} do
create_tree!(tmp_dir,
"file.txt": "content"
)
file_system = Local.new()
file_path = Path.join(tmp_dir, "file.txt")
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, [])
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "new content")
assert :ok = FileSystem.write_stream_finish(file_system, state)
assert File.read!(file_path) == "new content"
end
@tag :tmp_dir
test "does not overrides existing files when halted", %{tmp_dir: tmp_dir} do
create_tree!(tmp_dir,
"file.txt": "content"
)
file_system = Local.new()
file_path = Path.join(tmp_dir, "file.txt")
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, [])
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "new content")
assert :ok = FileSystem.write_stream_halt(file_system, state)
assert File.read!(file_path) == "content"
end
end
describe "FileSystem.read_stream_into/2" do
@tag :tmp_dir
test "returns an error when a nonexistent file is given", %{tmp_dir: tmp_dir} do
file_system = Local.new()
file_path = Path.join(tmp_dir, "nonexistent.txt")
assert {:error, "no such file or directory"} =
FileSystem.read_stream_into(file_system, file_path, <<>>)
end
@tag :tmp_dir
test "collects file contents", %{tmp_dir: tmp_dir} do
create_tree!(tmp_dir,
dir: [
"file.txt": "content"
]
)
file_system = Local.new()
file_path = Path.join(tmp_dir, "dir/file.txt")
assert {:ok, "content"} = FileSystem.read_stream_into(file_system, file_path, <<>>)
end
end
end


@@ -723,6 +723,278 @@ defmodule Livebook.FileSystem.S3Test do
end
end
describe "FileSystem chunked write" do
test "accumulates small chunks and sends a single request if the content is small",
%{bypass: bypass} do
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
assert {:ok, "ab", conn} = Plug.Conn.read_body(conn)
Plug.Conn.resp(conn, 200, "")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "a")
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, "b")
assert :ok = FileSystem.write_stream_finish(file_system, state)
end
test "creates a multi-part upload for contents over 50MB", %{bypass: bypass} do
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
chunk_3kb = String.duplicate("a", 3_000)
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploads" => ""} = conn.params
# AWS does not return Content-Type for this request, so we emulate that
Plug.Conn.resp(conn, 200, """
<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<UploadId>1</UploadId>
</InitiateMultipartUploadResult>
""")
end)
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploadId" => "1", "partNumber" => "1"} = conn.params
assert {:ok, body, conn} = Plug.Conn.read_body(conn)
assert byte_size(body) == 5_000
conn
|> Plug.Conn.put_resp_header("ETag", "value1")
|> Plug.Conn.resp(200, "")
end)
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, chunk_3kb)
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, chunk_3kb)
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploadId" => "1", "partNumber" => "2"} = conn.params
assert {:ok, body, conn} = Plug.Conn.read_body(conn)
assert byte_size(body) == 5_000
conn
|> Plug.Conn.put_resp_header("ETag", "value2")
|> Plug.Conn.resp(200, "")
end)
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, chunk_3kb)
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, chunk_3kb)
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploadId" => "1", "partNumber" => "3"} = conn.params
assert {:ok, body, conn} = Plug.Conn.read_body(conn)
assert byte_size(body) == 2_000
conn
|> Plug.Conn.put_resp_header("ETag", "value3")
|> Plug.Conn.resp(200, "")
end)
expected_body =
"""
<CompleteMultipartUpload>
<Part>
<ETag>value1</ETag>
<PartNumber>1</PartNumber>
</Part>
<Part>
<ETag>value2</ETag>
<PartNumber>2</PartNumber>
</Part>
<Part>
<ETag>value3</ETag>
<PartNumber>3</PartNumber>
</Part>
</CompleteMultipartUpload>
"""
|> String.replace(~r/\s/, "")
Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn ->
assert {:ok, ^expected_body, conn} = Plug.Conn.read_body(conn)
conn
|> Plug.Conn.put_resp_content_type("application/xml")
|> Plug.Conn.resp(200, """
<CompleteMultipartUploadResult>
</CompleteMultipartUploadResult>
""")
end)
assert :ok = FileSystem.write_stream_finish(file_system, state)
end
test "aborts the multi-part upload when halted", %{bypass: bypass} do
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
chunk_5kb = String.duplicate("a", 5_000)
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploads" => ""} = conn.params
# AWS does not return Content-Type for this request, so we emulate that
Plug.Conn.resp(conn, 200, """
<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<UploadId>1</UploadId>
</InitiateMultipartUploadResult>
""")
end)
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploadId" => "1", "partNumber" => "1"} = conn.params
assert {:ok, body, conn} = Plug.Conn.read_body(conn)
assert byte_size(body) == 5_000
conn
|> Plug.Conn.put_resp_header("ETag", "value1")
|> Plug.Conn.resp(200, "")
end)
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, chunk_5kb)
Bypass.expect_once(bypass, "DELETE", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploadId" => "1"} = conn.params
Plug.Conn.resp(conn, 204, "")
end)
assert :ok = FileSystem.write_stream_halt(file_system, state)
end
test "aborts the multi-part upload when finish fails", %{bypass: bypass} do
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
chunk_5kb = String.duplicate("a", 5_000)
assert {:ok, state} = FileSystem.write_stream_init(file_system, file_path, part_size: 5_000)
Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploads" => ""} = conn.params
# AWS does not return Content-Type for this request, so we emulate that
Plug.Conn.resp(conn, 200, """
<?xml version="1.0" encoding="UTF-8"?>
<InitiateMultipartUploadResult>
<UploadId>1</UploadId>
</InitiateMultipartUploadResult>
""")
end)
Bypass.expect_once(bypass, "PUT", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploadId" => "1", "partNumber" => "1"} = conn.params
assert {:ok, body, conn} = Plug.Conn.read_body(conn)
assert byte_size(body) == 5_000
conn
|> Plug.Conn.put_resp_header("ETag", "value1")
|> Plug.Conn.resp(200, "")
end)
assert {:ok, state} = FileSystem.write_stream_chunk(file_system, state, chunk_5kb)
expected_body =
"""
<CompleteMultipartUpload>
<Part>
<ETag>value1</ETag>
<PartNumber>1</PartNumber>
</Part>
</CompleteMultipartUpload>
"""
|> String.replace(~r/\s/, "")
Bypass.expect_once(bypass, "POST", "/mybucket/dir/file.txt", fn conn ->
assert {:ok, ^expected_body, conn} = Plug.Conn.read_body(conn)
conn
|> Plug.Conn.put_resp_content_type("application/xml")
|> Plug.Conn.resp(500, """
<Error>
<Message>Error message</Message>
</Error>
""")
end)
Bypass.expect_once(bypass, "DELETE", "/mybucket/dir/file.txt", fn conn ->
assert %{"uploadId" => "1"} = conn.params
Plug.Conn.resp(conn, 204, "")
end)
assert {:error, "error message"} = FileSystem.write_stream_finish(file_system, state)
end
end
describe "FileSystem.read_stream_into/2" do
test "returns an error when a nonexistent key is given", %{bypass: bypass} do
Bypass.expect_once(bypass, "GET", "/mybucket/nonexistent.txt", fn conn ->
conn
|> Plug.Conn.put_resp_content_type("application/xml")
|> Plug.Conn.resp(404, """
<Error>
<Message>The specified key does not exist.</Message>
</Error>
""")
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/nonexistent.txt"
assert {:error, "no such file or directory"} =
FileSystem.read_stream_into(file_system, file_path, <<>>)
end
test "collects regular response", %{bypass: bypass} do
content = """
<MyData>
<Info>this should not be parsed</Info>
</MyData>
"""
Bypass.expect_once(bypass, "GET", "/mybucket/dir/file.txt", fn conn ->
# When reading, the content should be returned as a binary,
# regardless of the content type
conn
|> Plug.Conn.put_resp_content_type("application/xml")
|> Plug.Conn.resp(200, content)
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert {:ok, ^content} = FileSystem.read_stream_into(file_system, file_path, <<>>)
end
test "collects chunked response", %{bypass: bypass} do
chunk = String.duplicate("a", 2048)
Bypass.expect_once(bypass, "GET", "/mybucket/dir/file.txt", fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
for _ <- 1..10, reduce: conn do
conn ->
{:ok, conn} = Plug.Conn.chunk(conn, chunk)
conn
end
end)
file_system = S3.new(bucket_url(bypass.port), "key", "secret")
file_path = "/dir/file.txt"
assert {:ok, content} = FileSystem.read_stream_into(file_system, file_path, <<>>)
assert content == String.duplicate(chunk, 10)
end
end
# Helpers
defp bucket_url(port), do: "http://localhost:#{port}/mybucket"


@@ -1537,14 +1537,14 @@ defmodule Livebook.SessionTest do
url = "http://localhost:#{bypass.port}/files/image.jpg"
Bypass.expect_once(bypass, "GET", "/files/image.jpg", fn conn ->
Plug.Conn.resp(conn, 404, "not fount")
Plug.Conn.resp(conn, 404, "not found")
end)
session = start_session()
file_entry = %{type: :url, name: "image.jpg", url: url}
assert Session.to_attachment_file_entry(session, file_entry) ==
{:error, "failed to download file from the given URL"}
{:error, "download failed, got HTTP status: 404"}
end
end