import os
import platform
import socket
import time
import traceback
from datetime import datetime, timedelta

import psutil
import pymysql

from config import mysql_config

# Optional GPU support via NVML. Any failure (package missing, no NVIDIA
# driver, init error) simply disables GPU metrics.
try:
    import pynvml

    pynvml.nvmlInit()
    GPU_AVAILABLE = True
except Exception:
    GPU_AVAILABLE = False

# Monitoring tables purged by cleanup_old_data().
MONITOR_TABLES = (
    'server_cpu_monitor',
    'server_memory_monitor',
    'server_disk_monitor',
    'server_io_monitor',
    'server_net_upload_monitor',
    'server_net_download_monitor',
    'server_gpu_monitor',
)

# Timestamp format written to the create_time columns.
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


# Snowflake ID generator
class SnowflakeIdGenerator:
    """Snowflake-style 64-bit ID generator.

    An ID packs, from high to low bits: milliseconds since ``self.epoch``
    (2020-01-01 00:00:00 UTC), a 5-bit datacenter id, a 5-bit worker id and a
    12-bit sequence that distinguishes IDs generated in the same millisecond.

    Instances keep mutable state and take no lock, so concurrent use from
    several threads needs external synchronisation.
    """

    def __init__(self, datacenter_id=1, worker_id=1):
        """Create a generator for the given datacenter/worker pair.

        Raises:
            ValueError: if ``datacenter_id`` or ``worker_id`` does not fit in
                its 5-bit field (0-31). An oversized value would spill into
                neighbouring bits and could collide with other generators.
        """
        # Bit width of each field.
        self.datacenter_id_bits = 5
        self.worker_id_bits = 5
        self.sequence_bits = 12

        # Largest value each field can hold: 2**bits - 1.
        self.max_datacenter_id = -1 ^ (-1 << self.datacenter_id_bits)
        self.max_worker_id = -1 ^ (-1 << self.worker_id_bits)
        self.max_sequence = -1 ^ (-1 << self.sequence_bits)

        if not 0 <= datacenter_id <= self.max_datacenter_id:
            raise ValueError(
                f"datacenter_id must be in [0, {self.max_datacenter_id}], got {datacenter_id}"
            )
        if not 0 <= worker_id <= self.max_worker_id:
            raise ValueError(
                f"worker_id must be in [0, {self.max_worker_id}], got {worker_id}"
            )

        self.datacenter_id = datacenter_id
        self.worker_id = worker_id
        self.sequence = 0
        self.last_timestamp = -1

        # Left shift applied to each field.
        self.worker_id_shift = self.sequence_bits
        self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
        self.timestamp_shift = (
            self.sequence_bits + self.worker_id_bits + self.datacenter_id_bits
        )

        # Custom epoch: 2020-01-01 00:00:00 UTC, in milliseconds.
        self.epoch = 1577836800000

    def _current_millis(self):
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def _wait_next_millis(self, last_timestamp):
        """Busy-wait until the clock passes ``last_timestamp``; return the new time."""
        timestamp = self._current_millis()
        while timestamp <= last_timestamp:
            timestamp = self._current_millis()
        return timestamp

    def generate_id(self):
        """Return the next unique ID.

        Raises:
            Exception: if the system clock moved backwards since the previous
                call (continuing could produce duplicate IDs).
        """
        timestamp = self._current_millis()
        if timestamp < self.last_timestamp:
            raise Exception("Clock moved backwards!")

        if timestamp == self.last_timestamp:
            # Same millisecond: bump the sequence; when it wraps to 0 the
            # millisecond is exhausted, so wait for the next one.
            self.sequence = (self.sequence + 1) & self.max_sequence
            if self.sequence == 0:
                timestamp = self._wait_next_millis(self.last_timestamp)
        else:
            self.sequence = 0

        self.last_timestamp = timestamp
        return (
            ((timestamp - self.epoch) << self.timestamp_shift)
            | (self.datacenter_id << self.datacenter_id_shift)
            | (self.worker_id << self.worker_id_shift)
            | self.sequence
        )


