diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index 82d0aa9..4a6ae41 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -207,15 +207,16 @@ func (c *Core) GetSwitchQueues() SwitchQueues { HighestSize: switchTable.queues.maxsize, MaximumSize: switchTable.queues.totalMaxSize, } - for k, v := range switchTable.queues.bufs { - nexthop := switchTable.bestPortForCoords([]byte(k)) - queue := SwitchQueue{ - ID: k, - Size: v.size, - Packets: uint64(len(v.packets)), - Port: uint64(nexthop), + for port, pbuf := range switchTable.queues.bufs { + for k, v := range pbuf { + queue := SwitchQueue{ + ID: k, + Size: v.size, + Packets: uint64(len(v.packets)), + Port: uint64(port), + } + switchqueues.Queues = append(switchqueues.Queues, queue) } - switchqueues.Queues = append(switchqueues.Queues, queue) } } phony.Block(&c.switchTable, getSwitchQueues) diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 157ea52..fb40fc0 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -282,13 +282,6 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { }) } -// called by an AfterFunc if we seem to be blocked in a send syscall for a long time -func (intf *linkInterface) _notifySyscall() { - intf.link.core.switchTable.Act(intf, func() { - intf.link.core.switchTable._sendingIn(intf.peer.port) - }) -} - // we just sent something, so cancel any pending timer to send keep-alive traffic func (intf *linkInterface) _cancelStallTimer() { if intf.stallTimer != nil { @@ -402,19 +395,7 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool size += len(bs) } w.intf.notifySending(size, isLinkTraffic) - // start a timer that will fire if we get stuck in writeMsgs for an oddly long time - var once sync.Once - timer := time.AfterFunc(time.Millisecond, func() { - // 1 ms is kind of arbitrary - // the rationale is that this should be very long compared to a syscall - // but it's still short compared to end-to-end latency or human perception - once.Do(func() { - w.intf.Act(nil, w.intf._notifySyscall) - }) - }) w.intf.msgIO.writeMsgs(bss) - // Make sure we either stop the timer from doing anything or wait until it's done - once.Do(func() { timer.Stop() }) w.intf.notifySent(size, isLinkTraffic) // Cleanup for _, bs := range bss { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 653b12f..899d143 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -177,7 +177,6 @@ type switchTable struct { phony.Inbox // Owns the below queues switch_buffers // Queues - not atomic so ONLY use through the actor idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor - sending map[switchPort]struct{} // peers known to be blocked in a send (somehow) } // Minimum allowed total size of switch queues. @@ -202,9 +201,8 @@ func (t *switchTable) init(core *Core) { t.queues.totalMaxSize = SwitchQueueTotalMinSize } core.config.Mutex.RUnlock() - t.queues.bufs = make(map[string]switch_buffer) + t.queues.bufs = make(map[switchPort]map[string]switch_buffer) t.idle = make(map[switchPort]struct{}) - t.sending = make(map[switchPort]struct{}) }) } @@ -666,27 +664,17 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort { // Handle an incoming packet // Either send it to ourself, or to the first idle peer that's free // Returns true if the packet has been handled somehow, false if it should be queued -func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool { +func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) { coords := switch_getPacketCoords(packet) closer := t.getCloser(coords) - if len(closer) == 0 { - // TODO? call the router directly, and remove the whole concept of a self peer? - self := t.core.peers.getPorts()[0] - self.sendPacketsFrom(t, [][]byte{packet}) - return true - } var best *closerInfo ports := t.core.peers.getPorts() for _, cinfo := range closer { to := ports[cinfo.elem.port] - //_, isIdle := idle[cinfo.elem.port] - _, isSending := sending[cinfo.elem.port] var update bool switch { case to == nil: // no port was found, ignore it - case isSending: - // the port is busy, ignore it case best == nil: // this is the first idle port we've found, so select it until we find a // better candidate port to use instead @@ -715,15 +703,20 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sen best = &b } } - if best != nil { - if _, isIdle := idle[best.elem.port]; isIdle { - delete(idle, best.elem.port) - ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) - return true - } + if best == nil { + // No closer peers + // TODO? call the router directly, and remove the whole concept of a self peer? + self := t.core.peers.getPorts()[0] + self.sendPacketsFrom(t, [][]byte{packet}) + return true, 0 } - // Didn't find anyone idle to send it to - return false + if _, isIdle := idle[best.elem.port]; isIdle { + delete(idle, best.elem.port) + ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet}) + return true, best.elem.port + } + // Best node isn't idle, so return port and let the packet be buffered + return false, best.elem.port } // Info about a buffered packet @@ -740,24 +733,29 @@ type switch_buffer struct { type switch_buffers struct { totalMaxSize uint64 - bufs map[string]switch_buffer // Buffers indexed by StreamID - size uint64 // Total size of all buffers, in bytes + bufs map[switchPort]map[string]switch_buffer // Buffers indexed by port and StreamID + size uint64 // Total size of all buffers, in bytes maxbufs int maxsize uint64 closer []closerInfo // Scratch space } func (b *switch_buffers) _cleanup(t *switchTable) { - for streamID, buf := range b.bufs { - // Remove queues for which we have no next hop - packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) - if len(t.getCloser(coords)) == 0 { - for _, packet := range buf.packets { - util.PutBytes(packet.bytes) + for port, pbufs := range b.bufs { + for streamID, buf := range pbufs { + // Remove queues for which we have no next hop + packet := buf.packets[0] + coords := switch_getPacketCoords(packet.bytes) + if len(t.getCloser(coords)) == 0 { + for _, packet := range buf.packets { + util.PutBytes(packet.bytes) + } + b.size -= buf.size + delete(pbufs, streamID) } - b.size -= buf.size - delete(b.bufs, streamID) + } + if len(pbufs) == 0 { + delete(b.bufs, port) } } @@ -765,23 +763,28 @@ func (b *switch_buffers) _cleanup(t *switchTable) { // Drop a random queue target := rand.Uint64() % b.size var size uint64 // running total - for streamID, buf := range b.bufs { - size += buf.size - if size < target { - continue + for port, pbufs := range b.bufs { + for streamID, buf := range pbufs { + size += buf.size + if size < target { + continue + } + var packet switch_packetInfo + packet, buf.packets = buf.packets[0], buf.packets[1:] + buf.size -= uint64(len(packet.bytes)) + b.size -= uint64(len(packet.bytes)) + util.PutBytes(packet.bytes) + if len(buf.packets) == 0 { + delete(pbufs, streamID) + if len(pbufs) == 0 { + delete(b.bufs, port) + } + } else { + // Need to update the map, since buf was retrieved by value + pbufs[streamID] = buf + } + break } - var packet switch_packetInfo - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - b.size -= uint64(len(packet.bytes)) - util.PutBytes(packet.bytes) - if len(buf.packets) == 0 { - delete(b.bufs, streamID) - } else { - // Need to update the map, since buf was retrieved by value - b.bufs[streamID] = buf - } - break } } } @@ -799,32 +802,35 @@ func (t *switchTable) _handleIdle(port switchPort) bool { var psize int t.queues._cleanup(t) now := time.Now() + pbufs := t.queues.bufs[port] for psize < 65535 { var best string var bestPriority float64 - for streamID, buf := range t.queues.bufs { + for streamID, buf := range pbufs { // Filter over the streams that this node is closer to // Keep the one with the smallest queue packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) priority := float64(now.Sub(packet.time)) / float64(buf.size) - if priority >= bestPriority && t.portIsCloser(coords, port) { + if priority >= bestPriority { best = streamID bestPriority = priority } } if best != "" { - buf := t.queues.bufs[best] + buf := pbufs[best] var packet switch_packetInfo // TODO decide if this should be LIFO or FIFO packet, buf.packets = buf.packets[0], buf.packets[1:] buf.size -= uint64(len(packet.bytes)) t.queues.size -= uint64(len(packet.bytes)) if len(buf.packets) == 0 { - delete(t.queues.bufs, best) + delete(pbufs, best) + if len(pbufs) == 0 { + delete(t.queues.bufs, port) + } } else { // Need to update the map, since buf was retrieved by value - t.queues.bufs[best] = buf + pbufs[best] = buf } packets = append(packets, packet.bytes) psize += len(packet.bytes) @@ -848,11 +854,14 @@ func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { func (t *switchTable) _packetIn(bytes []byte) { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) - if !t._handleIn(bytes, t.idle, t.sending) { + if sent, best := t._handleIn(bytes, t.idle); !sent { // There's nobody free to take it right now, so queue it for later packet := switch_packetInfo{bytes, time.Now()} streamID := switch_getPacketStreamID(packet.bytes) - buf, bufExists := t.queues.bufs[streamID] + if _, isIn := t.queues.bufs[best]; !isIn { + t.queues.bufs[best] = make(map[string]switch_buffer) + } + buf, bufExists := t.queues.bufs[best][streamID] buf.packets = append(buf.packets, packet) buf.size += uint64(len(packet.bytes)) t.queues.size += uint64(len(packet.bytes)) @@ -860,13 +869,17 @@ func (t *switchTable) _packetIn(bytes []byte) { if t.queues.size > t.queues.maxsize { t.queues.maxsize = t.queues.size } - t.queues.bufs[streamID] = buf + t.queues.bufs[best][streamID] = buf if !bufExists { // Keep a track of the max total queue count. Only recalculate this // when the queue is new because otherwise repeating len(dict) might // cause unnecessary processing overhead - if len(t.queues.bufs) > t.queues.maxbufs { - t.queues.maxbufs = len(t.queues.bufs) + var count int + for _, pbufs := range t.queues.bufs { + count += len(pbufs) + } + if count > t.queues.maxbufs { + t.queues.maxbufs = count } } t.queues._cleanup(t) @@ -875,15 +888,8 @@ func (t *switchTable) _packetIn(bytes []byte) { func (t *switchTable) _idleIn(port switchPort) { // Try to find something to send to this peer - delete(t.sending, port) if !t._handleIdle(port) { // Didn't find anything ready to send yet, so stay idle t.idle[port] = struct{}{} } } - -func (t *switchTable) _sendingIn(port switchPort) { - if _, isIn := t.idle[port]; !isIn { - t.sending[port] = struct{}{} - } -}