diff --git a/src/yggdrasil/link.go b/src/yggdrasil/link.go index eca96eb..4ce374b 100644 --- a/src/yggdrasil/link.go +++ b/src/yggdrasil/link.go @@ -37,7 +37,7 @@ type linkInfo struct { type linkInterfaceMsgIO interface { readMsg() ([]byte, error) - writeMsg([]byte) (int, error) + writeMsgs([][]byte) (int, error) close() error // These are temporary workarounds to stream semantics _sendMetaBytes([]byte) error @@ -207,11 +207,11 @@ func (intf *linkInterface) handler() error { intf.link.core.peers.removePeer(intf.peer.port) }() // Finish setting up the peer struct - out := make(chan []byte, 1) + out := make(chan [][]byte, 1) defer close(out) - intf.peer.out = func(msg []byte) { + intf.peer.out = func(msgs [][]byte) { defer func() { recover() }() - out <- msg + out <- msgs } intf.peer.linkOut = make(chan []byte, 1) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) @@ -234,12 +234,12 @@ func (intf *linkInterface) handler() error { interval := 4 * time.Second tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp defer util.TimerStop(tcpTimer) - send := func(bs []byte) { + send := func(bss [][]byte) { sendBlocked.Reset(time.Second) - intf.msgIO.writeMsg(bs) + size, _ := intf.msgIO.writeMsgs(bss) util.TimerStop(sendBlocked) select { - case signalSent <- len(bs) > 0: + case signalSent <- size > 0: default: } } @@ -247,7 +247,7 @@ func (intf *linkInterface) handler() error { // First try to send any link protocol traffic select { case msg := <-intf.peer.linkOut: - send(msg) + send([][]byte{msg}) continue default: } @@ -259,19 +259,21 @@ func (intf *linkInterface) handler() error { case <-tcpTimer.C: intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) - send(nil) + send([][]byte{nil}) case <-sendAck: intf.link.core.log.Tracef("Sending ack to %s: %s, source %s", strings.ToUpper(intf.info.linkType), themString, intf.info.local) - send(nil) + send([][]byte{nil}) case msg := <-intf.peer.linkOut: - send(msg) - case msg, ok := <-out: + send([][]byte{msg}) + case msgs, ok := <-out: if !ok { return } - send(msg) - util.PutBytes(msg) + send(msgs) + for _, msg := range msgs { + util.PutBytes(msg) + } select { case signalReady <- struct{}{}: default: diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 06201f9..379ca85 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -109,7 +109,7 @@ type peer struct { linkOut (chan []byte) // used for protocol traffic (to bypass queues) doSend (chan struct{}) // tell the linkLoop to send a switchMsg dinfo (chan *dhtInfo) // used to keep the DHT working - out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes + out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes close func() // Called when a peer is removed, to close the underlying connection, or via admin api } @@ -250,11 +250,15 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) { } // This just calls p.out(packet) for now. -func (p *peer) sendPacket(packet []byte) { +func (p *peer) sendPackets(packets [][]byte) { // Is there ever a case where something more complicated is needed? // What if p.out blocks? - atomic.AddUint64(&p.bytesSent, uint64(len(packet))) - p.out(packet) + var size int + for _, packet := range packets { + size += len(packet) + } + atomic.AddUint64(&p.bytesSent, uint64(size)) + p.out(packets) } // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 7e2a325..bdead84 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -39,10 +39,10 @@ type router struct { reconfigure chan chan error addr address.Address subnet address.Subnet - in <-chan []byte // packets we received from the network, link to peer's "out" - out func([]byte) // packets we're sending to the network, link to peer's "in" - reset chan struct{} // signal that coords changed (re-init sessions/dht) - admin chan func() // pass a lambda for the admin socket to query stuff + in <-chan [][]byte // packets we received from the network, link to peer's "out" + out func([]byte) // packets we're sending to the network, link to peer's "in" + reset chan struct{} // signal that coords changed (re-init sessions/dht) + admin chan func() // pass a lambda for the admin socket to query stuff nodeinfo nodeinfo } @@ -52,7 +52,7 @@ func (r *router) init(core *Core) { r.reconfigure = make(chan chan error, 1) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) - in := make(chan []byte, 1) // TODO something better than this... + in := make(chan [][]byte, 1) // TODO something better than this... self := linkInterface{ name: "(self)", info: linkInfo{ @@ -62,7 +62,7 @@ func (r *router) init(core *Core) { }, } p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) - p.out = func(packet []byte) { in <- packet } + p.out = func(packets [][]byte) { in <- packets } r.in = in out := make(chan []byte, 32) go func() { @@ -114,8 +114,10 @@ func (r *router) mainLoop() { defer ticker.Stop() for { select { - case p := <-r.in: - r.handleIn(p) + case ps := <-r.in: + for _, p := range ps { + r.handleIn(p) + } case info := <-r.core.dht.peers: r.core.dht.insertPeer(info) case <-r.reset: diff --git a/src/yggdrasil/stream.go b/src/yggdrasil/stream.go index 56d4754..4ab37c2 100644 --- a/src/yggdrasil/stream.go +++ b/src/yggdrasil/stream.go @@ -35,29 +35,19 @@ func (s *stream) init(rwc io.ReadWriteCloser) { } // writeMsg writes a message with stream padding, and is *not* thread safe. -func (s *stream) writeMsg(bs []byte) (int, error) { +func (s *stream) writeMsgs(bss [][]byte) (int, error) { buf := s.outputBuffer[:0] - buf = append(buf, streamMsg[:]) - l := wire_put_uint64(uint64(len(bs)), util.GetBytes()) - defer util.PutBytes(l) - buf = append(buf, l) - padLen := len(buf[0]) + len(buf[1]) - buf = append(buf, bs) - totalLen := padLen + len(bs) - s.outputBuffer = buf[:0] // So we can reuse the same underlying array later - var bn int - for bn < totalLen { - n, err := buf.WriteTo(s.rwc) - bn += int(n) - if err != nil { - l := bn - padLen - if l < 0 { - l = 0 - } - return l, err - } + var written int + for _, bs := range bss { + buf = append(buf, streamMsg[:]) + buf = append(buf, wire_encode_uint64(uint64(len(bs)))) + buf = append(buf, bs) + written += len(bs) } - return len(bs), nil + s.outputBuffer = buf[:0] // So we can reuse the same underlying array later + _, err := buf.WriteTo(s.rwc) + // TODO only include number of bytes from bs *successfully* written? + return written, err } // readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe. diff --git a/src/yggdrasil/switch.go b/src/yggdrasil/switch.go index d092625..9be8554 100644 --- a/src/yggdrasil/switch.go +++ b/src/yggdrasil/switch.go @@ -709,7 +709,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo if best != nil { // Send to the best idle next hop delete(idle, best.port) - best.sendPacket(packet) + best.sendPackets([][]byte{packet}) return true } // Didn't find anyone idle to send it to @@ -812,7 +812,7 @@ func (t *switchTable) handleIdle(port switchPort) bool { // Need to update the map, since buf was retrieved by value t.queues.bufs[best] = buf } - to.sendPacket(packet.bytes) + to.sendPackets([][]byte{packet.bytes}) return true } else { return false @@ -826,7 +826,7 @@ func (t *switchTable) doWorker() { // Keep sending packets to the router self := t.core.peers.getPorts()[0] for bs := range sendingToRouter { - self.sendPacket(bs) + self.sendPackets([][]byte{bs}) } }() go func() {