diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index d1cb760..e47ea2f 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -137,7 +137,6 @@ func (c *Conn) Read(b []byte) (int, error) { sinfo := c.session cancel := c.getDeadlineCancellation(&c.readDeadline) defer cancel.Cancel(nil) - var bs []byte for { // Wait for some traffic to come through from the session select { @@ -147,54 +146,25 @@ func (c *Conn) Read(b []byte) (int, error) { } else { return 0, ConnError{errors.New("session closed"), false, false, true, 0} } - case p, ok := <-sinfo.recv: - // If the session is closed then do nothing - if !ok { - return 0, ConnError{errors.New("session closed"), false, false, true, 0} - } + case bs := <-sinfo.recv: var err error - sessionFunc := func() { - defer util.PutBytes(p.Payload) - // If the nonce is bad then drop the packet and return an error - if !sinfo.nonceIsOK(&p.Nonce) { - err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0} - return - } - // Decrypt the packet - var isOK bool - bs, isOK = crypto.BoxOpen(&sinfo.sharedSesKey, p.Payload, &p.Nonce) - // Check if we were unable to decrypt the packet for some reason and - // return an error if we couldn't - if !isOK { - err = ConnError{errors.New("packet dropped due to decryption failure"), false, true, false, 0} - return - } - // Update the session - sinfo.updateNonce(&p.Nonce) - sinfo.time = time.Now() - sinfo.bytesRecvd += uint64(len(bs)) - } - sinfo.doFunc(sessionFunc) - // Something went wrong in the session worker so abort - if err != nil { - if ce, ok := err.(*ConnError); ok && ce.Temporary() { - continue - } - return 0, err + n := len(bs) + if len(bs) > len(b) { + n = len(b) + err = ConnError{errors.New("read buffer too small for entire packet"), false, true, false, 0} } // Copy results to the output slice and clean up copy(b, bs) util.PutBytes(bs) // If we've reached this point then everything went to plan, return the // number of bytes we populated back into the given slice - return len(bs), nil + return n, err } } } func (c *Conn) Write(b []byte) (bytesWritten int, err error) { sinfo := c.session - var packet []byte written := len(b) sessionFunc := func() { // Does the packet exceed the permitted size for the session? @@ -202,18 +172,6 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { written, err = 0, ConnError{errors.New("packet too big"), true, false, false, int(sinfo.getMTU())} return } - // Encrypt the packet - payload, nonce := crypto.BoxSeal(&sinfo.sharedSesKey, b, &sinfo.myNonce) - defer util.PutBytes(payload) - // Construct the wire packet to send to the router - p := wire_trafficPacket{ - Coords: sinfo.coords, - Handle: sinfo.theirHandle, - Nonce: *nonce, - Payload: payload, - } - packet = p.encode() - sinfo.bytesSent += uint64(len(b)) // The rest of this work is session keep-alive traffic doSearch := func() { routerWork := func() { @@ -244,11 +202,10 @@ func (c *Conn) Write(b []byte) (bytesWritten int, err error) { } } sinfo.doFunc(sessionFunc) - // Give the packet to the router if written > 0 { - sinfo.core.router.out(packet) + bs := append(util.GetBytes(), b...) + sinfo.send <- bs } - // Finally return the number of bytes we wrote return written, err } diff --git a/src/yggdrasil/dialer.go b/src/yggdrasil/dialer.go index 6b24cfb..6ce2e8a 100644 --- a/src/yggdrasil/dialer.go +++ b/src/yggdrasil/dialer.go @@ -69,6 +69,7 @@ func (d *Dialer) DialByNodeIDandMask(nodeID, nodeMask *crypto.NodeID) (*Conn, er defer t.Stop() select { case <-conn.session.init: + conn.session.startWorkers(conn.cancel) return conn, nil case <-t.C: conn.Close() diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index c5e1dde..7701286 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -166,7 +166,7 @@ func (r *router) handleTraffic(packet []byte) { return } select { - case sinfo.recv <- &p: // FIXME ideally this should be front drop + case sinfo.fromRouter <- &p: // FIXME ideally this should be front drop default: util.PutBytes(p.Payload) } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index eca3bb0..0552ec1 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -6,11 +6,13 @@ package yggdrasil import ( "bytes" + "errors" "sync" "time" "github.com/yggdrasil-network/yggdrasil-go/src/address" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "github.com/yggdrasil-network/yggdrasil-go/src/util" ) // All the information we know about an active session. @@ -44,8 +46,11 @@ type sessionInfo struct { tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation bytesSent uint64 // Bytes of real traffic sent in this session bytesRecvd uint64 // Bytes of real traffic received in this session - recv chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn + fromRouter chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use + cancel util.Cancellation // Used to terminate workers + recv chan []byte + send chan []byte } func (sinfo *sessionInfo) doFunc(f func()) { @@ -222,7 +227,9 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.myHandle = *crypto.NewHandle() sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) - sinfo.recv = make(chan *wire_trafficPacket, 32) + sinfo.fromRouter = make(chan *wire_trafficPacket, 32) + sinfo.recv = make(chan []byte, 32) + sinfo.send = make(chan []byte, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle return &sinfo @@ -355,6 +362,7 @@ func (ss *sessions) handlePing(ping *sessionPing) { for i := range conn.nodeMask { conn.nodeMask[i] = 0xFF } + conn.session.startWorkers(conn.cancel) ss.listener.conn <- conn } ss.listenerMutex.Unlock() @@ -418,3 +426,89 @@ func (ss *sessions) reset() { }) } } + +//////////////////////////////////////////////////////////////////////////////// +//////////////////////////// Worker Functions Below //////////////////////////// +//////////////////////////////////////////////////////////////////////////////// + +func (sinfo *sessionInfo) startWorkers(cancel util.Cancellation) { + sinfo.cancel = cancel + go sinfo.recvWorker() + go sinfo.sendWorker() +} + +func (sinfo *sessionInfo) recvWorker() { + // TODO move theirNonce etc into a struct that gets stored here, passed in over a channel + // Since there's no reason for anywhere else in the session code to need to *read* it... + // Only needs to be updated from the outside if a ping resets it... + // That would get rid of the need to take a mutex for the sessionFunc + for { + select { + case <-sinfo.cancel.Finished(): + return + case p := <-sinfo.fromRouter: + var bs []byte + var err error + sessionFunc := func() { + defer util.PutBytes(p.Payload) + // If the nonce is bad then drop the packet and return an error + if !sinfo.nonceIsOK(&p.Nonce) { + err = ConnError{errors.New("packet dropped due to invalid nonce"), false, true, false, 0} + return + } + // Decrypt the packet + var isOK bool + bs, isOK = crypto.BoxOpen(&sinfo.sharedSesKey, p.Payload, &p.Nonce) + // Check if we were unable to decrypt the packet for some reason and + // return an error if we couldn't + if !isOK { + err = ConnError{errors.New("packet dropped due to decryption failure"), false, true, false, 0} + return + } + // Update the session + sinfo.updateNonce(&p.Nonce) + sinfo.time = time.Now() + sinfo.bytesRecvd += uint64(len(bs)) + } + sinfo.doFunc(sessionFunc) + if len(bs) > 0 { + if err != nil { + // Bad packet, drop it + util.PutBytes(bs) + } else { + // Pass the packet to the buffer for Conn.Read + sinfo.recv <- bs + } + } + } + } +} + +func (sinfo *sessionInfo) sendWorker() { + // TODO move info that this worker needs here, send updates via a channel + // Otherwise we need to take a mutex to avoid races with update() + for { + select { + case <-sinfo.cancel.Finished(): + return + case bs := <-sinfo.send: + // TODO + var packet []byte + sessionFunc := func() { + sinfo.bytesSent += uint64(len(bs)) + payload, nonce := crypto.BoxSeal(&sinfo.sharedSesKey, bs, &sinfo.myNonce) + defer util.PutBytes(payload) + // Construct the wire packet to send to the router + p := wire_trafficPacket{ + Coords: sinfo.coords, + Handle: sinfo.theirHandle, + Nonce: *nonce, + Payload: payload, + } + packet = p.encode() + } + sinfo.doFunc(sessionFunc) + sinfo.core.router.out(packet) + } + } +}