From 9304873047a6f3be84fef00c4be528f65e8ee2d7 Mon Sep 17 00:00:00 2001 From: Neil Alexander Date: Sun, 5 Jan 2020 22:15:52 +0000 Subject: [PATCH] Convert nodeinfo to actor --- src/yggdrasil/api.go | 19 +++---- src/yggdrasil/nodeinfo.go | 114 +++++++++++++++++++++++--------------- src/yggdrasil/router.go | 1 + 3 files changed, 77 insertions(+), 57 deletions(-) diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 7f82c26..693dbd0 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -390,17 +390,14 @@ func (c *Core) SetMaximumSessionMTU(mtu uint16) { // necessary when, e.g. crawling the network. func (c *Core) GetNodeInfo(key crypto.BoxPubKey, coords []uint64, nocache bool) (NodeInfoPayload, error) { response := make(chan *NodeInfoPayload, 1) - sendNodeInfoRequest := func() { - c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) { - defer func() { recover() }() - select { - case response <- nodeinfo: - default: - } - }) - c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false) - } - phony.Block(&c.router, sendNodeInfoRequest) + c.router.nodeinfo.addCallback(key, func(nodeinfo *NodeInfoPayload) { + defer func() { recover() }() + select { + case response <- nodeinfo: + default: + } + }) + c.router.nodeinfo.sendNodeInfo(key, wire_coordsUint64stoBytes(coords), false) timer := time.AfterFunc(6*time.Second, func() { close(response) }) defer timer.Stop() for res := range response { diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go index 8a5d787..c3e9a27 100644 --- a/src/yggdrasil/nodeinfo.go +++ b/src/yggdrasil/nodeinfo.go @@ -5,21 +5,19 @@ import ( "errors" "runtime" "strings" - "sync" "time" + "github.com/Arceliar/phony" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/version" ) type nodeinfo struct { - core *Core - myNodeInfo NodeInfoPayload - myNodeInfoMutex sync.RWMutex - callbacks map[crypto.BoxPubKey]nodeinfoCallback - callbacksMutex sync.Mutex - cache map[crypto.BoxPubKey]nodeinfoCached - cacheMutex sync.RWMutex + phony.Inbox + core *Core + myNodeInfo NodeInfoPayload + callbacks map[crypto.BoxPubKey]nodeinfoCallback + cache map[crypto.BoxPubKey]nodeinfoCached } type nodeinfoCached struct { @@ -43,35 +41,43 @@ type nodeinfoReqRes struct { // Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep // the cache/callback maps clean of stale entries func (m *nodeinfo) init(core *Core) { + m.Act(m, func() { + m._init(core) + }) +} + +func (m *nodeinfo) _init(core *Core) { m.core = core m.callbacks = make(map[crypto.BoxPubKey]nodeinfoCallback) m.cache = make(map[crypto.BoxPubKey]nodeinfoCached) - var f func() - f = func() { - m.callbacksMutex.Lock() - for boxPubKey, callback := range m.callbacks { - if time.Since(callback.created) > time.Minute { - delete(m.callbacks, boxPubKey) - } + m._cleanup() +} + +func (m *nodeinfo) _cleanup() { + for boxPubKey, callback := range m.callbacks { + if time.Since(callback.created) > time.Minute { + delete(m.callbacks, boxPubKey) } - m.callbacksMutex.Unlock() - m.cacheMutex.Lock() - for boxPubKey, cache := range m.cache { - if time.Since(cache.created) > time.Hour { - delete(m.cache, boxPubKey) - } - } - m.cacheMutex.Unlock() - time.AfterFunc(time.Second*30, f) } - go f() + for boxPubKey, cache := range m.cache { + if time.Since(cache.created) > time.Hour { + delete(m.cache, boxPubKey) + } + } + time.AfterFunc(time.Second*30, func() { + m.Act(m, m._cleanup) + }) } // Add a callback for a nodeinfo lookup func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *NodeInfoPayload)) { - m.callbacksMutex.Lock() - defer m.callbacksMutex.Unlock() + m.Act(m, func() { + m._addCallback(sender, call) + }) +} + +func (m *nodeinfo) _addCallback(sender crypto.BoxPubKey, call func(nodeinfo *NodeInfoPayload)) { m.callbacks[sender] = nodeinfoCallback{ created: time.Now(), call: call, @@ -79,9 +85,7 @@ func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *Node } // Handles the callback, if there is one -func (m *nodeinfo) callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) { - m.callbacksMutex.Lock() - defer m.callbacksMutex.Unlock() +func (m *nodeinfo) _callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) { if callback, ok := m.callbacks[sender]; ok { callback.call(&nodeinfo) delete(m.callbacks, sender) @@ -89,16 +93,26 @@ func (m *nodeinfo) callback(sender crypto.BoxPubKey, nodeinfo NodeInfoPayload) { } // Get the current node's nodeinfo -func (m *nodeinfo) getNodeInfo() NodeInfoPayload { - m.myNodeInfoMutex.RLock() - defer m.myNodeInfoMutex.RUnlock() +func (m *nodeinfo) getNodeInfo() (p NodeInfoPayload) { + phony.Block(m, func() { + p = m._getNodeInfo() + }) + return +} + +func (m *nodeinfo) _getNodeInfo() NodeInfoPayload { return m.myNodeInfo } // Set the current node's nodeinfo -func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) error { - m.myNodeInfoMutex.Lock() - defer m.myNodeInfoMutex.Unlock() +func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) (err error) { + phony.Block(m, func() { + err = m._setNodeInfo(given, privacy) + }) + return +} + +func (m *nodeinfo) _setNodeInfo(given interface{}, privacy bool) error { defaults := map[string]interface{}{ "buildname": version.BuildName(), "buildversion": version.BuildVersion(), @@ -134,9 +148,7 @@ func (m *nodeinfo) setNodeInfo(given interface{}, privacy bool) error { } // Add nodeinfo into the cache for a node -func (m *nodeinfo) addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPayload) { - m.cacheMutex.Lock() - defer m.cacheMutex.Unlock() +func (m *nodeinfo) _addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPayload) { m.cache[key] = nodeinfoCached{ created: time.Now(), payload: payload, @@ -144,9 +156,7 @@ func (m *nodeinfo) addCachedNodeInfo(key crypto.BoxPubKey, payload NodeInfoPaylo } // Get a nodeinfo entry from the cache -func (m *nodeinfo) getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, error) { - m.cacheMutex.RLock() - defer m.cacheMutex.RUnlock() +func (m *nodeinfo) _getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, error) { if nodeinfo, ok := m.cache[key]; ok { return nodeinfo.payload, nil } @@ -155,21 +165,33 @@ func (m *nodeinfo) getCachedNodeInfo(key crypto.BoxPubKey) (NodeInfoPayload, err // Handles a nodeinfo request/response - called from the router func (m *nodeinfo) handleNodeInfo(nodeinfo *nodeinfoReqRes) { + m.Act(m, func() { + m._handleNodeInfo(nodeinfo) + }) +} + +func (m *nodeinfo) _handleNodeInfo(nodeinfo *nodeinfoReqRes) { if nodeinfo.IsResponse { - m.callback(nodeinfo.SendPermPub, nodeinfo.NodeInfo) - m.addCachedNodeInfo(nodeinfo.SendPermPub, nodeinfo.NodeInfo) + m._callback(nodeinfo.SendPermPub, nodeinfo.NodeInfo) + m._addCachedNodeInfo(nodeinfo.SendPermPub, nodeinfo.NodeInfo) } else { - m.sendNodeInfo(nodeinfo.SendPermPub, nodeinfo.SendCoords, true) + m._sendNodeInfo(nodeinfo.SendPermPub, nodeinfo.SendCoords, true) } } // Send nodeinfo request or response - called from the router func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { + m.Act(m, func() { + m._sendNodeInfo(key, coords, isResponse) + }) +} + +func (m *nodeinfo) _sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { table := m.core.switchTable.table.Load().(lookupTable) nodeinfo := nodeinfoReqRes{ SendCoords: table.self.getCoords(), IsResponse: isResponse, - NodeInfo: m.getNodeInfo(), + NodeInfo: m._getNodeInfo(), } bs := nodeinfo.encode() shared := m.core.router.sessions.getSharedKey(&m.core.boxPriv, &key) diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 64c8170..bd6eefd 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -78,6 +78,7 @@ func (r *router) init(core *Core) { func (r *router) reconfigure() { // Reconfigure the router current := r.core.config.GetCurrent() + r.core.log.Println("Reloading NodeInfo...") if err := r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy); err != nil { r.core.log.Errorln("Error reloading NodeInfo:", err) } else {