diff --git a/src/yggdrasil/debug.go b/src/yggdrasil/debug.go index c029ae9..5242a6e 100644 --- a/src/yggdrasil/debug.go +++ b/src/yggdrasil/debug.go @@ -481,13 +481,17 @@ func DEBUG_simLinkPeers(p, q *peer) { } }() p.out = func(bs []byte) { + p.core.switchTable.idleIn <- p.port go q.handlePacket(bs) } q.out = func(bs []byte) { + q.core.switchTable.idleIn <- q.port go p.handlePacket(bs) } go p.linkLoop() go q.linkLoop() + p.core.switchTable.idleIn <- p.port + q.core.switchTable.idleIn <- q.port } func (c *Core) DEBUG_simFixMTU() { diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index d5bdc5a..f0dd3d0 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -74,9 +74,8 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) { ps.ports.Store(ports) } -// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure. +// Information known about a peer, including thier box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic type peer struct { - queueSize int64 // used to track local backpressure bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element @@ -94,16 +93,6 @@ type peer struct { close func() // Called when a peer is removed, to close the underlying connection, or via admin api } -// Size of the queue of packets to be sent to the node. -func (p *peer) getQueueSize() int64 { - return atomic.LoadInt64(&p.queueSize) -} - -// Used to increment or decrement the queue. -func (p *peer) updateQueueSize(delta int64) { - atomic.AddInt64(&p.queueSize, delta) -} - // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number. func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey) *peer { now := time.Now() diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index f74e1c5..4c1f0f8 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -503,11 +503,12 @@ func (t *switchTable) lookup(dest []byte) switchPort { if !(dist < myDist) { continue } - p, isIn := ports[info.port] + //p, isIn := ports[info.port] + _, isIn := ports[info.port] if !isIn { continue } - cost := int64(dist) + p.getQueueSize() + cost := int64(dist) // + p.getQueueSize() if cost < bestCost { best = info.port bestCost = cost @@ -573,6 +574,7 @@ func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free +// Returns true if the packet has been handled somehow, false if it should be queued func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { // Get the coords, skipping the first byte (the pType) _, pTypeLen := wire_decode_uint64(packet) @@ -599,6 +601,27 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool return false } +// Handles incoming idle notifications +// Loops over packets and sends the newest one that's OK for this peer to send +// Returns true if the peer is no longer idle, false if it should be added to the idle list +func (t *switchTable) handleIdle(port switchPort, packets *[][]byte) bool { + to := t.core.peers.getPorts()[port] + if to == nil { + return true + } + for idx := len(*packets) - 1; idx >= 0; idx-- { + packet := (*packets)[idx] + _, pTypeLen := wire_decode_uint64(packet) + coords, _ := wire_decode_coords(packet[pTypeLen:]) + if t.portIsCloser(coords, port) { + to.sendPacket(packet) + *packets = append((*packets)[:idx], (*packets)[idx+1:]...) + return true + } + } + return false +} + // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { var packets [][]byte // Should really be a linked list @@ -606,13 +629,9 @@ func (t *switchTable) doWorker() { for { select { case packet := <-t.packetIn: - idle = make(map[switchPort]struct{}) - for port := range t.getTable().elems { - idle[port] = struct{}{} - } - // TODO correcty fill idle, so the above can be removed + // Try to send it somewhere (or drop it if it's corrupt or at a dead end) if !t.handleIn(packet, idle) { - // There's nobody free to take it now, so queue it + // There's nobody free to take it right now, so queue it for later packets = append(packets, packet) for len(packets) > 32 { util_putBytes(packets[0]) @@ -620,9 +639,11 @@ func (t *switchTable) doWorker() { } } case port := <-t.idleIn: - // TODO the part that loops over packets and finds something to send - // Didn't find anything to send, so add this port to the idle list - idle[port] = struct{}{} + // Try to find something to send to this peer + if !t.handleIdle(port, &packets) { + // Didn't find anything ready to send yet, so stay idle + idle[port] = struct{}{} + } } } } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 0df2d52..12147d4 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -19,7 +19,6 @@ import ( "fmt" "math/rand" "net" - "sort" "sync" "sync/atomic" "time" @@ -243,26 +242,15 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 1024) // Should be effectively infinite, but gets fed into finite LIFO stack + out := make(chan []byte, 1) defer close(out) go func() { - var shadow int64 - var stack [][]byte - put := func(msg []byte) { - stack = append(stack, msg) - sort.SliceStable(stack, func(i, j int) bool { - // Sort in reverse order, with smallest messages at the end - return len(stack[i]) >= len(stack[j]) - }) - for len(stack) > 32 { - util_putBytes(stack[0]) - stack = stack[1:] - shadow++ - } - } + // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic send := make(chan []byte) defer close(send) go func() { + // This goroutine does the actual socket write operations + // The parent goroutine aggregates things for it and feeds them in for msg := range send { msgLen := wire_encode_uint64(uint64(len(msg))) buf := net.Buffers{tcp_msg[:], msgLen, msg} @@ -275,10 +263,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer := time.NewTimer(timerInterval) defer timer.Stop() for { - if shadow != 0 { - p.updateQueueSize(-shadow) - shadow = 0 + select { + case msg := <-p.linkOut: + // Always send outgoing link traffic first, if needed + send <- msg + continue + default: } + // Otherwise wait reset the timer and wait for something to do timer.Stop() select { case <-timer.C: @@ -294,34 +286,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { if !ok { return } - put(msg) - } - for len(stack) > 0 { - // First make sure linkOut gets sent first, if it's non-empty - select { - case msg := <-p.linkOut: - send <- msg - continue - default: - } - // Then block until we send or receive something - select { - case msg := <-p.linkOut: - send <- msg - case msg, ok := <-out: - if !ok { - return - } - put(msg) - case send <- stack[len(stack)-1]: - stack = stack[:len(stack)-1] - p.updateQueueSize(-1) - } + send <- msg // Block until the socket writer has the packet + // Now inform the switch that we're ready for more traffic + p.core.switchTable.idleIn <- p.port } } }() + p.core.switchTable.idleIn <- p.port // Start in the idle state p.out = func(msg []byte) { - p.updateQueueSize(1) defer func() { recover() }() out <- msg }