5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-14 04:30:32 +00:00

Merge pull request #374 from yggdrasil-network/develop

Version 0.3.4
This commit is contained in:
Neil Alexander 2019-03-12 10:02:04 +00:00 committed by GitHub
commit 43643e0307
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 1031 additions and 327 deletions

View File

@ -3,7 +3,7 @@
# Check https://circleci.com/docs/2.0/language-go/ for more details # Check https://circleci.com/docs/2.0/language-go/ for more details
version: 2 version: 2
jobs: jobs:
build: build-linux:
docker: docker:
- image: circleci/golang:1.11 - image: circleci/golang:1.11
@ -44,16 +44,94 @@ jobs:
mv *.deb /tmp/upload/ mv *.deb /tmp/upload/
- run: - run:
name: Build for macOS name: Build for EdgeRouter
command: | command: |
rm -f {yggdrasil,yggdrasilctl} rm -f {yggdrasil,yggdrasilctl}
GOOS=darwin GOARCH=amd64 ./build && mv yggdrasil /tmp/upload/$CINAME-$CIVERSION-darwin-amd64 && mv yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-darwin-amd64; git clone https://github.com/neilalexander/vyatta-yggdrasil /tmp/vyatta-yggdrasil;
cd /tmp/vyatta-yggdrasil;
BUILDDIR_YGG=$CIRCLE_WORKING_DIRECTORY ./build-edgerouter-x $CIRCLE_BRANCH;
BUILDDIR_YGG=$CIRCLE_WORKING_DIRECTORY ./build-edgerouter-lite $CIRCLE_BRANCH;
mv *.deb /tmp/upload;
- persist_to_workspace:
root: /tmp
paths:
- upload
build-macos:
macos:
xcode: "10.0.0"
working_directory: ~/go/src/github.com/yggdrasil-network/yggdrasil-go
steps:
- checkout
- run:
name: Create artifact upload directory and set variables
command: |
mkdir /tmp/upload
echo 'export CINAME=$(sh contrib/semver/name.sh)' >> $BASH_ENV
echo 'export CIVERSION=$(sh contrib/semver/version.sh --bare)' >> $BASH_ENV
echo 'export PATH=$PATH:/usr/local/go/bin:~/go/bin' >> $BASH_ENV
git config --global user.email "$(git log --format='%ae' HEAD -1)";
git config --global user.name "$(git log --format='%an' HEAD -1)";
echo -e "Host *\n\tStrictHostKeyChecking no\n" >> ~/.ssh/config
- run:
name: Install Go 1.11
command: |
cd /tmp
curl -LO https://dl.google.com/go/go1.11.5.darwin-amd64.pkg
sudo installer -pkg /tmp/go1.11.5.darwin-amd64.pkg -target /
- run:
name: Install Gomobile
command: |
GO111MODULE=off go get golang.org/x/mobile/cmd/gomobile
gomobile init
- run:
name: Build for macOS
command: |
GO111MODULE=on GOOS=darwin GOARCH=amd64 ./build
cp yggdrasil /tmp/upload/$CINAME-$CIVERSION-darwin-amd64
cp yggdrasilctl /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-darwin-amd64;
- run: - run:
name: Build for macOS (.pkg format) name: Build for macOS (.pkg format)
command: | command: |
rm -rf {yggdrasil,yggdrasilctl} PKGARCH=amd64 sh contrib/macos/create-pkg.sh
GOOS=darwin GOARCH=amd64 ./build && PKGARCH=amd64 sh contrib/macos/create-pkg.sh && mv *.pkg /tmp/upload/ mv *.pkg /tmp/upload/
#- run:
# name: Build framework for iOS (.framework format)
# command: |
# sudo GO111MODULE=off go get -v github.com/yggdrasil-network/yggdrasil-go/cmd/...
# sudo GO111MODULE=off go get -v github.com/yggdrasil-network/yggdrasil-go/src/...
# GO111MODULE=off ./build -i
# mv *.framework /tmp/upload
- persist_to_workspace:
root: /tmp
paths:
- upload
build-other:
docker:
- image: circleci/golang:1.11
steps:
- checkout
- run:
name: Create artifact upload directory and set variables
command: |
mkdir /tmp/upload
echo 'export CINAME=$(sh contrib/semver/name.sh)' >> $BASH_ENV
echo 'export CIVERSION=$(sh contrib/semver/version.sh --bare)' >> $BASH_ENV
git config --global user.email "$(git log --format='%ae' HEAD -1)";
git config --global user.name "$(git log --format='%an' HEAD -1)";
- run: - run:
name: Build for OpenBSD name: Build for OpenBSD
@ -83,16 +161,31 @@ jobs:
GOOS=windows GOARCH=amd64 ./build && mv yggdrasil.exe /tmp/upload/$CINAME-$CIVERSION-windows-amd64.exe && mv yggdrasilctl.exe /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-windows-amd64.exe; GOOS=windows GOARCH=amd64 ./build && mv yggdrasil.exe /tmp/upload/$CINAME-$CIVERSION-windows-amd64.exe && mv yggdrasilctl.exe /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-windows-amd64.exe;
GOOS=windows GOARCH=386 ./build && mv yggdrasil.exe /tmp/upload/$CINAME-$CIVERSION-windows-i386.exe && mv yggdrasilctl.exe /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-windows-i386.exe; GOOS=windows GOARCH=386 ./build && mv yggdrasil.exe /tmp/upload/$CINAME-$CIVERSION-windows-i386.exe && mv yggdrasilctl.exe /tmp/upload/$CINAME-$CIVERSION-yggdrasilctl-windows-i386.exe;
- run: - persist_to_workspace:
name: Build for EdgeRouter root: /tmp
command: | paths:
rm -f {yggdrasil,yggdrasilctl} - upload
git clone https://github.com/neilalexander/vyatta-yggdrasil /tmp/vyatta-yggdrasil;
cd /tmp/vyatta-yggdrasil; upload:
BUILDDIR_YGG=$CIRCLE_WORKING_DIRECTORY ./build-edgerouter-x $CIRCLE_BRANCH; machine: true
BUILDDIR_YGG=$CIRCLE_WORKING_DIRECTORY ./build-edgerouter-lite $CIRCLE_BRANCH;
mv *.deb /tmp/upload; steps:
- attach_workspace:
at: /tmp
- store_artifacts: - store_artifacts:
path: /tmp/upload path: /tmp/upload
destination: / destination: /
workflows:
version: 2
build-all:
jobs:
- build-linux
- build-macos
- build-other
- upload:
requires:
- build-linux
- build-macos
- build-other

View File

@ -25,7 +25,33 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
- in case of vulnerabilities. - in case of vulnerabilities.
--> -->
## [0.3.3] - 2018-02-18 ## [0.3.4] - 2019-03-12
### Added
- Support for multiple listeners (although currently only TCP listeners are supported)
- New multicast behaviour where each multicast interface is given it's own link-local listener and does not depend on the `Listen` configuration
- Blocking detection in the switch to avoid parenting a blocked peer
- Support for adding and removing listeners and multicast interfaces when reloading configuration during runtime
- Yggdrasil will now attempt to clean up UNIX admin sockets on startup if left behind by a previous crash
- Admin socket `getTunnelRouting` and `setTunnelRouting` calls for enabling and disabling crypto-key routing during runtime
- On macOS, Yggdrasil will now try to wake up AWDL on start-up when `awdl0` is a configured multicast interface, to keep it awake after system sleep, and to stop waking it when no longer needed
- Added `LinkLocalTCPPort` option for controlling the port number that link-local TCP listeners will listen on by default when setting up `MulticastInterfaces` (a node restart is currently required for changes to `LinkLocalTCPPort` to take effect - it cannot be updated by reloading config during runtime)
### Changed
- The `Listen` configuration statement is now an array instead of a string
- The `Listen` configuration statement should now conform to the same formatting as peers with the protocol prefix, e.g. `tcp://[::]:0`
- Session workers are now non-blocking
- Multicast interval is now fixed at every 15 seconds and network interfaces are reevaluated for eligibility on each interval (where before the interval depended upon the number of configured multicast interfaces and evaluation only took place at startup)
- Dead connections are now closed in the link handler as opposed to the switch
- Peer forwarding is now prioritised instead of randomised
### Fixed
- Admin socket `getTunTap` call now returns properly instead of claiming no interface is enabled in all cases
- Handling of `getRoutes` etc in `yggdrasilctl` is now working
- Local interface names are no longer leaked in multicast packets
- Link-local TCP connections, particularly those initiated because of multicast beacons, are now always correctly scoped for the target interface
- Yggdrasil now correctly responds to multicast interfaces going up and down during runtime
## [0.3.3] - 2019-02-18
### Added ### Added
- Dynamic reconfiguration, which allows reloading the configuration file to make changes during runtime by sending a `SIGHUP` signal (note: this only works with `-useconffile` and not `-useconf` and currently reconfiguring TUN/TAP is not supported) - Dynamic reconfiguration, which allows reloading the configuration file to make changes during runtime by sending a `SIGHUP` signal (note: this only works with `-useconffile` and not `-useconf` and currently reconfiguring TUN/TAP is not supported)
- Support for building Yggdrasil as an iOS or Android framework if the appropriate tools (e.g. `gomobile`/`gobind` + SDKs) are available - Support for building Yggdrasil as an iOS or Android framework if the appropriate tools (e.g. `gomobile`/`gobind` + SDKs) are available

View File

@ -134,12 +134,20 @@ func readConfig(useconf *bool, useconffile *string, normaliseconf *bool) *nodeCo
} }
} }
} }
// Do a quick check for old-format Listen statement so that mapstructure
// doesn't fail and crash
if listen, ok := dat["Listen"].(string); ok {
if strings.HasPrefix(listen, "tcp://") {
dat["Listen"] = []string{listen}
} else {
dat["Listen"] = []string{"tcp://" + listen}
}
}
// Overlay our newly mapped configuration onto the autoconf node config that // Overlay our newly mapped configuration onto the autoconf node config that
// we generated above. // we generated above.
if err = mapstructure.Decode(dat, &cfg); err != nil { if err = mapstructure.Decode(dat, &cfg); err != nil {
panic(err) panic(err)
} }
return cfg return cfg
} }

View File

