Automatically evaluate changed parent cells (#1786)

This commit is contained in:
Jonatan Kłosko 2023-03-14 23:48:07 +01:00 committed by GitHub
parent 5b84de6682
commit 2ec24732bd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 90 deletions

View file

@ -1822,6 +1822,8 @@ defmodule Livebook.Session do
_prev_state,
{:apply_cell_delta, _client_id, cell_id, tag, _delta, _revision}
) do
hydrate_cell_source_digest(state, cell_id, tag)
with :secondary <- tag,
{:ok, %Cell.Smart{} = cell, _section} <-
Notebook.fetch_cell_and_section(state.data.notebook, cell_id) do
@ -1831,6 +1833,15 @@ defmodule Livebook.Session do
state
end
# Runs after a smart cell update has been applied: broadcasts the digest
# of the cell's primary source and returns the state as is.
defp after_operation(
       state,
       _prev_state,
       {:update_smart_cell, _client_id, cell_id, _attrs, _delta, _chunks, _reevaluate}
     ) do
  tap(state, &hydrate_cell_source_digest(&1, cell_id, :primary))
end
defp after_operation(state, _prev_state, {:set_secret, _client_id, secret}) do
if Runtime.connected?(state.data.runtime), do: set_runtime_secret(state, secret)
state
@ -2013,8 +2024,15 @@ defmodule Livebook.Session do
parent_locators = parent_locators_for_cell(state.data, cell)
Runtime.evaluate_code(state.data.runtime, cell.source, locator, parent_locators, opts)
evaluation_digest = :erlang.md5(cell.source)
handle_operation(state, {:evaluation_started, @client_id, cell.id, evaluation_digest})
state
end
# Broadcasts the digest currently stored for the given cell source
# (selected by `tag`) as a `:hydrate_cell_source_digest` message.
#
# Clients prune cell sources and therefore cannot compute the digest on
# their own, yet evaluation relies on it to tell which cells changed,
# so every digest change is propagated to the clients.
defp hydrate_cell_source_digest(state, cell_id, tag) do
  cell_info = state.data.cell_infos[cell_id]
  source_info = cell_info.sources[tag]

  broadcast_message(
    state.session_id,
    {:hydrate_cell_source_digest, cell_id, tag, source_info.digest}
  )
end
defp broadcast_operation(session_id, operation) do

View file

