diff --git a/src/tuntap/conn.go b/src/tuntap/conn.go index ab3179b..61cdb2b 100644 --- a/src/tuntap/conn.go +++ b/src/tuntap/conn.go @@ -96,8 +96,11 @@ func (s *tunConn) writer() error { if !ok { return errors.New("send closed") } - // TODO write timeout and close - if err := s.conn.WriteNoCopy(bs); err != nil { + msg := yggdrasil.FlowKeyMessage{ + FlowKey: util.GetFlowKey(bs), + Message: bs, + } + if err := s.conn.WriteNoCopy(msg); err != nil { if e, eok := err.(yggdrasil.ConnError); !eok { if e.Closed() { s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err) diff --git a/src/util/util.go b/src/util/util.go index 1158156..a588a35 100644 --- a/src/util/util.go +++ b/src/util/util.go @@ -106,3 +106,41 @@ func DecodeCoordString(in string) (out []uint64) { } return out } + +// GetFlowLabel takes an IP packet as an argument and returns some information about the traffic flow. +// For IPv4 packets, this is derived from the source and destination protocol and port numbers. +// For IPv6 packets, this is derived from the FlowLabel field of the packet if this was set, otherwise it's handled like IPv4. +// The FlowKey is then used internally by Yggdrasil for congestion control. +func GetFlowKey(bs []byte) uint64 { + // Work out the flowkey - this is used to determine which switch queue + // traffic will be pushed to in the event of congestion + var flowkey uint64 + // Get the IP protocol version from the packet + switch bs[0] & 0xf0 { + case 0x40: // IPv4 packet + // Check the packet meets minimum UDP packet length + if len(bs) >= 24 { + // Is the protocol TCP, UDP or SCTP? + if bs[9] == 0x06 || bs[9] == 0x11 || bs[9] == 0x84 { + ihl := bs[0] & 0x0f * 4 // Header length + flowkey = uint64(bs[9])<<32 /* proto */ | + uint64(bs[ihl+0])<<24 | uint64(bs[ihl+1])<<16 /* sport */ | + uint64(bs[ihl+2])<<8 | uint64(bs[ihl+3]) /* dport */ + } + } + case 0x60: // IPv6 packet + // Check if the flowlabel was specified in the packet header + flowkey = uint64(bs[1]&0x0f)<<16 | uint64(bs[2])<<8 | uint64(bs[3]) + // If the flowlabel isn't present, make protokey from proto | sport | dport + // if the packet meets minimum UDP packet length + if flowkey == 0 && len(bs) >= 48 { + // Is the protocol TCP, UDP or SCTP? + if bs[6] == 0x06 || bs[6] == 0x11 || bs[6] == 0x84 { + flowkey = uint64(bs[6])<<32 /* proto */ | + uint64(bs[40])<<24 | uint64(bs[41])<<16 /* sport */ | + uint64(bs[42])<<8 | uint64(bs[43]) /* dport */ + } + } + } + return flowkey +} diff --git a/src/yggdrasil/conn.go b/src/yggdrasil/conn.go index 2452a3d..20db931 100644 --- a/src/yggdrasil/conn.go +++ b/src/yggdrasil/conn.go @@ -183,11 +183,11 @@ func (c *Conn) Read(b []byte) (int, error) { } // Used internally by Write, the caller must not reuse the argument bytes when no error occurs -func (c *Conn) WriteNoCopy(bs []byte) error { +func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error { var err error sessionFunc := func() { // Does the packet exceed the permitted size for the session? - if uint16(len(bs)) > c.session.getMTU() { + if uint16(len(msg.Message)) > c.session.getMTU() { err = ConnError{errors.New("packet too big"), true, false, false, int(c.session.getMTU())} return } @@ -216,7 +216,7 @@ func (c *Conn) WriteNoCopy(bs []byte) error { } else { err = ConnError{errors.New("session closed"), false, false, true, 0} } - case c.session.send <- bs: + case c.session.send <- msg: } } return err @@ -225,10 +225,10 @@ func (c *Conn) WriteNoCopy(bs []byte) error { // Implements net.Conn.Write func (c *Conn) Write(b []byte) (int, error) { written := len(b) - bs := append(util.GetBytes(), b...) - err := c.WriteNoCopy(bs) + msg := FlowKeyMessage{Message: append(util.GetBytes(), b...)} + err := c.WriteNoCopy(msg) if err != nil { - util.PutBytes(bs) + util.PutBytes(msg.Message) written = 0 } return written, err diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index a11f6ae..7e2a325 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -166,7 +166,7 @@ func (r *router) handleTraffic(packet []byte) { return } select { - case sinfo.fromRouter <- &p: + case sinfo.fromRouter <- p: case <-sinfo.cancel.Finished(): util.PutBytes(p.Payload) } diff --git a/src/yggdrasil/session.go b/src/yggdrasil/session.go index f9c38fa..517947e 100644 --- a/src/yggdrasil/session.go +++ b/src/yggdrasil/session.go @@ -18,39 +18,39 @@ import ( // All the information we know about an active session. // This includes coords, permanent and ephemeral keys, handles and nonces, various sorts of timing information for timeout and maintenance, and some metadata for the admin API. type sessionInfo struct { - mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session - core *Core // - reconfigure chan chan error // - theirAddr address.Address // - theirSubnet address.Subnet // - theirPermPub crypto.BoxPubKey // - theirSesPub crypto.BoxPubKey // - mySesPub crypto.BoxPubKey // - mySesPriv crypto.BoxPrivKey // - sharedSesKey crypto.BoxSharedKey // derived from session keys - theirHandle crypto.Handle // - myHandle crypto.Handle // - theirNonce crypto.BoxNonce // - theirNonceMask uint64 // - myNonce crypto.BoxNonce // - theirMTU uint16 // - myMTU uint16 // - wasMTUFixed bool // Was the MTU fixed by a receive error? - timeOpened time.Time // Time the sessino was opened - time time.Time // Time we last received a packet - mtuTime time.Time // time myMTU was last changed - pingTime time.Time // time the first ping was sent since the last received packet - pingSend time.Time // time the last ping was sent - coords []byte // coords of destination - reset bool // reset if coords change - tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation - bytesSent uint64 // Bytes of real traffic sent in this session - bytesRecvd uint64 // Bytes of real traffic received in this session - fromRouter chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn - init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use - cancel util.Cancellation // Used to terminate workers - recv chan []byte - send chan []byte + mutex sync.Mutex // Protects all of the below, use it any time you read/chance the contents of a session + core *Core // + reconfigure chan chan error // + theirAddr address.Address // + theirSubnet address.Subnet // + theirPermPub crypto.BoxPubKey // + theirSesPub crypto.BoxPubKey // + mySesPub crypto.BoxPubKey // + mySesPriv crypto.BoxPrivKey // + sharedSesKey crypto.BoxSharedKey // derived from session keys + theirHandle crypto.Handle // + myHandle crypto.Handle // + theirNonce crypto.BoxNonce // + theirNonceMask uint64 // + myNonce crypto.BoxNonce // + theirMTU uint16 // + myMTU uint16 // + wasMTUFixed bool // Was the MTU fixed by a receive error? + timeOpened time.Time // Time the sessino was opened + time time.Time // Time we last received a packet + mtuTime time.Time // time myMTU was last changed + pingTime time.Time // time the first ping was sent since the last received packet + pingSend time.Time // time the last ping was sent + coords []byte // coords of destination + reset bool // reset if coords change + tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation + bytesSent uint64 // Bytes of real traffic sent in this session + bytesRecvd uint64 // Bytes of real traffic received in this session + init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use + cancel util.Cancellation // Used to terminate workers + fromRouter chan wire_trafficPacket // Received packets go here, to be decrypted by the session + recv chan []byte // Decrypted packets go here, picked up by the associated Conn + send chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent } func (sinfo *sessionInfo) doFunc(f func()) { @@ -228,9 +228,9 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo { sinfo.myHandle = *crypto.NewHandle() sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) - sinfo.fromRouter = make(chan *wire_trafficPacket, 1) + sinfo.fromRouter = make(chan wire_trafficPacket, 1) sinfo.recv = make(chan []byte, 32) - sinfo.send = make(chan []byte, 32) + sinfo.send = make(chan FlowKeyMessage, 32) ss.sinfos[sinfo.myHandle] = &sinfo ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle go func() { @@ -442,13 +442,18 @@ func (sinfo *sessionInfo) startWorkers() { go sinfo.sendWorker() } +type FlowKeyMessage struct { + FlowKey uint64 + Message []byte +} + func (sinfo *sessionInfo) recvWorker() { // TODO move theirNonce etc into a struct that gets stored here, passed in over a channel // Since there's no reason for anywhere else in the session code to need to *read* it... // Only needs to be updated from the outside if a ping resets it... // That would get rid of the need to take a mutex for the sessionFunc var callbacks []chan func() - doRecv := func(p *wire_trafficPacket) { + doRecv := func(p wire_trafficPacket) { var bs []byte var err error var k crypto.BoxSharedKey @@ -524,16 +529,22 @@ func (sinfo *sessionInfo) sendWorker() { // TODO move info that this worker needs here, send updates via a channel // Otherwise we need to take a mutex to avoid races with update() var callbacks []chan func() - doSend := func(bs []byte) { + doSend := func(msg FlowKeyMessage) { var p wire_trafficPacket var k crypto.BoxSharedKey sessionFunc := func() { - sinfo.bytesSent += uint64(len(bs)) + sinfo.bytesSent += uint64(len(msg.Message)) p = wire_trafficPacket{ Coords: append([]byte(nil), sinfo.coords...), Handle: sinfo.theirHandle, Nonce: sinfo.myNonce, } + if msg.FlowKey != 0 { + // Helps ensure that traffic from this flow ends up in a separate queue from other flows + // The zero padding relies on the fact that the self-peer is always on port 0 + p.Coords = append(p.Coords, 0) + p.Coords = wire_put_uint64(msg.FlowKey, p.Coords) + } sinfo.myNonce.Increment() k = sinfo.sharedSesKey } @@ -542,12 +553,13 @@ func (sinfo *sessionInfo) sendWorker() { ch := make(chan func(), 1) poolFunc := func() { // Encrypt the packet - p.Payload, _ = crypto.BoxSeal(&k, bs, &p.Nonce) - packet := p.encode() + p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce) // The callback will send the packet callback := func() { + // Encoding may block on a util.GetBytes(), so kept out of the worker pool + packet := p.encode() // Cleanup - util.PutBytes(bs) + util.PutBytes(msg.Message) util.PutBytes(p.Payload) // Send the packet sinfo.core.router.out(packet) @@ -566,8 +578,8 @@ func (sinfo *sessionInfo) sendWorker() { f() case <-sinfo.cancel.Finished(): return - case bs := <-sinfo.send: - doSend(bs) + case msg := <-sinfo.send: + doSend(msg) } } select {