Python实时检测文件及文件夹变动
比较流行的是通过watchdog,一个例子:
import time
import logging
from watchdog.observers import Observer
from watchdog.events import LoggingEventHandler
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
path = "C:/data/"
# 内置的LoggingEventHandler
event_handler = LoggingEventHandler()
# 观察者
observer = Observer()
# recursive:True 递归的检测文件夹下所有文件变化。
observer.schedule(event_handler, path, recursive=True)
# 观察线程,非阻塞式的。
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
当创建一个文件夹,创建一个文件,或者删除一个文件夹,对文件内容写操作,都会触发事件:
也可以自己定义一个事件监听器,在里面监听文件/目录的增删改读写状态变化:
import datetime
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
class MyEventHandler(FileSystemEventHandler):
def __init__(self):
FileSystemEventHandler.__init__(self)
def on_any_event(self, event):
print("-----")
print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'))
# 移动
def on_moved(self, event):
if event.is_directory:
print("目录 moved:{src_path} -> {dest_path}".format(src_path=event.src_path, dest_path=event.dest_path))
else:
print("文件 moved:{src_path} -> {dest_path}".format(src_path=event.src_path, dest_path=event.dest_path))
# 新建
def on_created(self, event):
if event.is_directory:
print("目录 created:{file_path}".format(file_path=event.src_path))
else:
print("文件 created:{file_path}".format(file_path=event.src_path))
# 删除
def on_deleted(self, event):
if event.is_directory:
print("目录 deleted:{file_path}".format(file_path=event.src_path))
else:
print("文件 deleted:{file_path}".format(file_path=event.src_path))
# 修改
def on_modified(self, event):
if event.is_directory:
print("目录 modified:{file_path}".format(file_path=event.src_path))
else:
print("文件 modified:{file_path}".format(file_path=event.src_path))
if __name__ == '__main__':
path = "C:/data/"
myEventHandler = MyEventHandler()
# 观察者
observer = Observer()
# recursive:True 递归的检测文件夹下所有文件变化。
observer.schedule(myEventHandler, path, recursive=True)
# 观察线程,非阻塞式的。
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()