5
0
mirror of https://github.com/cwinfo/yggdrasil-go.git synced 2024-11-23 02:01:36 +00:00

Reimplement getNodeInfo, dhtPing, get/add/removeAllowedEncryptionPublicKey, add/removePeer

This commit is contained in:
Neil Alexander 2019-05-20 19:51:44 +01:00
parent e9e2d7bc6f
commit 5b8d8a9341
No known key found for this signature in database
GPG Key ID: A02A2019A2BB0944
2 changed files with 293 additions and 236 deletions

View File

@ -1,12 +1,14 @@
package admin package admin
import ( import (
"encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"net" "net"
"net/url" "net/url"
"os" "os"
"strconv"
"strings" "strings"
"time" "time"
@ -166,132 +168,131 @@ func (a *AdminSocket) Init(c *yggdrasil.Core, state *config.NodeState, log *log.
} }
return Info{"sessions": sessions}, nil return Info{"sessions": sessions}, nil
}) })
/* a.AddHandler("addPeer", []string{"uri", "[interface]"}, func(in Info) (Info, error) {
a.AddHandler("addPeer", []string{"uri", "[interface]"}, func(in Info) (Info, error) { // Set sane defaults
// Set sane defaults intf := ""
intf := "" // Has interface been specified?
// Has interface been specified? if itf, ok := in["interface"]; ok {
if itf, ok := in["interface"]; ok { intf = itf.(string)
intf = itf.(string) }
} if a.core.AddPeer(in["uri"].(string), intf) == nil {
if a.addPeer(in["uri"].(string), intf) == nil { return Info{
return Info{ "added": []string{
"added": []string{ in["uri"].(string),
in["uri"].(string), },
}, }, nil
}, nil } else {
} else { return Info{
return Info{ "not_added": []string{
"not_added": []string{ in["uri"].(string),
in["uri"].(string), },
}, }, errors.New("Failed to add peer")
}, errors.New("Failed to add peer") }
} })
}) a.AddHandler("removePeer", []string{"port"}, func(in Info) (Info, error) {
a.AddHandler("removePeer", []string{"port"}, func(in Info) (Info, error) { port, err := strconv.ParseInt(fmt.Sprint(in["port"]), 10, 64)
if a.removePeer(fmt.Sprint(in["port"])) == nil { if err != nil {
return Info{ return Info{}, err
"removed": []string{ }
fmt.Sprint(in["port"]), if a.core.DisconnectPeer(uint64(port)) == nil {
}, return Info{
}, nil "removed": []string{
} else { fmt.Sprint(port),
return Info{ },
"not_removed": []string{ }, nil
fmt.Sprint(in["port"]), } else {
}, return Info{
}, errors.New("Failed to remove peer") "not_removed": []string{
} fmt.Sprint(port),
}) },
a.AddHandler("getAllowedEncryptionPublicKeys", []string{}, func(in Info) (Info, error) { }, errors.New("Failed to remove peer")
return Info{"allowed_box_pubs": a.getAllowedEncryptionPublicKeys()}, nil }
}) })
a.AddHandler("addAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in Info) (Info, error) { a.AddHandler("getAllowedEncryptionPublicKeys", []string{}, func(in Info) (Info, error) {
if a.addAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil { return Info{"allowed_box_pubs": a.core.GetAllowedEncryptionPublicKeys()}, nil
return Info{ })
"added": []string{ a.AddHandler("addAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in Info) (Info, error) {
in["box_pub_key"].(string), if a.core.AddAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil {
}, return Info{
}, nil "added": []string{
} else { in["box_pub_key"].(string),
return Info{ },
"not_added": []string{ }, nil
in["box_pub_key"].(string), } else {
}, return Info{
}, errors.New("Failed to add allowed key") "not_added": []string{
} in["box_pub_key"].(string),
}) },
a.AddHandler("removeAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in Info) (Info, error) { }, errors.New("Failed to add allowed key")
if a.removeAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil { }
return Info{ })
"removed": []string{ a.AddHandler("removeAllowedEncryptionPublicKey", []string{"box_pub_key"}, func(in Info) (Info, error) {
in["box_pub_key"].(string), if a.core.RemoveAllowedEncryptionPublicKey(in["box_pub_key"].(string)) == nil {
}, return Info{
}, nil "removed": []string{
} else { in["box_pub_key"].(string),
return Info{ },
"not_removed": []string{ }, nil
in["box_pub_key"].(string), } else {
}, return Info{
}, errors.New("Failed to remove allowed key") "not_removed": []string{
} in["box_pub_key"].(string),
}) },
a.AddHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in Info) (Info, error) { }, errors.New("Failed to remove allowed key")
if in["target"] == nil { }
in["target"] = "none" })
} a.AddHandler("dhtPing", []string{"box_pub_key", "coords", "[target]"}, func(in Info) (Info, error) {
result, err := a.admin_dhtPing(in["box_pub_key"].(string), in["coords"].(string), in["target"].(string)) if in["target"] == nil {
if err == nil { in["target"] = "none"
infos := make(map[string]map[string]string, len(result.Infos)) }
for _, dinfo := range result.Infos { result, err := a.core.DHTPing(in["box_pub_key"].(string), in["coords"].(string), in["target"].(string))
info := map[string]string{ if err == nil {
"box_pub_key": hex.EncodeToString(dinfo.key[:]), infos := make(map[string]map[string]string, len(result.Infos))
"coords": fmt.Sprintf("%v", dinfo.coords), for _, dinfo := range result.Infos {
} info := map[string]string{
addr := net.IP(address.AddrForNodeID(crypto.GetNodeID(&dinfo.key))[:]).String() "box_pub_key": hex.EncodeToString(dinfo.PublicKey[:]),
infos[addr] = info "coords": fmt.Sprintf("%v", dinfo.Coords),
} }
return Info{"nodes": infos}, nil addr := net.IP(address.AddrForNodeID(crypto.GetNodeID(&dinfo.PublicKey))[:]).String()
infos[addr] = info
}
return Info{"nodes": infos}, nil
} else {
return Info{}, err
}
})
a.AddHandler("getNodeInfo", []string{"[box_pub_key]", "[coords]", "[nocache]"}, func(in Info) (Info, error) {
var nocache bool
if in["nocache"] != nil {
nocache = in["nocache"].(string) == "true"
}
var box_pub_key, coords string
if in["box_pub_key"] == nil && in["coords"] == nil {
nodeinfo := a.core.MyNodeInfo()
var jsoninfo interface{}
if err := json.Unmarshal(nodeinfo, &jsoninfo); err != nil {
return Info{}, err
} else {
return Info{"nodeinfo": jsoninfo}, nil
}
} else if in["box_pub_key"] == nil || in["coords"] == nil {
return Info{}, errors.New("Expecting both box_pub_key and coords")
} else {
box_pub_key = in["box_pub_key"].(string)
coords = in["coords"].(string)
}
result, err := a.core.GetNodeInfo(box_pub_key, coords, nocache)
if err == nil {
var m map[string]interface{}
if err = json.Unmarshal(result, &m); err == nil {
return Info{"nodeinfo": m}, nil
} else { } else {
return Info{}, err return Info{}, err
} }
}) } else {
a.AddHandler("getNodeInfo", []string{"[box_pub_key]", "[coords]", "[nocache]"}, func(in Info) (Info, error) { return Info{}, err
var nocache bool }
if in["nocache"] != nil { })
nocache = in["nocache"].(string) == "true"
}
var box_pub_key, coords string
if in["box_pub_key"] == nil && in["coords"] == nil {
var nodeinfo []byte
a.core.router.doAdmin(func() {
nodeinfo = []byte(a.core.router.nodeinfo.getNodeInfo())
})
var jsoninfo interface{}
if err := json.Unmarshal(nodeinfo, &jsoninfo); err != nil {
return Info{}, err
} else {
return Info{"nodeinfo": jsoninfo}, nil
}
} else if in["box_pub_key"] == nil || in["coords"] == nil {
return Info{}, errors.New("Expecting both box_pub_key and coords")
} else {
box_pub_key = in["box_pub_key"].(string)
coords = in["coords"].(string)
}
result, err := a.admin_getNodeInfo(box_pub_key, coords, nocache)
if err == nil {
var m map[string]interface{}
if err = json.Unmarshal(result, &m); err == nil {
return Info{"nodeinfo": m}, nil
} else {
return Info{}, err
}
} else {
return Info{}, err
}
})
*/
} }
// start runs the admin API socket to listen for / respond to admin API calls. // start runs the admin API socket to listen for / respond to admin API calls.
@ -458,112 +459,6 @@ func (a *AdminSocket) handleRequest(conn net.Conn) {
} }
} }
/*
// Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID.
func (a *AdminSocket) admin_dhtPing(keyString, coordString, targetString string) (dhtRes, error) {
var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return dhtRes{}, err
} else {
copy(key[:], keyBytes)
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return dhtRes{}, err
} else {
coords = append(coords, uint8(u64))
}
}
resCh := make(chan *dhtRes, 1)
info := dhtInfo{
key: key,
coords: coords,
}
target := *info.getNodeID()
if targetString == "none" {
// Leave the default target in place
} else if targetBytes, err := hex.DecodeString(targetString); err != nil {
return dhtRes{}, err
} else if len(targetBytes) != len(target) {
return dhtRes{}, errors.New("Incorrect target NodeID length")
} else {
var target crypto.NodeID
copy(target[:], targetBytes)
}
rq := dhtReqKey{info.key, target}
sendPing := func() {
a.core.dht.addCallback(&rq, func(res *dhtRes) {
defer func() { recover() }()
select {
case resCh <- res:
default:
}
})
a.core.dht.ping(&info, &target)
}
a.core.router.doAdmin(sendPing)
go func() {
time.Sleep(6 * time.Second)
close(resCh)
}()
for res := range resCh {
return *res, nil
}
return dhtRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString))
}
func (a *AdminSocket) admin_getNodeInfo(keyString, coordString string, nocache bool) (nodeinfoPayload, error) {
var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return nodeinfoPayload{}, err
} else {
copy(key[:], keyBytes)
}
if !nocache {
if response, err := a.core.router.nodeinfo.getCachedNodeInfo(key); err == nil {
return response, nil
}
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return nodeinfoPayload{}, err
} else {
coords = append(coords, uint8(u64))
}
}
response := make(chan *nodeinfoPayload, 1)
sendNodeInfoRequest := func() {
a.core.router.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) {
defer func() { recover() }()
select {
case response <- nodeinfo:
default:
}
})
a.core.router.nodeinfo.sendNodeInfo(key, coords, false)
}
a.core.router.doAdmin(sendNodeInfoRequest)
go func() {
time.Sleep(6 * time.Second)
close(response)
}()
for res := range response {
return *res, nil
}
return nodeinfoPayload{}, errors.New(fmt.Sprintf("getNodeInfo timeout: %s", keyString))
}
*/
// getResponse_dot returns a response for a graphviz dot formatted // getResponse_dot returns a response for a graphviz dot formatted
// representation of the known parts of the network. This is color-coded and // representation of the known parts of the network. This is color-coded and
// labeled, and includes the self node, switch peers, nodes known to the DHT, // labeled, and includes the self node, switch peers, nodes known to the DHT,

