Add support for file input (#1622)

This commit is contained in:
Jonatan Kłosko 2023-01-04 21:44:04 +01:00 committed by GitHub
parent d4bda6042c
commit 17ab1ae472
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 466 additions and 63 deletions

View file

@ -187,11 +187,9 @@ const AudioInput = {
pushAudio(audioInfo) {
this.pushEventTo(this.props.phxTarget, "change", {
value: {
data: bufferToBase64(this.encodeAudio(audioInfo)),
num_channels: audioInfo.numChannels,
sampling_rate: audioInfo.samplingRate,
},
data: bufferToBase64(this.encodeAudio(audioInfo)),
num_channels: audioInfo.numChannels,
sampling_rate: audioInfo.samplingRate,
});
},

View file

@ -265,11 +265,9 @@ const ImageInput = {
pushImage(canvas) {
this.pushEventTo(this.props.phxTarget, "change", {
value: {
data: canvasToBase64(canvas, this.props.format),
height: canvas.height,
width: canvas.width,
},
data: canvasToBase64(canvas, this.props.format),
height: canvas.height,
width: canvas.width,
});
},

View file

@ -84,4 +84,10 @@ defmodule Livebook.Notebook.Cell do
"""
@spec setup_cell_id() :: id()
def setup_cell_id(), do: @setup_cell_id
@doc """
Checks if the given term is a file input value (info map).
"""
defguard is_file_input_value(value)
when is_map_key(value, :path) and is_map_key(value, :client_name)
end

View file

@ -416,6 +416,25 @@ defprotocol Livebook.Runtime do
@spec read_file(Runtime.t(), String.t()) :: {:ok, binary()} | {:error, String.t()}
def read_file(runtime, path)
@doc """
Transfers file at `path` to the runtime.
This operation is asynchronous and `callback` is called with the
path of the transferred file (on the runtime host) once the transfer
is complete.
If the runtime is on the same host as the caller, the implementation
may simply use `path`.
"""
@spec transfer_file(t(), String.t(), String.t(), (path :: String.t() | nil -> any())) :: :ok
def transfer_file(runtime, path, file_id, callback)
@doc """
Cleans up resources allocated with `transfer_file/4`, if any.
"""
@spec revoke_file(t(), String.t()) :: :ok
def revoke_file(runtime, file_id)
@doc """
Starts a smart cell of the given kind.

View file

@ -115,6 +115,14 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Attached do
RuntimeServer.read_file(runtime.server_pid, path)
end
def transfer_file(runtime, path, file_id, callback) do
RuntimeServer.transfer_file(runtime.server_pid, path, file_id, callback)
end
def revoke_file(runtime, file_id) do
RuntimeServer.revoke_file(runtime.server_pid, file_id)
end
def start_smart_cell(runtime, kind, ref, attrs, parent_locators) do
RuntimeServer.start_smart_cell(runtime.server_pid, kind, ref, attrs, parent_locators)
end

View file

@ -278,6 +278,14 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.ElixirStandalone do
RuntimeServer.read_file(runtime.server_pid, path)
end
def transfer_file(runtime, path, file_id, callback) do
RuntimeServer.transfer_file(runtime.server_pid, path, file_id, callback)
end
def revoke_file(runtime, file_id) do
RuntimeServer.revoke_file(runtime.server_pid, file_id)
end
def start_smart_cell(runtime, kind, ref, attrs, parent_locators) do
RuntimeServer.start_smart_cell(runtime.server_pid, kind, ref, attrs, parent_locators)
end

View file

@ -96,6 +96,14 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Embedded do
RuntimeServer.read_file(runtime.server_pid, path)
end
def transfer_file(runtime, path, file_id, callback) do
RuntimeServer.transfer_file(runtime.server_pid, path, file_id, callback)
end
def revoke_file(runtime, file_id) do
RuntimeServer.revoke_file(runtime.server_pid, file_id)
end
def start_smart_cell(runtime, kind, ref, attrs, parent_locators) do
RuntimeServer.start_smart_cell(runtime.server_pid, kind, ref, attrs, parent_locators)
end

View file

@ -184,7 +184,10 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
@impl true
def handle_call({:start_runtime_server, opts}, _from, state) do
opts = Keyword.put_new(opts, :ebin_path, ebin_path(state.tmp_dir))
opts =
opts
|> Keyword.put_new(:ebin_path, ebin_path(state.tmp_dir))
|> Keyword.put_new(:tmp_dir, child_tmp_dir(state.tmp_dir))
{:ok, server_pid} =
DynamicSupervisor.start_child(state.server_supervisor, {ErlDist.RuntimeServer, opts})
@ -205,6 +208,9 @@ defmodule Livebook.Runtime.ErlDist.NodeManager do
defp ebin_path(nil), do: nil
defp ebin_path(tmp_dir), do: Path.join(tmp_dir, "ebin")
defp child_tmp_dir(nil), do: nil
defp child_tmp_dir(tmp_dir), do: Path.join(tmp_dir, random_id())
defp random_id() do
:crypto.strong_rand_bytes(20) |> Base.encode32(case: :lower)
end

View file

@ -43,6 +43,10 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
* `:ebin_path` - a directory to write modules bytecode into. When
not specified, modules are not written to disk
* `:tmp_dir` - a temporary directory to write files into, such as
those from file inputs. When not specified, operations relying
on the directory are not possible
"""
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts)
@ -146,6 +150,61 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
end
end
@doc """
Transfers file at `path` to the runtime.
If the runtime is at the same host as the caller, no copying occurs.
See `Livebook.Runtime` for more details.
"""
@spec transfer_file(pid(), String.t(), String.t(), (path :: String.t() | nil -> any())) :: :ok
def transfer_file(pid, path, file_id, callback) do
if same_host?(pid) do
callback.(path)
else
Task.Supervisor.start_child(Livebook.TaskSupervisor, fn ->
md5 = file_md5(path)
target_path =
case GenServer.call(pid, {:transfer_file_open, file_id, md5}, :infinity) do
{:noop, target_path} ->
target_path
{:transfer, target_path, target_pid} ->
try do
path
|> File.stream!([], 2048)
|> Enum.each(fn chunk -> IO.binwrite(target_pid, chunk) end)
target_path
rescue
_error -> nil
after
File.close(target_pid)
end
end
callback.(target_path)
end)
end
:ok
end
@doc """
Removes the file created by `transfer_file/4`, if any.
See `Livebook.Runtime` for more details.
"""
@spec revoke_file(pid(), String.t()) :: :ok
def revoke_file(pid, file_id) do
unless same_host?(pid) do
GenServer.cast(pid, {:revoke_file, file_id})
end
:ok
end
@doc """
Starts a new smart cell.
"""
@ -242,7 +301,8 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
memory_timer_ref: nil,
last_evaluator: nil,
initial_path: System.get_env("PATH", ""),
ebin_path: Keyword.get(opts, :ebin_path)
ebin_path: Keyword.get(opts, :ebin_path),
tmp_dir: Keyword.get(opts, :tmp_dir)
}}
end
@ -533,6 +593,13 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:noreply, state}
end
def handle_cast({:revoke_file, file_id}, state) do
target_path = file_path(state, file_id)
File.rm(target_path)
{:noreply, state}
end
@impl true
def handle_call({:read_file, path}, {from_pid, _}, state) do
# Delegate reading to a separate task and let the caller
@ -554,6 +621,31 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
{:reply, {result_ref, task_pid}, state}
end
def handle_call({:transfer_file_open, file_id, md5}, _from, state) do
reply =
if target_path = file_path(state, file_id) do
current_md5 = if File.exists?(target_path), do: file_md5(target_path)
if current_md5 == md5 do
{:noop, target_path}
else
target_path |> Path.dirname() |> File.mkdir_p!()
target_pid = File.open!(target_path, [:binary, :write])
{:transfer, target_path, target_pid}
end
else
{:noop, nil}
end
{:reply, reply, state}
end
defp file_path(state, file_id) do
if tmp_dir = state.tmp_dir do
Path.join([tmp_dir, "files", file_id])
end
end
defp ensure_evaluator(state, container_ref) do
if Map.has_key?(state.evaluators, container_ref) do
state
@ -737,4 +829,23 @@ defmodule Livebook.Runtime.ErlDist.RuntimeServer do
_ -> ":"
end
end
defp same_host?(pid) do
node_host(node()) == node_host(node(pid))
end
defp node_host(node) do
[_nodename, hostname] =
node
|> Atom.to_charlist()
|> :string.split(~c"@")
hostname
end
defp file_md5(path) do
File.stream!(path, [], 2048)
|> Enum.reduce(:erlang.md5_init(), &:erlang.md5_update(&2, &1))
|> :erlang.md5_final()
end
end

View file

@ -59,6 +59,8 @@ defmodule Livebook.Session do
use GenServer, restart: :temporary
import Livebook.Notebook.Cell, only: [is_file_input_value: 1]
alias Livebook.Session.{Data, FileGuard}
alias Livebook.{Utils, Notebook, Delta, Runtime, LiveMarkdown, FileSystem}
alias Livebook.Users.User
@ -1064,16 +1066,21 @@ defmodule Livebook.Session do
end
def handle_info({:runtime_evaluation_input, cell_id, reply_to, input_id}, state) do
{reply, state} =
state =
with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(state.data.notebook, cell_id),
{:ok, value} <- Data.fetch_input_value_for_cell(state.data, input_id, cell_id) do
state = handle_operation(state, {:bind_input, @client_id, cell.id, input_id})
{{:ok, value}, state}
else
_ -> {:error, state}
end
send(reply_to, {:runtime_evaluation_input_reply, reply})
prepare_input_value(input_id, value, state.data, fn value ->
send(reply_to, {:runtime_evaluation_input_reply, {:ok, value}})
end)
state
else
_ ->
send(reply_to, {:runtime_evaluation_input_reply, :error})
state
end
{:noreply, state}
end
@ -1245,6 +1252,7 @@ defmodule Livebook.Session do
@doc """
Returns a local path to the directory for all assets for hash.
"""
@spec local_assets_path(String.t()) :: String.t()
def local_assets_path(hash) do
Path.join([livebook_tmp_path(), "assets", encode_path_component(hash)])
end
@ -1272,6 +1280,15 @@ defmodule Livebook.Session do
end
end
@doc """
Returns a local path to the given file input file.
"""
@spec local_file_input_path(id(), Data.input_id()) :: String.t()
def local_file_input_path(session_id, input_id) do
%{path: session_dir} = session_tmp_dir(session_id)
Path.join([session_dir, "file_inputs", input_id])
end
defp encode_path_component(component) do
String.replace(component, [".", "/", "\\", ":"], "_")
end
@ -1576,6 +1593,24 @@ defmodule Livebook.Session do
state
end
defp handle_action(state, {:clean_up_input_values, input_values}) do
for {input_id, value} <- input_values do
case value do
value when is_file_input_value(value) ->
if Runtime.connected?(state.data.runtime) do
Runtime.revoke_file(state.data.runtime, input_id)
end
File.rm(value.path)
_ ->
:ok
end
end
state
end
defp handle_action(state, _action), do: state
defp start_evaluation(state, cell, section) do
@ -1603,6 +1638,17 @@ defmodule Livebook.Session do
handle_operation(state, {:evaluation_started, @client_id, cell.id, evaluation_digest})
end
defp prepare_input_value(input_id, value, data, on_value) when is_file_input_value(value) do
Runtime.transfer_file(data.runtime, value.path, input_id, fn path ->
value = %{path: path, client_name: value.client_name}
on_value.(value)
end)
end
defp prepare_input_value(_input_id, value, _data, on_value) do
on_value.(value)
end
defp broadcast_operation(session_id, operation) do
broadcast_message(session_id, {:operation, operation})
end

View file

@ -196,6 +196,7 @@ defmodule Livebook.Session.Data do
| {:set_smart_cell_parents, Cell.t(), Section.t(),
parent :: {Cell.t(), Section.t()} | nil}
| {:broadcast_delta, client_id(), Cell.t(), cell_source_tag(), Delta.t()}
| {:clean_up_input_values, %{input_id() => term()}}
@doc """
Returns a fresh notebook session state.
@ -1698,7 +1699,15 @@ defmodule Livebook.Session.Data do
data_actions
else
used_input_ids = data.notebook |> initial_input_values() |> Map.keys()
set!(data_actions, input_values: Map.take(data.input_values, used_input_ids))
{input_values, unused_input_values} = Map.split(data.input_values, used_input_ids)
if unused_input_values == %{} do
data_actions
else
data_actions
|> set!(input_values: input_values)
|> add_action({:clean_up_input_values, unused_input_values})
end
end
end

View file

@ -213,12 +213,18 @@ defmodule LivebookWeb.Output do
"""
end
defp render_output({:input, attrs}, %{id: id, input_values: input_values, client_id: client_id}) do
defp render_output({:input, attrs}, %{
id: id,
input_values: input_values,
client_id: client_id,
session_id: session_id
}) do
live_component(Output.InputComponent,
id: id,
attrs: attrs,
input_values: input_values,
client_id: client_id
client_id: client_id,
session_id: session_id
)
end

View file

@ -42,7 +42,7 @@ defmodule LivebookWeb.Output.AudioInputComponent do
phx-hook="AudioInput"
phx-update="ignore"
data-id={@id}
data-phx-target={@target}
data-phx-target={@myself}
data-format={@format}
data-sampling-rate={@sampling_rate}
data-endianness={@endianness}
@ -86,4 +86,22 @@ defmodule LivebookWeb.Output.AudioInputComponent do
</div>
"""
end
@impl true
def handle_event("change", params, socket) do
value = %{
data: Base.decode64!(params["data"]),
num_channels: params["num_channels"],
sampling_rate: params["sampling_rate"],
format: socket.assigns.format
}
send_update(LivebookWeb.Output.InputComponent,
id: socket.assigns.input_component_id,
event: :change,
value: value
)
{:noreply, socket}
end
end

View file

@ -0,0 +1,96 @@
defmodule LivebookWeb.Output.FileInputComponent do
use LivebookWeb, :live_component
@impl true
def mount(socket) do
{:ok, assign(socket, initialized: false)}
end
@impl true
def update(assigns, socket) do
socket = assign(socket, assigns)
socket =
if socket.assigns.initialized do
socket
else
socket
|> allow_upload(:file,
accept: socket.assigns.accept,
max_entries: 1,
progress: &handle_progress/3,
auto_upload: true
)
|> assign(initialized: true)
end
{:ok, socket}
end
@impl true
def render(assigns) do
~H"""
<form id={"#{@id}-root"} phx-change="validate" phx-target={@myself}>
<label
class="inline-flex flex-col gap-4 p-4 border-2 border-dashed border-gray-200 rounded-lg cursor-pointer"
phx-drop-target={@uploads.file.ref}
phx-hook="Dropzone"
id="upload-file-dropzone"
>
<div class="flex justify-center text-gray-500">
<%= if @value do %>
<%= @value.client_name %>
<% else %>
Click to select a file or drag a local file here
<% end %>
</div>
<.live_file_input upload={@uploads.file} class="hidden" />
</label>
</form>
"""
end
@impl true
def handle_event("validate", %{}, socket) do
{:noreply, socket}
end
defp handle_progress(:file, entry, socket) do
if entry.done? do
socket
|> consume_uploaded_entries(:file, fn %{path: path}, entry ->
destination_path =
Livebook.Session.local_file_input_path(
socket.assigns.session_id,
socket.assigns.input_id
)
destination_path |> Path.dirname() |> File.mkdir_p!()
File.cp!(path, destination_path)
{:ok, {destination_path, entry.client_name}}
end)
|> case do
[{path, client_name}] ->
# The path is always the same, so we include a random version
# to reflect a new value
value = %{
path: path,
client_name: client_name,
version: Livebook.Utils.random_short_id()
}
send_update(LivebookWeb.Output.InputComponent,
id: socket.assigns.input_component_id,
event: :change,
value: value
)
[] ->
:ok
end
end
{:noreply, socket}
end
end

View file

@ -42,7 +42,7 @@ defmodule LivebookWeb.Output.ImageInputComponent do
phx-hook="ImageInput"
phx-update="ignore"
data-id={@id}
data-phx-target={@target}
data-phx-target={@myself}
data-height={@height}
data-width={@width}
data-format={@format}
@ -95,4 +95,22 @@ defmodule LivebookWeb.Output.ImageInputComponent do
</div>
"""
end
@impl true
def handle_event("change", params, socket) do
value = %{
data: Base.decode64!(params["data"]),
height: params["height"],
width: params["width"],
format: socket.assigns.format
}
send_update(LivebookWeb.Output.InputComponent,
id: socket.assigns.input_component_id,
event: :change,
value: value
)
{:noreply, socket}
end
end

View file

@ -7,6 +7,10 @@ defmodule LivebookWeb.Output.InputComponent do
end
@impl true
def update(%{event: :change, value: value}, socket) do
{:ok, handle_change(socket, value)}
end
def update(assigns, socket) do
value = assigns.input_values[assigns.attrs.id]
@ -29,12 +33,12 @@ defmodule LivebookWeb.Output.InputComponent do
<.live_component
module={LivebookWeb.Output.ImageInputComponent}
id={"#{@id}-input"}
input_component_id={@id}
value={@value}
height={@attrs.size && elem(@attrs.size, 0)}
width={@attrs.size && elem(@attrs.size, 1)}
format={@attrs.format}
fit={@attrs.fit}
target={@myself}
/>
</div>
"""
@ -50,10 +54,30 @@ defmodule LivebookWeb.Output.InputComponent do
<.live_component
module={LivebookWeb.Output.AudioInputComponent}
id={"#{@id}-input"}
input_component_id={@id}
value={@value}
format={@attrs.format}
sampling_rate={@attrs.sampling_rate}
target={@myself}
/>
</div>
"""
end
def render(%{attrs: %{type: :file}} = assigns) do
~H"""
<div id={"#{@id}-form-#{@counter}"}>
<div class="input-label">
<%= @attrs.label %>
</div>
<.live_component
module={LivebookWeb.Output.FileInputComponent}
id={"#{@id}-input"}
input_component_id={@id}
value={@value}
accept={@attrs.accept}
input_id={@attrs.id}
session_id={@session_id}
/>
</div>
"""
@ -178,9 +202,9 @@ defmodule LivebookWeb.Output.InputComponent do
@impl true
def handle_event("change", %{"value" => html_value}, socket) do
case handle_change(socket, html_value) do
{:ok, socket} ->
{:noreply, socket}
case parse(html_value, socket.assigns.attrs) do
{:ok, value} ->
{:noreply, handle_change(socket, value)}
:error ->
# Force the current value
@ -189,8 +213,9 @@ defmodule LivebookWeb.Output.InputComponent do
end
def handle_event("submit", %{"value" => html_value}, socket) do
case handle_change(socket, html_value) do
{:ok, socket} ->
case parse(html_value, socket.assigns.attrs) do
{:ok, value} ->
socket = handle_change(socket, value)
send(self(), {:queue_bound_cells_evaluation, socket.assigns.attrs.id})
{:noreply, socket}
@ -199,22 +224,16 @@ defmodule LivebookWeb.Output.InputComponent do
end
end
defp handle_change(socket, html_value) do
case parse(html_value, socket.assigns.attrs) do
{:ok, value} ->
prev_value = socket.assigns.value
defp handle_change(socket, value) do
prev_value = socket.assigns.value
socket = assign(socket, value: value)
socket = assign(socket, value: value)
if value != prev_value do
report_change(socket)
end
{:ok, socket}
{:error, _error} ->
:error
if value != prev_value do
report_change(socket)
end
socket
end
defp report_change(%{assigns: assigns} = socket) do
@ -285,26 +304,6 @@ defmodule LivebookWeb.Output.InputComponent do
{:ok, html_value}
end
defp parse(html_value, %{type: :image} = attrs) do
{:ok,
%{
data: Base.decode64!(html_value["data"]),
height: html_value["height"],
width: html_value["width"],
format: attrs.format
}}
end
defp parse(html_value, %{type: :audio} = attrs) do
{:ok,
%{
data: Base.decode64!(html_value["data"]),
num_channels: html_value["num_channels"],
sampling_rate: html_value["sampling_rate"],
format: attrs.format
}}
end
defp report_event(socket, value) do
topic = socket.assigns.attrs.ref
event = %{value: value, origin: socket.assigns.client_id, type: :change}

View file

@ -359,6 +359,52 @@ defmodule LivebookWeb.SessionLiveTest do
assert_receive {:event, :form_ref, %{data: %{name: "sherlock"}, type: :submit}}
end
test "file input", %{conn: conn, session: session, test: test} do
section_id = insert_section(session.pid)
Process.register(self(), test)
input = %{
ref: :input_ref,
id: "input1",
type: :file,
label: "File",
default: nil,
destination: test,
accept: :any
}
Session.subscribe(session.id)
insert_cell_with_output(session.pid, section_id, {:input, input})
{:ok, view, _} = live(conn, "/sessions/#{session.id}")
view
|> file_input(~s/[data-el-outputs-container] form/, :file, [
%{
last_modified: 1_594_171_879_000,
name: "data.txt",
content: "content",
size: 7,
type: "text/plain"
}
])
|> render_upload("data.txt")
assert %{input_values: %{"input1" => value}} = Session.get_data(session.pid)
assert File.read!(value.path) == "content"
assert value.client_name == "data.txt"
# When the input disappears, the file should be removed
Session.erase_outputs(session.pid)
wait_for_session_update(session.pid)
refute File.exists?(value.path)
end
end
describe "outputs" do

View file

@ -27,6 +27,9 @@ defmodule Livebook.Runtime.NoopRuntime do
def handle_intellisense(_, _, _, _), do: make_ref()
def read_file(_, _), do: raise("not implemented")
def transfer_file(_, _, _, _), do: raise("not implemented")
def revoke_file(_, _), do: raise("not implemented")
def start_smart_cell(_, _, _, _, _), do: :ok
def set_smart_cell_parent_locators(_, _, _), do: :ok
def stop_smart_cell(_, _), do: :ok