Await previous machine to release volume when reconnecting fly runtime (#2737)

This commit is contained in:
Jonatan Kłosko 2024-08-09 13:54:05 +02:00
parent 1f1e05eacf
commit 29ca926f39
2 changed files with 44 additions and 5 deletions

View file

@ -210,6 +210,24 @@ defmodule Livebook.FlyAPI do
end
end
@doc """
Waits for the machine to be destroyed.
"""
@spec await_machine_destroyed(String.t(), String.t(), String.t(), pos_integer()) ::
:ok | {:error, error}
def await_machine_destroyed(token, app_name, machine_id, timeout_s) when timeout_s <= 60 do
# Contrarily to the above, if we expect the machine to be destroying,
# it should take a short time, so we don't retry requests and expect
# a rather short timeout
with {:ok, _data} <-
flaps_request(token, "/v1/apps/#{app_name}/machines/#{machine_id}/wait",
params: %{state: "destroyed", timeout: timeout_s},
retry: false
) do
:ok
end
end
defp flaps_request(token, path, opts \\ []) do
opts =
[base_url: @flaps_url, url: path, auth: {:bearer, token}]

View file

@ -47,7 +47,7 @@ defmodule Livebook.Runtime.Fly do
# configuration `inet_dist_listen_options` -> `ipv6_v6only`, which
# has OS-specific value. However, we don't rely on this here.
defstruct [:config, :node, :server_pid]
defstruct [:config, :node, :server_pid, :machine_id, :previous_machine_id]
use GenServer, restart: :temporary
@ -56,7 +56,9 @@ defmodule Livebook.Runtime.Fly do
@type t :: %__MODULE__{
config: config(),
node: node() | nil,
server_pid: pid() | nil
server_pid: pid() | nil,
machine_id: String.t() | nil,
previous_machine_id: String.t() | nil
}
@type config :: %{
@ -114,7 +116,15 @@ defmodule Livebook.Runtime.Fly do
|> :erlang.term_to_binary()
|> Base.encode64()
with {:ok, machine_id, machine_ip} <-
with :ok <-
(if config.volume_id && runtime.previous_machine_id do
with_log(caller, "await resources", fn ->
await_previous_machine_destroyed(config, runtime.previous_machine_id)
end)
else
:ok
end),
{:ok, machine_id, machine_ip} <-
with_log(caller, "create machine", fn ->
create_machine(config, runtime_data)
end),
@ -141,7 +151,7 @@ defmodule Livebook.Runtime.Fly do
send(primary_pid, :node_initialized)
runtime = %{runtime | node: child_node, server_pid: server_pid}
runtime = %{runtime | node: child_node, server_pid: server_pid, machine_id: machine_id}
send(caller, {:runtime_connect_done, self(), {:ok, runtime}})
{:noreply, %{state | primary_ref: primary_ref, proxy_port: proxy_port}}
@ -209,6 +219,14 @@ defmodule Livebook.Runtime.Fly do
end
end
defp await_previous_machine_destroyed(config, machine_id) do
# We wait only to ensure the volume is detached. If waiting fails,
# we ignore the error and try to create the machine anyway, if the
# volume is attached, creation will fail
_ = Livebook.FlyAPI.await_machine_destroyed(config.token, config.app_name, machine_id, 5)
:ok
end
defp await_machine_started(config, machine_id) do
case Livebook.FlyAPI.await_machine_started(config.token, config.app_name, machine_id) do
:ok ->
@ -407,7 +425,10 @@ defimpl Livebook.Runtime, for: Livebook.Runtime.Fly do
end
def duplicate(runtime) do
Livebook.Runtime.Fly.new(runtime.config)
%Livebook.Runtime.Fly{
config: runtime.config,
previous_machine_id: runtime.machine_id
}
end
def evaluate_code(runtime, language, code, locator, parent_locators, opts \\ []) do