Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
DISCONTINUED:openSUSE:11.2:Update
ivman
halmount.py
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File halmount.py of Package ivman
#!/usr/bin/python
# hal mount script
# GPL, by Ludwig Nussel <lnussel@suse.de>
# TODO:
# - use volume.crypto_luks.clear.backing_volume to detach luks partition on
#   umount or at least display it somehow
"""Command-line tool that lists, mounts, unmounts and ejects volumes via HAL.

All work is done through D-Bus calls on the ``org.freedesktop.Hal`` service.
The syntax is kept valid for both Python 2.6+ and Python 3.
"""

import sys
import subprocess
from collections import deque
from os import getuid, getpid
from optparse import OptionParser

HAL_SERVICE = "org.freedesktop.Hal"
HAL_DEVICE = "org.freedesktop.Hal.Device"
HAL_VOLUME = "org.freedesktop.Hal.Device.Volume"
HAL_CRYPTO = "org.freedesktop.Hal.Device.Volume.Crypto"
DENIED_BY_POLICY = "org.freedesktop.Hal.Device.PermissionDeniedByPolicy"


def err(arg):
    """Write *arg* to stderr, appending a newline when it does not end in one.

    An empty string is accepted and produces a bare newline.
    """
    if not arg.endswith("\n"):
        arg += "\n"
    sys.stderr.write(arg)


def out(*parts):
    """Print *parts* separated by single spaces, followed by a newline."""
    # A single parenthesised argument behaves the same under the Python 2
    # print statement and the Python 3 print function.
    print(" ".join(["%s" % (part,) for part in parts]))


try:
    import dbus
except ImportError:
    # The missing module is reported by main(), so that merely importing this
    # file neither prints nor exits.
    dbus = None


def _prop(props, key, default):
    """Return ``props[key]``, or *default* when the key is absent."""
    try:
        return props[key]
    except KeyError:
        return default


def _printable_label(label):
    """Return *label* with non-ASCII and control characters backslash-escaped.

    An empty label is rendered as "?".
    """
    # python throws an exception when trying to print a label with unicode
    # characters in it anyways. Additionally get rid of control characters.
    try:
        text = label.encode('unicode_escape')
    except (UnicodeError, LookupError, AttributeError):
        try:
            text = label.encode('string_escape')
        except (UnicodeError, LookupError, AttributeError):
            text = label
    # On Python 3 encode() yields bytes; turn the (pure ASCII) escape
    # sequence back into text. On Python 2 bytes is str, so this is skipped.
    if isinstance(text, bytes) and not isinstance(text, str):
        text = text.decode('ascii')
    if text == "":
        return "?"
    return text


def _report_not_found(name):
    """Tell the user that no device matched *name* (or that none exist)."""
    if name != "":
        err(name + " not found")
    else:
        err("no device found")


class Dev:
    """Snapshot of the HAL properties of one volume, identified by its UDI."""

    def __init__(self, bus, udi):
        self.bus = bus
        self.udi = udi
        self.update()

    def update(self):
        """Re-read the volume's properties from HAL.

        Missing string properties default to "", ``is_mounted`` to False.
        ``options`` is the comma-joined list of valid mount options and
        ``has_uid`` records whether "uid=" is one of them.
        """
        obj = self.bus.get_object(HAL_SERVICE, self.udi)
        props = obj.GetAllProperties(dbus_interface=HAL_DEVICE)
        self.device = _prop(props, "block.device", "")
        self.mountpoint = _prop(props, "volume.mount_point", "")
        self.label = _prop(props, "volume.label", "")
        self.fstype = _prop(props, "volume.fstype", "")
        self.is_mounted = _prop(props, "volume.is_mounted", False)
        self.has_uid = False
        self.options = ""
        try:
            valid = list(props["volume.mount.valid_options"])
            self.options = ",".join(valid)
            self.has_uid = "uid=" in valid
        except (KeyError, TypeError):
            # Property absent or not a list of strings: leave the defaults.
            pass


