filesys.py :  » Web-Frameworks » Zope » Zope-2.6.0 » ZServer » medusa » Python Open Source

Home
Python Open Source
1.3.1.2 Python
2.Ajax
3.Aspect Oriented
4.Blog
5.Build
6.Business Application
7.Chart Report
8.Content Management Systems
9.Cryptographic
10.Database
11.Development
12.Editor
13.Email
14.ERP
15.Game 2D 3D
16.GIS
17.GUI
18.IDE
19.Installer
20.IRC
21.Issue Tracker
22.Language Interface
23.Log
24.Math
25.Media Sound Audio
26.Mobile
27.Network
28.Parser
29.PDF
30.Project Management
31.RSS
32.Search
33.Security
34.Template Engines
35.Test
36.UML
37.USB Serial
38.Web Frameworks
39.Web Server
40.Web Services
41.Web Unit
42.Wiki
43.Windows
44.XML
Python Open Source » Web Frameworks » Zope 
Zope » Zope 2.6.0 » ZServer » medusa » filesys.py
# -*- Mode: Python; tab-width: 4 -*-
#  $Id: filesys.py,v 1.11 2002/04/12 14:08:12 andreasjung Exp $
#  Author: Sam Rushing <rushing@nightmare.com>
#
# Generic filesystem interface.
#

# We want to provide a complete wrapper around any and all
# filesystem operations.

# this class is really just for documentation,
# identifying the API for a filesystem object.

# opening files for reading, and listing directories, should
# return a producer.

class abstract_filesystem:
    def __init__ (self):
        pass
        
    def current_directory (self):
        "Return a string representing the current directory."
        pass
        
    def listdir (self, path, long=0):
        """Return a listing of the directory at 'path' The empty string
        indicates the current directory.  If 'long' is set, instead
        return a list of (name, stat_info) tuples
        """
        pass
        
    def open (self, path, mode):
        "Return an open file object"
        pass
        
    def stat (self, path):
        "Return the equivalent of os.stat() on the given path."
        pass
        
    def isdir (self, path):
        "Does the path represent a directory?"
        pass
        
    def isfile (self, path):
        "Does the path represent a plain file?"
        pass
        
    def cwd (self, path):
        "Change the working directory."
        pass
        
    def cdup (self):
        "Change to the parent of the current directory."
        pass
        
        
    def longify (self, path):
        """Return a 'long' representation of the filename
        [for the output of the LIST command]"""
        pass
        
        # standard wrapper around a unix-like filesystem, with a 'false root'
        # capability.
        
        # security considerations: can symbolic links be used to 'escape' the
        # root?  should we allow it?  if not, then we could scan the
        # filesystem on startup, but that would not help if they were added
        # later.  We will probably need to check for symlinks in the cwd method.
        
        # what to do if wd is an invalid directory?
        
import os,re
import stat
import string

def safe_stat (path):
    try:
        return (path, os.stat (path))
    except:
        return None
        
import glob

class os_filesystem:
    path_module = os.path
    
    # set this to zero if you want to disable pathname globbing.
    # [we currently don't glob, anyway]
    do_globbing = 1
    
    def __init__ (self, root, wd='/'):
        self.root = root
        self.wd = wd
        
    def current_directory (self):
        return self.wd
        
    def isfile (self, path):
        p = self.normalize (self.path_module.join (self.wd, path))
        return self.path_module.isfile (self.translate(p))
        
    def isdir (self, path):
        p = self.normalize (self.path_module.join (self.wd, path))
        return self.path_module.isdir (self.translate(p))
        
    def cwd (self, path):
        p = self.normalize (self.path_module.join (self.wd, path))
        translated_path = self.translate(p)
        if not self.path_module.isdir (translated_path):
            return 0
        else:
            old_dir = os.getcwd()
            # temporarily change to that directory, in order
            # to see if we have permission to do so.
            try:
                can = 0
                try:
                    os.chdir (translated_path)
                    can = 1
                    self.wd = p
                except:
                    pass
            finally:
                if can:
                    os.chdir (old_dir)
            return can
            
    def cdup (self):
        return self.cwd ('..')
        
    def listdir (self, path, long=0):
        p = self.translate (path)
        # I think we should glob, but limit it to the current
        # directory only.
        ld = os.listdir (p)
        if not long:
            return list_producer (ld, 0, None)
        else:
            old_dir = os.getcwd()
            try:
                os.chdir (p)
                # if os.stat fails we ignore that file.
                result = filter (None, map (safe_stat, ld))
            finally:
                os.chdir (old_dir)
            return list_producer (result, 1, self.longify)
            
            # TODO: implement a cache w/timeout for stat()
    def stat (self, path):
        p = self.translate (path)
        return os.stat (p)
        
    def open (self, path, mode):
        p = self.translate (path)
        return open (p, mode)
        
    def unlink (self, path):
        p = self.translate (path)
        return os.unlink (p)
        
    def mkdir (self, path):
        p = self.translate (path)
        return os.mkdir (p)
        
    def rmdir (self, path):
        p = self.translate (path)
        return os.rmdir (p)
        
        # utility methods
    def normalize (self, path):
            # watch for the ever-sneaky '/+' path element
        path = re.sub ('/+', '/', path)
        p = self.path_module.normpath (path)
        # remove 'dangling' cdup's.
        if len(p) > 2 and p[:3] == '/..':
            p = '/'
        return p
        
    def translate (self, path):
            # we need to join together three separate
            # path components, and do it safely.
            # <real_root>/<current_directory>/<path>
            # use the operating system's path separator.
        path = string.join (string.split (path, '/'), os.sep)
        p = self.normalize (self.path_module.join (self.wd, path))
        p = self.normalize (self.path_module.join (self.root, p[1:]))
        return p
        
    def longify (self, (path, stat_info)):
        return unix_longify (path, stat_info)
        
    def __repr__ (self):
        return '<unix-style fs root:%s wd:%s>' % (
                self.root,
                self.wd
                )
        
