mirror of
https://github.com/gravitl/netmaker.git
synced 2025-10-06 20:05:46 +08:00
implmented chunking
This commit is contained in:
parent
83250980eb
commit
98609ac61e
2 changed files with 22 additions and 20 deletions
9
mq/mq.go
9
mq/mq.go
|
@ -53,12 +53,13 @@ func Ping(client mqtt.Client, msg mqtt.Message) {
|
||||||
}
|
}
|
||||||
_, decryptErr := decryptMsg(&node, msg.Payload())
|
_, decryptErr := decryptMsg(&node, msg.Payload())
|
||||||
if decryptErr != nil {
|
if decryptErr != nil {
|
||||||
logger.Log(0, "error updating node ", node.ID, err.Error())
|
logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
node.SetLastCheckIn()
|
node.SetLastCheckIn()
|
||||||
if err := logic.UpdateNode(&node, &node); err != nil {
|
if err := logic.UpdateNode(&node, &node); err != nil {
|
||||||
logger.Log(0, "error updating node ", err.Error())
|
logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
logger.Log(3, "ping processed for node", node.ID)
|
logger.Log(3, "ping processed for node", node.ID)
|
||||||
// --TODO --set client version once feature is implemented.
|
// --TODO --set client version once feature is implemented.
|
||||||
|
@ -84,7 +85,6 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
||||||
logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
|
logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Log(1, "Update Node Handler", id)
|
|
||||||
var newNode models.Node
|
var newNode models.Node
|
||||||
if err := json.Unmarshal(decrypted, &newNode); err != nil {
|
if err := json.Unmarshal(decrypted, &newNode); err != nil {
|
||||||
logger.Log(1, "error unmarshaling payload ", err.Error())
|
logger.Log(1, "error unmarshaling payload ", err.Error())
|
||||||
|
@ -92,12 +92,13 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
||||||
}
|
}
|
||||||
if err := logic.UpdateNode(¤tNode, &newNode); err != nil {
|
if err := logic.UpdateNode(¤tNode, &newNode); err != nil {
|
||||||
logger.Log(1, "error saving node", err.Error())
|
logger.Log(1, "error saving node", err.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if err := PublishPeerUpdate(&newNode); err != nil {
|
if err := PublishPeerUpdate(&newNode); err != nil {
|
||||||
logger.Log(1, "error publishing peer update ", err.Error())
|
logger.Log(1, "error publishing peer update ", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Log(1, "no need to update peers")
|
logger.Log(1, "Updated node", id, newNode.Name)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3,7 +3,6 @@ package ncutils
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"encoding/gob"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
@ -11,7 +10,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
chunkSize = 16128 // 16128 bytes max message size
|
chunkSize = 16000 // 16128 bytes max message size
|
||||||
)
|
)
|
||||||
|
|
||||||
// BoxEncrypt - encrypts traffic box
|
// BoxEncrypt - encrypts traffic box
|
||||||
|
@ -66,7 +65,7 @@ func Chunk(message []byte, recipientPubKey *[32]byte, senderPrivateKey *[32]byte
|
||||||
|
|
||||||
// DeChunk - "de" chunks and decrypts a message
|
// DeChunk - "de" chunks and decrypts a message
|
||||||
func DeChunk(chunkedMsg []byte, senderPublicKey *[32]byte, recipientPrivateKey *[32]byte) ([]byte, error) {
|
func DeChunk(chunkedMsg []byte, senderPublicKey *[32]byte, recipientPrivateKey *[32]byte) ([]byte, error) {
|
||||||
chunks, err := convertMsgToBytes(chunkedMsg)
|
chunks, err := convertMsgToBytes(chunkedMsg) // convert the message to it's original chunks form
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -84,26 +83,28 @@ func DeChunk(chunkedMsg []byte, senderPublicKey *[32]byte, recipientPrivateKey *
|
||||||
|
|
||||||
// == private ==
|
// == private ==
|
||||||
|
|
||||||
|
var splitKey = []byte("|(,)(,)|")
|
||||||
|
|
||||||
// ConvertMsgToBytes - converts a message (MQ) to it's chunked version
|
// ConvertMsgToBytes - converts a message (MQ) to it's chunked version
|
||||||
// decode action
|
// decode action
|
||||||
func convertMsgToBytes(msg []byte) ([][]byte, error) {
|
func convertMsgToBytes(msg []byte) ([][]byte, error) {
|
||||||
var buffer = bytes.NewBuffer(msg)
|
splitMsg := bytes.Split(msg, splitKey)
|
||||||
var dec = gob.NewDecoder(buffer)
|
return splitMsg, nil
|
||||||
var result [][]byte
|
|
||||||
var err = dec.Decode(&result)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return result, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ConvertBytesToMsg - converts the chunked message into a MQ message
|
// ConvertBytesToMsg - converts the chunked message into a MQ message
|
||||||
// encode action
|
// encode action
|
||||||
func convertBytesToMsg(b [][]byte) ([]byte, error) {
|
func convertBytesToMsg(b [][]byte) ([]byte, error) {
|
||||||
var buffer bytes.Buffer
|
// var totalSize = len(b) * len(splitKey)
|
||||||
var enc = gob.NewEncoder(&buffer)
|
// for _, buf := range b {
|
||||||
if err := enc.Encode(b); err != nil {
|
// totalSize += len(buf)
|
||||||
return nil, err
|
// }
|
||||||
|
var buffer []byte // allocate a buffer with adequate sizing
|
||||||
|
for i := range b { // append bytes to it with key
|
||||||
|
buffer = append(buffer, b[i]...)
|
||||||
|
if i != len(b)-1 {
|
||||||
|
buffer = append(buffer, splitKey...)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return buffer.Bytes(), nil
|
return buffer, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue