| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573 |
- import psutil
- import time
- import os
- import socket
- import pymysql
- import platform
- from datetime import datetime
- from config import mysql_config
# Probe for NVIDIA GPU support: both the pynvml import and NVML
# initialization must succeed, otherwise GPU metrics are disabled.
try:
    import pynvml
    pynvml.nvmlInit()
except Exception:
    GPU_AVAILABLE = False
else:
    GPU_AVAILABLE = True
# Snowflake-style distributed unique ID generator.
class SnowflakeIdGenerator:
    """Generate 64-bit, time-ordered unique IDs (Twitter snowflake scheme).

    Bit layout, low to high: 12-bit per-millisecond sequence, 5-bit worker
    id, 5-bit datacenter id, then the millisecond offset from the custom
    epoch.
    """

    def __init__(self, datacenter_id=1, worker_id=1):
        self.datacenter_id = datacenter_id
        self.worker_id = worker_id
        self.sequence = 0
        self.last_timestamp = -1

        # Bit widths of each field.
        self.datacenter_id_bits = 5
        self.worker_id_bits = 5
        self.sequence_bits = 12

        # Largest value each field can hold (all bits set).
        self.max_datacenter_id = (1 << self.datacenter_id_bits) - 1
        self.max_worker_id = (1 << self.worker_id_bits) - 1
        self.max_sequence = (1 << self.sequence_bits) - 1

        # Left-shift offset of each field within the assembled ID.
        self.worker_id_shift = self.sequence_bits
        self.datacenter_id_shift = self.sequence_bits + self.worker_id_bits
        self.timestamp_shift = (self.sequence_bits + self.worker_id_bits
                                + self.datacenter_id_bits)

        # Custom epoch: 2020-01-01 00:00:00 UTC, in milliseconds.
        self.epoch = 1577836800000

    def _current_millis(self):
        """Current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def _wait_next_millis(self, last_timestamp):
        """Busy-wait until the clock advances past *last_timestamp*."""
        now = self._current_millis()
        while now <= last_timestamp:
            now = self._current_millis()
        return now

    def generate_id(self):
        """Return the next unique, monotonically increasing ID.

        Raises:
            Exception: if the system clock moved backwards.
        """
        now = self._current_millis()

        if now < self.last_timestamp:
            raise Exception("Clock moved backwards!")

        if now == self.last_timestamp:
            # Same millisecond: bump the sequence; on wrap-around, spin
            # until the next millisecond tick.
            self.sequence = (self.sequence + 1) & self.max_sequence
            if self.sequence == 0:
                now = self._wait_next_millis(self.last_timestamp)
        else:
            self.sequence = 0

        self.last_timestamp = now

        return ((now - self.epoch) << self.timestamp_shift) \
            | (self.datacenter_id << self.datacenter_id_shift) \
            | (self.worker_id << self.worker_id_shift) \
            | self.sequence
# Module-level singleton ID generator shared by every insert helper below.
id_generator = SnowflakeIdGenerator()
def mb(v):
    """Convert a byte count to megabytes, rounded to 2 decimal places."""
    megabytes = v / 1024 / 1024
    return round(megabytes, 2)
def cpu_info():
    """Sample CPU usage over a 1-second window plus core count and load.

    Returns a dict with keys "usage" (percent), "cores" (logical count)
    and "load" (the 1/5/15-minute load-average tuple).
    """
    usage_percent = psutil.cpu_percent(interval=1)
    logical_cores = psutil.cpu_count(logical=True)
    return {"usage": usage_percent, "cores": logical_cores, "load": os.getloadavg()}
def memory_info():
    """Return virtual-memory usage: used/total in MB and percent used."""
    vm = psutil.virtual_memory()
    return {"used": mb(vm.used), "total": mb(vm.total), "percent": vm.percent}
def disk_info(prev=None, interval=1):
    """Compute disk read/write throughput (MB/s) since *prev* counters.

    Returns (current_counters, stats). On the first call (prev is None)
    the speeds are reported as 0 and the counters become the baseline.
    """
    counters = psutil.disk_io_counters()

    if prev is None:
        return counters, {"read_mb": 0, "write_mb": 0}

    read_rate = (counters.read_bytes - prev.read_bytes) / interval / 1024 / 1024
    write_rate = (counters.write_bytes - prev.write_bytes) / interval / 1024 / 1024

    stats = {"read_mb": round(read_rate, 2), "write_mb": round(write_rate, 2)}
    return counters, stats
def get_disk_usage():
    """Report usage of the root filesystem, with sizes in GB."""
    usage = psutil.disk_usage('/')
    gb = 1024 * 1024 * 1024
    return {
        "total": round(usage.total / gb, 2),
        "used": round(usage.used / gb, 2),
        "free": round(usage.free / gb, 2),
        "percent": usage.percent,
    }
def disk_io_info(prev=None, interval=1):
    """Compute disk throughput (MB/s) and IOPS against *prev* counters.

    Returns (current_counters, stats). On the first call (prev is None)
    all rates are 0 and the counters become the baseline for next time.
    """
    counters = psutil.disk_io_counters()

    if prev is None:
        zero = {"read_mb": 0, "write_mb": 0, "read_iops": 0, "write_iops": 0}
        return counters, zero

    read_rate = (counters.read_bytes - prev.read_bytes) / interval / 1024 / 1024
    write_rate = (counters.write_bytes - prev.write_bytes) / interval / 1024 / 1024

    stats = {
        "read_mb": round(read_rate, 2),
        "write_mb": round(write_rate, 2),
        "read_iops": int((counters.read_count - prev.read_count) / interval),
        "write_iops": int((counters.write_count - prev.write_count) / interval),
    }
    return counters, stats
def network_speed(prev, interval):
    """Upload/download rates in MB/s since the *prev* network counters.

    Returns (current_counters, up_mb_s, down_mb_s).
    """
    counters = psutil.net_io_counters()
    up_bytes_per_s = (counters.bytes_sent - prev.bytes_sent) / interval
    down_bytes_per_s = (counters.bytes_recv - prev.bytes_recv) / interval
    return (counters,
            round(up_bytes_per_s / 1024 / 1024, 2),
            round(down_bytes_per_s / 1024 / 1024, 2))
def gpu_info():
    """Collect per-GPU utilization, memory (MB) and temperature via NVML.

    Returns an empty list when NVML is unavailable on this host.
    """
    if not GPU_AVAILABLE:
        return []
    devices = []
    for index in range(pynvml.nvmlDeviceGetCount()):
        handle = pynvml.nvmlDeviceGetHandleByIndex(index)
        memory = pynvml.nvmlDeviceGetMemoryInfo(handle)
        rates = pynvml.nvmlDeviceGetUtilizationRates(handle)
        temperature = pynvml.nvmlDeviceGetTemperature(
            handle, pynvml.NVML_TEMPERATURE_GPU
        )
        devices.append({
            "id": index,
            "util": rates.gpu,
            "mem_used": mb(memory.used),
            "mem_total": mb(memory.total),
            "temp": temperature,
        })
    return devices
def _interface_for_ip(local_ip):
    """Return the name of the NIC bound to *local_ip*, or "Unknown"."""
    for interface, addrs in psutil.net_if_addrs().items():
        for addr in addrs:
            if addr.family == socket.AF_INET and addr.address == local_ip:
                return interface
    return "Unknown"


def get_local_ip():
    """Best-effort lookup of the primary LAN IP and its interface name.

    Returns:
        (interface_name, ip) tuple. Falls back to hostname resolution,
        and finally to ("Unknown", "Unable to get IP") if everything fails.
    """
    try:
        # A UDP "connect" sends no traffic; it only makes the kernel pick
        # the outbound interface, whose address we then read back.
        # FIX: use a context manager so the socket is closed even when
        # connect() raises (the original leaked it on that path).
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            local_ip = s.getsockname()[0]
        return _interface_for_ip(local_ip), local_ip
    except Exception:
        # Fallback: resolve the hostname (may yield a loopback address on
        # some hosts — presumably acceptable to callers; verify if not).
        try:
            local_ip = socket.gethostbyname(socket.gethostname())
            return _interface_for_ip(local_ip), local_ip
        except Exception:
            return "Unknown", "Unable to get IP"
def get_db_connection():
    """Open a new MySQL connection using settings from config.mysql_config."""
    return pymysql.connect(**mysql_config)
def get_server_base_info():
    """Collect static server facts: hostname, machine type, OS name/version.

    Returns:
        dict with keys "server_name", "server_type", "os_type",
        "os_version". Falls back to safe defaults on any failure, so
        callers always receive a fully-populated dict.
    """
    try:
        # Hostname; substitute the default label when empty.
        server_name = socket.gethostname()
        if not server_name:
            server_name = "服务"

        # Basic OS identification.
        os_type = platform.system()      # Linux, Windows, Darwin
        os_version = platform.release()  # kernel version by default

        # On Linux, prefer the human-readable distro name from os-release.
        # FIX: was a bare `except:` (swallowed SystemExit/KeyboardInterrupt);
        # only file I/O can fail here, so catch OSError.
        try:
            if os_type == "Linux" and os.path.exists("/etc/os-release"):
                with open("/etc/os-release", "r") as f:
                    for line in f:
                        if line.startswith("PRETTY_NAME="):
                            os_version = line.split("=")[1].strip().strip('"')
                            break
        except OSError:
            pass  # unreadable os-release: keep the kernel version

        # Machine type defaults to physical; look for container/VM markers.
        server_type = "物理机"
        try:
            if os.path.exists("/.dockerenv"):
                # Docker drops this marker file at the container root.
                server_type = "容器"
            elif os.path.exists("/sys/class/dmi/id/product_name"):
                with open("/sys/class/dmi/id/product_name", "r") as f:
                    product = f.read().lower()
                if "virtual" in product or "vmware" in product or "kvm" in product:
                    server_type = "虚拟机"
        except OSError:
            pass  # DMI not readable: keep the physical-machine default

        return {
            "server_name": server_name,
            "server_type": server_type,
            "os_type": os_type,
            "os_version": os_version
        }
    except Exception as e:
        print(f"Error getting server base info: {e}")
        return {
            "server_name": "Unknown",
            "server_type": "物理机",
            "os_type": "Linux",
            "os_version": "Unknown"
        }
def get_or_create_server_id(server_ip):
    """Look up the server's row by IP, registering it on first sight.

    Returns:
        (server_id, server_name) on success, or (None, None) on any
        database error — always a 2-tuple so callers can unpack safely.
    """
    conn = None
    try:
        conn = get_db_connection()
        cursor = conn.cursor()

        # Is this IP already registered?
        cursor.execute("SELECT id, server_name FROM server_base_info WHERE server_ip = %s", (server_ip,))
        result = cursor.fetchone()

        if result:
            # Known server: reuse its stored ID and display name.
            server_id = result[0]
            server_name = result[1]
        else:
            # First sighting: register it under a fresh snowflake ID.
            server_id = id_generator.generate_id()
            base_info = get_server_base_info()
            server_name = base_info['server_name']

            cursor.execute(
                """INSERT INTO server_base_info
                (id, server_ip, server_name, server_type, os_type, os_version)
                VALUES (%s, %s, %s, %s, %s, %s)""",
                (server_id, server_ip, base_info['server_name'],
                 base_info['server_type'], base_info['os_type'], base_info['os_version'])
            )
            conn.commit()
            print(f"Created new server base info: {base_info['server_name']} ({server_ip})")

        cursor.close()
        conn.close()
        return server_id, server_name
    except Exception as e:
        print(f"Error getting or creating server ID: {e}")
        # Don't leak the connection on the error path.
        if conn:
            try:
                conn.close()
            except Exception:
                pass
        # BUG FIX: the original returned bare None here, making the
        # caller's `server_id, server_name = ...` unpack raise TypeError.
        return None, None
def save_to_database(server_ip, interface_name, cpu, mem, disk_usage, disk_io, up, down, gpus, save_disk_usage=True):
    """Persist one sampling round of monitoring metrics to MySQL.

    Inserts one row per metric table (CPU, memory, optional disk usage,
    disk IO, net upload, net download, and one row per GPU) and commits
    them as a single transaction; rolls back on failure.

    Args:
        server_ip: IP used to identify/register the server row.
        interface_name: network card name recorded with the net rows.
        cpu, mem, disk_usage, disk_io: dicts produced by the collector
            functions above (cpu_info, memory_info, get_disk_usage,
            disk_io_info).
        up, down: network speeds in MB/s.
        gpus: list of per-GPU dicts from gpu_info() (may be empty).
        save_disk_usage: whether to save the disk-usage row, default True.
            Set False to skip disk-usage persistence on most cycles.

    Returns:
        True on successful commit, False on any error.
    """
    conn = None
    try:
        from datetime import datetime
        # One timestamp string shared by every row of this round.
        current_time_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # Resolve (or create) the server's stable ID and display name.
        server_id, server_name = get_or_create_server_id(server_ip)
        if server_id is None:
            return False

        conn = get_db_connection()
        cursor = conn.cursor()

        # CPU sample.
        cpu_id = id_generator.generate_id()
        cursor.execute(
            "INSERT INTO server_cpu_monitor (id, server_id, server_ip, cpu_usage, cpu_core, server_name, create_time) VALUES (%s, %s, %s, %s, %s, %s, %s)",
            (cpu_id, server_id, server_ip, cpu['usage'], cpu['cores'], server_name, current_time_str)
        )

        # Memory sample (MB values truncated to ints for storage).
        mem_id = id_generator.generate_id()
        mem_free = mem['total'] - mem['used']
        cursor.execute(
            "INSERT INTO server_memory_monitor (id, server_id, server_ip, mem_total, mem_used, mem_free, mem_usage, server_name, create_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)",
            (mem_id, server_id, server_ip, int(mem['total']), int(mem['used']), int(mem_free), mem['percent'], server_name, current_time_str)
        )

        # Disk-usage sample (only when the caller requests it this cycle).
        if save_disk_usage:
            disk_id = id_generator.generate_id()
            cursor.execute(
                "INSERT INTO server_disk_monitor (id, server_id, server_ip, disk_total, disk_used, disk_free, disk_usage, server_name, disk_path, create_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
                (disk_id, server_id, server_ip, int(disk_usage['total']), int(disk_usage['used']), int(disk_usage['free']), disk_usage['percent'], server_name, "/opt", current_time_str)
            )

        # Disk IO sample (throughput + IOPS).
        io_id = id_generator.generate_id()
        cursor.execute(
            "INSERT INTO server_io_monitor (id, server_id, server_ip, io_read_mb, io_write_mb, io_read_iops, io_write_iops, server_name, disk_path, create_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
            (io_id, server_id, server_ip, disk_io['read_mb'], disk_io['write_mb'], disk_io['read_iops'], disk_io['write_iops'], server_name, "/opt", current_time_str)
        )

        # Network upload sample (packet count is not collected; stored as 0).
        upload_id = id_generator.generate_id()
        cursor.execute(
            "INSERT INTO server_net_upload_monitor (id, server_id, server_ip, net_card, upload_speed, server_name, upload_packets, create_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
            (upload_id, server_id, server_ip, interface_name, up, server_name, 0, current_time_str)
        )

        # Network download sample (packet count is not collected; stored as 0).
        download_id = id_generator.generate_id()
        cursor.execute(
            "INSERT INTO server_net_download_monitor (id, server_id, server_ip, net_card, download_speed, server_name, download_packets, create_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)",
            (download_id, server_id, server_ip, interface_name, down, server_name, 0, current_time_str)
        )

        # One row per GPU; a failed GPU insert is logged but does not
        # abort the rest of the round.
        gpu_count = 0
        for g in gpus:
            try:
                gpu_id = id_generator.generate_id()
                cursor.execute(
                    "INSERT INTO server_gpu_monitor (id, server_id, server_ip, gpu_card_no, gpu_usage, gpu_mem_total, gpu_mem_used, gpu_temp, server_name, create_time) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
                    (gpu_id, server_id, server_ip, g['id'], g['util'], int(g['mem_total']), int(g['mem_used']), g['temp'], server_name, current_time_str)
                )
                gpu_count += 1
                print(f"DEBUG: GPU{g['id']} data inserted successfully (ID: {gpu_id})")
            except Exception as gpu_error:
                print(f"Error inserting GPU{g['id']} data: {gpu_error}")
                print(f"GPU data: {g}")

        conn.commit()
        print(f"DEBUG: Transaction committed. {gpu_count} GPU records saved.")
        cursor.close()
        conn.close()

        # Debug summary of how many GPU rows made it in.
        if gpus and gpu_count == 0:
            print(f"WARNING: {len(gpus)} GPUs detected but 0 saved to database!")
        elif gpus:
            print(f"INFO: Successfully saved {gpu_count}/{len(gpus)} GPU records to database")

        return True
    except Exception as e:
        print(f"Database error: {e}")
        import traceback
        traceback.print_exc()
        if conn:
            try:
                conn.rollback()
                conn.close()
            except:
                pass
        return False
def cleanup_old_data(days=7):
    """Delete monitoring rows older than *days* days from every metric table.

    Args:
        days: retention window in days; rows older than this are purged.

    Returns:
        True when the cleanup committed, False on any error.
    """
    conn = None
    try:
        from datetime import datetime, timedelta

        conn = get_db_connection()
        cursor = conn.cursor()

        # Rows created before this timestamp are purged.
        cutoff_date = (datetime.now() - timedelta(days=days)).strftime('%Y-%m-%d %H:%M:%S')

        # Every time-series table that accumulates monitoring rows.
        tables = [
            'server_cpu_monitor',
            'server_memory_monitor',
            'server_disk_monitor',
            'server_io_monitor',
            'server_net_upload_monitor',
            'server_net_download_monitor',
            'server_gpu_monitor'
        ]

        # Track how many rows were removed, for the log line below.
        total_deleted = 0

        for table in tables:
            try:
                # Table names come from the fixed list above — never from
                # user input — so the f-string interpolation is safe.
                cursor.execute(f"DELETE FROM {table} WHERE create_time < %s", (cutoff_date,))
                deleted = cursor.rowcount
                total_deleted += deleted
                print(f"Cleaned {deleted} records from {table}")
            except Exception as e:
                print(f"Error cleaning {table}: {e}")

        conn.commit()
        cursor.close()
        conn.close()
        conn = None  # closed cleanly; finally-block must not close again

        print(f"Total cleaned: {total_deleted} records (older than {days} days)")
        return True
    except Exception as e:
        print(f"Cleanup error: {e}")
        return False
    finally:
        # FIX: the original leaked the connection when commit (or any step
        # after connecting) raised; always close it here.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def monitor(interval=5, disk_usage_interval=3600, cleanup_interval=86400):
    """System resource monitoring main loop (runs until interrupted).

    Each cycle samples CPU, memory, disk, network and GPU metrics, saves
    them to the database, and prints a status banner. Disk-usage rows and
    data cleanup run on their own (longer) schedules.

    Args:
        interval: sampling interval in seconds, default 5.
        disk_usage_interval: how often to persist the disk-usage row, in
            seconds, default 3600 (1 hour).
        cleanup_interval: how often to purge old rows, in seconds,
            default 86400 (24 hours).
    """
    interface_name, local_ip = get_local_ip()
    print("System resource monitor started (Ctrl+C to exit)")
    print(f"Network Interface: {interface_name} | Local IP: {local_ip}")
    print(f"Recording interval: {interval} seconds")
    print(f"Disk usage update interval: {disk_usage_interval} seconds ({disk_usage_interval/3600:.1f} hours)")
    print(f"Data cleanup interval: {cleanup_interval} seconds ({cleanup_interval/86400:.1f} days)\n")

    # Baseline counters for the first rate computation.
    prev_net = psutil.net_io_counters()
    prev_disk = psutil.disk_io_counters()

    # When the disk-usage row was last written.
    last_disk_usage_update = time.time()
    # When old data was last purged.
    last_cleanup_time = time.time()

    while True:
        time.sleep(interval)
        current_time = time.time()
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cpu = cpu_info()
        mem = memory_info()
        disk_usage = get_disk_usage()
        # Rate helpers return the fresh counters to use as next baseline.
        prev_disk, disk_io = disk_io_info(prev_disk, interval)
        prev_net, up, down = network_speed(prev_net, interval)
        gpus = gpu_info()
        # Only persist the disk-usage row once per disk_usage_interval.
        should_save_disk_usage = (current_time - last_disk_usage_update) >= disk_usage_interval

        # Persist this sampling round.
        db_success = save_to_database(
            local_ip, interface_name, cpu, mem, disk_usage, disk_io, up, down, gpus,
            save_disk_usage=should_save_disk_usage
        )

        # Advance the disk-usage timestamp only after a successful save,
        # so a failed write is retried next cycle.
        if should_save_disk_usage and db_success:
            last_disk_usage_update = current_time
            disk_status = "Disk usage saved"
        else:
            disk_status = "Disk usage skipped"

        db_status = f"Saved to DB ({disk_status})" if db_success else "DB Error"
        # Periodically purge rows older than the retention window.
        if (current_time - last_cleanup_time) >= cleanup_interval:
            print("\n" + "=" * 80)
            print("Starting data cleanup...")
            cleanup_success = cleanup_old_data(days=7)
            if cleanup_success:
                last_cleanup_time = current_time
                print("Data cleanup completed")
            else:
                print("Data cleanup failed")
            print("=" * 80 + "\n")
        print("=" * 80)
        print(f"Time: {ts} | Interface: {interface_name} | IP: {local_ip} | {db_status}")
        print(f"CPU: {cpu['usage']}% | Cores: {cpu['cores']} | Load: {cpu['load']}")
        print(f"Memory: {mem['used']}MB / {mem['total']}MB ({mem['percent']}%)")
        print(f"Disk: {disk_usage['used']}GB / {disk_usage['total']}GB ({disk_usage['percent']}%)")
        print(f"Disk IO: Read {disk_io['read_mb']}MB/s | Write {disk_io['write_mb']}MB/s | "
              f"IOPS: R={disk_io['read_iops']} W={disk_io['write_iops']}")
        print(f"Network: Up {up} MB/s | Down {down} MB/s")
        if gpus:
            for g in gpus:
                print(
                    f"GPU{g['id']}: {g['util']}% | "
                    f"{g['mem_used']}/{g['mem_total']}MB | "
                    f"{g['temp']}°C"
                )
        else:
            print("GPU: Not available")
        print("=" * 80)
# Script entry point: run the monitor loop with default intervals.
if __name__ == "__main__":
    monitor()
|