class Mount:
    """Collects the mountable HAL volumes and performs operations on them.

    ``verbose`` and ``all`` are plain attributes that the caller sets after
    construction; ``all`` makes every operation apply to every volume.
    """

    def __init__(self):
        self.mountable = []
        self.verbose = False
        self.all = False
        self.bus = dbus.SystemBus()
        self.hal_obj = self.bus.get_object(HAL_SERVICE,
                                           "/org/freedesktop/Hal/Manager")
        self.hal = dbus.Interface(self.hal_obj, "org.freedesktop.Hal.Manager")
        for udi in self.hal.GetAllDevices():
            obj = self.bus.get_object(HAL_SERVICE, udi)
            if not obj.PropertyExists("volume.fsusage",
                                      dbus_interface=HAL_DEVICE):
                continue
            fsusage = obj.GetProperty("volume.fsusage",
                                      dbus_interface=HAL_DEVICE)
            if fsusage in ("filesystem", "crypto"):
                self.mountable.append(Dev(self.bus, udi))

    def listudi(self):
        """Print "<device> <udi>" for every collected volume."""
        for dev in self.mountable:
            out(dev.device, dev.udi)

    def list(self):
        """Print one line per collected volume.

        crypto_LUKS volumes are shown as "<device> encrypted". Other volumes
        show the mount point (or "->" when unmounted), the escaped label and
        the file system type; in verbose mode the raw label and the valid
        mount options are appended in parentheses.
        """
        for dev in self.mountable:
            if dev.fstype == "crypto_LUKS":
                out(dev.device, "encrypted")
                continue
            if dev.is_mounted:
                parts = [dev.device, "on", dev.mountpoint]
            else:
                parts = [dev.device, "->"]
            parts.append(_printable_label(dev.label))
            parts.extend(["type", dev.fstype])
            if self.verbose:
                details = []
                if dev.label != "":
                    details.append("(label=\"" + dev.label + "\"")
                if dev.options != "":
                    if not details:
                        details.append("(")
                    details.append("options=\"" + dev.options + "\"")
                if details:
                    details.append(")")
                parts.extend(details)
            out(*parts)

    def cryptosetup(self, dev):
        """Prompt for a passphrase and call HAL's crypto Setup on *dev*.

        Returns 0 on success and 1 on any failure, including an aborted or
        empty passphrase prompt.
        """
        import getpass
        try:
            pw = getpass.getpass()
        except (Exception, KeyboardInterrupt):
            # Deliberately best effort: an aborted prompt is just a failure.
            return 1
        if pw == "":
            return 1
        obj = self.bus.get_object(HAL_SERVICE, dev.udi)
        try:
            if obj.Setup(pw, dbus_interface=HAL_CRYPTO):
                return 1
        except dbus.DBusException as msg:
            err(dev.device + ": " + str(msg))
            return 1
        return 0

    def cryptoteardown(self, dev):
        """Call HAL's crypto Teardown on *dev*; return 0 on success, else 1."""
        obj = self.bus.get_object(HAL_SERVICE, dev.udi)
        try:
            if obj.Teardown(dbus_interface=HAL_CRYPTO):
                return 1
        except dbus.DBusException as msg:
            err(dev.device + ": " + str(msg))
            return 1
        return 0

    def mount(self, name, dest, fs, opt):
        """Mount every volume matching *name* and return the failure count.

        name -- "/dev/..." device path or volume label; a leading "/media/"
                and trailing slashes are stripped. Ignored when ``self.all``.
        dest -- requested mount point name ("" means: use the volume label);
                a leading "/media/" and trailing slashes are stripped.
        fs   -- file system type handed to HAL ("" for none given).
        opt  -- list of mount options; it is not modified.

        A result of 0 means no failure was counted. Finding no matching
        volume counts as one failure.
        """
        ret = 0
        found = False
        if name[0:7] == "/media/":
            name = name[7:]
        if dest[0:7] == "/media/":
            dest = dest[7:]
        if name != "/":
            name = name.rstrip("/")
        if dest != "/":
            dest = dest.rstrip("/")

        uid = getuid()
        # Work on a private queue so that a volume can be re-queued after
        # authorization was obtained without altering self.mountable.
        queue = deque(self.mountable)
        retried = set()
        while queue:
            dev = queue.popleft()
            if not (self.all
                    or (name[0:5] == "/dev/" and dev.device == name)
                    or dev.label == name):
                continue
            found = True
            if dev.is_mounted:
                err(dev.device + " is already mounted on " + dev.mountpoint)
                continue
            if dev.fstype == "crypto_LUKS":
                if self.cryptosetup(dev):
                    ret += 1
                continue

            where = dest
            if where == "":
                where = dev.label
            if self.verbose:
                out("mounting", dev.device, "on", where)

            # Build the option list per volume: a "uid=" added for one volume
            # must not leak into the next one, which may not accept it.
            dev_opts = list(opt)
            if uid != 0 and dev.has_uid:
                if not any(o[0:4] == "uid=" for o in dev_opts):
                    dev_opts.append("uid=" + str(uid))
            if not dev_opts:
                dev_opts = [""]

            failed = False
            obj = self.bus.get_object(HAL_SERVICE, dev.udi)
            try:
                # A truthy return value from Mount is counted as a failure.
                if obj.Mount(where, fs, dev_opts, dbus_interface=HAL_VOLUME):
                    failed = True
            except dbus.DBusException as msg:
                text = str(msg)
                # XXX: looks like we have no way to get the error name
                # org.freedesktop.Hal.Device.Volume.InvalidMountpoint
                if "mountpoint" in text and "invalid" in text:
                    # Retry with an empty mount point name.
                    try:
                        obj.Mount("", fs, dev_opts, dbus_interface=HAL_VOLUME)
                    except dbus.DBusException as msg2:
                        err(dev.device + ": " + str(msg2))
                        failed = True
                elif msg.get_dbus_name() == DENIED_BY_POLICY:
                    # The first word after the error name is taken as the
                    # missing privilege.
                    privilege = text[len(DENIED_BY_POLICY + ": "):]
                    privilege = privilege.split(" ", 1)[0]
                    if dev.udi in retried:
                        # Already re-tried once after authorization; give up
                        # instead of looping forever.
                        err(dev.device + ": " + text)
                        failed = True
                    else:
                        try:
                            bus = dbus.SessionBus()
                            agent = bus.get_object(
                                "org.freedesktop.PolicyKit.AuthenticationAgent",
                                "/")
                            ok = agent.ObtainAuthorization(
                                privilege, dbus.UInt32(0),
                                dbus.UInt32(getpid()))
                        except Exception:
                            err('authentication agent unreachable.\ntry running "polkit-auth --obtain %(p)s"' % {'p': privilege})
                            failed = True
                        else:
                            if ok:
                                retried.add(dev.udi)
                                queue.append(dev)
                                continue
                            err("Missing Privilege: " + privilege)
                            failed = True
                else:
                    err(dev.device + ": " + text)
                    failed = True

            if failed:
                ret += 1
                continue
            # Judge success per volume, so one earlier failure does not
            # silence the report for the volumes that follow.
            dev.update()
            if dev.is_mounted:
                if where == "" or where != dest:
                    out(dev.device, "mounted on", dev.mountpoint)
            else:
                err("failed to mount " + dev.device)

        if not found:
            _report_not_found(name)
            ret += 1
        return ret

    def umount(self, name):
        """Unmount every volume matching *name* and return the failure count.

        *name* is compared with the device path (when it starts with
        "/dev/"), the mount point and the label; trailing slashes are
        stripped. crypto_LUKS volumes are torn down instead of unmounted.
        Finding no matching volume counts as one failure.
        """
        ret = 0
        found = False
        if name != "/":
            name = name.rstrip("/")
        for dev in self.mountable:
            if not (self.all
                    or (name[0:5] == "/dev/" and dev.device == name)
                    or dev.mountpoint == name
                    or dev.label == name):
                continue
            found = True
            if dev.fstype == "crypto_LUKS":
                if self.cryptoteardown(dev):
                    ret += 1
                continue
            if not dev.is_mounted:
                err(dev.device + " is not mounted")
                continue
            obj = self.bus.get_object(HAL_SERVICE, dev.udi)
            try:
                if obj.Unmount([""], dbus_interface=HAL_VOLUME):
                    ret += 1
            except dbus.DBusException as msg:
                err(dev.device + ": " + str(msg))
                ret += 1
        if not found:
            _report_not_found(name)
            ret += 1
        return ret

    def eject(self, name):
        """Eject every volume matching *name* and return the failure count.

        *name* is compared with the device path (when it starts with
        "/dev/") and the mount point; trailing slashes are stripped.
        Finding no matching volume counts as one failure.
        """
        ret = 0
        found = False
        if name != "/":
            name = name.rstrip("/")
        for dev in self.mountable:
            if not (self.all
                    or (name[0:5] == "/dev/" and dev.device == name)
                    or dev.mountpoint == name):
                continue
            found = True
            obj = self.bus.get_object(HAL_SERVICE, dev.udi)
            try:
                if obj.Eject([""], dbus_interface=HAL_VOLUME):
                    ret += 1
            except dbus.DBusException as msg:
                err(dev.device + ": " + str(msg))
                ret += 1
        if not found:
            _report_not_found(name)
            ret += 1
        return ret


