diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index db57b56..32cfbf3 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -242,7 +242,7 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { in := func(bs []byte) { p.handlePacket(bs) } - out := make(chan []byte, 32) // TODO? what size makes sense + out := make(chan []byte, 1) defer close(out) go func() { var shadow int64 @@ -255,13 +255,17 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { shadow++ } } - send := func(msg []byte) { - msgLen := wire_encode_uint64(uint64(len(msg))) - buf := net.Buffers{tcp_msg[:], msgLen, msg} - buf.WriteTo(sock) - atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) - util_putBytes(msg) - } + send := make(chan []byte) + defer close(send) + go func() { + for msg := range send { + msgLen := wire_encode_uint64(uint64(len(msg))) + buf := net.Buffers{tcp_msg[:], msgLen, msg} + buf.WriteTo(sock) + atomic.AddUint64(&p.bytesSent, uint64(len(tcp_msg)+len(msgLen)+len(msg))) + util_putBytes(msg) + } + }() timerInterval := tcp_timeout * 2 / 3 timer := time.NewTimer(timerInterval) defer timer.Stop() @@ -278,9 +282,9 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { timer.Reset(timerInterval) select { case _ = <-timer.C: - send(nil) // TCP keep-alive traffic + send <- nil // TCP keep-alive traffic case msg := <-p.linkOut: - send(msg) + send <- msg case msg, ok := <-out: if !ok { return @@ -288,31 +292,32 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { put(msg) } for len(stack) > 0 { + // First make sure linkOut gets sent first, if it's non-empty select { case msg := <-p.linkOut: - send(msg) + send <- msg + default: + } + // Then block until we send or receive something + select { + case msg := <-p.linkOut: + send <- msg case msg, ok := <-out: if !ok { return } put(msg) - default: - msg := stack[len(stack)-1] + case send <- stack[len(stack)-1]: stack = stack[:len(stack)-1] - send(msg) p.updateQueueSize(-1) } } } }() p.out = func(msg []byte) { + p.updateQueueSize(1) defer func() { recover() }() - select { - case out <- msg: - p.updateQueueSize(1) - default: - util_putBytes(msg) - } + out <- msg } p.close = func() { sock.Close() } setNoDelay(sock, true)