mirror of
https://github.com/cwinfo/yggdrasil-go.git
synced 2024-11-23 04:21:34 +00:00
commit
a87581b0fa
@ -276,6 +276,13 @@ func (intf *linkInterface) notifySending(size int, isLinkTraffic bool) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// called by an AfterFunc if we seem to be blocked in a send syscall for a long time
|
||||||
|
func (intf *linkInterface) _notifySyscall() {
|
||||||
|
intf.link.core.switchTable.Act(intf, func() {
|
||||||
|
intf.link.core.switchTable._sendingIn(intf.peer.port)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// we just sent something, so cancel any pending timer to send keep-alive traffic
|
// we just sent something, so cancel any pending timer to send keep-alive traffic
|
||||||
func (intf *linkInterface) _cancelStallTimer() {
|
func (intf *linkInterface) _cancelStallTimer() {
|
||||||
if intf.stallTimer != nil {
|
if intf.stallTimer != nil {
|
||||||
@ -284,9 +291,11 @@ func (intf *linkInterface) _cancelStallTimer() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// called by an AfterFunc if we appear to have timed out
|
// This gets called from a time.AfterFunc, and notifies the switch that we appear
|
||||||
|
// to have gotten blocked on a write, so the switch should start routing traffic
|
||||||
|
// through other links, if alternatives exist
|
||||||
func (intf *linkInterface) notifyBlockedSend() {
|
func (intf *linkInterface) notifyBlockedSend() {
|
||||||
intf.Act(nil, func() { // Sent from a time.AfterFunc
|
intf.Act(nil, func() {
|
||||||
if intf.sendTimer != nil {
|
if intf.sendTimer != nil {
|
||||||
//As far as we know, we're still trying to send, and the timer fired.
|
//As far as we know, we're still trying to send, and the timer fired.
|
||||||
intf.link.core.switchTable.blockPeer(intf.peer.port)
|
intf.link.core.switchTable.blockPeer(intf.peer.port)
|
||||||
@ -380,7 +389,19 @@ func (w *linkWriter) sendFrom(from phony.Actor, bss [][]byte, isLinkTraffic bool
|
|||||||
size += len(bs)
|
size += len(bs)
|
||||||
}
|
}
|
||||||
w.intf.notifySending(size, isLinkTraffic)
|
w.intf.notifySending(size, isLinkTraffic)
|
||||||
|
// start a timer that will fire if we get stuck in writeMsgs for an oddly long time
|
||||||
|
var once sync.Once
|
||||||
|
timer := time.AfterFunc(time.Millisecond, func() {
|
||||||
|
// 1 ms is kind of arbitrary
|
||||||
|
// the rationale is that this should be very long compared to a syscall
|
||||||
|
// but it's still short compared to end-to-end latency or human perception
|
||||||
|
once.Do(func() {
|
||||||
|
w.intf.Act(nil, w.intf._notifySyscall)
|
||||||
|
})
|
||||||
|
})
|
||||||
w.intf.msgIO.writeMsgs(bss)
|
w.intf.msgIO.writeMsgs(bss)
|
||||||
|
// Make sure we either stop the timer from doing anything or wait until it's done
|
||||||
|
once.Do(func() { timer.Stop() })
|
||||||
w.intf.notifySent(size, isLinkTraffic)
|
w.intf.notifySent(size, isLinkTraffic)
|
||||||
// Cleanup
|
// Cleanup
|
||||||
for _, bs := range bss {
|
for _, bs := range bss {
|
||||||
|
@ -177,6 +177,7 @@ type switchTable struct {
|
|||||||
phony.Inbox // Owns the below
|
phony.Inbox // Owns the below
|
||||||
queues switch_buffers // Queues - not atomic so ONLY use through the actor
|
queues switch_buffers // Queues - not atomic so ONLY use through the actor
|
||||||
idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor
|
idle map[switchPort]struct{} // idle peers - not atomic so ONLY use through the actor
|
||||||
|
sending map[switchPort]struct{} // peers known to be blocked in a send (somehow)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Minimum allowed total size of switch queues.
|
// Minimum allowed total size of switch queues.
|
||||||
@ -203,6 +204,7 @@ func (t *switchTable) init(core *Core) {
|
|||||||
core.config.Mutex.RUnlock()
|
core.config.Mutex.RUnlock()
|
||||||
t.queues.bufs = make(map[string]switch_buffer)
|
t.queues.bufs = make(map[string]switch_buffer)
|
||||||
t.idle = make(map[switchPort]struct{})
|
t.idle = make(map[switchPort]struct{})
|
||||||
|
t.sending = make(map[switchPort]struct{})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -527,7 +529,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
|
|||||||
t.parent = sender.port
|
t.parent = sender.port
|
||||||
t.core.peers.sendSwitchMsgs(t)
|
t.core.peers.sendSwitchMsgs(t)
|
||||||
}
|
}
|
||||||
if doUpdate {
|
if true || doUpdate {
|
||||||
t.updater.Store(&sync.Once{})
|
t.updater.Store(&sync.Once{})
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -664,7 +666,7 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
|
|||||||
// Handle an incoming packet
|
// Handle an incoming packet
|
||||||
// Either send it to ourself, or to the first idle peer that's free
|
// Either send it to ourself, or to the first idle peer that's free
|
||||||
// Returns true if the packet has been handled somehow, false if it should be queued
|
// Returns true if the packet has been handled somehow, false if it should be queued
|
||||||
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) bool {
|
func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}, sending map[switchPort]struct{}) bool {
|
||||||
coords := switch_getPacketCoords(packet)
|
coords := switch_getPacketCoords(packet)
|
||||||
closer := t.getCloser(coords)
|
closer := t.getCloser(coords)
|
||||||
if len(closer) == 0 {
|
if len(closer) == 0 {
|
||||||
@ -677,12 +679,13 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo
|
|||||||
ports := t.core.peers.getPorts()
|
ports := t.core.peers.getPorts()
|
||||||
for _, cinfo := range closer {
|
for _, cinfo := range closer {
|
||||||
to := ports[cinfo.elem.port]
|
to := ports[cinfo.elem.port]
|
||||||
_, isIdle := idle[cinfo.elem.port]
|
//_, isIdle := idle[cinfo.elem.port]
|
||||||
|
_, isSending := sending[cinfo.elem.port]
|
||||||
var update bool
|
var update bool
|
||||||
switch {
|
switch {
|
||||||
case to == nil:
|
case to == nil:
|
||||||
// no port was found, ignore it
|
// no port was found, ignore it
|
||||||
case !isIdle:
|
case isSending:
|
||||||
// the port is busy, ignore it
|
// the port is busy, ignore it
|
||||||
case best == nil:
|
case best == nil:
|
||||||
// this is the first idle port we've found, so select it until we find a
|
// this is the first idle port we've found, so select it until we find a
|
||||||
@ -702,6 +705,7 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo
|
|||||||
// has a n older tstamp, so presumably a worse path
|
// has a n older tstamp, so presumably a worse path
|
||||||
case cinfo.elem.time.Before(best.elem.time):
|
case cinfo.elem.time.Before(best.elem.time):
|
||||||
// same tstamp, but got it earlier, so presumably a better path
|
// same tstamp, but got it earlier, so presumably a better path
|
||||||
|
//t.core.log.Println("DEBUG new best:", best.elem.time, cinfo.elem.time)
|
||||||
update = true
|
update = true
|
||||||
default:
|
default:
|
||||||
// the search for a port has finished
|
// the search for a port has finished
|
||||||
@ -712,10 +716,11 @@ func (t *switchTable) _handleIn(packet []byte, idle map[switchPort]struct{}) boo
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if best != nil {
|
if best != nil {
|
||||||
// Send to the best idle next hop
|
if _, isIdle := idle[best.elem.port]; isIdle {
|
||||||
delete(idle, best.elem.port)
|
delete(idle, best.elem.port)
|
||||||
ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
|
ports[best.elem.port].sendPacketsFrom(t, [][]byte{packet})
|
||||||
return true
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// Didn't find anyone idle to send it to
|
// Didn't find anyone idle to send it to
|
||||||
return false
|
return false
|
||||||
@ -785,6 +790,7 @@ func (b *switch_buffers) _cleanup(t *switchTable) {
|
|||||||
// Loops over packets and sends the newest one that's OK for this peer to send
|
// Loops over packets and sends the newest one that's OK for this peer to send
|
||||||
// Returns true if the peer is no longer idle, false if it should be added to the idle list
|
// Returns true if the peer is no longer idle, false if it should be added to the idle list
|
||||||
func (t *switchTable) _handleIdle(port switchPort) bool {
|
func (t *switchTable) _handleIdle(port switchPort) bool {
|
||||||
|
// TODO? only send packets for which this is the best next hop that isn't currently blocked sending
|
||||||
to := t.core.peers.getPorts()[port]
|
to := t.core.peers.getPorts()[port]
|
||||||
if to == nil {
|
if to == nil {
|
||||||
return true
|
return true
|
||||||
@ -842,7 +848,7 @@ func (t *switchTable) packetInFrom(from phony.Actor, bytes []byte) {
|
|||||||
|
|
||||||
func (t *switchTable) _packetIn(bytes []byte) {
|
func (t *switchTable) _packetIn(bytes []byte) {
|
||||||
// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
|
// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
|
||||||
if !t._handleIn(bytes, t.idle) {
|
if !t._handleIn(bytes, t.idle, t.sending) {
|
||||||
// There's nobody free to take it right now, so queue it for later
|
// There's nobody free to take it right now, so queue it for later
|
||||||
packet := switch_packetInfo{bytes, time.Now()}
|
packet := switch_packetInfo{bytes, time.Now()}
|
||||||
streamID := switch_getPacketStreamID(packet.bytes)
|
streamID := switch_getPacketStreamID(packet.bytes)
|
||||||
@ -869,8 +875,15 @@ func (t *switchTable) _packetIn(bytes []byte) {
|
|||||||
|
|
||||||
func (t *switchTable) _idleIn(port switchPort) {
|
func (t *switchTable) _idleIn(port switchPort) {
|
||||||
// Try to find something to send to this peer
|
// Try to find something to send to this peer
|
||||||
|
delete(t.sending, port)
|
||||||
if !t._handleIdle(port) {
|
if !t._handleIdle(port) {
|
||||||
// Didn't find anything ready to send yet, so stay idle
|
// Didn't find anything ready to send yet, so stay idle
|
||||||
t.idle[port] = struct{}{}
|
t.idle[port] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *switchTable) _sendingIn(port switchPort) {
|
||||||
|
if _, isIn := t.idle[port]; !isIn {
|
||||||
|
t.sending[port] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -233,8 +233,9 @@ func (t *tcp) call(saddr string, options interface{}, sintf string) {
|
|||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
// Block new calls for a little while, to mitigate livelock scenarios
|
// Block new calls for a little while, to mitigate livelock scenarios
|
||||||
time.Sleep(default_timeout)
|
rand.Seed(time.Now().UnixNano())
|
||||||
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
|
delay := default_timeout + time.Duration(rand.Intn(10000))*time.Millisecond
|
||||||
|
time.Sleep(delay)
|
||||||
t.mutex.Lock()
|
t.mutex.Lock()
|
||||||
delete(t.calls, callname)
|
delete(t.calls, callname)
|
||||||
t.mutex.Unlock()
|
t.mutex.Unlock()
|
||||||
|
Loading…
Reference in New Issue
Block a user