From 7695a3fcbfabc56d2b70d313086c956b75a0bf03 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Sun, 24 Jun 2018 20:20:07 -0500 Subject: [PATCH] try using a simpler FIFO order for each backpressure buffer, since there are other mechanisms to penalize the flooding node, leads to better TCP throughput without affecting traffic between other nodes (does affect traffic in the same session, but there's hypothetically workarounds to that) --- src/yggdrasil/switch.go | 22 +++++++++++----------- src/yggdrasil/tcp.go | 28 +++++++++++----------------- 2 files changed, 22 insertions(+), 28 deletions(-) diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index ae1c391..6c5fd21 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -589,17 +589,17 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool // Handles incoming idle notifications // Loops over packets and sends the newest one that's OK for this peer to send // Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bool { +func (t *switchTable) handleIdle(port switchPort, buffs map[string][][]byte) bool { to := t.core.peers.getPorts()[port] if to == nil { return true } var best string var bestSize int - for streamID, packets := range stacks { + for streamID, packets := range buffs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue - packet := packets[len(packets)-1] + packet := packets[0] coords := switch_getPacketCoords(packet) if (bestSize == 0 || len(packets) < bestSize) && t.portIsCloser(coords, port) { best = streamID @@ -607,13 +607,13 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo } } if bestSize != 0 { - packets := stacks[best] + packets := buffs[best] var packet []byte - packet, packets = packets[len(packets)-1], packets[:len(packets)-1] + packet, packets = packets[0], packets[1:] if len(packets) == 0 { - delete(stacks, best) + delete(buffs, best) } else { - stacks[best] = packets + buffs[best] = packets } to.sendPacket(packet) return true @@ -624,7 +624,7 @@ func (t *switchTable) handleIdle(port switchPort, stacks map[string][][]byte) bo // The switch worker does routing lookups and sends packets to where they need to be func (t *switchTable) doWorker() { - stacks := make(map[string][][]byte) // Packets per PacketStreamID (string) + buffs := make(map[string][][]byte) // Packets per PacketStreamID (string) idle := make(map[switchPort]struct{}) // this is to deduplicate things for { select { @@ -633,16 +633,16 @@ func (t *switchTable) doWorker() { if !t.handleIn(packet, idle) { // There's nobody free to take it right now, so queue it for later streamID := switch_getPacketStreamID(packet) - packets := append(stacks[streamID], packet) + packets := append(buffs[streamID], packet) for len(packets) > 32 { util_putBytes(packets[0]) packets = packets[1:] } - stacks[streamID] = packets + buffs[streamID] = packets } case port := <-t.idleIn: // Try to find something to send to this peer - if !t.handleIdle(port, stacks) { + if !t.handleIdle(port, buffs) { // Didn't find anything ready to send yet, so stay idle idle[port] = struct{}{} } diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 12147d4..80ae284 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -246,19 +246,13 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer close(out) go func() { // This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic - send := make(chan []byte) - defer close(send) - go func() { - // This goroutine does the actual socket write operations - // The parent goroutine aggregates things for it and feeds them in - for msg := range send { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } - }() + send := func(msg []byte) { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -266,7 +260,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { select { case msg := <-p.linkOut: // Always send outgoing link traffic first, if needed - send <- msg + send(msg) continue default: } @@ -279,14 +273,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send <- nil // TCP keep-alive traffic + send(nil) // TCP keep-alive traffic case msg := <-p.linkOut: - send <- msg + send(msg) case msg, ok := <-out: if !ok { return } - send <- msg // Block until the socket writer has the packet + send(msg) // Block until the socket write has finished // Now inform the switch that we're ready for more traffic p.core.switchTable.idleIn <- p.port }