5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-09-20 03:42:32 +00:00

Merge branch 'pathfinder' of https://github.com/Arceliar/yggdrasil-go into future

This commit is contained in:
Arceliar 2021-05-08 06:45:10 -05:00
commit ae96148008
9 changed files with 228 additions and 149 deletions

View File

@ -28,6 +28,7 @@ type dhtInfo struct {
recv time.Time // When we last received a message recv time.Time // When we last received a message
pings int // Time out if at least 3 consecutive maintenance pings drop pings int // Time out if at least 3 consecutive maintenance pings drop
throttle time.Duration throttle time.Duration
path []byte // source route the destination, learned from response rpath
dirty bool // Set to true if we've used this node in ping responses (for queries about someone other than the person doing the asking, i.e. real searches) since the last time we heard from the node dirty bool // Set to true if we've used this node in ping responses (for queries about someone other than the person doing the asking, i.e. real searches) since the last time we heard from the node
} }
@ -89,12 +90,17 @@ func (t *dht) reconfigure() {
// Resets the DHT in response to coord changes. // Resets the DHT in response to coord changes.
// This empties all info from the DHT and drops outstanding requests. // This empties all info from the DHT and drops outstanding requests.
func (t *dht) reset() { func (t *dht) reset() {
t.reqs = make(map[dhtReqKey]time.Time)
for _, info := range t.table { for _, info := range t.table {
if t.isImportant(info) { if t.isImportant(info) {
t.ping(info, nil) // This will source route if a path is already known
if info.path != nil {
// In case the source route died, but the dest coords are still OK...
info.path = nil
t.ping(info, nil) t.ping(info, nil)
} }
} }
t.reqs = make(map[dhtReqKey]time.Time) }
t.table = make(map[crypto.NodeID]*dhtInfo) t.table = make(map[crypto.NodeID]*dhtInfo)
t.imp = nil t.imp = nil
} }
@ -117,6 +123,9 @@ func (t *dht) lookup(nodeID *crypto.NodeID, everything bool) []*dhtInfo {
results = newRes results = newRes
results = results[:dht_lookup_size] results = results[:dht_lookup_size]
} }
for _, info := range results {
info.dirty = true
}
return results return results
} }
@ -185,7 +194,7 @@ func dht_ordered(first, second, third *crypto.NodeID) bool {
// Reads a request, performs a lookup, and responds. // Reads a request, performs a lookup, and responds.
// Update info about the node that sent the request. // Update info about the node that sent the request.
func (t *dht) handleReq(req *dhtReq) { func (t *dht) handleReq(req *dhtReq, rpath []byte) {
// Send them what they asked for // Send them what they asked for
res := dhtRes{ res := dhtRes{
Key: t.router.core.boxPub, Key: t.router.core.boxPub,
@ -193,7 +202,7 @@ func (t *dht) handleReq(req *dhtReq) {
Dest: req.Dest, Dest: req.Dest,
Infos: t.lookup(&req.Dest, false), Infos: t.lookup(&req.Dest, false),
} }
t.sendRes(&res, req) t.sendRes(&res, req, rpath)
// Also add them to our DHT // Also add them to our DHT
info := dhtInfo{ info := dhtInfo{
key: req.Key, key: req.Key,
@ -213,13 +222,15 @@ func (t *dht) handleReq(req *dhtReq) {
} }
// Sends a lookup response to the specified node. // Sends a lookup response to the specified node.
func (t *dht) sendRes(res *dhtRes, req *dhtReq) { func (t *dht) sendRes(res *dhtRes, req *dhtReq, rpath []byte) {
// Send a reply for a dhtReq // Send a reply for a dhtReq
bs := res.encode() bs := res.encode()
shared := t.router.sessions.getSharedKey(&t.router.core.boxPriv, &req.Key) shared := t.router.sessions.getSharedKey(&t.router.core.boxPriv, &req.Key)
payload, nonce := crypto.BoxSeal(shared, bs, nil) payload, nonce := crypto.BoxSeal(shared, bs, nil)
path := append([]byte{0}, switch_reverseCoordBytes(rpath)...)
p := wire_protoTrafficPacket{ p := wire_protoTrafficPacket{
Coords: req.Coords, Offset: 1,
Coords: path,
ToKey: req.Key, ToKey: req.Key,
FromKey: t.router.core.boxPub, FromKey: t.router.core.boxPub,
Nonce: *nonce, Nonce: *nonce,
@ -242,7 +253,7 @@ func (t *dht) addCallback(rq *dhtReqKey, callback func(*dhtRes)) {
// Reads a lookup response, checks that we had sent a matching request, and processes the response info. // Reads a lookup response, checks that we had sent a matching request, and processes the response info.
// This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses // This mainly consists of updating the node we asked in our DHT (they responded, so we know they're still alive), and deciding if we want to do anything with their responses
func (t *dht) handleRes(res *dhtRes) { func (t *dht) handleRes(res *dhtRes, rpath []byte) {
rq := dhtReqKey{res.Key, res.Dest} rq := dhtReqKey{res.Key, res.Dest}
if callbacks, isIn := t.callbacks[rq]; isIn { if callbacks, isIn := t.callbacks[rq]; isIn {
for _, callback := range callbacks { for _, callback := range callbacks {
@ -258,6 +269,7 @@ func (t *dht) handleRes(res *dhtRes) {
rinfo := dhtInfo{ rinfo := dhtInfo{
key: res.Key, key: res.Key,
coords: res.Coords, coords: res.Coords,
path: switch_reverseCoordBytes(rpath),
} }
if t.isImportant(&rinfo) { if t.isImportant(&rinfo) {
t.insert(&rinfo) t.insert(&rinfo)
@ -289,6 +301,10 @@ func (t *dht) sendReq(req *dhtReq, dest *dhtInfo) {
Nonce: *nonce, Nonce: *nonce,
Payload: payload, Payload: payload,
} }
if dest.path != nil {
p.Coords = append([]byte{0}, dest.path...)
p.Offset += 1
}
packet := p.encode() packet := p.encode()
t.router.out(packet) t.router.out(packet)
rq := dhtReqKey{dest.key, req.Dest} rq := dhtReqKey{dest.key, req.Dest}

View File

@ -54,7 +54,8 @@ func (q *packetQueue) drop() bool {
} }
func (q *packetQueue) push(packet []byte) { func (q *packetQueue) push(packet []byte) {
id := pqStreamID(peer_getPacketCoords(packet)) // just coords for now _, coords := wire_getTrafficOffsetAndCoords(packet)
id := pqStreamID(coords) // just coords for now
info := pqPacketInfo{packet: packet, time: time.Now()} info := pqPacketInfo{packet: packet, time: time.Now()}
for idx := range q.streams { for idx := range q.streams {
if q.streams[idx].id == id { if q.streams[idx].id == id {

View File

@ -236,13 +236,6 @@ func (p *peer) _handlePacket(packet []byte) {
} }
} }
// Get the coords of a packet without decoding
func peer_getPacketCoords(packet []byte) []byte {
_, pTypeLen := wire_decode_uint64(packet)
coords, _ := wire_decode_coords(packet[pTypeLen:])
return coords
}
// Called to handle traffic or protocolTraffic packets. // Called to handle traffic or protocolTraffic packets.
// In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node.
func (p *peer) _handleTraffic(packet []byte) { func (p *peer) _handleTraffic(packet []byte) {
@ -250,8 +243,26 @@ func (p *peer) _handleTraffic(packet []byte) {
// Drop traffic if the peer isn't in the switch // Drop traffic if the peer isn't in the switch
return return
} }
coords := peer_getPacketCoords(packet) obs, coords := wire_getTrafficOffsetAndCoords(packet)
next := p.table.lookup(coords) offset, _ := wire_decode_uint64(obs)
ports := switch_getPorts(coords)
if offset == 0 {
offset = p.table.getOffset(ports)
}
var next switchPort
if offset == 0 {
// Greedy routing, find the best next hop
next = p.table.lookup(ports)
} else {
// Source routing, read next hop from coords and update offset/obs
if int(offset) < len(ports) {
next = ports[offset]
offset += 1
// FIXME this breaks if offset is > 127, it's just for testing
wire_put_uint64(offset, obs[:0])
}
}
packet = wire_put_uint64(uint64(p.port), packet)
if nPeer, isIn := p.ports[next]; isIn { if nPeer, isIn := p.ports[next]; isIn {
nPeer.sendPacketFrom(p, packet) nPeer.sendPacketFrom(p, packet)
} }

View File

@ -196,54 +196,54 @@ func (r *router) _handleProto(packet []byte) {
} }
switch bsType { switch bsType {
case wire_SessionPing: case wire_SessionPing:
r._handlePing(bs, &p.FromKey) r._handlePing(bs, &p.FromKey, p.RPath)
case wire_SessionPong: case wire_SessionPong:
r._handlePong(bs, &p.FromKey) r._handlePong(bs, &p.FromKey, p.RPath)
case wire_NodeInfoRequest: case wire_NodeInfoRequest:
fallthrough fallthrough
case wire_NodeInfoResponse: case wire_NodeInfoResponse:
r._handleNodeInfo(bs, &p.FromKey) r._handleNodeInfo(bs, &p.FromKey)
case wire_DHTLookupRequest: case wire_DHTLookupRequest:
r._handleDHTReq(bs, &p.FromKey) r._handleDHTReq(bs, &p.FromKey, p.RPath)
case wire_DHTLookupResponse: case wire_DHTLookupResponse:
r._handleDHTRes(bs, &p.FromKey) r._handleDHTRes(bs, &p.FromKey, p.RPath)
default: default:
} }
} }
// Decodes session pings from wire format and passes them to sessions.handlePing where they either create or update a session. // Decodes session pings from wire format and passes them to sessions.handlePing where they either create or update a session.
func (r *router) _handlePing(bs []byte, fromKey *crypto.BoxPubKey) { func (r *router) _handlePing(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) {
ping := sessionPing{} ping := sessionPing{}
if !ping.decode(bs) { if !ping.decode(bs) {
return return
} }
ping.SendPermPub = *fromKey ping.SendPermPub = *fromKey
r.sessions.handlePing(&ping) r.sessions.handlePing(&ping, rpath)
} }
// Handles session pongs (which are really pings with an extra flag to prevent acknowledgement). // Handles session pongs (which are really pings with an extra flag to prevent acknowledgement).
func (r *router) _handlePong(bs []byte, fromKey *crypto.BoxPubKey) { func (r *router) _handlePong(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) {
r._handlePing(bs, fromKey) r._handlePing(bs, fromKey, rpath)
} }
// Decodes dht requests and passes them to dht.handleReq to trigger a lookup/response. // Decodes dht requests and passes them to dht.handleReq to trigger a lookup/response.
func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey) { func (r *router) _handleDHTReq(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) {
req := dhtReq{} req := dhtReq{}
if !req.decode(bs) { if !req.decode(bs) {
return return
} }
req.Key = *fromKey req.Key = *fromKey
r.dht.handleReq(&req) r.dht.handleReq(&req, rpath)
} }
// Decodes dht responses and passes them to dht.handleRes to update the DHT table and further pass them to the search code (if applicable). // Decodes dht responses and passes them to dht.handleRes to update the DHT table and further pass them to the search code (if applicable).
func (r *router) _handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey) { func (r *router) _handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey, rpath []byte) {
res := dhtRes{} res := dhtRes{}
if !res.decode(bs) { if !res.decode(bs) {
return return
} }
res.Key = *fromKey res.Key = *fromKey
r.dht.handleRes(&res) r.dht.handleRes(&res, rpath)
} }
// Decodes nodeinfo request // Decodes nodeinfo request

View File

@ -50,6 +50,7 @@ type sessionInfo struct {
conn *Conn // The associated Conn object conn *Conn // The associated Conn object
callbacks []chan func() // Finished work from crypto workers callbacks []chan func() // Finished work from crypto workers
table *lookupTable // table.self is a locator where we get our coords table *lookupTable // table.self is a locator where we get our coords
path []byte // Path from self to destination
} }
// Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU. // Represents a session ping/pong packet, and includes information like public keys, a session handle, coords, a timestamp to prevent replays, and the tun/tap MTU.
@ -65,41 +66,48 @@ type sessionPing struct {
// Updates session info in response to a ping, after checking that the ping is OK. // Updates session info in response to a ping, after checking that the ping is OK.
// Returns true if the session was updated, or false otherwise. // Returns true if the session was updated, or false otherwise.
func (s *sessionInfo) _update(p *sessionPing) bool { func (sinfo *sessionInfo) _update(p *sessionPing, rpath []byte) bool {
if !(p.Tstamp > s.tstamp) { if !(p.Tstamp > sinfo.tstamp) {
// To protect against replay attacks // To protect against replay attacks
return false return false
} }
if p.SendPermPub != s.theirPermPub { if p.SendPermPub != sinfo.theirPermPub {
// Should only happen if two sessions got the same handle // Should only happen if two sessions got the same handle
// That shouldn't be allowed anyway, but if it happens then let one time out // That shouldn't be allowed anyway, but if it happens then let one time out
return false return false
} }
if p.SendSesPub != s.theirSesPub { if p.SendSesPub != sinfo.theirSesPub {
s.theirSesPub = p.SendSesPub sinfo.path = nil
s.theirHandle = p.Handle sinfo.theirSesPub = p.SendSesPub
s.sharedSesKey = *crypto.GetSharedKey(&s.mySesPriv, &s.theirSesPub) sinfo.theirHandle = p.Handle
s.theirNonce = crypto.BoxNonce{} sinfo.sharedSesKey = *crypto.GetSharedKey(&sinfo.mySesPriv, &sinfo.theirSesPub)
sinfo.theirNonce = crypto.BoxNonce{}
} }
if p.MTU >= 1280 || p.MTU == 0 { if p.MTU >= 1280 || p.MTU == 0 {
s.theirMTU = p.MTU sinfo.theirMTU = p.MTU
if s.conn != nil { if sinfo.conn != nil {
s.conn.setMTU(s, s._getMTU()) sinfo.conn.setMTU(sinfo, sinfo._getMTU())
} }
} }
if !bytes.Equal(s.coords, p.Coords) { if !bytes.Equal(sinfo.coords, p.Coords) {
// allocate enough space for additional coords // allocate enough space for additional coords
s.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...) sinfo.coords = append(make([]byte, 0, len(p.Coords)+11), p.Coords...)
path := switch_reverseCoordBytes(rpath)
sinfo.path = append(sinfo.path[:0], path...)
defer sinfo._sendPingPong(false, nil)
} else if p.IsPong {
path := switch_reverseCoordBytes(rpath)
sinfo.path = append(sinfo.path[:0], path...)
} }
s.time = time.Now() sinfo.time = time.Now()
s.tstamp = p.Tstamp sinfo.tstamp = p.Tstamp
s.reset = false sinfo.reset = false
defer func() { recover() }() // Recover if the below panics defer func() { recover() }() // Recover if the below panics
select { select {
case <-s.init: case <-sinfo.init:
default: default:
// Unblock anything waiting for the session to initialize // Unblock anything waiting for the session to initialize
close(s.init) close(sinfo.init)
} }
return true return true
} }
@ -304,13 +312,13 @@ func (ss *sessions) getSharedKey(myPriv *crypto.BoxPrivKey,
// Sends a session ping by calling sendPingPong in ping mode. // Sends a session ping by calling sendPingPong in ping mode.
func (sinfo *sessionInfo) ping(from phony.Actor) { func (sinfo *sessionInfo) ping(from phony.Actor) {
sinfo.Act(from, func() { sinfo.Act(from, func() {
sinfo._sendPingPong(false) sinfo._sendPingPong(false, nil)
}) })
} }
// Calls getPing, sets the appropriate ping/pong flag, encodes to wire format, and send it. // Calls getPing, sets the appropriate ping/pong flag, encodes to wire format, and send it.
// Updates the time the last ping was sent in the session info. // Updates the time the last ping was sent in the session info.
func (sinfo *sessionInfo) _sendPingPong(isPong bool) { func (sinfo *sessionInfo) _sendPingPong(isPong bool, path []byte) {
ping := sinfo._getPing() ping := sinfo._getPing()
ping.IsPong = isPong ping.IsPong = isPong
bs := ping.encode() bs := ping.encode()
@ -322,10 +330,14 @@ func (sinfo *sessionInfo) _sendPingPong(isPong bool) {
Nonce: *nonce, Nonce: *nonce,
Payload: payload, Payload: payload,
} }
if path != nil {
p.Coords = append([]byte{0}, path...)
p.Offset += 1
}
packet := p.encode() packet := p.encode()
// TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first // TODO rewrite the below if/when the peer struct becomes an actor, to not go through the router first
sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) }) sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.out(packet) })
if sinfo.pingTime.Before(sinfo.time) { if !isPong {
sinfo.pingTime = time.Now() sinfo.pingTime = time.Now()
} }
} }
@ -339,7 +351,7 @@ func (sinfo *sessionInfo) setConn(from phony.Actor, conn *Conn) {
// Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful. // Handles a session ping, creating a session if needed and calling update, then possibly responding with a pong if the ping was in ping mode and the update was successful.
// If the session has a packet cached (common when first setting up a session), it will be sent. // If the session has a packet cached (common when first setting up a session), it will be sent.
func (ss *sessions) handlePing(ping *sessionPing) { func (ss *sessions) handlePing(ping *sessionPing, rpath []byte) {
// Get the corresponding session (or create a new session) // Get the corresponding session (or create a new session)
sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub) sinfo, isIn := ss.getByTheirPerm(&ping.SendPermPub)
switch { switch {
@ -368,11 +380,11 @@ func (ss *sessions) handlePing(ping *sessionPing) {
if sinfo != nil { if sinfo != nil {
sinfo.Act(ss.router, func() { sinfo.Act(ss.router, func() {
// Update the session // Update the session
if !sinfo._update(ping) { /*panic("Should not happen in testing")*/ if !sinfo._update(ping, rpath) { /*panic("Should not happen in testing")*/
return return
} }
if !ping.IsPong { if !ping.IsPong {
sinfo._sendPingPong(true) sinfo._sendPingPong(true, switch_reverseCoordBytes(rpath))
} }
}) })
} }
@ -413,6 +425,8 @@ func (ss *sessions) reset() {
sinfo := _sinfo // So we can safely put it in a closure sinfo := _sinfo // So we can safely put it in a closure
sinfo.Act(ss.router, func() { sinfo.Act(ss.router, func() {
sinfo.reset = true sinfo.reset = true
sinfo._sendPingPong(false, sinfo.path)
sinfo._sendPingPong(false, nil)
}) })
} }
} }
@ -483,12 +497,20 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
return return
} }
sinfo.bytesSent += uint64(len(msg.Message)) sinfo.bytesSent += uint64(len(msg.Message))
coords := append([]byte(nil), sinfo.coords...) var coords []byte
var offset uint64
if len(sinfo.path) > 0 {
coords = append([]byte{0}, sinfo.path...)
offset += 1
} else {
coords = append([]byte(nil), sinfo.coords...)
}
if msg.FlowKey != 0 { if msg.FlowKey != 0 {
coords = append(coords, 0) coords = append(coords, 0)
coords = append(coords, wire_encode_uint64(msg.FlowKey)...) coords = append(coords, wire_encode_uint64(msg.FlowKey)...)
} }
p := wire_trafficPacket{ p := wire_trafficPacket{
Offset: offset,
Coords: coords, Coords: coords,
Handle: sinfo.theirHandle, Handle: sinfo.theirHandle,
Nonce: sinfo.myNonce, Nonce: sinfo.myNonce,
@ -503,6 +525,9 @@ func (sinfo *sessionInfo) _send(msg FlowKeyMessage) {
sinfo.sessions.router.Act(sinfo, func() { sinfo.sessions.router.Act(sinfo, func() {
sinfo.sessions.router.out(packet) sinfo.sessions.router.out(packet)
}) })
if time.Since(sinfo.pingTime) > 3*time.Second {
sinfo._sendPingPong(false, nil)
}
} }
ch <- callback ch <- callback
sinfo.checkCallbacks() sinfo.checkCallbacks()

View File

@ -113,7 +113,8 @@ func (s *stream) readMsgFromBuffer() ([]byte, error) {
if msgLen > streamMsgSize { if msgLen > streamMsgSize {
return nil, errors.New("oversized message") return nil, errors.New("oversized message")
} }
msg := pool_getBytes(int(msgLen)) msg := pool_getBytes(int(msgLen + 10)) // Extra padding for up to 1 more switchPort
msg = msg[:msgLen]
_, err = io.ReadFull(s.inputBuffer, msg) _, err = io.ReadFull(s.inputBuffer, msg)
return msg, err return msg, err
} }

View File

@ -23,7 +23,6 @@ const (
switch_timeout = time.Minute switch_timeout = time.Minute
switch_updateInterval = switch_timeout / 2 switch_updateInterval = switch_timeout / 2
switch_throttle = switch_updateInterval / 2 switch_throttle = switch_updateInterval / 2
switch_faster_threshold = 240 //Number of switch updates before switching to a faster parent
) )
// The switch locator represents the topology and network state dependent info about a node, minus the signatures that go with it. // The switch locator represents the topology and network state dependent info about a node, minus the signatures that go with it.
@ -140,7 +139,6 @@ type peerInfo struct {
locator switchLocator // Should be able to respond with signatures upon request locator switchLocator // Should be able to respond with signatures upon request
degree uint64 // Self-reported degree degree uint64 // Self-reported degree
time time.Time // Time this node was last seen time time.Time // Time this node was last seen
faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower
port switchPort // Interface number of this peer port switchPort // Interface number of this peer
msg switchMsg // The wire switchMsg used msg switchMsg // The wire switchMsg used
readBlock bool // True if the link notified us of a read that blocked too long readBlock bool // True if the link notified us of a read that blocked too long
@ -427,37 +425,12 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
doUpdate := false doUpdate := false
oldSender := t.data.peers[fromPort] oldSender := t.data.peers[fromPort]
if !equiv(&sender.locator, &oldSender.locator) { if !equiv(&sender.locator, &oldSender.locator) {
// Reset faster info, we'll start refilling it right after this
sender.faster = nil
doUpdate = true doUpdate = true
} }
// Update the matrix of peer "faster" thresholds
if reprocessing { if reprocessing {
sender.faster = oldSender.faster
sender.time = oldSender.time sender.time = oldSender.time
sender.readBlock = oldSender.readBlock sender.readBlock = oldSender.readBlock
sender.writeBlock = oldSender.writeBlock sender.writeBlock = oldSender.writeBlock
} else {
sender.faster = make(map[switchPort]uint64, len(oldSender.faster))
for port, peer := range t.data.peers {
if port == fromPort {
continue
} else if sender.locator.root != peer.locator.root || sender.locator.tstamp > peer.locator.tstamp {
// We were faster than this node, so increment, as long as we don't overflow because of it
if oldSender.faster[peer.port] < switch_faster_threshold {
sender.faster[port] = oldSender.faster[peer.port] + 1
} else {
sender.faster[port] = switch_faster_threshold
}
} else {
// Slower than this node, penalize (more than the reward amount)
if oldSender.faster[port] > 1 {
sender.faster[port] = oldSender.faster[peer.port] - 2
} else {
sender.faster[port] = 0
}
}
}
} }
if sender.blocked() != oldSender.blocked() { if sender.blocked() != oldSender.blocked() {
doUpdate = true doUpdate = true
@ -496,35 +469,11 @@ func (t *switchTable) _handleMsg(msg *switchMsg, fromPort switchPort, reprocessi
case noParent: case noParent:
// We currently have no working parent, and at this point in the switch statement, anything is better than nothing. // We currently have no working parent, and at this point in the switch statement, anything is better than nothing.
updateRoot = true updateRoot = true
case sender.faster[t.parent] >= switch_faster_threshold:
// The is reliably faster than the current parent.
updateRoot = true
case !sender.blocked() && oldParent.blocked(): case !sender.blocked() && oldParent.blocked():
// Replace a blocked parent // Replace a blocked parent
updateRoot = true updateRoot = true
case reprocessing && sender.blocked() && !oldParent.blocked(): case reprocessing && sender.blocked() && !oldParent.blocked():
// Don't replace an unblocked parent when reprocessing // Don't replace an unblocked parent when reprocessing
case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]:
// The sender seems to be reliably faster than the current parent, so switch to them instead.
updateRoot = true
case sender.port != t.parent:
// Ignore further cases if the sender isn't our parent.
case !reprocessing && !equiv(&sender.locator, &t.data.locator):
// Special case:
// If coords changed, then we need to penalize this node somehow, to prevent flapping.
// First, reset all faster-related info to 0.
// Then, de-parent the node and reprocess all messages to find a new parent.
t.parent = 0
for _, peer := range t.data.peers {
if peer.port == sender.port {
continue
}
t._handleMsg(&peer.msg, peer.port, true)
}
// Process the sender last, to avoid keeping them as a parent if at all possible.
t._handleMsg(&sender.msg, sender.port, true)
case now.Sub(t.time) < switch_throttle:
// We've already gotten an update from this root recently, so ignore this one to avoid flooding.
case sender.locator.tstamp > t.data.locator.tstamp: case sender.locator.tstamp > t.data.locator.tstamp:
// The timestamp was updated, so we need to update locally and send to our peers. // The timestamp was updated, so we need to update locally and send to our peers.
updateRoot = true updateRoot = true
@ -631,13 +580,11 @@ func (t *switchTable) start() error {
return nil return nil
} }
func (t *lookupTable) lookup(coords []byte) switchPort { func (t *lookupTable) lookup(ports []switchPort) switchPort {
var offset int
here := &t._start here := &t._start
for offset < len(coords) { for idx := range ports {
port, l := wire_decode_uint64(coords[offset:]) port := ports[idx]
offset += l if next, ok := here.next[port]; ok {
if next, ok := here.next[switchPort(port)]; ok {
here = next here = next
} else { } else {
break break
@ -645,3 +592,56 @@ func (t *lookupTable) lookup(coords []byte) switchPort {
} }
return here.port return here.port
} }
func switch_getPorts(coords []byte) []switchPort {
var ports []switchPort
var offset int
for offset < len(coords) {
port, l := wire_decode_uint64(coords[offset:])
offset += l
ports = append(ports, switchPort(port))
}
return ports
}
func switch_reverseCoordBytes(coords []byte) []byte {
a := switch_getPorts(coords)
for i := len(a)/2 - 1; i >= 0; i-- {
opp := len(a) - 1 - i
a[i], a[opp] = a[opp], a[i]
}
var reversed []byte
for _, sPort := range a {
reversed = wire_put_uint64(uint64(sPort), reversed)
}
return reversed
}
func (t *lookupTable) isDescendant(ports []switchPort) bool {
// Note that this returns true for anyone in the subtree that starts at us
// That includes ourself, so we are our own descendant by this logic...
if len(t.self.coords) >= len(ports) {
// Our coords are longer, so they can't be our descendant
return false
}
for idx := range t.self.coords {
if ports[idx] != t.self.coords[idx] {
return false
}
}
return true
}
func (t *lookupTable) getOffset(ports []switchPort) uint64 {
// If they're our descendant, this returns the length of our coords, used as an offset for source routing
// If they're not our descendant, this returns 0
var offset uint64
for idx := range t.self.coords {
if idx < len(ports) && ports[idx] == t.self.coords[idx] {
offset += 1
} else {
return 0
}
}
return offset
}

View File

@ -24,7 +24,7 @@ func version_getBaseMetadata() version_metadata {
return version_metadata{ return version_metadata{
meta: [4]byte{'m', 'e', 't', 'a'}, meta: [4]byte{'m', 'e', 't', 'a'},
ver: 0, ver: 0,
minorVer: 2, minorVer: 0,
} }
} }

View File

@ -96,9 +96,9 @@ func wire_encode_coords(coords []byte) []byte {
// Puts a length prefix and the coords into bs, returns the wire formatted coords. // Puts a length prefix and the coords into bs, returns the wire formatted coords.
// Useful in hot loops where we don't want to allocate and we know the rest of the later parts of the slice are safe to overwrite. // Useful in hot loops where we don't want to allocate and we know the rest of the later parts of the slice are safe to overwrite.
func wire_put_coords(coords []byte, bs []byte) []byte { func wire_put_vslice(slice []byte, bs []byte) []byte {
bs = wire_put_uint64(uint64(len(coords)), bs) bs = wire_put_uint64(uint64(len(slice)), bs)
bs = append(bs, coords...) bs = append(bs, slice...)
return bs return bs
} }
@ -194,14 +194,14 @@ func wire_chop_slice(toSlice []byte, fromSlice *[]byte) bool {
return true return true
} }
// A utility function to extract coords from a slice and advance the source slices, returning true if successful. // A utility function to extract a length-prefixed slice (such as coords) from a slice and advance the source slices, returning true if successful.
func wire_chop_coords(toCoords *[]byte, fromSlice *[]byte) bool { func wire_chop_vslice(toSlice *[]byte, fromSlice *[]byte) bool {
coords, coordLen := wire_decode_coords(*fromSlice) slice, sliceLen := wire_decode_coords(*fromSlice)
if coordLen == 0 { if sliceLen == 0 { // sliceLen is length-prefix size + slice size, in bytes
return false return false
} }
*toCoords = append((*toCoords)[:0], coords...) *toSlice = append((*toSlice)[:0], slice...)
*fromSlice = (*fromSlice)[coordLen:] *fromSlice = (*fromSlice)[sliceLen:]
return true return true
} }
@ -222,10 +222,12 @@ func wire_chop_uint64(toUInt64 *uint64, fromSlice *[]byte) bool {
// The wire format for ordinary IPv6 traffic encapsulated by the network. // The wire format for ordinary IPv6 traffic encapsulated by the network.
type wire_trafficPacket struct { type wire_trafficPacket struct {
Offset uint64
Coords []byte Coords []byte
Handle crypto.Handle Handle crypto.Handle
Nonce crypto.BoxNonce Nonce crypto.BoxNonce
Payload []byte Payload []byte
RPath []byte
} }
// Encodes a wire_trafficPacket into its wire format. // Encodes a wire_trafficPacket into its wire format.
@ -233,10 +235,12 @@ type wire_trafficPacket struct {
func (p *wire_trafficPacket) encode() []byte { func (p *wire_trafficPacket) encode() []byte {
bs := pool_getBytes(0) bs := pool_getBytes(0)
bs = wire_put_uint64(wire_Traffic, bs) bs = wire_put_uint64(wire_Traffic, bs)
bs = wire_put_coords(p.Coords, bs) bs = wire_put_uint64(p.Offset, bs)
bs = wire_put_vslice(p.Coords, bs)
bs = append(bs, p.Handle[:]...) bs = append(bs, p.Handle[:]...)
bs = append(bs, p.Nonce[:]...) bs = append(bs, p.Nonce[:]...)
bs = append(bs, p.Payload...) bs = wire_put_vslice(p.Payload, bs)
bs = append(bs, p.RPath...)
return bs return bs
} }
@ -250,35 +254,42 @@ func (p *wire_trafficPacket) decode(bs []byte) bool {
return false return false
case pType != wire_Traffic: case pType != wire_Traffic:
return false return false
case !wire_chop_coords(&p.Coords, &bs): case !wire_chop_uint64(&p.Offset, &bs):
return false
case !wire_chop_vslice(&p.Coords, &bs):
return false return false
case !wire_chop_slice(p.Handle[:], &bs): case !wire_chop_slice(p.Handle[:], &bs):
return false return false
case !wire_chop_slice(p.Nonce[:], &bs): case !wire_chop_slice(p.Nonce[:], &bs):
return false return false
case !wire_chop_vslice(&p.Payload, &bs):
return false
} }
p.Payload = append(p.Payload, bs...) p.RPath = append(p.RPath[:0], bs...)
return true return true
} }
// The wire format for protocol traffic, such as dht req/res or session ping/pong packets. // The wire format for protocol traffic, such as dht req/res or session ping/pong packets.
type wire_protoTrafficPacket struct { type wire_protoTrafficPacket struct {
Offset uint64
Coords []byte Coords []byte
ToKey crypto.BoxPubKey ToKey crypto.BoxPubKey
FromKey crypto.BoxPubKey FromKey crypto.BoxPubKey
Nonce crypto.BoxNonce Nonce crypto.BoxNonce
Payload []byte Payload []byte
RPath []byte
} }
// Encodes a wire_protoTrafficPacket into its wire format. // Encodes a wire_protoTrafficPacket into its wire format.
func (p *wire_protoTrafficPacket) encode() []byte { func (p *wire_protoTrafficPacket) encode() []byte {
coords := wire_encode_coords(p.Coords)
bs := wire_encode_uint64(wire_ProtocolTraffic) bs := wire_encode_uint64(wire_ProtocolTraffic)
bs = append(bs, coords...) bs = wire_put_uint64(p.Offset, bs)
bs = wire_put_vslice(p.Coords, bs)
bs = append(bs, p.ToKey[:]...) bs = append(bs, p.ToKey[:]...)
bs = append(bs, p.FromKey[:]...) bs = append(bs, p.FromKey[:]...)
bs = append(bs, p.Nonce[:]...) bs = append(bs, p.Nonce[:]...)
bs = append(bs, p.Payload...) bs = wire_put_vslice(p.Payload, bs)
bs = append(bs, p.RPath...)
return bs return bs
} }
@ -290,7 +301,9 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool {
return false return false
case pType != wire_ProtocolTraffic: case pType != wire_ProtocolTraffic:
return false return false
case !wire_chop_coords(&p.Coords, &bs): case !wire_chop_uint64(&p.Offset, &bs):
return false
case !wire_chop_vslice(&p.Coords, &bs):
return false return false
case !wire_chop_slice(p.ToKey[:], &bs): case !wire_chop_slice(p.ToKey[:], &bs):
return false return false
@ -298,11 +311,23 @@ func (p *wire_protoTrafficPacket) decode(bs []byte) bool {
return false return false
case !wire_chop_slice(p.Nonce[:], &bs): case !wire_chop_slice(p.Nonce[:], &bs):
return false return false
case !wire_chop_vslice(&p.Payload, &bs):
return false
} }
p.Payload = bs p.RPath = append(p.RPath[:0], bs...)
return true return true
} }
// Get the offset and coord slices of a (protocol) traffic packet without decoding
func wire_getTrafficOffsetAndCoords(packet []byte) ([]byte, []byte) {
_, offsetBegin := wire_decode_uint64(packet)
_, offsetLen := wire_decode_uint64(packet[offsetBegin:])
offsetEnd := offsetBegin + offsetLen
offset := packet[offsetBegin:offsetEnd]
coords, _ := wire_decode_coords(packet[offsetEnd:])
return offset, coords
}
// The wire format for link protocol traffic, namely switchMsg. // The wire format for link protocol traffic, namely switchMsg.
// There's really two layers of this, with the outer layer using permanent keys, and the inner layer using ephemeral keys. // There's really two layers of this, with the outer layer using permanent keys, and the inner layer using ephemeral keys.
// The keys themselves are exchanged as part of the connection setup, and then omitted from the packets. // The keys themselves are exchanged as part of the connection setup, and then omitted from the packets.
@ -373,7 +398,7 @@ func (p *sessionPing) decode(bs []byte) bool {
return false return false
case !wire_chop_uint64(&tstamp, &bs): case !wire_chop_uint64(&tstamp, &bs):
return false return false
case !wire_chop_coords(&p.Coords, &bs): case !wire_chop_vslice(&p.Coords, &bs):
return false return false
case !wire_chop_uint64(&mtu, &bs): case !wire_chop_uint64(&mtu, &bs):
mtu = 1280 mtu = 1280
@ -397,7 +422,7 @@ func (p *nodeinfoReqRes) encode() []byte {
pTypeVal = wire_NodeInfoRequest pTypeVal = wire_NodeInfoRequest
} }
bs := wire_encode_uint64(pTypeVal) bs := wire_encode_uint64(pTypeVal)
bs = wire_put_coords(p.SendCoords, bs) bs = wire_put_vslice(p.SendCoords, bs)
if pTypeVal == wire_NodeInfoResponse { if pTypeVal == wire_NodeInfoResponse {
bs = append(bs, p.NodeInfo...) bs = append(bs, p.NodeInfo...)
} }
@ -412,7 +437,7 @@ func (p *nodeinfoReqRes) decode(bs []byte) bool {
return false return false
case pType != wire_NodeInfoRequest && pType != wire_NodeInfoResponse: case pType != wire_NodeInfoRequest && pType != wire_NodeInfoResponse:
return false return false
case !wire_chop_coords(&p.SendCoords, &bs): case !wire_chop_vslice(&p.SendCoords, &bs):
return false return false
} }
if p.IsResponse = pType == wire_NodeInfoResponse; p.IsResponse { if p.IsResponse = pType == wire_NodeInfoResponse; p.IsResponse {
@ -446,7 +471,7 @@ func (r *dhtReq) decode(bs []byte) bool {
return false return false
case pType != wire_DHTLookupRequest: case pType != wire_DHTLookupRequest:
return false return false
case !wire_chop_coords(&r.Coords, &bs): case !wire_chop_vslice(&r.Coords, &bs):
return false return false
case !wire_chop_slice(r.Dest[:], &bs): case !wire_chop_slice(r.Dest[:], &bs):
return false return false
@ -477,7 +502,7 @@ func (r *dhtRes) decode(bs []byte) bool {
return false return false
case pType != wire_DHTLookupResponse: case pType != wire_DHTLookupResponse:
return false return false
case !wire_chop_coords(&r.Coords, &bs): case !wire_chop_vslice(&r.Coords, &bs):
return false return false
case !wire_chop_slice(r.Dest[:], &bs): case !wire_chop_slice(r.Dest[:], &bs):
return false return false
@ -487,7 +512,7 @@ func (r *dhtRes) decode(bs []byte) bool {
switch { switch {
case !wire_chop_slice(info.key[:], &bs): case !wire_chop_slice(info.key[:], &bs):
return false return false
case !wire_chop_coords(&info.coords, &bs): case !wire_chop_vslice(&info.coords, &bs):
return false return false
} }
r.Infos = append(r.Infos, &info) r.Infos = append(r.Infos, &info)