- #coding: utf-8
- # +-------------------------------------------------------------------
- # | 宝塔Linux面板 x3
- # +-------------------------------------------------------------------
- # | Copyright (c) 2015-2016 宝塔软件(http://bt.cn) All rights reserved.
- # +-------------------------------------------------------------------
- # | Author: 黄文良 <2879625666@qq.com>
- # +-------------------------------------------------------------------
- #+--------------------------------------------------------------------
- #| 服务器测试
- #+--------------------------------------------------------------------
- import time,psutil,random,os,urllib,binascii,json,public,re,sys;
- class score_main:
-
- __APIURL = 'http://www.bt.cn/api/Auth';
- __UPATH = 'data/userInfo.json';
- __userInfo = None;
- __PDATA = None;
-
- def CheckToken(self):
- pdata = {}
- data = {}
- if os.path.exists(self.__UPATH):
- self.__userInfo = json.loads(public.readFile(self.__UPATH));
- if self.__userInfo:
- pdata['access_key'] = self.__userInfo['access_key'];
- data['secret_key'] = self.__userInfo['secret_key'];
- else:
- pdata['access_key'] = 'test';
- data['secret_key'] = '123456';
- pdata['data'] = data;
- self.__PDATA = pdata;
-
- #修正信息
- def SetScore(self,get):
- if hasattr(get,'ips'):
- return self.SubmitSetScore('ips',get.ips);
- if hasattr(get,'virt'):
- return self.SubmitSetScore('virt',get.virt);
-
- #发送信息修正
- def SubmitSetScore(self,key,value):
- self.CheckToken();
- self.__PDATA['data'][key] = value
- self.__PDATA['data'] = self.De_Code(self.__PDATA['data']);
- result = json.loads(public.httpPost(self.__APIURL+'/SetSocre',self.__PDATA));
- result['data'] = self.En_Code(result['data']);
- return result;
-
- #获取得分列表
- def GetScore(self,get):
- self.CheckToken();
- self.__PDATA['data'] = self.De_Code(self.__PDATA['data']);
- result = json.loads(public.httpPost(self.__APIURL+'/GetSocre',self.__PDATA));
- result['data'] = self.En_Code(result['data']);
- result['last'] = self.GetConfig()
- return result;
-
- #获取配置信息
- def GetConfig(self,get=None):
- virt = '/usr/sbin/virt-what'
- if not os.path.exists(virt):
- if os.path.exists('/etc/yum.repos.d/epel.repo'):
- os.system('mv /etc/yum.repos.d/epel.repo /etc/yum.repos.d/epel.repo_backup');
- os.system('yum install virt-what -y');
- if os.path.exists('/etc/yum.repos.d/epel.repo_backup'):
- os.system('mv /etc/yum.repos.d/epel.repo_backup /etc/yum.repos.d/epel.repo');
-
- data = {}
- data['virt'] = public.ExecShell('virt-what')[0].strip();
-
- # 修复CPU信息提取
- try:
- cpuinfo = open('/proc/cpuinfo','r').read()
-
- # 方案1:优先尝试ARM架构字段
- arm_implementer = re.search(r"CPU implementer\s+:\s+(0x[\da-fA-F]+)", cpuinfo)
- arm_part = re.search(r"CPU part\s+:\s+(0x[\da-fA-F]+)", cpuinfo)
-
- if arm_implementer and arm_part:
- implementer_code = arm_implementer.groups()[0]
- part_code = arm_part.groups()[0]
- data['cpu'] = f"ARM {self._decode_arm_cpu(implementer_code, part_code)}"
- else:
- # 方案2:回退到x86的model name
- tmp = re.search(r"model\s+name\s+:\s+(.+)", cpuinfo)
- data['cpu'] = tmp.groups()[0].strip() if tmp else "Unknown CPU"
-
- except Exception as e:
- data['cpu'] = f"Error: {str(e)}"
- # 修复CPU信息结束
-
- data['core'] = psutil.cpu_count();
- data['memory'] = psutil.virtual_memory().total/1024/1024
- data['system'] = self.GetSystemVersion();
-
- scoreInfo = self.readScore();
- data['disk'] = str(scoreInfo['read'])+','+str(scoreInfo['write'])
- data['mem_score'] = scoreInfo['mem'];
- data['cpu_score'] = scoreInfo['cpu1'] + scoreInfo['cpu2'] + scoreInfo['cpu3'] + scoreInfo['cpu4'];
- data['disk_score'] = scoreInfo['disk_score'];
- data['total_score'] = scoreInfo['mem']+data['cpu_score']+scoreInfo['disk_score'];
- return data;
- # 修复CPU信息提取
- def _decode_arm_cpu(self, implementer, part):
- """将ARM CPU代码转换为可读名称"""
- arm_cpu_db = {
- ("0x41", "0xd03"): "Cortex-A53",
- ("0x41", "0xd04"): "Cortex-A35",
- ("0x41", "0xd05"): "Cortex-A55", # 您的CPU
- ("0x41", "0xd0a"): "Cortex-A75",
- }
- return arm_cpu_db.get((implementer.lower(), part.lower()), f"Unrecognized ARM CPU ({implementer}/{part})")
- # 修复CPU信息提取
- #提交到云端
- def SubmitScore(self,get):
- try:
- self.CheckToken();
- pdata = self.GetConfig(get);
- if not pdata['total_score']: return public.returnMsg(False,'请先跑分!');
- pdata['secret_key'] = self.__userInfo['secret_key'];
- self.__PDATA['data'] = self.De_Code(pdata);
- result = json.loads(public.httpPost(self.__APIURL+'/SubmitScore',self.__PDATA));
- result['data'] = self.En_Code(result['data']);
- return result;
- except:
- return None;
-
- #取操作系统版本
- def GetSystemVersion(self):
- version = public.readFile('/etc/redhat-release')
- if not version:
- version = public.readFile('/etc/issue').replace('\\n \\l','').strip();
- else:
- version = version.replace('release ','').strip();
- return version
-
- #写当前得分
- def writeScore(self,type,value):
- scoreFile = 'plugin/score/score.json';
- tmp = public.readFile(scoreFile)
- if not tmp:
- data = {}
- data['cpu1'] = 0;
- data['cpu2'] = 0;
- data['cpu3'] = 0;
- data['cpu4'] = 0;
- data['mem'] = 0;
- data['disk_score'] = 0;
- data['read'] = 0;
- data['write'] = 0;
- public.writeFile(scoreFile,json.dumps(data));
- tmp = public.readFile(scoreFile)
-
- data = json.loads(tmp);
- data[type] = value;
- public.writeFile(scoreFile,json.dumps(data));
-
- #读当前得分
- def readScore(self):
- scoreFile = 'plugin/score/score.json';
- tmp = public.readFile(scoreFile)
- if not tmp:
- data = {}
- data['cpu1'] = 0;
- data['cpu2'] = 0;
- data['cpu3'] = 0;
- data['cpu4'] = 0;
- data['mem'] = 0;
- data['disk_score'] = 0;
- data['read'] = 0;
- data['write'] = 0;
- public.writeFile(scoreFile,json.dumps(data));
- tmp = public.readFile(scoreFile)
- data = json.loads(tmp);
- return data;
-
-
- #测试CPU
- def testCpu(self,get,n = 1):
- data = {}
- data['cpuCount'] = psutil.cpu_count();
- if not hasattr(get,'type'): get.type = '0';
- import re;
- cpuinfo = open('/proc/cpuinfo','r').read();
- rep = "model\s+name\s+:\s+(.+)"
- tmp = re.search(rep,cpuinfo);
- data['cpuType'] = ""
- if tmp:
- data['cpuType'] = tmp.groups()[0];
-
- import system
- data['system'] = system.system().GetSystemVersion();
- path = 'plugin/score/testcpu';
- if not os.path.exists(path): os.system('gcc '+path+'.c -o ' +path + ' -lpthread');
- start = time.time();
- os.system(path + ' 32 ' + get.type);
- end = time.time();
- data['score'] = int(400 * 10 / (end - start));
- if not os.path.exists(path): data['score'] = 0;
- self.writeScore('cpu'+get.type, data['score'])
- return data;
-
- #测试整数运算
- def testInt(self):
- return self.testIntOrFloat(1);
-
- #测试浮点运行
- def testFloat(self):
- return self.testIntOrFloat(1.01);
-
- #CPU测试体
- def testIntOrFloat(self,n=1):
- start = time.time();
- num = 10000 * 100;
- for i in range(num):
- if i == 0: continue;
- a = n + i;
- b = n - i;
- c = n * i;
- d = n / i;
- n = n + 1;
-
- end = time.time();
- usetime = end - start;
- return num / 100 / usetime;
-
- #冒泡算法测试
- def testBubble(self):
- start = time.time();
- num = 10000 * 5;
- xx = 'qwertyuiopasdfghjklzxcvbnm1234567890'
- for c in xrange(num):
- lst = []
- for k in range(10):
- lst.append(xx[random.randint(0,len(xx)-1)])
- lst = self.bubbleSort(lst)
- end = time.time();
- usetime = end - start;
- return num / 5 / usetime;
-
- #冒泡排序
- def bubbleSort(self,lst):
- length = len(lst)
- for i in xrange(0, length, 1):
- for j in xrange(0, length-1-i, 1):
- if lst[j] < lst[j+1]:
- temp = lst[j]
- lst[j] = lst[j+1]
- lst[j+1] = temp
- return lst
-
- #二叉树算法测试
- def testTree(self):
- import testTree
-
- start = time.time();
- elems = range(3000) #生成树节点
- tree = testTree.Tree() #新建一个树对象
- for elem in elems:
- tree.add(elem) #逐个加入树的节点
-
- tree.level_queue(tree.root)
- tree.front_digui(tree.root)
- tree.middle_digui(tree.root)
- tree.later_digui(tree.root)
- tree.front_stack(tree.root)
- tree.middle_stack(tree.root)
- tree.later_stack(tree.root)
-
- end = time.time();
- usetime = end - start;
- return 3000 / usetime;
-
-
-
- #测试内存
- def testMem(self,get):
- mem = psutil.virtual_memory()
- self.writeScore('mem', mem.total/1024/1024)
- #提交数据
- self.SubmitScore(get)
- return int(mem.total/1024/1024);
-
- #测试磁盘
- def testDisk(self,get):
- import os
- data = {}
- os.system('rm -f testDisk_*');
- filename = "testDisk_" + time.strftime('%Y%m%d%H%M%S',time.localtime());
- data['write'] = self.testDiskWrite(filename);
- import shutil
- filename2 = "testDisk_" + time.strftime('%Y%m%d%H%M%S',time.localtime());
- shutil.move(filename,filename2);
- data['read'] = self.testDiskRead(filename2);
- diskIo = psutil.disk_partitions()
- diskInfo = []
- for disk in diskIo:
- tmp = {}
- tmp['path'] = disk[1]
- tmp['size'] = psutil.disk_usage(disk[1])[0]
- diskInfo.append(tmp)
- data['diskInfo'] = diskInfo;
- writeDisk = data['write'];
- if data['write'] > 1000: writeDisk = 1000;
- readDisk = data['read'];
- if data['read'] > 1000: readDisk = 1000;
-
- data['score'] = (writeDisk * 6) + (readDisk * 6)
- os.remove(filename2);
-
- self.writeScore('disk_score', data['score'])
- self.writeScore('write', data['write'])
- self.writeScore('read', data['read'])
- return data;
- pass
-
- #测试磁盘写入速度
- def testDiskWrite(self,filename):
- import random
- start = time.time();
- fp = open(filename,'w+');
- strTest = "";
- strTmp = "";
- for n in range(4):
- strTmp += chr(random.randint(97, 122))
- for n in range(1024):
- strTest += strTmp;
-
- for i in range(1024 * 256):
- fp.write(strTest);
-
- del(strTest);
- del(strTmp);
- fp.close()
- end = time.time();
- usetime = end - start;
- return int(1024/usetime);
-
- #测试磁盘读取速度
- def testDiskRead(self,filename):
- os.system('echo 3 > /proc/sys/vm/drop_caches');
- import random
- start = time.time();
- fp = open(filename,'r');
- size = 4096;
- while True:
- tmp = fp.read(size);
- if not tmp: break;
- del(tmp);
- fp.close()
- end = time.time();
- usetime = end - start;
- return int(1024/usetime);
-
- def testWorkNet(self):
- pass
-
-
- #加密数据
- def De_Code(self,data):
- if sys.version_info[0] == 2:
- pdata = urllib.urlencode(data);
- else:
- pdata = urllib.parse.urlencode(data);
- if type(pdata) == str: pdata = pdata.encode('utf-8')
- return binascii.hexlify(pdata);
-
- #解密数据
- def En_Code(self,data):
- if sys.version_info[0] == 2:
- result = urllib.unquote(binascii.unhexlify(data));
- else:
- if type(data) == str: data = data.encode('utf-8')
- tmp = binascii.unhexlify(data)
- if type(tmp) != str: tmp = tmp.decode('utf-8')
- result = urllib.parse.unquote(tmp)
- if type(result) != str: result = result.decode('utf-8')
- try:
- return json.loads(result);
- except: return result
-
复制代码
|
|