mirror of
https://github.com/livebook-dev/livebook.git
synced 2025-10-11 14:06:20 +08:00
Tag file input id (#1674)
This commit is contained in:
parent
69df6e0ad4
commit
50c23d0e4c
6 changed files with 47 additions and 40 deletions
|
@ -89,5 +89,5 @@ defmodule Livebook.Notebook.Cell do
|
||||||
Checks if the given term is a file input value (info map).
|
Checks if the given term is a file input value (info map).
|
||||||
"""
|
"""
|
||||||
defguard is_file_input_value(value)
|
defguard is_file_input_value(value)
|
||||||
when is_map_key(value, :file_id) and is_map_key(value, :client_name)
|
when is_map_key(value, :file_ref) and is_map_key(value, :client_name)
|
||||||
end
|
end
|
||||||
|
|
|
@ -269,13 +269,13 @@ defprotocol Livebook.Runtime do
|
||||||
@type editor :: %{language: String.t(), placement: :bottom | :top, source: String.t()}
|
@type editor :: %{language: String.t(), placement: :bottom | :top, source: String.t()}
|
||||||
|
|
||||||
@typedoc """
|
@typedoc """
|
||||||
An opaque file identifier.
|
An opaque file reference.
|
||||||
|
|
||||||
Such identifier can be obtained from a file input, for example.
|
Such reference can be obtained from a file input, for example.
|
||||||
|
|
||||||
The runtime may ask for the file by sending a request:
|
The runtime may ask for the file by sending a request:
|
||||||
|
|
||||||
* `{:runtime_file_lookup, reply_to, file_id}`
|
* `{:runtime_file_lookup, reply_to, file_ref}`
|
||||||
|
|
||||||
to which the runtime owner is supposed to reply with
|
to which the runtime owner is supposed to reply with
|
||||||
`{:runtime_file_lookup_reply, reply}` where `reply` is either
|
`{:runtime_file_lookup_reply, reply}` where `reply` is either
|
||||||
|
@ -283,7 +283,7 @@ defprotocol Livebook.Runtime do
|
||||||
that `path` should be accessible within the runtime and can be
|
that `path` should be accessible within the runtime and can be
|
||||||
obtained using `transfer_file/4`.
|
obtained using `transfer_file/4`.
|
||||||
"""
|
"""
|
||||||
@type file_id :: String.t()
|
@type file_ref :: {:file, id :: String.t()}
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Returns relevant information about the runtime.
|
Returns relevant information about the runtime.
|
||||||
|
|
|
@ -569,16 +569,16 @@ defmodule Livebook.Session do
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec register_file(pid(), String.t(), String.t(), keyword()) ::
|
@spec register_file(pid(), String.t(), String.t(), keyword()) ::
|
||||||
{:ok, file_id :: String.t()} | :error
|
{:ok, Runtime.file_ref()} | :error
|
||||||
def register_file(pid, source_path, key, opts \\ []) do
|
def register_file(pid, source_path, key, opts \\ []) do
|
||||||
opts = Keyword.validate!(opts, [:linked_client_id])
|
opts = Keyword.validate!(opts, [:linked_client_id])
|
||||||
|
|
||||||
%{file_id: file_id, path: path} = GenServer.call(pid, :register_file_init)
|
%{file_ref: file_ref, path: path} = GenServer.call(pid, :register_file_init)
|
||||||
|
|
||||||
with :ok <- File.mkdir_p(Path.dirname(path)),
|
with :ok <- File.mkdir_p(Path.dirname(path)),
|
||||||
:ok <- File.cp(source_path, path) do
|
:ok <- File.cp(source_path, path) do
|
||||||
GenServer.cast(pid, {:register_file_finish, file_id, key, opts[:linked_client_id]})
|
GenServer.cast(pid, {:register_file_finish, file_ref, key, opts[:linked_client_id]})
|
||||||
{:ok, file_id}
|
{:ok, file_ref}
|
||||||
else
|
else
|
||||||
_ -> :error
|
_ -> :error
|
||||||
end
|
end
|
||||||
|
@ -765,8 +765,9 @@ defmodule Livebook.Session do
|
||||||
|
|
||||||
def handle_call(:register_file_init, _from, state) do
|
def handle_call(:register_file_init, _from, state) do
|
||||||
file_id = Utils.random_id()
|
file_id = Utils.random_id()
|
||||||
path = registered_file_path(state.session_id, file_id)
|
file_ref = {:file, file_id}
|
||||||
reply = %{file_id: file_id, path: path}
|
path = registered_file_path(state.session_id, file_ref)
|
||||||
|
reply = %{file_ref: file_ref, path: path}
|
||||||
{:reply, reply, state}
|
{:reply, reply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -1067,21 +1068,21 @@ defmodule Livebook.Session do
|
||||||
{:noreply, maybe_save_notebook_async(state)}
|
{:noreply, maybe_save_notebook_async(state)}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_cast({:register_file_finish, file_id, key, linked_client_id}, state) do
|
def handle_cast({:register_file_finish, file_ref, key, linked_client_id}, state) do
|
||||||
{current_info, state} = pop_in(state.registered_files[key])
|
{current_info, state} = pop_in(state.registered_files[key])
|
||||||
|
|
||||||
if current_info do
|
if current_info do
|
||||||
schedule_file_deletion(state, current_info.file_id)
|
schedule_file_deletion(state, current_info.file_ref)
|
||||||
end
|
end
|
||||||
|
|
||||||
state =
|
state =
|
||||||
if linked_client_id == nil or Map.has_key?(state.data.clients_map, linked_client_id) do
|
if linked_client_id == nil or Map.has_key?(state.data.clients_map, linked_client_id) do
|
||||||
put_in(state.registered_files[key], %{
|
put_in(state.registered_files[key], %{
|
||||||
file_id: file_id,
|
file_ref: file_ref,
|
||||||
linked_client_id: linked_client_id
|
linked_client_id: linked_client_id
|
||||||
})
|
})
|
||||||
else
|
else
|
||||||
schedule_file_deletion(state, file_id)
|
schedule_file_deletion(state, file_ref)
|
||||||
state
|
state
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -1158,10 +1159,12 @@ defmodule Livebook.Session do
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_info({:runtime_file_lookup, reply_to, file_id}, state) do
|
def handle_info({:runtime_file_lookup, reply_to, file_ref}, state) do
|
||||||
path = registered_file_path(state.session_id, file_id)
|
path = registered_file_path(state.session_id, file_ref)
|
||||||
|
|
||||||
if File.exists?(path) do
|
if File.exists?(path) do
|
||||||
|
{:file, file_id} = file_ref
|
||||||
|
|
||||||
Runtime.transfer_file(state.data.runtime, path, file_id, fn path ->
|
Runtime.transfer_file(state.data.runtime, path, file_id, fn path ->
|
||||||
send(reply_to, {:runtime_file_lookup_reply, {:ok, path}})
|
send(reply_to, {:runtime_file_lookup_reply, {:ok, path}})
|
||||||
end)
|
end)
|
||||||
|
@ -1278,18 +1281,19 @@ defmodule Livebook.Session do
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
end
|
end
|
||||||
|
|
||||||
def handle_info({:delete_registered_file, file_id}, state) do
|
def handle_info({:delete_registered_file, file_ref}, state) do
|
||||||
path = registered_file_path(state.session_id, file_id)
|
path = registered_file_path(state.session_id, file_ref)
|
||||||
|
|
||||||
case File.rm_rf(path) do
|
case File.rm_rf(path) do
|
||||||
{:ok, _} ->
|
{:ok, _} ->
|
||||||
if Runtime.connected?(state.data.runtime) do
|
if Runtime.connected?(state.data.runtime) do
|
||||||
|
{:file, file_id} = file_ref
|
||||||
Runtime.revoke_file(state.data.runtime, file_id)
|
Runtime.revoke_file(state.data.runtime, file_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
{:error, _, _} ->
|
{:error, _, _} ->
|
||||||
# Deletion may fail if the file is still open, so we retry later
|
# Deletion may fail if the file is still open, so we retry later
|
||||||
schedule_file_deletion(state, path)
|
schedule_file_deletion(state, file_ref)
|
||||||
end
|
end
|
||||||
|
|
||||||
{:noreply, state}
|
{:noreply, state}
|
||||||
|
@ -1692,7 +1696,7 @@ defmodule Livebook.Session do
|
||||||
for {_input_id, value} <- input_values do
|
for {_input_id, value} <- input_values do
|
||||||
case value do
|
case value do
|
||||||
value when is_file_input_value(value) ->
|
value when is_file_input_value(value) ->
|
||||||
schedule_file_deletion(state, value.file_id)
|
schedule_file_deletion(state, value.file_ref)
|
||||||
|
|
||||||
_ ->
|
_ ->
|
||||||
:ok
|
:ok
|
||||||
|
@ -1884,15 +1888,15 @@ defmodule Livebook.Session do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp registered_file_path(session_id, file_id) do
|
defp registered_file_path(session_id, {:file, file_id}) do
|
||||||
%{path: session_dir} = session_tmp_dir(session_id)
|
%{path: session_dir} = session_tmp_dir(session_id)
|
||||||
Path.join([session_dir, "registered_files", file_id])
|
Path.join([session_dir, "registered_files", file_id])
|
||||||
end
|
end
|
||||||
|
|
||||||
defp schedule_file_deletion(state, file_id) do
|
defp schedule_file_deletion(state, file_ref) do
|
||||||
Process.send_after(
|
Process.send_after(
|
||||||
self(),
|
self(),
|
||||||
{:delete_registered_file, file_id},
|
{:delete_registered_file, file_ref},
|
||||||
state.registered_file_deletion_delay
|
state.registered_file_deletion_delay
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
@ -1904,7 +1908,7 @@ defmodule Livebook.Session do
|
||||||
end)
|
end)
|
||||||
|
|
||||||
for {_key, info} <- client_files do
|
for {_key, info} <- client_files do
|
||||||
schedule_file_deletion(state, info.file_id)
|
schedule_file_deletion(state, info.file_ref)
|
||||||
end
|
end
|
||||||
|
|
||||||
%{state | registered_files: Map.new(other_files)}
|
%{state | registered_files: Map.new(other_files)}
|
||||||
|
|
|
@ -59,7 +59,7 @@ defmodule LivebookWeb.Output.FileInputComponent do
|
||||||
if entry.done? do
|
if entry.done? do
|
||||||
socket
|
socket
|
||||||
|> consume_uploaded_entries(:file, fn %{path: path}, entry ->
|
|> consume_uploaded_entries(:file, fn %{path: path}, entry ->
|
||||||
{:ok, file_id} =
|
{:ok, file_ref} =
|
||||||
if socket.assigns.local do
|
if socket.assigns.local do
|
||||||
key = "#{socket.assigns.input_id}-#{socket.assigns.client_id}"
|
key = "#{socket.assigns.input_id}-#{socket.assigns.client_id}"
|
||||||
|
|
||||||
|
@ -71,11 +71,11 @@ defmodule LivebookWeb.Output.FileInputComponent do
|
||||||
Livebook.Session.register_file(socket.assigns.session_pid, path, key)
|
Livebook.Session.register_file(socket.assigns.session_pid, path, key)
|
||||||
end
|
end
|
||||||
|
|
||||||
{:ok, {file_id, entry.client_name}}
|
{:ok, {file_ref, entry.client_name}}
|
||||||
end)
|
end)
|
||||||
|> case do
|
|> case do
|
||||||
[{file_id, client_name}] ->
|
[{file_ref, client_name}] ->
|
||||||
value = %{file_id: file_id, client_name: client_name}
|
value = %{file_ref: file_ref, client_name: client_name}
|
||||||
|
|
||||||
send_update(LivebookWeb.Output.InputComponent,
|
send_update(LivebookWeb.Output.InputComponent,
|
||||||
id: socket.assigns.input_component_id,
|
id: socket.assigns.input_component_id,
|
||||||
|
|
|
@ -477,18 +477,18 @@ defmodule Livebook.SessionTest do
|
||||||
|
|
||||||
source_path = Path.join(tmp_dir, "old.txt")
|
source_path = Path.join(tmp_dir, "old.txt")
|
||||||
File.write!(source_path, "content")
|
File.write!(source_path, "content")
|
||||||
{:ok, old_file_id} = Session.register_file(session.pid, source_path, "key")
|
{:ok, old_file_ref} = Session.register_file(session.pid, source_path, "key")
|
||||||
|
|
||||||
runtime = connected_noop_runtime()
|
runtime = connected_noop_runtime()
|
||||||
Session.set_runtime(session.pid, runtime)
|
Session.set_runtime(session.pid, runtime)
|
||||||
send(session.pid, {:runtime_file_lookup, self(), old_file_id})
|
send(session.pid, {:runtime_file_lookup, self(), old_file_ref})
|
||||||
assert_receive {:runtime_file_lookup_reply, {:ok, old_path}}
|
assert_receive {:runtime_file_lookup_reply, {:ok, old_path}}
|
||||||
|
|
||||||
source_path = Path.join(tmp_dir, "new.txt")
|
source_path = Path.join(tmp_dir, "new.txt")
|
||||||
File.write!(source_path, "content")
|
File.write!(source_path, "content")
|
||||||
{:ok, new_file_id} = Session.register_file(session.pid, source_path, "key")
|
{:ok, new_file_ref} = Session.register_file(session.pid, source_path, "key")
|
||||||
|
|
||||||
send(session.pid, {:runtime_file_lookup, self(), new_file_id})
|
send(session.pid, {:runtime_file_lookup, self(), new_file_ref})
|
||||||
assert_receive {:runtime_file_lookup_reply, {:ok, new_path}}
|
assert_receive {:runtime_file_lookup_reply, {:ok, new_path}}
|
||||||
|
|
||||||
Process.sleep(10)
|
Process.sleep(10)
|
||||||
|
@ -514,12 +514,12 @@ defmodule Livebook.SessionTest do
|
||||||
source_path = Path.join(tmp_dir, "old.txt")
|
source_path = Path.join(tmp_dir, "old.txt")
|
||||||
File.write!(source_path, "content")
|
File.write!(source_path, "content")
|
||||||
|
|
||||||
{:ok, file_id} =
|
{:ok, file_ref} =
|
||||||
Session.register_file(session.pid, source_path, "key", linked_client_id: client_id)
|
Session.register_file(session.pid, source_path, "key", linked_client_id: client_id)
|
||||||
|
|
||||||
runtime = connected_noop_runtime()
|
runtime = connected_noop_runtime()
|
||||||
Session.set_runtime(session.pid, runtime)
|
Session.set_runtime(session.pid, runtime)
|
||||||
send(session.pid, {:runtime_file_lookup, self(), file_id})
|
send(session.pid, {:runtime_file_lookup, self(), file_ref})
|
||||||
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
||||||
|
|
||||||
send(client_pid, :stop)
|
send(client_pid, :stop)
|
||||||
|
@ -550,13 +550,16 @@ defmodule Livebook.SessionTest do
|
||||||
source_path = Path.join(tmp_dir, "old.txt")
|
source_path = Path.join(tmp_dir, "old.txt")
|
||||||
File.write!(source_path, "content")
|
File.write!(source_path, "content")
|
||||||
|
|
||||||
{:ok, file_id} = Session.register_file(session.pid, source_path, "key")
|
{:ok, file_ref} = Session.register_file(session.pid, source_path, "key")
|
||||||
|
|
||||||
Session.set_input_value(session.pid, "input1", %{file_id: file_id, client_name: "data.txt"})
|
Session.set_input_value(session.pid, "input1", %{
|
||||||
|
file_ref: file_ref,
|
||||||
|
client_name: "data.txt"
|
||||||
|
})
|
||||||
|
|
||||||
runtime = connected_noop_runtime()
|
runtime = connected_noop_runtime()
|
||||||
Session.set_runtime(session.pid, runtime)
|
Session.set_runtime(session.pid, runtime)
|
||||||
send(session.pid, {:runtime_file_lookup, self(), file_id})
|
send(session.pid, {:runtime_file_lookup, self(), file_ref})
|
||||||
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
||||||
|
|
||||||
Session.erase_outputs(session.pid)
|
Session.erase_outputs(session.pid)
|
||||||
|
|
|
@ -408,9 +408,9 @@ defmodule LivebookWeb.SessionLiveTest do
|
||||||
|
|
||||||
assert %{input_values: %{"input1" => value}} = Session.get_data(session.pid)
|
assert %{input_values: %{"input1" => value}} = Session.get_data(session.pid)
|
||||||
|
|
||||||
assert %{file_id: file_id, client_name: "data.txt"} = value
|
assert %{file_ref: file_ref, client_name: "data.txt"} = value
|
||||||
|
|
||||||
send(session.pid, {:runtime_file_lookup, self(), file_id})
|
send(session.pid, {:runtime_file_lookup, self(), file_ref})
|
||||||
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
assert_receive {:runtime_file_lookup_reply, {:ok, path}}
|
||||||
assert File.read!(path) == "content"
|
assert File.read!(path) == "content"
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Reference in a new issue