Allow native clients to receive events from the server (#1649)

This commit is contained in:
Wojtek Mach 2023-01-19 16:48:51 +01:00 committed by GitHub
parent 30a2412c15
commit 346a08dfb9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 372 additions and 144 deletions

View file

@ -28,11 +28,13 @@ defmodule Demo.Server do
{:ok, pid} = ElixirKit.start()
ref = Process.monitor(pid)
log("init")
ElixirKit.publish("log", "Hello from Elixir!")
Task.start(fn ->
for i <- 5..1//-1 do
log("Stopping in #{i}...")
message = "Stopping in #{i}..."
log(message)
ElixirKit.publish("log", message)
Process.sleep(1000)
end

View file

@ -4,6 +4,20 @@ class Demo
{
ElixirKit.API.Start(name: "demo");
ElixirKit.API.Publish("log", "Hello from C#!");
ElixirKit.API.Subscribe((name, data) =>
{
switch (name)
{
case "log":
Console.WriteLine($"[client] {data}");
break;
default:
throw new Exception($"unknown event {name}");
}
});
ElixirKit.API.WaitForExit();
}
}

View file

@ -4,15 +4,24 @@ import ElixirKit
@main
struct Demo {
public static func main() {
ElixirKit.API.start(name: "demo")
// Capture ctrl+c
signal(SIGINT) { signal in
ElixirKit.API.stop()
exit(signal)
}
ElixirKit.API.start(name: "demo")
ElixirKit.API.publish("log", "Hello from Swift!")
ElixirKit.API.addObserver(queue: .main) { (name, data) in
switch name {
case "log":
print("[client] " + data)
default:
fatalError("unknown event \(name)")
}
}
ElixirKit.API.waitUntilExit()
}
}

View file

@ -10,6 +10,10 @@ using System.Net.Sockets;
namespace ElixirKit;
public delegate void ExitHandler(int ExitCode);
public delegate void EventHandler(string Name, string Data);
public static class API
{
private static Release? release;
@ -32,14 +36,6 @@ public static class API
return mainInstance;
}
static void ensureMainInstance()
{
if (!mainInstance)
{
throw new Exception("Not on main instance");
}
}
public static bool HasExited {
get {
ensureMainInstance();
@ -86,12 +82,25 @@ public static class API
{
if (mainInstance)
{
release!.Publish(name, data);
release!.Send($"{name}:{data}");
}
else
{
var message = Release.EncodeEventMessage(name, data);
PipeWriteLine(message);
PipeWriteLine($"{name}:{data}");
}
}
public static void Subscribe(EventHandler handler)
{
ensureMainInstance();
release!.Subscribe(handler);
}
static void ensureMainInstance()
{
if (!mainInstance)
{
throw new Exception("Not on main instance");
}
}
@ -114,29 +123,25 @@ public static class API
}
}
public delegate void ExitHandler(int ExitCode);
internal class Release
{
Process startProcess;
NetworkStream stream;
TcpListener listener;
TcpClient client;
Process process;
Logger logger;
Listener listener;
Client client;
internal bool HasExited {
get {
return startProcess.HasExited;
return process.HasExited;
}
}
public Release(string name, ExitHandler? exited = null, string? logPath = null)
{
var endpoint = new IPEndPoint(IPAddress.Loopback, 0);
listener = new(endpoint);
listener.Start();
var port = ((IPEndPoint)listener.LocalEndpoint).Port;
logger = new Logger(logPath);
listener = new Listener();
startProcess = new Process()
process = new Process()
{
StartInfo = new ProcessStartInfo()
{
@ -149,85 +154,69 @@ internal class Release
}
};
startProcess.StartInfo.Arguments = "start";
startProcess.StartInfo.EnvironmentVariables.Add("ELIXIRKIT_PORT", $"{port}");
if (logPath != null)
{
logger.Capture(process);
}
process.StartInfo.Arguments = "start";
process.StartInfo.EnvironmentVariables.Add("ELIXIRKIT_PORT", $"{listener.Port}");
if (exited != null)
{
startProcess.EnableRaisingEvents = true;
startProcess.Exited += (sender, args) =>
process.EnableRaisingEvents = true;
process.Exited += (sender, args) =>
{
exited(startProcess.ExitCode);
exited(process.ExitCode);
};
}
startProcess.OutputDataReceived += (sender, e) =>
process.OutputDataReceived += (sender, e) =>
{
if (!String.IsNullOrEmpty(e.Data)) { Console.WriteLine(e.Data); }
};
startProcess.ErrorDataReceived += (sender, e) =>
process.ErrorDataReceived += (sender, e) =>
{
if (!String.IsNullOrEmpty(e.Data)) { Console.Error.WriteLine(e.Data); }
};
if (logPath != null)
{
var logWriter = File.AppendText(logPath);
logWriter.AutoFlush = true;
logger.Capture(process);
startProcess.OutputDataReceived += (sender, e) =>
{
if (!String.IsNullOrEmpty(e.Data)) { logWriter.WriteLine(e.Data); }
};
process.Start();
process.BeginOutputReadLine();
process.BeginErrorReadLine();
startProcess.ErrorDataReceived += (sender, e) =>
{
if (!String.IsNullOrEmpty(e.Data)) { logWriter.WriteLine(e.Data); }
};
}
startProcess.Start();
startProcess.BeginOutputReadLine();
startProcess.BeginErrorReadLine();
client = listener.AcceptTcpClient();
stream = client.GetStream();
var tcpClient = listener.TcpListener.AcceptTcpClient();
client = new Client(tcpClient);
}
internal static string EncodeEventMessage(string name, string data)
public void Send(string message)
{
var bytes = System.Text.Encoding.UTF8.GetBytes(data);
var encoded = System.Convert.ToBase64String(bytes);
return $"event:{name}:{encoded}";
client.Send(message);
}
public void Publish(string name, string data) {
Send(EncodeEventMessage(name, data));
}
internal void Send(string message)
public void Subscribe(EventHandler handler)
{
var bytes = Encoding.UTF8.GetBytes(message + "\n");
stream.Write(bytes, 0, bytes.Length);
client.Subscribe(handler);
}
public int Stop()
{
if (HasExited)
{
return startProcess!.ExitCode;
return process!.ExitCode;
}
client.Close();
listener.Stop();
client.TcpClient.Close();
listener.TcpListener.Stop();
return WaitForExit();
}
public int WaitForExit()
{
startProcess!.WaitForExit();
return startProcess!.ExitCode;
process!.WaitForExit();
return process!.ExitCode;
}
private string relScript(string name)
@ -245,3 +234,100 @@ internal class Release
}
}
}
internal class Logger {
private StreamWriter? writer;
public Logger(string? path)
{
if (path != null)
{
writer = File.AppendText(path);
writer.AutoFlush = true;
}
}
public void Capture(Process process)
{
if (writer != null)
{
process.OutputDataReceived += (sender, e) =>
{
if (!String.IsNullOrEmpty(e.Data)) { writer.WriteLine(e.Data); }
};
process.ErrorDataReceived += (sender, e) =>
{
if (!String.IsNullOrEmpty(e.Data)) { writer.WriteLine(e.Data); }
};
}
}
}
internal class Listener
{
public int Port;
public TcpListener TcpListener;
public Listener()
{
var endpoint = new IPEndPoint(IPAddress.Loopback, 0);
TcpListener = new(endpoint);
TcpListener.Start();
Port = ((IPEndPoint)TcpListener.LocalEndpoint).Port;
}
}
internal class Client
{
public TcpClient TcpClient;
private NetworkStream stream;
public Client(TcpClient tcpClient)
{
TcpClient = tcpClient;
stream = tcpClient.GetStream();
}
public void Send(string payload)
{
var payloadBytes = System.Text.Encoding.UTF8.GetBytes(payload);
var size = System.Convert.ToUInt32(IPAddress.HostToNetworkOrder(payloadBytes.Length));
var headerBytes = BitConverter.GetBytes(size);
stream.Write(headerBytes, 0, headerBytes.Length);
stream.Write(payloadBytes, 0, payloadBytes.Length);
}
public void Subscribe(EventHandler handler)
{
var t = new Task(() => {
receiveMessage(handler);
});
t.Start();
}
private void receiveMessage(EventHandler handler)
{
var sizeBuf = new byte[4];
var sizeBytesRead = stream.Read(sizeBuf, 0, sizeBuf.Length);
// socket is closed
if (sizeBytesRead == 0) { return; }
var size = IPAddress.NetworkToHostOrder((int)BitConverter.ToUInt32(sizeBuf, 0));
var payloadBuf = new byte[size];
var payloadBytesRead = stream.Read(payloadBuf, 0, size);
// socket is closed
if (payloadBytesRead == 0) { return; }
var payload = System.Text.Encoding.UTF8.GetString(payloadBuf);
var parts = payload.Split(new char[] {':'}, 2);
var name = parts[0];
var data = parts[1];
handler(name, data);
receiveMessage(handler);
}
}