@ -91,7 +91,8 @@ defmodule Livebook.Session.Data do
@type cell_source_info :: %{
revision: cell_revision(),
deltas: list(Delta.t()),
revision_by_client_id: %{client_id() => cell_revision()}
revision_by_client_id: %{client_id() => cell_revision()},
digest: String.t() | nil
}
@type cell_eval_info :: %{
@ -183,7 +184,6 @@ defmodule Livebook.Session.Data do
| {:move_cell, client_id(), Cell.id(), offset :: integer()}
| {:move_section, client_id(), Section.id(), offset :: integer()}
| {:queue_cells_evaluation, client_id(), list(Cell.id())}
| {:evaluation_started, client_id(), Cell.id(), binary()}
| {:add_cell_evaluation_output, client_id(), Cell.id(), term()}
| {:add_cell_evaluation_response, client_id(), Cell.id(), term(), metadata :: map()}
| {:bind_input, client_id(), code_cell_id :: Cell.id(), input_id()}
@ -543,19 +543,6 @@ defmodule Livebook.Session.Data do
end
end
# Stores `evaluation_digest` in the evaluation info of the cell with
# the given `id`.
#
# Returns `{:ok, data, actions}` when the cell exists, is evaluable and
# its evaluation status is `:evaluating`; returns `:error` otherwise.
def apply_operation(data, {:evaluation_started, _client_id, id, evaluation_digest}) do
  with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(data.notebook, id),
       # A bare expression inside `with` is evaluated but its result is
       # discarded, so the check has to be an explicit match. Otherwise a
       # cell without `:eval` info would raise in the next clause instead
       # of falling through to `:error`.
       true <- Cell.evaluable?(cell),
       :evaluating <- data.cell_infos[cell.id].eval.status do
    data
    |> with_actions()
    |> update_cell_eval_info!(cell.id, &%{&1 | evaluation_digest: evaluation_digest})
    |> wrap_ok()
  else
    _ -> :error
  end
end
def apply_operation(data, {:add_cell_evaluation_output, _client_id, id, output}) do
with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, id) do
data
@ -1416,24 +1403,26 @@ defmodule Livebook.Session.Data do
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, cell.id, &%{&1 | outputs: []}))
|> update_cell_eval_info!(cell.id, fn eval_info ->
%{
eval_info
| # Note: we intentionally mark the cell as evaluating up front,
# so that another queue operation doesn't cause duplicated
# :start_evaluation action
status: :evaluating,
evaluation_number: eval_info.evaluation_number + 1,
outputs_batch_number: eval_info.outputs_batch_number + 1,
evaluation_digest: nil,
new_bound_to_input_ids: MapSet.new(),
# Keep the notebook state before evaluation
data: data,
# This is a rough estimate, the exact time is measured in the
# evaluator itself
evaluation_start: DateTime.utc_now(),
evaluation_end: nil
}
|> update_cell_info!(cell.id, fn info ->
update_in(info.eval, fn eval_info ->
%{
eval_info
| # Note: we intentionally mark the cell as evaluating up front,
# so that another queue operation doesn't cause duplicated
# :start_evaluation action
status: :evaluating,
evaluation_number: eval_info.evaluation_number + 1,
outputs_batch_number: eval_info.outputs_batch_number + 1,
evaluation_digest: info.sources.primary.digest,
new_bound_to_input_ids: MapSet.new(),
# Keep the notebook state before evaluation
data: data,
# This is a rough estimate, the exact time is measured in the
# evaluator itself
evaluation_start: DateTime.utc_now(),
evaluation_end: nil
}
end)
end)
|> set_section_info!(section.id,
evaluating_cell_id: cell.id,
@ -1500,7 +1489,7 @@ defmodule Livebook.Session.Data do
|> Notebook.parent_cells_with_section(cell_id)
|> Enum.filter(fn {cell, _section} ->
info = data.cell_infos[cell.id]
Cell.evaluable?(cell) and info.eval.validity != :evaluated and info.eval.status == :ready
Cell.evaluable?(cell) and cell_outdated?(data, cell) and info.eval.status == :ready
end)
|> Enum.reverse()
@ -1574,15 +1563,17 @@ defmodule Livebook.Session.Data do
js_view,
editor
) do
updated_cell =
%{cell | chunks: chunks, js_view: js_view, editor: editor} |> apply_delta_to_cell(delta)
cell = %{cell | chunks: chunks, js_view: js_view, editor: editor}
source_info = data.cell_infos[cell.id].sources.primary
{updated_cell, source_info} = apply_delta_to_cell(cell, source_info, :primary, delta)
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, cell.id, fn _ -> updated_cell end))
|> update_cell_info!(cell.id, &%{&1 | status: :started})
|> update_cell_info!(cell.id, fn info ->
info = %{info | status: :started}
put_in(info.sources.secondary, new_source_info(data.clients_map))
info = put_in(info.sources.primary, source_info)
put_in(info.sources.secondary, new_source_info(editor && editor.source, data.clients_map))
end)
|> add_action({:broadcast_delta, client_id, updated_cell, :primary, delta})
end
@ -1594,10 +1585,15 @@ defmodule Livebook.Session.Data do
_attrs -> attrs
end
updated_cell = %{cell | attrs: new_attrs, chunks: chunks} |> apply_delta_to_cell(delta)
cell = %{cell | attrs: new_attrs, chunks: chunks}
source_info = data.cell_infos[cell.id].sources.primary
{updated_cell, source_info} = apply_delta_to_cell(cell, source_info, :primary, delta)
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, cell.id, fn _ -> updated_cell end))
|> update_cell_info!(cell.id, fn info ->
put_in(info.sources.primary, source_info)
end)
|> add_action({:broadcast_delta, client_id, updated_cell, :primary, delta})
end
@ -1757,11 +1753,8 @@ defmodule Livebook.Session.Data do
source_info
end
updated_cell =
update_in(cell, source_access(cell, tag), fn
:__pruned__ -> :__pruned__
source -> JSInterop.apply_delta_to_string(transformed_new_delta, source)
end)
{updated_cell, source_info} =
apply_delta_to_cell(cell, source_info, tag, transformed_new_delta)
data_actions
|> set!(notebook: Notebook.update_cell(data.notebook, cell.id, fn _ -> updated_cell end))
@ -1769,16 +1762,30 @@ defmodule Livebook.Session.Data do
|> add_action({:broadcast_delta, client_id, updated_cell, tag, transformed_new_delta})
end
# Applies `delta` to the cell source selected by `tag` and stores the
# digest of the resulting source in `source_info`.
#
# Returns `{cell, source_info}`.
#
# Note: the clients drop cell's source once it's no longer needed. A
# dropped source is represented by `:__pruned__`, in which case both
# the source and the digest are left as they are.
defp apply_delta_to_cell(%{source: :__pruned__} = cell, source_info, _tag, _delta),
  do: {cell, source_info}

