859898918 发表于 2025-5-31 18:43:12

【已解答】宝塔跑分获取不到CPU型号出错的BUG!

#coding: utf-8
# +-------------------------------------------------------------------
# | 宝塔Linux面板 x3
# +-------------------------------------------------------------------
# | Copyright (c) 2015-2016 宝塔软件(http://bt.cn) All rights reserved.
# +-------------------------------------------------------------------
# | Author: 黄文良 <2879625666@qq.com>
# +-------------------------------------------------------------------

#+--------------------------------------------------------------------
#|   服务器测试
#+--------------------------------------------------------------------

import time,psutil,random,os,urllib,binascii,json,public,re,sys;
class score_main:
   
    __APIURL = 'http://www.bt.cn/api/Auth';
    __UPATH = 'data/userInfo.json';
    __userInfo = None;
    __PDATA = None;
   
    def CheckToken(self):
      pdata = {}
      data = {}
      if os.path.exists(self.__UPATH):
            self.__userInfo = json.loads(public.readFile(self.__UPATH));
            if self.__userInfo:
                pdata['access_key'] = self.__userInfo['access_key'];
                data['secret_key'] = self.__userInfo['secret_key'];
      else:
            pdata['access_key'] = 'test';
            data['secret_key'] = '123456';
      pdata['data'] = data;
      self.__PDATA = pdata;
            
    #修正信息
    def SetScore(self,get):
      if hasattr(get,'ips'):
            return self.SubmitSetScore('ips',get.ips);
      if hasattr(get,'virt'):
            return self.SubmitSetScore('virt',get.virt);
   
    #发送信息修正
    def SubmitSetScore(self,key,value):
      self.CheckToken();
      self.__PDATA['data'] = value
      self.__PDATA['data'] = self.De_Code(self.__PDATA['data']);
      result = json.loads(public.httpPost(self.__APIURL+'/SetSocre',self.__PDATA));
      result['data'] = self.En_Code(result['data']);
      return result;
   
    #获取得分列表
    def GetScore(self,get):
      self.CheckToken();
      self.__PDATA['data'] = self.De_Code(self.__PDATA['data']);
      result = json.loads(public.httpPost(self.__APIURL+'/GetSocre',self.__PDATA));
      result['data'] = self.En_Code(result['data']);
      result['last'] = self.GetConfig()
      return result;
   
    #获取配置信息
    def GetConfig(self,get=None):
      virt = '/usr/sbin/virt-what'
      if not os.path.exists(virt):
            if os.path.exists('/etc/yum.repos.d/epel.repo'):
                os.system('mv /etc/yum.repos.d/epel.repo /etc/yum.repos.d/epel.repo_backup');
            os.system('yum install virt-what -y');
            if os.path.exists('/etc/yum.repos.d/epel.repo_backup'):
                os.system('mv /etc/yum.repos.d/epel.repo_backup /etc/yum.repos.d/epel.repo');
      
      data = {}
      data['virt'] = public.ExecShell('virt-what').strip();
      
      # 修复CPU信息提取
      try:
            cpuinfo = open('/proc/cpuinfo','r').read()
            
            # 方案1:优先尝试ARM架构字段
            arm_implementer = re.search(r"CPU implementer\s+:\s+(0x[\da-fA-F]+)", cpuinfo)
            arm_part = re.search(r"CPU part\s+:\s+(0x[\da-fA-F]+)", cpuinfo)
            
            if arm_implementer and arm_part:
                implementer_code = arm_implementer.groups()
                part_code = arm_part.groups()
                data['cpu'] = f"ARM {self._decode_arm_cpu(implementer_code, part_code)}"
            else:
                # 方案2:回退到x86的model name
                tmp = re.search(r"model\s+name\s+:\s+(.+)", cpuinfo)
                data['cpu'] = tmp.groups().strip() if tmp else "Unknown CPU"
               
      except Exception as e:
            data['cpu'] = f"Error: {str(e)}"
         # 修复CPU信息结束
         
      data['core'] = psutil.cpu_count();
      data['memory'] = psutil.virtual_memory().total/1024/1024
      data['system'] = self.GetSystemVersion();
      
      scoreInfo = self.readScore();
      data['disk'] = str(scoreInfo['read'])+','+str(scoreInfo['write'])
      data['mem_score'] = scoreInfo['mem'];
      data['cpu_score'] = scoreInfo['cpu1'] + scoreInfo['cpu2'] + scoreInfo['cpu3'] + scoreInfo['cpu4'];
      data['disk_score'] = scoreInfo['disk_score'];
      data['total_score'] = scoreInfo['mem']+data['cpu_score']+scoreInfo['disk_score'];
      return data;
