5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-23 02:01:36 +00:00

Merge pull request #510 from Arceliar/streamWrites

Send multiple packets from the switch at once
This commit is contained in:
Arceliar 2019-08-18 18:19:06 -05:00 committed by GitHub
commit c04816b4bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 88 additions and 78 deletions

View File

@ -556,8 +556,10 @@ func DEBUG_simLinkPeers(p, q *peer) {
goWorkers := func(source, dest *peer) { goWorkers := func(source, dest *peer) {
source.linkOut = make(chan []byte, 1) source.linkOut = make(chan []byte, 1)
send := make(chan []byte, 1) send := make(chan []byte, 1)
source.out = func(bs []byte) { source.out = func(bss [][]byte) {
send <- bs for _, bs := range bss {
send <- bs
}
} }
go source.linkLoop() go source.linkLoop()
go func() { go func() {

View File

@ -37,7 +37,7 @@ type linkInfo struct {
type linkInterfaceMsgIO interface { type linkInterfaceMsgIO interface {
readMsg() ([]byte, error) readMsg() ([]byte, error)
writeMsg([]byte) (int, error) writeMsgs([][]byte) (int, error)
close() error close() error
// These are temporary workarounds to stream semantics // These are temporary workarounds to stream semantics
_sendMetaBytes([]byte) error _sendMetaBytes([]byte) error
@ -207,11 +207,11 @@ func (intf *linkInterface) handler() error {
intf.link.core.peers.removePeer(intf.peer.port) intf.link.core.peers.removePeer(intf.peer.port)
}() }()
// Finish setting up the peer struct // Finish setting up the peer struct
out := make(chan []byte, 1) out := make(chan [][]byte, 1)
defer close(out) defer close(out)
intf.peer.out = func(msg []byte) { intf.peer.out = func(msgs [][]byte) {
defer func() { recover() }() defer func() { recover() }()
out <- msg out <- msgs
} }
intf.peer.linkOut = make(chan []byte, 1) intf.peer.linkOut = make(chan []byte, 1)
themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box)) themAddr := address.AddrForNodeID(crypto.GetNodeID(&intf.info.box))
@ -234,12 +234,12 @@ func (intf *linkInterface) handler() error {
interval := 4 * time.Second interval := 4 * time.Second
tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp
defer util.TimerStop(tcpTimer) defer util.TimerStop(tcpTimer)
send := func(bs []byte) { send := func(bss [][]byte) {
sendBlocked.Reset(time.Second) sendBlocked.Reset(time.Second)
intf.msgIO.writeMsg(bs) size, _ := intf.msgIO.writeMsgs(bss)
util.TimerStop(sendBlocked) util.TimerStop(sendBlocked)
select { select {
case signalSent <- len(bs) > 0: case signalSent <- size > 0:
default: default:
} }
} }
@ -247,7 +247,7 @@ func (intf *linkInterface) handler() error {
// First try to send any link protocol traffic // First try to send any link protocol traffic
select { select {
case msg := <-intf.peer.linkOut: case msg := <-intf.peer.linkOut:
send(msg) send([][]byte{msg})
continue continue
default: default:
} }
@ -259,19 +259,21 @@ func (intf *linkInterface) handler() error {
case <-tcpTimer.C: case <-tcpTimer.C:
intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s", intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil) send([][]byte{nil})
case <-sendAck: case <-sendAck:
intf.link.core.log.Tracef("Sending ack to %s: %s, source %s", intf.link.core.log.Tracef("Sending ack to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil) send([][]byte{nil})
case msg := <-intf.peer.linkOut: case msg := <-intf.peer.linkOut:
send(msg) send([][]byte{msg})
case msg, ok := <-out: case msgs, ok := <-out:
if !ok { if !ok {
return return
} }
send(msg) send(msgs)
util.PutBytes(msg) for _, msg := range msgs {
util.PutBytes(msg)
}
select { select {
case signalReady <- struct{}{}: case signalReady <- struct{}{}:
default: default:

View File

@ -109,7 +109,7 @@ type peer struct {
linkOut (chan []byte) // used for protocol traffic (to bypass queues) linkOut (chan []byte) // used for protocol traffic (to bypass queues)
doSend (chan struct{}) // tell the linkLoop to send a switchMsg doSend (chan struct{}) // tell the linkLoop to send a switchMsg
dinfo (chan *dhtInfo) // used to keep the DHT working dinfo (chan *dhtInfo) // used to keep the DHT working
out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes out func([][]byte) // Set up by whatever created the peers struct, used to send packets to other nodes
close func() // Called when a peer is removed, to close the underlying connection, or via admin api close func() // Called when a peer is removed, to close the underlying connection, or via admin api
} }
@ -250,11 +250,15 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
} }
// This just calls p.out(packet) for now. // This just calls p.out(packet) for now.
func (p *peer) sendPacket(packet []byte) { func (p *peer) sendPackets(packets [][]byte) {
// Is there ever a case where something more complicated is needed? // Is there ever a case where something more complicated is needed?
// What if p.out blocks? // What if p.out blocks?
atomic.AddUint64(&p.bytesSent, uint64(len(packet))) var size int
p.out(packet) for _, packet := range packets {
size += len(packet)
}
atomic.AddUint64(&p.bytesSent, uint64(size))
p.out(packets)
} }
// This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers. // This wraps the packet in the inner (ephemeral) and outer (permanent) crypto layers.

View File

@ -39,10 +39,10 @@ type router struct {
reconfigure chan chan error reconfigure chan chan error
addr address.Address addr address.Address
subnet address.Subnet subnet address.Subnet
in <-chan []byte // packets we received from the network, link to peer's "out" in <-chan [][]byte // packets we received from the network, link to peer's "out"
out func([]byte) // packets we're sending to the network, link to peer's "in" out func([]byte) // packets we're sending to the network, link to peer's "in"
reset chan struct{} // signal that coords changed (re-init sessions/dht) reset chan struct{} // signal that coords changed (re-init sessions/dht)
admin chan func() // pass a lambda for the admin socket to query stuff admin chan func() // pass a lambda for the admin socket to query stuff
nodeinfo nodeinfo nodeinfo nodeinfo
} }
@ -52,7 +52,7 @@ func (r *router) init(core *Core) {
r.reconfigure = make(chan chan error, 1) r.reconfigure = make(chan chan error, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 1) // TODO something better than this... in := make(chan [][]byte, 1) // TODO something better than this...
self := linkInterface{ self := linkInterface{
name: "(self)", name: "(self)",
info: linkInfo{ info: linkInfo{
@ -62,7 +62,7 @@ func (r *router) init(core *Core) {
}, },
} }
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil) p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
p.out = func(packet []byte) { in <- packet } p.out = func(packets [][]byte) { in <- packets }
r.in = in r.in = in
out := make(chan []byte, 32) out := make(chan []byte, 32)
go func() { go func() {
@ -114,8 +114,10 @@ func (r *router) mainLoop() {
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case p := <-r.in: case ps := <-r.in:
r.handleIn(p) for _, p := range ps {
r.handleIn(p)
}
case info := <-r.core.dht.peers: case info := <-r.core.dht.peers:
r.core.dht.insertPeer(info) r.core.dht.insertPeer(info)
case <-r.reset: case <-r.reset:

View File

@ -35,29 +35,19 @@ func (s *stream) init(rwc io.ReadWriteCloser) {
} }
// writeMsg writes a message with stream padding, and is *not* thread safe. // writeMsg writes a message with stream padding, and is *not* thread safe.
func (s *stream) writeMsg(bs []byte) (int, error) { func (s *stream) writeMsgs(bss [][]byte) (int, error) {
buf := s.outputBuffer[:0] buf := s.outputBuffer[:0]
buf = append(buf, streamMsg[:]) var written int
l := wire_put_uint64(uint64(len(bs)), util.GetBytes()) for _, bs := range bss {
defer util.PutBytes(l) buf = append(buf, streamMsg[:])
buf = append(buf, l) buf = append(buf, wire_encode_uint64(uint64(len(bs))))
padLen := len(buf[0]) + len(buf[1]) buf = append(buf, bs)
buf = append(buf, bs) written += len(bs)
totalLen := padLen + len(bs)
s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
var bn int
for bn < totalLen {
n, err := buf.WriteTo(s.rwc)
bn += int(n)
if err != nil {
l := bn - padLen
if l < 0 {
l = 0
}
return l, err
}
} }
return len(bs), nil s.outputBuffer = buf[:0] // So we can reuse the same underlying array later
_, err := buf.WriteTo(s.rwc)
// TODO only include number of bytes from bs *successfully* written?
return written, err
} }
// readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe. // readMsg reads a message from the stream, accounting for stream padding, and is *not* thread safe.

View File

@ -709,7 +709,7 @@ func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) boo
if best != nil { if best != nil {
// Send to the best idle next hop // Send to the best idle next hop
delete(idle, best.port) delete(idle, best.port)
best.sendPacket(packet) best.sendPackets([][]byte{packet})
return true return true
} }
// Didn't find anyone idle to send it to // Didn't find anyone idle to send it to
@ -784,39 +784,49 @@ func (t *switchTable) handleIdle(port switchPort) bool {
if to == nil { if to == nil {
return true return true
} }
var best string var packets [][]byte
var bestPriority float64 var psize int
t.queues.cleanup(t) t.queues.cleanup(t)
now := time.Now() now := time.Now()
for streamID, buf := range t.queues.bufs { for psize < 65535 {
// Filter over the streams that this node is closer to var best string
// Keep the one with the smallest queue var bestPriority float64
packet := buf.packets[0] for streamID, buf := range t.queues.bufs {
coords := switch_getPacketCoords(packet.bytes) // Filter over the streams that this node is closer to
priority := float64(now.Sub(packet.time)) / float64(buf.size) // Keep the one with the smallest queue
if priority > bestPriority && t.portIsCloser(coords, port) { packet := buf.packets[0]
best = streamID coords := switch_getPacketCoords(packet.bytes)
bestPriority = priority priority := float64(now.Sub(packet.time)) / float64(buf.size)
if priority > bestPriority && t.portIsCloser(coords, port) {
best = streamID
bestPriority = priority
}
} }
} if bestPriority != 0 {
if bestPriority != 0 { buf := t.queues.bufs[best]
buf := t.queues.bufs[best] var packet switch_packetInfo
var packet switch_packetInfo // TODO decide if this should be LIFO or FIFO
// TODO decide if this should be LIFO or FIFO packet, buf.packets = buf.packets[0], buf.packets[1:]
packet, buf.packets = buf.packets[0], buf.packets[1:] buf.size -= uint64(len(packet.bytes))
buf.size -= uint64(len(packet.bytes)) t.queues.size -= uint64(len(packet.bytes))
t.queues.size -= uint64(len(packet.bytes)) if len(buf.packets) == 0 {
if len(buf.packets) == 0 { delete(t.queues.bufs, best)
delete(t.queues.bufs, best) } else {
// Need to update the map, since buf was retrieved by value
t.queues.bufs[best] = buf
}
packets = append(packets, packet.bytes)
psize += len(packet.bytes)
} else { } else {
// Need to update the map, since buf was retrieved by value // Finished finding packets
t.queues.bufs[best] = buf break
} }
to.sendPacket(packet.bytes)
return true
} else {
return false
} }
if len(packets) > 0 {
to.sendPackets(packets)
return true
}
return false
} }
// The switch worker does routing lookups and sends packets to where they need to be // The switch worker does routing lookups and sends packets to where they need to be
@ -826,7 +836,7 @@ func (t *switchTable) doWorker() {
// Keep sending packets to the router // Keep sending packets to the router
self := t.core.peers.getPorts()[0] self := t.core.peers.getPorts()[0]
for bs := range sendingToRouter { for bs := range sendingToRouter {
self.sendPacket(bs) self.sendPackets([][]byte{bs})
} }
}() }()
go func() { go func() {