From 8beeb48d1b0b1bcd1e99ce37d51edf2a9592970f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20K=C5=82osko?= Date: Thu, 14 Jan 2021 19:41:11 +0100 Subject: [PATCH] Implement evaluation cancellation (#7) * Implement evaluation cancellation * Forger cell evaluation on deletion * Further operation fixes * Implement new side effects approach --- lib/live_book/session.ex | 94 ++++++------ lib/live_book/session/data.ex | 204 ++++++++++++++++++--------- test/live_book/session/data_test.exs | 177 +++++++++++++++++++---- test/live_book/session_test.exs | 17 +++ 4 files changed, 353 insertions(+), 139 deletions(-) diff --git a/lib/live_book/session.ex b/lib/live_book/session.ex index 14c0a1972..1365ef33e 100644 --- a/lib/live_book/session.ex +++ b/lib/live_book/session.ex @@ -96,6 +96,14 @@ defmodule LiveBook.Session do GenServer.cast(name(session_id), {:queue_cell_evaluation, cell_id}) end + @doc """ + Asynchronously sends cell evaluation cancellation request to the server. + """ + @spec cancel_cell_evaluation(id(), Cell.id()) :: :ok + def cancel_cell_evaluation(session_id, cell_id) do + GenServer.cast(name(session_id), {:cancel_cell_evaluation, cell_id}) + end + @doc """ Synchronously stops the server. """ @@ -137,10 +145,7 @@ defmodule LiveBook.Session do def handle_cast({:delete_section, section_id}, state) do operation = {:delete_section, section_id} - - handle_operation(state, operation, fn new_state -> - delete_section_evaluator(new_state, section_id) - end) + handle_operation(state, operation) end def handle_cast({:delete_cell, cell_id}, state) do @@ -150,10 +155,12 @@ defmodule LiveBook.Session do def handle_cast({:queue_cell_evaluation, cell_id}, state) do operation = {:queue_cell_evaluation, cell_id} + handle_operation(state, operation) + end - handle_operation(state, operation, fn new_state -> - maybe_trigger_evaluations(state, new_state) - end) + def handle_cast({:cancel_cell_evaluation, cell_id}, state) do + operation = {:cancel_cell_evaluation, cell_id} + handle_operation(state, operation) end @impl true @@ -168,10 +175,7 @@ defmodule LiveBook.Session do def handle_info({:evaluator_response, cell_id, response}, state) do operation = {:add_cell_evaluation_response, cell_id, response} - - handle_operation(state, operation, fn new_state -> - maybe_trigger_evaluations(state, new_state) - end) + handle_operation(state, operation) end # --- @@ -181,71 +185,67 @@ defmodule LiveBook.Session do # * broadcasts the operation to all clients immediately, # so that they can update their local `Data` # * applies the operation to own local `Data` - # * optionally performs a relevant task (e.g. starts cell evaluation), + # * if necessary, performs the relevant tasks (e.g. starts cell evaluation), # to reflect the new `Data` # defp handle_operation(state, operation) do - handle_operation(state, operation, fn state -> state end) - end - - defp handle_operation(state, operation, handle_new_state) do broadcast_operation(state.session_id, operation) case Data.apply_operation(state.data, operation) do - {:ok, new_data} -> + {:ok, new_data, actions} -> new_state = %{state | data: new_data} - {:noreply, handle_new_state.(new_state)} + {:noreply, handle_actions(new_state, actions)} :error -> {:noreply, state} end end + defp handle_actions(state, actions) do + Enum.reduce(actions, state, &handle_action(&2, &1)) + end + + defp handle_action(state, {:start_evaluation, cell, section}) do + trigger_evaluation(state, cell, section) + end + + defp handle_action(state, {:stop_evaluation, section}) do + delete_section_evaluator(state, section.id) + end + + defp handle_action(state, {:forget_evaluation, cell, section}) do + with {:ok, evaluator} <- fetch_section_evaluator(state, section.id) do + Evaluator.forget_evaluation(evaluator, cell.id) + end + + state + end + defp broadcast_operation(session_id, operation) do message = {:operation, operation} Phoenix.PubSub.broadcast(LiveBook.PubSub, "sessions:#{session_id}", message) end - # Compares sections in the old and new state and if a new cell - # has been marked as evaluating it triggers the actual evaluation task. - defp maybe_trigger_evaluations(old_state, new_state) do - Enum.reduce(new_state.data.notebook.sections, new_state, fn section, state -> - case {Data.get_evaluating_cell_id(old_state.data, section.id), - Data.get_evaluating_cell_id(new_state.data, section.id)} do - {_, nil} -> - # No cell to evaluate - state - - {cell_id, cell_id} -> - # The evaluating cell hasn't changed, so it must be already evaluating - state - - {_, cell_id} -> - # The evaluating cell changed, so we trigger the evaluation to reflect that - trigger_evaluation(state, cell_id) - end - end) - end - - defp trigger_evaluation(state, cell_id) do - notebook = state.data.notebook - {:ok, cell, section} = Notebook.fetch_cell_and_section(notebook, cell_id) + defp trigger_evaluation(state, cell, section) do {state, evaluator} = get_section_evaluator(state, section.id) - %{source: source} = cell prev_ref = - case Notebook.parent_cells(notebook, cell_id) do + case Notebook.parent_cells(state.data.notebook, cell.id) do [parent | _] -> parent.id [] -> :initial end - Evaluator.evaluate_code(evaluator, self(), source, cell_id, prev_ref) + Evaluator.evaluate_code(evaluator, self(), cell.source, cell.id, prev_ref) state end + defp fetch_section_evaluator(state, section_id) do + Map.fetch(state.evaluators, section_id) + end + defp get_section_evaluator(state, section_id) do - case Map.fetch(state.evaluators, section_id) do + case fetch_section_evaluator(state, section_id) do {:ok, evaluator} -> {state, evaluator} @@ -257,7 +257,7 @@ defmodule LiveBook.Session do end defp delete_section_evaluator(state, section_id) do - case Map.fetch(state.evaluators, section_id) do + case fetch_section_evaluator(state, section_id) do {:ok, evaluator} -> EvaluatorSupervisor.terminate_evaluator(evaluator) %{state | evaluators: Map.delete(state.evaluators, section_id)} diff --git a/lib/live_book/session/data.ex b/lib/live_book/session/data.ex index d033b737b..dc639af80 100644 --- a/lib/live_book/session/data.ex +++ b/lib/live_book/session/data.ex @@ -60,6 +60,12 @@ defmodule LiveBook.Session.Data do | {:queue_cell_evaluation, Cell.id()} | {:add_cell_evaluation_stdout, Cell.id(), String.t()} | {:add_cell_evaluation_response, Cell.id(), Evaluator.evaluation_response()} + | {:cancel_cell_evaluation, Cell.id()} + + @type action :: + {:start_evaluation, Cell.t(), Section.t()} + | {:stop_evaluation, Section.t()} + | {:forget_evaluation, Cell.t(), Section.t()} @doc """ Returns a fresh notebook session state. @@ -88,21 +94,25 @@ defmodule LiveBook.Session.Data do the system reflects the new structure. For instance, when a new cell is marked as evaluating, the session process should take care of triggering actual evaluation. - Returns `{:ok, data}` on correct application or `:error` if the operation - is not valid. The `:error` is generally expected given the collaborative - nature of sessions. For example if there are simultaneous deletion - and evaluation operations on the same cell, we may perform delete first, + Returns `{:ok, data, actions}` if the operation is valid, where `data` is the result + of applying said operation to the given data, and `actions` is a list + of side effects that should be performed for the new data to hold true. + + Returns `:error` if the operation is not valid. The `:error` is generally + expected given the collaborative nature of sessions. For example if there are + simultaneous deletion and evaluation operations on the same cell, we may perform delete first, in which case the evaluation is no longer valid (there's no cell with the given id). By returning `:error` we simply notify the caller that no changes were applied, so any related actions can be ignored. """ - @spec apply_operation(t(), operation()) :: {:ok, t()} | :error + @spec apply_operation(t(), operation()) :: {:ok, t(), list(action())} | :error def apply_operation(data, operation) def apply_operation(data, {:insert_section, index, id}) do section = %{Section.new() | id: id} data + |> with_actions() |> insert_section(index, section) |> wrap_ok() end @@ -112,34 +122,44 @@ defmodule LiveBook.Session.Data do cell = %{Cell.new(type) | id: id} data + |> with_actions() |> insert_cell(section_id, index, cell) |> wrap_ok() end end def apply_operation(data, {:delete_section, id}) do - with {:ok, section} <- Notebook.fetch_section(data.notebook, id), - # If a cell within this section is being evaluated, it should be cancelled first - nil <- data.section_infos[section.id].evaluating_cell_id do + with {:ok, section} <- Notebook.fetch_section(data.notebook, id) do data + |> with_actions() |> delete_section(section) |> wrap_ok() - else - _ -> :error end end def apply_operation(data, {:delete_cell, id}) do - with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id), - # If the cell is being evaluated, it should be cancelled first - false <- data.cell_infos[cell.id].status == :evaluating do - data - |> unqueue_cell_evaluation_if_any(cell, section) - |> mark_dependent_cells_as_stale(cell) + with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do + case data.cell_infos[cell.id].status do + :evaluating -> + data + |> with_actions() + |> clear_section_evaluation(section) + + :queued -> + data + |> with_actions() + |> unqueue_cell_evaluation(cell, section) + |> unqueue_dependent_cells_evaluation(cell, section) + |> mark_dependent_cells_as_stale(cell) + + _ -> + data + |> with_actions() + |> mark_dependent_cells_as_stale(cell) + end |> delete_cell(cell) + |> add_action({:forget_evaluation, cell, section}) |> wrap_ok() - else - _ -> :error end end @@ -147,10 +167,9 @@ defmodule LiveBook.Session.Data do with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id), :elixir <- cell.type, false <- data.cell_infos[cell.id].status in [:queued, :evaluating] do - prerequisites_queue = fresh_parent_cells_queue(data, cell) - data - |> reduce(prerequisites_queue, &queue_cell_evaluation(&1, &2, section)) + |> with_actions() + |> queue_prerequisite_cells_evaluation(cell, section) |> queue_cell_evaluation(cell, section) |> maybe_evaluate_queued() |> wrap_ok() @@ -162,6 +181,7 @@ defmodule LiveBook.Session.Data do def apply_operation(data, {:add_cell_evaluation_stdout, id, string}) do with {:ok, cell, _} <- Notebook.fetch_cell_and_section(data.notebook, id) do data + |> with_actions() |> add_cell_evaluation_stdout(cell, string) |> wrap_ok() end @@ -170,6 +190,7 @@ defmodule LiveBook.Session.Data do def apply_operation(data, {:add_cell_evaluation_response, id, response}) do with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do data + |> with_actions() |> add_cell_evaluation_response(cell, response) |> finish_cell_evaluation(cell, section) |> mark_dependent_cells_as_stale(cell) @@ -178,26 +199,53 @@ defmodule LiveBook.Session.Data do end end + def apply_operation(data, {:cancel_cell_evaluation, id}) do + with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do + case data.cell_infos[cell.id].status do + :evaluating -> + data + |> with_actions() + |> clear_section_evaluation(section) + |> wrap_ok() + + :queued -> + data + |> with_actions() + |> unqueue_cell_evaluation(cell, section) + |> unqueue_dependent_cells_evaluation(cell, section) + |> mark_dependent_cells_as_stale(cell) + |> wrap_ok() + + _ -> + :error + end + end + end + # === - defp insert_section(data, index, section) do - data + defp with_actions(data, actions \\ []), do: {data, actions} + + defp wrap_ok({data, actions}), do: {:ok, data, actions} + + defp insert_section({data, _} = data_actions, index, section) do + data_actions |> set!( notebook: Notebook.insert_section(data.notebook, index, section), section_infos: Map.put(data.section_infos, section.id, new_section_info()) ) end - defp insert_cell(data, section_id, index, cell) do - data + defp insert_cell({data, _} = data_actions, section_id, index, cell) do + data_actions |> set!( notebook: Notebook.insert_cell(data.notebook, section_id, index, cell), cell_infos: Map.put(data.cell_infos, cell.id, new_cell_info()) ) end - defp delete_section(data, section) do - data + defp delete_section({data, _} = data_actions, section) do + data_actions |> set!( notebook: Notebook.delete_section(data.notebook, section.id), section_infos: Map.delete(data.section_infos, section.id), @@ -206,8 +254,8 @@ defmodule LiveBook.Session.Data do |> reduce(section.cells, &delete_cell_info/2) end - defp delete_cell(data, cell) do - data + defp delete_cell({data, _} = data_actions, cell) do + data_actions |> set!( notebook: Notebook.delete_cell(data.notebook, cell.id), deleted_cells: [cell | data.deleted_cells] @@ -215,88 +263,115 @@ defmodule LiveBook.Session.Data do |> delete_cell_info(cell) end - defp delete_cell_info(data, cell) do - data + defp delete_cell_info({data, _} = data_actions, cell) do + data_actions |> set!(cell_infos: Map.delete(data.cell_infos, cell.id)) end - defp queue_cell_evaluation(data, cell, section) do - data + defp queue_cell_evaluation(data_actions, cell, section) do + data_actions |> update_section_info!(section.id, fn section -> %{section | evaluation_queue: section.evaluation_queue ++ [cell.id]} end) |> set_cell_info!(cell.id, status: :queued) end - defp unqueue_cell_evaluation_if_any(data, cell, section) do - data + defp unqueue_cell_evaluation(data_actions, cell, section) do + data_actions |> update_section_info!(section.id, fn section -> %{section | evaluation_queue: List.delete(section.evaluation_queue, cell.id)} end) |> set_cell_info!(cell.id, status: :stale) end - defp add_cell_evaluation_stdout(data, _cell, _string) do - data + defp add_cell_evaluation_stdout({data, _} = data_actions, _cell, _string) do + data_actions |> set!( # TODO: add stdout to cell outputs notebook: data.notebook ) end - defp add_cell_evaluation_response(data, _cell, _response) do - data + defp add_cell_evaluation_response({data, _} = data_actions, _cell, _response) do + data_actions |> set!( # TODO: add result to outputs notebook: data.notebook ) end - defp finish_cell_evaluation(data, cell, section) do - data + defp finish_cell_evaluation(data_actions, cell, section) do + data_actions |> set_cell_info!(cell.id, status: :evaluated, evaluated_at: DateTime.utc_now()) |> set_section_info!(section.id, evaluating_cell_id: nil) end - defp mark_dependent_cells_as_stale(data, cell) do - invalidated_cells = evaluated_child_cells(data, cell) + defp mark_dependent_cells_as_stale({data, _} = data_actions, cell) do + invalidated_cells = child_cells_with_status(data, cell, :evaluated) - data + data_actions |> reduce(invalidated_cells, &set_cell_info!(&1, &2.id, status: :stale)) end defp fresh_parent_cells_queue(data, cell) do data.notebook |> Notebook.parent_cells(cell.id) - |> Enum.filter(fn parent -> data.cell_infos[parent.id].status == :fresh end) + |> Enum.take_while(fn parent -> data.cell_infos[parent.id].status == :fresh end) |> Enum.reverse() end - defp evaluated_child_cells(data, cell) do + defp child_cells_with_status(data, cell, status) do data.notebook |> Notebook.child_cells(cell.id) - # Mark only evaluted cells as stale - |> Enum.filter(fn cell -> data.cell_infos[cell.id].status == :evaluated end) + |> Enum.filter(fn cell -> data.cell_infos[cell.id].status == status end) end # If there are idle sections with non-empty evaluation queue, # the next queued cell for evaluation. - defp maybe_evaluate_queued(data) do - Enum.reduce(data.notebook.sections, data, fn section, data -> + defp maybe_evaluate_queued({data, _} = data_actions) do + Enum.reduce(data.notebook.sections, data_actions, fn section, data_actions -> + {data, _} = data_actions + case data.section_infos[section.id] do %{evaluating_cell_id: nil, evaluation_queue: [id | ids]} -> - data + cell = Enum.find(section.cells, &(&1.id == id)) + + data_actions |> set!(notebook: Notebook.update_cell(data.notebook, id, &%{&1 | outputs: []})) |> set_cell_info!(id, status: :evaluating) |> set_section_info!(section.id, evaluating_cell_id: id, evaluation_queue: ids) + |> add_action({:start_evaluation, cell, section}) _ -> - data + data_actions end end) end - defp wrap_ok(value), do: {:ok, value} + defp clear_section_evaluation(data_actions, section) do + data_actions + |> set_section_info!(section.id, evaluating_cell_id: nil, evaluation_queue: []) + |> reduce(section.cells, &set_cell_info!(&1, &2.id, status: :fresh)) + |> add_action({:stop_evaluation, section}) + end + + defp queue_prerequisite_cells_evaluation({data, _} = data_actions, cell, section) do + prerequisites_queue = fresh_parent_cells_queue(data, cell) + + data_actions + |> reduce(prerequisites_queue, &queue_cell_evaluation(&1, &2, section)) + end + + defp unqueue_dependent_cells_evaluation({data, _} = data_actions, cell, section) do + queued_dependent_cells = child_cells_with_status(data, cell, :queued) + + data_actions + |> reduce(queued_dependent_cells, &unqueue_cell_evaluation(&1, &2, section)) + end + + defp add_action({data, actions}, action) do + {data, actions ++ [action]} + end defp new_section_info() do %{ @@ -314,40 +389,41 @@ defmodule LiveBook.Session.Data do } end - defp set!(data, changes) do + defp set!({data, actions}, changes) do Enum.reduce(changes, data, fn {key, value}, info -> Map.replace!(info, key, value) end) + |> with_actions(actions) end - defp set_cell_info!(data, cell_id, changes) do - update_cell_info!(data, cell_id, fn info -> + defp set_cell_info!(data_actions, cell_id, changes) do + update_cell_info!(data_actions, cell_id, fn info -> Enum.reduce(changes, info, fn {key, value}, info -> Map.replace!(info, key, value) end) end) end - defp update_cell_info!(data, cell_id, fun) do + defp update_cell_info!({data, _} = data_actions, cell_id, fun) do cell_infos = Map.update!(data.cell_infos, cell_id, fun) - set!(data, cell_infos: cell_infos) + set!(data_actions, cell_infos: cell_infos) end - defp set_section_info!(data, section_id, changes) do - update_section_info!(data, section_id, fn info -> + defp set_section_info!(data_actions, section_id, changes) do + update_section_info!(data_actions, section_id, fn info -> Enum.reduce(changes, info, fn {key, value}, info -> Map.replace!(info, key, value) end) end) end - defp update_section_info!(data, section_id, fun) do + defp update_section_info!({data, _} = data_actions, section_id, fun) do section_infos = Map.update!(data.section_infos, section_id, fun) - set!(data, section_infos: section_infos) + set!(data_actions, section_infos: section_infos) end - defp reduce(data, list, reducer) do - Enum.reduce(list, data, fn elem, data -> reducer.(data, elem) end) + defp reduce(data_actions, list, reducer) do + Enum.reduce(list, data_actions, fn elem, data_actions -> reducer.(data_actions, elem) end) end @doc """ diff --git a/test/live_book/session/data_test.exs b/test/live_book/session/data_test.exs index 0e6693d49..71dae5f78 100644 --- a/test/live_book/session/data_test.exs +++ b/test/live_book/session/data_test.exs @@ -15,7 +15,7 @@ defmodule LiveBook.Session.DataTest do sections: [%{id: "s1"}] }, section_infos: %{"s1" => _} - }} = Data.apply_operation(data, operation) + }, []} = Data.apply_operation(data, operation) end end @@ -42,7 +42,7 @@ defmodule LiveBook.Session.DataTest do ] }, cell_infos: %{"c1" => _} - }} = Data.apply_operation(data, operation) + }, []} = Data.apply_operation(data, operation) end end @@ -53,18 +53,6 @@ defmodule LiveBook.Session.DataTest do assert :error = Data.apply_operation(data, operation) end - test "returns an error for an evaluating section" do - data = - data_after_operations!([ - {:insert_section, 0, "s1"}, - {:insert_cell, "s1", 0, :elixir, "c1"}, - {:queue_cell_evaluation, "c1"} - ]) - - operation = {:delete_section, "s1"} - assert :error = Data.apply_operation(data, operation) - end - test "removes the section from notebook and session info, adds to deleted sections" do data = data_after_operations!([ @@ -81,7 +69,7 @@ defmodule LiveBook.Session.DataTest do }, section_infos: ^empty_map, deleted_sections: [%{id: "s1"}] - }} = Data.apply_operation(data, operation) + }, []} = Data.apply_operation(data, operation) end end @@ -92,16 +80,23 @@ defmodule LiveBook.Session.DataTest do assert :error = Data.apply_operation(data, operation) end - test "returns an error for an evaluating cell" do + test "if the cell is evaluating, cencels section evaluation" do data = data_after_operations!([ {:insert_section, 0, "s1"}, {:insert_cell, "s1", 0, :elixir, "c1"}, - {:queue_cell_evaluation, "c1"} + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:queue_cell_evaluation, "c1"}, + {:queue_cell_evaluation, "c2"} ]) operation = {:delete_cell, "c1"} - assert :error = Data.apply_operation(data, operation) + + assert {:ok, + %{ + cell_infos: %{"c2" => %{status: :fresh}}, + section_infos: %{"s1" => %{evaluating_cell_id: nil, evaluation_queue: []}} + }, _actions} = Data.apply_operation(data, operation) end test "removes the cell from notebook and session info, adds to deleted cells" do @@ -121,7 +116,7 @@ defmodule LiveBook.Session.DataTest do }, cell_infos: ^empty_map, deleted_cells: [%{id: "c1"}] - }} = Data.apply_operation(data, operation) + }, _actions} = Data.apply_operation(data, operation) end test "unqueues the cell if it's queued for evaluation" do @@ -139,7 +134,7 @@ defmodule LiveBook.Session.DataTest do assert {:ok, %{ section_infos: %{"s1" => %{evaluation_queue: []}} - }} = Data.apply_operation(data, operation) + }, _actions} = Data.apply_operation(data, operation) end test "marks evaluated child cells as stale" do @@ -160,7 +155,20 @@ defmodule LiveBook.Session.DataTest do assert {:ok, %{ cell_infos: %{"c2" => %{status: :stale}} - }} = Data.apply_operation(data, operation) + }, _actions} = Data.apply_operation(data, operation) + end + + test "returns forget evaluation action" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"} + ]) + + operation = {:delete_cell, "c1"} + + assert {:ok, _data, [{:forget_evaluation, %{id: "c1"}, %{id: "s1"}}]} = + Data.apply_operation(data, operation) end end @@ -207,7 +215,20 @@ defmodule LiveBook.Session.DataTest do %{ cell_infos: %{"c1" => %{status: :evaluating}}, section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: []}} - }} = Data.apply_operation(data, operation) + }, _actions} = Data.apply_operation(data, operation) + end + + test "returns start evaluation action if the corresponding section is idle" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"} + ]) + + operation = {:queue_cell_evaluation, "c1"} + + assert {:ok, _data, [{:start_evaluation, %{id: "c1"}, %{id: "s1"}}]} = + Data.apply_operation(data, operation) end test "marks the cell as queued if the corresponding section is already evaluating" do @@ -225,7 +246,7 @@ defmodule LiveBook.Session.DataTest do %{ cell_infos: %{"c2" => %{status: :queued}}, section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: ["c2"]}} - }} = Data.apply_operation(data, operation) + }, []} = Data.apply_operation(data, operation) end end @@ -255,7 +276,7 @@ defmodule LiveBook.Session.DataTest do %{ cell_infos: %{"c1" => %{status: :evaluated}}, section_infos: %{"s1" => %{evaluating_cell_id: nil, evaluation_queue: []}} - }} = Data.apply_operation(data, operation) + }, []} = Data.apply_operation(data, operation) end test "marks next queued cell in this section as evaluating if there is one" do @@ -274,7 +295,9 @@ defmodule LiveBook.Session.DataTest do %{ cell_infos: %{"c2" => %{status: :evaluating}}, section_infos: %{"s1" => %{evaluating_cell_id: "c2", evaluation_queue: []}} - }} = Data.apply_operation(data, operation) + }, + [{:start_evaluation, %{id: "c2"}, %{id: "s1"}}]} = + Data.apply_operation(data, operation) end test "if parent cells are not executed, marks them for evaluation first" do @@ -294,7 +317,9 @@ defmodule LiveBook.Session.DataTest do "c2" => %{status: :queued} }, section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: ["c2"]}} - }} = Data.apply_operation(data, operation) + }, + [{:start_evaluation, %{id: "c1"}, %{id: "s1"}}]} = + Data.apply_operation(data, operation) end test "marks evaluated child cells as stale" do @@ -317,14 +342,110 @@ defmodule LiveBook.Session.DataTest do assert {:ok, %{ cell_infos: %{"c2" => %{status: :stale}} - }} = Data.apply_operation(data, operation) + }, []} = Data.apply_operation(data, operation) + end + end + + describe "apply_operation/2 given :cancel_cell_evaluation" do + test "returns an error given invalid cell id" do + data = Data.new() + operation = {:cancel_cell_evaluation, "nonexistent"} + assert :error = Data.apply_operation(data, operation) + end + + test "returns an error for an evaluated cell" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:queue_cell_evaluation, "c1"}, + {:add_cell_evaluation_response, "c1", {:ok, [1, 2, 3]}} + ]) + + operation = {:cancel_cell_evaluation, "c1"} + assert :error = Data.apply_operation(data, operation) + end + + test "if the cell is evaluating, clears the corresponding section evaluation and the queue" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:queue_cell_evaluation, "c1"}, + {:queue_cell_evaluation, "c2"} + ]) + + operation = {:cancel_cell_evaluation, "c1"} + + assert {:ok, + %{ + cell_infos: %{"c1" => %{status: :fresh}, "c2" => %{status: :fresh}}, + section_infos: %{"s1" => %{evaluating_cell_id: nil, evaluation_queue: []}} + }, _actions} = Data.apply_operation(data, operation) + end + + test "if the cell is evaluating, returns stop evaluation action" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:queue_cell_evaluation, "c1"}, + {:queue_cell_evaluation, "c2"} + ]) + + operation = {:cancel_cell_evaluation, "c1"} + + assert {:ok, _data, [{:stop_evaluation, %{id: "s1"}}]} = + Data.apply_operation(data, operation) + end + + test "if the cell is queued, unqueues it" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:queue_cell_evaluation, "c1"}, + {:queue_cell_evaluation, "c2"} + ]) + + operation = {:cancel_cell_evaluation, "c2"} + + assert {:ok, + %{ + cell_infos: %{"c2" => %{status: :stale}}, + section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: []}} + }, []} = Data.apply_operation(data, operation) + end + + test "if the cell is queued, unqueues dependent cells that are also queued" do + data = + data_after_operations!([ + {:insert_section, 0, "s1"}, + {:insert_cell, "s1", 0, :elixir, "c1"}, + {:insert_cell, "s1", 1, :elixir, "c2"}, + {:insert_cell, "s1", 2, :elixir, "c3"}, + {:queue_cell_evaluation, "c1"}, + {:queue_cell_evaluation, "c2"}, + {:queue_cell_evaluation, "c3"} + ]) + + operation = {:cancel_cell_evaluation, "c2"} + + assert {:ok, + %{ + cell_infos: %{"c3" => %{status: :stale}}, + section_infos: %{"s1" => %{evaluating_cell_id: "c1", evaluation_queue: []}} + }, []} = Data.apply_operation(data, operation) end end defp data_after_operations!(operations) do Enum.reduce(operations, Data.new(), fn operation, data -> case Data.apply_operation(data, operation) do - {:ok, data} -> + {:ok, data, _action} -> data :error -> diff --git a/test/live_book/session_test.exs b/test/live_book/session_test.exs index 5bac527d1..e047c4584 100644 --- a/test/live_book/session_test.exs +++ b/test/live_book/session_test.exs @@ -72,6 +72,18 @@ defmodule LiveBook.SessionTest do end end + describe "cancel_cell_evaluation/2" do + test "sends a cancel evaluation operation to subscribers", %{session_id: session_id} do + Phoenix.PubSub.subscribe(LiveBook.PubSub, "sessions:#{session_id}") + + {_section_id, cell_id} = insert_section_and_cell(session_id) + queue_evaluation(session_id, cell_id) + + Session.cancel_cell_evaluation(session_id, cell_id) + assert_receive {:operation, {:cancel_cell_evaluation, ^cell_id}} + end + end + defp insert_section_and_cell(session_id) do Session.insert_section(session_id, 0) assert_receive {:operation, {:insert_section, 0, section_id}} @@ -80,4 +92,9 @@ defmodule LiveBook.SessionTest do {section_id, cell_id} end + + defp queue_evaluation(session_id, cell_id) do + Session.queue_cell_evaluation(session_id, cell_id) + assert_receive {:operation, {:add_cell_evaluation_response, ^cell_id, _}} + end end