# Module-wide ID generator shared by every insert below.
id_generator = SnowflakeIdGenerator()


def mb(v):
    """Convert bytes to MiB, rounded to 2 decimals."""
    return round(v / 1024 / 1024, 2)


def _gb(v):
    """Convert bytes to GiB, rounded to 2 decimals."""
    return round(v / 1024 / 1024 / 1024, 2)


def _per_second(current, previous, interval):
    """Return the per-second change of a monotonically increasing counter.

    Negative deltas (counter reset, device removed) are clamped to 0, and a
    non-positive ``interval`` yields 0.0 instead of dividing by zero.
    """
    if interval <= 0:
        return 0.0
    return max(current - previous, 0) / interval


def _load_average():
    """Return the 1/5/15-minute load averages as a 3-tuple.

    ``os.getloadavg`` only exists on Unix-like systems; elsewhere fall back to
    psutil's implementation, and to zeros if neither is available.
    """
    try:
        return os.getloadavg()
    except (AttributeError, OSError):
        pass
    try:
        return psutil.getloadavg()
    except (AttributeError, OSError):
        return (0.0, 0.0, 0.0)


def cpu_info():
    """Return CPU usage, logical core count and load averages.

    ``usage`` is sampled over a blocking 1-second window.
    """
    return {
        "usage": psutil.cpu_percent(interval=1),
        "cores": psutil.cpu_count(logical=True),
        "load": _load_average(),
    }


def memory_info():
    """Return used/total memory in MiB and the usage percentage."""
    m = psutil.virtual_memory()
    return {
        "used": mb(m.used),
        "total": mb(m.total),
        "percent": m.percent,
    }


def disk_info(prev=None, interval=1):
    """Return ``(counters, {"read_mb", "write_mb"})`` disk throughput in MiB/s.

    Thin wrapper over :func:`disk_io_info` without the IOPS fields.
    """
    now, io = disk_io_info(prev, interval)
    return now, {"read_mb": io["read_mb"], "write_mb": io["write_mb"]}


def get_disk_usage(path='/'):
    """Return total/used/free space in GiB and the usage percent for ``path``."""
    disk = psutil.disk_usage(path)
    return {
        "total": _gb(disk.total),
        "used": _gb(disk.used),
        "free": _gb(disk.free),
        "percent": disk.percent,
    }


def disk_io_info(prev=None, interval=1):
    """Return disk throughput (MiB/s) and IOPS since the ``prev`` counters.

    Args:
        prev: counters returned by a previous call (or
            ``psutil.disk_io_counters()``); ``None`` yields all-zero rates.
        interval: seconds elapsed since ``prev`` was sampled.

    Returns:
        ``(now, stats)`` where ``now`` should be passed back as ``prev`` next
        time. If psutil reports no disk counters (``None``), all rates are 0.
    """
    now = psutil.disk_io_counters()
    if prev is None or now is None:
        return now, {
            "read_mb": 0,
            "write_mb": 0,
            "read_iops": 0,
            "write_iops": 0,
        }
    read_speed = _per_second(now.read_bytes, prev.read_bytes, interval) / 1024 / 1024
    write_speed = _per_second(now.write_bytes, prev.write_bytes, interval) / 1024 / 1024
    return now, {
        "read_mb": round(read_speed, 2),
        "write_mb": round(write_speed, 2),
        "read_iops": int(_per_second(now.read_count, prev.read_count, interval)),
        "write_iops": int(_per_second(now.write_count, prev.write_count, interval)),
    }


