From 945930aa2ccbc327ae6bef0ec8db36b65a398a17 Mon Sep 17 00:00:00 2001 From: Arceliar Date: Fri, 3 Apr 2020 00:32:26 -0500 Subject: [PATCH] WIP have peer actors queue packets, temporarily a single simple FIFO queue with head drop --- src/yggdrasil/api.go | 29 ---- src/yggdrasil/link.go | 18 +-- src/yggdrasil/packetqueue.go | 39 +++++ src/yggdrasil/peer.go | 43 ++++-- src/yggdrasil/router.go | 9 +- src/yggdrasil/switch.go | 291 +---------------------------------- 6 files changed, 91 insertions(+), 338 deletions(-) create mode 100644 src/yggdrasil/packetqueue.go diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index a722dc5..31ece6b 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -199,35 +199,6 @@ func (c *Core) GetDHT() []DHTEntry { return dhtentries } -// GetSwitchQueues returns information about the switch queues that are -// currently in effect. These values can change within an instant. -func (c *Core) GetSwitchQueues() SwitchQueues { - var switchqueues SwitchQueues - switchTable := &c.switchTable - getSwitchQueues := func() { - switchqueues = SwitchQueues{ - Count: uint64(len(switchTable.queues.bufs)), - Size: switchTable.queues.size, - HighestCount: uint64(switchTable.queues.maxbufs), - HighestSize: switchTable.queues.maxsize, - MaximumSize: switchTable.queues.totalMaxSize, - } - for port, pbuf := range switchTable.queues.bufs { - for k, v := range pbuf { - queue := SwitchQueue{ - ID: k, - Size: v.size, - Packets: uint64(len(v.packets)), - Port: uint64(port), - } - switchqueues.Queues = append(switchqueues.Queues, queue) - } - } - } - phony.Block(&c.switchTable, getSwitchQueues) - return switchqueues -} - // GetSessions returns a list of open sessions from this node to other nodes. func (c *Core) GetSessions() []Session { var sessions []Session diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 978e8ea..1501799 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -62,7 +62,7 @@ type linkInterface struct { keepAliveTimer *time.Timer // Fires to send keep-alive traffic stallTimer *time.Timer // Fires to signal that no incoming traffic (including keep-alive) has been seen closeTimer *time.Timer // Fires when the link has been idle so long we need to close it - inSwitch bool // True if the switch is tracking this link + isIdle bool // True if the peer actor knows the link is idle stalled bool // True if we haven't been receiving any response traffic unstalled bool // False if an idle notification to the switch hasn't been sent because we stalled (or are first starting up) } @@ -278,7 +278,7 @@ const ( func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) { intf.Act(&intf.writer, func() { if !isLinkTraffic { - intf.inSwitch = false + intf.isIdle = false } intf.sendTimer = time.AfterFunc(sendTime, intf.notifyBlockedSend) intf._cancelStallTimer() @@ -311,7 +311,7 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { intf.sendTimer.Stop() intf.sendTimer = nil if !isLinkTraffic { - intf._notifySwitch() + intf._notifyIdle() } if size > 0 && intf.stallTimer == nil { intf.stallTimer = time.AfterFunc(stallTime, intf.notifyStalled) @@ -320,15 +320,13 @@ func (intf *linkInterface) notifySent(size int, isLinkTraffic bool) { } // Notify the switch that we're ready for more traffic, assuming we're not in a stalled state -func (intf *linkInterface) _notifySwitch() { - if !intf.inSwitch { +func (intf *linkInterface) _notifyIdle() { + if !intf.isIdle { if intf.stalled { intf.unstalled = false } else { - intf.inSwitch = true - intf.link.core.switchTable.Act(intf, func() { - intf.link.core.switchTable._idleIn(intf.peer.port) - }) + intf.isIdle = true + intf.peer.Act(intf, intf.peer._handleIdle) } } } @@ -364,7 +362,7 @@ func (intf *linkInterface) notifyRead(size int) { } intf.stalled = false if !intf.unstalled { - intf._notifySwitch() + intf._notifyIdle() intf.unstalled = true } if size > 0 && intf.stallTimer == nil { diff --git a/src/yggdrasil/packetqueue.go b/src/yggdrasil/packetqueue.go new file mode 100644 index 0000000..ac66c0d --- /dev/null +++ b/src/yggdrasil/packetqueue.go @@ -0,0 +1,39 @@ +package yggdrasil + +import "github.com/yggdrasil-network/yggdrasil-go/src/util" + +// TODO take max size from config +const MAX_PACKET_QUEUE_SIZE = 1048576 // 1 MB + +// TODO separate queues per e.g. traffic flow +type packetQueue struct { + packets [][]byte + size uint32 +} + +func (q *packetQueue) cleanup() { + for q.size > MAX_PACKET_QUEUE_SIZE { + if packet, success := q.pop(); success { + util.PutBytes(packet) + } else { + panic("attempted to drop packet from empty queue") + break + } + } +} + +func (q *packetQueue) push(packet []byte) { + q.packets = append(q.packets, packet) + q.size += uint32(len(packet)) + q.cleanup() +} + +func (q *packetQueue) pop() ([]byte, bool) { + if len(q.packets) > 0 { + packet := q.packets[0] + q.packets = q.packets[1:] + q.size -= uint32(len(packet)) + return packet, true + } + return nil, false +} diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 9acb932..bc9de04 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -100,6 +100,8 @@ type peer struct { bytesRecvd uint64 ports map[switchPort]*peer table *lookupTable + queue packetQueue + idle bool } func (ps *peers) updateTables(from phony.Actor, table *lookupTable) { @@ -243,6 +245,13 @@ func (p *peer) _handlePacket(packet []byte) { } } +// Get the coords of a packet without decoding +func peer_getPacketCoords(packet []byte) []byte { + _, pTypeLen := wire_decode_uint64(packet) + coords, _ := wire_decode_coords(packet[pTypeLen:]) + return coords +} + // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) _handleTraffic(packet []byte) { @@ -250,7 +259,7 @@ func (p *peer) _handleTraffic(packet []byte) { // Drop traffic if the peer isn't in the switch return } - coords := switch_getPacketCoords(packet) + coords := peer_getPacketCoords(packet) next := p.table.lookup(coords) if nPeer, isIn := p.ports[next]; isIn { nPeer.sendPacketsFrom(p, [][]byte{packet}) @@ -264,17 +273,33 @@ func (p *peer) sendPacketsFrom(from phony.Actor, packets [][]byte) { }) } -// This just calls p.out(packet) for now. func (p *peer) _sendPackets(packets [][]byte) { - // Is there ever a case where something more complicated is needed? - // What if p.out blocks? - var size int for _, packet := range packets { - size += len(packet) + p.queue.push(packet) + } + if p.idle { + p.idle = false + p._handleIdle() + } +} + +func (p *peer) _handleIdle() { + var packets [][]byte + var size uint64 + for size < 65535 { + if packet, success := p.queue.pop(); success { + packets = append(packets, packet) + size += uint64(len(packet)) + } else { + break + } + } + if len(packets) > 0 { + p.bytesSent += uint64(size) + p.out(packets) + } else { + p.idle = true } - p.bytesSent += uint64(size) - // FIXME need to manage queues here or else things can block! - p.out(packets) } // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 40b8303..1be9466 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -67,7 +67,14 @@ func (r *router) init(core *Core) { // FIXME don't block here! p = r.core.peers._newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) }) - p.out = func(packets [][]byte) { r.handlePackets(p, packets) } + p.out = func(packets [][]byte) { + r.handlePackets(p, packets) + r.Act(p, func() { + // after the router handle the packets, notify the peer that it's ready for more + p.Act(r, p._handleIdle) + }) + } + p.Act(r, p._handleIdle) r.out = func(bs []byte) { p.handlePacketFrom(r, bs) } r.nodeinfo.init(r.core) r.core.config.Mutex.RLock() diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 2661b46..091596b 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -164,13 +164,11 @@ type switchData struct { type switchTable struct { core *Core key crypto.SigPubKey // Our own key + phony.Inbox // Owns the below time time.Time // Time when locator.tstamp was last updated drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root parent switchPort // Port of whatever peer is our parent, or self if we're root data switchData // - phony.Inbox // Owns the below - queues switch_buffers // Queues - not atomic so ONLY use through the actor - idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor } // Minimum allowed total size of switch queues. @@ -185,18 +183,7 @@ func (t *switchTable) init(core *Core) { peers := make(map[switchPort]peerInfo) t.data = switchData{locator: locator, peers: peers} t.drop = make(map[crypto.SigPubKey]int64) - phony.Block(t, func() { - core.config.Mutex.RLock() - if core.config.Current.SwitchOptions.MaxTotalQueueSize > SwitchQueueTotalMinSize { - t.queues.totalMaxSize = core.config.Current.SwitchOptions.MaxTotalQueueSize - } else { - t.queues.totalMaxSize = SwitchQueueTotalMinSize - } - core.config.Mutex.RUnlock() - t.queues.bufs = make(map[switchPort]map[string]switch_buffer) - t.idle = make(map[switchPort]struct{}) - }) - t._updateTable() + phony.Block(t, t._updateTable) } func (t *switchTable) reconfigure() { @@ -557,73 +544,6 @@ func (t *switchTable) start() error { return nil } -type closerInfo struct { - elem tableElem - dist int -} - -// Return a map of ports onto distance, keeping only ports closer to the destination than this node -// If the map is empty (or nil), then no peer is closer -/* -func (t *switchTable) getCloser(dest []byte) []closerInfo { - table := t.getTable() - myDist := table.self.dist(dest) - if myDist == 0 { - // Skip the iteration step if it's impossible to be closer - return nil - } - var closer []closerInfo - for _, info := range table.elems { - dist := info.locator.dist(dest) - if dist < myDist { - closer = append(closer, closerInfo{info, dist}) - } - } - return closer -} -*/ - -// Returns true if the peer is closer to the destination than ourself -/* -func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool { - table := t.getTable() - if info, isIn := table.elems[port]; isIn { - theirDist := info.locator.dist(dest) - myDist := table.self.dist(dest) - return theirDist < myDist - } else { - return false - } -} -*/ - -// Get the coords of a packet without decoding -func switch_getPacketCoords(packet []byte) []byte { - _, pTypeLen := wire_decode_uint64(packet) - coords, _ := wire_decode_coords(packet[pTypeLen:]) - return coords -} - -// Returns a unique string for each stream of traffic -// Equal to coords -// The sender may append arbitrary info to the end of coords (as long as it's begins with a 0x00) to designate separate traffic streams -// Currently, it's the IPv6 next header type and the first 2 uint16 of the next header -// This is equivalent to the TCP/UDP protocol numbers and the source / dest ports -// TODO figure out if something else would make more sense (other transport protocols?) -func switch_getPacketStreamID(packet []byte) string { - return string(switch_getPacketCoords(packet)) -} - -// Returns the flowlabel from a given set of coords -func switch_getFlowLabelFromCoords(in []byte) []byte { - for i, v := range in { - if v == 0 { - return in[i+1:] - } - } - return []byte{} -} - // Find the best port to forward to for a given set of coords func (t *lookupTable) lookup(coords []byte) switchPort { var bestPort switchPort @@ -660,210 +580,3 @@ func (t *lookupTable) lookup(coords []byte) switchPort { } return bestPort } - -// Handle an incoming packet -// Either send it to ourself, or to the first idle peer that's free -// Returns true if the packet has been handled somehow, false if it should be queued -func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) (bool, switchPort) { - /* - coords := switch_getPacketCoords(packet) - table := t.getTable() - port := table.lookup(coords) - ports := t.core.peers.getPorts() - peer := ports[port] - if peer == nil { - // FIXME hack, if the peer disappeared durring a race then don't buffer - return true, 0 - } - if _, isIdle := idle[port]; isIdle || port == 0 { - // Either no closer peers, or the closest peer is idle - delete(idle, port) - peer.sendPacketsFrom(t, [][]byte{packet}) - return true, port - } - // There's a closer peer, but it's not idle, so buffer it - return false, port - */ - return true, 0 -} - -// Info about a buffered packet -type switch_packetInfo struct { - bytes []byte - time time.Time // Timestamp of when the packet arrived -} - -// Used to keep track of buffered packets -type switch_buffer struct { - packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large - size uint64 // Total queue size in bytes -} - -type switch_buffers struct { - totalMaxSize uint64 - bufs map[switchPort]map[string]switch_buffer // Buffers indexed by port and StreamID - size uint64 // Total size of all buffers, in bytes - maxbufs int - maxsize uint64 -} - -func (b *switch_buffers) _cleanup(t *switchTable) { - /* - for port, pbufs := range b.bufs { - for streamID, buf := range pbufs { - // Remove queues for which we have no next hop - packet := buf.packets[0] - coords := switch_getPacketCoords(packet.bytes) - if len(t.getCloser(coords)) == 0 { - for _, packet := range buf.packets { - util.PutBytes(packet.bytes) - } - b.size -= buf.size - delete(pbufs, streamID) - } - } - if len(pbufs) == 0 { - delete(b.bufs, port) - } - } - - for b.size > b.totalMaxSize { - // Drop a random queue - target := rand.Uint64() % b.size - var size uint64 // running total - for port, pbufs := range b.bufs { - for streamID, buf := range pbufs { - size += buf.size - if size < target { - continue - } - var packet switch_packetInfo - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - b.size -= uint64(len(packet.bytes)) - util.PutBytes(packet.bytes) - if len(buf.packets) == 0 { - delete(pbufs, streamID) - if len(pbufs) == 0 { - delete(b.bufs, port) - } - } else { - // Need to update the map, since buf was retrieved by value - pbufs[streamID] = buf - } - break - } - } - } - */ -} - -// Handles incoming idle notifications -// Loops over packets and sends the newest one that's OK for this peer to send -// Returns true if the peer is no longer idle, false if it should be added to the idle list -func (t *switchTable) _handleIdle(port switchPort) bool { - // TODO? only send packets for which this is the best next hop that isn't currently blocked sending - /* - to := t.core.peers.getPorts()[port] - if to == nil { - return true - } - var packets [][]byte - var psize int - t.queues._cleanup(t) - now := time.Now() - pbufs := t.queues.bufs[port] - for psize < 65535 { - var best *string - var bestPriority float64 - for streamID, buf := range pbufs { - // Filter over the streams that this node is closer to - // Keep the one with the smallest queue - packet := buf.packets[0] - priority := float64(now.Sub(packet.time)) / float64(buf.size) - if priority >= bestPriority { - b := streamID // copy since streamID is mutated in the loop - best = &b - bestPriority = priority - } - } - if best != nil { - buf := pbufs[*best] - var packet switch_packetInfo - // TODO decide if this should be LIFO or FIFO - packet, buf.packets = buf.packets[0], buf.packets[1:] - buf.size -= uint64(len(packet.bytes)) - t.queues.size -= uint64(len(packet.bytes)) - if len(buf.packets) == 0 { - delete(pbufs, *best) - if len(pbufs) == 0 { - delete(t.queues.bufs, port) - } - } else { - // Need to update the map, since buf was retrieved by value - pbufs[*best] = buf - - } - packets = append(packets, packet.bytes) - psize += len(packet.bytes) - } else { - // Finished finding packets - break - } - } - if len(packets) > 0 { - to.sendPacketsFrom(t, packets) - return true - } - return false - */ - return false -} - -func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) { - t.Act(from, func() { - t._packetIn(bytes) - }) -} - -func (t *switchTable) _packetIn(bytes []byte) { - // Try to send it somewhere (or drop it if it's corrupt or at a dead end) - if sent, best := t._handleIn(bytes, t.idle); !sent { - // There's nobody free to take it right now, so queue it for later - packet := switch_packetInfo{bytes, time.Now()} - streamID := switch_getPacketStreamID(packet.bytes) - if _, isIn := t.queues.bufs[best]; !isIn { - t.queues.bufs[best] = make(map[string]switch_buffer) - } - buf, bufExists := t.queues.bufs[best][streamID] - buf.packets = append(buf.packets, packet) - buf.size += uint64(len(packet.bytes)) - t.queues.size += uint64(len(packet.bytes)) - // Keep a track of the max total queue size - if t.queues.size > t.queues.maxsize { - t.queues.maxsize = t.queues.size - } - t.queues.bufs[best][streamID] = buf - if !bufExists { - // Keep a track of the max total queue count. Only recalculate this - // when the queue is new because otherwise repeating len(dict) might - // cause unnecessary processing overhead - var count int - for _, pbufs := range t.queues.bufs { - count += len(pbufs) - } - if count > t.queues.maxbufs { - t.queues.maxbufs = count - } - } - t.queues._cleanup(t) - } -} - -func (t *switchTable) _idleIn(port switchPort) { - // Try to find something to send to this peer - if !t._handleIdle(port) { - // Didn't find anything ready to send yet, so stay idle - t.idle[port] = struct{}{} - } -}