diff --git a/lib/livebook/runtime.ex b/lib/livebook/runtime.ex index 23be0159a..a601420c7 100644 --- a/lib/livebook/runtime.ex +++ b/lib/livebook/runtime.ex @@ -42,6 +42,9 @@ defprotocol Livebook.Runtime do * `{:evaluation_stdout, ref, string}` - output captured during evaluation * `{:evaluation_response, ref, response}` - final result of the evaluation + If the evaluation state within a container is lost (e.g. a process goes down), + the runtime can send `{:container_down, container_ref, message}` to notify the owner. + ## Options * `:file` - file to which the evaluated code belongs. Most importantly, diff --git a/lib/livebook/runtime/erl_dist/manager.ex b/lib/livebook/runtime/erl_dist/manager.ex index 53b485d1e..89da6c2e3 100644 --- a/lib/livebook/runtime/erl_dist/manager.ex +++ b/lib/livebook/runtime/erl_dist/manager.ex @@ -156,6 +156,22 @@ defmodule Livebook.Runtime.ErlDist.Manager do {:stop, :normal, state} end + def handle_info({:DOWN, _, :process, pid, reason}, state) do + state.evaluators + |> Enum.find(fn {_container_ref, evaluator_pid} -> + evaluator_pid == pid + end) + |> case do + {container_ref, _} -> + message = Exception.format_exit(reason) + send(state.owner, {:container_down, container_ref, message}) + {:noreply, %{state | evaluators: Map.delete(state.evaluators, container_ref)}} + + nil -> + {:noreply, state} + end + end + def handle_info(_message, state), do: {:noreply, state} @impl true @@ -201,6 +217,7 @@ defmodule Livebook.Runtime.ErlDist.Manager do state else {:ok, evaluator} = ErlDist.EvaluatorSupervisor.start_evaluator() + Process.monitor(evaluator) %{state | evaluators: Map.put(state.evaluators, container_ref, evaluator)} end end diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index 7b245ddd0..34e8c1b9f 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -459,6 +459,13 @@ defmodule Livebook.Session do {:noreply, handle_operation(state, operation)} end + def handle_info({:container_down, :main, message}, state) do + broadcast_error(state.session_id, "evaluation process terminated - #{message}") + + operation = {:reflect_evaluation_failure, self()} + {:noreply, handle_operation(state, operation)} + end + def handle_info(:autosave, state) do Process.send_after(self(), :autosave, @autosave_interval) {:noreply, maybe_save_notebook(state)} diff --git a/lib/livebook/session/data.ex b/lib/livebook/session/data.ex index 2883b2d40..a1b68548b 100644 --- a/lib/livebook/session/data.ex +++ b/lib/livebook/session/data.ex @@ -58,7 +58,7 @@ defmodule Livebook.Session.Data do @type cell_revision :: non_neg_integer() - @type cell_validity_status :: :fresh | :evaluated | :stale + @type cell_validity_status :: :fresh | :evaluated | :stale | :aborted @type cell_evaluation_status :: :ready | :queued | :evaluating @type index :: non_neg_integer() @@ -80,6 +80,7 @@ defmodule Livebook.Session.Data do | {:queue_cell_evaluation, pid(), Cell.id()} | {:add_cell_evaluation_stdout, pid(), Cell.id(), String.t()} | {:add_cell_evaluation_response, pid(), Cell.id(), Evaluator.evaluation_response()} + | {:reflect_evaluation_failure, pid()} | {:cancel_cell_evaluation, pid(), Cell.id()} | {:set_notebook_name, pid(), String.t()} | {:set_section_name, pid(), Section.id(), String.t()} @@ -270,6 +271,13 @@ defmodule Livebook.Session.Data do end end + def apply_operation(data, {:reflect_evaluation_failure, _client_pid}) do + data + |> with_actions() + |> clear_evaluation() + |> wrap_ok() + end + def apply_operation(data, {:cancel_cell_evaluation, _client_pid, id}) do with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do case data.cell_infos[cell.id].evaluation_status do @@ -577,11 +585,19 @@ defmodule Livebook.Session.Data do |> set_section_info!(section.id, evaluating_cell_id: nil, evaluation_queue: []) |> reduce( section.cells, - &set_cell_info!(&1, &2.id, - validity_status: :fresh, - evaluation_status: :ready, - evaluation_digest: nil - ) + &update_cell_info!(&1, &2.id, fn info -> + %{ + info + | validity_status: + if info.validity_status == :fresh and info.evaluation_status != :evaluating do + :fresh + else + :aborted + end, + evaluation_status: :ready, + evaluation_digest: nil + } + end) ) end diff --git a/lib/livebook_web/live/cell_component.ex b/lib/livebook_web/live/cell_component.ex index bcaa30222..d012132e8 100644 --- a/lib/livebook_web/live/cell_component.ex +++ b/lib/livebook_web/live/cell_component.ex @@ -269,6 +269,8 @@ defmodule LivebookWeb.CellComponent do |> String.split("\n") end + defp render_cell_status(validity_status, evaluation_status, changed) + defp render_cell_status(_, :evaluating, changed) do render_status_indicator("Evaluating", "bg-blue-500", "bg-blue-400", changed) end @@ -285,6 +287,10 @@ defmodule LivebookWeb.CellComponent do render_status_indicator("Stale", "bg-yellow-200", nil, changed) end + defp render_cell_status(:aborted, _, _) do + render_status_indicator("Aborted", "bg-red-400", nil, false) + end + defp render_cell_status(_, _, _), do: nil defp render_status_indicator(text, circle_class, animated_circle_class, show_changed) do @@ -302,7 +308,9 @@ defmodule LivebookWeb.CellComponent do ">* - + <%= if @animated_circle_class do %> + + <% end %> diff --git a/lib/livebook_web/live/session_live.ex b/lib/livebook_web/live/session_live.ex index c9f6aa64f..b8c10d449 100644 --- a/lib/livebook_web/live/session_live.ex +++ b/lib/livebook_web/live/session_live.ex @@ -359,13 +359,13 @@ defmodule LivebookWeb.SessionLive do end def handle_info({:error, error}, socket) do - message = error |> to_string() |> String.capitalize() + message = error |> to_string() |> upcase_first() {:noreply, put_flash(socket, :error, message)} end def handle_info({:info, info}, socket) do - message = info |> to_string() |> String.capitalize() + message = info |> to_string() |> upcase_first() {:noreply, put_flash(socket, :info, message)} end @@ -450,6 +450,11 @@ defmodule LivebookWeb.SessionLive do end end + def upcase_first(string) do + {head, tail} = String.split_at(string, 1) + String.upcase(head) <> tail + end + defp insert_cell_next_to(socket, cell_id, type, idx_offset: idx_offset) do {:ok, cell, section} = Notebook.fetch_cell_and_section(socket.private.data.notebook, cell_id) index = Enum.find_index(section.cells, &(&1 == cell)) diff --git a/test/livebook/evaluator_test.exs b/test/livebook/evaluator_test.exs index f3188f706..9a9ee26d4 100644 --- a/test/livebook/evaluator_test.exs +++ b/test/livebook/evaluator_test.exs @@ -69,7 +69,8 @@ defmodule Livebook.EvaluatorTest do Evaluator.evaluate_code(evaluator, self(), code, :code_1) assert_receive {:evaluation_response, :code_1, - {:error, :error, %FunctionClauseError{}, [{List, :first, 1, _location}]}} + {:error, :error, %FunctionClauseError{}, + [{List, :first, _arity, _location}]}} end test "in case of an error returns only the relevant part of stacktrace", %{ diff --git a/test/livebook/runtime/erl_dist/manager_test.exs b/test/livebook/runtime/erl_dist/manager_test.exs index 0f101d051..638ea1295 100644 --- a/test/livebook/runtime/erl_dist/manager_test.exs +++ b/test/livebook/runtime/erl_dist/manager_test.exs @@ -68,4 +68,21 @@ defmodule Livebook.Runtime.ErlDist.ManagerTest do Manager.stop(node()) end end + + @tag capture_log: true + test "notifies the owner when an evaluator goes down" do + Manager.start() + Manager.set_owner(node(), self()) + + code = """ + spawn_link(fn -> raise "sad cat" end) + """ + + Manager.evaluate_code(node(), code, :container1, :evaluation1) + + assert_receive {:container_down, :container1, message} + assert message =~ "sad cat" + + Manager.stop(node()) + end end diff --git a/test/livebook/session/data_test.exs b/test/livebook/session/data_test.exs index c93edf9f6..e07fb689e 100644 --- a/test/livebook/session/data_test.exs +++ b/test/livebook/session/data_test.exs @@ -790,6 +790,36 @@ defmodule Livebook.Session.DataTest do end end + describe "apply_operation/2 given :reflect_evaluation_failure" do + test "clears evaluation queue and marks evaluated and evaluating cells as aborted" do + data = + data_after_operations!([ + {:insert_section, self(), 0, "s1"}, + {:insert_cell, self(), "s1", 0, :elixir, "c1"}, + {:insert_cell, self(), "s1", 1, :elixir, "c2"}, + {:insert_cell, self(), "s1", 2, :elixir, "c3"}, + {:queue_cell_evaluation, self(), "c1"}, + {:queue_cell_evaluation, self(), "c2"}, + {:queue_cell_evaluation, self(), "c3"}, + {:add_cell_evaluation_response, self(), "c1", {:ok, [1, 2, 3]}} + ]) + + operation = {:reflect_evaluation_failure, self()} + + assert {:ok, + %{ + cell_infos: %{ + "c1" => %{validity_status: :aborted, evaluation_status: :ready}, + "c2" => %{validity_status: :aborted, evaluation_status: :ready}, + "c3" => %{validity_status: :fresh, evaluation_status: :ready} + }, + section_infos: %{ + "s1" => %{evaluating_cell_id: nil, evaluation_queue: []} + } + }, _actions} = Data.apply_operation(data, operation) + end + end + describe "apply_operation/2 given :cancel_cell_evaluation" do test "returns an error given invalid cell id" do data = Data.new() @@ -829,8 +859,8 @@ defmodule Livebook.Session.DataTest do assert {:ok, %{ cell_infos: %{ - "c1" => %{validity_status: :fresh, evaluation_status: :ready}, - "c2" => %{validity_status: :fresh, evaluation_status: :ready}, + "c1" => %{validity_status: :aborted, evaluation_status: :ready}, + "c2" => %{validity_status: :aborted, evaluation_status: :ready}, "c3" => %{validity_status: :fresh, evaluation_status: :ready} }, section_infos: %{ @@ -1317,7 +1347,7 @@ defmodule Livebook.Session.DataTest do assert {:ok, %{ cell_infos: %{ - "c1" => %{validity_status: :fresh, evaluation_status: :ready}, + "c1" => %{validity_status: :aborted, evaluation_status: :ready}, "c2" => %{validity_status: :fresh, evaluation_status: :ready}, "c3" => %{validity_status: :fresh, evaluation_status: :ready}, "c4" => %{validity_status: :fresh, evaluation_status: :ready}