博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python 面向对象&常用模块(二)
阅读量:7242 次
发布时间:2019-06-29

本文共 15198 字,大约阅读时间需要 50 分钟。

1.面向对象

1.1 异常处理(异常、断言、异常类型、抛出异常、Python异常捕捉)

异常处理

name = "aa"try:   # 这一段是要执行的主体    print(name)# except Exception as msg: # 如果没有正常执行,就执行下面的这一段。Exception 是错误类型。except : # 如果没有正常执行,就执行下面的这一段。Exception 是错误类型(有好多类型,可以百度查下,错误类型 ,尽量写的准确些)。    print("name is err...")else: # 只有当try的那段 正常执行了后,才会执行下面的这一段。    print("welcome: {}.".format(name))finally: # 无论有没有正常执行 都会执行这一段的。    print("this is test.")

断言 assert

格式: assert 表达式, '异常错误信息'# 当表达式不成立时,就会抛后面的错误信息。assert  name == "aa","name not wf"

raise 异常类型("错误提示") ;和try结合使用

作用:1.捕获异常为了让程序不至于中断,在逻辑控制之内。2.抛出异常是为了,保证程序在遇到这种问题的时候必须中止。v1 = "wf"li = [1,23]try:    # print(v1)    # print(li[4])    raise  IndexError("索引不存在") # 一般不用# 为啥写这么多except呢?方便记录日志.except IndexError as msg: # 错误类型是索引    print(msg)except Exception as msg: # 所有的错误类型,你可以这么些    print(msg)else:    print("good")finally:    print("test")

1.2 面向对象编程(面向对象基础、封装和调用、间接调用、定义类、类成员)

def send_mail(msg,mail):    print("msg:{} to mail:{}".format(msg,mail))send_mail("hellow","wangfei1000@yeah.net")class mail():    def __init__(self,msg,mail):        #构造方法,无须调用,自动执行        self.msg = msg        self.mail = mail    def send_mail(self):        print("send msg:{} to mail:{}".format(self.msg, self.mail))# 创建一个对象,obj 就是对象。obj是类对象的指针。obj = mail("hello","wf@qq.com") 他指向mail这个类。obj.send_mail()

什么时候能用到类呢? 我个人理解:

例如我写一个zabbix 增删改查脚本,每次操作都需要使用zabbix创建好的这个连接。我当然可以写成面向过程或面向函数的这种脚本。但是它的可读性和代码的简洁度不好。我写成一个类,无论是使用还是可读性都会变的很好。

1.3 继承与多态(类的继承、继承顺序、方法重写)

class c1(): # 父类;基类    def f1(self):        v1 = 1        print(v1)    def f2(self):        v2 = 2        print(v2)class c2(c1): # 子类;派生类,如何继承多个父类呢? c2(c1,c4)    name = "wangfei"   # 静态字段    def __init__(self,age):        self.age = age    def f1(self):        v1 = "c2_2"    # 普通字段        print(v1)    def f3(self):        v3 = 3        print(v3)# obj = c2("18") ## # obj.f1() # 子类的方法是优先父类的方法# # obj.f2() # 子类没有这个方法就到父类去找# re = c2.name # 静态字段通过类来访问的# print(re)# re2 = obj # 普通字段通过对象来访问# print(re2)

啥叫多态呢? 可以往类里传入任何数据类型的参数。

多继承注意事项。

1. 当只有一条道的时候,就是一条路走到底。子类继承父类,父类继承父父类。2. 当有2条道的时候,左边的的优先于右边的。当左边的道走到第三步还没有走到时,就要返回同级走第二条道到底了。3. 这里需要注意的是,self是代表是第一层的obj 就是最小的那代类。

1.4 类方法与类属性(类方法定义、方法调用、类内置属性、运算符重载)

类的方法

