Improve notebook file locking to work across nodes (#675)

* Improve notebook file locking to work across nodes

* Add node check for local file system opreations

* Replace node with host id

* Refactor process down cleanup

* Scope local file system with node

* local? -> type
This commit is contained in:
Jonatan Kłosko 2021-11-05 00:29:04 +01:00 committed by GitHub
parent 5e5bc2597a
commit 982a345ddc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 279 additions and 152 deletions

View file

@ -28,6 +28,26 @@ defprotocol Livebook.FileSystem do
@type access :: :read | :write | :read_write | :none
@doc """
Returns a term uniquely identifying the resource used as a file
system.
"""
@spec resource_identifier(t()) :: term()
def resource_identifier(file_system)
@doc """
Returns the file system type.
Based on the underlying resource, the type can be either:
* `:local` - if the resource is local to its node
* `:global` - if the resource is external and available
accessible from any node
"""
@spec type(t()) :: :local | :global
def type(file_system)
@doc """
Returns the default directory path.

View file

@ -52,6 +52,23 @@ defmodule Livebook.FileSystem.File do
new(FileSystem.Local.new(), path)
end
@doc """
Returns a term uniquely identifying the file together
with its file system.
"""
@spec resource_identifier(t()) :: term()
def resource_identifier(file) do
{FileSystem.resource_identifier(file.file_system), file.path}
end
@doc """
Checks if the given file is within a file system local to its node.
"""
@spec local?(t()) :: term()
def local?(file) do
FileSystem.type(file.file_system) == :local
end
@doc """
Returns a new file resulting from resolving `subject`
against `file`.

View file

@ -3,11 +3,16 @@ defmodule Livebook.FileSystem.Local do
# File system backed by local disk.
defstruct [:default_path]
defstruct [:origin_pid, :default_path]
alias Livebook.FileSystem
@type t :: %__MODULE__{
# We cannot just store the node, because when the struct is
# built, we may not yet be in distributed mode. Instead, we
# keep the pid of whatever process created this file system
# and we call node/1 on it whenever needed
origin_pid: pid(),
default_path: FileSystem.path()
}
@ -28,13 +33,21 @@ defmodule Livebook.FileSystem.Local do
FileSystem.Utils.assert_dir_path!(default_path)
%__MODULE__{default_path: default_path}
%__MODULE__{origin_pid: self(), default_path: default_path}
end
end
defimpl Livebook.FileSystem, for: Livebook.FileSystem.Local do
alias Livebook.FileSystem
def resource_identifier(file_system) do
{:local_file_system, node(file_system.origin_pid)}
end
def type(_file_system) do
:local
end
def default_path(file_system) do
file_system.default_path
end
@ -42,148 +55,178 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.Local do
def list(file_system, path, recursive) do
FileSystem.Utils.assert_dir_path!(path)
case File.ls(path) do
{:ok, filenames} ->
paths =
Enum.map(filenames, fn name ->
path = Path.join(path, name)
if File.dir?(path), do: path <> "/", else: path
with :ok <- ensure_local(file_system) do
case File.ls(path) do
{:ok, filenames} ->
paths =
Enum.map(filenames, fn name ->
path = Path.join(path, name)
if File.dir?(path), do: path <> "/", else: path
end)
to_traverse =
if recursive do
Enum.filter(paths, &FileSystem.Utils.dir_path?/1)
else
[]
end
Enum.reduce(to_traverse, {:ok, paths}, fn path, result ->
with {:ok, current_paths} <- result,
{:ok, new_paths} <- list(file_system, path, recursive) do
{:ok, current_paths ++ new_paths}
end
end)
to_traverse =
if recursive do
Enum.filter(paths, &FileSystem.Utils.dir_path?/1)
else
[]
end
Enum.reduce(to_traverse, {:ok, paths}, fn path, result ->
with {:ok, current_paths} <- result,
{:ok, new_paths} <- list(file_system, path, recursive) do
{:ok, current_paths ++ new_paths}
end
end)
{:error, error} ->
FileSystem.Utils.posix_error(error)
end
end
def read(_file_system, path) do
FileSystem.Utils.assert_regular_path!(path)
case File.read(path) do
{:ok, binary} -> {:ok, binary}
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
def write(_file_system, path, content) do
FileSystem.Utils.assert_regular_path!(path)
dir = Path.dirname(path)
with :ok <- File.mkdir_p(dir),
:ok <- File.write(path, content) do
:ok
else
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
def access(_file_system, path) do
case File.stat(path) do
{:ok, stat} -> {:ok, stat.access}
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
def create_dir(_file_system, path) do
FileSystem.Utils.assert_dir_path!(path)
case File.mkdir_p(path) do
:ok -> :ok
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
def remove(_file_system, path) do
case File.rm_rf(path) do
{:ok, _paths} -> :ok
{:error, error, _paths} -> FileSystem.Utils.posix_error(error)
end
end
def copy(_file_system, source_path, destination_path) do
FileSystem.Utils.assert_same_type!(source_path, destination_path)
containing_dir = Path.dirname(destination_path)
case File.mkdir_p(containing_dir) do
:ok ->
case File.cp_r(source_path, destination_path) do
{:ok, _paths} -> :ok
{:error, error, _path} -> FileSystem.Utils.posix_error(error)
end
{:error, error} ->
FileSystem.Utils.posix_error(error)
end
end
def rename(_file_system, source_path, destination_path) do
FileSystem.Utils.assert_same_type!(source_path, destination_path)
if File.exists?(destination_path) do
FileSystem.Utils.posix_error(:eexist)
else
containing_dir = Path.dirname(destination_path)
with :ok <- File.mkdir_p(containing_dir),
:ok <- File.rename(source_path, destination_path) do
:ok
else
{:error, error} ->
FileSystem.Utils.posix_error(error)
end
end
end
def etag_for(_file_system, path) do
case File.stat(path) do
{:ok, stat} ->
%{size: size, mtime: mtime} = stat
hash = {size, mtime} |> :erlang.phash2() |> Integer.to_string(16)
etag = <<?", hash::binary, ?">>
{:ok, etag}
def read(file_system, path) do
FileSystem.Utils.assert_regular_path!(path)
{:error, error} ->
FileSystem.Utils.posix_error(error)
end
end
def exists?(_file_system, path) do
if FileSystem.Utils.dir_path?(path) do
{:ok, File.dir?(path)}
else
{:ok, File.exists?(path)}
end
end
def resolve_path(_file_system, dir_path, subject) do
FileSystem.Utils.assert_dir_path!(dir_path)
if subject == "" do
dir_path
else
dir? = FileSystem.Utils.dir_path?(subject) or Path.basename(subject) in [".", ".."]
expanded_path = Path.expand(subject, dir_path)
if dir? do
FileSystem.Utils.ensure_dir_path(expanded_path)
else
expanded_path
with :ok <- ensure_local(file_system) do
case File.read(path) do
{:ok, binary} -> {:ok, binary}
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
end
def write(file_system, path, content) do
FileSystem.Utils.assert_regular_path!(path)
dir = Path.dirname(path)
with :ok <- ensure_local(file_system) do
with :ok <- File.mkdir_p(dir),
:ok <- File.write(path, content) do
:ok
else
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
end
def access(file_system, path) do
with :ok <- ensure_local(file_system) do
case File.stat(path) do
{:ok, stat} -> {:ok, stat.access}
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
end
def create_dir(file_system, path) do
FileSystem.Utils.assert_dir_path!(path)
with :ok <- ensure_local(file_system) do
case File.mkdir_p(path) do
:ok -> :ok
{:error, error} -> FileSystem.Utils.posix_error(error)
end
end
end
def remove(file_system, path) do
with :ok <- ensure_local(file_system) do
case File.rm_rf(path) do
{:ok, _paths} -> :ok
{:error, error, _paths} -> FileSystem.Utils.posix_error(error)
end
end
end
def copy(file_system, source_path, destination_path) do
FileSystem.Utils.assert_same_type!(source_path, destination_path)
containing_dir = Path.dirname(destination_path)
with :ok <- ensure_local(file_system) do
case File.mkdir_p(containing_dir) do
:ok ->
case File.cp_r(source_path, destination_path) do
{:ok, _paths} -> :ok
{:error, error, _path} -> FileSystem.Utils.posix_error(error)
end
{:error, error} ->
FileSystem.Utils.posix_error(error)
end
end
end
def rename(file_system, source_path, destination_path) do
FileSystem.Utils.assert_same_type!(source_path, destination_path)
with :ok <- ensure_local(file_system) do
if File.exists?(destination_path) do
FileSystem.Utils.posix_error(:eexist)
else
containing_dir = Path.dirname(destination_path)
with :ok <- File.mkdir_p(containing_dir),
:ok <- File.rename(source_path, destination_path) do
:ok
else
{:error, error} ->
FileSystem.Utils.posix_error(error)
end
end
end
end
def etag_for(file_system, path) do
with :ok <- ensure_local(file_system) do
case File.stat(path) do
{:ok, stat} ->
%{size: size, mtime: mtime} = stat
hash = {size, mtime} |> :erlang.phash2() |> Integer.to_string(16)
etag = <<?", hash::binary, ?">>
{:ok, etag}
{:error, error} ->
FileSystem.Utils.posix_error(error)
end
end
end
def exists?(file_system, path) do
with :ok <- ensure_local(file_system) do
if FileSystem.Utils.dir_path?(path) do
{:ok, File.dir?(path)}
else
{:ok, File.exists?(path)}
end
end
end
def resolve_path(file_system, dir_path, subject) do
FileSystem.Utils.assert_dir_path!(dir_path)
with :ok <- ensure_local(file_system) do
if subject == "" do
dir_path
else
dir? = FileSystem.Utils.dir_path?(subject) or Path.basename(subject) in [".", ".."]
expanded_path = Path.expand(subject, dir_path)
if dir? do
FileSystem.Utils.ensure_dir_path(expanded_path)
else
expanded_path
end
end
end
end
defp ensure_local(file_system) do
if node(file_system.origin_pid) == node() do
:ok
else
{:error, "this local file system belongs to a different host"}
end
end
end

View file

@ -59,6 +59,14 @@ defimpl Livebook.FileSystem, for: Livebook.FileSystem.S3 do
alias Livebook.Utils.HTTP
alias Livebook.FileSystem.S3.XML
def resource_identifier(file_system) do
{:s3, file_system.bucket_url}
end
def type(_file_system) do
:global
end
def default_path(_file_system) do
"/"
end

View file

@ -12,7 +12,7 @@ defmodule Livebook.Session.FileGuard do
alias Livebook.FileSystem
@type state :: %{
file_with_owner_ref: %{FileSystem.File.t() => reference()}
files: %{term() => {FileSystem.File.t(), owner_pid :: pid(), reference()}}
}
@name __MODULE__
@ -47,32 +47,52 @@ defmodule Livebook.Session.FileGuard do
@impl true
def init(_opts) do
{:ok, %{file_with_owner_ref: %{}}}
{:ok, %{files: %{}}}
end
@impl true
def handle_call({:lock, file, owner_pid}, _from, state) do
if Map.has_key?(state.file_with_owner_ref, file) do
file_id = FileSystem.File.resource_identifier(file)
if Map.has_key?(state.files, file_id) or lock_globally(file, file_id, owner_pid) == false do
{:reply, {:error, :already_in_use}, state}
else
monitor_ref = Process.monitor(owner_pid)
state = put_in(state.file_with_owner_ref[file], monitor_ref)
state = put_in(state.files[file_id], {file, owner_pid, monitor_ref})
{:reply, :ok, state}
end
end
@impl true
def handle_cast({:unlock, file}, state) do
{maybe_ref, state} = pop_in(state.file_with_owner_ref[file])
maybe_ref && Process.demonitor(maybe_ref, [:flush])
file_id = FileSystem.File.resource_identifier(file)
{maybe_file, state} = pop_in(state.files[file_id])
with {file, owner_pid, monitor_ref} <- maybe_file do
unlock_globally(file, file_id, owner_pid)
Process.demonitor(monitor_ref, [:flush])
end
{:noreply, state}
end
@impl true
def handle_info({:DOWN, ref, :process, _, _}, state) do
{file, ^ref} = Enum.find(state.file_with_owner_ref, &(elem(&1, 1) == ref))
{_, state} = pop_in(state.file_with_owner_ref[file])
{file_id, {file, owner_pid, ^ref}} =
Enum.find(state.files, &match?({_file_id, {_file, _owner_pid, ^ref}}, &1))
unlock_globally(file, file_id, owner_pid)
{_, state} = pop_in(state.files[file_id])
{:noreply, state}
end
defp lock_globally(file, file_id, owner_pid) do
FileSystem.File.local?(file) or :global.set_lock({file_id, owner_pid})
end
defp unlock_globally(file, file_id, owner_pid) do
FileSystem.File.local?(file) or :global.del_lock({file_id, owner_pid})
end
end

View file

@ -2,21 +2,40 @@ defmodule Livebook.Session.FileGuardTest do
use ExUnit.Case, async: false
alias Livebook.Session.FileGuard
alias Livebook.FileSystem
test "lock/2 returns an error if the given path is already locked" do
assert :ok = FileGuard.lock("/some/path", self())
assert {:error, :already_in_use} = FileGuard.lock("/some/path", self())
test "lock/2 returns an error if the given file is already locked" do
file = FileSystem.File.local("/some/path")
assert :ok = FileGuard.lock(file, self())
assert {:error, :already_in_use} = FileGuard.lock(file, self())
end
test "unlock/1 unlocks the given path" do
assert :ok = FileGuard.lock("/some/path", self())
:ok = FileGuard.unlock("/some/path")
assert :ok = FileGuard.lock("/some/path", self())
test "lock/2 is agnostic to irrelevant file system configuration" do
fs1 = FileSystem.Local.new(default_path: "/path/1/")
fs2 = FileSystem.Local.new(default_path: "/path/2/")
# The file system has different configuration, but it's the same resource
file1 = FileSystem.File.new(fs1, "/some/path")
file2 = FileSystem.File.new(fs2, "/some/path")
assert :ok = FileGuard.lock(file1, self())
assert {:error, :already_in_use} = FileGuard.lock(file2, self())
end
test "path is automatically unloacked when the owner process termiantes" do
test "unlock/1 unlocks the given file" do
file = FileSystem.File.local("/some/path")
assert :ok = FileGuard.lock(file, self())
:ok = FileGuard.unlock(file)
assert :ok = FileGuard.lock(file, self())
end
test "file is automatically unloacked when the owner process termiantes" do
file = FileSystem.File.local("/some/path")
owner = spawn(fn -> :ok end)
:ok = FileGuard.lock("/some/path", owner)
assert :ok = FileGuard.lock("/some/path", self())
:ok = FileGuard.lock(file, owner)
assert :ok = FileGuard.lock(file, self())
end
end