Compute cells validity based on snapshots (#452)

This commit is contained in:
Jonatan Kłosko 2021-07-18 19:05:02 +02:00 committed by GitHub
parent 7f06d24f61
commit 6276aafa72
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 187 additions and 79 deletions

View file

@ -56,7 +56,9 @@ defmodule Livebook.Session.Data do
revision: cell_revision(),
deltas: list(Delta.t()),
revision_by_client_pid: %{pid() => cell_revision()},
snapshot: snapshot(),
evaluation_digest: String.t() | nil,
evaluation_snapshot: snapshot() | nil,
evaluation_time_ms: integer() | nil,
number_of_evaluations: non_neg_integer(),
bound_to_input_ids: MapSet.t(Cell.id())
@ -79,6 +81,23 @@ defmodule Livebook.Session.Data do
@type index :: non_neg_integer()
# Snapshot holds information about the cell evaluation dependencies,
# for example what's the previous cell, the number of times that
# cell was evaluated, the list of available inputs, etc.
# Whenever the snapshot changes, it implies a new evaluation context,
# and basically means the cell got stale.
#
# The snapshot comprises of two actual snapshots:
#
# * `deps_snapshot` - everything related to parent cells and their
# evaluations. This is recorded once the cell starts evaluating
#
# * `bound_inputs_snapshot` - snapshot of the inputs and their values
# used by cell evaluation. This is recorded once the cell finishes
# its evaluation
#
@type snapshot :: {deps_snapshot :: term(), bound_inputs_snapshot :: term()}
# Note that all operations carry the pid of whatever
# process originated the operation. Some operations
# like :apply_cell_delta and :report_cell_revision
@ -214,8 +233,8 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> cancel_section_evaluation(section)
|> mark_section_and_dependent_cells_as_stale(section)
|> set_section_parent(section, parent_section)
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
else
@ -231,7 +250,7 @@ defmodule Livebook.Session.Data do
|> cancel_section_evaluation(section)
|> add_action({:stop_evaluation, section})
|> unset_section_parent(section)
|> mark_section_and_dependent_cells_as_stale(section)
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
else
@ -246,6 +265,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> insert_cell(section_id, index, cell)
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
end
@ -258,6 +278,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> delete_section(section, delete_cells)
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
else
@ -270,6 +291,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> delete_cell(cell, section)
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
end
@ -281,6 +303,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> restore_cell(cell_bin_entry)
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
else
@ -294,6 +317,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> move_cell(cell, offset)
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
else
@ -308,6 +332,7 @@ defmodule Livebook.Session.Data do
data
|> with_actions()
|> move_section(section, offset)
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
else
@ -325,6 +350,7 @@ defmodule Livebook.Session.Data do
|> queue_cell_evaluation(cell, section)
|> maybe_start_runtime(data)
|> maybe_evaluate_queued()
|> compute_snapshots_and_validity()
|> wrap_ok()
else
_ -> :error
@ -362,8 +388,9 @@ defmodule Livebook.Session.Data do
|> with_actions()
|> add_cell_evaluation_response(cell, output)
|> finish_cell_evaluation(cell, section, metadata)
|> mark_dependent_cells_as_stale(cell)
|> compute_snapshots_and_validity()
|> maybe_evaluate_queued()
|> compute_snapshots_and_validity()
|> wrap_ok()
else
_ -> :error
@ -500,11 +527,10 @@ defmodule Livebook.Session.Data do
|> set_cell_attributes(cell, attrs)
|> then(fn {data, _} = data_actions ->
{:ok, updated_cell, _} = Notebook.fetch_cell_and_section(data.notebook, cell_id)
data_actions
|> maybe_invalidate_bound_cells(updated_cell, cell)
|> maybe_queue_bound_cells(updated_cell, cell)
maybe_queue_bound_cells(data_actions, updated_cell, cell)
end)
|> maybe_evaluate_queued()
|> compute_snapshots_and_validity()
|> set_dirty()
|> wrap_ok()
else
@ -591,13 +617,7 @@ defmodule Livebook.Session.Data do
data_actions
|> reduce(Enum.reverse(section.cells), &delete_cell(&1, &2, section))
else
if section.parent_id do
data_actions
|> unset_section_parent(section)
|> mark_section_and_dependent_cells_as_stale(section)
else
data_actions
end
data_actions
end
data_actions
@ -610,7 +630,6 @@ defmodule Livebook.Session.Data do
defp delete_cell({data, _} = data_actions, cell, section) do
data_actions
|> cancel_cell_evaluation(cell, section)
|> mark_dependent_cells_as_stale(cell)
|> add_action({:forget_evaluation, cell, section})
|> set!(
notebook: Notebook.delete_cell(data.notebook, cell.id),
@ -653,7 +672,7 @@ defmodule Livebook.Session.Data do
data_actions
|> set!(notebook: updated_notebook)
|> update_cells_status_after_moved(data.notebook)
|> unqueue_cells_after_moved(data.notebook)
end
defp move_section({data, _} = data_actions, section, offset) do
@ -661,10 +680,10 @@ defmodule Livebook.Session.Data do
data_actions
|> set!(notebook: updated_notebook)
|> update_cells_status_after_moved(data.notebook)
|> unqueue_cells_after_moved(data.notebook)
end
defp update_cells_status_after_moved({data, _} = data_actions, prev_notebook) do
defp unqueue_cells_after_moved({data, _} = data_actions, prev_notebook) do
relevant_cell? = fn cell -> is_struct(cell, Cell.Elixir) or is_struct(cell, Cell.Input) end
graph_before = Notebook.cell_dependency_graph(prev_notebook, cell_filter: relevant_cell?)
graph_after = Notebook.cell_dependency_graph(data.notebook, cell_filter: relevant_cell?)
@ -672,7 +691,7 @@ defmodule Livebook.Session.Data do
# For each path in the dependency graph, find the upmost cell
# which parent changed. From that point downwards all cells
# are invalidated. Then gather invalidated cells from all paths
# and mark as such.
# and unqueue them.
invalidted_cell_ids =
graph_after
@ -691,7 +710,6 @@ defmodule Livebook.Session.Data do
end)
data_actions
|> mark_cells_as_stale(invalidated_cells_with_section)
|> unqueue_cells_evaluation(invalidated_cells_with_section)
end
@ -782,52 +800,22 @@ defmodule Livebook.Session.Data do
|> Enum.join("\n")
end
defp finish_cell_evaluation(data_actions, cell, section, metadata) do
defp finish_cell_evaluation({data, _} = data_actions, cell, section, metadata) do
data_actions
|> update_cell_info!(cell.id, fn info ->
%{
info
| evaluation_status: :ready,
evaluation_time_ms: metadata.evaluation_time_ms,
number_of_evaluations: info.number_of_evaluations + 1
number_of_evaluations: info.number_of_evaluations + 1,
# After finished evaluation, take latest snapshot of bound inputs
evaluation_snapshot:
{elem(info.evaluation_snapshot, 0), bound_inputs_snapshot(data, cell)}
}
end)
|> set_section_info!(section.id, evaluating_cell_id: nil)
end
defp mark_dependent_cells_as_stale(data_actions, %Cell.Markdown{}), do: data_actions
defp mark_dependent_cells_as_stale({data, _} = data_actions, cell) do
dependent = dependent_cells_with_section(data, cell.id)
mark_cells_as_stale(data_actions, dependent)
end
defp mark_cells_as_stale({data, _} = data_actions, cells_with_section) do
invalidated_cells =
cells_with_section
|> Enum.map(fn {cell, _section} -> cell end)
|> Enum.filter(fn cell ->
is_struct(cell, Cell.Elixir) and data.cell_infos[cell.id].validity_status == :evaluated
end)
data_actions
|> reduce(invalidated_cells, &set_cell_info!(&1, &2.id, validity_status: :stale))
end
defp mark_section_and_dependent_cells_as_stale(data_actions, section) do
section.cells
|> Enum.find(fn cell -> is_struct(cell, Cell.Elixir) end)
|> case do
nil ->
data_actions
cell ->
data_actions
|> mark_cells_as_stale([{cell, section}])
|> mark_dependent_cells_as_stale(cell)
end
end
defp maybe_start_runtime({data, _} = data_actions, prev_data) do
if data.runtime == nil and not any_cell_queued?(prev_data) and any_cell_queued?(data) do
add_action(data_actions, :start_runtime)
@ -913,12 +901,12 @@ defmodule Livebook.Session.Data do
|> update_cell_info!(id, fn info ->
%{
info
| evaluation_status: :evaluating,
| # Note: we intentionally mark the cell as evaluating up front,
# so that another queue operation doesn't cause duplicated
# :start_evaluation action
evaluation_status: :evaluating,
evaluation_digest: nil,
# During evaluation notebook changes may invalidate the cell,
# so we mark it as up-to-date straight away and possibly mark
# it as stale during evaluation
validity_status: :evaluated,
evaluation_snapshot: info.snapshot,
bound_to_input_ids: MapSet.new()
}
end)
@ -964,7 +952,8 @@ defmodule Livebook.Session.Data do
:aborted
end,
evaluation_status: :ready,
evaluation_digest: nil
evaluation_digest: nil,
evaluation_snapshot: nil
}
end)
)
@ -1130,22 +1119,6 @@ defmodule Livebook.Session.Data do
|> set!(notebook: Notebook.update_cell(data.notebook, cell.id, &Map.merge(&1, attrs)))
end
defp maybe_invalidate_bound_cells({data, _} = data_actions, %Cell.Input{} = cell, prev_cell) do
if Cell.Input.invalidated?(cell, prev_cell) do
bound_cells = bound_cells_with_section(data, cell.id)
data_actions
|> reduce(bound_cells, fn data_actions, {bound_cell, section} ->
dependent = dependent_cells_with_section(data, bound_cell.id)
mark_cells_as_stale(data_actions, [{bound_cell, section} | dependent])
end)
else
data_actions
end
end
defp maybe_invalidate_bound_cells(data_actions, _cell, _prev_cell), do: data_actions
defp maybe_queue_bound_cells({data, _} = data_actions, %Cell.Input{} = cell, prev_cell) do
if Cell.Input.reactive_update?(cell, prev_cell) do
bound_cells = bound_cells_with_section(data, cell.id)
@ -1156,7 +1129,6 @@ defmodule Livebook.Session.Data do
|> queue_prerequisite_cells_evaluation(bound_cell)
|> queue_cell_evaluation(bound_cell, section)
end)
|> maybe_evaluate_queued()
else
data_actions
end
@ -1231,7 +1203,9 @@ defmodule Livebook.Session.Data do
evaluation_digest: nil,
evaluation_time_ms: nil,
number_of_evaluations: 0,
bound_to_input_ids: MapSet.new()
bound_to_input_ids: MapSet.new(),
snapshot: {:initial, :initial},
evaluation_snapshot: nil
}
end
@ -1303,4 +1277,114 @@ defmodule Livebook.Session.Data do
|> Notebook.child_cells_with_section(cell_id)
|> Enum.filter(fn {cell, _} -> is_struct(cell, Cell.Elixir) end)
end
# Computes cell snapshots and updates validity based on the new values.
defp compute_snapshots_and_validity(data_actions) do
data_actions
|> compute_snapshots()
|> update_validity()
end
defp compute_snapshots({data, _} = data_actions) do
graph =
Notebook.cell_dependency_graph(data.notebook, cell_filter: &is_struct(&1, Cell.Elixir))
cells_with_section = Notebook.elixir_cells_with_section(data.notebook)
inputs_by_id =
for section <- data.notebook.sections,
cell <- section.cells,
is_struct(cell, Cell.Input),
into: %{},
do: {cell.id, cell}
graph_with_inputs =
Notebook.cell_dependency_graph(data.notebook,
cell_filter: fn cell ->
is_struct(cell, Cell.Elixir) or is_struct(cell, Cell.Input)
end
)
cell_snapshots =
Enum.reduce(cells_with_section, %{}, fn {cell, section}, cell_snapshots ->
info = data.cell_infos[cell.id]
prev_cell_id = graph[cell.id]
is_branch? = section.parent_id != nil
parent_deps =
prev_cell_id &&
{
prev_cell_id,
cell_snapshots[prev_cell_id],
data.cell_infos[prev_cell_id].number_of_evaluations
}
input_deps =
graph_with_inputs
|> Graph.find_path(cell.id, nil)
|> Enum.map(fn cell_id -> cell_id && inputs_by_id[cell_id] end)
|> Enum.reject(&is_nil/1)
|> Enum.map(& &1.name)
|> Enum.sort()
|> Enum.dedup()
deps = {is_branch?, parent_deps, input_deps}
deps_snapshot = :erlang.phash2(deps)
inputs_snapshot =
if info.evaluation_status == :evaluating do
# While the cell is evaluating the bound inputs snapshot
# is not stable, so we reuse the previous snapshot
elem(info.snapshot, 1)
else
bound_inputs_snapshot(data, cell)
end
snapshot = {deps_snapshot, inputs_snapshot}
put_in(cell_snapshots[cell.id], snapshot)
end)
reduce(data_actions, cells_with_section, fn data_actions, {cell, _} ->
update_cell_info!(data_actions, cell.id, fn info ->
snapshot = cell_snapshots[cell.id]
%{info | snapshot: snapshot}
end)
end)
end
defp bound_inputs_snapshot(data, cell) do
%{bound_to_input_ids: bound_to_input_ids} = data.cell_infos[cell.id]
if Enum.empty?(bound_to_input_ids) do
:initial
else
for(
section <- data.notebook.sections,
cell <- section.cells,
is_struct(cell, Cell.Input),
cell.id in bound_to_input_ids,
do: {cell.name, cell.value}
)
|> :erlang.phash2()
end
end
defp update_validity({data, _} = data_actions) do
cells_with_section = Notebook.elixir_cells_with_section(data.notebook)
reduce(data_actions, cells_with_section, fn data_actions, {cell, _} ->
update_cell_info!(data_actions, cell.id, fn info ->
validity_status =
case info do
%{evaluation_snapshot: snapshot, snapshot: snapshot} -> :evaluated
%{evaluation_snapshot: nil, validity_status: :aborted} -> :aborted
%{evaluation_snapshot: nil} -> :fresh
_ -> :stale
end
%{info | validity_status: validity_status}
end)
end)
end
end

View file

@ -1078,6 +1078,30 @@ defmodule Livebook.Session.DataTest do
}
}, []} = Data.apply_operation(data, operation)
end
test "moving a cell back and forth doesn't impact validity" do
data =
data_after_operations!([
{:insert_section, self(), 0, "s1"},
# Add cells
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
{:insert_cell, self(), "s1", 1, :elixir, "c2"},
{:insert_cell, self(), "s1", 2, :elixir, "c3"},
# Evaluate cells
{:set_runtime, self(), NoopRuntime.new()},
{:queue_cell_evaluation, self(), "c1"},
{:add_cell_evaluation_response, self(), "c1", @eval_resp, @eval_meta},
{:queue_cell_evaluation, self(), "c2"},
{:add_cell_evaluation_response, self(), "c2", @eval_resp, @eval_meta},
{:queue_cell_evaluation, self(), "c3"},
{:add_cell_evaluation_response, self(), "c3", @eval_resp, @eval_meta}
])
{:ok, data_moved, []} = Data.apply_operation(data, {:move_cell, self(), "c2", -1})
{:ok, data_reversed, []} = Data.apply_operation(data_moved, {:move_cell, self(), "c2", 1})
assert data_reversed == data
end
end
describe "apply_operation/2 given :move_section" do