class c1():    v2 = 20    def __init__(self,v1):  # 构造方法        self.v1 = v1    def f1(self):  # 普通方法        print(self.v1)        print(c1.v2)    @staticmethod    def f2():   # 静态方法,通过类来调用;为了节省内存        print(c1.v2)obj1 = c1(120)c1.f2() # 通过类来访问静态方法obj1.f2() # 通过对象来访问

类的属性

class c1():    v2 = 20    def __init__(self,v1):        self.v1 = v1    def f1(self):        print(self.v1)        print(c1.v2)    @property # 这个就叫做属性.    def f2(self):        print(c1.v2)    def f3(self):        re = obj1.f2        # print(re)obj1 = c1(120)obj1.f2 # 奇怪吧? 通过访问字段的方式进行使用的

2.实战应用

2.1 模块应用Requests(RESTAPI标准操作、HTTP请求与返回操作、API接口调用实战)

获取访问url的状态码

import  requestsres = requests.get("http://www.baidu.com/")print(res.status_code)

案列1:读取url列表,进行测试,如果状态码是40x 50x 的就打印出来。生产环境就是调用接口进行报警。

import requestsurl_list = ['http://www.baidu.com', 'http://www.sina.com.cn', 'http://adminset.cn']err_http_code = [401,402,403,404,500,501,502,503,504,505]for url in url_list:    res = requests.get(url)    if res.status_code in err_htttp_code:        print("access {} http code failed".format(url))

案例2:上方案例从文件中读取url列表,进行判断。

import requestserror_code_list = [401,402,403,404,500,501,502,503,504,505]with open("url_list.txt","rb") as url_file:     while True:        url_line = url_file.readline()        if url_line:            str_url_line = str(url_line,encoding="utf-8")            str_url = str_url_line.split("\n")[0] # 从文件里面读取的行,后面是有换行符的。\n的 不能直接使用,需要去掉哦。            res = requests.get(str_url)            if res.status_code in error_code_list:                print("url: {}, status code: {}".format(str_url,res.status_code))        else:            break

如果url是包含https,怎么处理呢 ?

res = requests.get("https://www.12306.cn",verify = False)print(res)

请求api接口,获取添加的主机信息。由于请求的时候,需要token信息,url需要拼接下。

# url后面的”“ 表示后面要接参数 ,因为在字典里面定义了(para变量),requests模块加了params后会自动进行 拼接得。# http://115.28.147.154/cmdb/get/host/?token="token信息"&name=“user”para = {'token': 'PnSlpwJDQE3L', 'name': 'adminset'}res = requests.get("http://115.28.147.154/cmdb/get/host/", params = para)# 查看拼接后的urlprint(res.url)# 获取结果data = r.textimport json# 进行反序列化d1 = json.loads(data)print(d1['data']['ip'])

2.2 模块应用Redis、MySQL、MongoDB

向redis里面写入数据,读取数据。

import  redisredis_conn = redis.StrictRedis(host="127.0.0.1",port=6379,db=0)redis_conn.set("name","wangfei") # 写入数据re = redis_conn.get("name") # 读取数据数据print(re)redis_conn.lpush("list2","python class") # push的是一个列表

例子:获取主机名和ip地址 ,然后写到redis里面去。

import  redisimport  socketdef get_sys_info():    try:        hostname = socket.gethostname()        ip = socket.gethostbyname(hostname)    except Exception as msg:        print(msg)        hostname = ""        ip = ""    finally:        data = {"hostname": hostname,"ip":ip}    return  data# 链接redisredis_conn = redis.StrictRedis("127.0.0.1",6379)try:    # 获取系统信息    re = get_sys_info()    # 将系统信息写到redis,作为列表里的一个元素。    redis_conn.lpush("host",re)except Exception as msg:    print(msg)# 上面这一段或者是这么写。# re = get_sys_info()# assert redis_conn.lpush("host",re),"lpush error"

2.3 模块应用logging(日志记录,为自己的程序添加日志)

