livebook/elixirkit/elixirkit_swift/Sources/ElixirKit/ElixirKit.swift

301 lines
8.5 KiB
Swift
Raw Normal View History

2023-01-17 04:09:47 +08:00
import Foundation
import Network
/// Static facade for launching a bundled release and exchanging events with it.
///
/// At most one release is tracked at a time; calling `start` again replaces the
/// stored reference.
public class API {
    // Not referenced by the code visible in this file; kept for any external users.
    static var process: Process?
    private static var release: Release?
    /// Notification name under which events received from the release are posted.
    static let didReceiveEvent = NSNotification.Name("elixirkit.event")

    /// `true` when a release has been started and its process is still running.
    public static var isRunning: Bool {
        release?.isRunning ?? false
    }

    /// Launches the release executable `name`.
    ///
    /// - Parameters:
    ///   - name: Name of the release script under `rel/bin`.
    ///   - logPath: Optional path of a file that receives the release's output.
    ///   - readyHandler: Invoked when the release connects back to the listener.
    ///   - terminationHandler: Forwarded to the underlying `Process`.
    public static func start(
        name: String,
        logPath: String? = nil,
        readyHandler: @escaping () -> Void,
        terminationHandler: ((Process) -> Void)? = nil) {
        release = Release(
            name: name,
            logPath: logPath,
            readyHandler: readyHandler,
            terminationHandler: terminationHandler
        )
    }

    /// Sends the event `name` with payload `data` to the release.
    ///
    /// The event is dropped (with a diagnostic) when `start` has not been called,
    /// instead of trapping on a nil release.
    public static func publish(_ name: String, _ data: String) {
        guard let release = release else {
            print("ElixirKit: publish(\"\(name)\") ignored because no release has been started")
            return
        }
        release.publish(name, data)
    }

    /// Closes the connection to the release and waits for its process to exit.
    /// Does nothing when no release has been started.
    public static func stop() {
        release?.stop()
    }

    /// Blocks until the release's process exits.
    /// Returns immediately when no release has been started.
    public static func waitUntilExit() {
        release?.waitUntilExit()
    }

    /// Registers `using` to be called with `(name, data)` for every event
    /// posted under `didReceiveEvent`.
    ///
    /// - Parameters:
    ///   - queue: Operation queue the callback runs on (passed to `NotificationCenter`).
    ///   - using: Callback receiving the event name and data.
    public static func addObserver(queue: OperationQueue?, using: @escaping (((String, String)) -> Void)) {
        NotificationCenter.default.addObserver(forName: didReceiveEvent, object: nil, queue: queue) { n in
            // Ignore notifications whose payload is not the expected tuple
            // rather than trapping on a forced cast.
            guard let event = n.object as? (String, String) else {
                return
            }
            using(event)
        }
    }
}
/// Owns the release's OS process and the local TCP listener the release
/// connects back to.
private class Release {
    let process: Process
    let logger: Logger
    let listener: NWListener
    /// Set once the release has connected to `listener`.
    var connection: Connection?
    let readyHandler: () -> Void
    /// Signalled by `start(port:)` after the process has been launched.
    let semaphore = DispatchSemaphore(value: 0)

    var isRunning: Bool {
        process.isRunning
    }

    /// Starts the listener, launches `rel/bin/<name> start` once the listener
    /// is ready, and blocks until that has happened.
    ///
    /// Traps if the listener cannot be created, if the process cannot be run,
    /// or if launching takes longer than the timeout below.
    init(
        name: String,
        logPath: String? = nil,
        readyHandler: @escaping () -> Void,
        terminationHandler: ((Process) -> Void)? = nil) {
        self.readyHandler = readyHandler
        logger = Logger(logPath: logPath)
        listener = try! NWListener(using: .tcp, on: .any)

        // Inside an .app bundle the release lives under Contents/Resources;
        // otherwise it sits next to the executable's bundle path.
        let bundle = Bundle.main
        let rootDir = bundle.bundlePath.hasSuffix(".app")
            ? "\(bundle.bundlePath)/Contents/Resources"
            : bundle.bundlePath

        process = Process()
        // `executableURL` is the non-deprecated counterpart of `launchPath`
        // and pairs with `run()` used in `start(port:)`.
        process.executableURL = URL(fileURLWithPath: "\(rootDir)/rel/bin/\(name)")
        process.arguments = ["start"]
        process.terminationHandler = terminationHandler

        if logPath != nil {
            logger.capture(process: process)
        }

        listener.stateUpdateHandler = stateDidChange(to:)
        listener.newConnectionHandler = didAccept(conn:)
        listener.start(queue: .global())

        let seconds = 5
        let timeout = DispatchTime.now() + DispatchTimeInterval.seconds(seconds)

        if semaphore.wait(timeout: timeout) == .timedOut {
            fatalError("waited for connection for more than \(seconds)s")
        }
    }

