
[Xen-devel] txenmon: Cluster monitoring/management



On Sun, Feb 08, 2004 at 09:19:56AM +0000, Ian Pratt wrote:
> Of course, this will all be much neater in rev 3 of the domain
> control tools that will use a db backend to maintain state about
> currently running domains across a cluster...

Ack!  We might be doing duplicate work.  How far have you gotten with
this?

Right now I'm running python code (distantly descended from
createlinuxdom.py) that is able to:

- monitor each domain and restart as needed

- migrate domains from one host to another

- dynamically create and assign swap VDs, and garbage-collect them at
  shutdown or after a crash

...and a few other things.  Right now migration is via reboot, not
suspend; haven't had a chance to troubleshoot resume further.  
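Restart and migrate-by-reboot both come from a single per-guest decision that main() in the listing below makes on every host, every pass. The following is a minimal sketch that restates that decision as a pure function so the migration sequence can be traced without Xen. It uses Python 3 syntax, and `decide` and the action strings are hypothetical names, not part of txenmon:

```python
# Sketch: decide() and the action names are hypothetical, not txenmon code.
def decide(is_mine, runnable, running_here, running_elsewhere, hung):
    """Return the actions one monitor pass would take for one guest.

    is_mine:           ctl/host names this host
    runnable:          ctl/run is > 0
    running_here:      the guest's domain id exists on this host
    running_elsewhere: another host refreshed the guest's state recently
    hung:              a shutdown was requested more than 300 s ago
    """
    actions = []
    if is_mine:
        if running_here:
            actions.append("heartbeat")
        if runnable:
            if not running_here:
                # Wait for the previous host's heartbeat to go stale before
                # starting, so the guest is not started on two hosts at once.
                if running_elsewhere:
                    actions.append("warn-running-elsewhere")
                else:
                    actions.append("start")
        else:
            if running_here:
                actions.append("shutdown")
            if hung:
                actions.append("destroy")
    else:
        # Guest is assigned to another host: release it if we still run it.
        if running_here:
            actions.append("shutdown")
        elif not running_elsewhere:
            actions.append("warn-not-running-on-owner")
    return actions


# Migration by reboot after ctl/host changes from hostA to hostB:
print(decide(False, True, True, False, False))  # on hostA
print(decide(True, True, False, True, False))   # on hostB, hostA still heartbeating
print(decide(True, True, False, False, False))  # on hostB, hostA's heartbeat stale
```

When read this way, a migration takes roughly the guest's shutdown time plus up to a minute for the old host's heartbeat to go stale. That cost matches the reboot-style migration described above.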

The only thing I'm using VDs for at this point is swap.  This code so
far depends on NFS root partitions, all served from a central NFS
server; control and state are communicated via NFS also.  I just today
started migrating the control/state comms to jabber instead, so that I
could start using VD root filesystems after the COW stuff settles down.
Haven't decided what to do for migrating filesystems between nodes in
that case though.
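With NFS roots, each guest kernel gets its network and root configuration from the kernel command line. Guest.reload() in the listing below assembles that line from the ctl files. It uses the Linux `ip=<client-ip>:<server-ip>:<gw-ip>:<netmask>:<hostname>:<device>:<autoconf>` syntax and leaves the hostname field empty. The following minimal sketch performs the same string construction. `build_cmdline` is a hypothetical helper, and the guest address, gateway, export path and domain id are made-up example values. Only 10.27.2.50 comes from the script.

```python
# Sketch: build_cmdline is a hypothetical helper; example values are made up.
def build_cmdline(ip, nfs_server, gateway, netmask, nfsroot, domid,
                  device="eth0", runlevel=4):
    """Build a guest kernel command line for an NFS-root guest,
    mirroring the string assembled in Guest.reload()."""
    # ip=<client-ip>:<server-ip>:<gw-ip>:<netmask>:<hostname>:<device>:<autoconf>
    # The hostname field is left empty and autoconfiguration is turned off.
    ip_arg = "ip=%s:%s:%s:%s::%s:off" % (ip, nfs_server, gateway, netmask, device)
    root_arg = "root=/dev/nfs nfsroot=%s" % nfsroot
    # Linux hands bare words to init as arguments (so "4" acts as the SysV
    # runlevel) and unrecognised name=value pairs to init's environment.
    return " ".join([ip_arg, root_arg, str(runlevel), "DOMID=%s" % domid])


cmdline = build_cmdline("10.27.2.101", "10.27.2.50", "10.27.2.1",
                        "255.255.255.0", "/export/alice/web1/root", 12)
print(cmdline)
```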

Right now I'm calling this 'txenmon' (TerraLuna Xen Monitor) but was
already considering renaming it 'xenmon' and posting it after I got it
cleaned up.  

This is all to support a production Xen cluster rollout that I plan to
have running by the end of this month.  I really don't want to go back
to UML at this point, and if I don't have this cluster running by March
I'm in deep doo-doo -- so I'm committed to working full-time on Xen
tools now.  ;-}

So here's the current version, not cleaned up, way too verbose, crufty,
but running:

Steve


#!/usr/bin/python2.2

import Xc, XenoUtil, string, sys, os, time, socket, cPickle

# initialize a few variables that might come in handy
thishostname = socket.gethostname()
if len(sys.argv) < 2:
    print "usage: %s /path/to/base/of/users/hosts" % sys.argv[0]
    sys.exit(1)

nfsserv="10.27.2.50"

base = sys.argv[1]
if len(base) > 1 and base.endswith('/'):
    base=base[:-1]

# Obtain an instance of the Xen control interface
xc = Xc.new()

# daemonize
daemonize=0
if daemonize:
    try: 
        pid = os.fork() 
        if pid > 0:
            # exit first parent
            sys.exit(0) 
    except OSError, e: 
        print >>sys.stderr, "fork #1 failed: %d (%s)" % (e.errno, e.strerror) 
        sys.exit(1)
    # decouple from parent environment
    # os.chdir("/") 
    os.setsid() 
    os.umask(0) 
    # XXX what about stdout etc?
    # do second fork
    try: 
        pid = os.fork() 
        if pid > 0:
            # exit from second parent, print eventual PID before
            # print "Daemon PID %d" % pid 
            sys.exit(0) 
    except OSError, e: 
        print >>sys.stderr, "fork #2 failed: %d (%s)" % (e.errno, e.strerror) 
        sys.exit(1) 

def main():
    while 1:
        guests=getGuests(base)
        # state machine
        for guest in guests:
            print guest.path,guest.activeHost,guest.isRunning()
            if guest.isMine():
                if guest.isRunningHere():
                    guest.heartbeat()
                if guest.isRunnable():
                    if guest.isRunningHere():
                        pass
                    else:
                        if guest.isRunning():
                            print "warning: %s is running on %s" % (
                                guest.name, guest.activeHost
                            )
                        else:
                            guest.start()
                else: # not guest.isRunnable()
                    if guest.isRunningHere():
                        guest.shutdown()
                    if guest.isHung():
                        guest.destroy()
            else: # not guest.isMine()
                if guest.isRunningHere():
                    guest.shutdown()
                if guest.isRunning():
                    pass
                else:
                    print "warning: %s is not running on %s" % (
                        guest.name,guest.ctl('host')
                    )
        # end state machine
        # garbage collect vd's
        usedVds=[]
        for guest in guests:
            if guest.isRunningHere():
                usedVds+=guest.vds()
                guest.pickle()
        print "usedVds =",usedVds
        for vd in listvdids():
            if vd not in usedVds:
                print "deleting vd %s" % vd
                XenoUtil.vd_delete(vd)
        # garbage collect domains
        # XXX
        time.sleep(10)
    # end while

def getGuests(base):
    users=os.listdir(base)
    guests=[]
    for user in users:
        if not os.path.isdir("%s/%s" % (base,user)):
            continue
        guestnames=os.listdir("%s/%s" % (base,user))
        for name in guestnames:
            path="%s/%s/%s" % (base,user,name)
            try:
                try:
                    file=open("%s/log/pickle" % path,"r")
                    guest=cPickle.load(file)
                    file.close()
                except:
                    print "creating",path
                    guest=Guest(path)
            except:
                print "exception creating guest %s/%s: %s" % (
                    user,
                    name,
                    sys.exc_info()[1].__dict__
                )
                continue
            guests.append(guest)
    return guests

def listvdids():
    vdids=[]
    for vbd in XenoUtil.vd_list():
        vdids.append(vbd['vdisk_id'])
    print "listvdids =", vdids
    return vdids


class Guest(object):

    def __init__(self,path):
        self.reload(path)

    def reload(self,path):
        pathparts=path.split('/')
        name=pathparts.pop()
        user=pathparts.pop()
        base='/'.join(pathparts)
        self.path=path
        self.base=base
        self.user=user
        self.name=name
        self.domain_name="%s" % name
        self.ctlcache={}
        # requested domain id number
        self.domid=self.ctl("domid")
        # kernel
        self.image=self.ctl("kernel")
        # memory
        self.memory_megabytes=int(self.ctl("mem"))
        swap=self.ctl("swap")
        (swap_dev,swap_megabytes) = swap.split(",")
        self.swap_dev=swap_dev
        self.swap_megabytes=int(swap_megabytes)
        # ip
        self.ipaddr    = [self.ctl("ip")]
        self.netmask = XenoUtil.get_current_ipmask()
        self.gateway = self.ctl("gw")
        # vbd's
        vbds = []
        vbdfile = open("%s/ctl/vbds" % self.path,"r")
        for line in vbdfile.readlines():
            if not line.strip():
                # skip blank lines instead of failing the 3-field unpack below
                continue
            ( uname, virt_name, rw ) = [field.strip() for field in line.split(',')]
            vbds.append(( uname, virt_name, rw ))
        vbdfile.close()
        self.vbds=vbds
        self.vbd_expert = 0
        # build kernel command line
        ipbit    = "ip="+self.ipaddr[0]
        ipbit += ":"+nfsserv
        ipbit += ":"+self.gateway+":"+self.netmask+"::eth0:off"
        rootbit = "root=/dev/nfs nfsroot=/export/%s/root" % path
        extrabit = "4 DOMID=%s " % self.domid 
        self.cmdline = ipbit +" "+ rootbit +" "+ extrabit
        self.curid=None
        self.swapvdid=None
        self.shutdownTime=None
        self.activeHost=None


    def ctl(self,var):
        filename="%s/ctl/%s" % (self.path,var)
        # if not hasattr(self,'ctlcache'):
        #     print dir(self)
        #     print self.path
        #     self.ctlcache={}
        if not self.ctlcache.has_key(filename):
            self.ctlcache[filename]={'mtime': 0, 'val': None}
        val=None
        mtime=os.path.getmtime(filename)
        if self.ctlcache[filename]['mtime'] < mtime:
            val = open(filename,"r").readline().strip()
            self.ctlcache[filename]={'mtime': mtime, 'val': val}
        else:
            val = self.ctlcache[filename]['val']
        return val
    
    def destroy(self):
        print "destroying %s" % self.domain_name
        # print "now curid =",self.curid
        if self.curid == 0:
            raise RuntimeError("attempt to kill dom0")
        xc.domain_destroy(dom=self.curid,force=True)

    def heartbeat(self):
        assert self.isRunningHere()
        # update swap expiry to one day
        try:
            XenoUtil.vd_refresh(self.swapvdid, 86400)
        except:
            print "%s missed swap expiry update: %s" % (
                self.domain_name,
                sys.exc_info()[1].__dict__
            )
        self.activeHost=thishostname
        self.pickle()

    def isHung(self):
        if not self.isRunningHere():
            return False
        if self.shutdownTime and time.time() - self.shutdownTime > 300:
            return True
        return False

    def isMine(self):
        if self.ctl("host") == thishostname:
            return True
        return False

    def isRunnable(self):
        run=int(self.ctl("run"))
        if run > 0:
            return True
        return False

    def isRunning(self):
        if self.isRunningHere():
            return True
        else:
            host=self.activeHost
            if host == None:
                return None
            if host == thishostname:
                return False
        filename="%s/log/%s" % (self.path,"pickle")
        mtime=None
        try:
            mtime=os.path.getmtime(filename)
        except:
            return False
        now=time.time()
        if now - mtime < 60:
            return True
        return False

    def isRunningHere(self):
        if not self.curid:  # None, or dom0 (id 0)
            return False
        domains=xc.domain_getinfo()
        domids = [ d['dom'] for d in domains ]
        if self.curid in domids:
            # print self.curid
            return True
        self.curid=None
        return False

    def XXXlog(self,var,val=None,append=False):
        filename="%s/log/%s" % (self.path,var)
        if val==None:
            out=None
            try:
                out=open(filename,"r").readlines()
            except:
                return None
            out=[l.strip() for l in out]
            return out
        mode="w"
        if append:
            mode="a"
        file=open(filename,mode)
        file.write("%s\n" % str(val))
        file.close()

    def mkswap(self):
        # create swap, 1 minute expiry 
        vdid=XenoUtil.vd_create(self.swap_megabytes,60)
        # print "vdid =",vdid
        self.swapvdid=vdid
        uname="vd:%s" % vdid
        # format it
        segments = XenoUtil.lookup_disk_uname(uname)
        if XenoUtil.vd_extents_validate(segments,1) < 0:
            print "segment conflict on %s" % uname
            sys.exit(1)
        tmpdev="/dev/xenswap%s" % vdid
        cmd="mknod %s b 125 %s" % (tmpdev,vdid)
        os.system(cmd)
        virt_dev = XenoUtil.blkdev_name_to_number(tmpdev)
        xc.vbd_create(0,virt_dev,1)
        xc.vbd_setextents(0,virt_dev,segments)
        cmd="mkswap %s" % tmpdev
        os.system(cmd)
        xc.vbd_destroy(0,virt_dev)
        self.vbds.append(( uname, self.swap_dev, "w" ))
        print "mkswap:",uname, self.swap_dev, "w"
        print self.vbds

    def pickle(self):
        assert self.isRunningHere()
        # write then rename so others see an atomic operation...
        file=open("%s/log/pickle.new" % self.path,"w")
        cPickle.dump(self,file)
        file.close()
        os.rename(
            "%s/log/pickle.new" % self.path, 
            "%s/log/pickle"     % self.path
        )

    def shutdown(self):
        print "shutting down %s" % self.name
        # reduce swap expiry to 10 minutes (to give it time to shut down)
        if self.swapvdid:
            XenoUtil.vd_refresh(self.swapvdid, 600)
        xc.domain_destroy(dom=self.curid)
        if not self.shutdownTime:
            self.shutdownTime=time.time()

    def start(self):
        """Create, build and start the domain for this guest."""
        self.reload(self.path)
        image=self.image
        memory_megabytes=self.memory_megabytes
        domain_name=self.domain_name
        ipaddr=self.ipaddr
        netmask=self.netmask
        vbds=self.vbds
        cmdline=self.cmdline
        vbd_expert=self.vbd_expert
        
        print "Domain image          : ", self.image
        print "Domain memory         : ", self.memory_megabytes
        print "Domain IP address(es) : ", self.ipaddr
        print "Domain block devices  : ", self.vbds
        print 'Domain cmdline        : "%s"' % self.cmdline

        if self.isRunning():
            raise RuntimeError("%s already running on %s" % (self.name,self.activeHost))

        if not os.path.isfile( image ):
            print "Image file '" + image + "' does not exist"
            return None

        id = xc.domain_create( mem_kb=memory_megabytes*1024, name=domain_name )
        print "Created new domain with id = " + str(id)
        if id <= 0:
            print "Error creating domain"
            return None

        ret = xc.linux_build( dom=id, image=image, cmdline=cmdline )
        if ret < 0:
            print "Error building Linux guest OS: "
            print "Return code from linux_build = " + str(ret)
            xc.domain_destroy ( dom=id )
            return None

        # setup the virtual block devices
        # set the expertise level appropriately
        XenoUtil.VBD_EXPERT_MODE = vbd_expert
        
        self.mkswap()

        self.datavds=[]
        for ( uname, virt_name, rw ) in vbds:
            virt_dev = XenoUtil.blkdev_name_to_number( virt_name )
            segments = XenoUtil.lookup_disk_uname( uname )
            if not segments or segments < 0:
                print "Error looking up %s\n" % uname
                xc.domain_destroy ( dom=id )
                return None

            # check that setting up this VBD won't violate the sharing
            # allowed by the current VBD expertise level
            # print uname, virt_name, rw, segments
            if XenoUtil.vd_extents_validate(segments, rw=='w' or rw=='rw') < 0:
                xc.domain_destroy( dom = id )
                return None
                
            writeable = rw=='w' or rw=='rw'
            if xc.vbd_create( dom=id, vbd=virt_dev, writeable=writeable ):
                print "Error creating VBD vbd=%d writeable=%s\n" % (virt_dev,rw)
                xc.domain_destroy ( dom=id )
                return None

            if xc.vbd_setextents( 
                    dom=id,
                    vbd=virt_dev,
                    extents=segments):
                print "Error populating VBD vbd=%d\n" % virt_dev
                xc.domain_destroy ( dom=id )
                return None
            self.datavds.append(virt_dev)


        # setup virtual firewall rules for all aliases
        for ip in ipaddr:
            XenoUtil.setup_vfr_rules_for_vif( id, 0, ip )

        if xc.domain_start( dom=id ) < 0:
            print "Error starting domain"
            xc.domain_destroy ( dom=id )
            # return like the other error paths instead of killing the monitor
            return None

        self.curid=id
        print "domain (re)started: %s (%d)" % (domain_name,id)
        self.heartbeat()
        return id


    def vds(self):
        vds=[]
        # XXX add data vbds
        vds.append(self.swapvdid)
        return vds


        

main()
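In the listing above, ctl() keeps a per-file cache keyed on the filename. It re-reads a control file only when that file's mtime has advanced. The following is a minimal standalone sketch of that pattern in Python 3. `CtlReader` is a hypothetical name. The one-value-per-file layout under `<guest>/ctl/` follows the script.

```python
import os
import tempfile


# Sketch: CtlReader is a hypothetical class, not part of txenmon.
class CtlReader:
    """Read one-line control files, re-reading a file only when its mtime advances."""

    def __init__(self, ctl_dir):
        self.ctl_dir = ctl_dir
        self.cache = {}  # filename -> (mtime, value)

    def get(self, var):
        filename = os.path.join(self.ctl_dir, var)
        mtime = os.path.getmtime(filename)
        cached = self.cache.get(filename)
        if cached is not None and cached[0] >= mtime:
            return cached[1]  # file unchanged since the last read
        with open(filename) as f:
            value = f.readline().strip()
        self.cache[filename] = (mtime, value)
        return value


# Demonstration with fixed integer timestamps so the cache behaviour is exact.
ctl_dir = tempfile.mkdtemp()
mem_path = os.path.join(ctl_dir, "mem")
with open(mem_path, "w") as f:
    f.write("64\n")
os.utime(mem_path, (1000000, 1000000))

reader = CtlReader(ctl_dir)
first = reader.get("mem")        # "64", read from disk
with open(mem_path, "w") as f:
    f.write("128\n")
os.utime(mem_path, (1000000, 1000000))
unchanged = reader.get("mem")    # "64": same mtime, served from the cache
os.utime(mem_path, (1000010, 1000010))
updated = reader.get("mem")      # "128": newer mtime, re-read
```

Because the check compares mtimes, a file rewritten within the same timestamp tick as the cached version looks unchanged. That change is not picked up until the file's mtime advances again, which matters on filesystems with one-second timestamps.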

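Together, pickle() and isRunning() in the listing form a cross-host heartbeat. The host running a guest rewrites `<guest>/log/pickle` by writing a temporary file and renaming it into place. Every other host treats the guest as running elsewhere while that file's mtime is less than 60 seconds old. The following is a minimal Python 3 sketch of the same pair, assuming a local POSIX directory. `publish_state` and `running_elsewhere` are hypothetical names. The state stored is a plain dict rather than the Guest object.

```python
import os
import pickle
import tempfile
import time

# Sketch: publish_state/running_elsewhere are hypothetical helpers.
STALE_AFTER = 60  # seconds, the same threshold isRunning() uses


def publish_state(log_dir, state):
    """Replace log_dir/pickle atomically so readers never see a partial file."""
    final = os.path.join(log_dir, "pickle")
    tmp = final + ".new"
    with open(tmp, "wb") as f:
        pickle.dump(state, f)
        f.flush()
        os.fsync(f.fileno())  # get the data to disk before the rename
    os.replace(tmp, final)    # rename within one directory replaces atomically


def running_elsewhere(log_dir, now=None):
    """True while log_dir/pickle was refreshed less than STALE_AFTER seconds ago."""
    try:
        mtime = os.path.getmtime(os.path.join(log_dir, "pickle"))
    except OSError:
        return False  # never published: nobody is running the guest
    if now is None:
        now = time.time()
    return now - mtime < STALE_AFTER


log_dir = tempfile.mkdtemp()
publish_state(log_dir, {"activeHost": "hostA", "curid": 7})
with open(os.path.join(log_dir, "pickle"), "rb") as f:
    loaded = pickle.load(f)
published_at = os.path.getmtime(os.path.join(log_dir, "pickle"))
fresh = running_elsewhere(log_dir, now=published_at + 10)   # True
stale = running_elsewhere(log_dir, now=published_at + 120)  # False
```

Pickling a plain dict keeps the state file loadable by a host running a different version of the Guest class. Pickled instances need their class importable under the same name at load time. Over NFS the file's mtime is normally stamped by the server. The 60-second window therefore assumes each host's clock is roughly in sync with the NFS server.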

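The VD sweep in main() is a simple mark-and-sweep. It marks every VD id that vds() returns for guests running on this host, then deletes any VD reported by XenoUtil.vd_list() that was not marked. The following is a minimal Python 3 sketch of the selection step on plain lists. The function name and the integer ids are hypothetical, and the XenoUtil calls are left out.

```python
# Sketch: vds_to_delete is a hypothetical helper; ids are example values.
def vds_to_delete(all_vdids, vds_per_running_guest):
    """Return the VD ids that no running guest claims.

    all_vdids:             every VD id known on this host
    vds_per_running_guest: one list of VD ids per guest running here
    """
    # Mark: collect every claimed VD id. A guest whose swap VD has not been
    # created yet reports None, which is not a real id.
    in_use = set()
    for vds in vds_per_running_guest:
        in_use.update(vd for vd in vds if vd is not None)
    # Sweep: anything unclaimed is garbage.
    return [vd for vd in all_vdids if vd not in in_use]


doomed = vds_to_delete([3, 5, 9], [[3], [None]])
print(doomed)  # [5, 9]
```

As written, vds() returns only the swap VD (see its XXX comment). Any other VD on the host, including one a guest uses for data, would therefore be swept as well. The expiry times set in mkswap(), heartbeat() and shutdown() also let an orphaned swap VD disappear on its own even if no sweep runs.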
-- 
Stephen G. Traugott  (KG6HDQ)
UNIX/Linux Infrastructure Architect, TerraLuna LLC
stevegt@xxxxxxxxxxxxx 
http://www.stevegt.com -- http://Infrastructures.Org 


_______________________________________________
Xen-devel mailing list
Xen-devel@xxxxxxxxxxxxxxxxxxxxx
https://lists.sourceforge.net/lists/listinfo/xen-devel