View File

@ -3,8 +3,11 @@ package yggdrasil
import ( import (
"encoding/hex" "encoding/hex"
"errors" "errors"
"fmt"
"net" "net"
"sort" "sort"
"strconv"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -47,6 +50,17 @@ type DHTEntry struct {
LastSeen time.Duration LastSeen time.Duration
} }
// DHTRes represents a DHT response, as returned by DHTPing.
type DHTRes struct {
PublicKey crypto.BoxPubKey // key of the sender
Coords []byte // coords of the sender
Dest crypto.NodeID // the destination node ID
Infos []DHTEntry // response
}
// NodeInfoPayload represents a RequestNodeInfo response, in bytes.
type NodeInfoPayload nodeinfoPayload
// SwitchQueues represents information from the switch related to link // SwitchQueues represents information from the switch related to link
// congestion and a list of switch queues created in response to congestion on a // congestion and a list of switch queues created in response to congestion on a
// given link. // given link.
@ -123,7 +137,7 @@ func (c *Core) GetSwitchPeers() []SwitchPeer {
} }
coords := elem.locator.getCoords() coords := elem.locator.getCoords()
info := SwitchPeer{ info := SwitchPeer{
Coords: coords, Coords: append([]byte{}, coords...),
BytesSent: atomic.LoadUint64(&peer.bytesSent), BytesSent: atomic.LoadUint64(&peer.bytesSent),
BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd), BytesRecvd: atomic.LoadUint64(&peer.bytesRecvd),
Port: uint64(elem.port), Port: uint64(elem.port),
@ -151,7 +165,7 @@ func (c *Core) GetDHT() []DHTEntry {
}) })
for _, v := range dhtentry { for _, v := range dhtentry {
info := DHTEntry{ info := DHTEntry{
Coords: v.coords, Coords: append([]byte{}, v.coords...),
LastSeen: now.Sub(v.recv), LastSeen: now.Sub(v.recv),
} }
copy(info.PublicKey[:], v.key[:]) copy(info.PublicKey[:], v.key[:])
@ -198,7 +212,7 @@ func (c *Core) GetSessions() []Session {
for _, sinfo := range c.sessions.sinfos { for _, sinfo := range c.sessions.sinfos {
// TODO? skipped known but timed out sessions? // TODO? skipped known but timed out sessions?
session := Session{ session := Session{
Coords: sinfo.coords, Coords: append([]byte{}, sinfo.coords...),
MTU: sinfo.getMTU(), MTU: sinfo.getMTU(),
BytesSent: sinfo.bytesSent, BytesSent: sinfo.bytesSent,
BytesRecvd: sinfo.bytesRecvd, BytesRecvd: sinfo.bytesRecvd,
@ -319,7 +333,7 @@ func (c *Core) RouterAddresses() (address.Address, address.Subnet) {
} }
// NodeInfo gets the currently configured nodeinfo. // NodeInfo gets the currently configured nodeinfo.
func (c *Core) NodeInfo() nodeinfoPayload { func (c *Core) MyNodeInfo() nodeinfoPayload {
return c.router.nodeinfo.getNodeInfo() return c.router.nodeinfo.getNodeInfo()
} }
@ -329,6 +343,56 @@ func (c *Core) SetNodeInfo(nodeinfo interface{}, nodeinfoprivacy bool) {
c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy) c.router.nodeinfo.setNodeInfo(nodeinfo, nodeinfoprivacy)
} }
// GetNodeInfo requests nodeinfo from a remote node, as specified by the public
// key and coordinates specified. The third parameter specifies whether a cached
// result is acceptable - this results in less traffic being generated than is
// necessary when, e.g. crawling the network.
func (c *Core) GetNodeInfo(keyString, coordString string, nocache bool) (NodeInfoPayload, error) {
var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return NodeInfoPayload{}, err
} else {
copy(key[:], keyBytes)
}
if !nocache {
if response, err := c.router.nodeinfo.getCachedNodeInfo(key); err == nil {
return NodeInfoPayload(response), nil
}
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return NodeInfoPayload{}, err
} else {
coords = append(coords, uint8(u64))
}
}
response := make(chan *nodeinfoPayload, 1)
sendNodeInfoRequest := func() {
c.router.nodeinfo.addCallback(key, func(nodeinfo *nodeinfoPayload) {
defer func() { recover() }()
select {
case response <- nodeinfo:
default:
}
})
c.router.nodeinfo.sendNodeInfo(key, coords, false)
}
c.router.doAdmin(sendNodeInfoRequest)
go func() {
time.Sleep(6 * time.Second)
close(response)
}()
for res := range response {
return NodeInfoPayload(*res), nil
}
return NodeInfoPayload{}, errors.New(fmt.Sprintf("getNodeInfo timeout: %s", keyString))
}
// SetLogger sets the output logger of the Yggdrasil node after startup. This // SetLogger sets the output logger of the Yggdrasil node after startup. This
// may be useful if you want to redirect the output later. // may be useful if you want to redirect the output later.
func (c *Core) SetLogger(log *log.Logger) { func (c *Core) SetLogger(log *log.Logger) {
@ -354,6 +418,14 @@ func (c *Core) AddPeer(addr string, sintf string) error {
return nil return nil
} }
// RemovePeer is not implemented yet.
func (c *Core) RemovePeer(addr string, sintf string) error {
// TODO: Implement a reverse of AddPeer, where we look up the port number
// based on the addr and sintf, disconnect it and then remove it from the
// peers list so we don't reconnect to it later
return errors.New("not implemented")
}
// CallPeer calls a peer once. This should be specified in the peer URI format, // CallPeer calls a peer once. This should be specified in the peer URI format,
// e.g.: // e.g.:
// tcp://a.b.c.d:e // tcp://a.b.c.d:e
@ -364,9 +436,99 @@ func (c *Core) CallPeer(addr string, sintf string) error {
return c.link.call(addr, sintf) return c.link.call(addr, sintf)
} }
// AddAllowedEncryptionPublicKey adds an allowed public key. This allow peerings // DisconnectPeer disconnects a peer once. This should be specified as a port
// to be restricted only to keys that you have selected. // number.
func (c *Core) AddAllowedEncryptionPublicKey(boxStr string) error { func (c *Core) DisconnectPeer(port uint64) error {
//return c.admin.addAllowedEncryptionPublicKey(boxStr) c.peers.removePeer(switchPort(port))
return nil return nil
} }
// GetAllowedEncryptionPublicKeys returns the public keys permitted for incoming
// peer connections.
func (c *Core) GetAllowedEncryptionPublicKeys() []string {
return c.peers.getAllowedEncryptionPublicKeys()
}
// AddAllowedEncryptionPublicKey whitelists a key for incoming peer connections.
func (c *Core) AddAllowedEncryptionPublicKey(bstr string) (err error) {
c.peers.addAllowedEncryptionPublicKey(bstr)
return nil
}
// RemoveAllowedEncryptionPublicKey removes a key from the whitelist for
// incoming peer connections. If none are set, an empty list permits all
// incoming connections.
func (c *Core) RemoveAllowedEncryptionPublicKey(bstr string) (err error) {
c.peers.removeAllowedEncryptionPublicKey(bstr)
return nil
}
// Send a DHT ping to the node with the provided key and coords, optionally looking up the specified target NodeID.
func (c *Core) DHTPing(keyString, coordString, targetString string) (DHTRes, error) {
var key crypto.BoxPubKey
if keyBytes, err := hex.DecodeString(keyString); err != nil {
return DHTRes{}, err
} else {
copy(key[:], keyBytes)
}
var coords []byte
for _, cstr := range strings.Split(strings.Trim(coordString, "[]"), " ") {
if cstr == "" {
// Special case, happens if trimmed is the empty string, e.g. this is the root
continue
}
if u64, err := strconv.ParseUint(cstr, 10, 8); err != nil {
return DHTRes{}, err
} else {
coords = append(coords, uint8(u64))
}
}
resCh := make(chan *dhtRes, 1)
info := dhtInfo{
key: key,
coords: coords,
}
target := *info.getNodeID()
if targetString == "none" {
// Leave the default target in place
} else if targetBytes, err := hex.DecodeString(targetString); err != nil {
return DHTRes{}, err
} else if len(targetBytes) != len(target) {
return DHTRes{}, errors.New("Incorrect target NodeID length")
} else {
var target crypto.NodeID
copy(target[:], targetBytes)
}
rq := dhtReqKey{info.key, target}
sendPing := func() {
c.dht.addCallback(&rq, func(res *dhtRes) {
defer func() { recover() }()
select {
case resCh <- res:
default:
}
})
c.dht.ping(&info, &target)
}
c.router.doAdmin(sendPing)
go func() {
time.Sleep(6 * time.Second)
close(resCh)
}()
// TODO: do something better than the below...
for res := range resCh {
r := DHTRes{
Coords: append([]byte{}, res.Coords...),
}
copy(r.PublicKey[:], res.Key[:])
for _, i := range res.Infos {
e := DHTEntry{
Coords: append([]byte{}, i.coords...),
}
copy(e.PublicKey[:], i.key[:])
r.Infos = append(r.Infos, e)
}
return r, nil
}
return DHTRes{}, errors.New(fmt.Sprintf("DHT ping timeout: %s", keyString))
}