def network_speed(prev, interval):
    """Return ``(now, up_mb_s, down_mb_s)`` network throughput since ``prev``.

    ``interval`` is the number of seconds elapsed since ``prev`` was sampled.
    Missing counters (``None``) yield zero rates.
    """
    now = psutil.net_io_counters()
    if prev is None or now is None:
        return now, 0.0, 0.0
    up = _per_second(now.bytes_sent, prev.bytes_sent, interval)
    down = _per_second(now.bytes_recv, prev.bytes_recv, interval)
    return now, round(up / 1024 / 1024, 2), round(down / 1024 / 1024, 2)


def gpu_info():
    """Return per-GPU utilisation, memory (MiB) and temperature via NVML.

    Returns an empty list when NVML is unavailable. If NVML fails mid-query
    (e.g. a device disappears), the GPUs read so far are returned instead of
    the error aborting the monitoring loop.
    """
    if not GPU_AVAILABLE:
        return []
    gpus = []
    try:
        for i in range(pynvml.nvmlDeviceGetCount()):
            h = pynvml.nvmlDeviceGetHandleByIndex(i)
            mem = pynvml.nvmlDeviceGetMemoryInfo(h)
            util = pynvml.nvmlDeviceGetUtilizationRates(h)
            temp = pynvml.nvmlDeviceGetTemperature(h, pynvml.NVML_TEMPERATURE_GPU)
            gpus.append({
                "id": i,
                "util": util.gpu,
                "mem_used": mb(mem.used),
                "mem_total": mb(mem.total),
                "temp": temp,
            })
    except pynvml.NVMLError as e:
        print(f"Error reading GPU info: {e}")
    return gpus


def _interface_for_ip(ip):
    """Return the name of the NIC holding IPv4 address ``ip``, or "Unknown"."""
    for interface, addrs in psutil.net_if_addrs().items():
        if any(a.family == socket.AF_INET and a.address == ip for a in addrs):
            return interface
    return "Unknown"


