Download

Documentation

Community

Development

Ticket #8: autodiscovery4

File autodiscovery4, 15.3 KB (added by redduck666, 5 months ago)
Line 
1diff --git a/pydra_server/cluster/amf/interface.py b/pydra_server/cluster/amf/interface.py
2index 0df523b..f19840a 100644
3--- a/pydra_server/cluster/amf/interface.py
4+++ b/pydra_server/cluster/amf/interface.py
5@@ -211,6 +211,20 @@ class AMFInterface(pb.Root):
6         """
7         return self.master._running
8 
9+    @authenticated
10+    def list_known_nodes(self, _):
11+        """
12+        Return the (host, port) pairs discovered via avahi but not yet added
13+        """
14+        # return a list: the AMF encoder does not seem to handle sets
15+        return list(self.master.known_nodes)
16+
17+    @authenticated
18+    def connect(self, _):
19+        """
20+        lets the gui make the master pick up newly added nodes without a restart
21+        """
22+        return self.master.connect()
23 
24     @authenticated
25     def run_task(self, _, task_key, args=None):
26diff --git a/pydra_server/cluster/auth/master_avatar.py b/pydra_server/cluster/auth/master_avatar.py
27index e0e750e..6bf9d99 100644
28--- a/pydra_server/cluster/auth/master_avatar.py
29+++ b/pydra_server/cluster/auth/master_avatar.py
30@@ -38,6 +38,13 @@ class MasterAvatar(RSAAvatar):
31         logger.info('Master connected to node')
32 
33 
34+    def perspective_get_key(self):
35+        """
36+        Return the node's public key as plain (unencrypted) chunks, so the
37+        master can detect a duplicate node before pairing with it
38+        """
39+        return self.chunks()
40+
41     def save_key(self, json_key):
42         """
43         Callback to save public key from the master       
44diff --git a/pydra_server/cluster/auth/rsa_auth.py b/pydra_server/cluster/auth/rsa_auth.py
45index 230c1c2..f9767be 100644
46--- a/pydra_server/cluster/auth/rsa_auth.py
47+++ b/pydra_server/cluster/auth/rsa_auth.py
48@@ -142,13 +142,19 @@ class RSAAvatar(pb.Avatar):
49         #send the nodes public key.  serialize it and encrypt it
50         #the key must be broken into chunks for it to be signed
51         #for ease recompiling it we'll store the chunks as a list
52+        key_chunks = []
53+        for plain_chunk in self.chunks():
54+            # PyCrypto's encrypt() returns a tuple; item 0 is the ciphertext
55+            key_chunks.append(self.client_key.encrypt(plain_chunk, None)[0])
56+
57+        return key_chunks
58+
59+    def chunks(self):
60         json_key = simplejson.dumps(self.server_pub_key)
61         key_chunks = []
62         chunk = 128
63         for i in range(int(math.ceil(len(json_key)/(chunk*1.0)))):
64-            enc = self.client_key.encrypt(json_key[i*chunk:i*chunk+chunk], None)
65-            key_chunks.append(enc[0])
66-
67+            key_chunks.append(json_key[i*chunk:i*chunk+chunk])
68         return key_chunks
69 
70 
71diff --git a/pydra_server/cluster/master.py b/pydra_server/cluster/master.py
72index ef16f30..8859448 100755
73--- a/pydra_server/cluster/master.py
74+++ b/pydra_server/cluster/master.py
75@@ -46,6 +46,12 @@ import datetime
76 
77 from threading import Lock
78 
79+# must run before anything else imports the twisted reactor, so that the
80+# glib2 event loop (needed by dbus, see DBusGMainLoop) is the one installed
81+
82+from twisted.internet import glib2reactor
83+glib2reactor.install()
84+
85 from zope.interface import implements
86 from twisted.cred import portal, checkers
87 from twisted.spread import pb
88@@ -56,6 +62,8 @@ from twisted.web import server, resource
89 from twisted.cred import credentials
90 from django.utils import simplejson
91 import settings
92+import dbus, avahi
93+from dbus.mainloop.glib import DBusGMainLoop
94 
95 from pydra_server.models import Node, TaskInstance
96 from pydra_server.cluster.constants import *
97@@ -123,6 +131,7 @@ class Master(object):
98         #cluster management
99         self.workers = {}
100         self.nodes = self.load_nodes()
101+        self.known_nodes = set()  # (host, port) discovered but not yet added
102         self._workers_idle = []
103         self._workers_working = {}
104 
105@@ -143,6 +152,54 @@ class Master(object):
106 
107         self.host = 'localhost'
108         self.port = 18800
109+        self.autodiscovery()
110+
111+    def autodiscovery(self, callback=None):
112+        """
113+        set up the dbus loop, and add the callbacks for adding nodes on the fly.
114+        Discovery is best-effort: if dbus or avahi is unavailable the error is
115+        logged and the master keeps running with its configured nodes.
116+        (callback is currently unused)
117+
118+        based on http://avahi.org/wiki/PythonBrowseExample
119+        """
120+        from pydra_server.models import pydraSettings
121+
122+        def service_resolved(*args):
123+            # ResolveService reply: args[7] is the address, args[8] the port;
124+            # convert the dbus types to plain ones before storing them
125+            host, port = str(args[7]), int(args[8])
126+            if pydraSettings.multicast_all:
127+                # ItemNew can fire once per interface/protocol: don't duplicate
128+                Node.objects.get_or_create(host=host, port=port)
129+                self.connect()
130+            else:
131+                self.known_nodes.add((host, port))
132+
133+        def print_error(*args):
134+            logger.error("Couldn't resolve avahi name: %s" % str(args))
135+
136+        def node_found(interface, protocol, name, stype, domain, flags):
137+            if flags & avahi.LOOKUP_RESULT_LOCAL:
138+                return  # local service, skip
139+            avahi_server.ResolveService(interface, protocol, name, stype,
140+                domain, avahi.PROTO_UNSPEC, dbus.UInt32(0),
141+                reply_handler=service_resolved, error_handler=print_error)
142+
143+        try:
144+            # initialize dbus stuff needed for discovery
145+            bus = dbus.SystemBus(mainloop=DBusGMainLoop())
146+            # named avahi_server so it doesn't shadow the twisted.web server module
147+            avahi_server = dbus.Interface(bus.get_object(avahi.DBUS_NAME,
148+                    avahi.DBUS_PATH_SERVER), avahi.DBUS_INTERFACE_SERVER)
149+            sbrowser = dbus.Interface(bus.get_object(avahi.DBUS_NAME,
150+                    avahi_server.ServiceBrowserNew(avahi.IF_UNSPEC,
151+                        avahi.PROTO_UNSPEC, '_pydra._tcp', 'local', dbus.UInt32(0))),
152+                    avahi.DBUS_INTERFACE_SERVICE_BROWSER)
153+            sbrowser.connect_to_signal("ItemNew", node_found)
154+        except dbus.DBusException as e:
155+            logger.error('avahi autodiscovery disabled: %s' % e)
156+
157 
158     def get_services(self):
159         """
160@@ -215,6 +272,13 @@ class Master(object):
161         with self._lock:
162             self.connecting=True
163 
164+            # make sure the various states are in sync: pick up nodes added to
165+            # the db since startup and forget discoveries that are now configured
166+            for db_node in Node.objects.all():
167+                if db_node.id not in self.nodes:
168+                    self.nodes[db_node.id] = db_node
169+                self.known_nodes.discard((db_node.host, db_node.port))
170+
171             "Begin the connection process"
172             connections = []
173             self.attempts = []
174@@ -261,14 +325,10 @@ class Master(object):
175             if result[0]:
176                 # save reference for remote calls
177                 node.ref = result[1]
178+                d = node.ref.callRemote('get_key')  # for the duplicate check
179+                d.addCallback(self.check_node, node).addErrback(logger.error)
180 
181 
182-                logger.info('node:%s:%s - connected' % (node.host, node.port))
183-
184-                # Authenticate with the node
185-                pub_key = node.load_pub_key()
186-                self.rsa_client.auth(node.ref, self.receive_key_node, server_key=pub_key, node=node)
187-
188             #failures
189             else:
190                 logger.error('node:%s:%s - failed to connect' % (node.host, node.port))
191@@ -283,6 +343,21 @@ class Master(object):
192         else:
193             self.reconnect_count = 0
194 
195+    def check_node(self, key, node):
196+        # key is the node's unencrypted key as a list of chunks (get_key);
197+        # node.pub_key is set only for paired nodes, so never pair an unpaired
198+        # node whose key already belongs to a known node (a duplicate entry)
199+        known_keys = set(n.pub_key for n in self.nodes.values() if n.pub_key)
200+        if not node.pub_key and ''.join(key) in known_keys:
201+            logger.info('deleting %s:%s - duplicate' % (node.host, node.port))
202+            self.nodes.pop(node.id, None)  # drop it from the cache as well
203+            node.delete()
204+            return
205+
206+        # Authenticate with the node
207+        pub_key = node.load_pub_key()
208+        self.rsa_client.auth(node.ref, self.receive_key_node, server_key=pub_key, node=node)
209+        logger.info('node:%s:%s - connected' % (node.host, node.port))
210 
211     def reconnect_nodes(self, reset_counter=False):
212         """
213diff --git a/pydra_server/cluster/node.py b/pydra_server/cluster/node.py
214index d6e9d2a..8b54fb7 100755
215--- a/pydra_server/cluster/node.py
216+++ b/pydra_server/cluster/node.py
217@@ -46,6 +46,8 @@ from twisted.application import service, internet
218 
219 import os
220 from subprocess import Popen
221+import platform, dbus, avahi
222+from django.utils import simplejson
223 from pydra_server.cluster.auth.rsa_auth import load_crypto
224 from pydra_server.cluster.auth.master_avatar import MasterAvatar
225 
226@@ -55,6 +57,45 @@ import settings
227 from pydra_server.logging.logger import init_logging
228 logger = init_logging(settings.LOG_FILENAME_NODE)
229 
230+class ZeroconfService:
231+    """A simple class to publish a network service with zeroconf using
232+    avahi.
233+
234+    Shamelessly stolen from http://stackp.online.fr/?p=35
235+    """
236+
237+    def __init__(self, name, port, stype="_http._tcp",
238+                 domain="", host="", text=""):
239+        self.name = name
240+        self.stype = stype
241+        self.domain = domain
242+        self.host = host
243+        self.port = port
244+        self.text = text
245+        self.group = None  # avahi EntryGroup proxy, set by publish()
246+
247+    def publish(self):
248+        """Register the service with the avahi daemon on the system bus."""
249+        bus = dbus.SystemBus()
250+        server = dbus.Interface(
251+                bus.get_object(avahi.DBUS_NAME, avahi.DBUS_PATH_SERVER),
252+                avahi.DBUS_INTERFACE_SERVER)
253+
254+        group = dbus.Interface(
255+                bus.get_object(avahi.DBUS_NAME, server.EntryGroupNew()),
256+                avahi.DBUS_INTERFACE_ENTRY_GROUP)
257+        group.AddService(avahi.IF_UNSPEC, avahi.PROTO_UNSPEC, dbus.UInt32(0),
258+                         self.name, self.stype, self.domain, self.host,
259+                         dbus.UInt16(self.port), self.text)
260+        group.Commit()
261+        self.group = group
262+
263+    def unpublish(self):
264+        """Withdraw the service; does nothing if it was never published."""
265+        if self.group is not None:
266+            self.group.Reset()
267+            self.group = None
268+
269 
270 class NodeServer:
271     """
272@@ -81,6 +122,10 @@ class NodeServer:
273         # get information about the server
274         self.determine_info()
275 
276+        # advertise this node as _pydra._tcp so masters can discover it
277+        self.zeroconf_service = ZeroconfService(name=platform.node(),
278+            port=self.port, stype="_pydra._tcp")
279+        self.zeroconf_service.publish()
280         logger.info('Node - starting server on port %s' % self.port)
281 
282 
283@@ -142,7 +187,6 @@ class NodeServer:
284         Exchange public keys with the master.  This allows the Master
285         to authenticate in the future using the keypair handshake
286         """
287-        from django.utils import simplejson
288         from Crypto.PublicKey import RSA
289         from twisted.conch.ssh.keys import Key
290         import math
291diff --git a/pydra_server/models.py b/pydra_server/models.py
292index 36644f2..c50ce6e 100644
293--- a/pydra_server/models.py
294+++ b/pydra_server/models.py
295@@ -29,8 +29,9 @@ Settings
296 from _mysql_exceptions import ProgrammingError
297 try:
298     class PydraSettings(dbsettings.Group):
299-        host        = dbsettings.StringValue('host', 'IP Address or hostname for this server.  This value will be used by all nodes in the cluster to connect', default='localhost')
300-        port        = dbsettings.IntegerValue('port','Port for this server', default=18800)
301+        host          = dbsettings.StringValue('host', 'IP Address or hostname for this server.  This value will be used by all nodes in the cluster to connect', default='localhost')
302+        port          = dbsettings.IntegerValue('port', 'Port for this server', default=18800)
303+        multicast_all = dbsettings.BooleanValue('multicast_all', 'Automatically use all the nodes found', default=False)
304     pydraSettings = PydraSettings('Pydra')
305 
306 except ProgrammingError:
307diff --git a/pydra_server/templates/discover.html b/pydra_server/templates/discover.html
308new file mode 100644
309index 0000000..657dae8
310--- /dev/null
311+++ b/pydra_server/templates/discover.html
312@@ -0,0 +1,32 @@
313+{% extends "base.html" %}
314+<!--
315+    Copyright 2009 Oregon State University
316+
317+    This file is part of Pydra.
318+
319+    Pydra is free software: you can redistribute it and/or modify
320+    it under the terms of the GNU General Public License as published by
321+    the Free Software Foundation, either version 3 of the License, or
322+    (at your option) any later version.
323+
324+    Pydra is distributed in the hope that it will be useful,
325+    but WITHOUT ANY WARRANTY; without even the implied warranty of
326+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
327+    GNU General Public License for more details.
328+
329+    You should have received a copy of the GNU General Public License
330+    along with Pydra.  If not, see <http://www.gnu.org/licenses/>.
331+-->
332+{% block content %}
333+{% if known_nodes %}
334+    Check the box next to each discovered node you would like to add, then submit.
335+    <form method="post" action=".">
336+    {% for node in known_nodes %}
337+        <p><label><input type="checkbox" name="{{ node.0 }}:{{ node.1 }}"> {{ node.0 }}:{{ node.1 }}</label></p>
338+    {% endfor %}
339+    <input type="submit" value="Add selected nodes">
340+    </form>
341+{% else %}
342+    No discovered nodes are waiting to be added.
343+{% endif %}
344+{% endblock %}
345diff --git a/pydra_server/templates/header.html b/pydra_server/templates/header.html
346index 18c8c71..87249e7 100644
347--- a/pydra_server/templates/header.html
348+++ b/pydra_server/templates/header.html
349@@ -88,7 +88,7 @@
350     {% if nodes %}
351         <div id="submenu">
352             <span class="menuitem"><a class="menuitem" href="{{SITE_ROOT}}/nodes/edit">Create Node</a></span>
353-            <span class="menuitem lastmenuitem"><a class="menuitem" href="{{SITE_ROOT}}/nodes/discover">Discover Nodes</a></span>
354+            <span class="menuitem lastmenuitem"><a class="menuitem" href="{{SITE_ROOT}}/nodes/discover/">Discover Nodes</a></span>
355         </div>
356     {% endif %}
357     {% if tasks %}
358diff --git a/pydra_server/templates/nodes.html b/pydra_server/templates/nodes.html
359index 74ba998..5e81472 100644
360--- a/pydra_server/templates/nodes.html
361+++ b/pydra_server/templates/nodes.html
362@@ -281,7 +281,7 @@
363 {% block submenu %}
364     {% if nodes and perms.pydra_server.can_edit_nodes %}
365         <span class="menuitem"><a class="menuitem" href="{{SITE_ROOT}}/nodes/edit">Create Node</a></span>
366-        <span class="menuitem lastmenuitem"><a class="menuitem" href="{{SITE_ROOT}}/nodes/discover">Discover Nodes</a></span>
367+        <span class="menuitem lastmenuitem"><a class="menuitem" href="{{SITE_ROOT}}/nodes/discover/">Discover Nodes</a></span>
368     {% endif %}
369 {% endblock %}
370 
371diff --git a/pydra_server/urls.py b/pydra_server/urls.py
372index d81873e..b760d24 100644
373--- a/pydra_server/urls.py
374+++ b/pydra_server/urls.py
375@@ -29,6 +29,7 @@ urlpatterns = patterns('',
376 
377     # node urls
378     (r'^nodes/$', nodes),
379+    (r'^nodes/discover/$', discover),  # activate avahi-discovered nodes
380     (r'^nodes/edit/(\d?)$', node_edit),
381     (r'^nodes/status/$', node_status),
382 
383diff --git a/pydra_server/views.py b/pydra_server/views.py
384index 9bd0142..92189fd 100644
385--- a/pydra_server/views.py
386+++ b/pydra_server/views.py
387@@ -134,6 +134,25 @@ def node_edit(request, id=None):
388     }, context_instance=RequestContext(request, processors=[settings_processor]))
389 
390 
391+@user_passes_test(lambda u: u.has_perm('pydra_server.can_edit_nodes'))
392+def discover(request):
393+    """Let users activate the nodes that have been discovered via avahi."""
394+    global pydra_controller
395+    if request.method == 'POST':
396+        reconnect = False
397+        for key in request.POST.keys():
398+            # checkboxes are named "host:port"; rpartition keeps IPv6 hosts
399+            # intact, and anything else in the POST (e.g. a csrf token) is skipped
400+            host, sep, port = key.rpartition(':')
401+            if sep and port.isdigit() and not Node.objects.filter(host=host, port=port):
402+                Node.objects.create(host=host, port=port)
403+                reconnect = True
404+        if reconnect:
405+            pydra_controller.remote_connect()
406+    return render_to_response('discover.html',
407+        {'known_nodes': pydra_controller.remote_list_known_nodes()},
408+        context_instance=RequestContext(request, processors=[settings_processor]))
409+
410 def node_status(request):
411     """
412         Retrieves Status of nodes
413diff --git a/urls.py b/urls.py
414index fa916a8..a2723ab 100644
415--- a/urls.py
416+++ b/urls.py
417@@ -26,5 +26,6 @@ admin.autodiscover()
418 urlpatterns = patterns('',
419     (r'^', include('pydra_server.urls')),
420     (r'^admin/(.*)', admin.site.root),
421+    (r'^settings/', include('dbsettings.urls')),  # dbsettings editor (e.g. multicast_all)
422 
423 )