diff --git a/cmd/yggdrasil/main.go b/cmd/yggdrasil/main.go index ee5375b..d93f5d2 100644 --- a/cmd/yggdrasil/main.go +++ b/cmd/yggdrasil/main.go @@ -191,19 +191,8 @@ func main() { } // Check to see if the peers are in a parsable format, if not then default // them to the TCP scheme - for index, peer := range dat["Peers"].([]interface{}) { - uri := peer.(string) - if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { - continue - } - if strings.HasPrefix(uri, "tcp:") { - uri = uri[4:] - } - (dat["Peers"].([]interface{}))[index] = "tcp://" + uri - } - // Now do the same with the interface peers - for intf, peers := range dat["InterfacePeers"].(map[string]interface{}) { - for index, peer := range peers.([]interface{}) { + if peers, ok := dat["Peers"].([]interface{}); ok { + for index, peer := range peers { uri := peer.(string) if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { continue @@ -211,7 +200,22 @@ func main() { if strings.HasPrefix(uri, "tcp:") { uri = uri[4:] } - ((dat["InterfacePeers"].(map[string]interface{}))[intf]).([]interface{})[index] = "tcp://" + uri + (dat["Peers"].([]interface{}))[index] = "tcp://" + uri + } + } + // Now do the same with the interface peers + if interfacepeers, ok := dat["InterfacePeers"].(map[string]interface{}); ok { + for intf, peers := range interfacepeers { + for index, peer := range peers.([]interface{}) { + uri := peer.(string) + if strings.HasPrefix(uri, "tcp://") || strings.HasPrefix(uri, "socks://") { + continue + } + if strings.HasPrefix(uri, "tcp:") { + uri = uri[4:] + } + ((dat["InterfacePeers"].(map[string]interface{}))[intf]).([]interface{})[index] = "tcp://" + uri + } } } // Overlay our newly mapped configuration onto the autoconf node config that diff --git a/cmd/yggdrasilctl/main.go b/cmd/yggdrasilctl/main.go index b37c256..8135815 100644 --- a/cmd/yggdrasilctl/main.go +++ b/cmd/yggdrasilctl/main.go @@ -87,6 +87,7 @@ func main() { logger.Println("Falling back to platform default", defaults.GetDefaults().DefaultAdminListen) } } else { + endpoint = *server logger.Println("Using endpoint", endpoint, "from command line") } diff --git a/src/config/config.go b/src/config/config.go index 9671eca..b5a1f89 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -2,23 +2,24 @@ package config // NodeConfig defines all configuration values needed to run a signle yggdrasil node type NodeConfig struct { - Listen string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` - AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."` - Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."` - InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."` - ReadTimeout int32 `comment:"Read timeout for connections, specified in milliseconds. If less\nthan 6000 and not negative, 6000 (the default) is used. If negative,\nreads won't time out."` - AllowedEncryptionPublicKeys []string `comment:"List of peer encryption public keys to allow or incoming TCP\nconnections from. If left empty/undefined then all connections\nwill be allowed by default."` - EncryptionPublicKey string `comment:"Your public encryption key. Your peers may ask you for this to put\ninto their AllowedEncryptionPublicKeys configuration."` - EncryptionPrivateKey string `comment:"Your private encryption key. DO NOT share this with anyone!"` - SigningPublicKey string `comment:"Your public signing key. You should not ordinarily need to share\nthis with anyone."` - SigningPrivateKey string `comment:"Your private signing key. DO NOT share this with anyone!"` - MulticastInterfaces []string `comment:"Regular expressions for which interfaces multicast peer discovery\nshould be enabled on. If none specified, multicast peer discovery is\ndisabled. The default value is .* which uses all interfaces."` - IfName string `comment:"Local network interface name for TUN/TAP adapter, or \"auto\" to select\nan interface automatically, or \"none\" to run without TUN/TAP."` - IfTAPMode bool `comment:"Set local network interface to TAP mode rather than TUN mode if\nsupported by your platform - option will be ignored if not."` - IfMTU int `comment:"Maximux Transmission Unit (MTU) size for your local TUN/TAP interface.\nDefault is the largest supported size for your platform. The lowest\npossible value is 1280."` - SessionFirewall SessionFirewall `comment:"The session firewall controls who can send/receive network traffic\nto/from. This is useful if you want to protect this node without\nresorting to using a real firewall. This does not affect traffic\nbeing routed via this node to somewhere else. Rules are prioritised as\nfollows: blacklist, whitelist, always allow outgoing, direct, remote."` - TunnelRouting TunnelRouting `comment:"Allow tunneling non-Yggdrasil traffic over Yggdrasil. This effectively\nallows you to use Yggdrasil to route to, or to bridge other networks,\nsimilar to a VPN tunnel. Tunnelling works between any two nodes and\ndoes not require them to be directly peered."` - SwitchOptions SwitchOptions `comment:"Advanced options for tuning the switch. Normally you will not need\nto edit these options."` + Listen string `comment:"Listen address for peer connections. Default is to listen for all\nTCP connections over IPv4 and IPv6 with a random port."` + AdminListen string `comment:"Listen address for admin connections. Default is to listen for local\nconnections either on TCP/9001 or a UNIX socket depending on your\nplatform. Use this value for yggdrasilctl -endpoint=X. To disable\nthe admin socket, use the value \"none\" instead."` + Peers []string `comment:"List of connection strings for static peers in URI format, e.g.\ntcp://a.b.c.d:e or socks://a.b.c.d:e/f.g.h.i:j."` + InterfacePeers map[string][]string `comment:"List of connection strings for static peers in URI format, arranged\nby source interface, e.g. { \"eth0\": [ tcp://a.b.c.d:e ] }. Note that\nSOCKS peerings will NOT be affected by this option and should go in\nthe \"Peers\" section instead."` + ReadTimeout int32 `comment:"Read timeout for connections, specified in milliseconds. If less\nthan 6000 and not negative, 6000 (the default) is used. If negative,\nreads won't time out."` + AllowedEncryptionPublicKeys []string `comment:"List of peer encryption public keys to allow or incoming TCP\nconnections from. If left empty/undefined then all connections\nwill be allowed by default."` + EncryptionPublicKey string `comment:"Your public encryption key. Your peers may ask you for this to put\ninto their AllowedEncryptionPublicKeys configuration."` + EncryptionPrivateKey string `comment:"Your private encryption key. DO NOT share this with anyone!"` + SigningPublicKey string `comment:"Your public signing key. You should not ordinarily need to share\nthis with anyone."` + SigningPrivateKey string `comment:"Your private signing key. DO NOT share this with anyone!"` + MulticastInterfaces []string `comment:"Regular expressions for which interfaces multicast peer discovery\nshould be enabled on. If none specified, multicast peer discovery is\ndisabled. The default value is .* which uses all interfaces."` + IfName string `comment:"Local network interface name for TUN/TAP adapter, or \"auto\" to select\nan interface automatically, or \"none\" to run without TUN/TAP."` + IfTAPMode bool `comment:"Set local network interface to TAP mode rather than TUN mode if\nsupported by your platform - option will be ignored if not."` + IfMTU int `comment:"Maximux Transmission Unit (MTU) size for your local TUN/TAP interface.\nDefault is the largest supported size for your platform. The lowest\npossible value is 1280."` + SessionFirewall SessionFirewall `comment:"The session firewall controls who can send/receive network traffic\nto/from. This is useful if you want to protect this node without\nresorting to using a real firewall. This does not affect traffic\nbeing routed via this node to somewhere else. Rules are prioritised as\nfollows: blacklist, whitelist, always allow outgoing, direct, remote."` + TunnelRouting TunnelRouting `comment:"Allow tunneling non-Yggdrasil traffic over Yggdrasil. This effectively\nallows you to use Yggdrasil to route to, or to bridge other networks,\nsimilar to a VPN tunnel. Tunnelling works between any two nodes and\ndoes not require them to be directly peered."` + SwitchOptions SwitchOptions `comment:"Advanced options for tuning the switch. Normally you will not need\nto edit these options."` + NodeInfo map[string]interface{} `comment:"Optional node info. This must be a { \"key\": \"value\", ... } map\nor set as null. This is entirely optional but, if set, is visible\nto the whole network on request."` //Net NetConfig `comment:"Extended options for connecting to peers over other networks."` } diff --git a/src/yggdrasil/admin.go b/src/yggdrasil/admin.go index 1f77050..5028c9b 100644 --- a/src/yggdrasil/admin.go +++ b/src/yggdrasil/admin.go @@ -324,6 +324,23 @@ func (a *admin) init(c *Core, listenaddr string) { return admin_info{}, err } }) + a.addHandler("getNodeInfo", []string{"box_pub_key", "coords", "[nocache]"}, func(in admin_info) (admin_info, error) { + var nocache bool + if in["nocache"] != nil { + nocache = in["nocache"].(string) == "true" + } + result, err := a.admin_getNodeInfo(in["box_pub_key"].(string), in["coords"].(string), nocache) + if err == nil { + var m map[string]interface{} + if err = json.Unmarshal(result, &m); err == nil { + return admin_info{"nodeinfo": m}, nil + } else { + return admin_info{}, err + } + } else { + return admin_info{}, err + } + }) } // start runs the admin API socket to listen for / respond to admin API calls. @@ -802,6 +819,52 @@ func (a *admin) admin_dhtPing(keyString, coordString, targetString string) (dhtR return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString)) } +func (a *admin) admin_getNodeInfo(keyString, coordString string, nocache bool) (nodeinfoPayload, error) { + var key crypto.BoxPubKey + if keyBytes, err := hex.DecodeString(keyString); err != nil { + return nodeinfoPayload{}, err + } else { + copy(key[:], keyBytes) + } + if !nocache { + if response, err := a.core.nodeinfo.getCachedNodeInfo(key); err == nil { + return response, nil + } + } + var coords []byte + for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") { + if cstr == "" { + // Special case, happens if trimmed is the empty string, e.g. this is the root + continue + } + if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil { + return nodeinfoPayload{}, err + } else { + coords = append(coords, uint8(u64)) + } + } + response := make(chan *nodeinfoPayload, 1) + sendNodeInfoRequest := func() { + a.core.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) { + defer func() { recover() }() + select { + case response <- nodeinfo: + default: + } + }) + a.core.nodeinfo.sendNodeInfo(key, coords, false) + } + a.core.router.doAdmin(sendNodeInfoRequest) + go func() { + time.Sleep(6 * time.Second) + close(response) + }() + for res := range response { + return *res, nil + } + return nodeinfoPayload{}, errors.New(fmt.Sprintf("getNodeInfo timeout: %s", keyString)) +} + // getResponse_dot returns a response for a graphviz dot formatted representation of the known parts of the network. // This is color-coded and labeled, and includes the self node, switch peers, nodes known to the DHT, and nodes with open sessions. // The graph is structured as a tree with directed links leading away from the root. diff --git a/src/yggdrasil/core.go b/src/yggdrasil/core.go index 2f9c451..99330e1 100644 --- a/src/yggdrasil/core.go +++ b/src/yggdrasil/core.go @@ -33,6 +33,7 @@ type Core struct { admin admin searches searches multicast multicast + nodeinfo nodeinfo tcp tcpInterface log *log.Logger ifceExpr []*regexp.Regexp // the zone of link-local IPv6 peers must match this @@ -123,6 +124,9 @@ func (c *Core) Start(nc *config.NodeConfig, log *log.Logger) error { c.init(&boxPub, &boxPriv, &sigPub, &sigPriv) c.admin.init(c, nc.AdminListen) + c.nodeinfo.init(c) + c.nodeinfo.setNodeInfo(nc.NodeInfo) + if err := c.tcp.init(c, nc.Listen, nc.ReadTimeout); err != nil { c.log.Println("Failed to start TCP interface") return err @@ -238,6 +242,16 @@ func (c *Core) GetSubnet() *net.IPNet { return &net.IPNet{IP: subnet, Mask: net.CIDRMask(64, 128)} } +// Gets the nodeinfo. +func (c *Core) GetNodeInfo() nodeinfoPayload { + return c.nodeinfo.getNodeInfo() +} + +// Sets the nodeinfo. +func (c *Core) SetNodeInfo(nodeinfo interface{}) { + c.nodeinfo.setNodeInfo(nodeinfo) +} + // Sets the output logger of the Yggdrasil node after startup. This may be // useful if you want to redirect the output later. func (c *Core) SetLogger(log *log.Logger) { diff --git a/src/yggdrasil/nodeinfo.go b/src/yggdrasil/nodeinfo.go new file mode 100644 index 0000000..f525bec --- /dev/null +++ b/src/yggdrasil/nodeinfo.go @@ -0,0 +1,177 @@ +package yggdrasil + +import ( + "encoding/json" + "errors" + "runtime" + "sync" + "time" + + "github.com/yggdrasil-network/yggdrasil-go/src/crypto" +) + +type nodeinfo struct { + core *Core + myNodeInfo nodeinfoPayload + myNodeInfoMutex sync.RWMutex + callbacks map[crypto.BoxPubKey]nodeinfoCallback + callbacksMutex sync.Mutex + cache map[crypto.BoxPubKey]nodeinfoCached + cacheMutex sync.RWMutex +} + +type nodeinfoPayload []byte + +type nodeinfoCached struct { + payload nodeinfoPayload + created time.Time +} + +type nodeinfoCallback struct { + call func(nodeinfo *nodeinfoPayload) + created time.Time +} + +// Represents a session nodeinfo packet. +type nodeinfoReqRes struct { + SendPermPub crypto.BoxPubKey // Sender's permanent key + SendCoords []byte // Sender's coords + IsResponse bool + NodeInfo nodeinfoPayload +} + +// Initialises the nodeinfo cache/callback maps, and starts a goroutine to keep +// the cache/callback maps clean of stale entries +func (m *nodeinfo) init(core *Core) { + m.core = core + m.callbacks = make(map[crypto.BoxPubKey]nodeinfoCallback) + m.cache = make(map[crypto.BoxPubKey]nodeinfoCached) + + go func() { + for { + m.callbacksMutex.Lock() + for boxPubKey, callback := range m.callbacks { + if time.Since(callback.created) > time.Minute { + delete(m.callbacks, boxPubKey) + } + } + m.callbacksMutex.Unlock() + m.cacheMutex.Lock() + for boxPubKey, cache := range m.cache { + if time.Since(cache.created) > time.Hour { + delete(m.cache, boxPubKey) + } + } + m.cacheMutex.Unlock() + time.Sleep(time.Second * 30) + } + }() +} + +// Add a callback for a nodeinfo lookup +func (m *nodeinfo) addCallback(sender crypto.BoxPubKey, call func(nodeinfo *nodeinfoPayload)) { + m.callbacksMutex.Lock() + defer m.callbacksMutex.Unlock() + m.callbacks[sender] = nodeinfoCallback{ + created: time.Now(), + call: call, + } +} + +// Handles the callback, if there is one +func (m *nodeinfo) callback(sender crypto.BoxPubKey, nodeinfo nodeinfoPayload) { + m.callbacksMutex.Lock() + defer m.callbacksMutex.Unlock() + if callback, ok := m.callbacks[sender]; ok { + callback.call(&nodeinfo) + delete(m.callbacks, sender) + } +} + +// Get the current node's nodeinfo +func (m *nodeinfo) getNodeInfo() nodeinfoPayload { + m.myNodeInfoMutex.RLock() + defer m.myNodeInfoMutex.RUnlock() + return m.myNodeInfo +} + +// Set the current node's nodeinfo +func (m *nodeinfo) setNodeInfo(given interface{}) error { + m.myNodeInfoMutex.Lock() + defer m.myNodeInfoMutex.Unlock() + newnodeinfo := map[string]interface{}{ + "buildname": GetBuildName(), + "buildversion": GetBuildVersion(), + "buildplatform": runtime.GOOS, + "buildarch": runtime.GOARCH, + } + if nodeinfomap, ok := given.(map[string]interface{}); ok { + for key, value := range nodeinfomap { + if _, ok := newnodeinfo[key]; ok { + continue + } + newnodeinfo[key] = value + } + } + if newjson, err := json.Marshal(newnodeinfo); err == nil { + if len(newjson) > 16384 { + return errors.New("NodeInfo exceeds max length of 16384 bytes") + } + m.myNodeInfo = newjson + return nil + } else { + return err + } +} + +// Add nodeinfo into the cache for a node +func (m *nodeinfo) addCachedNodeInfo(key crypto.BoxPubKey, payload nodeinfoPayload) { + m.cacheMutex.Lock() + defer m.cacheMutex.Unlock() + m.cache[key] = nodeinfoCached{ + created: time.Now(), + payload: payload, + } +} + +// Get a nodeinfo entry from the cache +func (m *nodeinfo) getCachedNodeInfo(key crypto.BoxPubKey) (nodeinfoPayload, error) { + m.cacheMutex.RLock() + defer m.cacheMutex.RUnlock() + if nodeinfo, ok := m.cache[key]; ok { + return nodeinfo.payload, nil + } + return nodeinfoPayload{}, errors.New("No cache entry found") +} + +// Handles a nodeinfo request/response - called from the router +func (m *nodeinfo) handleNodeInfo(nodeinfo *nodeinfoReqRes) { + if nodeinfo.IsResponse { + m.callback(nodeinfo.SendPermPub, nodeinfo.NodeInfo) + m.addCachedNodeInfo(nodeinfo.SendPermPub, nodeinfo.NodeInfo) + } else { + m.sendNodeInfo(nodeinfo.SendPermPub, nodeinfo.SendCoords, true) + } +} + +// Send nodeinfo request or response - called from the router +func (m *nodeinfo) sendNodeInfo(key crypto.BoxPubKey, coords []byte, isResponse bool) { + table := m.core.switchTable.table.Load().(lookupTable) + nodeinfo := nodeinfoReqRes{ + SendCoords: table.self.getCoords(), + IsResponse: isResponse, + NodeInfo: m.core.nodeinfo.getNodeInfo(), + } + bs := nodeinfo.encode() + shared := m.core.sessions.getSharedKey(&m.core.boxPriv, &key) + payload, nonce := crypto.BoxSeal(shared, bs, nil) + p := wire_protoTrafficPacket{ + Coords: coords, + ToKey: key, + FromKey: m.core.boxPub, + Nonce: *nonce, + Payload: payload, + } + packet := p.encode() + m.core.router.out(packet) +} diff --git a/src/yggdrasil/peer.go b/src/yggdrasil/peer.go index 2fb505d..a2b94b6 100644 --- a/src/yggdrasil/peer.go +++ b/src/yggdrasil/peer.go @@ -82,20 +82,19 @@ type peer struct { bytesSent uint64 // To track bandwidth usage for getPeers bytesRecvd uint64 // To track bandwidth usage for getPeers // BUG: sync/atomic, 32 bit platforms need the above to be the first element - core *Core - port switchPort - box crypto.BoxPubKey - sig crypto.SigPubKey - shared crypto.BoxSharedKey - linkShared crypto.BoxSharedKey - endpoint string - friendlyName string - firstSeen time.Time // To track uptime for getPeers - linkOut (chan []byte) // used for protocol traffic (to bypass queues) - doSend (chan struct{}) // tell the linkLoop to send a switchMsg - dinfo *dhtInfo // used to keep the DHT working - out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes - close func() // Called when a peer is removed, to close the underlying connection, or via admin api + core *Core + port switchPort + box crypto.BoxPubKey + sig crypto.SigPubKey + shared crypto.BoxSharedKey + linkShared crypto.BoxSharedKey + endpoint string + firstSeen time.Time // To track uptime for getPeers + linkOut (chan []byte) // used for protocol traffic (to bypass queues) + doSend (chan struct{}) // tell the linkLoop to send a switchMsg + dinfo (chan *dhtInfo) // used to keep the DHT working + out func([]byte) // Set up by whatever created the peers struct, used to send packets to other nodes + close func() // Called when a peer is removed, to close the underlying connection, or via admin api } // Creates a new peer with the specified box, sig, and linkShared keys, using the lowest unocupied port number. @@ -108,6 +107,7 @@ func (ps *peers) newPeer(box *crypto.BoxPubKey, sig *crypto.SigPubKey, linkShare endpoint: endpoint, firstSeen: now, doSend: make(chan struct{}, 1), + dinfo: make(chan *dhtInfo, 1), core: ps.core} ps.mutex.Lock() defer ps.mutex.Unlock() @@ -180,6 +180,8 @@ func (p *peer) doSendSwitchMsgs() { func (p *peer) linkLoop() { tick := time.NewTicker(time.Second) defer tick.Stop() + p.doSendSwitchMsgs() + var dinfo *dhtInfo for { select { case _, ok := <-p.doSend: @@ -187,12 +189,10 @@ func (p *peer) linkLoop() { return } p.sendSwitchMsg() + case dinfo = <-p.dinfo: case _ = <-tick.C: - //break // FIXME disabled the below completely to test something - pdinfo := p.dinfo // FIXME this is a bad workarond NPE on the next line - if pdinfo != nil { - dinfo := *pdinfo - p.core.dht.peers <- &dinfo + if dinfo != nil { + p.core.dht.peers <- dinfo } } } @@ -222,8 +222,9 @@ func (p *peer) handlePacket(packet []byte) { // Called to handle traffic or protocolTraffic packets. // In either case, this reads from the coords of the packet header, does a switch lookup, and forwards to the next node. func (p *peer) handleTraffic(packet []byte, pTypeLen int) { - if p.port != 0 && p.dinfo == nil { - // Drop traffic until the peer manages to send us at least one good switchMsg + table := p.core.switchTable.getTable() + if _, isIn := table.elems[p.port]; !isIn && p.port != 0 { + // Drop traffic if the peer isn't in the switch return } p.core.switchTable.packetIn <- packet @@ -327,9 +328,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { p.core.switchTable.handleMsg(&msg, p.port) if !p.core.switchTable.checkRoot(&msg) { // Bad switch message - // Stop forwarding traffic from it - // Stop refreshing it in the DHT - p.dinfo = nil + p.dinfo <- nil return } // Pass a mesage to the dht informing it that this peer (still) exists @@ -338,8 +337,7 @@ func (p *peer) handleSwitchMsg(packet []byte) { key: p.box, coords: loc.getCoords(), } - //p.core.dht.peers <- &dinfo - p.dinfo = &dinfo + p.dinfo <- &dinfo } // This generates the bytes that we sign or check the signature of for a switchMsg. diff --git a/src/yggdrasil/router.go b/src/yggdrasil/router.go index 91972b3..6c92869 100644 --- a/src/yggdrasil/router.go +++ b/src/yggdrasil/router.go @@ -413,6 +413,10 @@ func (r *router) handleProto(packet []byte) { r.handlePing(bs, &p.FromKey) case wire_SessionPong: r.handlePong(bs, &p.FromKey) + case wire_NodeInfoRequest: + fallthrough + case wire_NodeInfoResponse: + r.handleNodeInfo(bs, &p.FromKey) case wire_DHTLookupRequest: r.handleDHTReq(bs, &p.FromKey) case wire_DHTLookupResponse: @@ -457,6 +461,16 @@ func (r *router) handleDHTRes(bs []byte, fromKey *crypto.BoxPubKey) { r.core.dht.handleRes(&res) } +// Decodes nodeinfo request +func (r *router) handleNodeInfo(bs []byte, fromKey *crypto.BoxPubKey) { + req := nodeinfoReqRes{} + if !req.decode(bs) { + return + } + req.SendPermPub = *fromKey + r.core.nodeinfo.handleNodeInfo(&req) +} + // Passed a function to call. // This will send the function to r.admin and block until it finishes. // It's used by the admin socket to ask the router mainLoop goroutine about information in the session or dht structs, which cannot be read safely from outside that goroutine. diff --git a/src/yggdrasil/tcp.go b/src/yggdrasil/tcp.go index 045932a..873dd63 100644 --- a/src/yggdrasil/tcp.go +++ b/src/yggdrasil/tcp.go @@ -35,14 +35,6 @@ const tcp_msgSize = 2048 + 65535 // TODO figure out what makes sense const default_tcp_timeout = 6 * time.Second const tcp_ping_interval = (default_tcp_timeout * 2 / 3) -// Wrapper function for non tcp/ip connections. -func setNoDelay(c net.Conn, delay bool) { - tcp, ok := c.(*net.TCPConn) - if ok { - tcp.SetNoDelay(delay) - } -} - // The TCP listener and information about active TCP connections, to avoid duplication. type tcpInterface struct { core *Core @@ -62,6 +54,18 @@ type tcpInfo struct { remoteAddr string } +// Wrapper function to set additional options for specific connection types. +func (iface *tcpInterface) setExtraOptions(c net.Conn) { + switch sock := c.(type) { + case *net.TCPConn: + sock.SetNoDelay(true) + sock.SetKeepAlive(true) + sock.SetKeepAlivePeriod(iface.tcp_timeout) + // TODO something for socks5 + default: + } +} + // Returns the address of the listener. func (iface *tcpInterface) getAddr() *net.TCPAddr { return iface.serv.Addr().(*net.TCPAddr) @@ -209,6 +213,7 @@ func (iface *tcpInterface) call(saddr string, socksaddr *string, sintf string) { // It defers a bunch of cleanup stuff to tear down all of these things when the reader exists (e.g. due to a closed connection or a timeout). func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { defer sock.Close() + iface.setExtraOptions(sock) // Get our keys myLinkPub, myLinkPriv := crypto.NewBoxKeys() // ephemeral link keys meta := version_getBaseMetadata() @@ -346,7 +351,6 @@ func (iface *tcpInterface) handler(sock net.Conn, incoming bool) { out <- msg } p.close = func() { sock.Close() } - setNoDelay(sock, true) go p.linkLoop() defer func() { // Put all of our cleanup here... diff --git a/src/yggdrasil/wire.go b/src/yggdrasil/wire.go index 08e8a62..782af43 100644 --- a/src/yggdrasil/wire.go +++ b/src/yggdrasil/wire.go @@ -21,6 +21,8 @@ const ( wire_SessionPong // inside protocol traffic header wire_DHTLookupRequest // inside protocol traffic header wire_DHTLookupResponse // inside protocol traffic header + wire_NodeInfoRequest // inside protocol traffic header + wire_NodeInfoResponse // inside protocol traffic header ) // Calls wire_put_uint64 on a nil slice. @@ -358,6 +360,47 @@ func (p *sessionPing) decode(bs []byte) bool { //////////////////////////////////////////////////////////////////////////////// +// Encodes a nodeinfoReqRes into its wire format. +func (p *nodeinfoReqRes) encode() []byte { + var pTypeVal uint64 + if p.IsResponse { + pTypeVal = wire_NodeInfoResponse + } else { + pTypeVal = wire_NodeInfoRequest + } + bs := wire_encode_uint64(pTypeVal) + bs = wire_put_coords(p.SendCoords, bs) + if pTypeVal == wire_NodeInfoResponse { + bs = append(bs, p.NodeInfo...) + } + return bs +} + +// Decodes an encoded nodeinfoReqRes into the struct, returning true if successful. +func (p *nodeinfoReqRes) decode(bs []byte) bool { + var pType uint64 + switch { + case !wire_chop_uint64(&pType, &bs): + return false + case pType != wire_NodeInfoRequest && pType != wire_NodeInfoResponse: + return false + case !wire_chop_coords(&p.SendCoords, &bs): + return false + } + if p.IsResponse = pType == wire_NodeInfoResponse; p.IsResponse { + if len(bs) == 0 { + return false + } + p.NodeInfo = make(nodeinfoPayload, len(bs)) + if !wire_chop_slice(p.NodeInfo[:], &bs) { + return false + } + } + return true +} + +//////////////////////////////////////////////////////////////////////////////// + // Encodes a dhtReq into its wire format. func (r *dhtReq) encode() []byte { coords := wire_encode_coords(r.Coords)