if os.name == 'posix':

    class unix_filesystem (os_filesystem):
        pass
        
    class schizophrenic_unix_filesystem (os_filesystem):
        PROCESS_UID    = os.getuid()
        PROCESS_EUID  = os.geteuid()
        PROCESS_GID    = os.getgid()
        PROCESS_EGID  = os.getegid()
        
        def __init__ (self, root, wd='/', persona=(None, None)):
            os_filesystem.__init__ (self, root, wd)
            self.persona = persona
            
        def become_persona (self):
            if self.persona is not (None, None):
                uid, gid = self.persona
                # the order of these is important!
                os.setegid (gid)
                os.seteuid (uid)
                
        def become_nobody (self):
            if self.persona is not (None, None):
                os.seteuid (self.PROCESS_UID)
                os.setegid (self.PROCESS_GID)
                
                # cwd, cdup, open, listdir
        def cwd (self, path):
            try:
                self.become_persona()
                return os_filesystem.cwd (self, path)
            finally:
                self.become_nobody()
                
        def cdup (self, path):
            try:
                self.become_persona()
                return os_filesystem.cdup (self)
            finally:
                self.become_nobody()
                
        def open (self, filename, mode):
            try:
                self.become_persona()
                return os_filesystem.open (self, filename, mode)
            finally:
                self.become_nobody()
                
        def listdir (self, path, long=0):
            try:
                self.become_persona()
                return os_filesystem.listdir (self, path, long)
            finally:
                self.become_nobody()
                
                # This hasn't been very reliable across different platforms.
                # maybe think about a separate 'directory server'.
                #
                #  import posixpath
                #  import fcntl
                #  import FCNTL
                #  import select
                #  import asyncore
                #
                #  # pipes /bin/ls for directory listings.
                #  class unix_filesystem (os_filesystem):
                #    pass
                #     path_module = posixpath
                #
                #     def listdir (self, path, long=0):
                #       p = self.translate (path)
                #       if not long:
                #         return list_producer (os.listdir (p), 0, None)
                #       else:
                #         command = '/bin/ls -l %s' % p
                #         print 'opening pipe to "%s"' % command
                #         fd = os.popen (command, 'rt')
                #         return pipe_channel (fd)
                #
                #   # this is both a dispatcher, _and_ a producer
                #   class pipe_channel (asyncore.file_dispatcher):
                #     buffer_size = 4096
                #
                #     def __init__ (self, fd):
                #       asyncore.file_dispatcher.__init__ (self, fd)
                #       self.fd = fd
                #       self.done = 0
                #       self.data = ''
                #
                #     def handle_read (self):
                #       if len (self.data) < self.buffer_size:
                #         self.data = self.data + self.fd.read (self.buffer_size)
                #       #print '%s.handle_read() => len(self.data) == %d' % (self, len(self.data))
                #
                #     def handle_expt (self):
                #       #print '%s.handle_expt()' % self
                #       self.done = 1
                #
                #     def ready (self):
                #       #print '%s.ready() => %d' % (self, len(self.data))
                #       return ((len (self.data) > 0) or self.done)
                #
                #     def more (self):
                #       if self.data:
                #         r = self.data
                #         self.data = ''
                #       elif self.done:
                #         self.close()
                #         self.downstream.finished()
                #         r = ''
                #       else:
                #         r = None
                #       #print '%s.more() => %s' % (self, (r and len(r)))
                #       return r
                
                # For the 'real' root, we could obtain a list of drives, and then
                # use that.  Doesn't win32 provide such a 'real' filesystem?
                # [yes, I think something like this "\\.\c\windows"]
                
class msdos_filesystem (os_filesystem):
    def longify (self, (path, stat_info)):
        return msdos_longify (path, stat_info)
        
        # A merged filesystem will let you plug other filesystems together.
        # We really need the equivalent of a 'mount' capability - this seems
        # to be the most general idea.  So you'd use a 'mount' method to place
        # another filesystem somewhere in the hierarchy.
        
        # Note: this is most likely how I will handle ~user directories
        # with the http server.
        
class merged_filesystem:
    def __init__ (self, *fsys):
        pass
        
        # this matches the output of NT's ftp server (when in
        # MSDOS mode) exactly.
        
def msdos_longify (file, stat_info):
    if stat.S_ISDIR (stat_info[stat.ST_MODE]):
        dir = '<DIR>'
    else:
        dir = '     '
    date = msdos_date (stat_info[stat.ST_MTIME])
    return '%s       %s %8d %s' % (
            date,
            dir,
            stat_info[stat.ST_SIZE],
            file
            )
    
def msdos_date (t):
    try:
        info = time.gmtime (t)
    except:
        info = time.gmtime (0)
        # year, month, day, hour, minute, second, ...
    if info[3] > 11:
        merid = 'PM'
        info[3] = info[3] - 12
    else:
        merid = 'AM'
    return '%02d-%02d-%02d  %02d:%02d%s' % (
            info[1],
            info[2],
            info[0]%100,
            info[3],
            info[4],
            merid
            )
    
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun',
                  'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']

mode_table = {
        '0':'---',
        '1':'--x',
        '2':'-w-',
        '3':'-wx',
        '4':'r--',
        '5':'r-x',
        '6':'rw-',
        '7':'rwx'
        }

import time

def unix_longify (file, stat_info):
        # for now, only pay attention to the lower bits
    mode = ('%o' % stat_info[stat.ST_MODE])[-3:]
    mode = string.join (map (lambda x: mode_table[x], mode), '')
    if stat.S_ISDIR (stat_info[stat.ST_MODE]):
        dirchar = 'd'
    else:
        dirchar = '-'
    date = ls_date (long(time.time()), stat_info[stat.ST_MTIME])
    user = str(stat_info[stat.ST_UID].replace(' ','_'))
    group= str(stat_info[stat.ST_GID].replace(' ','_'))
    if user=='System_Processes': user='Sysproc'
    if group=='System_Processes': group='Sysproc'

    return '%s%s %3d %-8s %-8s %8d %s %s' % (
            dirchar,
            mode,
            stat_info[stat.ST_NLINK],
            user,
            group,
            stat_info[stat.ST_SIZE],
            date,
            file
            )
    
    # Emulate the unix 'ls' command's date field.
    # it has two formats - if the date is more than 180
    # days in the past, then it's like this:
    # Oct 19  1995
    # otherwise, it looks like this:
    # Oct 19 17:33
    
def ls_date (now, t):
    try:
        info = time.gmtime (t)
    except:
        info = time.gmtime (0)
        # 15,600,000 == 86,400 * 180
    if (now - t) > 15600000:
        return '%s %2d  %d' % (
                months[info[1]-1],
                info[2],
                info[0]
                )
    else:
        return '%s %2d %02d:%02d' % (
                months[info[1]-1],
                info[2],
                info[3],
                info[4]
                )
        
        # ===========================================================================
        # Producers
        # ===========================================================================
        
class list_producer:
    def __init__ (self, file_list, long, longify):
        self.file_list = file_list
        self.long = long
        self.longify = longify
        self.done = 0
        
    def ready (self):
        if len(self.file_list):
            return 1
        else:
            if not self.done:
                self.done = 1
            return 0
        return (len(self.file_list) > 0)
        
        # this should do a pushd/popd
    def more (self):
        if not self.file_list:
            return ''
        else:
                # do a few at a time
            bunch = self.file_list[:50]
            if self.long:
                bunch = map (self.longify, bunch)
            self.file_list = self.file_list[50:]
            return string.joinfields (bunch, '\r\n') + '\r\n'
            
www.java2java.com | Contact Us
Copyright 2009 - 12 Demo Source and Support. All rights reserved.
All other trademarks are property of their respective owners.