Mirror of https://github.com/cwinfo/yggdrasil-go.git (synced 2024-12-26 11:15:41 +00:00)
Commit 885ba4452d
@@ -11,6 +11,7 @@ jobs:
    steps:
      - checkout

      - run:
          name: Create artifact upload directory and set variables
          command: |
@@ -19,13 +20,19 @@ jobs:
            echo 'export CIVERSION=$(sh contrib/semver/version.sh | cut -c 2-)' >> $BASH_ENV

      - run:
          name: Build for Linux (including Debian packages)
          name: Install alien
          command: |
            sudo apt-get install -y alien

      - run:
          name: Build for Linux (including Debian packages and RPMs)
          command: |
            PKGARCH=amd64 sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-amd64 && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-amd64;
            PKGARCH=i386 sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-i386 && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-i386;
            PKGARCH=mipsel sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-mipsel && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-mipsel;
            PKGARCH=mips sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-mips && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-mips;
            PKGARCH=armhf sh contrib/deb/generate.sh && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-linux-armh && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-linux-armhf;
            sudo alien --to-rpm yggdrasil*.deb --scripts --keep-version && mv *.rpm /tmp/upload/;
            mv *.deb /tmp/upload/

      - run:
CHANGELOG.md (new file, 87 lines)
@@ -0,0 +1,87 @@
# Changelog
All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).

<!-- Use this as a template

## [X.Y.Z] - YYYY-MM-DD
### Added
- for new features.

### Changed
- for changes in existing functionality.

### Deprecated
- for soon-to-be removed features.

### Removed
- for now removed features.

### Fixed
- for any bug fixes.

### Security
- in case of vulnerabilities.
-->

## [0.2.3] - 2018-06-29
### Added
- Begin keeping a changelog (information before this point is incomplete and possibly inaccurate).
- Build RPMs in CircleCI using alien. This provides package support for Fedora, Red Hat Enterprise Linux, CentOS and other RPM-based distributions.

### Changed
- Local backpressure improvements.
- Change `box_pub_key` to `key` in the admin API for simplicity (see the example request below).
- Session cleanup.
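
A rough illustration of what the renamed argument means for callers. The handler names and the `key` argument come from the admin.go hunk further down; the envelope shape (a JSON object with a `request` field, written to the admin socket) is an assumption based on how `yggdrasilctl` talks to the API, not something this commit shows:

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical admin request after this change: the argument is now "key",
	// where older releases expected "box_pub_key".
	req := map[string]string{
		"request": "addAllowedEncryptionPublicKey", // handler name registered in admin.go
		"key":     "0123456789abcdef...",           // the peer's encryption public key (truncated here)
	}
	b, _ := json.Marshal(req)
	fmt.Println(string(b))
}
```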

## [0.2.2] - 2018-06-21
### Added
- Add `yggdrasilconf` utility for testing with the `vyatta-yggdrasil` package.
- Add a randomized retry delay after TCP disconnects, to prevent synchronization livelocks.

### Changed
- Update build script to strip by default, which significantly reduces the size of the binary.
- Add debug `-d` and UPX `-u` flags to the `build` script.
- Start pprof in debug builds based on an environment variable (e.g. `PPROFLISTEN=localhost:6060`), instead of a flag.
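
A minimal sketch of what gating pprof on an environment variable can look like. The `PPROFLISTEN` name comes from this release; the use of `net/http/pprof` and the surrounding code are assumptions for illustration, not the exact code in the yggdrasil binary:

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/... handlers on the default mux
	"os"
)

func main() {
	// Start the profiler only when PPROFLISTEN is set, e.g.
	//   PPROFLISTEN=localhost:6060 ./yggdrasil --autoconf
	if addr := os.Getenv("PPROFLISTEN"); addr != "" {
		go func() { log.Println(http.ListenAndServe(addr, nil)) }()
	}
	// ... normal node startup would continue here ...
	select {} // placeholder so this sketch keeps running
}
```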

### Fixed
- Fix typo in big-endian BOM so that both little-endian and big-endian UTF-16 files are detected correctly.

## [0.2.1] - 2018-06-15
### Changed
- The address range was moved from `fd00::/8` to `200::/7`. This range was chosen as it is marked as deprecated, and the change prevents overlap with other privately assigned ULA ranges.

### Fixed
- UTF-16 detection and conversion for configuration files, which can particularly be a problem on Windows 10 if a configuration file is generated from within PowerShell.
- Fixes to the Debian package control file.
- Fixes to the launchd service for macOS.
- Fixes to the DHT and switch.

## [0.2.0] - 2018-06-13
### Added
- Exchange version information during connection setup, to prevent connections with incompatible versions.

### Changed
- Wire format changes (backwards incompatible).
- Less maintenance traffic per peer.
- Exponential back-off for DHT maintenance traffic (less maintenance traffic for known good peers).
- Iterative DHT (added some time between v0.1.0 and here).
- Use local queue sizes for a sort of local-only backpressure routing, instead of the removed bandwidth estimates, when deciding where to send a packet.
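
A rough sketch of the idea, with hypothetical types and helper names (the real implementation lives in the switch and peer code): among the peers that are strictly closer to the destination in the tree metric, pick the one with the smallest tree distance plus locally queued traffic.

```go
package sketch

// candidate is a hypothetical view of one peer for the purpose of this sketch:
// its tree-metric distance to the destination and how much traffic is queued
// towards it locally (the backpressure signal).
type candidate struct {
	port      int
	dist      int64
	queueSize int64
}

// nextHop returns the port to forward to, or false if no peer is closer than us.
func nextHop(myDist int64, peers []candidate) (int, bool) {
	bestCost := int64(1) << 62
	bestPort, found := 0, false
	for _, p := range peers {
		if p.dist >= myDist {
			continue // never forward away from the destination
		}
		if cost := p.dist + p.queueSize; cost < bestCost {
			bestPort, bestCost, found = p.port, cost, true
		}
	}
	return bestPort, found
}
```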

### Removed
- UDP peering; this may be added again if/when a better implementation appears.
- Per-peer bandwidth estimation, as this has been replaced with an early local backpressure implementation.

## [0.1.0] - 2018-02-01
### Added
- Adopt semantic versioning.

### Changed
- Wire format changes (backwards incompatible).
- Many other undocumented changes leading up to this release and before the next one.

## [0.0.1] - 2017-12-28
### Added
- First commit.
- Initial public release.
@@ -51,12 +51,12 @@ ip netns exec node4 ip link set lo up
ip netns exec node5 ip link set lo up
ip netns exec node6 ip link set lo up

ip netns exec node1 ./run --autoconf --pprof &> /dev/null &
ip netns exec node2 ./run --autoconf --pprof &> /dev/null &
ip netns exec node3 ./run --autoconf --pprof &> /dev/null &
ip netns exec node4 ./run --autoconf --pprof &> /dev/null &
ip netns exec node5 ./run --autoconf --pprof &> /dev/null &
ip netns exec node6 ./run --autoconf --pprof &> /dev/null &
ip netns exec node1 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node2 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node3 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node4 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node5 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &
ip netns exec node6 env PPROFLISTEN=localhost:6060 ./run --autoconf &> /dev/null &

echo "Started, to continue you should (possibly w/ sudo):"
echo "kill" $(jobs -p)
@@ -189,34 +189,34 @@ func (a *admin) init(c *Core, listenaddr string) {
	a.addHandler("getAllowedEncryptionPublicKeys", []string{}, func(in admin_info) (admin_info, error) {
		return admin_info{"allowed_box_pubs": a.getAllowedEncryptionPublicKeys()}, nil
	})
	a.addHandler("addAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in admin_info) (admin_info, error) {
		if a.addAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil {
	a.addHandler("addAllowedEncryptionPublicKey", []string{"key"}, func(in admin_info) (admin_info, error) {
		if a.addAllowedEncryptionPublicKey(in["key"].(string)) == nil {
			return admin_info{
				"added": []string{
					in["box_pub_key"].(string),
					in["key"].(string),
				},
			}, nil
		} else {
			return admin_info{
				"not_added": []string{
					in["box_pub_key"].(string),
					in["key"].(string),
				},
			}, errors.New("Failed to add allowed box pub key")
			}, errors.New("Failed to add allowed key")
		}
	})
	a.addHandler("removeAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in admin_info) (admin_info, error) {
		if a.removeAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil {
	a.addHandler("removeAllowedEncryptionPublicKey", []string{"key"}, func(in admin_info) (admin_info, error) {
		if a.removeAllowedEncryptionPublicKey(in["key"].(string)) == nil {
			return admin_info{
				"removed": []string{
					in["box_pub_key"].(string),
					in["key"].(string),
				},
			}, nil
		} else {
			return admin_info{
				"not_removed": []string{
					in["box_pub_key"].(string),
					in["key"].(string),
				},
			}, errors.New("Failed to remove allowed box pub key")
			}, errors.New("Failed to remove allowed key")
		}
	})
}
@@ -101,6 +101,11 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
		return err
	}

	if err := c.switchTable.start(); err != nil {
		c.log.Println("Failed to start switch")
		return err
	}

	if err := c.router.start(); err != nil {
		c.log.Println("Failed to start router")
		return err
@@ -49,6 +49,7 @@ func (c *Core) Init() {
	bpub, bpriv := newBoxKeys()
	spub, spriv := newSigKeys()
	c.init(bpub, bpriv, spub, spriv)
	c.switchTable.start()
	c.router.start()
}

@@ -140,7 +141,42 @@ func (l *switchLocator) DEBUG_getCoords() []byte {
}

func (c *Core) DEBUG_switchLookup(dest []byte) switchPort {
	return c.switchTable.lookup(dest)
	return c.switchTable.DEBUG_lookup(dest)
}

// This does the switch layer lookups that decide how to route traffic.
// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree.
// Traffic must be routed to a node that is closer to the destination via the metric space distance.
// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over).
// The size of the outgoing packet queue is added to a node's tree distance when computing the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself.
// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists.
func (t *switchTable) DEBUG_lookup(dest []byte) switchPort {
	table := t.getTable()
	myDist := table.self.dist(dest)
	if myDist == 0 {
		return 0
	}
	// cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
	ports := t.core.peers.getPorts()
	var best switchPort
	bestCost := int64(^uint64(0) >> 1)
	for _, info := range table.elems {
		dist := info.locator.dist(dest)
		if !(dist < myDist) {
			continue
		}
		//p, isIn := ports[info.port]
		_, isIn := ports[info.port]
		if !isIn {
			continue
		}
		cost := int64(dist) // + p.getQueueSize()
		if cost < bestCost {
			best = info.port
			bestCost = cost
		}
	}
	return best
}

/*
@@ -480,13 +516,17 @@ func DEBUG_simLinkPeers(p, q *peer) {
		}
	}()
	p.out = func(bs []byte) {
		p.core.switchTable.idleIn <- p.port
		go q.handlePacket(bs)
	}
	q.out = func(bs []byte) {
		q.core.switchTable.idleIn <- q.port
		go p.handlePacket(bs)
	}
	go p.linkLoop()
	go q.linkLoop()
	p.core.switchTable.idleIn <- p.port
	q.core.switchTable.idleIn <- q.port
}

func (c *Core) DEBUG_simFixMTU() {
@@ -74,9 +74,8 @@ func (ps *peers) putPorts(ports map[switchPort]*peer) {
	ps.ports.Store(ports)
}

// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral), a handler for their outgoing traffic, and queue sizes for local backpressure.
// Information known about a peer, including their box/sig keys, precomputed shared keys (static and ephemeral) and a handler for their outgoing traffic
type peer struct {
	queueSize  int64  // used to track local backpressure
	bytesSent  uint64 // To track bandwidth usage for getPeers
	bytesRecvd uint64 // To track bandwidth usage for getPeers
	// BUG: sync/atomic, 32 bit platforms need the above to be the first element
@@ -94,16 +93,6 @@ type peer struct {
	close func() // Called when a peer is removed, to close the underlying connection, or via admin api
}

// Size of the queue of packets to be sent to the node.
func (p *peer) getQueueSize() int64 {
	return atomic.LoadInt64(&p.queueSize)
}

// Used to increment or decrement the queue.
func (p *peer) updateQueueSize(delta int64) {
	atomic.AddInt64(&p.queueSize, delta)
}

// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
func (ps *peers) newPeer(box *boxPubKey, sig *sigPubKey, linkShared *boxSharedKey) *peer {
	now := time.Now()
@@ -229,19 +218,7 @@ func (p *peer) handleTraffic(packet []byte, pTypeLen int) {
		// Drop traffic until the peer manages to send us at least one good switchMsg
		return
	}
	coords, coordLen := wire_decode_coords(packet[pTypeLen:])
	if coordLen >= len(packet) {
		return
	} // No payload
	toPort := p.core.switchTable.lookup(coords)
	if toPort == p.port {
		return
	}
	to := p.core.peers.getPorts()[toPort]
	if to == nil {
		return
	}
	to.sendPacket(packet)
	p.core.switchTable.packetIn <- packet
}

// This just calls p.out(packet) for now.
@@ -101,6 +101,8 @@ func (r *router) mainLoop() {
			// Any periodic maintenance stuff goes here
			r.core.switchTable.doMaintenance()
			r.core.dht.doMaintenance()
			r.core.sessions.cleanup()
			r.core.sigs.cleanup()
			util_getBytes() // To slowly drain things
		}
	case f := <-r.admin:
@@ -89,7 +89,8 @@ func (s *sessionInfo) timedout() bool {
// Sessions are indexed by handle.
// Additionally, stores maps of address/subnet onto keys, and keys onto handles.
type sessions struct {
	core *Core
	core        *Core
	lastCleanup time.Time
	// Maps known permanent keys to their shared key, used by DHT a lot
	permShared map[boxPubKey]*boxSharedKey
	// Maps (secret) handle onto session info
@@ -111,6 +112,7 @@ func (ss *sessions) init(core *Core) {
	ss.byTheirPerm = make(map[boxPubKey]*handle)
	ss.addrToPerm = make(map[address]*boxPubKey)
	ss.subnetToPerm = make(map[subnet]*boxPubKey)
	ss.lastCleanup = time.Now()
}

// Gets the session corresponding to a given handle.
@@ -202,13 +204,6 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo {
	sinfo.send = make(chan []byte, 32)
	sinfo.recv = make(chan *wire_trafficPacket, 32)
	go sinfo.doWorker()
	// Do some cleanup
	// Time thresholds almost certainly could use some adjusting
	for _, s := range ss.sinfos {
		if s.timedout() {
			s.close()
		}
	}
	ss.sinfos[sinfo.myHandle] = &sinfo
	ss.byMySes[sinfo.mySesPub] = &sinfo.myHandle
	ss.byTheirPerm[sinfo.theirPermPub] = &sinfo.myHandle
@@ -217,6 +212,19 @@ func (ss *sessions) createSession(theirPermKey *boxPubKey) *sessionInfo {
	return &sinfo
}

func (ss *sessions) cleanup() {
	// Time thresholds almost certainly could use some adjusting
	if time.Since(ss.lastCleanup) < time.Minute {
		return
	}
	for _, s := range ss.sinfos {
		if s.timedout() {
			s.close()
		}
	}
	ss.lastCleanup = time.Now()
}

// Closes a session, removing it from sessions maps and killing the worker goroutine.
func (sinfo *sessionInfo) close() {
	delete(sinfo.core.sessions.sinfos, sinfo.myHandle)
@@ -71,16 +71,20 @@ func (m *sigManager) isChecked(key *sigPubKey, sig *sigBytes, bs []byte) bool {
func (m *sigManager) putChecked(key *sigPubKey, newsig *sigBytes, bs []byte) {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	now := time.Now()
	if time.Since(m.lastCleaned) > 60*time.Second {
		// Since we have the write lock anyway, do some cleanup
		for s, k := range m.checked {
			if time.Since(k.time) > 60*time.Second {
				delete(m.checked, s)
			}
		}
		m.lastCleaned = now
	}
	k := knownSig{key: *key, sig: *newsig, bs: bs, time: now}
	k := knownSig{key: *key, sig: *newsig, bs: bs, time: time.Now()}
	m.checked[*newsig] = k
}

func (m *sigManager) cleanup() {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	if time.Since(m.lastCleaned) < time.Minute {
		return
	}
	for s, k := range m.checked {
		if time.Since(k.time) > time.Minute {
			delete(m.checked, s)
		}
	}
	m.lastCleaned = time.Now()
}
@@ -12,7 +12,6 @@ package yggdrasil
// A little annoying to do with constant changes from backpressure

import (
	"sort"
	"sync"
	"sync/atomic"
	"time"
@@ -139,7 +138,7 @@ type tableElem struct {
// This is the subset of the information about all peers needed to make routing decisions, and it is stored separately in an atomically accessed table, which gets hammered in the "hot loop" of the routing logic (see: peer.handleTraffic in peers.go).
type lookupTable struct {
	self  switchLocator
	elems []tableElem
	elems map[switchPort]tableElem
}

// This is switch information which is mutable and needs to be modified by other goroutines, but is not accessed atomically.
@@ -155,15 +154,17 @@ type switchData struct {

// All the information stored by the switch.
type switchTable struct {
	core    *Core
	key     sigPubKey           // Our own key
	time    time.Time           // Time when locator.tstamp was last updated
	parent  switchPort          // Port of whatever peer is our parent, or self if we're root
	drop    map[sigPubKey]int64 // Tstamp associated with a dropped root
	mutex   sync.RWMutex        // Lock for reads/writes of switchData
	data    switchData
	updater atomic.Value //*sync.Once
	table   atomic.Value //lookupTable
	core     *Core
	key      sigPubKey           // Our own key
	time     time.Time           // Time when locator.tstamp was last updated
	parent   switchPort          // Port of whatever peer is our parent, or self if we're root
	drop     map[sigPubKey]int64 // Tstamp associated with a dropped root
	mutex    sync.RWMutex        // Lock for reads/writes of switchData
	data     switchData
	updater  atomic.Value //*sync.Once
	table    atomic.Value //lookupTable
	packetIn chan []byte     // Incoming packets for the worker to handle
	idleIn   chan switchPort // Incoming idle notifications from peer links
}

// Initializes the switchTable struct.
@@ -177,6 +178,8 @@ func (t *switchTable) init(core *Core, key sigPubKey) {
	t.updater.Store(&sync.Once{})
	t.table.Store(lookupTable{})
	t.drop = make(map[sigPubKey]int64)
	t.packetIn = make(chan []byte, 1024)
	t.idleIn = make(chan switchPort, 1024)
}

// Safely gets a copy of this node's locator.
@@ -438,6 +441,10 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort) {
	return
}

////////////////////////////////////////////////////////////////////////////////

// The rest of these are related to the switch worker

// This is called via a sync.Once to update the atomically readable subset of switch information that gets used for routing decisions.
func (t *switchTable) updateTable() {
	// WARNING this should only be called from within t.data.updater.Do()
@@ -452,7 +459,7 @@
	defer t.mutex.RUnlock()
	newTable := lookupTable{
		self:  t.data.locator.clone(),
		elems: make([]tableElem, 0, len(t.data.peers)),
		elems: make(map[switchPort]tableElem, len(t.data.peers)),
	}
	for _, pinfo := range t.data.peers {
		//if !pinfo.forward { continue }
@@ -461,48 +468,214 @@ func (t *switchTable) updateTable() {
		}
		loc := pinfo.locator.clone()
		loc.coords = loc.coords[:len(loc.coords)-1] // Remove the them->self link
		newTable.elems = append(newTable.elems, tableElem{
		newTable.elems[pinfo.port] = tableElem{
			locator: loc,
			port:    pinfo.port,
		})
		}
	}
	sort.SliceStable(newTable.elems, func(i, j int) bool {
		return t.data.peers[newTable.elems[i].port].firstSeen.Before(t.data.peers[newTable.elems[j].port].firstSeen)
	})
	t.table.Store(newTable)
}

// This does the switch layer lookups that decide how to route traffic.
// Traffic uses greedy routing in a metric space, where the metric distance between nodes is equal to the distance between them on the tree.
// Traffic must be routed to a node that is closer to the destination via the metric space distance.
// In the event that two nodes are equally close, it gets routed to the one with the longest uptime (due to the order that things are iterated over).
// The size of the outgoing packet queue is added to a node's tree distance when computing the cost of forwarding to a node, subject to the constraint that the real tree distance puts them closer to the destination than ourself.
// Doing so adds a limited form of backpressure routing, based on local information, which allows us to forward traffic around *local* bottlenecks, provided that another greedy path exists.
func (t *switchTable) lookup(dest []byte) switchPort {
// Returns a copy of the atomically-updated table used for switch lookups
func (t *switchTable) getTable() lookupTable {
	t.updater.Load().(*sync.Once).Do(t.updateTable)
	table := t.table.Load().(lookupTable)
	return t.table.Load().(lookupTable)
}

// Starts the switch worker
func (t *switchTable) start() error {
	t.core.log.Println("Starting switch")
	go t.doWorker()
	return nil
}

// Check if a packet should go to the self node
// This means there's no node closer to the destination than us
// This is mainly used to identify packets addressed to us, or that hit a blackhole
func (t *switchTable) selfIsClosest(dest []byte) bool {
	table := t.getTable()
	myDist := table.self.dist(dest)
	if myDist == 0 {
		return 0
		// Skip the iteration step if it's impossible to be closer
		return true
	}
	// cost is in units of (expected distance) + (expected queue size), where expected distance is used as an approximation of the minimum backpressure gradient needed for packets to flow
	ports := t.core.peers.getPorts()
	var best switchPort
	bestCost := int64(^uint64(0) >> 1)
	for _, info := range table.elems {
		dist := info.locator.dist(dest)
		if !(dist < myDist) {
			continue
		}
		p, isIn := ports[info.port]
		if !isIn {
			continue
		}
		cost := int64(dist) + p.getQueueSize()
		if cost < bestCost {
			best = info.port
			bestCost = cost
		if dist < myDist {
			return false
		}
	}
	return true
}

// Returns true if the peer is closer to the destination than ourself
func (t *switchTable) portIsCloser(dest []byte, port switchPort) bool {
	table := t.getTable()
	if info, isIn := table.elems[port]; isIn {
		theirDist := info.locator.dist(dest)
		myDist := table.self.dist(dest)
		return theirDist < myDist
	} else {
		return false
	}
}

// Get the coords of a packet without decoding
func switch_getPacketCoords(packet []byte) []byte {
	_, pTypeLen := wire_decode_uint64(packet)
	coords, _ := wire_decode_coords(packet[pTypeLen:])
	return coords
}

// Returns a unique string for each stream of traffic
// Equal to type+coords+handle for traffic packets
// Equal to type+coords+toKey+fromKey for protocol traffic packets
func switch_getPacketStreamID(packet []byte) string {
	pType, pTypeLen := wire_decode_uint64(packet)
	_, coordLen := wire_decode_coords(packet[pTypeLen:])
	end := pTypeLen + coordLen
	switch {
	case pType == wire_Traffic:
		end += handleLen // handle
	case pType == wire_ProtocolTraffic:
		end += 2 * boxPubKeyLen
	default:
		end = 0
	}
	if end > len(packet) {
		end = len(packet)
	}
	return string(packet[:end])
}

// Handle an incoming packet
// Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool {
	coords := switch_getPacketCoords(packet)
	ports := t.core.peers.getPorts()
	if t.selfIsClosest(coords) {
		// TODO? call the router directly, and remove the whole concept of a self peer?
		ports[0].sendPacket(packet)
		return true
	}
	table := t.getTable()
	myDist := table.self.dist(coords)
	var best *peer
	bestDist := myDist
	for port := range idle {
		if to := ports[port]; to != nil {
			if info, isIn := table.elems[to.port]; isIn {
				dist := info.locator.dist(coords)
				if !(dist < bestDist) {
					continue
				}
				best = to
				bestDist = dist
			}
		}
	}
	if best != nil {
		// Send to the best idle next hop
		delete(idle, best.port)
		best.sendPacket(packet)
		return true
	} else {
		// Didn't find anyone idle to send it to
		return false
	}
}

// Info about a buffered packet
type switch_packetInfo struct {
	bytes []byte
	time  time.Time // Timestamp of when the packet arrived
}

// Used to keep track of buffered packets
type switch_buffer struct {
	packets []switch_packetInfo // Currently buffered packets, which may be dropped if it grows too large
	count   uint64              // Total queue size, including dropped packets
}

func (b *switch_buffer) dropTimedOut() {
	// TODO figure out what timeout makes sense
	const timeout = 25 * time.Millisecond
	now := time.Now()
	for len(b.packets) > 0 && now.Sub(b.packets[0].time) > timeout {
		util_putBytes(b.packets[0].bytes)
		b.packets = b.packets[1:]
	}
}

// Handles incoming idle notifications
// Loops over packets and sends the newest one that's OK for this peer to send
// Returns true if the peer is no longer idle, false if it should be added to the idle list
func (t *switchTable) handleIdle(port switchPort, buffs map[string]switch_buffer) bool {
	to := t.core.peers.getPorts()[port]
	if to == nil {
		return true
	}
	var best string
	var bestSize uint64
	for streamID, buf := range buffs {
		// Filter over the streams that this node is closer to
		// Keep the one with the smallest queue
		buf.dropTimedOut()
		if len(buf.packets) == 0 {
			delete(buffs, streamID)
			continue
		}
		buffs[streamID] = buf
		packet := buf.packets[0]
		coords := switch_getPacketCoords(packet.bytes)
		if (bestSize == 0 || buf.count < bestSize) && t.portIsCloser(coords, port) {
			best = streamID
			bestSize = buf.count
		}
	}
	if bestSize != 0 {
		buf := buffs[best]
		var packet switch_packetInfo
		// TODO decide if this should be LIFO or FIFO
		packet, buf.packets = buf.packets[0], buf.packets[1:]
		buf.count--
		if len(buf.packets) == 0 {
			delete(buffs, best)
		} else {
			buffs[best] = buf
		}
		to.sendPacket(packet.bytes)
		return true
	} else {
		return false
	}
}

// The switch worker does routing lookups and sends packets to where they need to be
func (t *switchTable) doWorker() {
	buffs := make(map[string]switch_buffer) // Packets per PacketStreamID (string)
	idle := make(map[switchPort]struct{})   // this is to deduplicate things
	for {
		select {
		case packet := <-t.packetIn:
			// Try to send it somewhere (or drop it if it's corrupt or at a dead end)
			if !t.handleIn(packet, idle) {
				// There's nobody free to take it right now, so queue it for later
				streamID := switch_getPacketStreamID(packet)
				buf := buffs[streamID]
				buf.dropTimedOut()
				pinfo := switch_packetInfo{packet, time.Now()}
				buf.packets = append(buf.packets, pinfo)
				buf.count++
				buffs[streamID] = buf
			}
		case port := <-t.idleIn:
			// Try to find something to send to this peer
			if !t.handleIdle(port, buffs) {
				// Didn't find anything ready to send yet, so stay idle
				idle[port] = struct{}{}
			}
		}
	}
	return best
}
@@ -242,19 +242,10 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
	in := func(bs []byte) {
		p.handlePacket(bs)
	}
	out := make(chan []byte, 32) // TODO? what size makes sense
	out := make(chan []byte, 1)
	defer close(out)
	go func() {
		var shadow int64
		var stack [][]byte
		put := func(msg []byte) {
			stack = append(stack, msg)
			for len(stack) > 32 {
				util_putBytes(stack[0])
				stack = stack[1:]
				shadow++
			}
		}
		// This goroutine waits for outgoing packets, link protocol traffic, or sends idle keep-alive traffic
		send := func(msg []byte) {
			msgLen := wire_encode_uint64(uint64(len(msg)))
			buf := net.Buffers{tcp_msg[:], msgLen, msg}
@@ -266,10 +257,14 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
		timer := time.NewTimer(timerInterval)
		defer timer.Stop()
		for {
			if shadow != 0 {
				p.updateQueueSize(-shadow)
				shadow = 0
			select {
			case msg := <-p.linkOut:
				// Always send outgoing link traffic first, if needed
				send(msg)
				continue
			default:
			}
			// Otherwise, reset the timer and wait for something to do
			timer.Stop()
			select {
			case <-timer.C:
@@ -285,34 +280,16 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) {
				if !ok {
					return
				}
				put(msg)
			}
			for len(stack) > 0 {
				select {
				case msg := <-p.linkOut:
					send(msg)
				case msg, ok := <-out:
					if !ok {
						return
					}
					put(msg)
				default:
					msg := stack[len(stack)-1]
					stack = stack[:len(stack)-1]
					send(msg)
					p.updateQueueSize(-1)
				}
				send(msg) // Block until the socket write has finished
				// Now inform the switch that we're ready for more traffic
				p.core.switchTable.idleIn <- p.port
			}
		}
	}()
	p.core.switchTable.idleIn <- p.port // Start in the idle state
	p.out = func(msg []byte) {
		defer func() { recover() }()
		select {
		case out <- msg:
			p.updateQueueSize(1)
		default:
			util_putBytes(msg)
		}
		out <- msg
	}
	p.close = func() { sock.Close() }
	setNoDelay(sock, true)
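
The pattern visible in this hunk and in the switch worker above: each link writer blocks on the actual socket write, then reports its port on `idleIn`, and the switch either hands it another packet (`handleIdle`) or parks the port in its idle set until something arrives for it. A stripped-down sketch of that handshake, with hypothetical types rather than the real yggdrasil internals:

```go
package sketch

// writer models a peer's send goroutine: it tells the switch it is idle after
// every completed write, so queuing decisions stay inside the switch worker.
func writer(port int, out <-chan []byte, idleIn chan<- int, send func([]byte)) {
	idleIn <- port // start out idle
	for msg := range out {
		send(msg)      // block until the socket write has finished
		idleIn <- port // ready for more traffic
	}
}
```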