def get_local_ip():
    """Return ``(interface_name, local_ip)`` for this host's primary IPv4 address.

    First asks the OS which local address it would use to reach a public
    host (``connect`` on a UDP socket sends no packets). If that fails, falls
    back to resolving the hostname. Returns ``("Unknown", "Unable to get IP")``
    if both fail.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]
        return _interface_for_ip(local_ip), local_ip
    except Exception:
        pass
    # Fallback: the IP the hostname resolves to.
    try:
        local_ip = socket.gethostbyname(socket.gethostname())
        return _interface_for_ip(local_ip), local_ip
    except Exception:
        return "Unknown", "Unable to get IP"


def get_db_connection():
    """Open a new MySQL connection using ``config.mysql_config``."""
    return pymysql.connect(**mysql_config)


def _read_os_pretty_name(default):
    """Return PRETTY_NAME from /etc/os-release, or ``default`` if unavailable."""
    try:
        with open("/etc/os-release", "r", encoding="utf-8") as f:
            for line in f:
                if line.startswith("PRETTY_NAME="):
                    # Split once: the value itself may contain '='.
                    return line.split("=", 1)[1].strip().strip('"')
    except (OSError, UnicodeDecodeError):
        pass
    return default


def _detect_server_type():
    """Best-effort host classification: container, VM or (default) physical."""
    try:
        if os.path.exists("/.dockerenv"):
            return "容器"  # container
        with open("/sys/class/dmi/id/product_name", "r", encoding="utf-8") as f:
            product = f.read().lower()
        if any(marker in product for marker in ("virtual", "vmware", "kvm")):
            return "虚拟机"  # virtual machine
    except (OSError, UnicodeDecodeError):
        pass
    return "物理机"  # physical machine


def get_server_base_info():
    """Collect hostname, host type, OS name and OS version for server_base_info.

    Returns:
        dict with keys ``server_name``, ``server_type``, ``os_type`` and
        ``os_version``. On unexpected errors, a dict of placeholder values is
        returned instead.
    """
    try:
        server_name = socket.gethostname() or "服务"
        os_type = platform.system()  # e.g. Linux, Windows, Darwin
        os_version = platform.release()  # kernel release
        if os_type == "Linux":
            # Prefer the distribution name (e.g. "Ubuntu 22.04 LTS") when available.
            os_version = _read_os_pretty_name(os_version)
        return {
            "server_name": server_name,
            "server_type": _detect_server_type(),
            "os_type": os_type,
            "os_version": os_version,
        }
    except Exception as e:
        print(f"Error getting server base info: {e}")
        return {
            "server_name": "Unknown",
            "server_type": "物理机",
            "os_type": "Linux",
            "os_version": "Unknown",
        }


def get_or_create_server_id(server_ip, conn=None):
    """Return ``(server_id, server_name)`` for ``server_ip``, inserting a row if missing.

    Args:
        server_ip: IP address used as the lookup key in server_base_info.
        conn: optional open pymysql connection to reuse. When omitted, a
            connection is opened and closed here. When supplied, the caller
            keeps ownership (it is not closed), but a newly inserted row is
            committed on it.

    Returns:
        ``(server_id, server_name)``, or ``None`` if any database error occurs.
    """
    own_conn = conn is None
    try:
        if own_conn:
            conn = get_db_connection()
        with conn.cursor() as cursor:
            cursor.execute(
                "SELECT id, server_name FROM server_base_info WHERE server_ip = %s",
                (server_ip,),
            )
            result = cursor.fetchone()
            if result:
                return result[0], result[1]

            # Unknown server: register it with freshly collected host details.
            server_id = id_generator.generate_id()
            base_info = get_server_base_info()
            cursor.execute(
                """INSERT INTO server_base_info (id, server_ip, server_name, server_type, os_type, os_version)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (server_id, server_ip, base_info['server_name'],
                 base_info['server_type'], base_info['os_type'],
                 base_info['os_version']),
            )
        conn.commit()
        print(f"Created new server base info: {base_info['server_name']} ({server_ip})")
        return server_id, base_info['server_name']
    except Exception as e:
        print(f"Error getting or creating server ID: {e}")
        return None
    finally:
        if own_conn and conn is not None:
            try:
                conn.close()
            except Exception:
                pass


def _insert_row(cursor, table, row):
    """Execute a parameterised INSERT of ``row`` ({column: value}) into ``table``.

    Table and column names are interpolated into the SQL text, so they must
    be trusted constants (as they are in this module). Values are always
    passed as query parameters.
    """
    columns = ", ".join(row)
    placeholders = ", ".join(["%s"] * len(row))
    cursor.execute(
        f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
        tuple(row.values()),
    )