defp apply_delta_to_cell(cell, source_info, tag, delta) do
  path = source_access(cell, tag)

  updated_cell =
    update_in(cell, path, fn
      :__pruned__ -> :__pruned__
      source -> JSInterop.apply_delta_to_string(delta, source)
    end)

  # The digest is derived from the source as stored after the update
  case get_in(updated_cell, path) do
    :__pruned__ -> {updated_cell, source_info}
    new_source -> {updated_cell, %{source_info | digest: :erlang.md5(new_source)}}
  end
end
# Returns the access path (usable with get_in/update_in) pointing at
# the cell source selected by the tag: the editor source of a smart
# cell for `:secondary`, the cell's own source for `:primary`.
defp source_access(%Cell.Smart{}, :secondary) do
  [Access.key(:editor), :source]
end

defp source_access(_cell, :primary) do
  [Access.key(:source)]
end
# Applies `delta` to the cell source and returns the updated cell.
#
# Note: the clients drop cell's source once it's no longer needed, in
# which case the source is `:__pruned__` and the cell is returned as is.
defp apply_delta_to_cell(cell, delta) do
  case cell do
    %{source: :__pruned__} ->
      cell

    _ ->
      update_in(cell.source, fn source -> JSInterop.apply_delta_to_string(delta, source) end)
  end