View file

@ -5,6 +5,8 @@ public class API {
static var process: Process?
private static var release: Release?
static let didReceiveEvent = NSNotification.Name("elixirkit.event")
public static var isRunning: Bool {
get {
release != nil && release!.isRunning;
@ -16,32 +18,40 @@ public class API {
}
public static func publish(_ name: String, _ data: String) {
release!.publish(name, data)
release?.publish(name, data)
}
public static func stop() {
release!.stop();
release?.stop();
}
public static func waitUntilExit() {
release!.waitUntilExit();
release?.waitUntilExit();
}
public static func addObserver(queue: OperationQueue?, using: @escaping (((String, String)) -> Void)) {
NotificationCenter.default.addObserver(forName: didReceiveEvent, object: nil, queue: queue) { n in
let (name, data) = n.object! as! (String, String)
using((name, data))
}
}
}
private class Release {
let process: Process
let logger: Logger
let listener: NWListener
let startProcess: Process
var connection: Connection?
let semaphore = DispatchSemaphore(value: 0)
var logHandle: FileHandle?
var connection: NWConnection?
var isRunning: Bool {
get {
startProcess.isRunning
process.isRunning
}
}
init(name: String, logPath: String? = nil, terminationHandler: ((Process) -> Void)? = nil) {
logger = Logger(logPath: logPath)
listener = try! NWListener(using: .tcp, on: .any)
let bundle = Bundle.main
@ -54,45 +64,17 @@ private class Release {
rootDir = bundle.bundlePath
}
startProcess = Process()
process = Process()
process.launchPath = "\(rootDir)/rel/bin/\(name)"
process.arguments = ["start"]
process.terminationHandler = terminationHandler
if logPath != nil {
let logPath = logPath!
let fm = FileManager.default
if !fm.fileExists(atPath: logPath) { fm.createFile(atPath: logPath, contents: Data()) }
logHandle = FileHandle(forUpdatingAtPath: logPath)!
logHandle!.seekToEndOfFile()
let stdout = Pipe()
let stderr = Pipe()
startProcess.standardOutput = stdout
startProcess.standardError = stderr
let stdouth = stdout.fileHandleForReading
let stderrh = stderr.fileHandleForReading
stdouth.waitForDataInBackgroundAndNotify()
stderrh.waitForDataInBackgroundAndNotify()
NotificationCenter.default.addObserver(
self,
selector: #selector(receiveStdout(n:)),
name: NSNotification.Name.NSFileHandleDataAvailable,
object: stdouth
)
NotificationCenter.default.addObserver(
self,
selector: #selector(receiveStderr(n:)),
name: NSNotification.Name.NSFileHandleDataAvailable,
object: stderrh
)
logger.capture(process: process)
}
startProcess.launchPath = "\(rootDir)/rel/bin/\(name)"
startProcess.arguments = ["start"]
startProcess.terminationHandler = terminationHandler
listener.stateUpdateHandler = stateDidChange(to:)
listener.newConnectionHandler = didAccept(connection:)
listener.newConnectionHandler = didAccept(conn:)
listener.start(queue: .global())
let timeout = DispatchTime.now() + DispatchTimeInterval.seconds(5)
@ -102,45 +84,92 @@ private class Release {
}
}
func stateDidChange(to state: NWListener.State) {
public func stop() {
connection!.cancel()
listener.cancel()
waitUntilExit()
}
public func waitUntilExit() {
process.waitUntilExit()
}
public func publish(_ name: String, _ data: String) {
connection!.send("\(name):\(data)")
}
private func stateDidChange(to state: NWListener.State) {
switch state {
case .ready:
start(port: listener.port!.rawValue.description)
case .failed(let error):
print("Server error: \(error.localizedDescription)")
print("Listener error: \(error.localizedDescription)")
exit(EXIT_FAILURE)
default:
break
}
}
func start(port: String) {
private func start(port: String) {
var env = ProcessInfo.processInfo.environment
env["ELIXIRKIT_PORT"] = port
startProcess.environment = env
try! startProcess.run()
process.environment = env
try! process.run()
}
func didAccept(connection: NWConnection) {
self.connection = connection
self.connection!.start(queue: .main)
private func didAccept(conn: NWConnection) {
self.connection = Connection(conn: conn, logger: logger)
semaphore.signal()
}
}
func send(_ string: String) {
connection!.send(
content: (string + "\n").data(using: .utf8),
completion: .contentProcessed { error in
if error != nil {
print(error!)
}
}
// Logs to stdout and a log file (if given)
private class Logger {
var logHandle: FileHandle?
init(logPath: String?) {
if let logPath = logPath {
let fm = FileManager.default
if !fm.fileExists(atPath: logPath) { fm.createFile(atPath: logPath, contents: Data()) }
logHandle = FileHandle(forUpdatingAtPath: logPath)!
logHandle!.seekToEndOfFile()
}
}
public func capture(process: Process) {
let stdout = Pipe()
let stderr = Pipe()
process.standardOutput = stdout
process.standardError = stderr
let stdouth = stdout.fileHandleForReading
let stderrh = stderr.fileHandleForReading
stdouth.waitForDataInBackgroundAndNotify()
stderrh.waitForDataInBackgroundAndNotify()
NotificationCenter.default.addObserver(
self,
selector: #selector(receiveStdout(n:)),
name: NSNotification.Name.NSFileHandleDataAvailable,
object: stdouth
)
NotificationCenter.default.addObserver(
self,
selector: #selector(receiveStderr(n:)),
name: NSNotification.Name.NSFileHandleDataAvailable,
object: stderrh
)
}
public func puts(_ string: String) {
let data = (string + "\n").data(using: .utf8)!
logHandle?.write(data)
FileHandle.standardOutput.write(data)
}
@objc
func receiveStdout(n: NSNotification) {
private func receiveStdout(n: NSNotification) {
let h = n.object as! FileHandle
let data = h.availableData
if !data.isEmpty {
@ -151,29 +180,107 @@ private class Release {
}
@objc
func receiveStderr(n: NSNotification) {
private func receiveStderr(n: NSNotification) {
let h = n.object as! FileHandle
let data = h.availableData
if !data.isEmpty {
logHandle!.write(data)
FileHandle.standardError.write(data)
logHandle!.write(data)
h.waitForDataInBackgroundAndNotify()
}
}
}
public func publish(_ name: String, _ data: String) {
let encoded = data.data(using: .utf8)!.base64EncodedString()
let message = "event:\(name):\(encoded)"
send(message)
private struct Connection {
let conn: NWConnection
let logger: Logger
init(conn: NWConnection, logger: Logger) {
self.logger = logger
self.conn = conn
self.conn.stateUpdateHandler = stateDidChange(to:)
self.conn.start(queue: .main)
}
public func stop() {
connection!.cancel()
listener.cancel()
waitUntilExit()
func stateDidChange(to state: NWConnection.State) {
switch state {
case .ready:
receiveEventMessage()
case .failed(let error):
logger.puts("\(error)")
exit(EXIT_FAILURE)
default:
break
}
}
public func waitUntilExit() {
startProcess.waitUntilExit()
// Receives event message from the socket and posts it as a notification.
// A message contains a header which is a big endian uint32 that is the length of the payload that follows.
func receiveEventMessage() {
conn.receive(minimumIncompleteLength: 4, maximumLength: 4) { (data, _context, isComplete, error) in
if (isComplete) {
return
}
if let error = error {
switch error {
case .posix(POSIXError.ECANCELED):
// socket is closed, ignore.
()
default:
logger.puts("\(error)")
}
return
}
let size = Int(data!.withUnsafeBytes { pointer in CFSwapInt32BigToHost(pointer.load(as: UInt32.self)) })
self.conn.receive(minimumIncompleteLength: size, maximumLength: size) { (payload, _context, isComplete, error) in
if (isComplete) {
return
}
if let error = error {
switch error {
case .posix(POSIXError.ECANCELED):
// socket is closed, ignore.
()
default:
logger.puts("\(error)")
}
return
}
let parts = payload!.split(separator: UInt8(ascii:":"), maxSplits: 1)
let eventName = String(data: parts[0], encoding: .utf8)!
let eventData = String(data: parts[1], encoding: .utf8)!
NotificationCenter.default.post(name: API.didReceiveEvent, object: (eventName, eventData))
self.receiveEventMessage()
}
}
}
func send(_ string: String) {
var message = Data()
let data = string.data(using: .utf8)!
withUnsafeBytes(of: UInt32(data.count).bigEndian) { message.append(contentsOf: $0) }
message.append(data)
conn.send(
content: message,
completion: .contentProcessed { error in
if error != nil {
print(error!)
}
}
)
}
func cancel() {
conn.cancel()
}
}

View file

@ -2,4 +2,8 @@ defmodule ElixirKit do
def start do
Supervisor.start_child(ElixirKit.Supervisor, {ElixirKit.Server, self()})
end
def publish(name, data) do
GenServer.call(ElixirKit.Server, {:send_event, name, data})
end
end

View file

@ -10,14 +10,20 @@ defmodule ElixirKit.Server do
@impl true
def init(pid) do
port = System.fetch_env!("ELIXIRKIT_PORT") |> String.to_integer()
{:ok, socket} = :gen_tcp.connect('localhost', port, mode: :binary, packet: :line)
{:ok, socket} = :gen_tcp.connect('localhost', port, mode: :binary, packet: 4)
{:ok, %{pid: pid, socket: socket}}
end
@impl true
def handle_info({:tcp, socket, "event:" <> rest}, state) when socket == state.socket do
[name, data] = rest |> String.trim_trailing() |> String.split(":")
data = Base.decode64!(data)
def handle_call({:send_event, name, data}, _from, state) do
payload = [name, ?:, data]
:ok = :gen_tcp.send(state.socket, payload)
{:reply, :ok, state}
end
@impl true
def handle_info({:tcp, socket, payload}, state) when socket == state.socket do
[name, data] = :binary.split(payload, ":")
send(state.pid, {:event, name, data})
{:noreply, state}
end