From 6276aafa727eb1cd00d2ade52068ea45e0ec61dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Sun, 18 Jul 2021 19:05:02 +0200 Subject: [PATCH] Compute cells validity based on snapshots (#452) --- lib/livebook/session/data.ex | 242 +++++++++++++++++++--------- test/livebook/session/data_test.exs | 24 +++ 2 files changed, 187 insertions(+), 79 deletions(-) diff --git a/lib/livebook/session/data.ex b/lib/livebook/session/data.ex index 21fdb1546..4c2f46aef 100644 --- a/lib/livebook/session/data.ex +++ b/lib/livebook/session/data.ex @@ -56,7 +56,9 @@ defmodule Livebook.Session.Data do revision: cell_revision(), deltas: list(Delta.t()), revision_by_client_pid: %{pid() => cell_revision()}, + snapshot: snapshot(), evaluation_digest: String.t() | nil, + evaluation_snapshot: snapshot() | nil, evaluation_time_ms: integer() | nil, number_of_evaluations: non_neg_integer(), bound_to_input_ids: MapSet.t(Cell.id()) @@ -79,6 +81,23 @@ defmodule Livebook.Session.Data do @type index :: non_neg_integer() + # Snapshot holds information about the cell evaluation dependencies, + # for example what's the previous cell, the number of times that + # cell was evaluated, the list of available inputs, etc. + # Whenever the snapshot changes, it implies a new evaluation context, + # and basically means the cell got stale. + # + # The snapshot comprises of two actual snapshots: + # + # * `deps_snapshot` - everything related to parent cells and their + # evaluations. This is recorded once the cell starts evaluating + # + # * `bound_inputs_snapshot` - snapshot of the inputs and their values + # used by cell evaluation. This is recorded once the cell finishes + # its evaluation + # + @type snapshot :: {deps_snapshot :: term(), bound_inputs_snapshot :: term()} + # Note that all operations carry the pid of whatever # process originated the operation. Some operations # like :apply_cell_delta and :report_cell_revision @@ -214,8 +233,8 @@ defmodule Livebook.Session.Data do data |> with_actions() |> cancel_section_evaluation(section) - |> mark_section_and_dependent_cells_as_stale(section) |> set_section_parent(section, parent_section) + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() else @@ -231,7 +250,7 @@ defmodule Livebook.Session.Data do |> cancel_section_evaluation(section) |> add_action({:stop_evaluation, section}) |> unset_section_parent(section) - |> mark_section_and_dependent_cells_as_stale(section) + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() else @@ -246,6 +265,7 @@ defmodule Livebook.Session.Data do data |> with_actions() |> insert_cell(section_id, index, cell) + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() end @@ -258,6 +278,7 @@ defmodule Livebook.Session.Data do data |> with_actions() |> delete_section(section, delete_cells) + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() else @@ -270,6 +291,7 @@ defmodule Livebook.Session.Data do data |> with_actions() |> delete_cell(cell, section) + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() end @@ -281,6 +303,7 @@ defmodule Livebook.Session.Data do data |> with_actions() |> restore_cell(cell_bin_entry) + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() else @@ -294,6 +317,7 @@ defmodule Livebook.Session.Data do data |> with_actions() |> move_cell(cell, offset) + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() else @@ -308,6 +332,7 @@ defmodule Livebook.Session.Data do data |> with_actions() |> move_section(section, offset) + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() else @@ -325,6 +350,7 @@ defmodule Livebook.Session.Data do |> queue_cell_evaluation(cell, section) |> maybe_start_runtime(data) |> maybe_evaluate_queued() + |> compute_snapshots_and_validity() |> wrap_ok() else _ -> :error @@ -362,8 +388,9 @@ defmodule Livebook.Session.Data do |> with_actions() |> add_cell_evaluation_response(cell, output) |> finish_cell_evaluation(cell, section, metadata) - |> mark_dependent_cells_as_stale(cell) + |> compute_snapshots_and_validity() |> maybe_evaluate_queued() + |> compute_snapshots_and_validity() |> wrap_ok() else _ -> :error @@ -500,11 +527,10 @@ defmodule Livebook.Session.Data do |> set_cell_attributes(cell, attrs) |> then(fn {data, _} = data_actions -> {:ok, updated_cell, _} = Notebook.fetch_cell_and_section(data.notebook, cell_id) - - data_actions - |> maybe_invalidate_bound_cells(updated_cell, cell) - |> maybe_queue_bound_cells(updated_cell, cell) + maybe_queue_bound_cells(data_actions, updated_cell, cell) end) + |> maybe_evaluate_queued() + |> compute_snapshots_and_validity() |> set_dirty() |> wrap_ok() else @@ -591,13 +617,7 @@ defmodule Livebook.Session.Data do data_actions |> reduce(Enum.reverse(section.cells), &delete_cell(&1, &2, section)) else - if section.parent_id do - data_actions - |> unset_section_parent(section) - |> mark_section_and_dependent_cells_as_stale(section) - else - data_actions - end + data_actions end data_actions @@ -610,7 +630,6 @@ defmodule Livebook.Session.Data do defp delete_cell({data, _} = data_actions, cell, section) do data_actions |> cancel_cell_evaluation(cell, section) - |> mark_dependent_cells_as_stale(cell) |> add_action({:forget_evaluation, cell, section}) |> set!( notebook: Notebook.delete_cell(data.notebook, cell.id), @@ -653,7 +672,7 @@ defmodule Livebook.Session.Data do data_actions |> set!(notebook: updated_notebook) - |> update_cells_status_after_moved(data.notebook) + |> unqueue_cells_after_moved(data.notebook) end defp move_section({data, _} = data_actions, section, offset) do @@ -661,10 +680,10 @@ defmodule Livebook.Session.Data do data_actions |> set!(notebook: updated_notebook) - |> update_cells_status_after_moved(data.notebook) + |> unqueue_cells_after_moved(data.notebook) end - defp update_cells_status_after_moved({data, _} = data_actions, prev_notebook) do + defp unqueue_cells_after_moved({data, _} = data_actions, prev_notebook) do relevant_cell? = fn cell -> is_struct(cell, Cell.Elixir) or is_struct(cell, Cell.Input) end graph_before = Notebook.cell_dependency_graph(prev_notebook, cell_filter: relevant_cell?) graph_after = Notebook.cell_dependency_graph(data.notebook, cell_filter: relevant_cell?) @@ -672,7 +691,7 @@ defmodule Livebook.Session.Data do # For each path in the dependency graph, find the upmost cell # which parent changed. From that point downwards all cells # are invalidated. Then gather invalidated cells from all paths - # and mark as such. + # and unqueue them. invalidted_cell_ids = graph_after @@ -691,7 +710,6 @@ defmodule Livebook.Session.Data do end) data_actions - |> mark_cells_as_stale(invalidated_cells_with_section) |> unqueue_cells_evaluation(invalidated_cells_with_section) end @@ -782,52 +800,22 @@ defmodule Livebook.Session.Data do |> Enum.join("\n") end - defp finish_cell_evaluation(data_actions, cell, section, metadata) do + defp finish_cell_evaluation({data, _} = data_actions, cell, section, metadata) do data_actions |> update_cell_info!(cell.id, fn info -> %{ info | evaluation_status: :ready, evaluation_time_ms: metadata.evaluation_time_ms, - number_of_evaluations: info.number_of_evaluations + 1 + number_of_evaluations: info.number_of_evaluations + 1, + # After finished evaluation, take latest snapshot of bound inputs + evaluation_snapshot: + {elem(info.evaluation_snapshot, 0), bound_inputs_snapshot(data, cell)} } end) |> set_section_info!(section.id, evaluating_cell_id: nil) end - defp mark_dependent_cells_as_stale(data_actions, %Cell.Markdown{}), do: data_actions - - defp mark_dependent_cells_as_stale({data, _} = data_actions, cell) do - dependent = dependent_cells_with_section(data, cell.id) - mark_cells_as_stale(data_actions, dependent) - end - - defp mark_cells_as_stale({data, _} = data_actions, cells_with_section) do - invalidated_cells = - cells_with_section - |> Enum.map(fn {cell, _section} -> cell end) - |> Enum.filter(fn cell -> - is_struct(cell, Cell.Elixir) and data.cell_infos[cell.id].validity_status == :evaluated - end) - - data_actions - |> reduce(invalidated_cells, &set_cell_info!(&1, &2.id, validity_status: :stale)) - end - - defp mark_section_and_dependent_cells_as_stale(data_actions, section) do - section.cells - |> Enum.find(fn cell -> is_struct(cell, Cell.Elixir) end) - |> case do - nil -> - data_actions - - cell -> - data_actions - |> mark_cells_as_stale([{cell, section}]) - |> mark_dependent_cells_as_stale(cell) - end - end - defp maybe_start_runtime({data, _} = data_actions, prev_data) do if data.runtime == nil and not any_cell_queued?(prev_data) and any_cell_queued?(data) do add_action(data_actions, :start_runtime) @@ -913,12 +901,12 @@ defmodule Livebook.Session.Data do |> update_cell_info!(id, fn info -> %{ info - | evaluation_status: :evaluating, + | # Note: we intentionally mark the cell as evaluating up front, + # so that another queue operation doesn't cause duplicated + # :start_evaluation action + evaluation_status: :evaluating, evaluation_digest: nil, - # During evaluation notebook changes may invalidate the cell, - # so we mark it as up-to-date straight away and possibly mark - # it as stale during evaluation - validity_status: :evaluated, + evaluation_snapshot: info.snapshot, bound_to_input_ids: MapSet.new() } end) @@ -964,7 +952,8 @@ defmodule Livebook.Session.Data do :aborted end, evaluation_status: :ready, - evaluation_digest: nil + evaluation_digest: nil, + evaluation_snapshot: nil } end) ) @@ -1130,22 +1119,6 @@ defmodule Livebook.Session.Data do |> set!(notebook: Notebook.update_cell(data.notebook, cell.id, &Map.merge(&1, attrs))) end - defp maybe_invalidate_bound_cells({data, _} = data_actions, %Cell.Input{} = cell, prev_cell) do - if Cell.Input.invalidated?(cell, prev_cell) do - bound_cells = bound_cells_with_section(data, cell.id) - - data_actions - |> reduce(bound_cells, fn data_actions, {bound_cell, section} -> - dependent = dependent_cells_with_section(data, bound_cell.id) - mark_cells_as_stale(data_actions, [{bound_cell, section} | dependent]) - end) - else - data_actions - end - end - - defp maybe_invalidate_bound_cells(data_actions, _cell, _prev_cell), do: data_actions - defp maybe_queue_bound_cells({data, _} = data_actions, %Cell.Input{} = cell, prev_cell) do if Cell.Input.reactive_update?(cell, prev_cell) do bound_cells = bound_cells_with_section(data, cell.id) @@ -1156,7 +1129,6 @@ defmodule Livebook.Session.Data do |> queue_prerequisite_cells_evaluation(bound_cell) |> queue_cell_evaluation(bound_cell, section) end) - |> maybe_evaluate_queued() else data_actions end @@ -1231,7 +1203,9 @@ defmodule Livebook.Session.Data do evaluation_digest: nil, evaluation_time_ms: nil, number_of_evaluations: 0, - bound_to_input_ids: MapSet.new() + bound_to_input_ids: MapSet.new(), + snapshot: {:initial, :initial}, + evaluation_snapshot: nil } end @@ -1303,4 +1277,114 @@ defmodule Livebook.Session.Data do |> Notebook.child_cells_with_section(cell_id) |> Enum.filter(fn {cell, _} -> is_struct(cell, Cell.Elixir) end) end + + # Computes cell snapshots and updates validity based on the new values. + defp compute_snapshots_and_validity(data_actions) do + data_actions + |> compute_snapshots() + |> update_validity() + end + + defp compute_snapshots({data, _} = data_actions) do + graph = + Notebook.cell_dependency_graph(data.notebook, cell_filter: &is_struct(&1, Cell.Elixir)) + + cells_with_section = Notebook.elixir_cells_with_section(data.notebook) + + inputs_by_id = + for section <- data.notebook.sections, + cell <- section.cells, + is_struct(cell, Cell.Input), + into: %{}, + do: {cell.id, cell} + + graph_with_inputs = + Notebook.cell_dependency_graph(data.notebook, + cell_filter: fn cell -> + is_struct(cell, Cell.Elixir) or is_struct(cell, Cell.Input) + end + ) + + cell_snapshots = + Enum.reduce(cells_with_section, %{}, fn {cell, section}, cell_snapshots -> + info = data.cell_infos[cell.id] + prev_cell_id = graph[cell.id] + + is_branch? = section.parent_id != nil + + parent_deps = + prev_cell_id && + { + prev_cell_id, + cell_snapshots[prev_cell_id], + data.cell_infos[prev_cell_id].number_of_evaluations + } + + input_deps = + graph_with_inputs + |> Graph.find_path(cell.id, nil) + |> Enum.map(fn cell_id -> cell_id && inputs_by_id[cell_id] end) + |> Enum.reject(&is_nil/1) + |> Enum.map(& &1.name) + |> Enum.sort() + |> Enum.dedup() + + deps = {is_branch?, parent_deps, input_deps} + deps_snapshot = :erlang.phash2(deps) + + inputs_snapshot = + if info.evaluation_status == :evaluating do + # While the cell is evaluating the bound inputs snapshot + # is not stable, so we reuse the previous snapshot + elem(info.snapshot, 1) + else + bound_inputs_snapshot(data, cell) + end + + snapshot = {deps_snapshot, inputs_snapshot} + put_in(cell_snapshots[cell.id], snapshot) + end) + + reduce(data_actions, cells_with_section, fn data_actions, {cell, _} -> + update_cell_info!(data_actions, cell.id, fn info -> + snapshot = cell_snapshots[cell.id] + %{info | snapshot: snapshot} + end) + end) + end + + defp bound_inputs_snapshot(data, cell) do + %{bound_to_input_ids: bound_to_input_ids} = data.cell_infos[cell.id] + + if Enum.empty?(bound_to_input_ids) do + :initial + else + for( + section <- data.notebook.sections, + cell <- section.cells, + is_struct(cell, Cell.Input), + cell.id in bound_to_input_ids, + do: {cell.name, cell.value} + ) + |> :erlang.phash2() + end + end + + defp update_validity({data, _} = data_actions) do + cells_with_section = Notebook.elixir_cells_with_section(data.notebook) + + reduce(data_actions, cells_with_section, fn data_actions, {cell, _} -> + update_cell_info!(data_actions, cell.id, fn info -> + validity_status = + case info do + %{evaluation_snapshot: snapshot, snapshot: snapshot} -> :evaluated + %{evaluation_snapshot: nil, validity_status: :aborted} -> :aborted + %{evaluation_snapshot: nil} -> :fresh + _ -> :stale + end + + %{info | validity_status: validity_status} + end) + end) + end end diff --git a/test/livebook/session/data_test.exs b/test/livebook/session/data_test.exs index 6fe1f5447..1269936f7 100644 --- a/test/livebook/session/data_test.exs +++ b/test/livebook/session/data_test.exs @@ -1078,6 +1078,30 @@ defmodule Livebook.Session.DataTest do } }, []} = Data.apply_operation(data, operation) end + + test "moving a cell back and forth doesn't impact validity" do + data = + data_after_operations!([ + {:insert_section, self(), 0, "s1"}, + # Add cells + {:insert_cell, self(), "s1", 0, :elixir, "c1"}, + {:insert_cell, self(), "s1", 1, :elixir, "c2"}, + {:insert_cell, self(), "s1", 2, :elixir, "c3"}, + # Evaluate cells + {:set_runtime, self(), NoopRuntime.new()}, + {:queue_cell_evaluation, self(), "c1"}, + {:add_cell_evaluation_response, self(), "c1", @eval_resp, @eval_meta}, + {:queue_cell_evaluation, self(), "c2"}, + {:add_cell_evaluation_response, self(), "c2", @eval_resp, @eval_meta}, + {:queue_cell_evaluation, self(), "c3"}, + {:add_cell_evaluation_response, self(), "c3", @eval_resp, @eval_meta} + ]) + + {:ok, data_moved, []} = Data.apply_operation(data, {:move_cell, self(), "c2", -1}) + {:ok, data_reversed, []} = Data.apply_operation(data_moved, {:move_cell, self(), "c2", 1}) + + assert data_reversed == data + end end describe "apply_operation/2 given :move_section" do