    /// Cancels the connection and the listener, then waits for the process to exit.
    public func stop() {
        connection?.cancel()
        listener.cancel()
        waitUntilExit()
    }

    /// Blocks until the process exits.
    public func waitUntilExit() {
        process.waitUntilExit()
    }

    /// Sends `name:data` to the release over the accepted connection.
    ///
    /// If the release has not connected yet the event is dropped with a log
    /// line instead of trapping on a nil connection.
    public func publish(_ name: String, _ data: String) {
        guard let connection = connection else {
            logger.puts("ElixirKit: dropping event \"\(name)\" because the release has not connected yet")
            return
        }
        connection.send("\(name):\(data)")
    }

    /// Launches the process once the listener has a port; exits on listener failure.
    private func stateDidChange(to state: NWListener.State) {
        switch state {
        case .ready:
            start(port: listener.port!.rawValue.description)
        case .failed(let error):
            print("Listener error: \(error.localizedDescription)")
            exit(EXIT_FAILURE)
        default:
            break
        }
    }

    /// Runs the process with `ELIXIRKIT_PORT` set to `port` and unblocks `init`.
    private func start(port: String) {
        var env = ProcessInfo.processInfo.environment
        env["ELIXIRKIT_PORT"] = port
        process.environment = env
        try! process.run()
        semaphore.signal()
    }

    /// Wraps the accepted connection and notifies the caller that the release is ready.
    private func didAccept(conn: NWConnection) {
        self.connection = Connection(conn: conn, logger: logger)
        readyHandler()
    }
}
2023-01-17 04:09:47 +08:00
// Logs to stdout and a log file (if given).
//
// When the log file cannot be opened, logging continues to stdout/stderr only.
private class Logger {
    /// Handle positioned at the end of the log file, or `nil` when there is no log file.
    var logHandle: FileHandle?

    /// Opens (creating if necessary) the file at `logPath` for appending.
    init(logPath: String?) {
        guard let logPath = logPath else {
            return
        }

        let fm = FileManager.default
        if !fm.fileExists(atPath: logPath) {
            fm.createFile(atPath: logPath, contents: Data())
        }

        // Opening can fail (e.g. the file could not be created); fall back to
        // console-only logging instead of trapping.
        guard let handle = FileHandle(forUpdatingAtPath: logPath) else {
            print("ElixirKit: could not open log file at \(logPath)")
            return
        }
        handle.seekToEndOfFile()
        logHandle = handle
    }

    /// Redirects the process's stdout and stderr through pipes whose data is
    /// forwarded to this process's stdout/stderr and to the log file.
    public func capture(process: Process) {
        let stdout = Pipe()
        let stderr = Pipe()
        process.standardOutput = stdout
        process.standardError = stderr

        let stdouth = stdout.fileHandleForReading
        let stderrh = stderr.fileHandleForReading
        stdouth.waitForDataInBackgroundAndNotify()
        stderrh.waitForDataInBackgroundAndNotify()

        let center = NotificationCenter.default
        center.addObserver(
            self,
            selector: #selector(receiveStdout(n:)),
            name: NSNotification.Name.NSFileHandleDataAvailable,
            object: stdouth
        )
        center.addObserver(
            self,
            selector: #selector(receiveStderr(n:)),
            name: NSNotification.Name.NSFileHandleDataAvailable,
            object: stderrh
        )
    }

    /// Writes `string` followed by a newline to the log file (if any) and to stdout.
    public func puts(_ string: String) {
        let data = (string + "\n").data(using: .utf8)!
        logHandle?.write(data)
        FileHandle.standardOutput.write(data)
    }

    @objc
    private func receiveStdout(n: NSNotification) {
        forward(n, to: FileHandle.standardOutput)
    }

    @objc
    private func receiveStderr(n: NSNotification) {
        forward(n, to: FileHandle.standardError)
    }

    /// Copies the data available on the notifying handle to `destination` and
    /// the log file, then re-arms the notification. Nothing is re-armed once
    /// the handle yields empty data.
    private func forward(_ n: NSNotification, to destination: FileHandle) {
        guard let source = n.object as? FileHandle else {
            return
        }
        let data = source.availableData
        guard !data.isEmpty else {
            return
        }
        destination.write(data)
        // Optional: there may be no log file, consistent with `puts`.
        logHandle?.write(data)
        source.waitForDataInBackgroundAndNotify()
    }
}
2023-01-17 04:09:47 +08:00
/// Framed message channel over an accepted `NWConnection`.
///
/// Every message, in both directions, is a big-endian `UInt32` length header
/// followed by that many payload bytes of the form `name:data`.
private struct Connection {
    let conn: NWConnection
    let logger: Logger

    /// Installs the state handler and starts the connection on the main queue.
    init(conn: NWConnection, logger: Logger) {
        self.logger = logger
        self.conn = conn
        self.conn.stateUpdateHandler = stateDidChange(to:)
        self.conn.start(queue: .main)
    }

    /// Starts reading once the connection is ready; exits the program on failure.
    func stateDidChange(to state: NWConnection.State) {
        switch state {
        case .ready:
            receiveEventMessage()
        case .failed(let error):
            logger.puts("\(error)")
            exit(EXIT_FAILURE)
        default:
            break
        }
    }

    // Receives event message from the socket and posts it as a notification.
    // A message contains a header which is a big endian uint32 that is the length of the payload that follows.
    func receiveEventMessage() {
        receive(length: 4) { header in
            // Fold the bytes manually: independent of host endianness and of
            // the alignment of the underlying buffer.
            let size = Int(header.reduce(UInt32(0)) { ($0 << 8) | UInt32($1) })

            // A zero-length payload cannot contain the `name:data` separator.
            guard size > 0 else {
                self.logger.puts("ElixirKit: ignoring empty event message")
                self.receiveEventMessage()
                return
            }

            self.receive(length: size) { payload in
                self.postEvent(from: payload)
                self.receiveEventMessage()
            }
        }
    }

    /// Reads exactly `length` bytes and hands them to `handler`.
    ///
    /// Stops silently when the stream is complete or the socket was cancelled;
    /// any other error is logged. `handler` is not called in those cases.
    private func receive(length: Int, then handler: @escaping (Data) -> Void) {
        conn.receive(minimumIncompleteLength: length, maximumLength: length) { (data, _, isComplete, error) in
            if isComplete {
                return
            }
            if let error = error {
                switch error {
                case .posix(POSIXError.ECANCELED):
                    // socket is closed, ignore.
                    break
                default:
                    self.logger.puts("\(error)")
                }
                return
            }
            guard let data = data, data.count == length else {
                self.logger.puts("ElixirKit: expected \(length) bytes but received \(data?.count ?? 0)")
                return
            }
            handler(data)
        }
    }

    /// Splits `payload` at the first `:` into event name and data and posts
    /// them as a `(String, String)` tuple. Either part may be empty.
    /// Malformed payloads are logged and skipped.
    private func postEvent(from payload: Data) {
        guard let colon = payload.firstIndex(of: UInt8(ascii: ":")) else {
            logger.puts("ElixirKit: ignoring event message without ':' separator")
            return
        }
        let nameBytes = payload[payload.startIndex..<colon]
        let dataBytes = payload[payload.index(after: colon)...]
        guard let eventName = String(data: nameBytes, encoding: .utf8),
              let eventData = String(data: dataBytes, encoding: .utf8) else {
            logger.puts("ElixirKit: ignoring event message that is not valid UTF-8")
            return
        }
        NotificationCenter.default.post(name: API.didReceiveEvent, object: (eventName, eventData))
    }

    /// Sends `string` prefixed with its UTF-8 byte length as a big-endian `UInt32`.
    func send(_ string: String) {
        let body = Data(string.utf8)
        var message = Data()
        withUnsafeBytes(of: UInt32(body.count).bigEndian) { message.append(contentsOf: $0) }
        message.append(body)

        conn.send(
            content: message,
            completion: .contentProcessed { error in
                if let error = error {
                    print(error)
                }
            }
        )
    }

    /// Cancels the underlying connection.
    func cancel() {
        conn.cancel()
    }
}