From 9d7e7288c68093f8e857ab83bc44203102a72ca0 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 23 Aug 2019 18:47:15 -0500 Subject: [PATCH] start migrating the router to an actor --- go.mod | 1 + go.sum | 2 + src/yggdrasil/conn.go | 2 +- src/yggdrasil/dht.go | 2 - src/yggdrasil/peer.go | 2 +- src/yggdrasil/router.go | 84 +++++++++++++++++++++-------------------- src/yggdrasil/search.go | 2 +- src/yggdrasil/switch.go | 10 +---- 8 files changed, 51 insertions(+), 54 deletions(-) diff --git a/go.mod b/go.mod index 5569496..3f5cae8 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,7 @@ module github.com/yggdrasil-network/yggdrasil-go require ( + github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438 github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 github.com/hashicorp/go-syslog v1.0.0 github.com/hjson/hjson-go v0.0.0-20181010104306-a25ecf6bd222 diff --git a/go.sum b/go.sum index 756ed5a..22276c2 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438 h1:t4tRgrItIq2ap4O31yOuWm17lUiyzf8gf/P+bEfgmrw= +github.com/Arceliar/phony v0.0.0-20190821233739-c7f353f14438/go.mod h1:2Q9yJvg2PlMrnOEa3RTEy9hElWAICo/D8HTUDqAHUAo= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8 h1:WD8iJ37bRNwvETMfVTusVSAi0WdXTpfNVGY2aHycNKY= github.com/gologme/log v0.0.0-20181207131047-4e5d8ccb38e8/go.mod h1:gq31gQ8wEHkR+WekdWsqDuf8pXTUZA9BnnzTuPz1Y9U= github.com/hashicorp/go-syslog v1.0.0 h1:KaodqZuhUoZereWVIYmpUgZysurB1kBLX2j0MwMrUAE= diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 25330aa..c77e19d 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -134,7 +134,7 @@ func (c *Conn) doSearch() { sinfo.continueSearch() } } - go func() { c.core.router.admin <- routerWork }() + go c.core.router.doAdmin(routerWork) } func (c *Conn) getDeadlineCancellation(value *atomic.Value) (util.Cancellation, bool) { diff --git a/src/yggdrasil/dht.go b/src/yggdrasil/dht.go index b53e29c..d35d3aa 100644 --- a/src/yggdrasil/dht.go +++ b/src/yggdrasil/dht.go @@ -68,7 +68,6 @@ type dht struct { core *Core reconfigure chan chan error nodeID crypto.NodeID - peers chan *dhtInfo // other goroutines put incoming dht updates here reqs map[dhtReqKey]time.Time // Keeps track of recent outstanding requests callbacks map[dhtReqKey][]dht_callbackInfo // Search and admin lookup callbacks // These next two could be replaced by a single linked list or similar... @@ -87,7 +86,6 @@ func (t *dht) init(c *Core) { } }() t.nodeID = *t.core.NodeID() - t.peers = make(chan *dhtInfo, 1024) t.callbacks = make(map[dhtReqKey][]dht_callbackInfo) t.reset() } diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 379ca85..fcd9364 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -210,7 +210,7 @@ func (p *peer) linkLoop() { case dinfo = <-p.dinfo: case _ = <-tick.C: if dinfo != nil { - p.core.dht.peers <- dinfo + p.core.router.insertPeer(&p.core.router, dinfo) } } } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index bdead84..4ce6b7e 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -30,19 +30,19 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" + + "github.com/Arceliar/phony" ) // The router struct has channels to/from the adapter device and a self peer (0), which is how messages are passed between this node and the peers/switch layer. // The router's mainLoop goroutine is responsible for managing all information related to the dht, searches, and crypto sessions. type router struct { + phony.Actor core *Core reconfigure chan chan error addr address.Address subnet address.Subnet - in <-chan [][]byte // packets we received from the network, link to peer's "out" - out func([]byte) // packets we're sending to the network, link to peer's "in" - reset chan struct{} // signal that coords changed (re-init sessions/dht) - admin chan func() // pass a lambda for the admin socket to query stuff + out func([]byte) // packets we're sending to the network, link to peer's "in" nodeinfo nodeinfo } @@ -52,7 +52,6 @@ func (r *router) init(core *Core) { r.reconfigure = make(chan chan error, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) - in := make(chan [][]byte, 1) // TODO something better than this... self := linkInterface{ name: "(self)", info: linkInfo{ @@ -62,8 +61,10 @@ func (r *router) init(core *Core) { }, } p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - p.out = func(packets [][]byte) { in <- packets } - r.in = in + p.out = func(packets [][]byte) { + // TODO make peers and/or the switch into actors, have them pass themselves as the from field + r.handlePackets(r, packets) + } out := make(chan []byte, 32) go func() { for packet := range out { @@ -90,8 +91,6 @@ func (r *router) init(core *Core) { } }() r.out = func(packet []byte) { out2 <- packet } - r.reset = make(chan struct{}, 1) - r.admin = make(chan func(), 32) r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() r.nodeinfo.setNodeInfo(r.core.config.Current.NodeInfo, r.core.config.Current.NodeInfoPrivacy) @@ -105,42 +104,55 @@ func (r *router) start() error { return nil } -// Takes traffic from the adapter and passes it to router.send, or from r.in and handles incoming traffic. -// Also adds new peer info to the DHT. -// Also resets the DHT and sesssions in the event of a coord change. -// Also does periodic maintenance stuff. +// In practice, the switch will call this with 1 packet +func (r *router) handlePackets(from phony.IActor, packets [][]byte) { + r.EnqueueFrom(from, func() { + for _, packet := range packets { + r.handlePacket(packet) + } + }) +} + +// Insert a peer info into the dht, TODO? make the dht a separate actor +func (r *router) insertPeer(from phony.IActor, info *dhtInfo) { + r.EnqueueFrom(from, func() { + r.core.dht.insertPeer(info) + }) +} + +// Reset sessions and DHT after the switch sees our coords change +func (r *router) reset(from phony.IActor) { + r.EnqueueFrom(from, func() { + r.core.sessions.reset() + r.core.dht.reset() + }) +} + +// TODO remove reconfigure so this is just a ticker loop +// and then find something better than a ticker loop to schedule things... func (r *router) mainLoop() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { select { - case ps := <-r.in: - for _, p := range ps { - r.handleIn(p) - } - case info := <-r.core.dht.peers: - r.core.dht.insertPeer(info) - case <-r.reset: - r.core.sessions.reset() - r.core.dht.reset() case <-ticker.C: - { + r.SyncExec(func() { // Any periodic maintenance stuff goes here r.core.switchTable.doMaintenance() r.core.dht.doMaintenance() r.core.sessions.cleanup() - } - case f := <-r.admin: - f() + }) case e := <-r.reconfigure: - current := r.core.config.GetCurrent() - e <- r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) + r.SyncExec(func() { + current := r.core.config.GetCurrent() + e <- r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) + }) } } } // Checks incoming traffic type and passes it to the appropriate handler. -func (r *router) handleIn(packet []byte) { +func (r *router) handlePacket(packet []byte) { pType, pTypeLen := wire_decode_uint64(packet) if pTypeLen == 0 { return @@ -263,17 +275,7 @@ func (r *router) handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { r.nodeinfo.handleNodeInfo(&req) } -// Passed a function to call. -// This will send the function to r.admin and block until it finishes. -// It's used by the admin socket to ask the router mainLoop goroutine about information in the session or dht structs, which cannot be read safely from outside that goroutine. +// TODO remove this, have things either be actors that send message or else call SyncExec directly func (r *router) doAdmin(f func()) { - // Pass this a function that needs to be run by the router's main goroutine - // It will pass the function to the router and wait for the router to finish - done := make(chan struct{}) - newF := func() { - f() - close(done) - } - r.admin <- newF - <-done + r.SyncExec(f) } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index b970fe5..c035e72 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -165,7 +165,7 @@ func (sinfo *searchInfo) continueSearch() { } go func() { time.Sleep(search_RETRY_TIME) - sinfo.core.router.admin <- retryLater + sinfo.core.router.doAdmin(retryLater) }() } diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index cc316d1..86ae102 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -245,10 +245,7 @@ func (t *switchTable) cleanRoot() { if t.data.locator.root != t.key { t.data.seq++ t.updater.Store(&sync.Once{}) - select { - case t.core.router.reset <- struct{}{}: - default: - } + t.core.router.reset(&t.core.router) } t.data.locator = switchLocator{root: t.key, tstamp: now.Unix()} t.core.peers.sendSwitchMsgs() @@ -511,10 +508,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep if !equiv(&sender.locator, &t.data.locator) { doUpdate = true t.data.seq++ - select { - case t.core.router.reset <- struct{}{}: - default: - } + t.core.router.reset(&t.core.router) } if t.data.locator.tstamp != sender.locator.tstamp { t.time = now