mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-09-04 20:14:57 +08:00
Support reading file entry content from the runtime (#2044)
This commit is contained in:
parent
3231f2bd72
commit
c02eb984f8
9 changed files with 637 additions and 111 deletions
|
@ -42,8 +42,8 @@ defprotocol Livebook.FileSystem do
|
|||
|
||||
* `:local` - if the resource is local to its node
|
||||
|
||||
* `:global` - if the resource is external and available
|
||||
accessible from any node
|
||||
* `:global` - if the resource is external and accessible
|
||||
from any node
|
||||
|
||||
"""
|
||||
@spec type(t()) :: :local | :global
|
||||
|
|
|
@ -6,6 +6,35 @@ defprotocol Livebook.Runtime do
|
|||
# Usually a runtime involves a set of processes responsible for
|
||||
# evaluation, which could be running on a different node, however
|
||||
# the protocol does not require that.
|
||||
#
|
||||
# ## Files
|
||||
#
|
||||
# The runtime can request access to notebook files by sending a
|
||||
# request:
|
||||
#
|
||||
# * `{:runtime_file_entry_path_request, reply_to, name}`
|
||||
#
|
||||
# to which the runtime owner is supposed to reply with
|
||||
# `{:runtime_file_entry_path_reply, reply}` where `reply` is either
|
||||
# `{:ok, path}` or `{:error, message}` if accessing the file rails.
|
||||
# Note that `path` should be accessible within the runtime and can
|
||||
# be obtained using `transfer_file/4`.
|
||||
#
|
||||
# Similarly the runtime can request details about the file source:
|
||||
#
|
||||
# * `{:runtime_file_entry_spec_request, reply_to, name}`
|
||||
#
|
||||
# Instead of a path, the owner replies with a details map.
|
||||
#
|
||||
# ## Apps
|
||||
#
|
||||
# The runtime may be used to run Livebook apps and can request app
|
||||
# information by sending a request:
|
||||
#
|
||||
# * `{:runtime_app_info_request, reply_to}`
|
||||
#
|
||||
# The owner replies with `{:runtime_app_info_reply, info}`, where
|
||||
# info is a details map.
|
||||
|
||||
@typedoc """
|
||||
An arbitrary term identifying an evaluation container.
|
||||
|
@ -315,10 +344,10 @@ defprotocol Livebook.Runtime do
|
|||
|
||||
The runtime may ask for the file by sending a request:
|
||||
|
||||
* `{:runtime_file_lookup, reply_to, file_ref}`
|
||||
* `{:runtime_file_path_request, reply_to, file_ref}`
|
||||
|
||||
to which the runtime owner is supposed to reply with
|
||||
`{:runtime_file_lookup_reply, reply}` where `reply` is either
|
||||
`{:runtime_file_path_reply, reply}` where `reply` is either
|
||||
`{:ok, path}` or `:error` if no matching file can be found. Note
|
||||
that `path` should be accessible within the runtime and can be
|
||||
obtained using `transfer_file/4`.
|
||||
|
@ -409,7 +438,7 @@ defprotocol Livebook.Runtime do
|
|||
Outputs may include input fields. The evaluation may then request
|
||||
the current value of a previously rendered input by sending
|
||||
|
||||
* `{:runtime_evaluation_input, evaluation_ref, reply_to, input_id}`
|
||||
* `{:runtime_evaluation_input_request, evaluation_ref, reply_to, input_id}`
|
||||
|
||||
to the runtime owner who is supposed to reply with
|
||||
`{:runtime_evaluation_input_reply, reply}` where `reply` is either
|
||||
|
|
|
@ -261,8 +261,8 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
|
|||
|
||||
defp io_request({:livebook_get_file_path, file_id}, state) do
|
||||
# We could cache forever, but we don't want the cache to pile up
|
||||
# indefinitely, so we just reuse the input cache is cleared for
|
||||
# ever evaluation
|
||||
# indefinitely, so we just reuse the input cache which is cleared
|
||||
# for ever evaluation
|
||||
|
||||
cache_id = {:file_path, file_id}
|
||||
|
||||
|
@ -274,6 +274,32 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
|
|||
{input_cache[cache_id], %{state | input_cache: input_cache}}
|
||||
end
|
||||
|
||||
defp io_request({:livebook_get_file_entry_path, name}, state) do
|
||||
# Same as above as for caching
|
||||
|
||||
cache_id = {:file_entry_path, name}
|
||||
|
||||
input_cache =
|
||||
Map.put_new_lazy(state.input_cache, cache_id, fn ->
|
||||
request_file_entry_path(name, state)
|
||||
end)
|
||||
|
||||
{input_cache[cache_id], %{state | input_cache: input_cache}}
|
||||
end
|
||||
|
||||
defp io_request({:livebook_get_file_entry_spec, name}, state) do
|
||||
# Same as above as for caching
|
||||
|
||||
cache_id = {:file_entry_spec, name}
|
||||
|
||||
input_cache =
|
||||
Map.put_new_lazy(state.input_cache, cache_id, fn ->
|
||||
request_file_entry_spec(name, state)
|
||||
end)
|
||||
|
||||
{input_cache[cache_id], %{state | input_cache: input_cache}}
|
||||
end
|
||||
|
||||
# Token is a unique, reevaluation-safe opaque identifier
|
||||
defp io_request(:livebook_generate_token, state) do
|
||||
token = {state.ref, state.token_count}
|
||||
|
@ -351,52 +377,53 @@ defmodule Livebook.Runtime.Evaluator.IOProxy do
|
|||
end
|
||||
|
||||
defp request_input_value(input_id, state) do
|
||||
send(state.send_to, {:runtime_evaluation_input, state.ref, self(), input_id})
|
||||
request = {:runtime_evaluation_input_request, state.ref, self(), input_id}
|
||||
reply_tag = :runtime_evaluation_input_reply
|
||||
|
||||
ref = Process.monitor(state.send_to)
|
||||
|
||||
receive do
|
||||
{:runtime_evaluation_input_reply, {:ok, value}} ->
|
||||
Process.demonitor(ref, [:flush])
|
||||
{:ok, value}
|
||||
|
||||
{:runtime_evaluation_input_reply, :error} ->
|
||||
Process.demonitor(ref, [:flush])
|
||||
{:error, :not_found}
|
||||
|
||||
{:DOWN, ^ref, :process, _object, _reason} ->
|
||||
{:error, :terminated}
|
||||
with {:ok, reply} <- runtime_request(state, request, reply_tag) do
|
||||
with :error <- reply, do: {:error, :not_found}
|
||||
end
|
||||
end
|
||||
|
||||
defp request_file_path(file_id, state) do
|
||||
send(state.send_to, {:runtime_file_lookup, self(), file_id})
|
||||
request = {:runtime_file_path_request, self(), file_id}
|
||||
reply_tag = :runtime_file_path_reply
|
||||
|
||||
ref = Process.monitor(state.send_to)
|
||||
|
||||
receive do
|
||||
{:runtime_file_lookup_reply, {:ok, path}} ->
|
||||
Process.demonitor(ref, [:flush])
|
||||
{:ok, path}
|
||||
|
||||
{:runtime_file_lookup_reply, :error} ->
|
||||
Process.demonitor(ref, [:flush])
|
||||
{:error, :not_found}
|
||||
|
||||
{:DOWN, ^ref, :process, _object, _reason} ->
|
||||
{:error, :terminated}
|
||||
with {:ok, reply} <- runtime_request(state, request, reply_tag) do
|
||||
with :error <- reply, do: {:error, :not_found}
|
||||
end
|
||||
end
|
||||
|
||||
defp request_file_entry_path(name, state) do
|
||||
request = {:runtime_file_entry_path_request, self(), name}
|
||||
reply_tag = :runtime_file_entry_path_reply
|
||||
|
||||
with {:ok, reply} <- runtime_request(state, request, reply_tag), do: reply
|
||||
end
|
||||
|
||||
defp request_file_entry_spec(name, state) do
|
||||
request = {:runtime_file_entry_spec_request, self(), name}
|
||||
reply_tag = :runtime_file_entry_spec_reply
|
||||
|
||||
with {:ok, reply} <- runtime_request(state, request, reply_tag), do: reply
|
||||
end
|
||||
|
||||
defp request_app_info(state) do
|
||||
send(state.send_to, {:runtime_app_info_request, self()})
|
||||
request = {:runtime_app_info_request, self()}
|
||||
reply_tag = :runtime_app_info_reply
|
||||
|
||||
with {:ok, reply} <- runtime_request(state, request, reply_tag), do: reply
|
||||
end
|
||||
|
||||
defp runtime_request(state, request, reply_tag) do
|
||||
send(state.send_to, request)
|
||||
|
||||
ref = Process.monitor(state.send_to)
|
||||
|
||||
receive do
|
||||
{:runtime_app_info_reply, app_info} ->
|
||||
{^reply_tag, reply} ->
|
||||
Process.demonitor(ref, [:flush])
|
||||
{:ok, app_info}
|
||||
{:ok, reply}
|
||||
|
||||
{:DOWN, ^ref, :process, _object, _reason} ->
|
||||
{:error, :terminated}
|
||||
|
|
|
@ -598,14 +598,41 @@ defmodule Livebook.Session do
|
|||
GenServer.cast(pid, {:delete_file_entry, self(), name})
|
||||
end
|
||||
|
||||
@doc """
|
||||
Removes cache file for the given entry file if one exists.
|
||||
"""
|
||||
@spec clear_file_entry_cache(id(), String.t()) :: :ok
|
||||
def clear_file_entry_cache(session_id, name) do
|
||||
cache_file = file_entry_cache_file(session_id, name)
|
||||
FileSystem.File.remove(cache_file)
|
||||
:ok
|
||||
end
|
||||
|
||||
@doc """
|
||||
Checks whether caching applies to the given file entry.
|
||||
"""
|
||||
@spec file_entry_cacheable?(t(), Notebook.file_entry()) :: boolean()
|
||||
def file_entry_cacheable?(session, file_entry) do
|
||||
case file_entry do
|
||||
%{type: :attachment} ->
|
||||
not FileSystem.File.local?(session.files_dir)
|
||||
|
||||
%{type: :file, file: file} ->
|
||||
not FileSystem.File.local?(file)
|
||||
|
||||
%{type: :url} ->
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
@doc """
|
||||
Sends save request to the server.
|
||||
|
||||
If there's a file set and the notebook changed since the last save,
|
||||
it will be persisted to said file.
|
||||
|
||||
Note that notebooks are automatically persisted every @autosave_interval
|
||||
milliseconds.
|
||||
Note that notebooks are automatically persisted periodically as
|
||||
specified by the notebook settings.
|
||||
"""
|
||||
@spec save(pid()) :: :ok
|
||||
def save(pid) do
|
||||
|
@ -1415,7 +1442,7 @@ defmodule Livebook.Session do
|
|||
|> notify_update()}
|
||||
end
|
||||
|
||||
def handle_info({:runtime_evaluation_input, cell_id, reply_to, input_id}, state) do
|
||||
def handle_info({:runtime_evaluation_input_request, cell_id, reply_to, input_id}, state) do
|
||||
{reply, state} =
|
||||
with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(state.data.notebook, cell_id),
|
||||
{:ok, value} <- Data.fetch_input_value_for_cell(state.data, input_id, cell_id) do
|
||||
|
@ -1430,17 +1457,55 @@ defmodule Livebook.Session do
|
|||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:runtime_file_lookup, reply_to, file_ref}, state) do
|
||||
def handle_info({:runtime_file_path_request, reply_to, file_ref}, state) do
|
||||
path = registered_file_path(state.session_id, file_ref)
|
||||
|
||||
if File.exists?(path) do
|
||||
{:file, file_id} = file_ref
|
||||
|
||||
Runtime.transfer_file(state.data.runtime, path, file_id, fn path ->
|
||||
send(reply_to, {:runtime_file_lookup_reply, {:ok, path}})
|
||||
send(reply_to, {:runtime_file_path_reply, {:ok, path}})
|
||||
end)
|
||||
else
|
||||
send(reply_to, {:runtime_file_lookup_reply, :error})
|
||||
send(reply_to, {:runtime_file_path_reply, :error})
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:runtime_file_entry_path_request, reply_to, name}, state) do
|
||||
file_entry_path(state, name, fn
|
||||
{:ok, path} ->
|
||||
file_id = file_entry_file_id(name)
|
||||
|
||||
Runtime.transfer_file(state.data.runtime, path, file_id, fn path ->
|
||||
send(reply_to, {:runtime_file_entry_path_reply, {:ok, path}})
|
||||
end)
|
||||
|
||||
{:error, message} ->
|
||||
send(reply_to, {:runtime_file_entry_path_reply, {:error, message}})
|
||||
end)
|
||||
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
def handle_info({:runtime_file_entry_spec_request, reply_to, name}, state) do
|
||||
case file_entry_spec(state, name) do
|
||||
# In case of files we call transfer to ensure the file is local
|
||||
# to the runtime
|
||||
{:ok, %{type: :local, path: path}} ->
|
||||
file_id = file_entry_file_id(name)
|
||||
|
||||
Runtime.transfer_file(state.data.runtime, path, file_id, fn path ->
|
||||
spec = %{type: :local, path: path}
|
||||
send(reply_to, {:runtime_file_entry_spec_reply, {:ok, spec}})
|
||||
end)
|
||||
|
||||
{:ok, spec} ->
|
||||
send(reply_to, {:runtime_file_entry_spec_reply, {:ok, spec}})
|
||||
|
||||
{:error, message} ->
|
||||
send(reply_to, {:runtime_file_entry_spec_reply, {:error, message}})
|
||||
end
|
||||
|
||||
{:noreply, state}
|
||||
|
@ -2016,6 +2081,22 @@ defmodule Livebook.Session do
|
|||
notify_update(state)
|
||||
end
|
||||
|
||||
defp after_operation(state, prev_state, {:add_file_entries, _client_id, _file_entries}) do
|
||||
replaced_names =
|
||||
Enum.map(
|
||||
prev_state.data.notebook.file_entries -- state.data.notebook.file_entries,
|
||||
& &1.name
|
||||
)
|
||||
|
||||
cleanup_file_entries(state, replaced_names)
|
||||
state
|
||||
end
|
||||
|
||||
defp after_operation(state, _prev_state, {:delete_file_entry, _client_id, name}) do
|
||||
cleanup_file_entries(state, [name])
|
||||
state
|
||||
end
|
||||
|
||||
defp after_operation(state, _prev_state, _operation), do: state
|
||||
|
||||
defp handle_actions(state, actions) do
|
||||
|
@ -2371,6 +2452,138 @@ defmodule Livebook.Session do
|
|||
%{state | registered_files: Map.new(other_files)}
|
||||
end
|
||||
|
||||
defp file_entry_path(state, name, callback) do
|
||||
file_entry = Enum.find(state.data.notebook.file_entries, &(&1.name == name))
|
||||
|
||||
case file_entry do
|
||||
%{type: :attachment, name: name} ->
|
||||
files_dir = files_dir_from_state(state)
|
||||
file = FileSystem.File.resolve(files_dir, name)
|
||||
file_entry_path_from_file(state, name, file, callback)
|
||||
|
||||
%{type: :file, name: name, file: file} ->
|
||||
file_entry_path_from_file(state, name, file, callback)
|
||||
|
||||
%{type: :url, name: name, url: url} ->
|
||||
file_entry_path_from_url(state, name, url, callback)
|
||||
|
||||
nil ->
|
||||
callback.({:error, "no file named #{inspect(name)} exists in the notebook"})
|
||||
end
|
||||
end
|
||||
|
||||
defp file_entry_path_from_file(state, name, file, callback) do
|
||||
if FileSystem.File.local?(file) do
|
||||
if FileSystem.File.exists?(file) == {:ok, true} do
|
||||
callback.({:ok, file.path})
|
||||
else
|
||||
callback.({:error, "no file exists at path #{inspect(file.path)}"})
|
||||
end
|
||||
else
|
||||
fetcher = fn cache_file ->
|
||||
FileSystem.File.copy(file, cache_file)
|
||||
end
|
||||
|
||||
cached_file_entry_path(state, name, fetcher, callback)
|
||||
end
|
||||
end
|
||||
|
||||
defp file_entry_path_from_url(state, name, url, callback) do
|
||||
fetcher = fn cache_file ->
|
||||
case fetch_content(url) do
|
||||
{:ok, content} -> FileSystem.File.write(cache_file, content)
|
||||
{:error, message, _} -> {:error, message}
|
||||
end
|
||||
end
|
||||
|
||||
cached_file_entry_path(state, name, fetcher, callback)
|
||||
end
|
||||
|
||||
defp cached_file_entry_path(state, name, fetcher, callback) do
|
||||
cache_file = file_entry_cache_file(state.session_id, name)
|
||||
|
||||
if FileSystem.File.exists?(cache_file) == {:ok, true} do
|
||||
callback.({:ok, cache_file.path})
|
||||
else
|
||||
Task.Supervisor.start_child(Livebook.TaskSupervisor, fn ->
|
||||
case fetcher.(cache_file) do
|
||||
:ok -> callback.({:ok, cache_file.path})
|
||||
{:error, message} -> callback.({:error, message})
|
||||
end
|
||||
end)
|
||||
end
|
||||
end
|
||||
|
||||
defp file_entry_cache_file(session_id, name) do
|
||||
tmp_dir = session_tmp_dir(session_id)
|
||||
FileSystem.File.resolve(tmp_dir, "files_cache/#{name}")
|
||||
end
|
||||
|
||||
defp file_entry_spec(state, name) do
|
||||
file_entry = Enum.find(state.data.notebook.file_entries, &(&1.name == name))
|
||||
|
||||
case file_entry do
|
||||
%{type: :attachment, name: name} ->
|
||||
files_dir = files_dir_from_state(state)
|
||||
file = FileSystem.File.resolve(files_dir, name)
|
||||
file_entry_spec_from_file(file)
|
||||
|
||||
%{type: :file, file: file} ->
|
||||
file_entry_spec_from_file(file)
|
||||
|
||||
%{type: :url, url: url} ->
|
||||
file_entry_spec_from_url(url)
|
||||
|
||||
nil ->
|
||||
{:error, "no file named #{inspect(name)} exists in the notebook"}
|
||||
end
|
||||
end
|
||||
|
||||
defp file_entry_spec_from_file(file) do
|
||||
if FileSystem.File.local?(file) do
|
||||
if FileSystem.File.exists?(file) == {:ok, true} do
|
||||
{:ok, %{type: :local, path: file.path}}
|
||||
else
|
||||
{:error, "no file exists at path #{inspect(file.path)}"}
|
||||
end
|
||||
else
|
||||
spec =
|
||||
case file.file_system do
|
||||
%FileSystem.S3{} = file_system ->
|
||||
"/" <> key = file.path
|
||||
|
||||
%{
|
||||
type: :s3,
|
||||
bucket_url: file_system.bucket_url,
|
||||
region: file_system.region,
|
||||
access_key_id: file_system.access_key_id,
|
||||
secret_access_key: file_system.secret_access_key,
|
||||
key: key
|
||||
}
|
||||
end
|
||||
|
||||
{:ok, spec}
|
||||
end
|
||||
end
|
||||
|
||||
defp file_entry_spec_from_url(url) do
|
||||
{:ok, %{type: :url, url: url}}
|
||||
end
|
||||
|
||||
defp file_entry_file_id(name), do: "notebook-file-entry-#{name}"
|
||||
|
||||
defp cleanup_file_entries(state, names) do
|
||||
for name <- names do
|
||||
cache_file = file_entry_cache_file(state.session_id, name)
|
||||
FileSystem.File.remove(cache_file)
|
||||
|
||||
if Runtime.connected?(state.data.runtime) do
|
||||
file_id = file_entry_file_id(name)
|
||||
Runtime.revoke_file(state.data.runtime, file_id)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
defp before_close(state) do
|
||||
maybe_save_notebook_sync(state)
|
||||
broadcast_message(state.session_id, :session_closed)
|
||||
|
|
|
@ -54,6 +54,20 @@ defmodule LivebookWeb.SessionLive.FilesListComponent do
|
|||
<span>Copy to files</span>
|
||||
</button>
|
||||
</.menu_item>
|
||||
<.menu_item disabled={not Livebook.Session.file_entry_cacheable?(@session, file_entry)}>
|
||||
<button
|
||||
role="menuitem"
|
||||
phx-click={
|
||||
JS.push("clear_file_entry_cache",
|
||||
value: %{name: file_entry.name},
|
||||
target: @myself
|
||||
)
|
||||
}
|
||||
>
|
||||
<.remix_icon icon="eraser-line" />
|
||||
<span>Clear cache</span>
|
||||
</button>
|
||||
</.menu_item>
|
||||
<.menu_item variant={:danger}>
|
||||
<button
|
||||
role="menuitem"
|
||||
|
@ -106,66 +120,82 @@ defmodule LivebookWeb.SessionLive.FilesListComponent do
|
|||
|
||||
@impl true
|
||||
def handle_event("delete_file_entry", %{"name" => name}, socket) do
|
||||
file_entry = Enum.find(socket.assigns.file_entries, &(&1.name == name))
|
||||
session = socket.assigns.session
|
||||
if file_entry = find_file_entry(socket, name) do
|
||||
session = socket.assigns.session
|
||||
|
||||
on_confirm = fn socket, options ->
|
||||
Livebook.Session.delete_file_entry(session.pid, file_entry.name)
|
||||
on_confirm = fn socket, options ->
|
||||
Livebook.Session.delete_file_entry(session.pid, file_entry.name)
|
||||
|
||||
if Map.get(options, "delete_from_file_system", false) do
|
||||
file = FileSystem.File.resolve(session.files_dir, file_entry.name)
|
||||
if Map.get(options, "delete_from_file_system", false) do
|
||||
file = FileSystem.File.resolve(session.files_dir, file_entry.name)
|
||||
|
||||
case FileSystem.File.remove(file) do
|
||||
:ok ->
|
||||
socket
|
||||
case FileSystem.File.remove(file) do
|
||||
:ok ->
|
||||
socket
|
||||
|
||||
{:error, error} ->
|
||||
put_flash(socket, :error, "Failed to remove #{file.path}, reason: #{error}")
|
||||
{:error, error} ->
|
||||
put_flash(socket, :error, "Failed to remove #{file.path}, reason: #{error}")
|
||||
end
|
||||
else
|
||||
socket
|
||||
end
|
||||
else
|
||||
socket
|
||||
end
|
||||
|
||||
assigns = %{name: file_entry.name}
|
||||
|
||||
description = ~H"""
|
||||
Are you sure you want to delete this file - <span class="font-semibold">“<%= @name %>”</span>?
|
||||
"""
|
||||
|
||||
{:noreply,
|
||||
confirm(socket, on_confirm,
|
||||
title: "Delete file",
|
||||
description: description,
|
||||
confirm_text: "Delete",
|
||||
confirm_icon: "delete-bin-6-line",
|
||||
options:
|
||||
if file_entry.type == :attachment do
|
||||
[
|
||||
%{
|
||||
name: "delete_from_file_system",
|
||||
label: "Delete the corresponding file from the file system",
|
||||
default: true,
|
||||
disabled: false
|
||||
}
|
||||
]
|
||||
else
|
||||
[]
|
||||
end
|
||||
)}
|
||||
else
|
||||
{:noreply, socket}
|
||||
end
|
||||
|
||||
assigns = %{name: file_entry.name}
|
||||
|
||||
description = ~H"""
|
||||
Are you sure you want to delete this file - <span class="font-semibold">“<%= @name %>”</span>?
|
||||
"""
|
||||
|
||||
{:noreply,
|
||||
confirm(socket, on_confirm,
|
||||
title: "Delete file",
|
||||
description: description,
|
||||
confirm_text: "Delete",
|
||||
confirm_icon: "delete-bin-6-line",
|
||||
options:
|
||||
if file_entry.type == :attachment do
|
||||
[
|
||||
%{
|
||||
name: "delete_from_file_system",
|
||||
label: "Delete the corresponding file from the file system",
|
||||
default: true,
|
||||
disabled: false
|
||||
}
|
||||
]
|
||||
else
|
||||
[]
|
||||
end
|
||||
)}
|
||||
end
|
||||
|
||||
def handle_event("transfer_file_entry", %{"name" => name}, socket) do
|
||||
file_entry = Enum.find(socket.assigns.file_entries, &(&1.name == name))
|
||||
pid = self()
|
||||
id = socket.assigns.id
|
||||
session = socket.assigns.session
|
||||
if file_entry = find_file_entry(socket, name) do
|
||||
pid = self()
|
||||
id = socket.assigns.id
|
||||
session = socket.assigns.session
|
||||
|
||||
Task.Supervisor.async_nolink(Livebook.TaskSupervisor, fn ->
|
||||
file_entry_result = Livebook.Session.to_attachment_file_entry(session, file_entry)
|
||||
send_update(pid, __MODULE__, id: id, transfer_file_entry_result: file_entry_result)
|
||||
end)
|
||||
Task.Supervisor.async_nolink(Livebook.TaskSupervisor, fn ->
|
||||
file_entry_result = Livebook.Session.to_attachment_file_entry(session, file_entry)
|
||||
send_update(pid, __MODULE__, id: id, transfer_file_entry_result: file_entry_result)
|
||||
end)
|
||||
end
|
||||
|
||||
{:noreply, socket}
|
||||
end
|
||||
|
||||
def handle_event("clear_file_entry_cache", %{"name" => name}, socket) do
|
||||
if file_entry = find_file_entry(socket, name) do
|
||||
Livebook.Session.clear_file_entry_cache(socket.assigns.session.id, file_entry.name)
|
||||
end
|
||||
|
||||
{:noreply, socket}
|
||||
end
|
||||
|
||||
defp find_file_entry(socket, name) do
|
||||
Enum.find(socket.assigns.file_entries, &(&1.name == name))
|
||||
end
|
||||
end
|
||||
|
|
|
@ -139,7 +139,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
|
|||
end
|
||||
|
||||
describe "evaluation file requests" do
|
||||
test "returns the before_evaluationd file", %{io: io} do
|
||||
test "returns the before_evaluation file", %{io: io} do
|
||||
IOProxy.before_evaluation(io, :ref1, "cell1")
|
||||
assert livebook_get_evaluation_file(io) == "cell1"
|
||||
end
|
||||
|
@ -151,7 +151,7 @@ defmodule Livebook.Runtime.Evaluator.IOProxyTest do
|
|||
|
||||
defp reply_to_input_request(ref, input_id, reply, times) do
|
||||
receive do
|
||||
{:runtime_evaluation_input, ^ref, reply_to, ^input_id} ->
|
||||
{:runtime_evaluation_input_request, ^ref, reply_to, ^input_id} ->
|
||||
send(reply_to, {:runtime_evaluation_input_reply, reply})
|
||||
reply_to_input_request(ref, input_id, reply, times - 1)
|
||||
end
|
||||
|
|
|
@ -138,7 +138,7 @@ defmodule Livebook.Runtime.EvaluatorTest do
|
|||
|
||||
Evaluator.evaluate_code(evaluator, :elixir, code, :code_1, [])
|
||||
|
||||
assert_receive {:runtime_evaluation_input, :code_1, reply_to, "input1"}
|
||||
assert_receive {:runtime_evaluation_input_request, :code_1, reply_to, "input1"}
|
||||
send(reply_to, {:runtime_evaluation_input_reply, {:ok, 10}})
|
||||
|
||||
assert_receive {:runtime_evaluation_response, :code_1, {:text, ansi_number(10)}, metadata()}
|
||||
|
|
|
@ -483,9 +483,9 @@ defmodule Livebook.SessionTest do
|
|||
Session.set_file(session.pid, file)
|
||||
|
||||
%{files_dir: files_dir} = Session.get_by_pid(session.pid)
|
||||
image_file = FileSystem.File.resolve(files_dir, "test.jpg")
|
||||
image_file = FileSystem.File.resolve(files_dir, "image.jpg")
|
||||
:ok = FileSystem.File.write(image_file, "")
|
||||
Session.add_file_entries(session.pid, [%{type: :attachment, name: "test.jpg"}])
|
||||
Session.add_file_entries(session.pid, [%{type: :attachment, name: "image.jpg"}])
|
||||
|
||||
unused_image_file = FileSystem.File.resolve(files_dir, "unused.jpg")
|
||||
:ok = FileSystem.File.write(unused_image_file, "")
|
||||
|
@ -501,7 +501,7 @@ defmodule Livebook.SessionTest do
|
|||
%{files_dir: new_files_dir} = Session.get_by_pid(session.pid)
|
||||
|
||||
assert {:ok, true} =
|
||||
FileSystem.File.exists?(FileSystem.File.resolve(new_files_dir, "test.jpg"))
|
||||
FileSystem.File.exists?(FileSystem.File.resolve(new_files_dir, "image.jpg"))
|
||||
|
||||
assert {:ok, false} =
|
||||
FileSystem.File.exists?(FileSystem.File.resolve(new_files_dir, "unused.jpg"))
|
||||
|
@ -514,9 +514,9 @@ defmodule Livebook.SessionTest do
|
|||
tmp_dir = FileSystem.File.local(tmp_dir <> "/")
|
||||
%{files_dir: files_dir} = session
|
||||
|
||||
image_file = FileSystem.File.resolve(files_dir, "test.jpg")
|
||||
image_file = FileSystem.File.resolve(files_dir, "image.jpg")
|
||||
:ok = FileSystem.File.write(image_file, "")
|
||||
Session.add_file_entries(session.pid, [%{type: :attachment, name: "test.jpg"}])
|
||||
Session.add_file_entries(session.pid, [%{type: :attachment, name: "image.jpg"}])
|
||||
|
||||
file = FileSystem.File.resolve(tmp_dir, "notebook.livemd")
|
||||
Session.set_file(session.pid, file)
|
||||
|
@ -525,7 +525,7 @@ defmodule Livebook.SessionTest do
|
|||
wait_for_session_update(session.pid)
|
||||
|
||||
assert {:ok, true} =
|
||||
FileSystem.File.exists?(FileSystem.File.resolve(tmp_dir, "files/test.jpg"))
|
||||
FileSystem.File.exists?(FileSystem.File.resolve(tmp_dir, "files/image.jpg"))
|
||||
|
||||
assert {:ok, false} = FileSystem.File.exists?(files_dir)
|
||||
end
|
||||
|
@ -599,15 +599,15 @@ defmodule Livebook.SessionTest do
|
|||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_lookup, self(), old_file_ref})
|
||||
assert_receive {:runtime_file_lookup_reply, {:ok, old_path}}
|
||||
send(session.pid, {:runtime_file_path_request, self(), old_file_ref})
|
||||
assert_receive {:runtime_file_path_reply, {:ok, old_path}}
|
||||
|
||||
source_path = Path.join(tmp_dir, "new.txt")
|
||||
File.write!(source_path, "content")
|
||||
{:ok, new_file_ref} = Session.register_file(session.pid, source_path, "key")
|
||||
|
||||
send(session.pid, {:runtime_file_lookup, self(), new_file_ref})
|
||||
assert_receive {:runtime_file_lookup_reply, {:ok, new_path}}
|
||||
send(session.pid, {:runtime_file_path_request, self(), new_file_ref})
|
||||
assert_receive {:runtime_file_path_reply, {:ok, new_path}}
|
||||
|
||||
{:file, file_id} = old_file_ref
|
||||
assert_receive {:runtime_trace, :revoke_file, [^file_id]}
|
||||
|
@ -633,8 +633,8 @@ defmodule Livebook.SessionTest do
|
|||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_lookup, self(), file_ref})
|
||||
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
||||
send(session.pid, {:runtime_file_path_request, self(), file_ref})
|
||||
assert_receive {:runtime_file_path_reply, {:ok, path}}
|
||||
|
||||
send(client_pid, :stop)
|
||||
|
||||
|
@ -674,8 +674,8 @@ defmodule Livebook.SessionTest do
|
|||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_lookup, self(), file_ref})
|
||||
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
||||
send(session.pid, {:runtime_file_path_request, self(), file_ref})
|
||||
assert_receive {:runtime_file_path_reply, {:ok, path}}
|
||||
|
||||
Session.erase_outputs(session.pid)
|
||||
|
||||
|
@ -1520,6 +1520,233 @@ defmodule Livebook.SessionTest do
|
|||
end
|
||||
end
|
||||
|
||||
describe "accessing file entry" do
|
||||
test "replies with error when file entry does not exist" do
|
||||
session = start_session()
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_path_reply,
|
||||
{:error, ~s/no file named "image.jpg" exists in the notebook/}}
|
||||
|
||||
# Spec request
|
||||
send(session.pid, {:runtime_file_entry_spec_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_spec_reply,
|
||||
{:error, ~s/no file named "image.jpg" exists in the notebook/}}
|
||||
end
|
||||
|
||||
test "when nonexistent :attachment replies with error" do
|
||||
session = start_session()
|
||||
|
||||
Session.add_file_entries(session.pid, [%{type: :attachment, name: "image.jpg"}])
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_path_reply, {:error, "no file exists at path " <> _}}
|
||||
|
||||
# Spec request
|
||||
send(session.pid, {:runtime_file_entry_spec_request, self(), "image.jpg"})
|
||||
assert_receive {:runtime_file_entry_spec_reply, {:error, "no file exists at path " <> _}}
|
||||
end
|
||||
|
||||
test "when local :attachment replies with the direct path" do
|
||||
session = start_session()
|
||||
|
||||
%{files_dir: files_dir} = session
|
||||
image_file = FileSystem.File.resolve(files_dir, "image.jpg")
|
||||
:ok = FileSystem.File.write(image_file, "")
|
||||
Session.add_file_entries(session.pid, [%{type: :attachment, name: "image.jpg"}])
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
path = image_file.path
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, ^path}}
|
||||
|
||||
# Spec request
|
||||
send(session.pid, {:runtime_file_entry_spec_request, self(), "image.jpg"})
|
||||
assert_receive {:runtime_file_entry_spec_reply, {:ok, %{type: :local, path: ^path}}}
|
||||
end
|
||||
|
||||
@tag :tmp_dir
|
||||
test "when local :file replies with the direct path", %{tmp_dir: tmp_dir} do
|
||||
session = start_session()
|
||||
|
||||
tmp_dir = FileSystem.File.local(tmp_dir <> "/")
|
||||
image_file = FileSystem.File.resolve(tmp_dir, "image.jpg")
|
||||
:ok = FileSystem.File.write(image_file, "content")
|
||||
Session.add_file_entries(session.pid, [%{type: :file, name: "image.jpg", file: image_file}])
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
path = image_file.path
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, ^path}}
|
||||
|
||||
# Spec request
|
||||
send(session.pid, {:runtime_file_entry_spec_request, self(), "image.jpg"})
|
||||
assert_receive {:runtime_file_entry_spec_reply, {:ok, %{type: :local, path: ^path}}}
|
||||
end
|
||||
|
||||
test "when remote :file replies with the cached path" do
|
||||
bypass = Bypass.open()
|
||||
bucket_url = "http://localhost:#{bypass.port}/mybucket"
|
||||
s3_fs = FileSystem.S3.new(bucket_url, "key", "secret")
|
||||
|
||||
Bypass.expect_once(bypass, "GET", "/mybucket/image.jpg", fn conn ->
|
||||
Plug.Conn.resp(conn, 200, "content")
|
||||
end)
|
||||
|
||||
session = start_session()
|
||||
|
||||
image_file = FileSystem.File.new(s3_fs, "/image.jpg")
|
||||
Session.add_file_entries(session.pid, [%{type: :file, name: "image.jpg", file: image_file}])
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, path}}
|
||||
assert File.read(path) == {:ok, "content"}
|
||||
|
||||
# Subsequent requests use the cached file
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, ^path}}
|
||||
|
||||
# Spec request
|
||||
send(session.pid, {:runtime_file_entry_spec_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_spec_reply,
|
||||
{:ok,
|
||||
%{
|
||||
type: :s3,
|
||||
bucket_url: ^bucket_url,
|
||||
region: "auto",
|
||||
access_key_id: "key",
|
||||
secret_access_key: "secret",
|
||||
key: "image.jpg"
|
||||
}}}
|
||||
end
|
||||
|
||||
test "when :url replies with the cached path" do
|
||||
bypass = Bypass.open()
|
||||
url = "http://localhost:#{bypass.port}/image.jpg"
|
||||
|
||||
Bypass.expect_once(bypass, "GET", "/image.jpg", fn conn ->
|
||||
Plug.Conn.resp(conn, 200, "content")
|
||||
end)
|
||||
|
||||
session = start_session()
|
||||
|
||||
Session.add_file_entries(session.pid, [%{type: :url, name: "image.jpg", url: url}])
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, path}}
|
||||
assert File.read(path) == {:ok, "content"}
|
||||
|
||||
# Subsequent requests use the cached file
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, ^path}}
|
||||
|
||||
# Spec request
|
||||
send(session.pid, {:runtime_file_entry_spec_request, self(), "image.jpg"})
|
||||
assert_receive {:runtime_file_entry_spec_reply, {:ok, %{type: :url, url: ^url}}}
|
||||
end
|
||||
|
||||
test "removing file entry removes the cached file" do
|
||||
bypass = Bypass.open()
|
||||
url = "http://localhost:#{bypass.port}/image.jpg"
|
||||
|
||||
Bypass.expect_once(bypass, "GET", "/image.jpg", fn conn ->
|
||||
Plug.Conn.resp(conn, 200, "content")
|
||||
end)
|
||||
|
||||
session = start_session()
|
||||
|
||||
Session.add_file_entries(session.pid, [%{type: :url, name: "image.jpg", url: url}])
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, path}}
|
||||
|
||||
Session.delete_file_entry(session.pid, "image.jpg")
|
||||
wait_for_session_update(session.pid)
|
||||
|
||||
refute File.exists?(path)
|
||||
end
|
||||
|
||||
test "replacing file entry removes the cached file" do
|
||||
bypass = Bypass.open()
|
||||
url = "http://localhost:#{bypass.port}/image.jpg"
|
||||
|
||||
Bypass.expect_once(bypass, "GET", "/image.jpg", fn conn ->
|
||||
Plug.Conn.resp(conn, 200, "content")
|
||||
end)
|
||||
|
||||
session = start_session()
|
||||
|
||||
Session.add_file_entries(session.pid, [%{type: :url, name: "image.jpg", url: url}])
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, path}}
|
||||
|
||||
Session.add_file_entries(session.pid, [%{type: :attachment, name: "image.jpg"}])
|
||||
wait_for_session_update(session.pid)
|
||||
|
||||
refute File.exists?(path)
|
||||
end
|
||||
|
||||
test "clear_file_entry_cache/2 removes the cached file" do
|
||||
bypass = Bypass.open()
|
||||
url = "http://localhost:#{bypass.port}/image.jpg"
|
||||
|
||||
Bypass.expect_once(bypass, "GET", "/image.jpg", fn conn ->
|
||||
Plug.Conn.resp(conn, 200, "content")
|
||||
end)
|
||||
|
||||
session = start_session()
|
||||
|
||||
Session.add_file_entries(session.pid, [%{type: :url, name: "image.jpg", url: url}])
|
||||
|
||||
runtime = connected_noop_runtime(self())
|
||||
Session.set_runtime(session.pid, runtime)
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, path}}
|
||||
assert File.read(path) == {:ok, "content"}
|
||||
|
||||
Session.clear_file_entry_cache(session.id, "image.jpg")
|
||||
|
||||
refute File.exists?(path)
|
||||
|
||||
# Next access downloads the file again
|
||||
|
||||
Bypass.expect_once(bypass, "GET", "/image.jpg", fn conn ->
|
||||
Plug.Conn.resp(conn, 200, "new content")
|
||||
end)
|
||||
|
||||
send(session.pid, {:runtime_file_entry_path_request, self(), "image.jpg"})
|
||||
|
||||
assert_receive {:runtime_file_entry_path_reply, {:ok, path}}
|
||||
assert File.read(path) == {:ok, "new content"}
|
||||
end
|
||||
end
|
||||
|
||||
defp start_session(opts \\ []) do
|
||||
opts = Keyword.merge([id: Utils.random_id()], opts)
|
||||
pid = start_supervised!({Session, opts}, id: opts[:id])
|
||||
|
|
|
@ -488,8 +488,8 @@ defmodule LivebookWeb.SessionLiveTest do
|
|||
|
||||
assert %{file_ref: file_ref, client_name: "data.txt"} = value
|
||||
|
||||
send(session.pid, {:runtime_file_lookup, self(), file_ref})
|
||||
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
||||
send(session.pid, {:runtime_file_path_request, self(), file_ref})
|
||||
assert_receive {:runtime_file_path_reply, {:ok, path}}
|
||||
assert File.read!(path) == "content"
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue