Handle evaluator process failures (#121)

* Handle evaluator process failures

* Add test

* Use Exception.format_exit/1
This commit is contained in:
Jonatan Kłosko 2021-03-29 19:52:06 +02:00 committed by GitHub
parent e335ecb791
commit a1bf2d999e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 117 additions and 13 deletions

View file

@ -42,6 +42,9 @@ defprotocol Livebook.Runtime do
* `{:evaluation_stdout, ref, string}` - output captured during evaluation * `{:evaluation_stdout, ref, string}` - output captured during evaluation
* `{:evaluation_response, ref, response}` - final result of the evaluation * `{:evaluation_response, ref, response}` - final result of the evaluation
If the evaluation state within a container is lost (e.g. a process goes down),
the runtime can send `{:container_down, container_ref, message}` to notify the owner.
## Options ## Options
* `:file` - file to which the evaluated code belongs. Most importantly, * `:file` - file to which the evaluated code belongs. Most importantly,

View file

@ -156,6 +156,22 @@ defmodule Livebook.Runtime.ErlDist.Manager do
{:stop, :normal, state} {:stop, :normal, state}
end end
def handle_info({:DOWN, _, :process, pid, reason}, state) do
state.evaluators
|> Enum.find(fn {_container_ref, evaluator_pid} ->
evaluator_pid == pid
end)
|> case do
{container_ref, _} ->
message = Exception.format_exit(reason)
send(state.owner, {:container_down, container_ref, message})
{:noreply, %{state | evaluators: Map.delete(state.evaluators, container_ref)}}
nil ->
{:noreply, state}
end
end
def handle_info(_message, state), do: {:noreply, state} def handle_info(_message, state), do: {:noreply, state}
@impl true @impl true
@ -201,6 +217,7 @@ defmodule Livebook.Runtime.ErlDist.Manager do
state state
else else
{:ok, evaluator} = ErlDist.EvaluatorSupervisor.start_evaluator() {:ok, evaluator} = ErlDist.EvaluatorSupervisor.start_evaluator()
Process.monitor(evaluator)
%{state | evaluators: Map.put(state.evaluators, container_ref, evaluator)} %{state | evaluators: Map.put(state.evaluators, container_ref, evaluator)}
end end
end end

View file

@ -459,6 +459,13 @@ defmodule Livebook.Session do
{:noreply, handle_operation(state, operation)} {:noreply, handle_operation(state, operation)}
end end
def handle_info({:container_down, :main, message}, state) do
broadcast_error(state.session_id, "evaluation process terminated - #{message}")
operation = {:reflect_evaluation_failure, self()}
{:noreply, handle_operation(state, operation)}
end
def handle_info(:autosave, state) do def handle_info(:autosave, state) do
Process.send_after(self(), :autosave, @autosave_interval) Process.send_after(self(), :autosave, @autosave_interval)
{:noreply, maybe_save_notebook(state)} {:noreply, maybe_save_notebook(state)}

View file

@ -58,7 +58,7 @@ defmodule Livebook.Session.Data do
@type cell_revision :: non_neg_integer() @type cell_revision :: non_neg_integer()
@type cell_validity_status :: :fresh | :evaluated | :stale @type cell_validity_status :: :fresh | :evaluated | :stale | :aborted
@type cell_evaluation_status :: :ready | :queued | :evaluating @type cell_evaluation_status :: :ready | :queued | :evaluating
@type index :: non_neg_integer() @type index :: non_neg_integer()
@ -80,6 +80,7 @@ defmodule Livebook.Session.Data do
| {:queue_cell_evaluation, pid(), Cell.id()} | {:queue_cell_evaluation, pid(), Cell.id()}
| {:add_cell_evaluation_stdout, pid(), Cell.id(), String.t()} | {:add_cell_evaluation_stdout, pid(), Cell.id(), String.t()}
| {:add_cell_evaluation_response, pid(), Cell.id(), Evaluator.evaluation_response()} | {:add_cell_evaluation_response, pid(), Cell.id(), Evaluator.evaluation_response()}
| {:reflect_evaluation_failure, pid()}
| {:cancel_cell_evaluation, pid(), Cell.id()} | {:cancel_cell_evaluation, pid(), Cell.id()}
| {:set_notebook_name, pid(), String.t()} | {:set_notebook_name, pid(), String.t()}
| {:set_section_name, pid(), Section.id(), String.t()} | {:set_section_name, pid(), Section.id(), String.t()}
@ -270,6 +271,13 @@ defmodule Livebook.Session.Data do
end end
end end
def apply_operation(data, {:reflect_evaluation_failure, _client_pid}) do
data
|> with_actions()
|> clear_evaluation()
|> wrap_ok()
end
def apply_operation(data, {:cancel_cell_evaluation, _client_pid, id}) do def apply_operation(data, {:cancel_cell_evaluation, _client_pid, id}) do
with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do with {:ok, cell, section} <- Notebook.fetch_cell_and_section(data.notebook, id) do
case data.cell_infos[cell.id].evaluation_status do case data.cell_infos[cell.id].evaluation_status do
@ -577,11 +585,19 @@ defmodule Livebook.Session.Data do
|> set_section_info!(section.id, evaluating_cell_id: nil, evaluation_queue: []) |> set_section_info!(section.id, evaluating_cell_id: nil, evaluation_queue: [])
|> reduce( |> reduce(
section.cells, section.cells,
&set_cell_info!(&1, &2.id, &update_cell_info!(&1, &2.id, fn info ->
validity_status: :fresh, %{
evaluation_status: :ready, info
evaluation_digest: nil | validity_status:
) if info.validity_status == :fresh and info.evaluation_status != :evaluating do
:fresh
else
:aborted
end,
evaluation_status: :ready,
evaluation_digest: nil
}
end)
) )
end end

View file

@ -269,6 +269,8 @@ defmodule LivebookWeb.CellComponent do
|> String.split("\n") |> String.split("\n")
end end
defp render_cell_status(validity_status, evaluation_status, changed)
defp render_cell_status(_, :evaluating, changed) do defp render_cell_status(_, :evaluating, changed) do
render_status_indicator("Evaluating", "bg-blue-500", "bg-blue-400", changed) render_status_indicator("Evaluating", "bg-blue-500", "bg-blue-400", changed)
end end
@ -285,6 +287,10 @@ defmodule LivebookWeb.CellComponent do
render_status_indicator("Stale", "bg-yellow-200", nil, changed) render_status_indicator("Stale", "bg-yellow-200", nil, changed)
end end
defp render_cell_status(:aborted, _, _) do
render_status_indicator("Aborted", "bg-red-400", nil, false)
end
defp render_cell_status(_, _, _), do: nil defp render_cell_status(_, _, _), do: nil
defp render_status_indicator(text, circle_class, animated_circle_class, show_changed) do defp render_status_indicator(text, circle_class, animated_circle_class, show_changed) do
@ -302,7 +308,9 @@ defmodule LivebookWeb.CellComponent do
<span class="<%= unless(@show_changed, do: "invisible") %>">*</span> <span class="<%= unless(@show_changed, do: "invisible") %>">*</span>
</div> </div>
<span class="flex relative h-3 w-3"> <span class="flex relative h-3 w-3">
<span class="animate-ping absolute inline-flex h-3 w-3 rounded-full <%= @animated_circle_class %> opacity-75"></span> <%= if @animated_circle_class do %>
<span class="animate-ping absolute inline-flex h-3 w-3 rounded-full <%= @animated_circle_class %> opacity-75"></span>
<% end %>
<span class="relative inline-flex rounded-full h-3 w-3 <%= @circle_class %>"></span> <span class="relative inline-flex rounded-full h-3 w-3 <%= @circle_class %>"></span>
</span> </span>
</div> </div>

View file

@ -359,13 +359,13 @@ defmodule LivebookWeb.SessionLive do
end end
def handle_info({:error, error}, socket) do def handle_info({:error, error}, socket) do
message = error |> to_string() |> String.capitalize() message = error |> to_string() |> upcase_first()
{:noreply, put_flash(socket, :error, message)} {:noreply, put_flash(socket, :error, message)}
end end
def handle_info({:info, info}, socket) do def handle_info({:info, info}, socket) do
message = info |> to_string() |> String.capitalize() message = info |> to_string() |> upcase_first()
{:noreply, put_flash(socket, :info, message)} {:noreply, put_flash(socket, :info, message)}
end end
@ -450,6 +450,11 @@ defmodule LivebookWeb.SessionLive do
end end
end end
def upcase_first(string) do
{head, tail} = String.split_at(string, 1)
String.upcase(head) <> tail
end
defp insert_cell_next_to(socket, cell_id, type, idx_offset: idx_offset) do defp insert_cell_next_to(socket, cell_id, type, idx_offset: idx_offset) do
{:ok, cell, section} = Notebook.fetch_cell_and_section(socket.private.data.notebook, cell_id) {:ok, cell, section} = Notebook.fetch_cell_and_section(socket.private.data.notebook, cell_id)
index = Enum.find_index(section.cells, &(&1 == cell)) index = Enum.find_index(section.cells, &(&1 == cell))

View file

@ -69,7 +69,8 @@ defmodule Livebook.EvaluatorTest do
Evaluator.evaluate_code(evaluator, self(), code, :code_1) Evaluator.evaluate_code(evaluator, self(), code, :code_1)
assert_receive {:evaluation_response, :code_1, assert_receive {:evaluation_response, :code_1,
{:error, :error, %FunctionClauseError{}, [{List, :first, 1, _location}]}} {:error, :error, %FunctionClauseError{},
[{List, :first, _arity, _location}]}}
end end
test "in case of an error returns only the relevant part of stacktrace", %{ test "in case of an error returns only the relevant part of stacktrace", %{

View file

@ -68,4 +68,21 @@ defmodule Livebook.Runtime.ErlDist.ManagerTest do
Manager.stop(node()) Manager.stop(node())
end end
end end
@tag capture_log: true
test "notifies the owner when an evaluator goes down" do
Manager.start()
Manager.set_owner(node(), self())
code = """
spawn_link(fn -> raise "sad cat" end)
"""
Manager.evaluate_code(node(), code, :container1, :evaluation1)
assert_receive {:container_down, :container1, message}
assert message =~ "sad cat"
Manager.stop(node())
end
end end

View file

@ -790,6 +790,36 @@ defmodule Livebook.Session.DataTest do
end end
end end
describe "apply_operation/2 given :reflect_evaluation_failure" do
test "clears evaluation queue and marks evaluated and evaluating cells as aborted" do
data =
data_after_operations!([
{:insert_section, self(), 0, "s1"},
{:insert_cell, self(), "s1", 0, :elixir, "c1"},
{:insert_cell, self(), "s1", 1, :elixir, "c2"},
{:insert_cell, self(), "s1", 2, :elixir, "c3"},
{:queue_cell_evaluation, self(), "c1"},
{:queue_cell_evaluation, self(), "c2"},
{:queue_cell_evaluation, self(), "c3"},
{:add_cell_evaluation_response, self(), "c1", {:ok, [1, 2, 3]}}
])
operation = {:reflect_evaluation_failure, self()}
assert {:ok,
%{
cell_infos: %{
"c1" => %{validity_status: :aborted, evaluation_status: :ready},
"c2" => %{validity_status: :aborted, evaluation_status: :ready},
"c3" => %{validity_status: :fresh, evaluation_status: :ready}
},
section_infos: %{
"s1" => %{evaluating_cell_id: nil, evaluation_queue: []}
}
}, _actions} = Data.apply_operation(data, operation)
end
end
describe "apply_operation/2 given :cancel_cell_evaluation" do describe "apply_operation/2 given :cancel_cell_evaluation" do
test "returns an error given invalid cell id" do test "returns an error given invalid cell id" do
data = Data.new() data = Data.new()
@ -829,8 +859,8 @@ defmodule Livebook.Session.DataTest do
assert {:ok, assert {:ok,
%{ %{
cell_infos: %{ cell_infos: %{
"c1" => %{validity_status: :fresh, evaluation_status: :ready}, "c1" => %{validity_status: :aborted, evaluation_status: :ready},
"c2" => %{validity_status: :fresh, evaluation_status: :ready}, "c2" => %{validity_status: :aborted, evaluation_status: :ready},
"c3" => %{validity_status: :fresh, evaluation_status: :ready} "c3" => %{validity_status: :fresh, evaluation_status: :ready}
}, },
section_infos: %{ section_infos: %{
@ -1317,7 +1347,7 @@ defmodule Livebook.Session.DataTest do
assert {:ok, assert {:ok,
%{ %{
cell_infos: %{ cell_infos: %{
"c1" => %{validity_status: :fresh, evaluation_status: :ready}, "c1" => %{validity_status: :aborted, evaluation_status: :ready},
"c2" => %{validity_status: :fresh, evaluation_status: :ready}, "c2" => %{validity_status: :fresh, evaluation_status: :ready},
"c3" => %{validity_status: :fresh, evaluation_status: :ready}, "c3" => %{validity_status: :fresh, evaluation_status: :ready},
"c4" => %{validity_status: :fresh, evaluation_status: :ready} "c4" => %{validity_status: :fresh, evaluation_status: :ready}