diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 57ee5c6..785a5f7 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -213,7 +213,7 @@ func (c *Core) GetSessions() []Session { workerFunc := func() { session = Session{ Coords: append([]uint64{}, wire_coordsBytestoUint64s(sinfo.coords)...), - MTU: sinfo.getMTU(), + MTU: sinfo._getMTU(), BytesSent: sinfo.bytesSent, BytesRecvd: sinfo.bytesRecvd, Uptime: time.Now().Sub(sinfo.timeOpened), diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index c77e19d..51dd950 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -190,8 +190,8 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { var err error sessionFunc := func() { // Does the packet exceed the permitted size for the session? - if uint16(len(msg.Message)) > c.session.getMTU() { - err = ConnError{errors.New("packet too big"), true, false, false, int(c.session.getMTU())} + if uint16(len(msg.Message)) > c.session._getMTU() { + err = ConnError{errors.New("packet too big"), true, false, false, int(c.session._getMTU())} return } // The rest of this work is session keep-alive traffic @@ -201,10 +201,10 @@ func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { // TODO double check that the above condition is correct c.doSearch() } else { - c.core.sessions.ping(c.session) + c.session.ping(c.session) // TODO send from self if this becomes an actor } case c.session.reset && c.session.pingTime.Before(c.session.time): - c.core.sessions.ping(c.session) + c.session.ping(c.session) // TODO send from self if this becomes an actor default: // Don't do anything, to keep traffic throttled } } diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index b24400d..9552184 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -98,7 +98,7 @@ func (r *router) insertPeer(from phony.IActor, info *dhtInfo) { // Reset sessions and DHT after the switch sees our coords change func (r *router) reset(from phony.IActor) { r.EnqueueFrom(from, func() { - r.core.sessions.reset() + r.core.sessions.reset(r) r.core.dht.reset() }) } @@ -111,14 +111,14 @@ func (r *router) _mainLoop() { for { select { case <-ticker.C: - r.SyncExec(func() { + <-r.SyncExec(func() { // Any periodic maintenance stuff goes here r.core.switchTable.doMaintenance() r.core.dht.doMaintenance() r.core.sessions.cleanup() }) case e := <-r.reconfigure: - r.SyncExec(func() { + <-r.SyncExec(func() { current := r.core.config.GetCurrent() e <- r.nodeinfo.setNodeInfo(current.NodeInfo, current.NodeInfoPrivacy) }) @@ -252,5 +252,5 @@ func (r *router) _handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { // TODO remove this, have things either be actors that send message or else call SyncExec directly func (r *router) doAdmin(f func()) { - r.SyncExec(f) + <-r.SyncExec(f) } diff --git a/src/yggdrasil/search.go b/src/yggdrasil/search.go index c035e72..56adda8 100644 --- a/src/yggdrasil/search.go +++ b/src/yggdrasil/search.go @@ -213,7 +213,7 @@ func (sinfo *searchInfo) checkDHTRes(res *dhtRes) bool { } // FIXME (!) replay attacks could mess with coords? Give it a handle (tstamp)? sess.coords = res.Coords - sinfo.core.sessions.ping(sess) + sess.ping(&sinfo.core.router) sinfo.callback(sess, nil) // Cleanup delete(sinfo.core.searches.searches, res.Dest) diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index 7cd92db..f28e014 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -14,6 +14,8 @@ import ( "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util" + + "github.com/Arceliar/phony" ) // Duration that we keep track of old nonces per session, to allow some out-of-order packet delivery @@ -37,7 +39,7 @@ func (h nonceHeap) peek() *crypto.BoxNonce { return &h[len(h)-1] } // All the information we know about an active session. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { - mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session + phony.Actor // Protects all of the below, use it any time you read/change the contents of a session core *Core // reconfigure chan chan error // theirAddr address.Address // @@ -46,6 +48,7 @@ type sessionInfo struct { theirSesPub crypto.BoxPubKey // mySesPub crypto.BoxPubKey // mySesPriv crypto.BoxPrivKey // + sharedPermKey crypto.BoxSharedKey // used for session pings sharedSesKey crypto.BoxSharedKey // derived from session keys theirHandle crypto.Handle // myHandle crypto.Handle // @@ -73,10 +76,9 @@ type sessionInfo struct { send chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent } +// TODO remove this, call SyncExec directly func (sinfo *sessionInfo) doFunc(f func()) { - sinfo.mutex.Lock() - defer sinfo.mutex.Unlock() - f() + <-sinfo.SyncExec(f) } // Represents a session ping/pong packet, andincludes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. @@ -92,7 +94,7 @@ type sessionPing struct { // Updates session info in response to a ping, after checking that the ping is OK. // Returns true if the session was updated, or false otherwise. -func (s *sessionInfo) update(p *sessionPing) bool { +func (s *sessionInfo) _update(p *sessionPing) bool { if !(p.Tstamp > s.tstamp) { // To protect against replay attacks return false @@ -214,6 +216,7 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.core = ss.core sinfo.reconfigure = make(chan chan error, 1) sinfo.theirPermPub = *theirPermKey + sinfo.sharedPermKey = *ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub) pub, priv := crypto.NewBoxKeys() sinfo.mySesPub = *pub sinfo.mySesPriv = *priv @@ -257,7 +260,9 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { go func() { // Run cleanup when the session is canceled <-sinfo.cancel.Finished() - sinfo.core.router.doAdmin(sinfo.close) + sinfo.core.router.doAdmin(func() { + sinfo.core.sessions.removeSession(&sinfo) + }) }() go sinfo.startWorkers() return &sinfo @@ -292,7 +297,7 @@ func (ss *sessions) cleanup() { } // Closes a session, removing it from sessions maps. -func (sinfo *sessionInfo) close() { +func (ss *sessions) removeSession(sinfo *sessionInfo) { if s := sinfo.core.sessions.sinfos[sinfo.myHandle]; s == sinfo { delete(sinfo.core.sessions.sinfos, sinfo.myHandle) delete(sinfo.core.sessions.byTheirPerm, sinfo.theirPermPub) @@ -300,11 +305,11 @@ func (sinfo *sessionInfo) close() { } // Returns a session ping appropriate for the given session info. -func (ss *sessions) getPing(sinfo *sessionInfo) sessionPing { - loc := ss.core.switchTable.getLocator() +func (sinfo *sessionInfo) _getPing() sessionPing { + loc := sinfo.core.switchTable.getLocator() coords := loc.getCoords() - ref := sessionPing{ - SendPermPub: ss.core.boxPub, + ping := sessionPing{ + SendPermPub: sinfo.core.boxPub, Handle: sinfo.myHandle, SendSesPub: sinfo.mySesPub, Tstamp: time.Now().Unix(), @@ -312,7 +317,7 @@ func (ss *sessions) getPing(sinfo *sessionInfo) sessionPing { MTU: sinfo.myMTU, } sinfo.myNonce.Increment() - return ref + return ping } // Gets the shared key for a pair of box keys. @@ -339,27 +344,29 @@ func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey, } // Sends a session ping by calling sendPingPong in ping mode. -func (ss *sessions) ping(sinfo *sessionInfo) { - ss.sendPingPong(sinfo, false) +func (sinfo *sessionInfo) ping(from phony.IActor) { + sinfo.EnqueueFrom(from, func() { + sinfo._sendPingPong(false) + }) } // Calls getPing, sets the appropriate ping/pong flag, encodes to wire format, and send it. // Updates the time the last ping was sent in the session info. -func (ss *sessions) sendPingPong(sinfo *sessionInfo, isPong bool) { - ping := ss.getPing(sinfo) +func (sinfo *sessionInfo) _sendPingPong(isPong bool) { + ping := sinfo._getPing() ping.IsPong = isPong bs := ping.encode() - shared := ss.getSharedKey(&ss.core.boxPriv, &sinfo.theirPermPub) - payload, nonce := crypto.BoxSeal(shared, bs, nil) + payload, nonce := crypto.BoxSeal(&sinfo.sharedPermKey, bs, nil) p := wire_protoTrafficPacket{ Coords: sinfo.coords, ToKey: sinfo.theirPermPub, - FromKey: ss.core.boxPub, + FromKey: sinfo.core.boxPub, Nonce: *nonce, Payload: payload, } packet := p.encode() - ss.core.router.out(packet) + // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first + sinfo.core.router.EnqueueFrom(sinfo, func() { sinfo.core.router.out(packet) }) if sinfo.pingTime.Before(sinfo.time) { sinfo.pingTime = time.Now() } @@ -371,9 +378,9 @@ func (ss *sessions) handlePing(ping *sessionPing) { // Get the corresponding session (or create a new session) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) switch { + case ping.IsPong: // This is a response, not an initial ping, so ignore it. case isIn: // Session already exists case !ss.isSessionAllowed(&ping.SendPermPub, false): // Session is not allowed - case ping.IsPong: // This is a response, not an initial ping, so ignore it. default: ss.listenerMutex.Lock() if ss.listener != nil { @@ -393,13 +400,13 @@ func (ss *sessions) handlePing(ping *sessionPing) { ss.listenerMutex.Unlock() } if sinfo != nil { - sinfo.doFunc(func() { + sinfo.EnqueueFrom(&ss.core.router, func() { // Update the session - if !sinfo.update(ping) { /*panic("Should not happen in testing")*/ + if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ return } if !ping.IsPong { - ss.sendPingPong(sinfo, true) + sinfo._sendPingPong(true) } }) } @@ -408,7 +415,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { // Get the MTU of the session. // Will be equal to the smaller of this node's MTU or the remote node's MTU. // If sending over links with a maximum message size (this was a thing with the old UDP code), it could be further lowered, to a minimum of 1280. -func (sinfo *sessionInfo) getMTU() uint16 { +func (sinfo *sessionInfo) _getMTU() uint16 { if sinfo.theirMTU == 0 || sinfo.myMTU == 0 { return 0 } @@ -419,7 +426,7 @@ func (sinfo *sessionInfo) getMTU() uint16 { } // Checks if a packet's nonce is recent enough to fall within the window of allowed packets, and not already received. -func (sinfo *sessionInfo) nonceIsOK(theirNonce *crypto.BoxNonce) bool { +func (sinfo *sessionInfo) _nonceIsOK(theirNonce *crypto.BoxNonce) bool { // The bitmask is to allow for some non-duplicate out-of-order packets if theirNonce.Minus(&sinfo.theirNonce) > 0 { // This is newer than the newest nonce we've seen @@ -437,7 +444,7 @@ func (sinfo *sessionInfo) nonceIsOK(theirNonce *crypto.BoxNonce) bool { } // Updates the nonce mask by (possibly) shifting the bitmask and setting the bit corresponding to this nonce to 1, and then updating the most recent nonce -func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) { +func (sinfo *sessionInfo) _updateNonce(theirNonce *crypto.BoxNonce) { // Start with some cleanup for len(sinfo.theirNonceHeap) > 64 { if time.Since(sinfo.theirNonceMap[*sinfo.theirNonceHeap.peek()]) < nonceWindow { @@ -459,9 +466,9 @@ func (sinfo *sessionInfo) updateNonce(theirNonce *crypto.BoxNonce) { // Resets all sessions to an uninitialized state. // Called after coord changes, so attemtps to use a session will trigger a new ping and notify the remote end of the coord change. -func (ss *sessions) reset() { +func (ss *sessions) reset(from phony.IActor) { for _, sinfo := range ss.sinfos { - sinfo.doFunc(func() { + sinfo.EnqueueFrom(from, func() { sinfo.reset = true }) } @@ -492,7 +499,7 @@ func (sinfo *sessionInfo) recvWorker() { var err error var k crypto.BoxSharedKey sessionFunc := func() { - if !sinfo.nonceIsOK(&p.Nonce) { + if !sinfo._nonceIsOK(&p.Nonce) { err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0} return } @@ -514,12 +521,12 @@ func (sinfo *sessionInfo) recvWorker() { return } sessionFunc = func() { - if k != sinfo.sharedSesKey || !sinfo.nonceIsOK(&p.Nonce) { + if k != sinfo.sharedSesKey || !sinfo._nonceIsOK(&p.Nonce) { // The session updated in the mean time, so return an error err = ConnError{errors.New("session updated during crypto operation"), false, true, false, 0} return } - sinfo.updateNonce(&p.Nonce) + sinfo._updateNonce(&p.Nonce) sinfo.time = time.Now() sinfo.bytesRecvd += uint64(len(bs)) }