diff --git a/scripts/crawl-dht.py b/scripts/crawl-dht.py deleted file mode 100644 index b5e9306..0000000 --- a/scripts/crawl-dht.py +++ /dev/null @@ -1,75 +0,0 @@ -import json -import socket -import sys -import time - -#gives the option to get data from an external server instead and send that -#if no options given it will default to localhost instead -if len(sys.argv) == 3: - host_port = (sys.argv[1], int(sys.argv[2])) -else: - host_port = ('localhost', 9001) - -def getDHTPingRequest(key, coords, target=None): - if target: - return '{{"keepalive":true, "request":"dhtPing", "box_pub_key":"{}", "coords":"{}", "target":"{}"}}'.format(key, coords, target) - else: - return '{{"keepalive":true, "request":"dhtPing", "box_pub_key":"{}", "coords":"{}"}}'.format(key, coords) - -def doRequest(req): - try: - ygg = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - ygg.connect(host_port) - ygg.send(req) - data = json.loads(ygg.recv(1024*15)) - return data - except: - return None - -visited = dict() # Add nodes after a successful lookup response -rumored = dict() # Add rumors about nodes to ping -timedout = dict() -def handleResponse(address, info, data): - global visited - global rumored - global timedout - timedout[str(address)] = {'box_pub_key':str(info['box_pub_key']), 'coords':str(info['coords'])} - if not data: return - if 'response' not in data: return - if 'nodes' not in data['response']: return - for addr,rumor in data['response']['nodes'].iteritems(): - if addr in visited: continue - rumored[addr] = rumor - if address not in visited: - # TODO? remove this, it's debug output that happens to be in the same format as yakamo's "current" json file - now = time.time() - visited[str(address)] = {'box_pub_key':str(info['box_pub_key']), 'coords':str(info['coords']), 'time':now} - if address in timedout: del timedout[address] - if len(visited) > 1: sys.stdout.write(",\n") - sys.stdout.write('"{}": ["{}", {}]'.format(address, info['coords'], int(now))) - sys.stdout.flush() -# End handleResponse - -# Get self info -selfInfo = doRequest('{"keepalive":true, "request":"getSelf"}') - -# Initialize dicts of visited/rumored nodes -for k,v in selfInfo['response']['self'].iteritems(): rumored[k] = v - -# Loop over rumored nodes and ping them, adding to visited if they respond -print '{"yggnodes": {' -while len(rumored) > 0: - for k,v in rumored.iteritems(): - handleResponse(k, v, doRequest(getDHTPingRequest(v['box_pub_key'], v['coords']))) - # These next two are imperfect workarounds to deal with old kad nodes - handleResponse(k, v, doRequest(getDHTPingRequest(v['box_pub_key'], v['coords'], '0'*128))) - handleResponse(k, v, doRequest(getDHTPingRequest(v['box_pub_key'], v['coords'], 'f'*128))) - break - del rumored[k] -print '\n}}' -#End - -# TODO do something with the results - -#print visited -#print timedout diff --git a/scripts/crawl.py b/scripts/crawl.py new file mode 100644 index 0000000..0800af7 --- /dev/null +++ b/scripts/crawl.py @@ -0,0 +1,106 @@ +import json +import socket +import sys +import time + +#gives the option to get data from an external server instead and send that +#if no options given it will default to localhost instead +if len(sys.argv) == 3: + socktype = socket.AF_INET + sockaddr = (sys.argv[1], int(sys.argv[2])) +elif len(sys.argv) == 2: + socktype = socket.AF_UNIX + sockaddr = sys.argv[1] +else: + socktype = socket.AF_UNIX + sockaddr = "/var/run/yggdrasil.sock" + +def getNodeInfoRequest(key): + return '{{"keepalive":true, "request":"getNodeInfo", "key":"{}"}}'.format(key) + +def getSelfRequest(key): + return '{{"keepalive":true, "request":"debug_remoteGetSelf", "key":"{}"}}'.format(key) + +def getPeersRequest(key): + return '{{"keepalive":true, "request":"debug_remoteGetPeers", "key":"{}"}}'.format(key) + +def getDHTRequest(key): + return '{{"keepalive":true, "request":"debug_remoteGetDHT", "key":"{}"}}'.format(key) + +def doRequest(req): + try: + ygg = socket.socket(socktype, socket.SOCK_STREAM) + ygg.connect(sockaddr) + ygg.send(req) + data = json.loads(ygg.recv(1024*15)) + return data + except: + return None + +visited = set() # Add nodes after a successful lookup response +rumored = set() # Add rumors about nodes to ping +timedout = set() +def handleNodeInfoResponse(publicKey, data): + global visited + global rumored + global timedout + if publicKey in visited: return + if not data: return + if 'response' not in data: return + out = dict() + for addr,v in data['response'].iteritems(): + out['address'] = addr + out['nodeinfo'] = v + selfInfo = doRequest(getSelfRequest(publicKey)) + if 'response' in selfInfo: + for _,v in selfInfo['response'].iteritems(): + if 'coords' in v: + out['coords'] = v['coords'] + peerInfo = doRequest(getPeersRequest(publicKey)) + if 'response' in peerInfo: + for _,v in peerInfo['response'].iteritems(): + if 'keys' not in v: continue + peers = v['keys'] + for key in peers: + if key in visited: continue + if key in timedout: continue + rumored.add(key) + out['peers'] = peers + dhtInfo = doRequest(getDHTRequest(publicKey)) + if 'response' in dhtInfo: + for _,v in dhtInfo['response'].iteritems(): + if 'keys' in v: + dht = v['keys'] + for key in dht: + if key in visited: continue + if key in timedout: continue + rumored.add(key) + out['dht'] = dht + out['time'] = time.time() + if len(visited) > 0: sys.stdout.write(",\n") + sys.stdout.write('"{}": {}'.format(publicKey, json.dumps(out))) + sys.stdout.flush() + visited.add(publicKey) +# End handleResponse + +# Get self info +selfInfo = doRequest('{"keepalive":true, "request":"getSelf"}') +for k,v in selfInfo['response']['self'].iteritems(): rumored.add(v['key']) + +# Initialize dicts of visited/rumored nodes +#for k,v in selfInfo['response']['self'].iteritems(): rumored[k] = v + +# Loop over rumored nodes and ping them, adding to visited if they respond +print '{"yggnodes": {' +while len(rumored) > 0: + for k in rumored: + handleNodeInfoResponse(k, doRequest(getNodeInfoRequest(k))) + break + rumored.remove(k) +print '\n}}' +#End + +# TODO do something with the results + +#print visited +#print timedout diff --git a/scripts/crawler.go b/scripts/crawler.go new file mode 100644 index 0000000..d66cc9e --- /dev/null +++ b/scripts/crawler.go @@ -0,0 +1,191 @@ +package main + +import ( + "encoding/json" + "fmt" + "net" + "sync" + "time" +) + +var waitgroup sync.WaitGroup +var visited sync.Map +var rumored sync.Map + +const MAX_RETRY = 3 + +func dial() (net.Conn, error) { + return net.DialTimeout("unix", "/var/run/yggdrasil.sock", time.Second) +} + +func getRequest(key, request string) map[string]interface{} { + return map[string]interface{}{ + "keepalive": true, + "request": request, + "key": key, + } +} + +func doRequest(request map[string]interface{}) map[string]interface{} { + req, err := json.Marshal(request) + if err != nil { + panic(err) + } + var res map[string]interface{} + for idx := 0; idx < MAX_RETRY; idx++ { + sock, err := dial() + if err != nil { + panic(err) + } + if _, err = sock.Write(req); err != nil { + panic(err) + } + bs := make([]byte, 65535) + n, err := sock.Read(bs) + if err != nil { + panic(bs) + } + bs = bs[:n] + if err = json.Unmarshal(bs, &res); err != nil { + panic(err) + } + // TODO parse res, check if there's an error + if res, ok := res["response"]; ok { + if _, isIn := res.(map[string]interface{})["error"]; isIn { + continue + } + } + break + } + return res +} + +func getNodeInfo(key string) map[string]interface{} { + return doRequest(getRequest(key, "getNodeInfo")) +} + +func getSelf(key string) map[string]interface{} { + return doRequest(getRequest(key, "debug_remoteGetSelf")) +} + +func getPeers(key string) map[string]interface{} { + return doRequest(getRequest(key, "debug_remoteGetPeers")) +} + +func getDHT(key string) map[string]interface{} { + return doRequest(getRequest(key, "debug_remoteGetDHT")) +} + +type rumorResult struct { + key string + res map[string]interface{} +} + +func doRumor(key string, out chan rumorResult) { + waitgroup.Add(1) + go func() { + defer waitgroup.Done() + if _, known := rumored.LoadOrStore(key, true); known { + return + } + defer rumored.Delete(key) + if _, known := visited.Load(key); known { + return + } + results := make(map[string]interface{}) + if res, ok := getNodeInfo(key)["response"]; ok { + for addr, v := range res.(map[string]interface{}) { + vm, ok := v.(map[string]interface{}) + if !ok { + return + } + results["address"] = addr + results["nodeinfo"] = vm + } + } + if res, ok := getSelf(key)["response"]; ok { + for _, v := range res.(map[string]interface{}) { + vm, ok := v.(map[string]interface{}) + if !ok { + return + } + if coords, ok := vm["coords"]; ok { + results["coords"] = coords + } + } + } + if res, ok := getPeers(key)["response"]; ok { + for _, v := range res.(map[string]interface{}) { + vm, ok := v.(map[string]interface{}) + if !ok { + return + } + if keys, ok := vm["keys"]; ok { + results["peers"] = keys + for _, key := range keys.([]interface{}) { + doRumor(key.(string), out) + } + } + } + } + if res, ok := getDHT(key)["response"]; ok { + for _, v := range res.(map[string]interface{}) { + vm, ok := v.(map[string]interface{}) + if !ok { + return + } + if keys, ok := vm["keys"]; ok { + results["dht"] = keys + for _, key := range keys.([]interface{}) { + doRumor(key.(string), out) + } + } + } + } + if len(results) > 0 { + if _, known := visited.LoadOrStore(key, true); known { + return + } + results["time"] = time.Now().Unix() + out <- rumorResult{key, results} + } + }() +} + +func doPrinter() (chan rumorResult, chan struct{}) { + results := make(chan rumorResult) + done := make(chan struct{}) + go func() { + defer close(done) + fmt.Println("{\"yggnodes\": {") + var notFirst bool + for result := range results { + // TODO correct output + res, err := json.Marshal(result.res) + if err != nil { + panic(err) + } + if notFirst { + fmt.Println(",") + } + fmt.Printf("\"%s\": %s", result.key, res) + notFirst = true + } + fmt.Println("\n}}") + }() + return results, done +} + +func main() { + self := doRequest(map[string]interface{}{"keepalive": true, "request": "getSelf"}) + res := self["response"].(map[string]interface{})["self"].(map[string]interface{}) + var key string + for _, v := range res { + key = v.(map[string]interface{})["key"].(string) + } + results, done := doPrinter() + doRumor(key, results) + waitgroup.Wait() + close(results) + <-done +} diff --git a/web/updateGraph.py b/web/updateGraph.py index 6d3cb97..a7210ad 100755 --- a/web/updateGraph.py +++ b/web/updateGraph.py @@ -1,11 +1,10 @@ #!/usr/bin/env python -from flask import Config -from database import NodeDB import graphPlotter import cgi import urllib, json -url = "http://y.yakamo.org:3000/current" +#url = "http://y.yakamo.org:3000/current" +url = "current" # nodes indexed by coords class NodeInfo: @@ -34,9 +33,18 @@ def generate_graph(time_limit=60*60*3): data = json.loads(response.read())["yggnodes"] toAdd = [] - for ip in data: - info = NodeInfo(ip, data[ip][0]) - if len(data[ip]) >= 3: info.label = data[ip][2] + for key in data: + if 'address' not in data[key] or 'coords' not in data[key]: continue + ip = data[key]['address'] + coords = data[key]['coords'] + info = NodeInfo(ip, coords) + try: + if 'nodeinfo' in data[key]: + if 'name' in data[key]['nodeinfo']: + label = str(data[key]['nodeinfo']['name']) + if len(label) <= 32: + info.label = label + except: pass info.label = cgi.escape(info.label) toAdd.append(info) @@ -67,16 +75,5 @@ def generate_graph(time_limit=60*60*3): with open('static/graph.json', 'w') as f: f.write(js) - -def load_graph_from_db(time_limit): - config = Config('./') - config.from_pyfile('web_config.cfg') - - with NodeDB(config) as db: - nodes = db.get_nodes(time_limit) - edges = db.get_edges(nodes, 60*60*24*7) - return (nodes, edges) - - if __name__ == '__main__': generate_graph()