5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-29 21:21:35 +00:00

Merge pull request #491 from Arceliar/flowkey

Fix the old flowkey stuff so congestion control actually works...
This commit is contained in:
Neil Alexander 2019-08-07 10:33:17 +01:00 committed by GitHub
commit c99ed9fb60
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 105 additions and 52 deletions

View File

@ -96,8 +96,11 @@ func (s *tunConn) writer() error {
if !ok { if !ok {
return errors.New("send closed") return errors.New("send closed")
} }
// TODO write timeout and close msg := yggdrasil.FlowKeyMessage{
if err := s.conn.WriteNoCopy(bs); err != nil { FlowKey: util.GetFlowKey(bs),
Message: bs,
}
if err := s.conn.WriteNoCopy(msg); err != nil {
if e, eok := err.(yggdrasil.ConnError); !eok { if e, eok := err.(yggdrasil.ConnError); !eok {
if e.Closed() { if e.Closed() {
s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err) s.tun.log.Debugln(s.conn.String(), "TUN/TAP generic write debug:", err)

View File

@ -106,3 +106,41 @@ func DecodeCoordString(in string) (out []uint64) {
} }
return out return out
} }
// GetFlowLabel takes an IP packet as an argument and returns some information about the traffic flow.
// For IPv4 packets, this is derived from the source and destination protocol and port numbers.
// For IPv6 packets, this is derived from the FlowLabel field of the packet if this was set, otherwise it's handled like IPv4.
// The FlowKey is then used internally by Yggdrasil for congestion control.
func GetFlowKey(bs []byte) uint64 {
// Work out the flowkey - this is used to determine which switch queue
// traffic will be pushed to in the event of congestion
var flowkey uint64
// Get the IP protocol version from the packet
switch bs[0] & 0xf0 {
case 0x40: // IPv4 packet
// Check the packet meets minimum UDP packet length
if len(bs) >= 24 {
// Is the protocol TCP, UDP or SCTP?
if bs[9] == 0x06 || bs[9] == 0x11 || bs[9] == 0x84 {
ihl := bs[0] & 0x0f * 4 // Header length
flowkey = uint64(bs[9])<<32 /* proto */ |
uint64(bs[ihl+0])<<24 | uint64(bs[ihl+1])<<16 /* sport */ |
uint64(bs[ihl+2])<<8 | uint64(bs[ihl+3]) /* dport */
}
}
case 0x60: // IPv6 packet
// Check if the flowlabel was specified in the packet header
flowkey = uint64(bs[1]&0x0f)<<16 | uint64(bs[2])<<8 | uint64(bs[3])
// If the flowlabel isn't present, make protokey from proto | sport | dport
// if the packet meets minimum UDP packet length
if flowkey == 0 && len(bs) >= 48 {
// Is the protocol TCP, UDP or SCTP?
if bs[6] == 0x06 || bs[6] == 0x11 || bs[6] == 0x84 {
flowkey = uint64(bs[6])<<32 /* proto */ |
uint64(bs[40])<<24 | uint64(bs[41])<<16 /* sport */ |
uint64(bs[42])<<8 | uint64(bs[43]) /* dport */
}
}
}
return flowkey
}

View File

@ -183,11 +183,11 @@ func (c *Conn) Read(b []byte) (int, error) {
} }
// Used internally by Write, the caller must not reuse the argument bytes when no error occurs // Used internally by Write, the caller must not reuse the argument bytes when no error occurs
func (c *Conn) WriteNoCopy(bs []byte) error { func (c *Conn) WriteNoCopy(msg FlowKeyMessage) error {
var err error var err error
sessionFunc := func() { sessionFunc := func() {
// Does the packet exceed the permitted size for the session? // Does the packet exceed the permitted size for the session?
if uint16(len(bs)) > c.session.getMTU() { if uint16(len(msg.Message)) > c.session.getMTU() {
err = ConnError{errors.New("packet too big"), true, false, false, int(c.session.getMTU())} err = ConnError{errors.New("packet too big"), true, false, false, int(c.session.getMTU())}
return return
} }
@ -216,7 +216,7 @@ func (c *Conn) WriteNoCopy(bs []byte) error {
} else { } else {
err = ConnError{errors.New("session closed"), false, false, true, 0} err = ConnError{errors.New("session closed"), false, false, true, 0}
} }
case c.session.send <- bs: case c.session.send <- msg:
} }
} }
return err return err
@ -225,10 +225,10 @@ func (c *Conn) WriteNoCopy(bs []byte) error {
// Implements net.Conn.Write // Implements net.Conn.Write
func (c *Conn) Write(b []byte) (int, error) { func (c *Conn) Write(b []byte) (int, error) {
written := len(b) written := len(b)
bs := append(util.GetBytes(), b...) msg := FlowKeyMessage{Message: append(util.GetBytes(), b...)}
err := c.WriteNoCopy(bs) err := c.WriteNoCopy(msg)
if err != nil { if err != nil {
util.PutBytes(bs) util.PutBytes(msg.Message)
written = 0 written = 0
} }
return written, err return written, err

View File

@ -166,7 +166,7 @@ func (r *router) handleTraffic(packet []byte) {
return return
} }
select { select {
case sinfo.fromRouter <- &p: case sinfo.fromRouter <- p:
case <-sinfo.cancel.Finished(): case <-sinfo.cancel.Finished():
util.PutBytes(p.Payload) util.PutBytes(p.Payload)
} }

View File

@ -46,11 +46,11 @@ type sessionInfo struct {
tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation tstamp int64 // ATOMIC - tstamp from their last session ping, replay attack mitigation
bytesSent uint64 // Bytes of real traffic sent in this session bytesSent uint64 // Bytes of real traffic sent in this session
bytesRecvd uint64 // Bytes of real traffic received in this session bytesRecvd uint64 // Bytes of real traffic received in this session
fromRouter chan *wire_trafficPacket // Received packets go here, picked up by the associated Conn
init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use init chan struct{} // Closed when the first session pong arrives, used to signal that the session is ready for initial use
cancel util.Cancellation // Used to terminate workers cancel util.Cancellation // Used to terminate workers
recv chan []byte fromRouter chan wire_trafficPacket // Received packets go here, to be decrypted by the session
send chan []byte recv chan []byte // Decrypted packets go here, picked up by the associated Conn
send chan FlowKeyMessage // Packets with optional flow key go here, to be encrypted and sent
} }
func (sinfo *sessionInfo) doFunc(f func()) { func (sinfo *sessionInfo) doFunc(f func()) {
@ -228,9 +228,9 @@ func (ss *sessions) createSession(theirPermKey *crypto.BoxPubKey) *sessionInfo {
sinfo.myHandle = *crypto.NewHandle() sinfo.myHandle = *crypto.NewHandle()
sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirAddr = *address.AddrForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub)) sinfo.theirSubnet = *address.SubnetForNodeID(crypto.GetNodeID(&sinfo.theirPermPub))
sinfo.fromRouter = make(chan *wire_trafficPacket, 1) sinfo.fromRouter = make(chan wire_trafficPacket, 1)
sinfo.recv = make(chan []byte, 32) sinfo.recv = make(chan []byte, 32)
sinfo.send = make(chan []byte, 32) sinfo.send = make(chan FlowKeyMessage, 32)
ss.sinfos[sinfo.myHandle] = &sinfo ss.sinfos[sinfo.myHandle] = &sinfo
ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
go func() { go func() {
@ -442,13 +442,18 @@ func (sinfo *sessionInfo) startWorkers() {
go sinfo.sendWorker() go sinfo.sendWorker()
} }
type FlowKeyMessage struct {
FlowKey uint64
Message []byte
}
func (sinfo *sessionInfo) recvWorker() { func (sinfo *sessionInfo) recvWorker() {
// TODO move theirNonce etc into a struct that gets stored here, passed in over a channel // TODO move theirNonce etc into a struct that gets stored here, passed in over a channel
// Since there's no reason for anywhere else in the session code to need to *read* it... // Since there's no reason for anywhere else in the session code to need to *read* it...
// Only needs to be updated from the outside if a ping resets it... // Only needs to be updated from the outside if a ping resets it...
// That would get rid of the need to take a mutex for the sessionFunc // That would get rid of the need to take a mutex for the sessionFunc
var callbacks []chan func() var callbacks []chan func()
doRecv := func(p *wire_trafficPacket) { doRecv := func(p wire_trafficPacket) {
var bs []byte var bs []byte
var err error var err error
var k crypto.BoxSharedKey var k crypto.BoxSharedKey
@ -524,16 +529,22 @@ func (sinfo *sessionInfo) sendWorker() {
// TODO move info that this worker needs here, send updates via a channel // TODO move info that this worker needs here, send updates via a channel
// Otherwise we need to take a mutex to avoid races with update() // Otherwise we need to take a mutex to avoid races with update()
var callbacks []chan func() var callbacks []chan func()
doSend := func(bs []byte) { doSend := func(msg FlowKeyMessage) {
var p wire_trafficPacket var p wire_trafficPacket
var k crypto.BoxSharedKey var k crypto.BoxSharedKey
sessionFunc := func() { sessionFunc := func() {
sinfo.bytesSent += uint64(len(bs)) sinfo.bytesSent += uint64(len(msg.Message))
p = wire_trafficPacket{ p = wire_trafficPacket{
Coords: append([]byte(nil), sinfo.coords...), Coords: append([]byte(nil), sinfo.coords...),
Handle: sinfo.theirHandle, Handle: sinfo.theirHandle,
Nonce: sinfo.myNonce, Nonce: sinfo.myNonce,
} }
if msg.FlowKey != 0 {
// Helps ensure that traffic from this flow ends up in a separate queue from other flows
// The zero padding relies on the fact that the self-peer is always on port 0
p.Coords = append(p.Coords, 0)
p.Coords = wire_put_uint64(msg.FlowKey, p.Coords)
}
sinfo.myNonce.Increment() sinfo.myNonce.Increment()
k = sinfo.sharedSesKey k = sinfo.sharedSesKey
} }
@ -542,12 +553,13 @@ func (sinfo *sessionInfo) sendWorker() {
ch := make(chan func(), 1) ch := make(chan func(), 1)
poolFunc := func() { poolFunc := func() {
// Encrypt the packet // Encrypt the packet
p.Payload, _ = crypto.BoxSeal(&k, bs, &p.Nonce) p.Payload, _ = crypto.BoxSeal(&k, msg.Message, &p.Nonce)
packet := p.encode()
// The callback will send the packet // The callback will send the packet
callback := func() { callback := func() {
// Encoding may block on a util.GetBytes(), so kept out of the worker pool
packet := p.encode()
// Cleanup // Cleanup
util.PutBytes(bs) util.PutBytes(msg.Message)
util.PutBytes(p.Payload) util.PutBytes(p.Payload)
// Send the packet // Send the packet
sinfo.core.router.out(packet) sinfo.core.router.out(packet)
@ -566,8 +578,8 @@ func (sinfo *sessionInfo) sendWorker() {
f() f()
case <-sinfo.cancel.Finished(): case <-sinfo.cancel.Finished():
return return
case bs := <-sinfo.send: case msg := <-sinfo.send:
doSend(bs) doSend(msg)
} }
} }
select { select {