def save_to_database(server_ip, interface_name, cpu, mem, disk_usage, disk_io,
                     up, down, gpus, save_disk_usage=True):
    """Persist one monitoring sample to the per-metric tables in a single transaction.

    Args:
        save_disk_usage: whether to write the disk usage row (default True).
            Pass False to skip it, e.g. when disk usage is recorded less often.

    Returns:
        True if the transaction was committed, False on any database error
        (the transaction is rolled back). Individual GPU row failures are
        logged and skipped without failing the whole sample.
    """
    conn = None
    try:
        current_time_str = datetime.now().strftime(TIME_FORMAT)

        conn = get_db_connection()
        server = get_or_create_server_id(server_ip, conn=conn)
        if server is None:
            return False
        server_id, server_name = server

        with conn.cursor() as cursor:
            def insert(table, **metrics):
                """Insert one metric row with the shared id/server/time columns; return its id."""
                row_id = id_generator.generate_id()
                _insert_row(cursor, table, {
                    "id": row_id,
                    "server_id": server_id,
                    "server_ip": server_ip,
                    **metrics,
                    "server_name": server_name,
                    "create_time": current_time_str,
                })
                return row_id

            insert("server_cpu_monitor",
                   cpu_usage=cpu['usage'], cpu_core=cpu['cores'])

            mem_free = mem['total'] - mem['used']
            insert("server_memory_monitor",
                   mem_total=int(mem['total']), mem_used=int(mem['used']),
                   mem_free=int(mem_free), mem_usage=mem['percent'])

            # NOTE(review): get_disk_usage() measures '/' by default, but rows are
            # labelled "/opt" (kept as-is to stay consistent with existing data).
            if save_disk_usage:
                insert("server_disk_monitor",
                       disk_total=int(disk_usage['total']),
                       disk_used=int(disk_usage['used']),
                       disk_free=int(disk_usage['free']),
                       disk_usage=disk_usage['percent'], disk_path="/opt")

            insert("server_io_monitor",
                   io_read_mb=disk_io['read_mb'], io_write_mb=disk_io['write_mb'],
                   io_read_iops=disk_io['read_iops'],
                   io_write_iops=disk_io['write_iops'], disk_path="/opt")

            # Packet counts are not collected; 0 is stored as a placeholder.
            insert("server_net_upload_monitor",
                   net_card=interface_name, upload_speed=up, upload_packets=0)
            insert("server_net_download_monitor",
                   net_card=interface_name, download_speed=down, download_packets=0)

            gpu_count = 0
            for g in gpus:
                # Best-effort per GPU: one bad row must not lose the other metrics.
                try:
                    gpu_id = insert("server_gpu_monitor",
                                    gpu_card_no=g['id'], gpu_usage=g['util'],
                                    gpu_mem_total=int(g['mem_total']),
                                    gpu_mem_used=int(g['mem_used']),
                                    gpu_temp=g['temp'])
                    gpu_count += 1
                    print(f"DEBUG: GPU{g['id']} data inserted successfully (ID: {gpu_id})")
                except Exception as gpu_error:
                    print(f"Error inserting GPU{g['id']} data: {gpu_error}")
                    print(f"GPU data: {g}")

        conn.commit()
        print(f"DEBUG: Transaction committed. {gpu_count} GPU records saved.")

        if gpus and gpu_count == 0:
            print(f"WARNING: {len(gpus)} GPUs detected but 0 saved to database!")
        elif gpus:
            print(f"INFO: Successfully saved {gpu_count}/{len(gpus)} GPU records to database")
        return True
    except Exception as e:
        print(f"Database error: {e}")
        traceback.print_exc()
        if conn is not None:
            try:
                conn.rollback()
            except Exception:
                pass
        return False
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass


def cleanup_old_data(days=7):
    """Delete monitoring rows older than ``days`` days from every MONITOR_TABLES table.

    A failure on one table is logged and the remaining tables are still
    cleaned and committed.

    Returns:
        True if the cleanup ran and was committed, False on connection/commit errors.
    """
    conn = None
    try:
        cutoff_date = (datetime.now() - timedelta(days=days)).strftime(TIME_FORMAT)
        conn = get_db_connection()

        total_deleted = 0
        with conn.cursor() as cursor:
            for table in MONITOR_TABLES:
                try:
                    # Table names come from the trusted MONITOR_TABLES constant.
                    cursor.execute(f"DELETE FROM {table} WHERE create_time < %s", (cutoff_date,))
                    deleted = cursor.rowcount
                    total_deleted += deleted
                    print(f"Cleaned {deleted} records from {table}")
                except Exception as e:
                    print(f"Error cleaning {table}: {e}")

        conn.commit()
        print(f"Total cleaned: {total_deleted} records (older than {days} days)")
        return True
    except Exception as e:
        print(f"Cleanup error: {e}")
        return False
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass


def monitor(interval=5, disk_usage_interval=3600, cleanup_interval=86400):
    """Main monitoring loop: sample resources, store them, print a summary.

    Args:
        interval: seconds to sleep between samples (default 5). Each cycle
            also spends ~1 s sampling CPU plus the DB round-trip; throughput
            rates are computed over the actual elapsed time, not this value.
        disk_usage_interval: minimum seconds between disk usage rows
            (default 3600, i.e. 1 hour). The first sample always saves one.
        cleanup_interval: seconds between old-data cleanups (default 86400,
            i.e. 24 hours). A cleanup also runs on the first sample, so
            frequent restarts cannot postpone it indefinitely.
    """
    interface_name, local_ip = get_local_ip()
    print("System resource monitor started (Ctrl+C to exit)")
    print(f"Network Interface: {interface_name} | Local IP: {local_ip}")
    print(f"Recording interval: {interval} seconds")
    print(f"Disk usage update interval: {disk_usage_interval} seconds ({disk_usage_interval/3600:.1f} hours)")
    print(f"Data cleanup interval: {cleanup_interval} seconds ({cleanup_interval/86400:.1f} days)\n")

    prev_net = psutil.net_io_counters()
    prev_disk = psutil.disk_io_counters()
    # Monotonic clock: immune to wall-clock adjustments (NTP, manual changes).
    prev_sample_time = time.monotonic()

    # None means "not done yet in this process", so both happen on the first pass.
    last_disk_usage_update = None
    last_cleanup_time = None

    while True:
        time.sleep(interval)
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        cpu = cpu_info()  # blocks ~1 s while sampling
        mem = memory_info()
        disk_usage = get_disk_usage()

        # Use the real time since the previous counter snapshot; the sleep
        # alone undercounts it (CPU sampling, DB writes, printing).
        current_time = time.monotonic()
        elapsed = current_time - prev_sample_time
        prev_sample_time = current_time
        prev_disk, disk_io = disk_io_info(prev_disk, elapsed)
        prev_net, up, down = network_speed(prev_net, elapsed)
        gpus = gpu_info()

        should_save_disk_usage = (
            last_disk_usage_update is None
            or (current_time - last_disk_usage_update) >= disk_usage_interval
        )

        db_success = save_to_database(
            local_ip, interface_name, cpu, mem, disk_usage, disk_io, up, down, gpus,
            save_disk_usage=should_save_disk_usage,
        )

        if should_save_disk_usage and db_success:
            last_disk_usage_update = current_time
            disk_status = "Disk usage saved"
        else:
            disk_status = "Disk usage skipped"

        db_status = f"Saved to DB ({disk_status})" if db_success else "DB Error"

        if last_cleanup_time is None or (current_time - last_cleanup_time) >= cleanup_interval:
            print("\n" + "=" * 80)
            print("Starting data cleanup...")
            if cleanup_old_data(days=7):
                last_cleanup_time = current_time
                print("Data cleanup completed")
            else:
                print("Data cleanup failed")
            print("=" * 80 + "\n")

        print("=" * 80)
        print(f"Time: {ts} | Interface: {interface_name} | IP: {local_ip} | {db_status}")
        print(f"CPU: {cpu['usage']}% | Cores: {cpu['cores']} | Load: {cpu['load']}")
        print(f"Memory: {mem['used']}MB / {mem['total']}MB ({mem['percent']}%)")
        print(f"Disk: {disk_usage['used']}GB / {disk_usage['total']}GB ({disk_usage['percent']}%)")
        print(f"Disk IO: Read {disk_io['read_mb']}MB/s | Write {disk_io['write_mb']}MB/s | "
              f"IOPS: R={disk_io['read_iops']} W={disk_io['write_iops']}")
        print(f"Network: Up {up} MB/s | Down {down} MB/s")

        if gpus:
            for g in gpus:
                print(
                    f"GPU{g['id']}: {g['util']}% | "
                    f"{g['mem_used']}/{g['mem_total']}MB | "
                    f"{g['temp']}°C"
                )
        else:
            print("GPU: Not available")

        print("=" * 80)


if __name__ == "__main__":
    try:
        monitor()
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop; exit without a traceback.
        print("\nMonitor stopped.")