@ -388,16 +388,30 @@ func main() {
} }
} }
case "getroutes": case "getroutes":
if _, ok := res["routes"]; !ok { if routes, ok := res["routes"].(map[string]interface{}); !ok {
fmt.Println("No routes found") fmt.Println("No routes found")
} else if res["routes"] == nil { } else {
if res["routes"] == nil || len(routes) == 0 {
fmt.Println("No routes found") fmt.Println("No routes found")
} else { } else {
fmt.Println("Routes:") fmt.Println("Routes:")
for _, v := range res["routes"].([]interface{}) { for k, v := range routes {
fmt.Println("-", v) if pv, ok := v.(string); ok {
fmt.Println("-", k, " via ", pv)
} }
} }
}
}
case "settunnelrouting":
fallthrough
case "gettunnelrouting":
if enabled, ok := res["enabled"].(bool); !ok {
fmt.Println("Tunnel routing is disabled")
} else if !enabled {
fmt.Println("Tunnel routing is disabled")
} else {
fmt.Println("Tunnel routing is enabled")
}
default: default:
if json, err := json.MarshalIndent(recv["response"], "", " "); err == nil { if json, err := json.MarshalIndent(recv["response"], "", " "); err == nil {
fmt.Println(string(json)) fmt.Println(string(json))

View File

@ -0,0 +1,23 @@
# Last Modified: Sat Mar 9 06:08:02 2019
#include <tunables/global>
/usr/bin/yggdrasil {
#include <abstractions/base>
capability net_admin,
network inet stream,
network inet dgram,
network inet6 dgram,
network inet6 stream,
network netlink raw,
/lib/@{multiarch}/ld-*.so mr,
/proc/sys/net/core/somaxconn r,
/dev/net/tun rw,
/usr/bin/yggdrasil mr,
/etc/yggdrasil.conf rw,
/run/yggdrasil.sock rw,
}

View File

@ -0,0 +1,77 @@
#!/bin/sh
CONFFILE="/etc/yggdrasil.conf"
genconf() {
/usr/bin/yggdrasil -genconf > "$1"
return $?
}
probetun() {
modprobe tun
return $?
}
start() {
if [ ! -f "$CONFFILE" ]; then
printf 'Generating configuration file: '
if genconf "$CONFFILE"; then
echo "OK"
else
echo "FAIL"
return 1
fi
fi
if [ ! -e /dev/net/tun ]; then
printf 'Inserting TUN module: '
if probetun; then
echo "OK"
else
echo "FAIL"
return 1
fi
fi
printf 'Starting yggdrasil: '
if start-stop-daemon -S -q -b -x /usr/bin/yggdrasil \
-- -useconffile "$CONFFILE"; then
echo "OK"
else
echo "FAIL"
fi
}
stop() {
printf "Stopping yggdrasil: "
if start-stop-daemon -K -q -x /usr/bin/yggdrasil; then
echo "OK"
else
echo "FAIL"
fi
}
reload() {
printf "Reloading yggdrasil: "
if start-stop-daemon -K -q -s HUP -x /usr/bin/yggdrasil; then
echo "OK"
else
echo "FAIL"
start
fi
}
restart() {
stop
start
}
case "$1" in
start|stop|restart|reload)
"$1";;
*)
echo "Usage: $0 {start|stop|restart|reload}"
exit 1
esac
exit 0

View File

@ -8,7 +8,7 @@
<array> <array>
<string>sh</string> <string>sh</string>
<string>-c</string> <string>-c</string>
<string>/usr/local/bin/yggdrasil -useconf &lt; /etc/yggdrasil.conf</string> <string>/usr/local/bin/yggdrasil -useconffile /etc/yggdrasil.conf</string>
</array> </array>
<key>KeepAlive</key> <key>KeepAlive</key>
<true/> <true/>

55
contrib/openrc/yggdrasil Executable file
View File

@ -0,0 +1,55 @@
#!/sbin/openrc-run
description="An experiment in scalable routing as an encrypted IPv6 overlay network."
CONFFILE="/etc/yggdrasil.conf"
pidfile="/run/${RC_SVCNAME}.pid"
command="/usr/bin/yggdrasil"
extra_started_commands="reload"
depend() {
use net dns logger
}
start_pre() {
if [ ! -f "${CONFFILE}" ]; then
ebegin "Generating new configuration file into ${CONFFILE}"
if ! eval ${command} -genconf > ${CONFFILE}; then
eerror "Failed to generate configuration file"
exit 1
fi
fi
if [ ! -e /dev/net/tun ]; then
ebegin "Inserting TUN module"
if ! modprobe tun; then
eerror "Failed to insert TUN kernel module"
exit 1
fi
fi
}
start() {
ebegin "Starting ${RC_SVCNAME}"
start-stop-daemon --start --quiet \
--pidfile "${pidfile}" \
--make-pidfile \
--background \
--stdout /var/log/yggdrasil.stdout.log \
--stderr /var/log/yggdrasil.stderr.log \
--exec "${command}" -- -useconffile "${CONFFILE}"
eend $?
}
reload() {
ebegin "Reloading ${RC_SVCNAME}"
start-stop-daemon --signal HUP --pidfile "${pidfile}"
eend $?
}
stop() {
ebegin "Stopping ${RC_SVCNAME}"
start-stop-daemon --stop --pidfile "${pidfile}" --exec "${command}"
eend $?
}

View File

@ -6,13 +6,15 @@ import "os"
import "strings" import "strings"
import "strconv" import "strconv"
import "time" import "time"
import "log"
import "runtime" import "runtime"
import "runtime/pprof" import "runtime/pprof"
import "flag" import "flag"
import "github.com/gologme/log"
import . "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil" import . "github.com/yggdrasil-network/yggdrasil-go/src/yggdrasil"
import . "github.com/yggdrasil-network/yggdrasil-go/src/crypto"
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////

View File

@ -12,7 +12,7 @@ import (
// NodeConfig defines all configuration values needed to run a signle yggdrasil node // NodeConfig defines all configuration values needed to run a signle yggdrasil node
type NodeConfig struct { type NodeConfig struct {
Listen string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` Listen []string `comment:"Listen addresses for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."`
AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."` AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."`
Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."` Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."`
InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."` InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."`
@ -22,6 +22,7 @@ type NodeConfig struct {
SigningPublicKey string `comment:"Your public signing key. You should not ordinarily need to share\nthis with anyone."` SigningPublicKey string `comment:"Your public signing key. You should not ordinarily need to share\nthis with anyone."`
SigningPrivateKey string `comment:"Your private signing key. DO NOT share this with anyone!"` SigningPrivateKey string `comment:"Your private signing key. DO NOT share this with anyone!"`
MulticastInterfaces []string `comment:"Regular expressions for which interfaces multicast peer discovery\nshould be enabled on. If none specified, multicast peer discovery is\ndisabled. The default value is .* which uses all interfaces."` MulticastInterfaces []string `comment:"Regular expressions for which interfaces multicast peer discovery\nshould be enabled on. If none specified, multicast peer discovery is\ndisabled. The default value is .* which uses all interfaces."`
LinkLocalTCPPort uint16 `comment:"The port number to be used for the link-local TCP listeners for the\nconfigured MulticastInterfaces. This option does not affect listeners\nspecified in the Listen option. Unless you plan to firewall link-local\ntraffic, it is best to leave this as the default value of 0. This\noption cannot currently be changed by reloading config during runtime."`
IfName string `comment:"Local network interface name for TUN/TAP adapter, or \"auto\" to select\nan interface automatically, or \"none\" to run without TUN/TAP."` IfName string `comment:"Local network interface name for TUN/TAP adapter, or \"auto\" to select\nan interface automatically, or \"none\" to run without TUN/TAP."`
IfTAPMode bool `comment:"Set local network interface to TAP mode rather than TUN mode if\nsupported by your platform - option will be ignored if not."` IfTAPMode bool `comment:"Set local network interface to TAP mode rather than TUN mode if\nsupported by your platform - option will be ignored if not."`
IfMTU int `comment:"Maximux Transmission Unit (MTU) size for your local TUN/TAP interface.\nDefault is the largest supported size for your platform. The lowest\npossible value is 1280."` IfMTU int `comment:"Maximux Transmission Unit (MTU) size for your local TUN/TAP interface.\nDefault is the largest supported size for your platform. The lowest\npossible value is 1280."`
@ -30,13 +31,6 @@ type NodeConfig struct {
SwitchOptions SwitchOptions `comment:"Advanced options for tuning the switch. Normally you will not need\nto edit these options."` SwitchOptions SwitchOptions `comment:"Advanced options for tuning the switch. Normally you will not need\nto edit these options."`
NodeInfoPrivacy bool `comment:"By default, nodeinfo contains some defaults including the platform,\narchitecture and Yggdrasil version. These can help when surveying\nthe network and diagnosing network routing problems. Enabling\nnodeinfo privacy prevents this, so that only items specified in\n\"NodeInfo\" are sent back if specified."` NodeInfoPrivacy bool `comment:"By default, nodeinfo contains some defaults including the platform,\narchitecture and Yggdrasil version. These can help when surveying\nthe network and diagnosing network routing problems. Enabling\nnodeinfo privacy prevents this, so that only items specified in\n\"NodeInfo\" are sent back if specified."`
NodeInfo map[string]interface{} `comment:"Optional node info. This must be a { \"key\": \"value\", ... } map\nor set as null. This is entirely optional but, if set, is visible\nto the whole network on request."` NodeInfo map[string]interface{} `comment:"Optional node info. This must be a { \"key\": \"value\", ... } map\nor set as null. This is entirely optional but, if set, is visible\nto the whole network on request."`
//Net NetConfig `comment:"Extended options for connecting to peers over other networks."`
}
// NetConfig defines network/proxy related configuration values
type NetConfig struct {
Tor TorConfig `comment:"Experimental options for configuring peerings over Tor."`
I2P I2PConfig `comment:"Experimental options for configuring peerings over I2P."`
} }
// SessionFirewall controls the session firewall configuration // SessionFirewall controls the session firewall configuration
@ -71,18 +65,16 @@ type SwitchOptions struct {
// isAutoconf is that the TCP and UDP ports will likely end up with different // isAutoconf is that the TCP and UDP ports will likely end up with different
// port numbers. // port numbers.
func GenerateConfig(isAutoconf bool) *NodeConfig { func GenerateConfig(isAutoconf bool) *NodeConfig {
// Create a new core.
//core := Core{}
// Generate encryption keys. // Generate encryption keys.
bpub, bpriv := crypto.NewBoxKeys() bpub, bpriv := crypto.NewBoxKeys()
spub, spriv := crypto.NewSigKeys() spub, spriv := crypto.NewSigKeys()
// Create a node configuration and populate it. // Create a node configuration and populate it.
cfg := NodeConfig{} cfg := NodeConfig{}
if isAutoconf { if isAutoconf {
cfg.Listen = "[::]:0" cfg.Listen = []string{"tcp://[::]:0"}
} else { } else {
r1 := rand.New(rand.NewSource(time.Now().UnixNano())) r1 := rand.New(rand.NewSource(time.Now().UnixNano()))
cfg.Listen = fmt.Sprintf("[::]:%d", r1.Intn(65534-32768)+32768) cfg.Listen = []string{fmt.Sprintf("tcp://[::]:%d", r1.Intn(65534-32768)+32768)}
} }
cfg.AdminListen = defaults.GetDefaults().DefaultAdminListen cfg.AdminListen = defaults.GetDefaults().DefaultAdminListen
cfg.EncryptionPublicKey = hex.EncodeToString(bpub[:]) cfg.EncryptionPublicKey = hex.EncodeToString(bpub[:])

View File

@ -1,8 +0,0 @@
package config
// I2PConfig is the configuration structure for i2p related configuration
type I2PConfig struct {
Keyfile string // private key file or empty string for ephemeral keys
Addr string // address of i2p api connector
Enabled bool
}

View File

@ -1,8 +0,0 @@
package config
// TorConfig is the configuration structure for Tor Proxy related values
type TorConfig struct {
OnionKeyfile string // hidden service private key for ADD_ONION (currently unimplemented)
ControlAddr string // tor control port address
Enabled bool
}

View File

@ -56,3 +56,40 @@ func TimerStop(t *time.Timer) bool {
} }
return true return true
} }
// Run a blocking function with a timeout.
// Returns true if the function returns.
// Returns false if the timer fires.
// The blocked function remains blocked--the caller is responsible for somehow killing it.
func FuncTimeout(f func(), timeout time.Duration) bool {
success := make(chan struct{})
go func() {
defer close(success)
f()
}()
timer := time.NewTimer(timeout)
defer TimerStop(timer)
select {
case <-success:
return true
case <-timer.C:
return false
}
}
// This calculates the difference between two arrays and returns items
// that appear in A but not in B - useful somewhat when reconfiguring
// and working out what configuration items changed
func Difference(a, b []string) []string {
ab := []string{}
mb := map[string]bool{}
for _, x := range b {
mb[x] = true
}
for _, x := range a {
if !mb[x] {
ab = append(ab, x)
}
}
return ab
}

View File

@ -173,9 +173,10 @@ func (a *admin) init(c *Core) {
}) })
a.addHandler("getTunTap", []string{}, func(in admin_info) (r admin_info, e error) { a.addHandler("getTunTap", []string{}, func(in admin_info) (r admin_info, e error) {
defer func() { defer func() {
recover() if err := recover(); err != nil {
r = admin_info{"none": admin_info{}} r = admin_info{"none": admin_info{}}
e = nil e = nil
}
}() }()
return admin_info{ return admin_info{
@ -251,6 +252,23 @@ func (a *admin) init(c *Core) {
}, errors.New("Failed to remove allowed key") }, errors.New("Failed to remove allowed key")
} }
}) })
a.addHandler("getTunnelRouting", []string{}, func(in admin_info) (admin_info, error) {
enabled := false
a.core.router.doAdmin(func() {
enabled = a.core.router.cryptokey.isEnabled()
})
return admin_info{"enabled": enabled}, nil
})
a.addHandler("setTunnelRouting", []string{"enabled"}, func(in admin_info) (admin_info, error) {
enabled := false
if e, ok := in["enabled"].(bool); ok {
enabled = e
}
a.core.router.doAdmin(func() {
a.core.router.cryptokey.setEnabled(enabled)
})
return admin_info{"enabled": enabled}, nil
})
a.addHandler("addSourceSubnet", []string{"subnet"}, func(in admin_info) (admin_info, error) { a.addHandler("addSourceSubnet", []string{"subnet"}, func(in admin_info) (admin_info, error) {
var err error var err error
a.core.router.doAdmin(func() { a.core.router.doAdmin(func() {
@ -402,7 +420,18 @@ func (a *admin) listen() {
switch strings.ToLower(u.Scheme) { switch strings.ToLower(u.Scheme) {
case "unix": case "unix":
if _, err := os.Stat(a.listenaddr[7:]); err == nil { if _, err := os.Stat(a.listenaddr[7:]); err == nil {
a.core.log.Warnln("WARNING:", a.listenaddr[7:], "already exists and may be in use by another process") a.core.log.Debugln("Admin socket", a.listenaddr[7:], "already exists, trying to clean up")
if _, err := net.DialTimeout("unix", a.listenaddr[7:], time.Second*2); err == nil || err.(net.Error).Timeout() {
a.core.log.Errorln("Admin socket", a.listenaddr[7:], "already exists and is in use by another process")
os.Exit(1)
} else {
if err := os.Remove(a.listenaddr[7:]); err == nil {
a.core.log.Debugln(a.listenaddr[7:], "was cleaned up")
} else {
a.core.log.Errorln(a.listenaddr[7:], "already exists and was not cleaned up:", err)
os.Exit(1)
}
}
} }
a.listener, err = net.Listen("unix", a.listenaddr[7:]) a.listener, err = net.Listen("unix", a.listenaddr[7:])
if err == nil { if err == nil {
@ -562,18 +591,9 @@ func (a *admin) printInfos(infos []admin_nodeInfo) string {
// addPeer triggers a connection attempt to a node. // addPeer triggers a connection attempt to a node.
func (a *admin) addPeer(addr string, sintf string) error { func (a *admin) addPeer(addr string, sintf string) error {
u, err := url.Parse(addr) err := a.core.link.call(addr, sintf)
if err == nil { if err != nil {
switch strings.ToLower(u.Scheme) { return err
case "tcp":
a.core.tcp.connect(u.Host, sintf)
case "socks":
a.core.tcp.connectSOCKS(u.Host, u.Path[1:])
default:
return errors.New("invalid peer: " + addr)
}
} else {
return errors.New("invalid peer: " + addr)
} }
return nil return nil
} }
@ -655,7 +675,8 @@ func (a *admin) getData_getPeers() []admin_nodeInfo {
{"uptime", int(time.Since(p.firstSeen).Seconds())}, {"uptime", int(time.Since(p.firstSeen).Seconds())},
{"bytes_sent", atomic.LoadUint64(&p.bytesSent)}, {"bytes_sent", atomic.LoadUint64(&p.bytesSent)},
{"bytes_recvd", atomic.LoadUint64(&p.bytesRecvd)}, {"bytes_recvd", atomic.LoadUint64(&p.bytesRecvd)},
{"endpoint", p.endpoint}, {"proto", p.intf.info.linkType},
{"endpoint", p.intf.name},
{"box_pub_key", hex.EncodeToString(p.box[:])}, {"box_pub_key", hex.EncodeToString(p.box[:])},
} }
peerInfos = append(peerInfos, info) peerInfos = append(peerInfos, info)
@ -681,7 +702,8 @@ func (a *admin) getData_getSwitchPeers() []admin_nodeInfo {
{"port", elem.port}, {"port", elem.port},
{"bytes_sent", atomic.LoadUint64(&peer.bytesSent)}, {"bytes_sent", atomic.LoadUint64(&peer.bytesSent)},
{"bytes_recvd", atomic.LoadUint64(&peer.bytesRecvd)}, {"bytes_recvd", atomic.LoadUint64(&peer.bytesRecvd)},
{"endpoint", peer.endpoint}, {"proto", peer.intf.info.linkType},
{"endpoint", peer.intf.info.remote},
{"box_pub_key", hex.EncodeToString(peer.box[:])}, {"box_pub_key", hex.EncodeToString(peer.box[:])},
} }
peerInfos = append(peerInfos, info) peerInfos = append(peerInfos, info)

View File

@ -8,6 +8,7 @@ import (
type awdl struct { type awdl struct {
link *link link *link
reconfigure chan chan error
mutex sync.RWMutex // protects interfaces below mutex sync.RWMutex // protects interfaces below
interfaces map[string]*awdlInterface interfaces map[string]*awdlInterface
} }
@ -49,8 +50,15 @@ func (a *awdl) init(l *link) error {
a.link = l a.link = l
a.mutex.Lock() a.mutex.Lock()
a.interfaces = make(map[string]*awdlInterface) a.interfaces = make(map[string]*awdlInterface)
a.reconfigure = make(chan chan error, 1)
a.mutex.Unlock() a.mutex.Unlock()
go func() {
for e := range a.reconfigure {
e <- nil
}
}()
return nil return nil
} }

View File

@ -44,7 +44,6 @@ type Core struct {
admin admin admin admin
searches searches searches searches
multicast multicast multicast multicast
tcp tcpInterface
link link link link
log *log.Logger log *log.Logger
} }
@ -125,11 +124,15 @@ func (c *Core) addPeerLoop() {
// UpdateConfig updates the configuration in Core and then signals the // UpdateConfig updates the configuration in Core and then signals the
// various module goroutines to reconfigure themselves if needed // various module goroutines to reconfigure themselves if needed
func (c *Core) UpdateConfig(config *config.NodeConfig) { func (c *Core) UpdateConfig(config *config.NodeConfig) {
c.log.Infoln("Reloading configuration...")
c.configMutex.Lock() c.configMutex.Lock()
c.configOld = c.config c.configOld = c.config
c.config = *config c.config = *config
c.configMutex.Unlock() c.configMutex.Unlock()
errors := 0
components := []chan chan error{ components := []chan chan error{
c.admin.reconfigure, c.admin.reconfigure,
c.searches.reconfigure, c.searches.reconfigure,
@ -140,7 +143,7 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
c.router.tun.reconfigure, c.router.tun.reconfigure,
c.router.cryptokey.reconfigure, c.router.cryptokey.reconfigure,
c.switchTable.reconfigure, c.switchTable.reconfigure,
c.tcp.reconfigure, c.link.reconfigure,
c.multicast.reconfigure, c.multicast.reconfigure,
} }
@ -148,9 +151,16 @@ func (c *Core) UpdateConfig(config *config.NodeConfig) {
response := make(chan error) response := make(chan error)
component <- response component <- response
if err := <-response; err != nil { if err := <-response; err != nil {
c.log.Println(err) c.log.Errorln(err)
errors++
} }
} }
if errors > 0 {
c.log.Warnln(errors, "modules reported errors during configuration reload")
} else {
c.log.Infoln("Configuration reloaded successfully")
}
} }
// GetBuildName gets the current build name. This is usually injected if built // GetBuildName gets the current build name. This is usually injected if built
@ -194,11 +204,6 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error {
c.init() c.init()
if err := c.tcp.init(c); err != nil {
c.log.Errorln("Failed to start TCP interface")
return err
}
if err := c.link.init(c); err != nil { if err := c.link.init(c); err != nil {
c.log.Errorln("Failed to start link interfaces") c.log.Errorln("Failed to start link interfaces")
return err return err

View File

@ -97,7 +97,15 @@ func (c *Core) DEBUG_getPeers() *peers {
} }
func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer { func (ps *peers) DEBUG_newPeer(box crypto.BoxPubKey, sig crypto.SigPubKey, link crypto.BoxSharedKey) *peer {
return ps.newPeer(&box, &sig, &link, "(simulator)", nil) sim := linkInterface{
name: "(simulator)",
info: linkInfo{
local: "(simulator)",
remote: "(simulator)",
linkType: "sim",
},
}
return ps.newPeer(&box, &sig, &link, &sim, nil)
} }
/* /*
@ -449,19 +457,19 @@ func (c *Core) DEBUG_addSOCKSConn(socksaddr, peeraddr string) {
//* //*
func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) { func (c *Core) DEBUG_setupAndStartGlobalTCPInterface(addrport string) {
c.config.Listen = addrport c.config.Listen = []string{addrport}
if err := c.tcp.init(c /*, addrport, 0*/); err != nil { if err := c.link.init(c); err != nil {
c.log.Println("Failed to start TCP interface:", err) c.log.Println("Failed to start interfaces:", err)
panic(err) panic(err)
} }
} }
func (c *Core) DEBUG_getGlobalTCPAddr() *net.TCPAddr { func (c *Core) DEBUG_getGlobalTCPAddr() *net.TCPAddr {
return c.tcp.serv.Addr().(*net.TCPAddr) return c.link.tcp.getAddr()
} }
func (c *Core) DEBUG_addTCPConn(saddr string) { func (c *Core) DEBUG_addTCPConn(saddr string) {
c.tcp.call(saddr, nil, "") c.link.tcp.call(saddr, nil, "")
} }
//*/ //*/

View File

@ -4,9 +4,12 @@ import (
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt" "fmt"
"io"
"net" "net"
"net/url"
"strings" "strings"
"sync" "sync"
//"sync/atomic" //"sync/atomic"
"time" "time"
@ -17,9 +20,11 @@ import (
type link struct { type link struct {
core *Core core *Core
reconfigure chan chan error
mutex sync.RWMutex // protects interfaces below mutex sync.RWMutex // protects interfaces below
interfaces map[linkInfo]*linkInterface interfaces map[linkInfo]*linkInterface
awdl awdl // AWDL interface support awdl awdl // AWDL interface support
tcp tcp // TCP interface support
// TODO timeout (to remove from switch), read from config.ReadTimeout // TODO timeout (to remove from switch), read from config.ReadTimeout
} }
@ -55,16 +60,72 @@ func (l *link) init(c *Core) error {
l.core = c l.core = c
l.mutex.Lock() l.mutex.Lock()
l.interfaces = make(map[linkInfo]*linkInterface) l.interfaces = make(map[linkInfo]*linkInterface)
l.reconfigure = make(chan chan error)
l.mutex.Unlock() l.mutex.Unlock()
if err := l.awdl.init(l); err != nil { if err := l.tcp.init(l); err != nil {
l.core.log.Errorln("Failed to start AWDL interface") c.log.Errorln("Failed to start TCP interface")
return err return err
} }
if err := l.awdl.init(l); err != nil {
c.log.Errorln("Failed to start AWDL interface")
return err
}
go func() {
for {
e := <-l.reconfigure
tcpresponse := make(chan error)
awdlresponse := make(chan error)
l.tcp.reconfigure <- tcpresponse
if err := <-tcpresponse; err != nil {
e <- err
continue
}
l.awdl.reconfigure <- awdlresponse
if err := <-awdlresponse; err != nil {
e <- err
continue
}
e <- nil
}
}()
return nil return nil
} }
func (l *link) call(uri string, sintf string) error {
u, err := url.Parse(uri)
if err != nil {
return err
}
pathtokens := strings.Split(strings.Trim(u.Path, "/"), "/")
switch u.Scheme {
case "tcp":
l.tcp.call(u.Host, nil, sintf)
case "socks":
l.tcp.call(pathtokens[0], u.Host, sintf)
default:
return errors.New("unknown call scheme: " + u.Scheme)
}
return nil
}
func (l *link) listen(uri string) error {
u, err := url.Parse(uri)
if err != nil {
return err
}
switch u.Scheme {
case "tcp":
_, err := l.tcp.listen(u.Host)
return err
default:
return errors.New("unknown listen scheme: " + u.Scheme)
}
}
func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) { func (l *link) create(msgIO linkInterfaceMsgIO, name, linkType, local, remote string, incoming, force bool) (*linkInterface, error) {
// Technically anything unique would work for names, but lets pick something human readable, just for debugging // Technically anything unique would work for names, but lets pick something human readable, just for debugging
intf := linkInterface{ intf := linkInterface{
@ -91,11 +152,16 @@ func (intf *linkInterface) handler() error {
meta.link = *myLinkPub meta.link = *myLinkPub
metaBytes := meta.encode() metaBytes := meta.encode()
// TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer) // TODO timeouts on send/recv (goroutine for send/recv, channel select w/ timer)
err := intf.msgIO._sendMetaBytes(metaBytes) var err error
if !util.FuncTimeout(func() { err = intf.msgIO._sendMetaBytes(metaBytes) }, 30*time.Second) {
return errors.New("timeout on metadata send")
}
if err != nil { if err != nil {
return err return err
} }
metaBytes, err = intf.msgIO._recvMetaBytes() if !util.FuncTimeout(func() { metaBytes, err = intf.msgIO._recvMetaBytes() }, 30*time.Second) {
return errors.New("timeout on metadata recv")
}
if err != nil { if err != nil {
return err return err
} }
@ -109,8 +175,8 @@ func (intf *linkInterface) handler() error {
return errors.New("failed to connect: wrong version") return errors.New("failed to connect: wrong version")
} }
// Check if we're authorized to connect to this key / IP // Check if we're authorized to connect to this key / IP
if !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) { if !intf.incoming && !intf.force && !intf.link.core.peers.isAllowedEncryptionPublicKey(&meta.box) {
intf.link.core.log.Debugf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s", intf.link.core.log.Warnf("%s connection to %s forbidden: AllowedEncryptionPublicKeys does not contain key %s",
strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:])) strings.ToUpper(intf.info.linkType), intf.info.remote, hex.EncodeToString(meta.box[:]))
intf.msgIO.close() intf.msgIO.close()
return nil return nil
@ -141,7 +207,7 @@ func (intf *linkInterface) handler() error {
intf.link.mutex.Unlock() intf.link.mutex.Unlock()
// Create peer // Create peer
shared := crypto.GetSharedKey(myLinkPriv, &meta.link) shared := crypto.GetSharedKey(myLinkPriv, &meta.link)
intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf.name, func() { intf.msgIO.close() }) intf.peer = intf.link.core.peers.newPeer(&meta.box, &meta.sig, shared, intf, func() { intf.msgIO.close() })
if intf.peer == nil { if intf.peer == nil {
return errors.New("failed to create peer") return errors.New("failed to create peer")
} }
@ -162,14 +228,15 @@ func (intf *linkInterface) handler() error {
themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote) themString := fmt.Sprintf("%s@%s", themAddrString, intf.info.remote)
intf.link.core.log.Infof("Connected %s: %s, source %s", intf.link.core.log.Infof("Connected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
defer intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
// Start the link loop // Start the link loop
go intf.peer.linkLoop() go intf.peer.linkLoop()
// Start the writer // Start the writer
signalReady := make(chan struct{}, 1) signalReady := make(chan struct{}, 1)
signalSent := make(chan bool, 1) signalSent := make(chan bool, 1)
sendAck := make(chan struct{}, 1) sendAck := make(chan struct{}, 1)
sendBlocked := time.NewTimer(time.Second)
defer util.TimerStop(sendBlocked)
util.TimerStop(sendBlocked)
go func() { go func() {
defer close(signalReady) defer close(signalReady)
defer close(signalSent) defer close(signalSent)
@ -177,7 +244,9 @@ func (intf *linkInterface) handler() error {
tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp tcpTimer := time.NewTimer(interval) // used for backwards compat with old tcp
defer util.TimerStop(tcpTimer) defer util.TimerStop(tcpTimer)
send := func(bs []byte) { send := func(bs []byte) {
sendBlocked.Reset(time.Second)
intf.msgIO.writeMsg(bs) intf.msgIO.writeMsg(bs)
util.TimerStop(sendBlocked)
select { select {
case signalSent <- len(bs) > 0: case signalSent <- len(bs) > 0:
default: default:
@ -197,15 +266,15 @@ func (intf *linkInterface) handler() error {
// Now block until something is ready or the timer triggers keepalive traffic // Now block until something is ready or the timer triggers keepalive traffic
select { select {
case <-tcpTimer.C: case <-tcpTimer.C:
intf.link.core.log.Debugf("Sending (legacy) keep-alive to %s: %s, source %s", intf.link.core.log.Tracef("Sending (legacy) keep-alive to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil) send(nil)
case <-sendAck: case <-sendAck:
intf.link.core.log.Debugf("Sending ack to %s: %s, source %s", intf.link.core.log.Tracef("Sending ack to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
send(nil) send(nil)
case msg := <-intf.peer.linkOut: case msg := <-intf.peer.linkOut:
intf.msgIO.writeMsg(msg) send(msg)
case msg, ok := <-out: case msg, ok := <-out:
if !ok { if !ok {
return return
@ -216,8 +285,8 @@ func (intf *linkInterface) handler() error {
case signalReady <- struct{}{}: case signalReady <- struct{}{}:
default: default:
} }
intf.link.core.log.Debugf("Sending packet to %s: %s, source %s", //intf.link.core.log.Tracef("Sending packet to %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) // strings.ToUpper(intf.info.linkType), themString, intf.info.local)
} }
} }
}() }()
@ -225,26 +294,32 @@ func (intf *linkInterface) handler() error {
// Used to enable/disable activity in the switch // Used to enable/disable activity in the switch
signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive signalAlive := make(chan bool, 1) // True = real packet, false = keep-alive
defer close(signalAlive) defer close(signalAlive)
ret := make(chan error, 1) // How we signal the return value when multiple goroutines are involved
go func() { go func() {
var isAlive bool var isAlive bool
var isReady bool var isReady bool
var sendTimerRunning bool var sendTimerRunning bool
var recvTimerRunning bool var recvTimerRunning bool
recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed recvTime := 6 * time.Second // TODO set to ReadTimeout from the config, reset if it gets changed
closeTime := 2 * switch_timeout // TODO or maybe this makes more sense for ReadTimeout?...
sendTime := time.Second sendTime := time.Second
sendTimer := time.NewTimer(sendTime) sendTimer := time.NewTimer(sendTime)
defer util.TimerStop(sendTimer) defer util.TimerStop(sendTimer)
recvTimer := time.NewTimer(recvTime) recvTimer := time.NewTimer(recvTime)
defer util.TimerStop(recvTimer) defer util.TimerStop(recvTimer)
closeTimer := time.NewTimer(closeTime)
defer util.TimerStop(closeTimer)
for { for {
intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t", //intf.link.core.log.Debugf("State of %s: %s, source %s :: isAlive %t isReady %t sendTimerRunning %t recvTimerRunning %t",
strings.ToUpper(intf.info.linkType), themString, intf.info.local, // strings.ToUpper(intf.info.linkType), themString, intf.info.local,
isAlive, isReady, sendTimerRunning, recvTimerRunning) // isAlive, isReady, sendTimerRunning, recvTimerRunning)
select { select {
case gotMsg, ok := <-signalAlive: case gotMsg, ok := <-signalAlive:
if !ok { if !ok {
return return
} }
util.TimerStop(closeTimer)
closeTimer.Reset(closeTime)
util.TimerStop(recvTimer) util.TimerStop(recvTimer)
recvTimerRunning = false recvTimerRunning = false
isAlive = true isAlive = true
@ -261,7 +336,7 @@ func (intf *linkInterface) handler() error {
sendTimerRunning = true sendTimerRunning = true
} }
if !gotMsg { if !gotMsg {
intf.link.core.log.Debugf("Received ack from %s: %s, source %s", intf.link.core.log.Tracef("Received ack from %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local) strings.ToUpper(intf.info.linkType), themString, intf.info.local)
} }
case sentMsg, ok := <-signalSent: case sentMsg, ok := <-signalSent:
@ -290,6 +365,10 @@ func (intf *linkInterface) handler() error {
intf.link.core.switchTable.idleIn <- intf.peer.port intf.link.core.switchTable.idleIn <- intf.peer.port
isReady = true isReady = true
} }
case <-sendBlocked.C:
// We blocked while trying to send something
isReady = false
intf.link.core.switchTable.blockPeer(intf.peer.port)
case <-sendTimer.C: case <-sendTimer.C:
// We haven't sent anything, so signal a send of a 0 packet to let them know we're alive // We haven't sent anything, so signal a send of a 0 packet to let them know we're alive
select { select {
@ -299,6 +378,15 @@ func (intf *linkInterface) handler() error {
case <-recvTimer.C: case <-recvTimer.C:
// We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding // We haven't received anything, so assume there's a problem and don't return this node to the switch until they start responding
isAlive = false isAlive = false
intf.link.core.switchTable.blockPeer(intf.peer.port)
case <-closeTimer.C:
// We haven't received anything in a really long time, so things have died at the switch level and then some...
// Just close the connection at this point...
select {
case ret <- errors.New("timeout"):
default:
}
intf.msgIO.close()
} }
} }
}() }()
@ -309,7 +397,13 @@ func (intf *linkInterface) handler() error {
intf.peer.handlePacket(msg) intf.peer.handlePacket(msg)
} }
if err != nil { if err != nil {
return err if err != io.EOF {
select {
case ret <- err:
default:
}
}
break
} }
select { select {
case signalAlive <- len(msg) > 0: case signalAlive <- len(msg) > 0:
@ -317,5 +411,15 @@ func (intf *linkInterface) handler() error {
} }
} }
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
return nil // Remember to set `err` to something useful before returning
select {
case err = <-ret:
intf.link.core.log.Infof("Disconnected %s: %s, source %s; error: %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local, err)
default:
err = nil
intf.link.core.log.Infof("Disconnected %s: %s, source %s",
strings.ToUpper(intf.info.linkType), themString, intf.info.local)
}
return err
} }

View File

@ -5,7 +5,6 @@ import (
"fmt" "fmt"
"net" "net"
"regexp" "regexp"
"sync"
"time" "time"
"golang.org/x/net/ipv6" "golang.org/x/net/ipv6"
@ -16,19 +15,20 @@ type multicast struct {
reconfigure chan chan error reconfigure chan chan error
sock *ipv6.PacketConn sock *ipv6.PacketConn
groupAddr string groupAddr string
myAddr *net.TCPAddr listeners map[string]*tcpListener
myAddrMutex sync.RWMutex listenPort uint16
} }
func (m *multicast) init(core *Core) { func (m *multicast) init(core *Core) {
m.core = core m.core = core
m.reconfigure = make(chan chan error, 1) m.reconfigure = make(chan chan error, 1)
m.listeners = make(map[string]*tcpListener)
m.core.configMutex.RLock()
m.listenPort = m.core.config.LinkLocalTCPPort
m.core.configMutex.RUnlock()
go func() { go func() {
for { for {
e := <-m.reconfigure e := <-m.reconfigure
m.myAddrMutex.Lock()
m.myAddr = m.core.tcp.getAddr()
m.myAddrMutex.Unlock()
e <- nil e <- nil
} }
}() }()
@ -50,7 +50,7 @@ func (m *multicast) start() error {
} }
listenString := fmt.Sprintf("[::]:%v", addr.Port) listenString := fmt.Sprintf("[::]:%v", addr.Port)
lc := net.ListenConfig{ lc := net.ListenConfig{
Control: multicastReuse, Control: m.multicastReuse,
} }
conn, err := lc.ListenPacket(context.Background(), "udp6", listenString) conn, err := lc.ListenPacket(context.Background(), "udp6", listenString)
if err != nil { if err != nil {
@ -61,19 +61,20 @@ func (m *multicast) start() error {
// Windows can't set this flag, so we need to handle it in other ways // Windows can't set this flag, so we need to handle it in other ways
} }
go m.multicastStarted()
go m.listen() go m.listen()
go m.announce() go m.announce()
} }
return nil return nil
} }
func (m *multicast) interfaces() []net.Interface { func (m *multicast) interfaces() map[string]net.Interface {
// Get interface expressions from config // Get interface expressions from config
m.core.configMutex.RLock() m.core.configMutex.RLock()
exprs := m.core.config.MulticastInterfaces exprs := m.core.config.MulticastInterfaces
m.core.configMutex.RUnlock() m.core.configMutex.RUnlock()
// Ask the system for network interfaces // Ask the system for network interfaces
var interfaces []net.Interface interfaces := make(map[string]net.Interface)
allifaces, err := net.Interfaces() allifaces, err := net.Interfaces()
if err != nil { if err != nil {
panic(err) panic(err)
@ -93,12 +94,14 @@ func (m *multicast) interfaces() []net.Interface {
continue continue
} }
for _, expr := range exprs { for _, expr := range exprs {
// Compile each regular expression
e, err := regexp.Compile(expr) e, err := regexp.Compile(expr)
if err != nil { if err != nil {
panic(err) panic(err)
} }
// Does the interface match the regular expression? Store it if so
if e.MatchString(iface.Name) { if e.MatchString(iface.Name) {
interfaces = append(interfaces, iface) interfaces[iface.Name] = iface
} }
} }
} }
@ -106,10 +109,6 @@ func (m *multicast) interfaces() []net.Interface {
} }
func (m *multicast) announce() { func (m *multicast) announce() {
var anAddr net.TCPAddr
m.myAddrMutex.Lock()
m.myAddr = m.core.tcp.getAddr()
m.myAddrMutex.Unlock()
groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr) groupAddr, err := net.ResolveUDPAddr("udp6", m.groupAddr)
if err != nil { if err != nil {
panic(err) panic(err)
@ -119,33 +118,69 @@ func (m *multicast) announce() {
panic(err) panic(err)
} }
for { for {
for _, iface := range m.interfaces() { interfaces := m.interfaces()
m.sock.JoinGroup(&iface, groupAddr) // There might be interfaces that we configured listeners for but are no
// longer up - if that's the case then we should stop the listeners
for name, listener := range m.listeners {
if _, ok := interfaces[name]; !ok {
listener.stop <- true
delete(m.listeners, name)
m.core.log.Debugln("No longer multicasting on", name)
}
}
// Now that we have a list of valid interfaces from the operating system,
// we can start checking if we can send multicasts on them
for _, iface := range interfaces {
// Find interface addresses
addrs, err := iface.Addrs() addrs, err := iface.Addrs()
if err != nil { if err != nil {
panic(err) panic(err)
} }
m.myAddrMutex.RLock()
anAddr.Port = m.myAddr.Port
m.myAddrMutex.RUnlock()
for _, addr := range addrs { for _, addr := range addrs {
addrIP, _, _ := net.ParseCIDR(addr.String()) addrIP, _, _ := net.ParseCIDR(addr.String())
// Ignore IPv4 addresses
if addrIP.To4() != nil { if addrIP.To4() != nil {
continue continue
} // IPv6 only }
// Ignore non-link-local addresses
if !addrIP.IsLinkLocalUnicast() { if !addrIP.IsLinkLocalUnicast() {
continue continue
} }
anAddr.IP = addrIP // Join the multicast group
anAddr.Zone = iface.Name m.sock.JoinGroup(&iface, groupAddr)
// Try and see if we already have a TCP listener for this interface
var listener *tcpListener
if l, ok := m.listeners[iface.Name]; !ok || l.listener == nil {
// No listener was found - let's create one
listenaddr := fmt.Sprintf("[%s%%%s]:%d", addrIP, iface.Name, m.listenPort)
if li, err := m.core.link.tcp.listen(listenaddr); err == nil {
m.core.log.Debugln("Started multicasting on", iface.Name)
// Store the listener so that we can stop it later if needed
m.listeners[iface.Name] = li
listener = li
} else {
m.core.log.Warnln("Not multicasting on", iface.Name, "due to error:", err)
}
} else {
// An existing listener was found
listener = m.listeners[iface.Name]
}
// Make sure nothing above failed for some reason
if listener == nil {
continue
}
// Get the listener details and construct the multicast beacon
lladdr := listener.listener.Addr().String()
if a, err := net.ResolveTCPAddr("tcp6", lladdr); err == nil {
a.Zone = ""
destAddr.Zone = iface.Name destAddr.Zone = iface.Name
msg := []byte(anAddr.String()) msg := []byte(a.String())
m.sock.WriteTo(msg, nil, destAddr) m.sock.WriteTo(msg, nil, destAddr)
}
break break
} }
time.Sleep(time.Second)
} }
time.Sleep(time.Second) time.Sleep(time.Second * 15)
} }
} }
@ -180,8 +215,9 @@ func (m *multicast) listen() {
if addr.IP.String() != from.IP.String() { if addr.IP.String() != from.IP.String() {
continue continue
} }
addr.Zone = from.Zone addr.Zone = ""
saddr := addr.String() if err := m.core.link.call("tcp://"+addr.String(), from.Zone); err != nil {
m.core.tcp.connect(saddr, addr.Zone) m.core.log.Debugln("Call from multicast failed:", err)
}
} }
} }

View File

@ -2,10 +2,54 @@
package yggdrasil package yggdrasil
import "syscall" /*
import "golang.org/x/sys/unix" #cgo CFLAGS: -x objective-c
#cgo LDFLAGS: -framework Foundation
#import <Foundation/Foundation.h>
NSNetServiceBrowser *serviceBrowser;
void StartAWDLBrowsing() {
if (serviceBrowser == nil) {
serviceBrowser = [[NSNetServiceBrowser alloc] init];
serviceBrowser.includesPeerToPeer = YES;
}
[serviceBrowser searchForServicesOfType:@"_yggdrasil._tcp" inDomain:@""];
}
void StopAWDLBrowsing() {
if (serviceBrowser == nil) {
return;
}
[serviceBrowser stop];
}
*/
import "C"
import (
"syscall"
"time"
func multicastReuse(network string, address string, c syscall.RawConn) error { "golang.org/x/sys/unix"
)
var awdlGoroutineStarted bool
func (m *multicast) multicastStarted() {
if awdlGoroutineStarted {
return
}
m.core.log.Infoln("Multicast discovery will wake up AWDL if required")
awdlGoroutineStarted = true
for {
C.StopAWDLBrowsing()
for _, intf := range m.interfaces() {
if intf.Name == "awdl0" {
C.StartAWDLBrowsing()
break
}
}
time.Sleep(time.Minute)
}
}
func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error {
var control error var control error
var reuseport error var reuseport error
var recvanyif error var recvanyif error

View File

@ -4,6 +4,10 @@ package yggdrasil
import "syscall" import "syscall"
func multicastReuse(network string, address string, c syscall.RawConn) error { func (m *multicast) multicastStarted() {
}
func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error {
return nil return nil
} }

View File

@ -5,7 +5,11 @@ package yggdrasil
import "syscall" import "syscall"
import "golang.org/x/sys/unix" import "golang.org/x/sys/unix"
func multicastReuse(network string, address string, c syscall.RawConn) error { func (m *multicast) multicastStarted() {
}
func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error {
var control error var control error
var reuseport error var reuseport error

View File

@ -5,7 +5,11 @@ package yggdrasil
import "syscall" import "syscall"
import "golang.org/x/sys/windows" import "golang.org/x/sys/windows"
func multicastReuse(network string, address string, c syscall.RawConn) error { func (m *multicast) multicastStarted() {
}
func (m *multicast) multicastReuse(network string, address string, c syscall.RawConn) error {
var control error var control error
var reuseaddr error var reuseaddr error

View File

@ -98,6 +98,7 @@ type peer struct {
bytesRecvd uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers
// BUG: sync/atomic, 32 bit platforms need the above to be the first element // BUG: sync/atomic, 32 bit platforms need the above to be the first element
core *Core core *Core
intf *linkInterface
port switchPort port switchPort
box crypto.BoxPubKey box crypto.BoxPubKey
sig crypto.SigPubKey sig crypto.SigPubKey
@ -113,18 +114,19 @@ type peer struct {
} }
// Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number. // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unoccupied port number.
func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, endpoint string, closer func()) *peer { func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShared *crypto.BoxSharedKey, intf *linkInterface, closer func()) *peer {
now := time.Now() now := time.Now()
p := peer{box: *box, p := peer{box: *box,
sig: *sig, sig: *sig,
shared: *crypto.GetSharedKey(&ps.core.boxPriv, box), shared: *crypto.GetSharedKey(&ps.core.boxPriv, box),
linkShared: *linkShared, linkShared: *linkShared,
endpoint: endpoint,
firstSeen: now, firstSeen: now,
doSend: make(chan struct{}, 1), doSend: make(chan struct{}, 1),
dinfo: make(chan *dhtInfo, 1), dinfo: make(chan *dhtInfo, 1),
close: closer, close: closer,
core: ps.core} core: ps.core,
intf: intf,
}
ps.mutex.Lock() ps.mutex.Lock()
defer ps.mutex.Unlock() defer ps.mutex.Unlock()
oldPorts := ps.getPorts() oldPorts := ps.getPorts()

View File

@ -66,19 +66,44 @@ func (r *router) init(core *Core) {
r.reconfigure = make(chan chan error, 1) r.reconfigure = make(chan chan error, 1)
r.addr = *address.AddrForNodeID(&r.core.dht.nodeID) r.addr = *address.AddrForNodeID(&r.core.dht.nodeID)
r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID) r.subnet = *address.SubnetForNodeID(&r.core.dht.nodeID)
in := make(chan []byte, 32) // TODO something better than this... in := make(chan []byte, 1) // TODO something better than this...
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, "(self)", nil) self := linkInterface{
p.out = func(packet []byte) { name: "(self)",
// This is to make very sure it never blocks info: linkInfo{
select { local: "(self)",
case in <- packet: remote: "(self)",
return linkType: "self",
default: },
util.PutBytes(packet)
}
} }
p := r.core.peers.newPeer(&r.core.boxPub, &r.core.sigPub, &crypto.BoxSharedKey{}, &self, nil)
p.out = func(packet []byte) { in <- packet }
r.in = in r.in = in
r.out = func(packet []byte) { p.handlePacket(packet) } // The caller is responsible for go-ing if it needs to not block out := make(chan []byte, 32)
go func() {
for packet := range out {
p.handlePacket(packet)
}
}()
out2 := make(chan []byte, 32)
go func() {
// This worker makes sure r.out never blocks
// It will buffer traffic long enough for the switch worker to take it
// If (somehow) you can send faster than the switch can receive, then this would use unbounded memory
// But crypto slows sends enough that the switch should always be able to take the packets...
var buf [][]byte
for {
buf = append(buf, <-out2)
for len(buf) > 0 {
select {
case bs := <-out2:
buf = append(buf, bs)
case out <- buf[0]:
buf = buf[1:]
}
}
}
}()
r.out = func(packet []byte) { out2 <- packet }
r.toRecv = make(chan router_recvPacket, 32) r.toRecv = make(chan router_recvPacket, 32)
recv := make(chan []byte, 32) recv := make(chan []byte, 32)
send := make(chan []byte, 32) send := make(chan []byte, 32)
@ -305,7 +330,6 @@ func (r *router) sendPacket(bs []byte) {
// Don't continue - drop the packet // Don't continue - drop the packet
return return
} }
sinfo.send <- bs sinfo.send <- bs
} }
} }

View File

@ -525,17 +525,36 @@ func (ss *sessions) resetInits() {
// It handles calling the relatively expensive crypto operations. // It handles calling the relatively expensive crypto operations.
// It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK. // It's also responsible for checking nonces and dropping out-of-date/duplicate packets, or else calling the function to update nonces if the packet is OK.
func (sinfo *sessionInfo) doWorker() { func (sinfo *sessionInfo) doWorker() {
send := make(chan []byte, 32)
defer close(send)
go func() {
for bs := range send {
sinfo.doSend(bs)
}
}()
recv := make(chan *wire_trafficPacket, 32)
defer close(recv)
go func() {
for p := range recv {
sinfo.doRecv(p)
}
}()
for { for {
select { select {
case p, ok := <-sinfo.recv: case p, ok := <-sinfo.recv:
if ok { if ok {
sinfo.doRecv(p) select {
case recv <- p:
default:
// We need something to not block, and it's best to drop it before we decrypt
util.PutBytes(p.Payload)
}
} else { } else {
return return
} }
case bs, ok := <-sinfo.send: case bs, ok := <-sinfo.send:
if ok { if ok {
sinfo.doSend(bs) send <- bs
} else { } else {
return return
} }
@ -625,8 +644,5 @@ func (sinfo *sessionInfo) doRecv(p *wire_trafficPacket) {
sinfo.updateNonce(&p.Nonce) sinfo.updateNonce(&p.Nonce)
sinfo.time = time.Now() sinfo.time = time.Now()
sinfo.bytesRecvd += uint64(len(bs)) sinfo.bytesRecvd += uint64(len(bs))
select { sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}
case sinfo.core.router.toRecv <- router_recvPacket{bs, sinfo}:
default: // avoid deadlocks, maybe do this somewhere else?...
}
} }

View File

@ -131,6 +131,7 @@ type peerInfo struct {
faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower faster map[switchPort]uint64 // Counter of how often a node is faster than the current parent, penalized extra if slower
port switchPort // Interface number of this peer port switchPort // Interface number of this peer
msg switchMsg // The wire switchMsg used msg switchMsg // The wire switchMsg used
blocked bool // True if the link is blocked, used to avoid parenting a blocked link
} }
// This is just a uint64 with a named type for clarity reasons. // This is just a uint64 with a named type for clarity reasons.
@ -176,6 +177,7 @@ type switchTable struct {
admin chan func() // Pass a lambda for the admin socket to query stuff admin chan func() // Pass a lambda for the admin socket to query stuff
queues switch_buffers // Queues - not atomic so ONLY use through admin chan queues switch_buffers // Queues - not atomic so ONLY use through admin chan
queueTotalMaxSize uint64 // Maximum combined size of queues queueTotalMaxSize uint64 // Maximum combined size of queues
toRouter chan []byte // Packets to be sent to the router
} }
// Minimum allowed total size of switch queues. // Minimum allowed total size of switch queues.
@ -199,6 +201,7 @@ func (t *switchTable) init(core *Core) {
t.idleIn = make(chan switchPort, 1024) t.idleIn = make(chan switchPort, 1024)
t.admin = make(chan func()) t.admin = make(chan func())
t.queueTotalMaxSize = SwitchQueueTotalMinSize t.queueTotalMaxSize = SwitchQueueTotalMinSize
t.toRouter = make(chan []byte, 1)
} }
// Safely gets a copy of this node's locator. // Safely gets a copy of this node's locator.
@ -215,7 +218,6 @@ func (t *switchTable) doMaintenance() {
defer t.mutex.Unlock() // Release lock when we're done defer t.mutex.Unlock() // Release lock when we're done
t.cleanRoot() t.cleanRoot()
t.cleanDropped() t.cleanDropped()
t.cleanPeers()
} }
// Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out. // Updates the root periodically if it is ourself, or promotes ourself to root if we're better than the current root or if the current root has timed out.
@ -255,6 +257,29 @@ func (t *switchTable) cleanRoot() {
} }
} }
// Blocks and, if possible, unparents a peer
func (t *switchTable) blockPeer(port switchPort) {
t.mutex.Lock()
defer t.mutex.Unlock()
peer, isIn := t.data.peers[port]
if !isIn {
return
}
peer.blocked = true
t.data.peers[port] = peer
if port != t.parent {
return
}
t.parent = 0
for _, info := range t.data.peers {
if info.port == port {
continue
}
t.unlockedHandleMsg(&info.msg, info.port, true)
}
t.unlockedHandleMsg(&peer.msg, peer.port, true)
}
// Removes a peer. // Removes a peer.
// Must be called by the router mainLoop goroutine, e.g. call router.doAdmin with a lambda that calls this. // Must be called by the router mainLoop goroutine, e.g. call router.doAdmin with a lambda that calls this.
// If the removed peer was this node's parent, it immediately tries to find a new parent. // If the removed peer was this node's parent, it immediately tries to find a new parent.
@ -272,28 +297,6 @@ func (t *switchTable) forgetPeer(port switchPort) {
} }
} }
// Clean all unresponsive peers from the table, needed in case a peer stops updating.
// Needed in case a non-parent peer keeps the connection open but stops sending updates.
// Also reclaims space from deleted peers by copying the map.
func (t *switchTable) cleanPeers() {
now := time.Now()
for port, peer := range t.data.peers {
if now.Sub(peer.time) > switch_timeout+switch_throttle {
// Longer than switch_timeout to make sure we don't remove a working peer because the root stopped responding.
delete(t.data.peers, port)
go t.core.peers.removePeer(port) // TODO figure out if it's safe to do this without a goroutine, or make it safe
}
}
if _, isIn := t.data.peers[t.parent]; !isIn {
// The root timestamp would probably time out before this happens, but better safe than sorry.
// We removed the current parent, so find a new one.
t.parent = 0
for _, peer := range t.data.peers {
t.unlockedHandleMsg(&peer.msg, peer.port, true)
}
}
}
// Dropped is a list of roots that are better than the current root, but stopped sending new timestamps. // Dropped is a list of roots that are better than the current root, but stopped sending new timestamps.
// If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos. // If we switch to a new root, and that root is better than an old root that previously timed out, then we can clean up the old dropped root infos.
// This function is called periodically to do that cleanup. // This function is called periodically to do that cleanup.
@ -416,6 +419,7 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
if reprocessing { if reprocessing {
sender.faster = oldSender.faster sender.faster = oldSender.faster
sender.time = oldSender.time sender.time = oldSender.time
sender.blocked = oldSender.blocked
} else { } else {
sender.faster = make(map[switchPort]uint64, len(oldSender.faster)) sender.faster = make(map[switchPort]uint64, len(oldSender.faster))
for port, peer := range t.data.peers { for port, peer := range t.data.peers {
@ -475,6 +479,11 @@ func (t *switchTable) unlockedHandleMsg(msg *switchMsg, fromPort switchPort, rep
case sender.faster[t.parent] >= switch_faster_threshold: case sender.faster[t.parent] >= switch_faster_threshold:
// The is reliably faster than the current parent. // The is reliably faster than the current parent.
updateRoot = true updateRoot = true
case !sender.blocked && oldParent.blocked:
// Replace a blocked parent
updateRoot = true
case reprocessing && sender.blocked && !oldParent.blocked:
// Don't replace an unblocked parent when reprocessing
case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]: case reprocessing && sender.faster[t.parent] > oldParent.faster[sender.port]:
// The sender seems to be reliably faster than the current parent, so switch to them instead. // The sender seems to be reliably faster than the current parent, so switch to them instead.
updateRoot = true updateRoot = true
@ -570,23 +579,23 @@ func (t *switchTable) start() error {
return nil return nil
} }
// Check if a packet should go to the self node // Return a map of ports onto distance, keeping only ports closer to the destination than this node
// This means there's no node closer to the destination than us // If the map is empty (or nil), then no peer is closer
// This is mainly used to identify packets addressed to us, or that hit a blackhole func (t *switchTable) getCloser(dest []byte) map[switchPort]int {
func (t *switchTable) selfIsClosest(dest []byte) bool {
table := t.getTable() table := t.getTable()
myDist := table.self.dist(dest) myDist := table.self.dist(dest)
if myDist == 0 { if myDist == 0 {
// Skip the iteration step if it's impossible to be closer // Skip the iteration step if it's impossible to be closer
return true return nil
} }
closer := make(map[switchPort]int, len(table.elems))
for _, info := range table.elems { for _, info := range table.elems {
dist := info.locator.dist(dest) dist := info.locator.dist(dest)
if dist < myDist { if dist < myDist {
return false closer[info.port] = dist
} }
} }
return true return closer
} }
// Returns true if the peer is closer to the destination than ourself // Returns true if the peer is closer to the destination than ourself
@ -637,28 +646,42 @@ func (t *switchTable) bestPortForCoords(coords []byte) switchPort {
// Handle an incoming packet // Handle an incoming packet
// Either send it to ourself, or to the first idle peer that's free // Either send it to ourself, or to the first idle peer that's free
// Returns true if the packet has been handled somehow, false if it should be queued // Returns true if the packet has been handled somehow, false if it should be queued
func (t *switchTable) handleIn(packet []byte, idle map[switchPort]struct{}) bool { func (t *switchTable) handleIn(packet []byte, idle map[switchPort]time.Time) bool {
coords := switch_getPacketCoords(packet) coords := switch_getPacketCoords(packet)
ports := t.core.peers.getPorts() closer := t.getCloser(coords)
if t.selfIsClosest(coords) { if len(closer) == 0 {
// TODO? call the router directly, and remove the whole concept of a self peer? // TODO? call the router directly, and remove the whole concept of a self peer?
ports[0].sendPacket(packet) t.toRouter <- packet
return true return true
} }
table := t.getTable()
myDist := table.self.dist(coords)
var best *peer var best *peer
bestDist := myDist var bestDist int
for port := range idle { var bestTime time.Time
if to := ports[port]; to != nil { ports := t.core.peers.getPorts()
if info, isIn := table.elems[to.port]; isIn { for port, dist := range closer {
dist := info.locator.dist(coords) to := ports[port]
if !(dist < bestDist) { thisTime, isIdle := idle[port]
continue var update bool
switch {
case to == nil:
//nothing
case !isIdle:
//nothing
case best == nil:
update = true
case dist < bestDist:
update = true
case dist > bestDist:
//nothing
case thisTime.Before(bestTime):
update = true
default:
//nothing
} }
if update {
best = to best = to
bestDist = dist bestDist = dist
} bestTime = thisTime
} }
} }
if best != nil { if best != nil {
@ -697,7 +720,7 @@ func (b *switch_buffers) cleanup(t *switchTable) {
// Remove queues for which we have no next hop // Remove queues for which we have no next hop
packet := buf.packets[0] packet := buf.packets[0]
coords := switch_getPacketCoords(packet.bytes) coords := switch_getPacketCoords(packet.bytes)
if t.selfIsClosest(coords) { if len(t.getCloser(coords)) == 0 {
for _, packet := range buf.packets { for _, packet := range buf.packets {
util.PutBytes(packet.bytes) util.PutBytes(packet.bytes)
} }
@ -776,11 +799,38 @@ func (t *switchTable) handleIdle(port switchPort) bool {
// The switch worker does routing lookups and sends packets to where they need to be // The switch worker does routing lookups and sends packets to where they need to be
func (t *switchTable) doWorker() { func (t *switchTable) doWorker() {
sendingToRouter := make(chan []byte, 1)
go func() {
// Keep sending packets to the router
self := t.core.peers.getPorts()[0]
for bs := range sendingToRouter {
self.sendPacket(bs)
}
}()
go func() {
// Keep taking packets from the idle worker and sending them to the above whenever it's idle, keeping anything extra in a (fifo, head-drop) buffer
var buf [][]byte
for {
buf = append(buf, <-t.toRouter)
for len(buf) > 0 {
select {
case bs := <-t.toRouter:
buf = append(buf, bs)
for len(buf) > 32 {
util.PutBytes(buf[0])
buf = buf[1:]
}
case sendingToRouter <- buf[0]:
buf = buf[1:]
}
}
}
}()
t.queues.switchTable = t t.queues.switchTable = t
t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string) t.queues.bufs = make(map[string]switch_buffer) // Packets per PacketStreamID (string)
idle := make(map[switchPort]struct{}) // this is to deduplicate things idle := make(map[switchPort]time.Time) // this is to deduplicate things
for { for {
t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs)) //t.core.log.Debugf("Switch state: idle = %d, buffers = %d", len(idle), len(t.queues.bufs))
select { select {
case bytes := <-t.packetIn: case bytes := <-t.packetIn:
// Try to send it somewhere (or drop it if it's corrupt or at a dead end) // Try to send it somewhere (or drop it if it's corrupt or at a dead end)
@ -811,7 +861,7 @@ func (t *switchTable) doWorker() {
// Try to find something to send to this peer // Try to find something to send to this peer
if !t.handleIdle(port) { if !t.handleIdle(port) {
// Didn't find anything ready to send yet, so stay idle // Didn't find anything ready to send yet, so stay idle
idle[port] = struct{}{} idle[port] = time.Now()
} }
case f := <-t.admin: case f := <-t.admin:
f() f()

View File

@ -24,35 +24,29 @@ import (
"golang.org/x/net/proxy" "golang.org/x/net/proxy"
"github.com/yggdrasil-network/yggdrasil-go/src/crypto" "github.com/yggdrasil-network/yggdrasil-go/src/util"
) )
const default_timeout = 6 * time.Second const default_timeout = 6 * time.Second
const tcp_ping_interval = (default_timeout * 2 / 3) const tcp_ping_interval = (default_timeout * 2 / 3)
// The TCP listener and information about active TCP connections, to avoid duplication. // The TCP listener and information about active TCP connections, to avoid duplication.
type tcpInterface struct { type tcp struct {
core *Core link *link
reconfigure chan chan error reconfigure chan chan error
serv net.Listener
stop chan bool
addr string
mutex sync.Mutex // Protecting the below mutex sync.Mutex // Protecting the below
listeners map[string]*tcpListener
calls map[string]struct{} calls map[string]struct{}
conns map[tcpInfo](chan struct{}) conns map[linkInfo](chan struct{})
} }
// This is used as the key to a map that tracks existing connections, to prevent multiple connections to the same keys and local/remote address pair from occuring. type tcpListener struct {
// Different address combinations are allowed, so multi-homing is still technically possible (but not necessarily advisable). listener net.Listener
type tcpInfo struct { stop chan bool
box crypto.BoxPubKey
sig crypto.SigPubKey
localAddr string
remoteAddr string
} }
// Wrapper function to set additional options for specific connection types. // Wrapper function to set additional options for specific connection types.
func (iface *tcpInterface) setExtraOptions(c net.Conn) { func (t *tcp) setExtraOptions(c net.Conn) {
switch sock := c.(type) { switch sock := c.(type) {
case *net.TCPConn: case *net.TCPConn:
sock.SetNoDelay(true) sock.SetNoDelay(true)
@ -62,104 +56,152 @@ func (iface *tcpInterface) setExtraOptions(c net.Conn) {
} }
// Returns the address of the listener. // Returns the address of the listener.
func (iface *tcpInterface) getAddr() *net.TCPAddr { func (t *tcp) getAddr() *net.TCPAddr {
return iface.serv.Addr().(*net.TCPAddr) // TODO: Fix this, because this will currently only give a single address
} // to multicast.go, which obviously is not great, but right now multicast.go
// doesn't have the ability to send more than one address in a packet either
// Attempts to initiate a connection to the provided address. t.mutex.Lock()
func (iface *tcpInterface) connect(addr string, intf string) { defer t.mutex.Unlock()
iface.call(addr, nil, intf) for _, l := range t.listeners {
} return l.listener.Addr().(*net.TCPAddr)
}
// Attempst to initiate a connection to the provided address, viathe provided socks proxy address. return nil
func (iface *tcpInterface) connectSOCKS(socksaddr, peeraddr string) {
iface.call(peeraddr, &socksaddr, "")
} }
// Initializes the struct. // Initializes the struct.
func (iface *tcpInterface) init(core *Core) (err error) { func (t *tcp) init(l *link) error {
iface.core = core t.link = l
iface.stop = make(chan bool, 1) t.reconfigure = make(chan chan error, 1)
iface.reconfigure = make(chan chan error, 1) t.mutex.Lock()
t.calls = make(map[string]struct{})
t.conns = make(map[linkInfo](chan struct{}))
t.listeners = make(map[string]*tcpListener)
t.mutex.Unlock()
go func() { go func() {
for { for {
e := <-iface.reconfigure e := <-t.reconfigure
iface.core.configMutex.RLock() t.link.core.configMutex.RLock()
updated := iface.core.config.Listen != iface.core.configOld.Listen added := util.Difference(t.link.core.config.Listen, t.link.core.configOld.Listen)
iface.core.configMutex.RUnlock() deleted := util.Difference(t.link.core.configOld.Listen, t.link.core.config.Listen)
if updated { t.link.core.configMutex.RUnlock()
iface.stop <- true if len(added) > 0 || len(deleted) > 0 {
iface.serv.Close() for _, a := range added {
e <- iface.listen() if a[:6] != "tcp://" {
continue
}
if _, err := t.listen(a[6:]); err != nil {
e <- err
continue
}
}
for _, d := range deleted {
if d[:6] != "tcp://" {
continue
}
t.mutex.Lock()
if listener, ok := t.listeners[d[6:]]; ok {
t.mutex.Unlock()
listener.stop <- true
} else {
t.mutex.Unlock()
}
}
e <- nil
} else { } else {
e <- nil e <- nil
} }
} }
}() }()
return iface.listen() t.link.core.configMutex.RLock()
defer t.link.core.configMutex.RUnlock()
for _, listenaddr := range t.link.core.config.Listen {
if listenaddr[:6] != "tcp://" {
continue
}
if _, err := t.listen(listenaddr[6:]); err != nil {
return err
}
}
return nil
} }
func (iface *tcpInterface) listen() error { func (t *tcp) listen(listenaddr string) (*tcpListener, error) {
var err error var err error
iface.core.configMutex.RLock()
iface.addr = iface.core.config.Listen
iface.core.configMutex.RUnlock()
ctx := context.Background() ctx := context.Background()
lc := net.ListenConfig{ lc := net.ListenConfig{
Control: iface.tcpContext, Control: t.tcpContext,
} }
iface.serv, err = lc.Listen(ctx, "tcp", iface.addr) listener, err := lc.Listen(ctx, "tcp", listenaddr)
if err == nil { if err == nil {
iface.mutex.Lock() l := tcpListener{
iface.calls = make(map[string]struct{}) listener: listener,
iface.conns = make(map[tcpInfo](chan struct{})) stop: make(chan bool),
iface.mutex.Unlock() }
go iface.listener() go t.listener(&l, listenaddr)
return nil return &l, nil
} }
return err return nil, err
} }
// Runs the listener, which spawns off goroutines for incoming connections. // Runs the listener, which spawns off goroutines for incoming connections.
func (iface *tcpInterface) listener() { func (t *tcp) listener(l *tcpListener, listenaddr string) {
defer iface.serv.Close() if l == nil {
iface.core.log.Infoln("Listening for TCP on:", iface.serv.Addr().String()) return
}
// Track the listener so that we can find it again in future
t.mutex.Lock()
if _, isIn := t.listeners[listenaddr]; isIn {
t.mutex.Unlock()
l.listener.Close()
return
} else {
t.listeners[listenaddr] = l
t.mutex.Unlock()
}
// And here we go!
accepted := make(chan bool)
defer func() {
t.link.core.log.Infoln("Stopping TCP listener on:", l.listener.Addr().String())
l.listener.Close()
t.mutex.Lock()
delete(t.listeners, listenaddr)
t.mutex.Unlock()
}()
t.link.core.log.Infoln("Listening for TCP on:", l.listener.Addr().String())
for { for {
sock, err := iface.serv.Accept() var sock net.Conn
if err != nil { var err error
iface.core.log.Errorln("Failed to accept connection:", err) // Listen in a separate goroutine, as that way it does not block us from
return // receiving "stop" events
} go func() {
sock, err = l.listener.Accept()
accepted <- true
}()
// Wait for either an accepted connection, or a message telling us to stop
// the TCP listener
select { select {
case <-iface.stop: case <-accepted:
iface.core.log.Errorln("Stopping listener")
return
default:
if err != nil { if err != nil {
panic(err) t.link.core.log.Errorln("Failed to accept connection:", err)
return
} }
go iface.handler(sock, true) go t.handler(sock, true, nil)
case <-l.stop:
return
} }
} }
} }
// Checks if we already have a connection to this node
func (iface *tcpInterface) isAlreadyConnected(info tcpInfo) bool {
iface.mutex.Lock()
defer iface.mutex.Unlock()
_, isIn := iface.conns[info]
return isIn
}
// Checks if we already are calling this address // Checks if we already are calling this address
func (iface *tcpInterface) isAlreadyCalling(saddr string) bool { func (t *tcp) isAlreadyCalling(saddr string) bool {
iface.mutex.Lock() t.mutex.Lock()
defer iface.mutex.Unlock() defer t.mutex.Unlock()
_, isIn := iface.calls[saddr] _, isIn := t.calls[saddr]
return isIn return isIn
} }
@ -168,34 +210,39 @@ func (iface *tcpInterface) isAlreadyCalling(saddr string) bool {
// If the dial is successful, it launches the handler. // If the dial is successful, it launches the handler.
// When finished, it removes the outgoing call, so reconnection attempts can be made later. // When finished, it removes the outgoing call, so reconnection attempts can be made later.
// This all happens in a separate goroutine that it spawns. // This all happens in a separate goroutine that it spawns.
func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { func (t *tcp) call(saddr string, options interface{}, sintf string) {
go func() { go func() {
callname := saddr callname := saddr
if sintf != "" { if sintf != "" {
callname = fmt.Sprintf("%s/%s", saddr, sintf) callname = fmt.Sprintf("%s/%s", saddr, sintf)
} }
if iface.isAlreadyCalling(callname) { if t.isAlreadyCalling(callname) {
return return
} }
iface.mutex.Lock() t.mutex.Lock()
iface.calls[callname] = struct{}{} t.calls[callname] = struct{}{}
iface.mutex.Unlock() t.mutex.Unlock()
defer func() { defer func() {
// Block new calls for a little while, to mitigate livelock scenarios // Block new calls for a little while, to mitigate livelock scenarios
time.Sleep(default_timeout) time.Sleep(default_timeout)
time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond) time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
iface.mutex.Lock() t.mutex.Lock()
delete(iface.calls, callname) delete(t.calls, callname)
iface.mutex.Unlock() t.mutex.Unlock()
}() }()
var conn net.Conn var conn net.Conn
var err error var err error
if socksaddr != nil { socksaddr, issocks := options.(string)
if issocks {
if sintf != "" { if sintf != "" {
return return
} }
dialerdst, er := net.ResolveTCPAddr("tcp", socksaddr)
if er != nil {
return
}
var dialer proxy.Dialer var dialer proxy.Dialer
dialer, err = proxy.SOCKS5("tcp", *socksaddr, nil, proxy.Direct) dialer, err = proxy.SOCKS5("tcp", dialerdst.String(), nil, proxy.Direct)
if err != nil { if err != nil {
return return
} }
@ -210,9 +257,20 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
addr: saddr, addr: saddr,
}, },
} }
t.handler(conn, false, dialerdst.String())
} else { } else {
dst, err := net.ResolveTCPAddr("tcp", saddr)
if err != nil {
return
}
if dst.IP.IsLinkLocalUnicast() {
dst.Zone = sintf
if dst.Zone == "" {
return
}
}
dialer := net.Dialer{ dialer := net.Dialer{
Control: iface.tcpContext, Control: t.tcpContext,
} }
if sintf != "" { if sintf != "" {
ief, err := net.InterfaceByName(sintf) ief, err := net.InterfaceByName(sintf)
@ -224,10 +282,6 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
} }
addrs, err := ief.Addrs() addrs, err := ief.Addrs()
if err == nil { if err == nil {
dst, err := net.ResolveTCPAddr("tcp", saddr)
if err != nil {
return
}
for addrindex, addr := range addrs { for addrindex, addr := range addrs {
src, _, err := net.ParseCIDR(addr.String()) src, _, err := net.ParseCIDR(addr.String())
if err != nil { if err != nil {
@ -261,31 +315,39 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) {
} }
} }
} }
conn, err = dialer.Dial("tcp", dst.String())
conn, err = dialer.Dial("tcp", saddr)
if err != nil { if err != nil {
t.link.core.log.Debugln("Failed to dial TCP:", err)
return return
} }
t.handler(conn, false, nil)
} }
iface.handler(conn, false)
}() }()
} }
func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { func (t *tcp) handler(sock net.Conn, incoming bool, options interface{}) {
defer sock.Close() defer sock.Close()
iface.setExtraOptions(sock) t.setExtraOptions(sock)
stream := stream{} stream := stream{}
stream.init(sock) stream.init(sock)
local, _, _ := net.SplitHostPort(sock.LocalAddr().String()) local, _, _ := net.SplitHostPort(sock.LocalAddr().String())
remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String()) remote, _, _ := net.SplitHostPort(sock.RemoteAddr().String())
remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast() remotelinklocal := net.ParseIP(remote).IsLinkLocalUnicast()
name := "tcp://" + sock.RemoteAddr().String() var name string
link, err := iface.core.link.create(&stream, name, "tcp", local, remote, incoming, remotelinklocal) var proto string
if socksaddr, issocks := options.(string); issocks {
name = "socks://" + socksaddr + "/" + sock.RemoteAddr().String()
proto = "socks"
} else {
name = "tcp://" + sock.RemoteAddr().String()
proto = "tcp"
}
link, err := t.link.core.link.create(&stream, name, proto, local, remote, incoming, remotelinklocal)
if err != nil { if err != nil {
iface.core.log.Println(err) t.link.core.log.Println(err)
panic(err) panic(err)
} }
iface.core.log.Debugln("DEBUG: starting handler for", name) t.link.core.log.Debugln("DEBUG: starting handler for", name)
err = link.handler() err = link.handler()
iface.core.log.Debugln("DEBUG: stopped handler for", name, err) t.link.core.log.Debugln("DEBUG: stopped handler for", name, err)
} }

View File

@ -10,7 +10,7 @@ import (
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
func (iface *tcpInterface) tcpContext(network, address string, c syscall.RawConn) error { func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
var control error var control error
var recvanyif error var recvanyif error

View File

@ -8,6 +8,6 @@ import (
// WARNING: This context is used both by net.Dialer and net.Listen in tcp.go // WARNING: This context is used both by net.Dialer and net.Listen in tcp.go
func (iface *tcpInterface) tcpContext(network, address string, c syscall.RawConn) error { func (t *tcp) tcpContext(network, address string, c syscall.RawConn) error {
return nil return nil
} }