Level Numeric value
CRITICAL 50
ERROR 40
WARNING 30
INFO 20
DEBUG 10
import logginglogging.basicConfig(level=logging.INFO,                    format='%(asctime)s %(levelname)s %(message)s',                    datefmt='%Y%m%d %H:%M:%S',                    filename="./agent.log",                    filemode='a')log = "redis connecting+++++++++++++"logging.critical(log)

2.4 模块应用psutil(操作系统信息采集实战)

psutil 模块获取系统信息

import  psutil# 获取cpu的数量信息print(psutil.cpu_count()) # 逻辑核心数print(psutil.cpu_count(logical=False)) # 物理核心数# 获取所有磁盘分区信息# print(psutil.disk_partitions())res = psutil.disk_partitions()for mount in res:    print(mount[1])# 获取磁盘分区的使用信息# print(psutil.disk_usage("/"))res = psutil.disk_partitions()for mount in res:    print(psutil.disk_usage(mount[1]))# 获取内存信息print(sutil.virtual_memory())print(sutil.virtual_memory().total)

2.5 模块应用configparser(配置文件解析,为自己的程序增加配置文件管理)

configparser 模块读取配置文件。

import  configparser# 如何从一个文件里面读取参数?# 读取文件流程# 第1步 初始化模块(我自己想的,帮助记忆理解)config = configparser.RawConfigParser()# 第2步 打开配置参数文件redis_conf_file = open("redis.conf","r")# 第3步 读取所有参数内容config.read_file(redis_conf_file)# 第4步 获取对应key下面的参数。print(config.get("redis","host"))print(config.get("redis","port"))-----------------------备注:redis.conf内容    [redis]    host = 127.0.0.1    port = 6379

例子: 从配置文件里面读取redis 地址和端口信息,然后写一个信息到redis里面去。

import  redis# python2 python3 导入configparser模块名不一样。 try:    import configparserexcept Exception as msg:    print(msg)    import  ConfigParserconfig = configparser.RawConfigParser()# 打开配置文件,读取内容try:    with open("redis.conf","r") as redis_conf_file:        config.read_file(redis_conf_file)        host = config.get("redis","host")        port = config.get("redis","port")except Exception as msg:    print(msg)# 写信息到redis里面去try:    redis_conn = redis.StrictRedis(host,port,0)    redis_conn.set("test_key","test_values")except Exception as msg:    print(msg)

2.6 模块应用schedule(Python计划任务实战)

# 定时任务模块# time.sleep 是整个脚本程序都中断了,任务是串行的。使用schedule 是异步执行的。import  scheduleimport  timeimport  threadingdef hello(): 60s    print("2 hello")def morning(): 3600s    print("1 morning")def job_thread(func): # 多线程    t = threading.Thread(target=func)    t.start()schedule.every(2).seconds.do(job_thread, hello) # 每隔2秒执行一次任务。schedule.every(1).seconds.do(job_thread, morning) # 每隔1秒 执行一次任务。while True: # 这个是固定的写法,每隔1s 检查查定时任务    schedule.run_pending()    time.sleep(1)

2.7 xlrd/xlwt (实现对excel文件的读写)

import  xlrd # 读取excel文件内容#### 从Excel文件读取内容zabbix_file = xlrd.open_workbook("./zabbix-host.xls")zabbix_file_table = zabbix_file.sheet_by_name("sheet01")print(zabbix_file_table.nrows) # 获取行数print(zabbix_file_table.row_values(0)) # 获取指定行的内容print( zabbix_file_table.ncols) # 获取列数print(zabbix_file_table.col_values(2)) # 获取指定列的内容print(zabbix_file_table.cell(1,2)) # 获取1行2列内容#### 向Excel文件写入内容import xlwt # 向Excel文件里写入内容zabbix_file = xlwt.Workbook(encoding="ascii") # 指定编码file_sheet = zabbix_file.add_sheet("sheet02") # 新建工作簿file_sheet.write(0,1,label = "hehe") # 写入内容file_sheet.write(0,2,label = "hehe")file_sheet.write(0,4,label = "hehe")file_sheet.write(0,3,label = "hehe")zabbix_file.save("test.xls") # 保存