def main():
    """Parse the command line, run the requested action and exit.

    The process exit status is the failure count returned by the action,
    or 1 for usage and connection errors.
    """
    if dbus is None:
        err("you need to install dbus-1-python")
        sys.exit(1)

    parser = OptionParser(usage="%prog [options] <device|label> [mountpoint]")
    parser.add_option("-t", dest="fstype", default="",
                      help="file system type")
    parser.add_option("-o", dest="options", default="",
                      help="mount options")
    parser.add_option("-v", dest="verbose", action="store_true",
                      default=False, help="verbose listing")
    parser.add_option("-u", dest="umount", action="store_true",
                      default=False, help="umount")
    parser.add_option("-e", dest="eject", action="store_true",
                      default=False, help="eject")
    parser.add_option("-a", dest="all", action="store_true",
                      default=False, help="(u)mount all")
    parser.add_option("--listudi", dest="listudi", action="store_true",
                      default=False, help="list UDIs")
    (options, args) = parser.parse_args()

    try:
        m = Mount()
    except dbus.DBusException as msg:
        err("Can't connect to hald: " + str(msg))
        sys.exit(1)

    m.verbose = options.verbose
    m.all = options.all

    ret = 0
    if options.umount or options.eject:
        if len(args) != 1 and not options.all:
            err("specify device name or mountpoint")
            sys.exit(1)
        name = args[0] if args else ""
        if options.umount:
            ret = m.umount(name)
        else:
            ret = m.eject(name)
    elif options.listudi:
        m.listudi()
    elif not args and not options.all:
        m.list()
    else:
        opt = []
        if options.options != "":
            opt = options.options.split(",")
        name = args[0] if args else ""
        dest = args[1] if len(args) > 1 else ""
        ret = m.mount(name, dest, options.fstype, opt)

    sys.exit(ret)


if __name__ == "__main__":
    main()
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor