Module directory_observer.directory_observer

Functions

def destroy()
def dummyHandler(signum, frame)
def eventPendingTest(notifier)
def getMask(mask, expstoadd=None)
def init()
def initRedisTimeout(timeout)
def main()
def onIdle()
def onRedisTimeout(signum, frame)
def onTerminate(signum, frame)
def resetRedisTimeout()
def run()
def startMonitoring()
def stopMonitoring(notifier)

Classes

class Monitor (pevent=None, **kargs)

Processes event objects; can be specialized via subclassing, so its behavior can be overridden:

Note: you should not override __init__() in your subclass; instead, define a my_init() method, which will be called automatically from the constructor of this class with its optional parameters.

  1. Provide specialized individual methods, e.g. process_IN_DELETE for processing a specific type of event (IN_DELETE in this case).
  2. And/or provide methods for processing events by 'family', e.g. a process_IN_CLOSE method will process both IN_CLOSE_WRITE and IN_CLOSE_NOWRITE events (provided process_IN_CLOSE_WRITE and process_IN_CLOSE_NOWRITE are not themselves defined).
  3. And/or override process_default to catch and process all remaining types of events (a short sketch illustrating these options follows the parameter description below).

Enable chaining of ProcessEvent instances.

@param pevent: Optional callable object; will be called on event processing (before self).
@type pevent: callable
@param kargs: This constructor is implemented as a template method delegating its optional keyword arguments to the method my_init().
@type kargs: dict
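
As a rough illustration of points 1-3 and of the my_init() convention, the sketch below shows a minimal ProcessEvent subclass. It is not taken from this module's source; the handler class and its label parameter are hypothetical.

import pyinotify

class ExampleHandler(pyinotify.ProcessEvent):
    # Do not override __init__(); keyword arguments given to the
    # constructor are forwarded to my_init().
    def my_init(self, label=None):
        self.label = label

    # 1. Specialized method for one specific event type.
    def process_IN_CREATE(self, event):
        print(self.label, 'created:', event.pathname)

    # 2. 'Family' method: catches IN_CLOSE_WRITE and IN_CLOSE_NOWRITE
    #    as long as no more specific method is defined.
    def process_IN_CLOSE(self, event):
        print(self.label, 'closed:', event.pathname)

    # 3. Fallback for all remaining event types.
    def process_default(self, event):
        print(self.label, 'other event:', event.maskname)

# The keyword argument is forwarded to my_init(). To actually receive
# events, watches must be registered via a WatchManager (see the wiring
# sketch at the end of this page).
handler = ExampleHandler(label='demo')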

Source code
class Monitor(pyinotify.ProcessEvent):
    redis = None
    server = None
    port = None
    wm = None

    ##
    # Initialization of the instance
    #
    # Classes derived from ProcessEvent must not override __init__(),
    # but implement my_init(), which is called automatically
    # by __init__() of the base class.
    # Note: my_init() only accepts keyword parameters.

    def my_init(self, **kargs):
        self.redisqueue = None
        self.redistimeout = DEFAULT_REDIS_TIMEOUT
        self.lenmaxqueue = -1

        self.dir = None

        self.only = None
        self.ignore = None

        self.lastpolling = 0
        self.tpolling = 0
        self.pollinglimit = DEFAULT_POLLING_LIMIT

        self.wd = -1

        self.statshome = None

    ##
    # Callback called by inotify when a file is created
    #
    # @param self: Monitor instance
    # @param event: event inotify.

    def process_IN_CREATE(self, event):
        if self.wd >= 0 and self.filefilter(event.name):
            self.postRedis(event.name, 'created')

    ##
    # Callback called by inotify when a file has been renamed
    # to (or moved to) the "scanned" directory
    #
    # @param self: instance of Monitor
    # @param event: event inotify.

    def process_IN_MOVED_TO(self, event):
        if self.wd >= 0 and self.filefilter(event.name):
            # There is no distinction between creation and rename
            self.postRedis(event.name, 'created')

    ##
    # Callback called by inotify when the scrutinized directory has been moved/renamed
    #
    # @param self: instance of Monitor
    # @param event: event inotify.

    def process_IN_MOVE_SELF(self, event):
        if self.wd >= 0:
            logger.warn('Directory [%s] was moved' % self.dir)
            # Mandatory: the watch (in the inotify sense) is not destroyed
            # in this case and would become inconsistent
            self.stopWatch()

    ##
    # Callback called by inotify when the scrutinized directory was deleted
    #
    # @param self: instance of Monitor
    # @param event: event inotify.

    def process_IN_DELETE_SELF(self, event):
        logger.warn('Directory [%s] was deleted' % self.dir)
        logger.warn('Stop Scanning [%s]' % self.dir)
        # Do not call stopWatch(): the watch (in the inotify sense) is already
        # destroyed, and stopWatch() on an invalid wd produces a misleading error
        self.wd = -1

    def startWatch(self):
        logger.info('Start Scanning [%s]' % self.dir)
        mask = pyinotify.IN_CREATE | pyinotify.IN_MOVED_TO | \
            pyinotify.IN_MOVE_SELF | pyinotify.IN_DELETE_SELF
        wds = Monitor.wm.add_watch(self.dir,
                                   mask,
                                   proc_fun=self)
        self.wd = wds[self.dir]
        if self.wd < 0:
            return False
        else:
            return True

    def stopWatch(self):
        if Monitor.wm and self.wd >= 0:
            logger.info('Stop Scanning [%s]' % self.dir)
            Monitor.wm.rm_watch(self.wd)
        self.wd = -1

    ##
    # Filter files according to the criteria of a monitor provided.
    #
    # The file name must not be the name of a directory.
    # If the monitor has an 'only' section (self.only is not None), the file
    # name must match it; if it does not, False is returned.
    # If the monitor has an 'ignore' section (self.ignore is not None), the
    # file name must not match it; if it does, False is returned.
    #
    # @param self: instance of Monitor
    # @param filename: The name of the file to test.
    #
    # @return: True if the file meets the filter criteria,
    #          False otherwise.
    def filefilter(self, filename):
        try:
            path = os.path.join(self.dir, filename)
            if self.only is not None:
                if not self.only.match(filename):
                    return False
            if self.ignore is not None:
                if self.ignore.match(filename):
                    return False
            if os.path.isdir(path):
                return False
            return True
        except Exception as err:
            logger.error(str(err))
            return False

    ##
    # Polling function associated with monitor
    #
    # Currently only a listing of the associated directory is
    # carried out.
    #
    # @param self: instance of monitor
    #
    def polling(self):
        self.lastpolling = time.time()
        if self.wd < 0:
            return
        self.listdir()

    ##
    # Directory listing associated with the monitor
    #
    # Files filtered according to the monitor criteria are listed in
    # the chronological order of their last modification date.
    #
    # @param self: instance of monitor
    #

    def listdir(self):
        def dtFic(fic):
            try:
                return os.stat(os.path.join(self.dir, fic)).st_mtime
            except Exception:
                return 0
        try:
            fics = [(fic, dtFic(fic)) for fic in os.listdir(self.dir)
                    if self.filefilter(fic)]
            if not fics:
                return
            sfics = sorted(fics, key=lambda f: f[1])
            nb = self.pollinglimit
            if nb > 0:
                queue_length = self.queueLength()
                if queue_length is not None:
                    nb -= queue_length
            for i, fic in enumerate(sfics):
                if self.pollinglimit > 0 and i >= nb:
                    logger.warn("%s: Aborted periodic polling. Queue size "
                                "limit reached (%d)" %
                                (self.dir, self.pollinglimit))
                    break
                self.postRedis(fic[0], 'exists')
        except Exception as err:
            logger.warn(str(err))

    ##
    # Writing an Event in the REDIS queue
    #
    # For files that meet the monitor criteria, a JSON-encoded message describing
    # the event is written to the designated REDIS queue. The designated REDIS
    # counter is also incremented.
    #
    # @param self: instance of monitor
    # @param filename: Name of file.
    # @param evid: Event ID (string), either "created" or "exists".

    def postRedis(self, filename, evid):
        try:
            redisError = False

            if not stopRequest:
                # Writing JSON message in the Redis queue
                try:
                    infosredis = {
                        'directory': self.dir,
                        'filename': filename,
                        'event': evid,
                        'event_timestamp': time.time()
                    }

                    initRedisTimeout(self.redistimeout)
                    qlen = self.redis.lpush(self.redisqueue,
                                            json.dumps(infosredis))
                    if qlen is not None and qlen > self.lenmaxqueue:
                        self.lenmaxqueue = qlen

                except redis.ConnectionError as err:
                    if not stopRequest:
                        logger.warn(str(err))
                        logger.warn("Failed to write on REDIS queue '%s' "
                                    "%s [%s]" % (self.redisqueue,
                                                 os.path.join(self.dir,
                                                              filename),
                                                 evid))
                    redisError = True
                finally:
                    resetRedisTimeout()

            if not redisError:
                logger.info("%s added to redis queue %s" %
                            (os.path.join(self.dir, filename),
                             self.redisqueue))

        except Exception as ex:
            logger.critical(str(ex))

    def queueLength(self):
        try:
            tmp = self.redis.llen(self.redisqueue)
            return int(tmp)
        except Exception:
            logger.debug("failed to llen %s" % self.redisqueue)
        return None

    def queueMaxLength(self, reset=True):
        if self.lenmaxqueue < 0:
            qlen = self.queueLength()
            if qlen is not None:
                self.lenmaxqueue = qlen
        res = self.lenmaxqueue
        if reset:
            self.lenmaxqueue = -1
        return res
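
To make the postRedis() message format concrete, the sketch below shows what a consumer on the Redis side might do. The connection parameters and the queue name 'incoming_files' are assumptions; the field names are those of the infosredis dictionary built in postRedis().

import json
import redis

# Hypothetical connection and queue name; the real values come from the
# module's configuration (server, port, redisqueue).
r = redis.Redis(host='localhost', port=6379)

# postRedis() pushes with LPUSH, so a consumer typically pops with RPOP
# (or BRPOP) to process events in FIFO order.
raw = r.rpop('incoming_files')
if raw is not None:
    msg = json.loads(raw)
    # Fields written by Monitor.postRedis():
    #   directory, filename, event ("created" or "exists"), event_timestamp
    print(msg['directory'], msg['filename'], msg['event'], msg['event_timestamp'])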

Ancestors

  • pyinotify.ProcessEvent
  • pyinotify._ProcessEvent

Class variables

var port
var redis
var server
var wm

Methods

def filefilter(self, filename)
def listdir(self)
def my_init(self, **kargs)

This method is called from ProcessEvent.__init__(). It is empty here and must be redefined to be useful. In effect, if you need to specifically initialize your subclass's instance, you just have to override this method in your subclass; all the keyword arguments passed to ProcessEvent.__init__() will then be transmitted as parameters to this method. Beware, you MUST pass keyword arguments though.

@param kargs: optional delegated arguments from __init__().
@type kargs: dict

def polling(self)
def postRedis(self, filename, evid)
def process_IN_CREATE(self, event)
def process_IN_DELETE_SELF(self, event)
def process_IN_MOVED_TO(self, event)
def process_IN_MOVE_SELF(self, event)
def queueLength(self)
def queueMaxLength(self, reset=True)
def startWatch(self)
def stopWatch(self)
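
For orientation only, here is one plausible way to wire a Monitor into pyinotify and Redis by hand. In the real module this is presumably handled by init(), startMonitoring() and run() from configuration, so the attribute assignments, host, directory, queue name and regular expressions below are assumptions, not the module's actual setup.

import re
import redis
import pyinotify
from directory_observer.directory_observer import Monitor

# Shared resources held as class attributes (see the source above).
Monitor.wm = pyinotify.WatchManager()
Monitor.redis = redis.Redis(host='localhost', port=6379)

# Hypothetical per-monitor configuration; the real module fills these
# fields from its configuration.
mon = Monitor()
mon.dir = '/data/incoming'
mon.redisqueue = 'incoming_files'
mon.only = re.compile(r'.*\.csv$')    # accept only *.csv files (matched with .match())
mon.ignore = re.compile(r'^\.')       # skip hidden files

if mon.startWatch():
    # Watches were registered with proc_fun=mon, so events are dispatched
    # to mon's process_* methods; loop() runs until interrupted.
    notifier = pyinotify.Notifier(Monitor.wm)
    notifier.loop()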