3.实战案例

3.1 调用Zabbix API ,增删改主机。

from  pyzabbix import ZabbixAPIimport xlrdimport configparserimport loggingimport sysimport osclass zabbix():    def __init__(self,host_file):        self.host_file = host_file        # 日志模块        logging.basicConfig(level=logging.INFO,                            format='%(asctime)s %(levelname)s %(message)s',                            datefmt='%Y%m%d %H:%M:%S',                            filename="./zabbix.log",                            filemode='a')        # 读取zabbix 配置文件        try:            config = configparser.RawConfigParser()            with open("/Users/feiwang/PycharmProjects/python-study2/work/zabbix.conf","r") as zabbix_conf_file:                config.read_file(zabbix_conf_file)                self.zabbix_user = config.get("zabbix","user")                self.zabbix_passwd = config.get("zabbix","passwd")        except Exception as msg:            logging.error(msg)            raise msg        # 登录zabbix        try:            self.zapi = ZabbixAPI("http://192.168.4.135/zabbix")            self.zapi.login(self.zabbix_user,self.zabbix_passwd)            logging.info("connecting zabbix successful.")        except Exception as msg:            logging.error("connecting zabbix failed" + "detail:{}".format(msg))            raise msg    def load_zabbix(self):        # 读取excel文件        try:            zabbix_file = xlrd.open_workbook(self.host_file)            zabbix_file_table = zabbix_file.sheet_by_name("sheet01")            logging.info("open host.xls ok.")        except Exception as msg:            logging.error(msg)            zabbix_file = ''            zabbix_file_table = ''        # 读取zabbix 信息        host_list = []        for row in range(1, zabbix_file_table.nrows):            row_values = zabbix_file_table.row_values(row)            visible_name = row_values[1]            hostname = row_values[0]            port = row_values[-1]            ip = row_values[-2]            group_id = "2"            create_host = {                "host": hostname,                "description": visible_name,                "name": visible_name,                "interfaces": [                    {                        "type": 1,                        "main": 1,                        "useip": 1,                        "ip": ip,                        "dns": "",                        "port": port                    }, ],                "groups": [{                    "groupid": group_id                }, ], }            host_list.append(create_host)        return host_list    def add_host(self):        for host in self.load_zabbix():            try:                self.zapi.host.create(host)                logging.info("create status: ok, create host info:{}".format(host))            except Exception as msg:                print(msg)                logging.error("zabbix create hosts failed. {}".format(msg))    # # del host    def del_host(self):        '''        循环zabbix 上获取到的主机,然后和本地excel文件里要删除的主机进行对比,如果是的,那么就把他删掉。        删除用hostid        :return:        '''        for host in obj.zapi.host.get():            for del_host in self.load_zabbix():                if host["host"] == del_host["host"]:                    try:                        self.zapi.host.delete(host["hostid"])                        print("delete host{} is ok.".format(host["host"]))                        logging.info("del host:{} ok".format(host["hostid"]))                    except Exception as msg:                        print(msg)                        logging.error("del host:{} failed,".format(host["host"]))if __name__ == "__main__":    action,file = sys.argv[1:]    assert os.path.exists(file),"file:'{}' doesn't exists".format(file)    obj = zabbix(file)    if action == "add":        obj.add_host()    elif action == "del":        obj.del_host()    elif action == "update":        pass    elif action == "get":        pass    else:        print("action is err.")        raise Exception

3.2 开发一个Agent,采集系统信息。

要求:

  1. 获取cpu 内存 磁盘(磁盘使用信息) 主机名 主机ip地址 等信息
  2. 组装成json格式上传到redis里。
  3. 要求记录日志
  4. 定时执行每10分钟更新一次。
  5. 用多线程执行

Agent脚本内容

