diff --git a/src/yggdrasil/api.go b/src/yggdrasil/api.go index cbf232a..2bc5c81 100644 --- a/src/yggdrasil/api.go +++ b/src/yggdrasil/api.go @@ -191,7 +191,7 @@ func (c *Core) GetSwitchQueues() SwitchQueues { Size: switchTable.queues.size, HighestCount: uint64(switchTable.queues.maxbufs), HighestSize: switchTable.queues.maxsize, - MaximumSize: switchTable.queueTotalMaxSize, + MaximumSize: switchTable.queues.totalMaxSize, } for k, v := range switchTable.queues.bufs { nexthop := switchTable.bestPortForCoords([]byte(k)) diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index fb2142c..40982cd 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -174,7 +174,9 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) (*config.NodeState, c.config.Mutex.RLock() if c.config.Current.SwitchOptions.MaxTotalQueueSize >= SwitchQueueTotalMinSize { - c.switchTable.queueTotalMaxSize = c.config.Current.SwitchOptions.MaxTotalQueueSize + c.switchTable.doAdmin(func() { + c.switchTable.queues.totalMaxSize = c.config.Current.SwitchOptions.MaxTotalQueueSize + }) } c.config.Mutex.RUnlock() diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index 824afd3..1389f41 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -318,7 +318,9 @@ func (intf *linkInterface) handler() error { isAlive = true if !isReady { // (Re-)enable in the switch - intf.link.core.switchTable.idleIn <- intf.peer.port + intf.link.core.switchTable.EnqueueFrom(nil, func() { + intf.link.core.switchTable._idleIn(intf.peer.port) + }) isReady = true } if gotMsg && !sendTimerRunning { @@ -355,7 +357,9 @@ func (intf *linkInterface) handler() error { isReady = false } else { // Keep enabled in the switch - intf.link.core.switchTable.idleIn <- intf.peer.port + intf.link.core.switchTable.EnqueueFrom(nil, func() { + intf.link.core.switchTable._idleIn(intf.peer.port) + }) isReady = true } case <-sendBlocked.C: diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 4bebc17..9a6f75f 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -242,7 +242,7 @@ func (p *peer) _handleTraffic(packet []byte, pTypeLen int) { // Drop traffic if the peer isn't in the switch return } - p.core.switchTable.packetIn <- packet + p.core.switchTable.packetInFrom(p, packet) } func (p *peer) sendPacketsFrom(from phony.IActor, packets [][]byte) { diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index 3f4c41f..20552ae 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -164,22 +164,19 @@ type switchData struct { // All the information stored by the switch. type switchTable struct { - core *Core - reconfigure chan chan error - key crypto.SigPubKey // Our own key - time time.Time // Time when locator.tstamp was last updated - drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root - mutex sync.RWMutex // Lock for reads/writes of switchData - parent switchPort // Port of whatever peer is our parent, or self if we're root - data switchData // - updater atomic.Value // *sync.Once - table atomic.Value // lookupTable - phony.Actor // Owns the below - packetIn chan []byte // Incoming packets for the worker to handle - idleIn chan switchPort // Incoming idle notifications from peer links - queues switch_buffers // Queues - not atomic so ONLY use through admin chan - queueTotalMaxSize uint64 // Maximum combined size of queues - idle map[switchPort]time.Time // idle peers + core *Core + reconfigure chan chan error + key crypto.SigPubKey // Our own key + time time.Time // Time when locator.tstamp was last updated + drop map[crypto.SigPubKey]int64 // Tstamp associated with a dropped root + mutex sync.RWMutex // Lock for reads/writes of switchData + parent switchPort // Port of whatever peer is our parent, or self if we're root + data switchData // + updater atomic.Value // *sync.Once + table atomic.Value // lookupTable + phony.Actor // Owns the below + queues switch_buffers // Queues - not atomic so ONLY use through the actor + idle map[switchPort]time.Time // idle peers - not atomic so ONLY use through the actor } // Minimum allowed total size of switch queues. @@ -197,12 +194,11 @@ func (t *switchTable) init(core *Core) { t.updater.Store(&sync.Once{}) t.table.Store(lookupTable{}) t.drop = make(map[crypto.SigPubKey]int64) - t.packetIn = make(chan []byte, 1024) - t.idleIn = make(chan switchPort, 1024) - t.queueTotalMaxSize = SwitchQueueTotalMinSize - t.idle = make(map[switchPort]time.Time) - t.queues.switchTable = t - t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string) + <-t.SyncExec(func() { + t.queues.totalMaxSize = SwitchQueueTotalMinSize + t.queues.bufs = make(map[string]switch_buffer) + t.idle = make(map[switchPort]time.Time) + }) } // Safely gets a copy of this node's locator. @@ -727,12 +723,12 @@ type switch_buffer struct { } type switch_buffers struct { - switchTable *switchTable - bufs map[string]switch_buffer // Buffers indexed by StreamID - size uint64 // Total size of all buffers, in bytes - maxbufs int - maxsize uint64 - closer []closerInfo // Scratch space + totalMaxSize uint64 + bufs map[string]switch_buffer // Buffers indexed by StreamID + size uint64 // Total size of all buffers, in bytes + maxbufs int + maxsize uint64 + closer []closerInfo // Scratch space } func (b *switch_buffers) _cleanup(t *switchTable) { @@ -749,7 +745,7 @@ func (b *switch_buffers) _cleanup(t *switchTable) { } } - for b.size > b.switchTable.queueTotalMaxSize { + for b.size > b.totalMaxSize { // Drop a random queue target := rand.Uint64() % b.size var size uint64 // running total @@ -828,6 +824,12 @@ func (t *switchTable) _handleIdle(port switchPort) bool { return false } +func (t *switchTable) packetInFrom(from phony.IActor, bytes []byte) { + t.EnqueueFrom(from, func() { + t._packetIn(bytes) + }) +} + func (t *switchTable) _packetIn(bytes []byte) { // Try to send it somewhere (or drop it if it's corrupt or at a dead end) if !t._handleIn(bytes, t.idle) { @@ -868,10 +870,6 @@ func (t *switchTable) doWorker() { for { //t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs)) select { - case bytes := <-t.packetIn: - <-t.SyncExec(func() { t._packetIn(bytes) }) - case port := <-t.idleIn: - <-t.SyncExec(func() { t._idleIn(port) }) case e := <-t.reconfigure: e <- nil }