# 修复CPU信息提取
def _decode_arm_cpu(self, implementer, part):
      """将ARM CPU代码转换为可读名称"""
      arm_cpu_db = {
            ("0x41", "0xd03"): "Cortex-A53",
            ("0x41", "0xd04"): "Cortex-A35",
            ("0x41", "0xd05"): "Cortex-A55",# 您的CPU
            ("0x41", "0xd0a"): "Cortex-A75",
      }
      return arm_cpu_db.get((implementer.lower(), part.lower()), f"Unrecognized ARM CPU ({implementer}/{part})")
# 修复CPU信息提取
#提交到云端
    def SubmitScore(self,get):
      try:
            self.CheckToken();
            pdata = self.GetConfig(get);
            if not pdata['total_score']: return public.returnMsg(False,'请先跑分!');
            pdata['secret_key'] = self.__userInfo['secret_key'];
            self.__PDATA['data'] = self.De_Code(pdata);
            result = json.loads(public.httpPost(self.__APIURL+'/SubmitScore',self.__PDATA));
            result['data'] = self.En_Code(result['data']);
            return result;
      except:
            return None;
      
    #取操作系统版本
    def GetSystemVersion(self):
      version = public.readFile('/etc/redhat-release')
      if not version:
            version = public.readFile('/etc/issue').replace('\\n \\l','').strip();
      else:
            version = version.replace('release ','').strip();
      return version
   
    #写当前得分
    def writeScore(self,type,value):
      scoreFile = 'plugin/score/score.json';
      tmp = public.readFile(scoreFile)
      if not tmp:
            data = {}
            data['cpu1'] = 0;
            data['cpu2'] = 0;
            data['cpu3'] = 0;
            data['cpu4'] = 0;
            data['mem'] = 0;
            data['disk_score'] = 0;
            data['read'] = 0;
            data['write'] = 0;
            public.writeFile(scoreFile,json.dumps(data));
            tmp = public.readFile(scoreFile)
      
      data = json.loads(tmp);
      data = value;
      public.writeFile(scoreFile,json.dumps(data));
   
    #读当前得分
    def readScore(self):
      scoreFile = 'plugin/score/score.json';
      tmp = public.readFile(scoreFile)
      if not tmp:
            data = {}
            data['cpu1'] = 0;
            data['cpu2'] = 0;
            data['cpu3'] = 0;
            data['cpu4'] = 0;
            data['mem'] = 0;
            data['disk_score'] = 0;
            data['read'] = 0;
            data['write'] = 0;
            public.writeFile(scoreFile,json.dumps(data));
            tmp = public.readFile(scoreFile)
      data = json.loads(tmp);
      return data;
                        
   
    #测试CPU
    def testCpu(self,get,n = 1):
      data = {}
      data['cpuCount'] = psutil.cpu_count();
      if not hasattr(get,'type'): get.type = '0';
      import re;
      cpuinfo = open('/proc/cpuinfo','r').read();
      rep = "model\s+name\s+:\s+(.+)"
      tmp = re.search(rep,cpuinfo);
      data['cpuType'] = ""
      if tmp:
            data['cpuType'] = tmp.groups();
      
      import system
      data['system'] = system.system().GetSystemVersion();
      path = 'plugin/score/testcpu';
      if not os.path.exists(path): os.system('gcc '+path+'.c -o ' +path + ' -lpthread');
      start = time.time();
      os.system(path + ' 32 ' + get.type);
      end = time.time();
      data['score'] = int(400 * 10 / (end - start));
      if not os.path.exists(path): data['score'] = 0;
      self.writeScore('cpu'+get.type, data['score'])
      return data;
   
    #测试整数运算
    def testInt(self):
      return self.testIntOrFloat(1);
   
    #测试浮点运行
    def testFloat(self):
      return self.testIntOrFloat(1.01);
      
    #CPU测试体
    def testIntOrFloat(self,n=1):
      start = time.time();
      num= 10000 * 100;
      for i in range(num):
            if i == 0: continue;
            a = n + i;
            b = n - i;
            c = n * i;
            d = n / i;
            n = n + 1;
            
      end = time.time();
      usetime = end - start;
      return num / 100 / usetime;
   
    #冒泡算法测试
    def testBubble(self):
      start = time.time();
      num= 10000 * 5;
      xx = 'qwertyuiopasdfghjklzxcvbnm1234567890'
      for c in xrange(num):
            lst = []
            for k in range(10):
                lst.append(xx)
            lst = self.bubbleSort(lst)
      end = time.time();
      usetime = end - start;
      return num / 5 / usetime;
   
    #冒泡排序
    def bubbleSort(self,lst):
      length = len(lst)
      for i in xrange(0, length, 1):
            for j in xrange(0, length-1-i, 1):
                if lst < lst:
                  temp = lst
                  lst = lst
                  lst = temp
      return lst
   
    #二叉树算法测试
    def testTree(self):
      import testTree
      
      start = time.time();
      elems = range(3000)            #生成树节点
      tree = testTree.Tree()         #新建一个树对象
      for elem in elems:                  
            tree.add(elem)               #逐个加入树的节点
   
      tree.level_queue(tree.root)
      tree.front_digui(tree.root)
      tree.middle_digui(tree.root)
      tree.later_digui(tree.root)
      tree.front_stack(tree.root)
      tree.middle_stack(tree.root)
      tree.later_stack(tree.root)
      
      end = time.time();
      usetime = end - start;
      return 3000 / usetime;
   
   
   
    #测试内存
    def testMem(self,get):
      mem = psutil.virtual_memory()
      self.writeScore('mem', mem.total/1024/1024)
      #提交数据
      self.SubmitScore(get)
      return int(mem.total/1024/1024);
   
    #测试磁盘
    def testDisk(self,get):
      import os
      data = {}
      os.system('rm -f testDisk_*');
      filename = "testDisk_" + time.strftime('%Y%m%d%H%M%S',time.localtime());
      data['write'] =self.testDiskWrite(filename);
      import shutil
      filename2 = "testDisk_" + time.strftime('%Y%m%d%H%M%S',time.localtime());
      shutil.move(filename,filename2);
      data['read'] =self.testDiskRead(filename2);
      diskIo = psutil.disk_partitions()
      diskInfo = []
      for disk in diskIo:
            tmp = {}
            tmp['path'] = disk
            tmp['size'] = psutil.disk_usage(disk)
            diskInfo.append(tmp)
      data['diskInfo'] = diskInfo;
      writeDisk = data['write'];
      if data['write'] > 1000: writeDisk = 1000;
      readDisk = data['read'];
      if data['read'] > 1000: readDisk = 1000;
      
      data['score'] = (writeDisk * 6) + (readDisk * 6)
      os.remove(filename2);
      
      self.writeScore('disk_score', data['score'])
      self.writeScore('write', data['write'])
      self.writeScore('read', data['read'])
      return data;
      pass
   
    #测试磁盘写入速度
    def testDiskWrite(self,filename):
      import random
      start = time.time();
      fp = open(filename,'w+');
      strTest = "";
      strTmp = "";
      for n in range(4):
            strTmp += chr(random.randint(97, 122))
      for n in range(1024):
            strTest += strTmp;
   
      for i in range(1024 * 256):
            fp.write(strTest);
            
      del(strTest);
      del(strTmp);
      fp.close()
      end = time.time();
      usetime = end - start;
      return int(1024/usetime);
      
    #测试磁盘读取速度   
    def testDiskRead(self,filename):
      os.system('echo 3 > /proc/sys/vm/drop_caches');
      import random
      start = time.time();
      fp = open(filename,'r');
      size = 4096;
      while True:
            tmp = fp.read(size);
            if not tmp: break;
            del(tmp);
      fp.close()
      end = time.time();
      usetime = end - start;
      return int(1024/usetime);
   
    def testWorkNet(self):
      pass
   
   
    #加密数据
    def De_Code(self,data):
      if sys.version_info == 2:
            pdata = urllib.urlencode(data);
      else:
            pdata = urllib.parse.urlencode(data);
            if type(pdata) == str: pdata = pdata.encode('utf-8')
      return binascii.hexlify(pdata);
   
    #解密数据
    def En_Code(self,data):
      if sys.version_info == 2:
            result = urllib.unquote(binascii.unhexlify(data));
      else:
            if type(data) == str: data = data.encode('utf-8')
            tmp = binascii.unhexlify(data)
            if type(tmp) != str: tmp = tmp.decode('utf-8')
            result = urllib.parse.unquote(tmp)
      if type(result) != str: result = result.decode('utf-8')
      try:
            return json.loads(result);
      except: return result
   


859898918 发表于 2025-5-31 18:50:07

我是小盒子刷的linux。安装完宝塔,获取不到CPU型号,就会报错,。

图片传不了。。。提示帖子长度不符合要求

阿珂 发表于 2025-6-4 10:28:49

感谢反馈,这个问题已经记录过了
另外,面板本身不针对aarch这类机构兼容测试哦
页: [1]
查看完整版本: 【已解答】宝塔跑分获取不到CPU型号出错的BUG!