import jsonimport platformimport scheduleimport psutilimport configparserimport redisimport loggingimport threadingimport socketimport timeimport mathdef log():    logging.basicConfig(level=logging.INFO,                        format='%(asctime)s %(levelname)s %(message)s',  # 日志格式                        datefmt='%Y%m%d %H:%M:%S',  # 时间格式                        filename="./agent.log",  # 日志文件的路径                        filemode='a')  # 文件的打开模式    return logging.basicConfiglog()def get_sys():    try:        hostname = socket.gethostname()        ip = socket.gethostbyname(hostname)    except Exception as msg:        print(msg)        hostname = ""        ip = ""    data = {"hostname":hostname,"ip":ip}    return  datadef get_cpu():    cpu_count_logical = psutil.cpu_count(logical=False)    cpu_count_phycial = psutil.cpu_count(logical=True)    cpu_percent = psutil.cpu_percent(interval=2)    data = {"phycial_count":cpu_count_phycial,"logical":cpu_count_logical,"percent":cpu_percent}    return datadef get_mem():    # math.ceil(小数) 取整数    mem_total = math.ceil(psutil.virtual_memory().total /1024/1024/1024)    mem_percent = psutil.virtual_memory().percent    data = {"mem_total":mem_total,"mem_percent":mem_percent}    return datadef get_disk():    disk_list = []    all_disk_part = psutil.disk_partitions()    for partition in all_disk_part:        mount_point = partition[1]        mount_total = math.ceil(psutil.disk_usage(mount_point).total/1024/1024/1024)        mount_percent = psutil.disk_usage(mount_point).percent        data = {"mount": mount_point,"total":mount_total,"percent":mount_percent}        disk_list.append(data)    return  disk_listdef thread_run():    passdef agent_info():    sys_data = {}    sys_data["host"] = get_sys()    sys_data["disk"] = get_disk()    sys_data["mem"] = get_mem()    sys_data["cpu"] = get_cpu()    return  json.dumps(sys_data) # 进行反序列化def put_redis():    config = configparser.RawConfigParser()    try:        with open("redis.conf","r") as redis_conf_file:            config.read_file(redis_conf_file)            redis_host = config.get("redis","host")            redis_port = config.get("redis","port")    except Exception as msg:        redis_host = "127.0.0.1"        redis_port = 6379        print(msg)    try:        logging.info("connect redis server.")        redis_conn = redis.StrictRedis(redis_host,redis_port,0)        logging.info("put systeminfo to redis server.")        redis_conn.lpush("host",xagent_info())        logging.info("data:{}".format(agent_info()))        logging.info("put successful....")def thread_job(func):    target =threading.Thread(target=func)    target.start()if __name__ == "__main__":    schedule.every(2).seconds.do(thread_job,put_redis)    while True:        schedule.run_pending()        time.sleep(1)

转载于:https://blog.51cto.com/damaicha/2123578

你可能感兴趣的文章
sequelize update 原生sql 没有返回值
查看>>
根据手机类型调转到应用吧还是appstore下载应用
查看>>
听阿里巴巴JVM工程师为你分析常见Java故障案例
查看>>
Sphinx reStructuredText 的安装
查看>>
日本程序员是这样吗
查看>>
JavaScript与C++对象绑定原理及效率分析
查看>>
大数据时代的IT架构设计
查看>>
Custom Tableviews
查看>>
java 开源 cms FreeCMS1.6发布
查看>>
事件库之Libev(一)
查看>>
使用原理视角看 Git
查看>>
jooq-codegen-maven 的配置和使用
查看>>
如何处理Tomcat日志catalina.out日志文件过大的问题
查看>>
数据质量监控工具-Apache Griffin
查看>>
支付宝担保交易接口
查看>>
Intellij Idea中运行tomcat 后报内存溢出
查看>>
打开core文件功能
查看>>
nginx重新安装模块
查看>>
13个能快速开发android的经典项目
查看>>
HTML5给Button加链接
查看>>