5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-25 23:01:38 +00:00

slightly better way for the tcp sender goroutine(s) to block waiting for work

This commit is contained in:
Arceliar 2018-06-22 20:39:57 -05:00
parent fd074a4364
commit 0021f3463f

View File

@ -242,7 +242,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
in := func(bs []byte) {
p.handlePacket(bs)
}
out := make(chan []byte, 32) // TODO? what size makes sense
out := make(chan []byte, 1)
defer close(out)
go func() {
var shadow int64
@ -255,13 +255,17 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
shadow++
}
}
send := func(msg []byte) {
msgLen := wire_encode_uint64(uint64(len(msg)))
buf := net.Buffers{tcp_msg[:], msgLen, msg}
buf.WriteTo(sock)
atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg)))
util_putBytes(msg)
}
send := make(chan []byte)
defer close(send)
go func() {
for msg := range send {
msgLen := wire_encode_uint64(uint64(len(msg)))
buf := net.Buffers{tcp_msg[:], msgLen, msg}
buf.WriteTo(sock)
atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg)))
util_putBytes(msg)
}
}()
timerInterval := tcp_timeout * 2 / 3
timer := time.NewTimer(timerInterval)
defer timer.Stop()
@ -278,9 +282,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
timer.Reset(timerInterval)
select {
case _ = <-timer.C:
send(nil) // TCP keep-alive traffic
send <- nil // TCP keep-alive traffic
case msg := <-p.linkOut:
send(msg)
send <- msg
case msg, ok := <-out:
if !ok {
return
@ -288,31 +292,32 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
put(msg)
}
for len(stack) > 0 {
// First make sure linkOut gets sent first, if it's non-empty
select {
case msg := <-p.linkOut:
send(msg)
send <- msg
default:
}
// Then block until we send or receive something
select {
case msg := <-p.linkOut:
send <- msg
case msg, ok := <-out:
if !ok {
return
}
put(msg)
default:
msg := stack[len(stack)-1]
case send <- stack[len(stack)-1]:
stack = stack[:len(stack)-1]
send(msg)
p.updateQueueSize(-1)
}
}
}
}()
p.out = func(msg []byte) {
p.updateQueueSize(1)
defer func() { recover() }()
select {
case out <- msg:
p.updateQueueSize(1)
default:
util_putBytes(msg)
}
out <- msg
}
p.close = func() { sock.Close() }
setNoDelay(sock, true)