end
defp report_revision(data_actions, client_id, cell, tag, revision) do
data_actions
|> update_cell_info!(cell.id, fn info ->
@ -2029,34 +2036,39 @@ defmodule Livebook.Session.Data do
}
end
defp new_cell_info(%Cell.Markdown{}, clients_map) do
defp new_cell_info(%Cell.Markdown{} = cell, clients_map) do
%{
sources: %{primary: new_source_info(clients_map)}
sources: %{primary: new_source_info(cell.source, clients_map)}
}
end
defp new_cell_info(%Cell.Code{}, clients_map) do
defp new_cell_info(%Cell.Code{} = cell, clients_map) do
%{
sources: %{primary: new_source_info(clients_map)},
sources: %{primary: new_source_info(cell.source, clients_map)},
eval: new_eval_info()
}
end
defp new_cell_info(%Cell.Smart{}, clients_map) do
defp new_cell_info(%Cell.Smart{} = cell, clients_map) do
%{
sources: %{primary: new_source_info(clients_map), secondary: new_source_info(clients_map)},
sources: %{
primary: new_source_info(cell.source, clients_map),
secondary: new_source_info(cell.editor && cell.editor.source, clients_map)
},
eval: new_eval_info(),
status: :dead
}
end
defp new_source_info(clients_map) do
defp new_source_info(source, clients_map) do
digest = source && :erlang.md5(source)
client_ids = Map.keys(clients_map)
%{
revision: 0,
deltas: [],
revision_by_client_id: Map.new(client_ids, &{&1, 0})
revision_by_client_id: Map.new(client_ids, &{&1, 0}),
digest: digest
}
end
@ -2473,8 +2485,7 @@ defmodule Livebook.Session.Data do
@spec cell_outdated?(t(), Cell.t()) :: boolean()
def cell_outdated?(data, cell) do
info = data.cell_infos[cell.id]
digest = :erlang.md5(cell.source)
info.eval.validity != :evaluated or info.eval.evaluation_digest != digest
info.eval.validity != :evaluated or info.eval.evaluation_digest != info.sources.primary.digest
end
@doc """

View file

@ -1250,6 +1250,14 @@ defmodule LivebookWeb.SessionLive do
|> assign(data_view: data_to_view(data))}
end
# Stores the digest received from the session for the given cell source
# in the private copy of the session data.
def handle_info({:hydrate_cell_source_digest, cell_id, tag, digest}, socket) do
  current_data = socket.private.data
  hydrated_data = put_in(current_data.cell_infos[cell_id].sources[tag].digest, digest)

  # data_view is deliberately not recomputed here, because for the cell
  # indicator the digest is still computed on the client side
  {:noreply, assign_private(socket, data: hydrated_data)}
end
# Replaces the `:session` assign with the session carried by the message.
def handle_info({:session_updated, session}, socket) do
  socket = assign(socket, session: session)
  {:noreply, socket}
end

View file

@ -1762,30 +1762,6 @@ defmodule Livebook.Session.DataTest do
end
end
describe "apply_operation/2 given :evaluation_started" do
  test "updates cell evaluation digest" do
    # Queue a code cell for evaluation with a connected runtime
    setup_operations = [
      {:insert_section, @cid, 0, "s1"},
      {:insert_cell, @cid, "s1", 0, :code, "c1", %{}},
      {:set_runtime, @cid, connected_noop_runtime()},
      evaluate_cells_operations(["setup"]),
      {:queue_cells_evaluation, @cid, ["c1"]}
    ]

    data = data_after_operations!(setup_operations)

    # The operation succeeds and produces no actions
    assert {:ok, updated_data, []} =
             Data.apply_operation(data, {:evaluation_started, @cid, "c1", "digest"})

    # The digest carried by the operation ends up in the cell eval info
    assert %{eval: %{evaluation_digest: "digest"}} = updated_data.cell_infos["c1"]
  end
end
describe "apply_operation/2 given :add_cell_evaluation_output" do
test "updates the cell outputs" do
data =
@ -4002,8 +3978,6 @@ defmodule Livebook.Session.DataTest do
end
end
@empty_digest :erlang.md5("")
describe "cell_ids_for_full_evaluation/2" do
test "includes changed cells with dependent ones" do
data =
@ -4183,7 +4157,6 @@ defmodule Livebook.Session.DataTest do
)
[
{:evaluation_started, @cid, cell_id, @empty_digest},
for input_id <- bind_inputs[cell_id] || [] do
{:bind_input, @cid, cell_id, input_id}
end,

View file

@ -350,6 +350,10 @@ defmodule Livebook.SessionTest do
assert_receive {:operation,
{:apply_cell_delta, _client_id, ^cell_id, :primary, ^delta, ^revision}}
# Sends new digest to clients
digest = :erlang.md5("cats")
assert_receive {:hydrate_cell_source_digest, ^cell_id, :primary, ^digest}
end
end
@ -975,9 +979,10 @@ defmodule Livebook.SessionTest do
send(session_pid, {:pong, metadata, %{ref: "ref"}})
# Sends new digest to clients
cell_id = smart_cell.id
new_digest = :erlang.md5("2")
assert_receive {:operation, {:evaluation_started, "__server__", ^cell_id, ^new_digest}}
assert_receive {:hydrate_cell_source_digest, ^cell_id, :primary, ^new_digest}
end
end