5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-23 00:51:35 +00:00

add a dedicated switch worker and start using it for lookups

This commit is contained in:
Arceliar 2018-06-23 19:08:32 -05:00
parent 2ae213c255
commit 988f4ad265
4 changed files with 53 additions and 22 deletions

View File

@ -101,6 +101,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
return err return err
} }
if err := c.switchTable.start(); err != nil {
c.log.Println("Failed to start switch")
return err
}
if err := c.router.start(); err != nil { if err := c.router.start(); err != nil {
c.log.Println("Failed to start router") c.log.Println("Failed to start router")
return err return err

View File

@ -49,6 +49,7 @@ func (c *Core) Init() {
bpub, bpriv := newBoxKeys() bpub, bpriv := newBoxKeys()
spub, spriv := newSigKeys() spub, spriv := newSigKeys()
c.init(bpub, bpriv, spub, spriv) c.init(bpub, bpriv, spub, spriv)
c.switchTable.start()
c.router.start() c.router.start()
} }

View File

@ -229,19 +229,7 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
// Drop traffic until the peer manages to send us at least one good switchMsg // Drop traffic until the peer manages to send us at least one good switchMsg
return return
} }
coords, coordLen := wire_decode_coords(packet[pTypeLen:]) p.core.switchTable.packetIn <- packet
if coordLen >= len(packet) {
return
} // No payload
toPort := p.core.switchTable.lookup(coords)
if toPort == p.port {
return
}
to := p.core.peers.getPorts()[toPort]
if to == nil {
return
}
to.sendPacket(packet)
} }
// This just calls p.out(packet) for now. // This just calls p.out(packet) for now.

View File

@ -164,6 +164,7 @@ type switchTable struct {
data switchData data switchData
updater atomic.Value //*sync.Once updater atomic.Value //*sync.Once
table atomic.Value //lookupTable table atomic.Value //lookupTable
packetIn chan []byte // Incoming packets for the worker to handle
} }
// Initializes the switchTable struct. // Initializes the switchTable struct.
@ -177,6 +178,7 @@ func (t *switchTable) init(core *Core, key sigPubKey) {
t.updater.Store(&sync.Once{}) t.updater.Store(&sync.Once{})
t.table.Store(lookupTable{}) t.table.Store(lookupTable{})
t.drop = make(map[sigPubKey]int64) t.drop = make(map[sigPubKey]int64)
t.packetIn = make(chan []byte, 1024)
} }
// Safely gets a copy of this node's locator. // Safely gets a copy of this node's locator.
@ -438,6 +440,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) {
return return
} }
////////////////////////////////////////////////////////////////////////////////
// The rest of these are related to the switch worker
// This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions. // This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions.
func (t *switchTable) updateTable() { func (t *switchTable) updateTable() {
// WARNING this should only be called from within t.data.updater.Do() // WARNING this should only be called from within t.data.updater.Do()
@ -506,3 +512,34 @@ func (t *switchTable) lookup(dest []byte) switchPort {
} }
return best return best
} }
// Starts the switch worker
func (t *switchTable) start() error {
t.core.log.Println("Starting switch")
go t.doWorker()
return nil
}
func (t *switchTable) handleIn(packet []byte) {
// Get the coords, skipping the first byte (the pType)
_, pTypeLen := wire_decode_uint64(packet)
coords, coordLen := wire_decode_coords(packet[pTypeLen:])
if coordLen >= len(packet) {
util_putBytes(packet)
return
} // No payload
toPort := t.lookup(coords)
to := t.core.peers.getPorts()[toPort]
if to == nil {
util_putBytes(packet)
return
}
to.sendPacket(packet)
}
// The switch worker does routing lookups and sends packets to where they need to be
func (t *switchTable) doWorker() {
for packet := range t.packetIn {
t.handleIn(packet)
}
}