From 2ec24732bd9a4ca8d11dd30559671d439b92f736 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Tue, 14 Mar 2023 23:48:07 +0100 Subject: [PATCH] Automatically evaluate changed parent cells (#1786) --- lib/livebook/session.ex | 22 ++++- lib/livebook/session/data.ex | 131 ++++++++++++++------------ lib/livebook_web/live/session_live.ex | 8 ++ test/livebook/session/data_test.exs | 27 ------ test/livebook/session_test.exs | 7 +- 5 files changed, 105 insertions(+), 90 deletions(-) diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index dd8eaa0d4..e1bfb9df0 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -1822,6 +1822,8 @@ defmodule Livebook.Session do _prev_state, {:apply_cell_delta, _client_id, cell_id, tag, _delta, _revision} ) do + hydrate_cell_source_digest(state, cell_id, tag) + with :secondary <- tag, {:ok, %Cell.Smart{} = cell, _section} <- Notebook.fetch_cell_and_section(state.data.notebook, cell_id) do @@ -1831,6 +1833,15 @@ defmodule Livebook.Session do state end + defp after_operation( + state, + _prev_state, + {:update_smart_cell, _client_id, cell_id, _attrs, _delta, _chunks, _reevaluate} + ) do + hydrate_cell_source_digest(state, cell_id, :primary) + state + end + defp after_operation(state, _prev_state, {:set_secret, _client_id, secret}) do if Runtime.connected?(state.data.runtime), do: set_runtime_secret(state, secret) state @@ -2013,8 +2024,15 @@ defmodule Livebook.Session do parent_locators = parent_locators_for_cell(state.data, cell) Runtime.evaluate_code(state.data.runtime, cell.source, locator, parent_locators, opts) - evaluation_digest = :erlang.md5(cell.source) - handle_operation(state, {:evaluation_started, @client_id, cell.id, evaluation_digest}) + state + end + + defp hydrate_cell_source_digest(state, cell_id, tag) do + # Clients prune source, so they can't compute the digest, but it's + # necessary for evaluation to know which cells are changed, so we + # always propagate the digest change to the clients + digest = state.data.cell_infos[cell_id].sources[tag].digest + broadcast_message(state.session_id, {:hydrate_cell_source_digest, cell_id, tag, digest}) end defp broadcast_operation(session_id, operation) do diff --git a/lib/livebook/session/data.ex b/lib/livebook/session/data.ex index 8b60f7a68..9d6b9bf70 100644 --- a/lib/livebook/session/data.ex +++ b/lib/livebook/session/data.ex @@ -91,7 +91,8 @@ defmodule Livebook.Session.Data do @type cell_source_info :: %{ revision: cell_revision(), deltas: list(Delta.t()), - revision_by_client_id: %{client_id() => cell_revision()} + revision_by_client_id: %{client_id() => cell_revision()}, + digest: String.t() | nil } @type cell_eval_info :: %{ @@ -183,7 +184,6 @@ defmodule Livebook.Session.Data do | {:move_cell, client_id(), Cell.id(), offset :: integer()} | {:move_section, client_id(), Section.id(), offset :: integer()} | {:queue_cells_evaluation, client_id(), list(Cell.id())} - | {:evaluation_started, client_id(), Cell.id(), binary()} | {:add_cell_evaluation_output, client_id(), Cell.id(), term()} | {:add_cell_evaluation_response, client_id(), Cell.id(), term(), metadata :: map()} | {:bind_input, client_id(), code_cell_id :: Cell.id(), input_id()} @@ -543,19 +543,6 @@ defmodule Livebook.Session.Data do end end - def apply_operation(data, {:evaluation_started, _client_id, id, evaluation_digest}) do - with {:ok, cell, _section} <- Notebook.fetch_cell_and_section(data.notebook, id), - Cell.evaluable?(cell), - :evaluating <- data.cell_infos[cell.id].eval.status do - data - |> with_actions() - |> update_cell_eval_info!(cell.id, &%{&1 | evaluation_digest: evaluation_digest}) - |> wrap_ok() - else - _ -> :error - end - end - def apply_operation(data, {:add_cell_evaluation_output, _client_id, id, output}) do with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, id) do data @@ -1416,24 +1403,26 @@ defmodule Livebook.Session.Data do data_actions |> set!(notebook: Notebook.update_cell(data.notebook, cell.id, &%{&1 | outputs: []})) - |> update_cell_eval_info!(cell.id, fn eval_info -> - %{ - eval_info - | # Note: we intentionally mark the cell as evaluating up front, - # so that another queue operation doesn't cause duplicated - # :start_evaluation action - status: :evaluating, - evaluation_number: eval_info.evaluation_number + 1, - outputs_batch_number: eval_info.outputs_batch_number + 1, - evaluation_digest: nil, - new_bound_to_input_ids: MapSet.new(), - # Keep the notebook state before evaluation - data: data, - # This is a rough estimate, the exact time is measured in the - # evaluator itself - evaluation_start: DateTime.utc_now(), - evaluation_end: nil - } + |> update_cell_info!(cell.id, fn info -> + update_in(info.eval, fn eval_info -> + %{ + eval_info + | # Note: we intentionally mark the cell as evaluating up front, + # so that another queue operation doesn't cause duplicated + # :start_evaluation action + status: :evaluating, + evaluation_number: eval_info.evaluation_number + 1, + outputs_batch_number: eval_info.outputs_batch_number + 1, + evaluation_digest: info.sources.primary.digest, + new_bound_to_input_ids: MapSet.new(), + # Keep the notebook state before evaluation + data: data, + # This is a rough estimate, the exact time is measured in the + # evaluator itself + evaluation_start: DateTime.utc_now(), + evaluation_end: nil + } + end) end) |> set_section_info!(section.id, evaluating_cell_id: cell.id, @@ -1500,7 +1489,7 @@ defmodule Livebook.Session.Data do |> Notebook.parent_cells_with_section(cell_id) |> Enum.filter(fn {cell, _section} -> info = data.cell_infos[cell.id] - Cell.evaluable?(cell) and info.eval.validity != :evaluated and info.eval.status == :ready + Cell.evaluable?(cell) and cell_outdated?(data, cell) and info.eval.status == :ready end) |> Enum.reverse() @@ -1574,15 +1563,17 @@ defmodule Livebook.Session.Data do js_view, editor ) do - updated_cell = - %{cell | chunks: chunks, js_view: js_view, editor: editor} |> apply_delta_to_cell(delta) + cell = %{cell | chunks: chunks, js_view: js_view, editor: editor} + source_info = data.cell_infos[cell.id].sources.primary + {updated_cell, source_info} = apply_delta_to_cell(cell, source_info, :primary, delta) data_actions |> set!(notebook: Notebook.update_cell(data.notebook, cell.id, fn _ -> updated_cell end)) |> update_cell_info!(cell.id, &%{&1 | status: :started}) |> update_cell_info!(cell.id, fn info -> info = %{info | status: :started} - put_in(info.sources.secondary, new_source_info(data.clients_map)) + info = put_in(info.sources.primary, source_info) + put_in(info.sources.secondary, new_source_info(editor && editor.source, data.clients_map)) end) |> add_action({:broadcast_delta, client_id, updated_cell, :primary, delta}) end @@ -1594,10 +1585,15 @@ defmodule Livebook.Session.Data do _attrs -> attrs end - updated_cell = %{cell | attrs: new_attrs, chunks: chunks} |> apply_delta_to_cell(delta) + cell = %{cell | attrs: new_attrs, chunks: chunks} + source_info = data.cell_infos[cell.id].sources.primary + {updated_cell, source_info} = apply_delta_to_cell(cell, source_info, :primary, delta) data_actions |> set!(notebook: Notebook.update_cell(data.notebook, cell.id, fn _ -> updated_cell end)) + |> update_cell_info!(cell.id, fn info -> + put_in(info.sources.primary, source_info) + end) |> add_action({:broadcast_delta, client_id, updated_cell, :primary, delta}) end @@ -1757,11 +1753,8 @@ defmodule Livebook.Session.Data do source_info end - updated_cell = - update_in(cell, source_access(cell, tag), fn - :__pruned__ -> :__pruned__ - source -> JSInterop.apply_delta_to_string(transformed_new_delta, source) - end) + {updated_cell, source_info} = + apply_delta_to_cell(cell, source_info, tag, transformed_new_delta) data_actions |> set!(notebook: Notebook.update_cell(data.notebook, cell.id, fn _ -> updated_cell end)) @@ -1769,16 +1762,30 @@ defmodule Livebook.Session.Data do |> add_action({:broadcast_delta, client_id, updated_cell, tag, transformed_new_delta}) end + # Note: the clients drop cell's source once it's no longer needed + defp apply_delta_to_cell(%{source: :__pruned__} = cell, source_info, _tag, _delta) do + {cell, source_info} + end + + defp apply_delta_to_cell(cell, source_info, tag, delta) do + cell = + update_in(cell, source_access(cell, tag), fn + :__pruned__ -> :__pruned__ + source -> JSInterop.apply_delta_to_string(delta, source) + end) + + source_info = + case get_in(cell, source_access(cell, tag)) do + :__pruned__ -> source_info + source -> %{source_info | digest: :erlang.md5(source)} + end + + {cell, source_info} + end + defp source_access(%Cell.Smart{}, :secondary), do: [Access.key(:editor), :source] defp source_access(_cell, :primary), do: [Access.key(:source)] - # Note: the clients drop cell's source once it's no longer needed - defp apply_delta_to_cell(%{source: :__pruned__} = cell, _delta), do: cell - - defp apply_delta_to_cell(cell, delta) do - update_in(cell.source, &JSInterop.apply_delta_to_string(delta, &1)) - end - defp report_revision(data_actions, client_id, cell, tag, revision) do data_actions |> update_cell_info!(cell.id, fn info -> @@ -2029,34 +2036,39 @@ defmodule Livebook.Session.Data do } end - defp new_cell_info(%Cell.Markdown{}, clients_map) do + defp new_cell_info(%Cell.Markdown{} = cell, clients_map) do %{ - sources: %{primary: new_source_info(clients_map)} + sources: %{primary: new_source_info(cell.source, clients_map)} } end - defp new_cell_info(%Cell.Code{}, clients_map) do + defp new_cell_info(%Cell.Code{} = cell, clients_map) do %{ - sources: %{primary: new_source_info(clients_map)}, + sources: %{primary: new_source_info(cell.source, clients_map)}, eval: new_eval_info() } end - defp new_cell_info(%Cell.Smart{}, clients_map) do + defp new_cell_info(%Cell.Smart{} = cell, clients_map) do %{ - sources: %{primary: new_source_info(clients_map), secondary: new_source_info(clients_map)}, + sources: %{ + primary: new_source_info(cell.source, clients_map), + secondary: new_source_info(cell.editor && cell.editor.source, clients_map) + }, eval: new_eval_info(), status: :dead } end - defp new_source_info(clients_map) do + defp new_source_info(source, clients_map) do + digest = source && :erlang.md5(source) client_ids = Map.keys(clients_map) %{ revision: 0, deltas: [], - revision_by_client_id: Map.new(client_ids, &{&1, 0}) + revision_by_client_id: Map.new(client_ids, &{&1, 0}), + digest: digest } end @@ -2473,8 +2485,7 @@ defmodule Livebook.Session.Data do @spec cell_outdated?(t(), Cell.t()) :: boolean() def cell_outdated?(data, cell) do info = data.cell_infos[cell.id] - digest = :erlang.md5(cell.source) - info.eval.validity != :evaluated or info.eval.evaluation_digest != digest + info.eval.validity != :evaluated or info.eval.evaluation_digest != info.sources.primary.digest end @doc """ diff --git a/lib/livebook_web/live/session_live.ex b/lib/livebook_web/live/session_live.ex index 98e2a4de0..b4828a522 100644 --- a/lib/livebook_web/live/session_live.ex +++ b/lib/livebook_web/live/session_live.ex @@ -1250,6 +1250,14 @@ defmodule LivebookWeb.SessionLive do |> assign(data_view: data_to_view(data))} end + def handle_info({:hydrate_cell_source_digest, cell_id, tag, digest}, socket) do + data = socket.private.data + data = put_in(data.cell_infos[cell_id].sources[tag].digest, digest) + # We don't recompute data_view, because for the cell indicator we + # still compute the digest on the client side + {:noreply, assign_private(socket, data: data)} + end + def handle_info({:session_updated, session}, socket) do {:noreply, assign(socket, :session, session)} end diff --git a/test/livebook/session/data_test.exs b/test/livebook/session/data_test.exs index 8008e9a04..e85bbbc2c 100644 --- a/test/livebook/session/data_test.exs +++ b/test/livebook/session/data_test.exs @@ -1762,30 +1762,6 @@ defmodule Livebook.Session.DataTest do end end - describe "apply_operation/2 given :evaluation_started" do - test "updates cell evaluation digest" do - data = - data_after_operations!([ - {:insert_section, @cid, 0, "s1"}, - {:insert_cell, @cid, "s1", 0, :code, "c1", %{}}, - {:set_runtime, @cid, connected_noop_runtime()}, - evaluate_cells_operations(["setup"]), - {:queue_cells_evaluation, @cid, ["c1"]} - ]) - - operation = {:evaluation_started, @cid, "c1", "digest"} - - assert {:ok, - %{ - cell_infos: %{ - "c1" => %{ - eval: %{evaluation_digest: "digest"} - } - } - }, []} = Data.apply_operation(data, operation) - end - end - describe "apply_operation/2 given :add_cell_evaluation_output" do test "updates the cell outputs" do data = @@ -4002,8 +3978,6 @@ defmodule Livebook.Session.DataTest do end end - @empty_digest :erlang.md5("") - describe "cell_ids_for_full_evaluation/2" do test "includes changed cells with dependent ones" do data = @@ -4183,7 +4157,6 @@ defmodule Livebook.Session.DataTest do ) [ - {:evaluation_started, @cid, cell_id, @empty_digest}, for input_id <- bind_inputs[cell_id] || [] do {:bind_input, @cid, cell_id, input_id} end, diff --git a/test/livebook/session_test.exs b/test/livebook/session_test.exs index 8c26cc375..ffc384f28 100644 --- a/test/livebook/session_test.exs +++ b/test/livebook/session_test.exs @@ -350,6 +350,10 @@ defmodule Livebook.SessionTest do assert_receive {:operation, {:apply_cell_delta, _client_id, ^cell_id, :primary, ^delta, ^revision}} + + # Sends new digest to clients + digest = :erlang.md5("cats") + assert_receive {:hydrate_cell_source_digest, ^cell_id, :primary, ^digest} end end @@ -975,9 +979,10 @@ defmodule Livebook.SessionTest do send(session_pid, {:pong, metadata, %{ref: "ref"}}) + # Sends new digest to clients cell_id = smart_cell.id new_digest = :erlang.md5("2") - assert_receive {:operation, {:evaluation_started, "__server__", ^cell_id, ^new_digest}} + assert_receive {:hydrate_cell_source_digest, ^cell_id, :primary, ^new_digest} end end