kaixin
Published on 2023-02-11 / 13 Visits
0

Python网络编程

计算机网络基础

1.计算机

计算机硬件
操作系统  系统软件  控制程序 让计算机硬件进行启动会起来
qq 微信  引用软件  应用软件是基于操作系统进行控制计算机硬件

2.网络设备

二交换机:
内部只会维护 接口和mac地址,局域网内的交换
每台电脑存在mac地址,电脑主板换掉,mac地址就会换
路由器:
内部存在网管
实现局域网和局域网之间的通信
三层交换机:
具有路由器的功能,也有尔晴交换机的功能
企业网络架构
光猫-核心路由器-防火墙-核心交换机(3层)-接入交换机(2层)
家庭网路架构
运营商-光猫-家用路由器
​
被覆盖的部分是:掩码部分,
没有覆盖:主机部分
子网掩码:为了掩盖电脑的ip地址
ip:ip地址代指这台电脑 32位的2进制
二进制:00000000.00000000.0000000......
十进制:255.255.255.15
0-255
DHCP服务:
开启后 自动随机设置网管和掩码,王关
以太网-属性-ipv4-使用下面的ip地址(手动编写)
​
内网ip:自己组件组建的网段
10.0.0.0-10.255.255.255
172.16.0.0.-172.31.255.255
192.168.0.0.-192.168.255.255
​
公网ip:运营商分配的ip地址
​
云服务器:
不需要自己配置服务器,其他公司整的服务器,把程序放到他的服务器上去运行。
端口:网站会存在一个端口:端口指向一个网站或者程序
0-65535 端口范围
0-5000 都存在含义
​
域名:用户很难记住ip和端口,就需要一个域名
根据域名寻找ip
获取ip地址再去访问网址
先域名解析,在去访问ip

3.网络是什么

计算机与计算机之间的通信
计算机之间的通信需要 一个标准进行通信 标准:互联网协议
# 例如:
计算机1: 发数据按照互联网协议发送
计算机2: 接受计算机1的数据 根据互联网协议进行反解

# 互联网协议:osi 七层协议
1.物理层: 
	与硬件相关 发送的是二进制数据 01001010 发电报
2.数据链路层: 
    将数据进行分组,赋予2进制数据意义
    根据以太网协议:
        1.将一串电信号 为数据包
        2.数据报 分为:抱头 和包内容
            包头规定 18 个字节:  前6字节原地址mac 6字节接受地址mac 6字节数据描述
            包内容:就是主要的传递内容
        以太网的工作方式:广播方式 基于mac地址进行广播方式进行数据传入的
        mac地址只能表示一个计算的地址在哪里
        广播的方式:必须要在同一个局域网之内,但是局域网存在多个,那么怎么根据mac地址找到不同局域网的进行传递数据(网络层)

3.网络层:
    ip协议,每个机器配一个ip地址
	也分为ip头部分和ip数据部分 固定字节
	原地址  目标长度
    ip地址就是:找到对应的子网地址(局域网)
    # ip + mac 地址 就可以找到世界中的某个计算机的位置
    ip 负责找 子网的位置
    mac负责在子网中找到对相应的机器


4.传输层: 
	tcp 与 utp 协议 基于端口协议 
	tcp与utp头 和 包数据
    基于端口操作 0-65535
    一台机器中的端口就是一个软件
    ip + 端口 就能表示全是键一个独一无二的软件
    # 发包 就是封装包的数据
    # 收包 就是解包的数据
    
    
5.应用层:
	执行的硬用软件
    应用软件自己规定的
    应用层的软件头 + 数据

    
# 传输数据封装过程 从头开始一层一层的封装到
# 解数据,就是从尾部一点一点解开
6.数据链层
	报头 + 数据(网络层 ip的头部 + 数据(传入层tcp 或者 utp 协议头 + 数据(硬用层头 + 数据)))
    
    
7.物理层
	将数据链层的数据转为电报(2进制010101)

简单理解
OSI 7 层模型:
应用层:规定数据格式
表示层:压缩数据,分块,加密
会话层:负责服务端建立,关闭
传输层:确定端口 双方的端口
网络层:标记目标ip信息(ip协议)
数据链层:对数据分组设置和目标mac地址
物理层:将数据打包成为2进制传输

4.tcp/udp

UDP协议/TCP协议 都是在传输层进行定义的:

UDP协议:是一个无连接简单的面向数据的传输协议,UDP不提供可靠,它只是将应用程序传给IP层的数据发出去
不能保证能不能到达目标地址。不予客户端建立链接,超时没有重发机制,传输速度慢。
语音通话,视频通话,实时游戏画面
速度快,不需要链接

TCP协议:是面向链接的协议,在收发数据,必须和对方建立链接,在进行收发数据
存在重发数据机制
网站,手机App


1.tcp 协议:
    可靠协议:发送者需要接受者的反馈
    可靠,但是开销大,需要建立链接
    流式协议,以水流一样传递
    客户端 与 服务端 之间需要建一个 双向管道
    在没有传入数据之前,就要建好一个双向的管道
    
2.udp协议:
    不可靠的协议
    不用创建链接 开销低,效率高
    没有链接存在
    在发送数据,直接丢给服务端或者客户端,不需要知道对方是否接受
    
    
3.tcp的三次握手与四次挥手

    三次握手
    服务端  <<< -----   客户端  告诉服务端 进行链接(会传输数据随机生成)
    服务端  ----- >>>  客户端   服务端在回一个数据+1 返回给客户端
    服务端  <<< ----- 客户端   确定链接

    四次挥手
    服务端  <<< -----   客户端  服务端或者客户端我要关闭连
    服务端  ----- >>>  客户端  收到消息 有数据未处理
    服务端  ----- >>>  客户端  在返回消息 数据处理完毕
    服务端  <<< -----   客户端  同意关闭

socket

1.什么是socket

套接字编程,因为tcp 与 utp协议,古老而庞大,研究下来时间过长
所以出现了socket
socket 是基于 应用层 传输层 之间的一个抽象层
负责将用户的数据转为tcp 与 utp协议的 内容传给传输层-网络层-数据链层-物理层
基于socket 写出的程序自然就是按照tcp 与 utp 协议

tcp 与 utp 使用的协议不同
TCP/UTP区别于:UTP:socket.SOCK_DGRAM 不同/TCP:socket.SOCK_STREAM不同

2.socket-utp

# UTP协议socket的代码
server 服务端

    import socket

    sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # socket.SOCK_DGRAM参数不同
    sock.bind(('127.0.0.1',8001))

    while True:
        data,(host,post) = sock.recvfrom(1024) # 接受3个参数
        print(data,host,post) # data 是收到的内容, (host,post) 是ip地址和端口
        sock.sendto('返回的消息'.encode('utf-8'),(host,post)) # 将内容发送,发到指定的端口


client 客户端 socket.SOCK_DGRAM

    import socket
    client = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) # 实例对象
    while True:
        client.sendto('发送消息'.encode('utf-8'),('127.0.0.1',8001))
        #将内容发送给指定的 ip 和端口
        data, (host, post) = client.recvfrom(1024)  # 接受3个参数
        print(data.decode('utf-8')) # 将发送的消息转换
        break
    client.close() # 关闭链接

3.socket-tcp

server 服务端:
    import socket

    # 买手机
    # AF_INET 基于 网络通信套接字
    # AF_UNIX 本机通讯套接字
    # socket.SOCK_STREAM 基于tcp协议
    phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 绑定手机卡 传入(host,post) ip 端口
    phone.bind(('127.0.0.1', 8000))

    # 开机
    phone.listen(5)  # 最大链接的数量5个

    # 等待链接
    print('等待链接中....')
    # 对象1 链接对象  对象2 链接对象的ip和端口
    conn, client_add = phone.accept()  # 阻塞状态,等待链接 返回两个对象
    # 收发消息
    data = conn.recv(1024)  # 接受1024个字节 最大接受1024字节 最大的显示
    print(data)

    # 回消息
    conn.send(b'1001')

    # 挂电话
    conn.close()  # 关闭链接

    # 关机
    phone.close()  # 关闭服务


client 客户端
    import socket

    # 买手机
    # AF_INET 基于 网络通信套接字
    phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)


    # 播好进行链接
    phone.connect(('127.0.0.1', 8000)) # 链接当前id端口服务器

    # 发送消息
    phone.send(b'hahah')

    # 接受消息
    data = phone.recv(1024)
    print(data)

    # 关闭消息
    phone.close()

socket-tcp修复问题与循环操作

server 服务端 

	import socket
    # 买手机
    # AF_INET 基于 网络通信套接字
    # AF_UNIX 本机通讯套接字
    # socket.SOCK_STREAM 基于tcp协议
    phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 套接字对象
    # 重用当前的端口,端口不会被系统回收
    phone.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # 绑定手机卡 传入(host,post) ip 端口
    phone.bind(('127.0.0.1', 8000))

    # 开机
    phone.listen(5)  # 最大链接的数量5个 进行3次握手
    print('等待链接中....')

    # 对象1 链接对象  对象2 链接对象的ip和端口
    conn, client_addr = phone.accept()  # 阻塞状态,等待链接 返回两个对象
    print(client_addr)
    # 等待链接
    while True:  # 通信循环
        try:
            # 收发消息
            # 接受1024个字节 最大接受1024字节 最大的显示
            data = conn.recv(1024)  # 告诉操作系统,取网卡中拷贝发送的数据,给我
            # if not data: break # 当客户端单方面关闭,linux
            print(data.decode('utf-8'))
            msg = input('>>:').strip()
            # 回消息
            conn.send(msg.encode('utf-8'))
        except ConnectionResetError as e: # 当客户端单方面关闭 就会报错,是用错误获取机制,将服务关闭
            break

    # 挂电话
    conn.close()  # 关闭链接

    # 关机
    phone.close()  # 关闭服务


client 客户端
    import socket

    # 买手机
    # AF_INET 基于 网络通信套接字
    phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    phone.connect(('127.0.0.1', 8000))  # 链接当前id端口服务器

    # 播好进行链接
    while True: # 通信循环
        msg = input('>>:').strip()
        if not msg:continue
        # 发送消息
        phone.send(msg.encode('utf-8')) # 将存在内存的数据msg,发给操作系统,操作系统在传给网卡,在进行发送

        # 接受消息
        data = phone.recv(1024)
        print(data.decode('utf-8'))

    # 关闭消息
    phone.close()

加上通信循环

client 客户端
    import socket
    # 买手机
    # AF_INET 基于 网络通信套接字
    phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    phone.connect(('127.0.0.1', 8000))  # 链接当前id端口服务器

    # 播好进行链接
    while True: 
        msg = input('>>:').strip()
        if not msg:continue
        # 发送消息
        phone.send(msg.encode('utf-8')) # 将存在内存的数据msg,发给操作系统,操作系统在传给网卡,在进行发送

        # 接受消息
        data = phone.recv(1024)
        print(data.decode('utf-8'))

    # 关闭消息
    phone.close()



server 服务端
    import socket

    # 买手机
    # AF_INET 基于 网络通信套接字
    # AF_UNIX 本机通讯套接字
    # socket.SOCK_STREAM 基于tcp协议
    phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  # 套接字对象
    # 重用当前的端口,端口不会被系统回收
    phone.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # 绑定手机卡 传入(host,post) ip 端口
    phone.bind(('127.0.0.1', 8000))

    # 开机
    phone.listen(5)  # 最大链接的数量5个 进行3次握手
    print('等待链接中....')

    while True: # 链接循环 不会停止,一直循环监听循环
        # 对象1 链接对象  对象2 链接对象的ip和端口
        conn, client_addr = phone.accept()  # 阻塞状态,等待链接 返回两个对象
        print(client_addr)
        # 等待链接
        while True:  # 通信循环
            try:
                # 收发消息
                # 接受1024个字节 最大接受1024字节 最大的显示
                data = conn.recv(1024)  # 告诉操作系统,取网卡中拷贝发送的数据,给我
                # if not data: break # 当客户端单方面关闭,linux
                print(data.decode('utf-8'))
                msg = input('>>:').strip()
                # 回消息
                conn.send(msg.encode('utf-8'))
            except ConnectionResetError as e: # 当客户端单方面关闭 就会报错,是用错误获取机制,将服务关闭
                break

        # 挂电话
        conn.close()  # 关闭链接

    # 关机
    phone.close()  # 关闭服务



1个1个进行服务,不会断开服务,等待上一次客户端断开后,下一个客户端才会链接

模拟ssh远程执行命令-项目分析

******** 命令只是点了解 *******

# 什么是命令,就是执行系统的命令
# win
# dir:查看某一个文件下的子文件名和子文件名
# ipconfig:查看本地网卡
# tasklist:查看运行的进程md

# 如何在机器上执行命令
# os 模块 中的system 方法就可以执行和cmd命令一样

# import os  不可使用
# os.system('dir /') # 只能拿到命令执行的结果的成功还是失败


# 需要使用subprocess
# shell=True 就是将字符串转为cmd的命令
import subprocess

obj = subprocess.Popen('dir C:\\Users\\56515', shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# win 编码 是gbk mac是utf-8
# 从管道中取
print(obj.stdout.read().decode('gbk')) # 正常的命令
print(obj.stderr.read().decode('gbk')) # 错误的命令



******* 服务端 *******
import socket
import subprocess

phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 重用当前的端口,端口不会被系统回收
phone.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定手机卡 传入(host,post) ip 端口
phone.bind(('127.0.0.1', 8000))

# 开机
phone.listen(5)  # 最大链接的数量5个 进行3次握手
print('等待链接中....')

while True:  # 链接循环 通信循环,接受客户端
    conn, client_addr = phone.accept()  # 阻塞状态,等待链接 返回两个对象
    print(client_addr)
    # 等待链接
    while True:  # 通信循环
        try:
            # 1.接受命令
            data = conn.recv(1024)
            print(data.decode('utf-8'))

            # 2.执行命令,拿到结果
            obj = subprocess.Popen(data.decode('utf-8'), shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
            # 3.返回命令执行的结果
            stderrs = obj.stderr.read()
            stdouts = obj.stdout.read()
            if not stdouts:
                conn.send(stderrs)
            conn.send(stdouts)
        except ConnectionResetError as e:  # 当客户端单方面关闭 就会报错,是用错误获取机制,将服务关闭
            break

    # 挂电话
    conn.close()  # 关闭链接

# 关机
phone.close()  # 关闭服务


*******  客户端 ********
import socket

# 买手机
# AF_INET 基于 网络通信套接字
phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.connect(('127.0.0.1', 8000))  # 链接当前id端口服务器 进行3次握手

while True:
    # 1.发命令
    msg = input('>>:').strip()
    if not msg: continue
    phone.send(msg.encode('utf-8'))

    # 2.拿到命令结果
    data = phone.recv(1024)
    print(data.decode('gbk'))  # win 解码 需要使用cmd的内容gbk

# 关闭消息
phone.close()

4.IO多路复用

服务器:当等待客户端链接时,处于阻塞状态
客户端:与服务器连接时,也是阻塞状态

如何避免阻塞:
1.在创建服务端
加入:
服务器对象.setblocking(False)# 变为非阻塞状态

2.在客户端
加入:
客户端对象.setblocking(False)# 变为非阻塞状态

IO多路复用:配合 非阻塞 一起使用的一种技术,当没有链接时,可以执行别的代码。


IO读哟路复用基于select模块进行使用

案例:
import select  # IO多路复用模块
import socket  # 网络链接模块

sock =  socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.setblocking(False)  # 变为非阻塞状态
sock.bind(('127.0.0.1',8001))
sock.listen(5)

inputs = [sock,] #sock的 对象列表 监听列表

while True:

    r,w,e = select.select(inputs,[],[],0.5)

    # 0.5:用0.5秒的时间取 检测 inputs列表的 sock的对象,是否有新链接到来
    # 当没有发生变化时(新链接发起请求),inputs被检测的列表就没有变化,是个 r = []列表
    # 当有新链接到来时,r =[sock,] ,sock发生变化就会出现在r 的列表中别检测
    # 在最开始 r=[] 是个空的列表,inputs 别检测的列标中发生变化 r列表就会添加发生变化的对象 r=[发生变化的对象]
    for sock in r:
        if sock = sock : # 确保 是否有新链接
        conn,addr = sock.accept()
        print("新链接")
        inputs.append(conn) # 将新链接 添加到监控列表中 inputs = [sock,第一个客户端对象 conn]

    else:
        data = sock.recv(1024)  # 当客户端发送的是空数据时,判断为关闭链接,就会将客户端移除检测列表
        if data:
            print(新消息,data)
        else:
            print('关闭链接')
            inputs.remove(sock)

 # 其他数据

1.两大优点:
    1.没有新客户端到来,可以利用空档期,去处理其他的数据
    2.可以支持多个客户端进行来接。不在等待1个客户端来发送消息。
	3.客户端可以伪造并发现象
	
2.参数:
'''
r,w,e = select.select([第一个列表],[第二个列表],[第三个列表],0.5)
第一个列表:监听服务端有没有 发送数据 监听 sock对象是否有发送数据。
第二个列表:监听 sock对象是否和服务端 监听链接是否成功
第三个列表:监听 sock对象是否发生异常
0.5:监听时间
'''

IO多路复用的三种模式
1.select.select([],[],[],监听时间)  # 逐一遍历,一个一个进行检测 , 最大1024个sock对象

2.select.poll([],[],[],监听时间)  # 逐一遍历,一个一个进行检测 ,没个数的限制

3.select.epoll([],[],[],监听时间) # 基于回调机制,那个sock对象发生变化,那个就将数据返回,效率更高基于 ,没有数据的限制

5.sockeserver模块

服务器框架:


服务器


import socketserver

'''

1 创建功能类 继承seocketserver
    class MyServer(socketserver.BaseRequestHandler):
        def handle(self) -> None:
            pass 将全部的并发方法写入到handle中
2 socketserver.ThreadingTCPServer( 传入端口 和 创建的功能类)
    socketserver.ThreadingTCPServer(('127.0.0.1',8000),MyServer)
    
3 调用serve forever
    serve.serve_forever()
'''


class MyServer(socketserver.BaseRequestHandler):
    def handle(self) -> None:
        '''
        书写并发的逻辑

        self.request 是客户的套接字对象
        '''
        while 1:
            chient_data = self.request.recv(1024)  # 客户端的传入的值
            if not chient_data: break
            print('接受的值', chient_data)
            response = input('>>>发送值:')
            self.request.sendall(response.encode('utf-8'))
        self.request.close()


# 1.创建一个socket对象 2. 绑定了ip和端口 3.规定了最大链接数listen(5)
serve = socketserver.ThreadingTCPServer(('127.0.0.1', 8000), MyServer)

# 1.返回conn 和 attr 2.开启多线程模式 3.内部将conn赋值了request arrt赋值client_address 内部try 调用了def handle(self)方法
serve.serve_forever()



客户端
import socket

sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 8000))

print('服务端启动')

while True:
    inp = input('发送服务端>>>')
    sock.send(inp.encode('utf-8'))

    response = sock.recv(1024)
    print(response.decode('utf-8'))

粘包现象

如果没有收到的信息,没有收完时就会出现粘包的现象

粘包的底层原理分析:
	send 通知操作系统 有数据要发送个自己操作系统的内存中,有没有被操作系统发送 也不知道(tcp协议),但是客户端和服务端是隔离的
	recv
		对方发送到网卡的缓存中,
		由操作系统从内存将数据拷贝到系统中

都不是从自己的直接接受数据,而是在自己的操作系统拷贝网卡存储的在内存中的数据
不是一个send(可能是多个) 对应一个 recv

处理机制

粘包: 当客户端快速发送两个数据时,会出现粘包的现象。
解决粘包:先发送数据长度,在接受数据。

粘包基于 struct模块实现的
1.struct模块 进行获取长度 ,将数据转换为字节。
    # 使用方式
    import struct # 1.导入

    # 2.将数字转为为固定的4个字节
    v1 = struct.pack("i",数字/数字变量)  
    # 第一个参数为模式 “i”,	第二个参数为整数/整数的变量
    print(v1) # 将数字转换为 就是4个固定字节

    # 3.将字节转换为数字
    v2 = struct.unpack('i',v1)
    print(v2) # 字节转换为数字


2.粘包的处理过程
	# 第一条数据
    1.先去读取4个字节 ,就确定的数据的长度
    2.直接读取长度
    3.根据长度获取 数据
    # 第二条数据
    1.在去读取4个字节,确定长度
    2.直接读取长度
    3.根据长度获取 数据
    
3.粘包的网络编程案例
# 服务器:
import socket
import struct

sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
sock.bind(('127.0.0.1',8001))
sock.listen(5)
conn,addr = sock.accept()

# 接受第一个数据
header1 = conn.recv(4) # 读取4个字节
data_length1 = struct.unpack("i",header1)[0] #将字节转换为数值
# data_length1 就是第一条数据的总长度

has_recv_len = 0   # 设置接受时的起始长度,每次接受的字节长度加载这个变量
date1 = b'' # 将接受的字节加在一起


# 开始接受数据
while True:
    leng1 = data_length1 -  has_recv_len 
    # 现有的数据长度 总长度-已知长度 ,作用判断数据传入的总共是多少
    # 还需传入多少
    if leng1>1024 :  # 字节最大数量接受为1024字节最大
        lth =1024
    else:
        lth = leng1 # 如果小于1024个字节就接受 剩余的字节
    chunk = conn.recv(lth) # 判断的字节进行接受
    has_recv_len += len(chunk) # 将每次传入的字节,转换为长度和以传长度相加
    date1 += chunk # 将传入的字节,和已传的字节相加
    if  has_recv_len ==  data_length1: 
    # 传入的长度 和 总长度相同时 退出(数据传入完毕)
        break
print(date.decode('utf-8')) # 打印

#接受第二条数据
header2 = conn.recv(4) # 接受的第一个4个字节是,客户端传入的数据长度
data_length2 = struct.unpack("i",header2)[0] #将字节转换为数值
data2 = conn.recv(data_length2) # 接受长度
print(data2.decode('utf-8'))

conn.close()
sock.close()


# 客户端

import socket
import struct

sock = socket.socket()
sock.connect(('127.0.0.1',8001))

# 第一条数据

data1 = '数据1'.encode('utf-8') # 转换为字节
header1 = struct.pack('i',len(data1)) # 转换为4个字节的长度

sock.sendall(header1) # 先传入数据的大小
sock.sendall(data1) # 在传入数据的本身


# 第二条数据
data2 = '数据2'.encode('utf-8') # 转换为字节
header2 = struct.pack('i',len(data2)) # 字节长度,转换为4个字节

sock.sendall(header2) # 先传入数据的大小的字节
sock.sendall(data2) # 在传入数据的本身

sock.close()

1.简单的处理粘包

****** 服务器 *******
import socket
import subprocess

phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 重用当前的端口,端口不会被系统回收
phone.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定手机卡 传入(host,post) ip 端口
phone.bind(('127.0.0.1', 8000))

# 开机
phone.listen(5)  # 最大链接的数量5个 进行3次握手
print('等待链接中....')

while True:  # 链接循环 通信循环,接受客户端
    conn, client_addr = phone.accept()  # 阻塞状态,等待链接 返回两个对象
    print(client_addr)
    # 等待链接
    while True:  # 通信循环
        try:

            data = conn.recv(8096)
            print(data.decode('utf-8'))


            obj = subprocess.Popen(data.decode('utf-8'), shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
            # 拿到执行cmd命令结果,成功报错
            stderrs = obj.stderr.read()
            stdouts = obj.stdout.read()
            import struct 可以将数据变为字节类型
            # 1.发送数据的长度,但是必须设置标志 将包头发给客户端 固定长度
            '''
            使用struct 模块 将计算的长度进行打包(固定就是4个字节,不论数据多大都是4个字节)
            打包
            res = struct.pack('i',1024000) # res 4个字节的值,不论数字多大都是 
            解包
            struct.unpack(i,res) # 返回元组类型(10244400,) 第一个元素就是
            '''
            # 发送报头 4个字节存储的就是 当前数据的长度
            size = len(stdouts) + len(stderrs)
            header = struct.pack('i',size)

            conn.send(header)

            # 2.发送数据 会出现粘包的情况
            conn.send(stdouts)
            conn.send(stderrs)
        except ConnectionResetError as e:  # 当客户端单方面关闭 就会报错,是用错误获取机制,将服务关闭
            break

    # 挂电话
    conn.close()  # 关闭链接

# 关机
phone.close()  # 关闭服务




******** 客户端 **********

import socket

# 买手机
# AF_INET 基于 网络通信套接字
phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.connect(('127.0.0.1', 8000))  # 链接当前id端口服务器 进行3次握手

while True:
    msg = input('>>:').strip()
    if not msg: continue
    phone.send(msg.encode('utf-8'))

    # 1.拿到数据长度 收到包头
    import struct

    header = phone.recv(4)  # 接受包头,的数据4个字节存储的就是传入数据的长度
    size = struct.unpack('i', header)[0]  # 将包头给解出来,返回的元组,第一个元素就是长度
    # 2.接受真实的数据描述信息
    recv_size = 0
    recv_data = b''
    while recv_size < size:
        data = phone.recv(1024)  # 存在粘包信息
        recv_data += data
        recv_size += len(data)
    print(recv_data.decode('gbk'))

# 关闭消息
phone.close()




'''

import struct 可以将数据变为字节类型

使用struct 模块 将计算的长度进行打包(固定就是4个字节,不论数据多大都是4个字节)
            打包
            res = struct.pack('i',1024000) # res 4个字节的值,不论数字多大都是 
            解包
            struct.unpack(i,res) # 返回元组类型(10244400,) 第一个元素就是



i 模式 有最大上限长度
l 模式也是最大上限长度




'''

2.高级的处理粘包

 **** 服务端 ******
 import socket
import subprocess

phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 重用当前的端口,端口不会被系统回收
phone.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

# 绑定手机卡 传入(host,post) ip 端口
phone.bind(('127.0.0.1', 8000))

# 开机
phone.listen(5)  # 最大链接的数量5个 进行3次握手
print('等待链接中....')

while True:  # 链接循环 通信循环,接受客户端
    conn, client_addr = phone.accept()  # 阻塞状态,等待链接 返回两个对象
    print(client_addr)
    # 等待链接
    while True:  # 通信循环
        try:

            data = conn.recv(8096)
            print(data.decode('utf-8'))

            obj = subprocess.Popen(data.decode('utf-8'), shell=True,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
         
            stderrs = obj.stderr.read()
            stdouts = obj.stdout.read()
            import struct
            import json

            # 第一步:制作固定的报头发给客户端
            header_dict = {
                'filename': "xx.txt",
                'md5': 'xxx',
                'total_size': len(stdouts) + len(stderrs)  # 数据的长度
            }
            header_json = json.dumps(header_dict)
            header_bytes = header_json.encode('utf-8')

            # 第二步 将包头长度进行发送
            header = struct.pack('i', len(header_bytes))  # 只能接受b类型
            conn.send(header)

            # 第三步 将包头信息发送出去
            conn.send(header_bytes)  # 只能接受bytes类型

            # 第四步 将真实数据发送
            conn.send(stdouts)
            conn.send(stderrs)
        except ConnectionResetError as e:  # 当客户端单方面关闭 就会报错,是用错误获取机制,将服务关闭
            break

    # 挂电话
    conn.close()  # 关闭链接

# 关机
phone.close()  # 关闭服务


***** 客户端 ******

import socket

# 买手机
# AF_INET 基于 网络通信套接字
phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.connect(('127.0.0.1', 8000))  # 链接当前id端口服务器 进行3次握手

while True:
    msg = input('>>:').strip()
    if not msg: continue
    phone.send(msg.encode('utf-8'))

    # 1.拿到数据长度 收到包头
    import struct

    # 1.接受报头长度
    header = phone.recv(4)
    # 将长度获取 元组类型
    header_size = struct.unpack('i', header)[0]
    # 2.接受报头信息
    header_bytes = phone.recv(header_size) # 接受报头的长度
    import json
    # 3.从报头信息中获取 主要内容的长度
    header_dict = json.loads(header_bytes) # 解析为字典
    print(header_dict)
    size = header_dict.get('total_size') # 获取主要内容长度

    # 4.解析真正的内容
    recv_size = 0
    recv_data = b''
    while recv_size < size:
        data = phone.recv(1024)  # 存在粘包信息
        recv_data += data
        recv_size += len(data)
    print(recv_data.decode('gbk'))

# 关闭消息
phone.close()







将报头 变为字典形式进行存储(主要内容的长度),转为json格式,将json格式转为 字节格式
在将字节格式进行len 获取长度,在使用struct.pack('i',报头)设置固定的长度,将内容发送(报头的长度)


在将字节类型的报头传给客户端 

3.tcp套接字传输文件

***** 客户端  *****
import socket

# 买手机
# AF_INET 基于 网络通信套接字
phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
phone.connect(('127.0.0.1', 8000))  # 链接当前id端口服务器 进行3次握手
dow_dir = '客户端下载的路径,要与打开的文件 拼接'
while True:
    # 1.发送命令
    msg = input('>>:').strip()
    if not msg: continue
    phone.send(msg.encode('utf-8'))

    # 2.写的方式打开一个新的文件,收到服务发来的文件写入文件
    import struct

    # 1.接受报头长度
    header = phone.recv(4)
    # 将长度获取 元组类型
    header_size = struct.unpack('i', header)[0]
    # 2.接受报头信息
    header_bytes = phone.recv(header_size)  # 接受报头的长度
    import json

    # 3.从报头信息中获取 主要内容的长度
    header_dict = json.loads(header_bytes)  # 解析为字典
    print(header_dict)
    size = header_dict.get('filename_size')  # 获取文件的长度
    filename = header_dict.get('filename')
    # 4.解析真正的内容
    # 不可以和服务端文件路径进行重复
    with open(filename, 'wb') as f:

        # 获取文件的长度,进行循环写入到一个新的文件中
        recv_size = 0
        while recv_size < size:
            data = phone.recv(1024)
            f.write(data)
            recv_size += len(data)

# 关闭消息
phone.close()




******* 服务端 *********
import socket
import os
import struct
import json
phone = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 重用当前的端口,端口不会被系统回收
phone.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)


ser_dir = '服务端存放的文件夹 与 客户端发送的文件名要进行拼接'

# 绑定手机卡 传入(host,post) ip 端口
phone.bind(('127.0.0.1', 8000))

# 开机
phone.listen(5)  # 最大链接的数量5个 进行3次握手
print('等待链接中....')

while True:  # 链接循环 通信循环,接受客户端
    conn, client_addr = phone.accept()  # 阻塞状态,等待链接 返回两个对象
    print(client_addr)
    # 等待链接
    while True:  # 通信循环
        try:
            # 1.收到命令
            data = conn.recv(8096) # get 1.txt
            print(data.decode('utf-8'))

            # 2.解析命令,获得响应参数
            cmds = data.decode('utf-8').split() # [get,1.txt]
            filename = cmds[1] # 获取文件名

            # 将文件内容写入固定长度当为报头写入

            # 第一步:制作固定的报头发给客户端
            header_dict = {
                'filename': filename,
                'md5': 'xxx',
                'filename_size': os.path.getsize(filename)  # 文件长度 获取文件大小要是需要进行路径拼接的
            }
            header_json = json.dumps(header_dict)
            header_bytes = header_json.encode('utf-8')

            # 第二步 将包头长度进行发送
            header = struct.pack('i', len(header_bytes))  # 只能接受b类型
            conn.send(header)

            # 第三步 将包头信息发送出去
            conn.send(header_bytes)  # 只能接受bytes类型

            # 3.以读的形式进行,让客户端进行文件下载
            # 打开和写入不能与客户端的重复问题
            with open(filename,'rb') as f:
                # 将文件内容传递 循环
                for line in f:
                    conn.send(line)

        except ConnectionResetError as e:  # 当客户端单方面关闭 就会报错,是用错误获取机制,将服务关闭
            break

    # 挂电话
    conn.close()  # 关闭链接

# 关机
phone.close()  # 关闭服务

Python进程与线程

操作系统

操作系统是什么

1.协调管理计算机硬件资源与软件控制程序
	将硬件复杂的细节控制起来形成一个接口给应用程序用
2.管理机器之上的多个进程(qq,微信....)
	
操作系统存储在硬盘之中,
当启动电脑时,就会将硬盘的操作系统代码启动起来,读到内存之中
cpu就从内存中读取操作系统的代码进行执行 (操作系统进程)

当启动qq时,qq的快捷方式是一个绝对的路径
提交给系统一个qq的绝对路径,操作系统(在内存中)到硬盘的根据qq的绝对路径,将qq读到内存中(qq的代码就在内存中),操作系统就调用cup让cup读取内存中的qq的代码(qq进程)执行
操作系统负责调用应用程序

介绍操作系统发展史

第一代的计算机: 真空管 与穿孔卡片
没有进程和编程语言的概念 浪费了计算机资源

第二代计算机: 晶体管和批处理系统
将生产人员和操作人员系统人员与维护人员进行分开
有了操作系统的概念
fortran语言或汇编语言 写入纸上,让后穿孔 成为卡牌
一大堆程序 输入 与 输出
一个一个执行叫做串行执行
节省时间
1401 程序输入输出 io操作
7094 负责就计算

第三代:集成电路和多道技术(还是在批处理系统)
多道程序:就是让cup 在对io操作时 不让cup等待,同时让他干别的

空间复用:内存存入多个程序,物理层面上需要进行程序在内存中进行隔离,如果程序A与程序B 在物理层面不隔离,A关闭,那么B也会关闭,同时操作系统也在内存中,同时也会关闭

时间的复用:共享cup的时间,提升cup的执行效率,当程序发生io操作,就进行切换到其他的程序中,程序io操作的状态进行记录,当代下次cup执行就接着执行这个程序
被称为并发的效果
遇到io cup 进行切换,一个程序运行长了 也要进行cup进行切换
分时操作系统
多个联机终端 + 多道技术

基于多道技术:针对单个处理进行的并发处理
	空间复用
		1.将程序读到内存中,进行隔离开来
	时间复用
		cup在多个进程之间进行切换
		当cup执行当前程序遇到io操作(记录程序的当状态)进行切换到其他的程序中执行。在进程进行切换执行,看似多个程序进程在执行,的并发现象

多道技术:多核cup的话就是并行执行

进程的理论

1.进程?
	正在进行的过程或者说一个任务呀,负责执行任务是cup
2.进程和程序的区别?
	1.程序时一堆代码,而进程是程序的运行过程
	2.进程就是系统在阅读程序的代码,将程序的代码执行的总和
3.执行
    如果出现优先级更高的任务出现,那么cup就会先记录当前执行的任务的状态,记录完毕后,开始执行更高的优先级的任务,同一个程序执行两次,那么就是两个进程

4.进程创建过程
    1.系统初始化
    2.一个进程在运行中开启子进程(如nginx开启多进程)
    3.用户的交互式请求,创建一个新进程(点击启动qq)
    4.一个批处理作业的初始化(只在大型机的批处理系统)
    5.无论哪一种,新进程的创建都是由一个存在的进程执行了一个用于创建进程的系统调用的而创建的
        1.在unix中系统中调用 fork foke会创建一个与父进程一模一样的副本,二者都有相同的映射 同样的环境字符串,同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)
        2.在win中系统调用是createprocess 处理进程的创建,也负责把正确的程序装入新进程中

        # 相同点:
            进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面的实现进程与进程之间的隔离)任何进程在修改自己的内容,不会影响其他的进程
        # 不同点:
            在unix中的子进程的初始地址空间是父进程的副本,子进程和父进程可以只读共享内存的,win系统中,父进程与子进程内存地址就不同


# 进程状态
    运行态
    阻塞态
    进行态


# 子进程创建时
    子进程创建时与主进程是隔离的。
    子进程会复制主进程的全部参数内容值
    进程之间可以共享硬盘之间的内容文件数据库之类的

1.进程创建

# 方式1 调用进程对象执行
    import time
    from multiprocessing import Process 

     def func(*args, **kwargs):
         print(args)
         time.sleep(3)


     if __name__ == '__main__':
         # args 传入的是元组,位置传参,kwargs 传入的关键字传参
         p = Process(target=func, args=('子进程1', '这是子进程1'))  # 调用子进程
        p.start()  # start给操作系统一个信号,开启一个进程

         print('主')


#方式2 
	继承当前的进程类
    class Myprocss(Process):

        def __init__(self,name):
            super(Myprocss, self).__init__()
            self.name = name

        def run(self):
            print(self.name)
            time.sleep(1.5)


    if __name__ == '__main__':

        p = Myprocss('wkx')  # 调用子进程
        p.start()  # start给操作系统一个信号,开启一个进程 start默认自动进程执行run方法

        print('主')
      
 



 在python下执行当前py脚本时,当前执行的脚本到p.start()的时候会创建一个子进程
 创建的子进程是当前执行py脚本的子进程
 进程的执行:
     当执行当前py 文件时,会生成一个主进程,从代码的上面执行到下面
     当执行到start()的时候,就会给操作系统一个信号,创建一个子进程,执行func函数

所以:
   主进程就会先打印 
   子进程好打印

2.进程的方法

import time,os
from multiprocessing import Process


def func(*args, **kwargs):
    print(args)
    time.sleep(3)


if __name__ == '__main__':
    p = Process(target=func, args=('子进程1', '这是子进程1'),name='进程1的名字')
    p1 = Process(target=func, args=('子进程2', '这是子进程2'))
    p.start()
    p1.start()
    p.join() 
    p1.join()
    print('主')
    print(p.pid)  
    print(p.is_alive()) 
    p.terminate() 
    print(p.name)
    os.getpid 	
    os.getppid 
    
# 方法解析

1.创建进程,不会立即创建而是给cup发送信号告诉操作系统创建一个子进程
如果需要创建多个进程,那么需要等待上面的发送完毕信号下面创建进程才会发送信号 并行
    进程对象.start()

2.等待子进程执行完毕后,父进程才会执行,也就让当前的主进程等待 子进程执行完毕后,在执行
    进程对象.join()
    
3.查看当前子进程id
    进程对象.pid()

4.查看子进程是否存活
    进程对象.is_alive()
    
5.发送信号终止进程,也是发送一个信号,而不是立即执行
    进程对象.terminate()

6.查看进程的名字
    进程对象.name 默认为:Process-开启进程的数量递进
    
7.执行py文件进程id
	os.getpid()

8.当前py文件的父id
	os.getppid()

getpid 查看当前运行进程
getppid 查看当前运行进程的父进程
理解:
当开启系统是,就会创建一个进程,当点击运行pycharm时,就会在系统的进程中复制一个子进程,启动pycharm
也就是:
	操作系统的进程 主进程
	pycharm的进程 子进程

当在pycharm中执行一个py文件时,就会创建一个进程,而这个进程是由pycharm的进程中复制出来的一个子进程
也就是
	pycharm的进程 主进程
    py文件		子进程
 
当在py文件执行调用了multiprocessing模块创建一个子进程时
也就是,就会从当前py进程复制一个子进程
	py文件    主进程
    multiprocessing创建的进程  子进程

3.僵尸进程和孤儿进程?

1.僵尸进程:
	当父进程执行时需要看到自己进程下的子进程的状态,如果子进程消失,不见了,不行,必须让父进程知道当前的子进程们的执行状态是什么样子的,所有的子进程一旦死掉(执行完毕)就会进入僵尸进程的状态(会将内存清除,但是保留当前的子进程状态),作用就是当前父进程 要看子进程都可以看到子进程,当父进程死掉(执行完毕),就会将自己的儿子进程全部清除
	
    缺点:父进程一直不死的情况下
        如果僵尸进程过多,会导致进程起不来,因为每一个僵尸进程都会拿着一个pid,每个新的子进程起来也会使用一个pid

2.孤儿进程:
	没有父进程,也就是父进程执行完毕,子进程还没有执行完毕,那么这些孤儿进程都会被inid(孤儿进程的孤儿院) 进行接管

4.进程之间的内存隔离?证明

from multiprocessing import Process

n = 100


def work():
    global n  # 修改全局变量
    n = 0  # 将n赋值为0
    print('子进程:', n) # 打印 0


if __name__ == '__main__':
    p = Process(target=work)
    p.start()
    p.join()
    print('主进程:', n) # 打印100


主进程: 100
子进程: 0

# 证明两个进程之间是相互隔离的
当执行p.start() 告诉操作系统创建一个当前执行py文件的进程的一个子进程,将全部参数复制到创建的子进程中
子进程执行work函数,将n赋值为0,但是主进程的值不受影响

5.多进程证明并发?证明

import time
from multiprocessing import Process
from datetime import datetime


def func1(name):
    time.sleep(2)
    print(name)


def func2(name):
    time.sleep(3)
    print(name)


def func3(name):
    time.sleep(5)
    print(name)
    print(datetime.now()) # 2022-12-10 22:54:41.438845


if __name__ == '__main__':
    p1 = Process(target=func1, args=('进程1',))
    p2 = Process(target=func2, args=('进程2',))
    p3 = Process(target=func3, args=('进程3',)) # 最后创建的

    p1.start()
    p2.start()
    p3.start()
    # 不使用join 等待子进程的执行完毕,让主进程直接执行完毕
    print(datetime.now()) # 2022-12-10 22:54:36.353487


证明多个进程进行执行时,会出现伪并发的状态
主进程的执行完毕的时间
 22:54:36.353487

等待时间最长的子进程完成时间
22:54:41.438845
之间相差5秒,说明进程之间存在并发的现象,而不是串行执行

6.守护进程

守护进程:
	1.主进程执行完毕后,代码终止,不论子进程是否执行
	2.守护进程内部的子进程不能在开启进程
作用:
    防止出现孤儿进程和僵尸进程

import time
from multiprocessing import Process

def func(name):
    print(name)
    time.sleep(2)


if __name__ == '__main__':
    p = Process(target=func, args=('进程1',))
    p.daemon = True # 开启守护进程
    p.start()
    p.join() # 开启一个join,让主进程等待,等待子进程执行完毕,销毁
    print('主')
    
开启守护进程 与 join,主进程等待子进程完成后,因为开启守护进程原因就会销毁当前的子进程

7.进程互斥锁(锁概念)

import time
from multiprocessing import Process, Lock
将并发改为串行
'''
进程
进程 0 1
进程 1 1
进程 2 1
进程 0 2
进程 1 2
进程 2 2
进程 0 3
进程 1 3
进程 2 3

出现了子进程谁拿到了共享资源,谁就会先打印
应该是 谁先拿到资源打印完毕后,下一个子进程才会拿到资源

from multiprocessing import Lock # 就试一把锁

1.创建锁,锁必须是唯一的,要不没有意义
2.加锁,当子进程带上锁,那么其他子进程只能等待
3.解锁,当子子进程解锁后,其他进子程进行抢锁,谁拿到谁就会执行公共资源
进程 0 1
进程 0 2
进程 0 3
进程 1 1
进程 1 2
进程 1 3
进程 2 1
进程 2 2
进程 2 3

牺牲了效率,保证了程序不会错乱
'''


def task(name, mutex):
    mutex.acquire()  # 加锁
    print('进程', name, 1)
    time.sleep(1)
    print('进程', name, 2)
    time.sleep(1)
    print('进程', name, 3)
    mutex.release()  # 解锁


if __name__ == '__main__':
    mutex = Lock()  # 创建一把锁
    for i in range(3):
        p = Process(target=task, args=(i, mutex))
        p.start()

 join 与 互斥锁的区别:
	join是将全部代码变为串行
    而互斥锁,只是将需要对数据改变的部分变为串行,其他部分还是伪并行的状态(牺牲效率,保证的了数据的安全)
    

8.项目案例

抢车票案例

import time
from multiprocessing import Process, Lock


def func(name, num, m):
    m.acquire()  # 加锁
    time.sleep(1)
    file = open('1.txt', 'r', encoding='utf-8')
    n = int(file.read())
    file.close()
    if n > 0:
        print(name, '抢到了票', num, )
        n -= num
        file2 = open('1.txt', 'w', encoding='utf-8')
        file2.write('%s' % (n))
        print('还有%d票' % n)
    else:
        print('没票了')
    m.release()  # 解锁


if __name__ == '__main__':
    m = Lock()
    for i in range(10):
        p = Process(target=func, args=(i, 2, m))
        p.start()

'''
注意:进程的内存是隔离的,内部的值也是进程自己私有的
只有在创建子进程时的公共资源是共有的,数据库信息,文件信息

这个案例可以看到 互斥锁就是为了防止 子进程之间对公共资源的同时修改造成的资源混乱现象

锁:
	当自己上厕所时,其他人只能等待厕所外面,只有上完厕所,其他人才能进去上厕所
'''

多进程socket

from multiprocessing import Process
from socket import *


def talk(conn):
    data = conn.recv(1024)
    print(data)
    conn.close()


def servers(ip, port):
    server = socket(AF_INET, SOCK_STREAM)
    server.bind((ip, port))
    server.listen(5)

    while True:
        conn, attr = server.accept()
        # 来一个客户起一个服务
        p = Process(target=talk, args=(conn,))
        p.start()
    server.close()


if __name__ == '__main__':
    servers('127.0.0.1',8000)

9.进程的管道和对列 *

对列使用:
	from multiprocessing import Queue  # 当前进程模块的对列

# 参数:设置对列大小(对列使用的内存的大小数据)
q = Queue(3)  # 实例化对列对象

# 在对列中放数据,当对列满,就会出现阻塞现象,等待另个一个数据拿出来才会将数据取走
q.put('110')
q.put('120')
q.put('119')

# 判断对列是否满了
print(q.full())  # 返回True 或者False
# 查看对列是否清空了
print(q.empty()) # 返回True 或者False

# 取数据,如果对列空了,在次取的时候就会阻塞等待 添加到对列中
print(q.get())
print(q.get())
print(q.get())

对列的生产者消费者模型

import time
from multiprocessing import Process, Queue


def func(q):
    for i in range(3):
        res = '包子%s' % i
        time.sleep(2)
        print('生产者生产%s' % res)

        # 将生产者的生产的产品放入对列中
        q.put(res)


def show(q):
    while True:  # 如果数据取完后,就会卡到
        res = q.get()
        if not res: break
        time.sleep(1)
        # 从对列中取出来内容进行消费
        print('消费者消费了%s' % res)


if __name__ == '__main__':
    q = Queue()

    # 使用多线程,将q对列传入
    p = Process(target=func, args=(q,))

    # 消费者
    c = Process(target=show, args=(q,))
    p.start()
    c.start()
    p.join()  # 让主线程等待子线程执行完毕,也就是当前的生产者生产完毕后
    q.put(None)  # 主线程等待生产者子线程执行完毕后,会执行put none 就是当前对列已经结束了
    print('主线程')

'''
生产者 子进程 与主进程 同时关闭
在主进程关闭的同时 put 一个 None到对列中

在消费者中 当取到None时,就会关闭当前对列
'''



import time
from multiprocessing import Process, JoinableQueue


def func(q):
    for i in range(3):
        res = '包子%s' % i
        time.sleep(2)
        print('生产者生产%s' % res)

        # 将生产者的生产的产品放入对列中
        q.put(res)
    q.join() # 当生产者 生产完毕 就会停止


def show(q):
    while True:  # 如果数据取完后,就会卡到
        res = q.get()
        if not res: break
        time.sleep(1)
        # 从对列中取出来内容进行消费
        print('消费者消费了%s' % res)
        q.task_done() # 给生产者发送信号,我已经取完了,可以停止了


if __name__ == '__main__':
    q = JoinableQueue()

    # 使用多线程,将q对列传入
    p = Process(target=func, args=(q,))

    # 消费者
    c = Process(target=show, args=(q,))
    c.daemon = True # 开启守护进程,当主进程执行完毕,子进程也同时销毁,
    p.start()
    c.start()
    p.join() # 等待生产者 子进程执行完毕,主进程才会执行
    print('主线程') # 当主进程执行完毕后,消费者子进程也会停止,因为开启了守卫进程

'''
等待生产完毕后,对列就消失停止

当消费者进行取时,会发送信号,如果取没了那么就会阻塞停止
内部发送了 我已经消费完毕的信号

生产者生产完毕, 消费者也就消费完毕
利用守护进程的概念进行停止对列
同时利用JoinableQueue 中的join 停止对列

'''

线程的理论

进程:将进程与进程之间进行隔离开来
	那么进程也属于资源单位,每一个子进程之间都存在自己的资源,资源不能共享

真正工作的单位是 线程
每一个进程下面最少会出现一个线程的出现
在同一个进程中的线程是共享资源的(共享进程中的资源)

1.线程创建

线程的创建
from threading import Thread

# 方式1
def func():
    print(123)


if __name__ == '__main__':
    t1 = Thread(target=func)
    t1.start() # 告诉操作系统在我的进程中在造一个线程

    print('')

当前存在多少个进程和线程
当启动py文件是,会生成一个 ide的子进程执行py文件
那么 子进程下面 必定有一个工作的 线程
同时又开启一个线程执行func任务
那么就是 1进程 2线程


# 方式2
class Myprocss(Thread):

    def __init__(self,name):
        super(Myprocss, self).__init__()
        self.name = name

    def run(self):
        print(self.name)



if __name__ == '__main__':

    t1 = Myprocss('wkx')  # 调用子进程
    t1.start()  # start给操作系统一个信号,开启一个进程 start默认自动进程执行run方法

    print('主')

2.进程与线程的区别

1.开进程的开销远大于线程的开销
2.进程内容的多个线程 共享进程的地址空间
3.关于pid内容


# 1. 进程比线程开销大

from threading import Thread
from multiprocessing import Process

def func():
    print('123456')


if __name__ == '__main__':
    # t = Thread(target=func)
    p = Process(target=func)
    # t.start() # 向操作系统发信号,我要创建一个子线程
    p.start() # 向操作系统发出信号,我要创建一个子进程
    print('主')

线程打印顺序:
123456
主
进程打印顺序:
主
123456
线程的执行等同于 共同执行
进程的执行 先打印主,主进程先执行完毕,子进程可能正在申请创建索引 最后才会打印123456
线程创建,不需要在次开辟内存空间,使用当前的进程内的内存空间就可以
进程的创建,需要开辟进程空间,进程与进程之间是隔离的,所以需要单独申请内存空间
所以使用进程的开销比较大



# 2.同一个进程 内的子线程资源是共享的
n = 1000
def func():
    global n
    n = 0
    print(n)


if __name__ == '__main__':
    t = Thread(target=func)
    # p = Process(target=func)
    t.start() # 向操作系统发信号,我要创建一个子线程
    # p.start()
    print(n)

1.当使用进程执行时
    1.给操作系统发出信号,在当前进程中创建一个子进程,将父进程的参数复制一份
        那么 n =1000 就被子进程复制了一份
    2.在外部的打印是当前主进程打印的内容 n=1000
    3.在函数体内部 是创建的子进程去执行的,子进程复制了一份n=1000到自己的进程空间内部了
        当global n  n=0 时,也就将自己进程空间的n=1000 变为了 n=0
    进程与进程是隔离存在
2.当使用线程时
    1.给操作系统发信号,在当前进程内部(存在一个主线程)创建一个子线程
    2.这个子线程存在于当前进程的内存空间内部,所以获得的资源和主线程是相同的
    3.当func内部进行对n=0 进行修改时就会修改进程内部的资源 n=1000
    同一进程内部的线程 共享进程内的全部资源


    ****** 进程的id号 *******
    import os

    def func():
        # 查看当前进程的id号
        print(current_process().pid) # 14404
        # 查看当前父级进程id
        print(os.getppid()) # 8220

    if __name__ == '__main__':
        p = Process(target=func)
        p.start() # 创建一个子进程
        # 获取当前进程id
        print('主进程id',current_process().pid) # 8220
    
    

 3.线程的归属的进程id号 证明进程下的线程资源可以共享
        #  可以证明,当前创建的新线程与原有线程都属于当前进程的内存空间中的,资源可以共享
        def func():
            # 查看当线程的进程id号
            print(os.getpid()) # 8220

        if __name__ == '__main__':
            t = Thread(target=func)
            t.start() # 创建一个子线程
            # 获取当前线程的进程id
            print('主线程id',os.getpid()) # 8220

3.线程的其他方法

from threading import Thread, currentThread,active_count,enumerate


def func():
    print('6666')
    print('当前线程的名字', currentThread().getName())


if __name__ == '__main__':
    
    1.修改线程名
    t = Thread(target=func, name='线程名') # name 属性设置线程名
    t.setName('8899') # 修改创建线程的名字
    t.start()
    
    2.主线程等待子线程执行完毕后子线程销毁
    t.join()
    
    3.查看当前创建的线程是否存活
    print(t.is_alive()) # 返回false ture

    4.查看当前进程下活跃的线程
    # 1.当启动py文件时,会生成创建一个进程,进程内部工作单位是线程(1个)
    # 2.当创建一个线程,也是在当前进程内存空间内,那么线程就是2
    print(active_count()) # 2
    
    5.查看活跃线程对象
    # [<_MainThread(MainThread, started 15356)>, <Thread(线程名, started 13116)>]
    print(enumerate()) 

    6.查看主线程名字(当前进程下的非创建的自带的一个工作最小单位线程)修改主线程名字
    print('主线程', currentThread().getName())  # 查看名字MainThread
    currentThread().setName('666')  # 修改主线程名字

4.守护的概念

1.一个进程在什么时候应该被销毁掉
    干完活后就让进程销毁

2. 一个进程内部存在一个干活的主线线程
    而这个主线程的存在代表了 这个进程是在干活的

    主线程当执行完毕后,会等待其他创建的线程执行完毕

    如果主线程执行完毕直接停止,那么就代表当前进程执行完毕停止

3.在一个进程内,默认不开线程,那么就是一个主线程在工作

4.主线程 代表了 当前进程的生命周期

守护的概念:
    进程:
        当父进程 执行 到创建一个子进程时,子进程开启了守护进程
        也就代表着当前子进程 会守护当前 父进程,当父进程执行完毕,子进程无论是否执行完毕都会停止

        可以理解为 子进程 ----依附---> 父进程
        内部代码怎么执行父进程不管,但是只要父进程执行完毕,子进程无论是否在干什么都要关闭

        也就是 子进程 盯着 父进程  父进程死了 子进程也自杀死掉


    线程:
        一个进程内在初始的情况下就存在一个主线程
        当创建多个线程时,其中一个线程为守护线程
        那么 守护线程 盯着 主线程(主线程死 守护线程 死)
        但是 主线程 代表了 当前进程的生命请求周期
        所以 主线程 需要等待其他 非守护线程 执行完毕后才会死

5.守护线程

from threading import Thread
import time

def func():
    time.sleep(2)
    print('888999')


if __name__ == '__main__':
    t = Thread(target=func)
    t.setDaemon(True) # 守护线程
    # t.daemon = True # 守护线程
    t.start() # 线程创建的速度超级快
    print(123) # 只会打印123 而创建的线程执行func 内部都不会执行
    # 守护线程 主线程si 守护线程si

6.线程互斥锁

牺牲了效率 保护的数据安全
执行原理:将并发改为串行

from threading import Thread,Lock
import time
n = 100
def func(r):
    global n
    r.acquire() # 创建锁
    temp = n
    time.sleep(0.1) # 全部创建的线程都在这里停止 拿到了n=100
    n = temp -1 # 每个线程都相当于 100 -1 最后就是99
    r.release() # 释放锁

if __name__ == '__main__':
    t_list = []
    r = Lock() # 创建一个锁 效率低了 因为时一个一个执行的
    for i in range(15):
        t = Thread(target=func,args=(r,))
        t_list.append(t)
        t.start() # 线程创建的速度快
    for i in t_list:
        i.join()

    print('主',n) # 99
    
   
# 锁只对 修改数据时使用,其他的代码部分不需进行加锁

7.线程中Event的事件

from thrading import Event

even = Event()
even.wait() # 开始 可以设置超时时间,wait(1) 不需要等待其他线程的set()执行,1秒后,这个线程就可以执行下面的任务
even.set() # 结束 那么其他线程内部设置wait() 线程就可以向下执行了
even.is_set() # 是否被设置了 event 返回bool
even.clear() # 恢复event的状态为False
作用:
	一个线程通知另一个线程,我的活干完了,你可以你自己的任务了
	实现线程之间互通

from threading import Thread, Event

even = Event()  # 设置一个even的事件


def s1():
    '''模拟学生'''
    print('学生正在上课')
    even.wait()  # 处于阻塞状态
    print('学生下课休息')


def s2():
    '''模拟老师'''
    print('老师正在上课')
    import time
    time.sleep(5)
    even.set()  # 释放even的事件

    print('老师说下课')


if __name__ == '__main__':
    t = Thread(target=s1)  # 学生A

    t2 = Thread(target=s2)  # 老师

    t.start()
    t2.start()
	

*******
线程的关键特性,就是当前线程是独立的状态运行且不可以预测的,如果程序中的其他线程需要通过某一个线程的状态来确定自己下一步的操作,就可以使用envnt事件

不使用envet事件也是可以的,设置全局变量,其他线程根据这个变量进行是否执行下一步操作

8.线程中的定时器功能

在定时的时间进行执行某些任务


from threading import Timer


def task(name):
    print(name)


# 参数1:间隔的时间秒为单位 参数2 函数 参数3 参数
# 内代码 继承了线程类,开启一个timer就是起一个线程,同时内部还有envnt事件控制者这时间的间隔
Timer(5, task, args=('11111',)).start()



*********** 60秒设置验证码 ********** 
# from threading import Lock,RLock
#
# # mutex = Lock()  # 互斥锁
# # # 互斥锁只能 加锁1次,不能加锁多次
# # mutex.acquire()
# # print('加个锁')
# # mutex.acquire() # 阻塞到这个位置,因为互斥锁不能进行acquire两次
# # print('在加个锁')
# # mutex.release()
#
# # 递归锁,是可以加多个的
# # 递归锁,可以连续的acquire,每一次acquire一次计数器 +1
# # 只有计数器为0时,才会被其他的线程抢到
# r = RLock()
# print('加锁')
# r.acquire()
# print('在加个锁')
# r.acquire()
# r.release()


# from threading import Thread,Semaphore, currentThread
# import time
#
# sm = Semaphore(5)  # 信号量,定义量为5,那么只有5个线程可以同时抢到
#
#
# def task():
#     sm.acquire()  # 加锁
#
#     print('我是谁%s' % currentThread().getName())
#     time.sleep(1)
#     sm.release()  # 解锁
#
#
# for i in range(10):
#     Thread(target=task).start()

# from threading import Thread, Event
#
# even = Event()  # 设置一个even的事件
#
#
# def s1():
#     '''模拟学生'''
#     print('学生正在上课')
#     even.wait()  # 处于阻塞状态
#     print('学生下课休息')
#
#
# def s2():
#     '''模拟老师'''
#     print('老师正在上课')
#     import time
#     time.sleep(5)
#     even.set()  # 释放even的事件
#
#     print('老师说下课')


# if __name__ == '__main__':
#     t = Thread(target=s1)  # 学生A
#
#     t2 = Thread(target=s2)  # 老师
#
#     t.start()
#     t2.start()

#
# from threading import Timer
#
#
# def task(name):
#     print(name)
#
#
# # 参数1:间隔的时间秒为单位 参数2 函数 参数3 参数
# # 内代码 继承了线程类,开启一个timer就是起一个线程,同时内部还有envnt事件控制者这时间的间隔
# Timer(5, task, args=('11111',)).start()


import random
from threading import Timer


class Code:
    def __init__(self):
        '''在初始化的时候,就会执行一次验证码,先获取'''
        self.make_code()

    # 获取验证码:
    def make_cache(self, interval=60):
        # 1.初始获取一个验证码
        self.cache = self.make_code()
        print(self.cache)
        # 2.定期进行修改,默认每隔60秒就会进行换一个验证码
        # 内部的一个循环调用,所以会出现一个阻塞的状态
        self.t = Timer(interval, self.make_cache)
        self.t.start() # 开启定时器
        # self.t.cancel() # 关闭定时器
    # 生成验证操作
    def make_code(self, n=4):
        res = ''
        for i in range(n):
            s1 = str(random.randint(0, 9))
            s2 = chr(random.randint(64, 90))  # 取随机的吗
            res += random.choice([s1, s2])
        return res

Code().make_cache(5)




循环调用函数:
每隔1秒打印123
from threading import Timer


def func():
    print(123)
    Timer(1, func).start()


func()

9.线程队列q

线程内部没有对列的方法,而进程中自带的有Queue
所以需要使用queue 进行对列的操作
import queue

******* 对列 *******
q = queue.Queue(3) # 对列最大3个 先进先出

#  对列加入3个值
q.put(11)
q.put(11)
q.put(11)

# 当对列满的时候
q.put(12,block=False) # 就会抛出异常
q.put(12,block=Ture) # 就会出现阻塞的状态
q.put(12,block=False,timeout=3) # 对列满了,阻塞timeout等待3秒后抛异常

# 对列取值
q.get()


# 当取值的时候
q.get(block=False) 如果对列中没有值,那么就会抛出异常
q.get(block=False,timeout=3) 对列中没有值,就会抛出异常 等待3秒后




**********  堆栈  *******
q = queue.LifoQueue(3) # 后进先出
#  对列加入3个值
q.put(11)
q.put(11)
q.put(11)

# 对列取值
q.get()



优先级的堆栈,可以设置优先级,级别高的先出来
q = queue.PriorityQueue

# 参数1 优先级 参数2 存的值
q.put((10,'666'))
q.put((10,'wkx'))

# 数字越少优先级越高
q.get()

10.多线程套接字socket

import socket
from  threading import Thread

def commit(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionError:
            break
    conn.close()


def servers():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.1', 8000))
    server.listen(5)

    while True:
        # 建立成功链接后,立马起一个线程,执行获取内容
        # 每一次来一个链接链接服务器,就会起一个线程(相当于一个服务员服务)
        # 只要来新的就会存在一个新的服务员进行服务
        conn, addr = server.accept()
        t = Thread(target=commit,args=(conn,))
        t.start()
    server.close()


if __name__ == '__main__':
    servers()

GIL全局锁的概念

当执行1.py文件,内部的流程

1.在python解释器的进程空间内将python解释器的cpython代码加载到内存空间中

2.再将1.py内部的代码加载到python解释器内存空间中

3.当执行时,会在当前python解释器进程 创建一个子进程执行1.py文件

4.而1.py 是怎么执行的,需要将1.py中代码 交给cpython解释器中让进行执行(解释性语言)


原理:
	GIL本身就是保证,在同一时间内,只有一个线程执行cpython解释器的代码
	
	就是互斥锁,将并行的线程,变为一个串行的线程,效率低了,但是数据安全了
	只有cpython解释器来说,存在GIL锁的概念
	在一个进程中多个线程执行时,在GIL锁 只有一个线程去执行没办法使用多核优势
	在什么条件下没有多核优势:
		在同一个进程下起多个线程才会出现没有多核优势的情况

        
# 想要使用多核优势: 
     那么就是用多进程,每个进程只有一个主线程的情况,那么就没有其他线程和他抢GIL锁了
	在cpython解释器中,想要使用多核优势,那么就是需要开多进程
    GIL锁时存在cpython解释器中,而python中的代码将传入到cpython解释器中
    所以每次执行,都会先从cpython解释器中获取GIL锁
    cpython解释器(python代码字符串):
        pass
    
    
补充:
	垃圾回收线程 是cpython解释器中定时的开启使用,定时的关闭的,而垃圾回收线程是调用cpython代码进行对没有用的 到而占用的内存进行回收
	
	
# 所以说cpython的 多线程就是假的多线程,想要使用多核优势就需要使用多进程


解释器的代码是所有线程共享的,所有垃圾回收线程可能访问到解释器代码而去执行
问题:
	对于同一个数据100,可能线程1执行 x=100,而垃圾回收机制需要执行回收x=100的操作
	而解决并没有那么高明的操作,就是加锁处理,那个线程获取到了GIL,那么就会执行代码

GIL与自定互斥锁的区别

GIL锁保护的是和cpython解释器有关的,保护的垃圾回收线程的安全
GIL锁不是保护当前自己程序的数据,而是保护cpython中的数据安全


自定互斥锁 帮助的自己编写的数据安全


锁:目的就是要保护数据的安全,同一个时间只能存在一个线程来修改共享的数据,所以保护不同的数据需要加不同的锁
所以GIL与自定义的互斥锁,保护的数据不同,GIL锁保护的是cpython解释器级别的数据(垃圾回收的数据),自定义的锁保护的是自己开发的数据,明显GIL不负责用户数据的安全

进程中存在两个线程

线程1  线程2

同时执行func函数 对 n=0 进行+1操作

线程1 执行 先抢到cpython解释器中的GIL锁,将自己的代码丢给cpython解释器,当在执行到自定义的锁时,就会加上自己的锁,当出现io操作阻塞时,cup不会等待阻塞状态,会记录当前执行的位置,并且强行释放GIL锁。

这个时候线程2就会抢到GIL锁执行到 锁的位置,发现线程1,没有释放锁,线程2就会阻塞,会被操作系统强行的将cpu拿走,释放GIL锁

线程1重新拿到GIL锁,执行完毕内部的数据,自定义释放锁

线程2重新拿到GIL锁,执行内部代码,获取自定义锁,执行代码,到达阻塞,就会被操作系统拿走cup 执行其他的线程任务.....


结论:
	1.对于计算 cup越多越好,但是对于io来说,再多的cpu也没有用
	2.对于运行一个程序来说,随着cup核数增多,那么执行效率就会大大提高
	
	
在一个cup核数的情况下
现在写的软件都是 io密集型的,使用多线程进行处理(io 和网络打交道)
如果是金融类的产品,那就是计算过多,那么就是计算密集型(需要多核优势多线程)

计算密集与io密集型

import os
import time
from multiprocessing import Process
from threading import Thread

# def work():
#     '''计算密集型使用多进程,可以使用多核优势,速度快
#     进程计算: 9秒
#     线程计算: 31秒
#     '''
#     res = 0
#     for i in range(100000000):
#         res *= i


def work():
    '''
    io密集型,也就是一直属于阻塞的状态,cup就会进行遇到阻塞io进行切换执行
    速度比较快,另外多线程,不需要让操作系统申请内存空间,创建的速度也比较块
    所以 io密集型还是比较适合使用多线程

    '''
    time.sleep(2)


if __name__ == '__main__':
    l = []
    state_time = time.time() # 开始时间

    for i in range(400):
        p = Process(target=work) # 10秒执行完毕
        # t = Thread(target=work) # 2 秒执行完毕
        l.append(p)
        #l.append(t)
        p.start()
        #t.start()

    for i in l:
        i.join()
    end_time = time.time() # 结束时间
    print(end_time - state_time)

'''
如果遇到了计算密集型
使用多核优势,开多进程进行计算,速度比较快


遇到io密集型 使用多线程
cup当遇到io操作不会等待,而是进行切换到其他任务上去,
真正的时间是花在任务的切换,可以将单核发挥到性能最大化

'''

死锁与递归锁与信号量

锁A
锁B

线程1拿着 A锁 没有释放
线程2拿着 B锁 没有释放

现在线程1执行到需要使用到B锁
线程2执行到需要A锁

这种情况就属于死锁的概念


from threading import Lock,RLock

# mutex = Lock()  # 互斥锁
# # 互斥锁只能 加锁1次,不能加锁多次
# mutex.acquire()
# print('加个锁')
# mutex.acquire() # 阻塞到这个位置,因为互斥锁不能进行acquire两次
# print('在加个锁')
# mutex.release()

# 递归锁,是可以加多个的
# 递归锁,可以连续的acquire,每一次acquire一次计数器 +1
# 只有计数器为0时,才会被其他的线程抢到
r = RLock()
print('加锁')
r.acquire()
print('在加个锁')
r.acquire()
r.release()

# 互斥锁:保证只能在同一时间内只有一个人去运行
# 递归锁:同时也是这么保证的,但是不同的是,每一把锁。都存在一个计时器,每一抢到锁,这个计时器就会+1,只有当前线程使用的锁,计数器为0,就会释放这把锁,那么其他的线程就会抢到


********** 信号量 **************

信号量也是一把锁,可以指定信号量为5,对比与互斥锁同一时间执行一个线程使用,信号量同一时间可以有5线程去拿这个把锁
比如:
	互斥锁:如多个人在一个出租屋人,但是厕所只有一个,需要等待里面的人上完,其他的人才能取
	信号量:相当于一群路人取公共厕所,公共厕所坑位多,这就一位这同一个时间可以有多个人上公共厕所,但是厕所的容纳是有限的,这就是信号量的大小
	
# 信号量,可以同时定义多个线程,同时拥有这把锁
from threading import Thread,Semaphore, currentThread
import time

sm = Semaphore(5)  # 信号量,定义量为5,那么只有5个线程可以同时抢到


def task():
    sm.acquire()  # 加锁

    print('我是谁%s' % currentThread().getName())
    time.sleep(1)
    sm.release()  # 解锁


for i in range(10):
    Thread(target=task).start()

进程池与线程池

可以将线程和进程控制到一定的数量中
******* 线程池与进程池的使用*********


from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
import os,time
from threading import currentThread
'''
ProcessPoolExecutor # 进程池
ThreadPoolExecutor # 线程池
'''

def func(name):

    print(name,os.getpid())
    time.sleep(1.5)



if __name__ == '__main__':
    # 开进程池
    # 属于一个异步提交,不管这个任务是否起来没有,提交完毕就走,不管是返回值
    p = ProcessPoolExecutor(4) # 指定进程的多少,4个进程 默认是cup的核数

    for i in range(10):
        # 参数1函数名 参数2 函数内用的参数
        p.submit(func,i)
    # shutdown 作用就是让主等待进程池执行完毕后,在进行执行下面的代码 和join的意思是一样的
    # 将进程池入口关掉,不让新的任务进行提交
    p.shutdown()
    print('主')



def func(name):
    print(currentThread().getName())
    print(name,os.getpid())
    time.sleep(1.5)



if __name__ == '__main__':
    # 开线程池
    # 属于一个异步提交,不管这个任务是否起来没有,提交完毕就走,不管是返回值
    p = ThreadPoolExecutor(4) # 开线程池 这个池子就4个

    for i in range(10):
        # 参数1函数名 参数2 函数内用的参数
        p.submit(func,i)
        # 传入多个参数
        p.submit(函数名,参数1,参数2)
    # shutdown 作用就是让主等待线程池执行完毕后,在进行执行下面的代码 和join的意思是一样的
    # 将线程池入口关掉,不让新的任务进行提交
    p.shutdown()
    print('主')

    

    
线程池与进程池获取 调用函数返回值
# result() 方法可以获取调用函数的返回值
from concurrent.futures import ThreadPoolExecutor


def func():
    print(1)
    return 666


if __name__ == '__main__':
    t = ThreadPoolExecutor(5)
    m = t.submit(func).result()
    print(m) # 获取返回值 666 

    
    
******** 线程池与进程池的异步调用和回调机制 ************


# 提交任务的两种方式
# 1. 同步调用:提交完毕任务后,就在原地等待任务执行完毕,拿到结果,才会执行下面的内容,导致程序时串行执行

# 2. 异步调用:提交任务后,不等待任务执行完毕


import random
from concurrent.futures import ThreadPoolExecutor


def func():
    print('开始执行')
    return random.randint(1, 10),666


def show(val):
    # val 不是一个结果,而是一个对象
    # 需要使用result() 对象的返回值获取
    print(val.result()) # <Future at 0x22ed9bf5940 state=finished returned int>



if __name__ == '__main__':
    t = ThreadPoolExecutor(5)
    # 在提交每个任务的时候执行完毕,触发的内容被成为回调函数
    '''
    执行流程:
        add_done_callback(show) :回调函数
        当t.submit(func) 执行完毕后 return 就会触发这个回调函数
        
        不会将return的结果返回给回调函数中,而是将t.submit(func)对象当为一个参数给回调函数中
        所以回调函数需要接受参数
        
        如果t.submit(func) 返回的值时多个那么类型就会变为一个tuple
        <Future at 0x202c6fe5970 state=finished returned tuple>
        
        如果t.submit(func) 返回一个值,那么就会根据这个值的类型不同进行显示
        <Future at 0x202c6fe5970 state=finished returned 根据类型不同>
    '''
    t.submit(func).add_done_callback(show)


    
    
'''
阻塞:
	就是遇到io情况,被操作系统收回cup的使用权限
非阻塞
	就是没有遇到io情况或者执行时间过长
'''

1.基于线程池socket

import socket
from concurrent.futures import ThreadPoolExecutor

def commit(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionError:
            break
    conn.close()


def servers():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.1', 8000))
    server.listen(5)

    while True:
        # 建立成功链接后,立马起一个线程,执行获取内容
        # 每一次来一个链接链接服务器,就会起一个线程(相当于一个服务员服务)
        # 只要来新的就会存在一个新的服务员进行服务,线程池内就5个线程,如果链接数量大于5,那么就会有客户端进行等待
        conn, addr = server.accept()
        t = t.submit(commit,conn)
    server.close()


if __name__ == '__main__':
	t = ThreadPoolExecutor(5) # 线程池一共5个线程
    servers()

2.进程池

#进程池设置
from concurrent.futures import ProcessPoolExecutor #导入进程池

def func():
    pass
     return 123

def doo(return):
    pass

if __name__ == "__main__":  #进程
p = ProcessPoolExecutor(4)  

# 创建一个进程池,最多4个进程

    for i in range(10):
        
# 设置回调效果
        特殊对象 = pool.submit(func,参数1....) #执行进程池
        特殊对象.add_done_callback(doo) 
        #调用的函数动作 是由主进程进行函数动作


# 设置回调效果
1.回调函数是由主进程完成的执行函数foo。与线程池不同(由子线程执行回调函数)
当执行func函数时,将返回时封装到特殊对象中。
特殊对象调用.add_done_callback(doo) 回调方法并执行doo函数,将参数传入doo函数中。

2.特殊对象 会接受 func函数中 返回值
在特殊对象调用.add_done_callback(函数名)方法时,会将接受的返回传入 回调函数中
回调函数会接受return的返回值
# 设置回调效果

3.线程池

1.线程池方法:

from concurrent.futures import ThreadPoolExecutor  # 导入线程池

 pool = ThreadPoolExecutor(100)  # 创建100个线程
 pool.submit(函数名,传入函数参数1,参数2.....)  
# 告诉线程,让线程执行函数,线程池会分配一个线程去指向函数 


2.线程池案例:

#从上到下的执行代码 是主线程执行的
from concurrent.futures import ThreadPoolExecutor  # 导入线程池

pool = ThreadPoolExecutor(10)  # 创建10个线程

def func(): #需要执行的函数
    pass

if i in range(100)
# 在线程池提交一个任务,当线程执行完毕后回到线程池,等待线程池的分配,如果没有线程,就会等待
    pool.submit(func) 


print('123')  #主线程执行到这里停止 等待线程池执行完毕

线程池的执行流程:
循环100执行函数func,线程池只有10个,当线程执行完毕后回到线程池等待线程池安排任务。


3.线程池的方法

1.等待线程池执行完毕 主线程才会执行
pool.shutdown(True) 
#主线程执行到这里停止 等待线程池执行完毕 和join方法一样

2.回调方法
    1.创建一个线程池对象
    特殊对象 = pool.submit(func,参数1)  # 让线程池执行函数
    2.利用线程池对象调用.add_done_callback(函数名)方法
    特殊对象.add_done_callback(函数名)  # 通过特殊对像调用方法
    # 当线程池执行完毕后,执行.add_done_callback(函数名) 回调方法内的函数


回调方法案例:

    #从上到下的执行代码 是主线程执行的
    from concurrent.futures import ThreadPoolExecutor  # 导入线程池

    pool = ThreadPoolExecutor(10)  # 创建10个线程

    def func(): #需要执行的函数
        pass

    def show():
        pass

    if i in range(100)
    # 在线程池提交一个任务,当线程执行完毕后回到线程池,等待线程池的分配,如果没有线程,就会等待
        p = pool.submit(func)  # 如果func函数又返回值,p对象就会携带返回值调用回调方法
        p.add_done_callback(show) # 将返回值传入到show函数中当成参数

    print('123')  #主线程执行到这里停止 等待线程池执行完毕

回调函数的执行流程:

线程池执行完毕后,如果函数有返回值,就会将返回值封装到特殊对象中。特殊对象调用方法add_done_callback(函数名),会将封装的数据传入回调的函数中当成参数
当线程池任务完成后,可以根据 线程池返回的特殊对象,调用add_done_callback(函数名)方法,在执行其他的 函数方法

协程

单线程实现并发:就是协程操作
单线程并发:切换与保存状态
两种形式:
	1.操作系统遇到io阻塞状态
	2.时间过长
单线程下的 io阻塞行为,如果遇到io阻塞,那么就会进行切换

协程的优点;
	1. 协程的切换开销小,属于程序级别的切换,操作系统完全感知不到,因而更加轻量
	2. 单线程内可以实现并发的效率,最大限度的利用cup(让操作系统以为你是一个io少的程序,拿到cup的使用权限)
	
	
缺点:
	1.协程的本质还是一个单线程,无法利用多核优势,可以是一个程序开启多个进程,每个进程多个线程,每个线程内开启协程
	2.协程本质就是单线程,一单协程进入阻塞,将会阻塞整个线程
	
特点:
	必须在只有一个线程实现并发
	修改共享数据不需要加锁
	用户程序自己保存多个控制流的上下文栈
	

1.greenlet模块

from greenlet import greenlet


def eat(name):
    print('1 %s' %name)
    g2.switch('wkx')
    print('2 %s' %name)
    g2.switch('wkx')
def play(name):
    print('1 %s' %name)
    g1.switch()
    print('2 %s' %name)



g1 = greenlet(eat)
g2 = greenlet(play)

g1.switch('wkx')

'''
1 wkx
1 wkx
2 wkx
2 wkx

可以函数之间的切换,但是无法进行监控io操作
'''

2.gevent模块

这个模块就是属于单线遇到io操作进行切换到其他的非io任务,也就是单线程并发效果

对greenlet 模块的封装可以监控到io操作


import gevent
from gevent import monkey
import time
monkey.patch_all()



def eat(name):
    print('1 %s' % name)
    time.sleep(3)
    print('2 %s' % name)


def play(name):
    print('1 %s' % name)
    time.sleep(5)
    print('2 %s' % name)


# 提交任务
# 属于异步提交,不会等待程序是否执行完毕
g1 = gevent.spawn(eat, 'wkx')
g2 = gevent.spawn(play, 'clll')

# 主等待任务执行
g1.join()
g2.join()

# 无法识别其他的io操作,只能识别自己模拟的io操作gevent.sleep(1)等待操作

# 模块下存在一个monkey.patch_all() 会将程序下面的全部io打上标记
# 这样就算不是gevent模块的io操作也会被检测到,实现切换操作



******* gevent模块的提交任务 *******
import gevent
from gevent import monkey;monkey.patch_all() # 打补丁 将当前程序下的全部io操作进行标记,让gevent模块能识别,如果不打monkey补丁的话,那么就只会识别gevent模块自己的io操作
import time



def eat(name):
    print('1 %s' % name)
    time.sleep(3)
    print('2 %s' % name)


def play(name):
    print('1 %s' % name)
    time.sleep(4)
    print('2 %s' % name)


# 提交任务
# 属于异步提交,不会等待程序是否执行完毕
g1 = gevent.spawn(eat, 'wkx')
g2 = gevent.spawn(play, 'clll')


# 线程等待,gevent异步提交的任务执行完毕后,才会死掉
g1.join()
g2.join()

# 传入参数 [任务1对象,任务2对象]
gevent.joinall([g1,g2]) # 等待全部的gevent的提交的任务执行完毕后,才会让线程死掉


计算密集型使用的意义不大,只有在遇到io操作,让操作系统将cup的使用权,给到下一个任务进行操作,在遇到io操作在进行切换,这样的话就属于并发执行。
例如:
	A 执行io 6 秒
	B 执行io 3 秒
	C 执行io 2 秒
	
	那么协程gevent 先执行 A程序遇到io(记录当前程序的执行位置),切换执行B程序(记录当前程序的执行位置),切换执行C程序遇到io(记录当前程序执行位置),来回切换ABC正常程序执行下拉,就是一共也就是 6 秒时间 ,因为遇到io等待时间时,操作系统将cup的使用权限给到其他程序去执行,也就是利用A程序io操作的时间内执行完毕了BC程序。大大节约了时间

gevent模块实现单线程并发soket服务

import socket

from gevent import monkey, spawn

monkey.patch_all()  # 打补丁,将全部的io操作打上标签让gevent模块可以使用


def commit(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionError:
            break
    conn.close()


def servers():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.1', 8000))
    server.listen(5)

    while True:

        conn, addr = server.accept()
        spawn(commit, conn)  # 进行提交gevent 协程 异步任务
    server.close()


if __name__ == '__main__':
    g = spawn(servers)  # 进行提交gevent 协程 异步任务
    g.join()  # 必须让线程等待 gevent内部的任务,因为gevent任务属于异步触发,提交任务,不等待任务完成,所需需要线程等待任务进行执行

IO模型

同步操作:会等待执行完毕的结果,在执行下一步
异步操作: 提交完毕后,不会等待结果的产生,而是执行下一步的操作
阻塞,非阻塞
同步不等于阻塞
遇到io就是阻塞


套接字:
	accept() send() recv() 属于阻塞行为,等待收发消息
例如:
	recv()接受 差不多 accept()等待
	1.等待缓存数据(客户端发给服务端的数据,会通过网卡,网络到服务端网卡,在到缓存中)
	2.操作系统拷贝服务端内的缓存中(客户端发送的数据)
	
	send() 也属于io 但是为什么体会不到他的等待时间,因为当进行send()时,数据就丢给了操作系统,剩下的操作都是由操作系统来执行(数据量少) 当数据量大时还是等体会到等待的时间的

1.阻塞IO模型

单线程子节套

只有到
sever.accept() # 出现阻塞 等来链接
一直处于wait data 读取是否有新的链接来
如果没有那么操作系统就会收回cup的使用权限给其他的非io状态的程序
那么当前的服务端程序时没有占cup使用,那么效率非常低
sever.accept() wait data  copy data的两个状态

当运行客户端时:
那么就会被度读取到,sever.accept()变为非阻塞状态

阻塞到conn.recv(1024)那么当前属于等待客户端发送数据的阶段,服务端就会询问操作系统是否有新数据来。conn.recv(1024) 属于 wait data copy data


阻塞io 没有并发效果:	
	因为当遇到阻塞,操作系统就会将cup拿走
实现并发:
	开多线程,让主线程(当前进程中的工作线程,执行accept()等待链接操作),没来一个链接 起一个线程进行处理新的链接,缺点:随着客户端链接增多,那么线程就会越来越多,会出现机器抗住不,
	开线程池 固定当前线程的多少,问题规模大,线程池也会降低效率
	
# 最开始学习soket模块就是一个单线程阻塞io模型,当前线程等待客户端链接,处于io状态,recv 等待客户端数据,io状态

2.非阻塞IO模型

在单线程条件下,监控io操作进行动态切换,提升效率

wait data 与copy data(使用的时间短,从操作系统的缓存区拷贝到用户的内存中) 的操作

有应用程序发送系统调用,收一条消息 发给操作系统内核
	如果没有数据,马上获取一个结果,error,知道数据没有准备好
	在下次发送系统调用的时间段内,进行做其他的# 事情 # 
	直到问操作系统,有数据,就会操作系统的缓冲区拷贝到用户的内存中

非阻塞io,也就是在wait data中 在等待数据来的时候,这段时间内,去干其他的事情,等到数据真正来了,再去执行copy data 等一系列操作

******* 非阻塞io *********
from socket import *

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1', 8000))
server.listen(5)
# 默认为Teur 是一个阻塞的io 模型 设置为Fasle那么就是一个非阻塞
# 下面的代码全部就变为了非阻塞的状态
server.setblocking(False)
print('state')
rlist = []
wlist = []
while True:
    try:
        conn, attr = server.accept()  # 阻塞
        rlist.append(conn)  # 将链接对象添加到列表中
        print(rlist)
    except BlockingIOError:

        # 只负责收消息
        del_rlist = []
        for conn in rlist:  # 循环链接对象
            try:
                # 这里也属于io阻塞,使用非io阻塞模型,会出现错误,使用try步骤
                data = conn.recv(1024)  # 阻塞
                if not data:  # 如果收到的消息为空
                    del_rlist.append(conn)  # 添加到删除列表中
                    continue  # 跳过开始下一次循环

                # 将发送的消息添加到列表中
                wlist.append((conn,data.upper()))
            except BlockingIOError:  # 出现错误说明链接没有新内容,跳过开始下个循环,看看下个是否有新消息
                continue
            except Exception:
                # 出现链接断开的情况
                conn.close()
                del_rlist.append(conn)

        # 发消息
        del_wlist = []
        for item in wlist:
            try:
                conn = item[0]
                data = item[1]
                conn.send(data)
                del_wlis.append(item)
            except BlockingIOError:
                pass

        # 删除已发送消息列表
        for item in del_wlist:
            wlist.remove(item)

        # 将断开链接的客户端,从循环添加的rlist 中进行删除
        for conn in del_rlist:
            rlist.remove(conn)

server.close()
# 这样的话可以实现并发,出现io切换到执行其他的活中


# 不建议使用,会造成cup被当前程序占用,但是没有功的占用,造成cup,使用率高,其他程序没办法使用的状态
缺点:
	1.如果在io阻塞,线程去干其他的活时,数据来了,没办法及时响应
    2.当前程序属于一个及时状态随时准备的状态,那么操作系统 就会将cup尽量了给当前程序使用,但是当前处于死循环的状态,处于大量的询问工作,无用工作,cup的使用率高的,导致cup的吞吐量降低

3.多路复用io模型

可以同时检测多可套接字 效率高
如果检测1个,那么不如阻塞io的效率


当用户进程调用select ,那么整个进程就会被block(阻塞),相当于select 就如同中介一般进行在中间对操作系统进行询问,等待操作系统回消息。同时 kernel(计算机核心)会监控所有的select负责的soket(多个)

当任何一个soket 数据准备好了,那么select就会返回,这个时候进程就会调用read操作,将数据从 kernel(计算机核心)拷贝到用户进程的内存中

与阻塞io相比
就是多了一个步,中介 select 检测套接字io行为


******* 通过select去问操作sockt是否有数据或者链接  *******
import select
from socket import *

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1', 8001))
server.listen(5)
# 默认为Teur 是一个阻塞的io 模型 设置为Fasle那么就是一个非阻塞
# 下面的代码全部就变为了非阻塞的状态
server.setblocking(False)

rlist = [server]  # 读消息的套接字,就是当前的服务器链接
wlist = []  # 写消息的套接字
wdata = {}
while True:
    # 这个方法就是帮我们问操作系统,代理管理的 socket 数据是否准备好了
    # 参数1 收的套接字列表  参数2 发套接字列表 参数3 出异常的列表 参数4 询问操作系统的间隔时间
    rl, wl, xl = select.select(rlist, wlist, [], 0.5)  # 询问成功后会出现一个返回值 收套接字列表,发套接字列, 出异常的套接字列表
    print(rl, wl, xl)

    for sock in rl:  # 循环收套接字列表
        if sock == server:  # 如果与server 列表原内容收套接字相同,那么就是一个建立新链接的操作
            conn, attr = sock.accept()
            rlist.append(conn)  # 将新建的链接 conn 添加到 收套接字列表中,进行监控
        else:
            try:
                data = sock.recv(1024)
                wlist.append(sock)  # 添加到发消息的套接字列表中 进行监控
                wdata[sock] = data.upper()  # 将发消息的对象 k v 结构存放到字典中
            except Exception:
                sock.close()
                rlist.remove(sock) # 从

    # 传
    for sock in wl:
        data = wdata[sock]  # 从消息字典中根据对象获取 val
        sock.send(data)  # 将消息发出去
        wl.remove(sock)  # 将对象从监控 传套接字中列表中剔除
        wdata.pop(sock) # 在将发的数据从发消息字典中剔除
server.close()

# 这样的话可以实现并发,出现io切换到执行其他的活中


缺点:
	select 会随着,监听的sokcet的增多,而造成效率越来越低
    因为操作系统对select 监听的名单列表,也是进行循环查看的(遍历方式)
    

4.异步io

效率最高:当进行对操作系统进行询问的时候,会直接返回。下面的全部wait data 与copy data 都是操作系统进行做的事情,等有了数据,那么操作系统就主动将数据传递


用户进程发起 read操作 立即返回执行其他的事情,而另一方面  kernel的角度 当他受到一个异步io之后 会立即返回,所以不会对用户的进程产生任何的阻塞操作,然后 kernel会等待数据准备完成后,将数据拷贝到当前用户的进程内存中,当这一切完成后,kernel会给用户就进程发送信号signal,告诉他read操作完了

Python协程

协程概念

1.异步非阻塞,asyncio
2.异步框架: 提升性能
	tomado fastapi django3.x asgi aiohttp 

协程是什么

协程是不是计算机提供出来的,程序员自己创建的。

协程(coroutine) 被称为微线程,是一种用户动态上下文切换的技术,简而言之使用一个线程在代码中进行切换的过程

1.采用原来的同步指向(代码从上到下执行) 一共使用了3秒时间

def func1():
    print(1)
    time.sleep(1)

def func2():
    print(2)
    time.sleep(2)

print('开始秒数' + time.strftime('%S'))
func1()
func2()
print('开始秒数' + time.strftime('%S'))



2.实现线程的方式

1.greenlet 早期模块
2.yield 关键字控制
3.asyncio装饰器 py3.4
4.async await 关键字 py3.5 [推荐]

greenlet实现协程方式

1.pip install greenlet

2.案例
from greenlet import greenlet


def func1():
    print('func1-第1次打印')
    gr2.switch()  # 2.切换到func2 进行执行[会进行记录当前这个函数执行的位置,如果切换回来就从当前开始执行]
    print('func1-第2次打印')
    gr2.switch()  # 4.切换到func2 进行执行


def func2():
    print('func2-第1次打印')
    gr1.switch()  # 3.切换到func1函数执行
    print('func2-第2次打印')


# 注册到greenlet 对象
gr1 = greenlet(func1)
gr2 = greenlet(func2)
# print(gr2) # greenlet.greenlet object
gr1.switch()  # 1.启动gr1对象的进行执行

yield关键字实现

def func1():
    yield 1
    yield from func2() # 跳到 执行func2生成器
    yield 2


def func2():
    yield 3
    yield 4

f = func1()
for i in f: # 循环执行生成器
    print(i)

asynico实现

实际执行时间2秒

import asyncio
import time

@asyncio.coroutine
def func1():
    print(1)
    yield from asyncio.sleep(1)  # 模拟io请求
    print(2)


@asyncio.coroutine
def func2():
    print(3)
    yield from asyncio.sleep(2)
    print(4)

print('开始秒数' + time.strftime('%S'))
tasks = [asyncio.ensure_future(func1()), asyncio.ensure_future(func2())]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('结束秒数' + time.strftime('%S'))

async与await关键字实现

实际执行时间2秒
@asyncio.coroutine 替换为  async def 
yield from 替换为 await


import asyncio
import time

# 通过关键字将普通函数包装为协程函数(coroutine func)
async def func1():
    print(1)
    # await:1.将asyncio.sleep(1)协程对象 包装为task任务 告知event loop 2.将控制权限还给event loop
    await asyncio.sleep(1) # 模拟i/o操作

# 协程函数
async def func2():
    print(2)
    await asyncio.sleep(2)


async def main():
    print('开始秒数' + time.strftime('%S'))
    coroutine_list = [func1(), func2()] # coroutine func object list
    await asyncio.gather(*coroutine_list)
    print('结束秒数' + time.strftime('%S'))

if __name__ == '__main__':
    asyncio.run(main()) # main 获取event loop权限执行mian task任务执行 (协程函数对象)
    
当前的优势:
    遇到io操作,就会将线程进行切换执行其他的task进行执行,大大的节省了时间的损耗

协程的意义

在一个线程中如果遇到io等待时间,线程不会傻傻的等待,利用空闲的时间赶其他的事情,大大的提高了效率


1.实例代码:同步方式[排队执行] 使用3秒下载完成
import requests

def download_img(url,img_name):
    res = requests.get(url)
    with open(f'{img_name}.png', mode='wb', ) as file_obj:
        file_obj.write(res.content)
    print('本地下载完成' + f'{img_name}.png')


url_list = [
      'url2',
      'url1',
      'url3'
]
print('开始下载时间' + time.strftime('%X'))
for index,url in enumerate(url_list):
    print('url:' + url)
    download_img(url,index)
print('结束下载' + time.strftime('%X'))

# 如果图片下载时间为1分钟,需要花费3分钟[第一张下载完毕后,才会执行下一次下载]




2.实例代码:协程方式[不等带结果执行下一个] 使用1秒将3张图片下载完成
pip install aiohttp # 需要使用这个模块

import time
import asyncio
import aiohttp

async def download_img(session, url, img_name):
    async with session.get(url, verify_ssl=False) as response:  # 发送io请求
        content = await response.content.read()
        with open(f'{img_name}.png', mode='wb', ) as file_obj:
            file_obj.write(content)
            print('本地下载完成' + f'{img_name}.png')


async def main():
    print('开始秒数' + time.strftime('%S'))
    async with aiohttp.ClientSession() as session:
        url_list = [
            'url2',
            'url1',
            'url3'
        ]
        # 协程对象列表
        task_list = [download_img(session, url, index) for index, url in enumerate(url_list)]
        await asyncio.gather(*task_list)  # 将协程对象列表批量添加到event loop 中作为task任务
    print('结束秒数' + time.strftime('%S'))


if __name__ == '__main__':
    # asyncio.run(main()) # 使用这种方式会出现RuntimeError: Event loop is closed 异常
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
   
# 使用协程asyncio模块需要配合着能使用协程方式的模块进行操作,不然不会有所作用

异步编程

事件循环'event loop'

事件循环: 称为'event loop'
作用: 可以理解为死循环,去检测并执行某写代码[检测任务列表中得任务]

理解代码:
    
任务列表 = [任务1,任务2,任务3]

while True:
	可执行任务列表,已完成任务的任务列表 = 去任务列表中检测全部任务,将可执行和可完成的任务返回('相当于当前的任务列表中得任务存在多个状态,将每个状态进行分类进行循环,直到任务列表中任务完全没有为止')
    
    for 就绪任务 in 已经准备就绪的任务列表
    	执行就绪的任务
    
    for 已完成的任务 in 已完成的任务列表
    	从'任务列表'中进行移除已完成的任务
  
如果任务列表中得任务全部完成,那么这个死循环就会终止

'''
简单的理解事件循环的操作(大致的流程):

任务列表 = ['正在io请求的任务','已经完成的任务','可以执行的任务']
那么事件循环 event loop 就会先将 '正在io请求的任务无视'
将已完成的任务从任务列表中剔除
执行可以执行的任务

如果可以执行的任务出现【io请求的操作】,那么就会【放任这个任务进行io操作】,并【标记】当前任务时【请求io的任务】,执行下一个任务。

如果【当前任务】已经完成,那么就会添加到【已完成任务列表】中,进行循环从【任务列表】中进行剔除
'''


代码:
import asyncio

# 当前 生成或者获取一个事件循环
loop = asyncio.get_enent_loop()
# 将任务放到任务列表中,让可以理解为让当前的任务可以被事件循环监听
loop.run_until_complete('存放任务')

协程函数与协程对象

1.协程函数 coroutine func
async def 函数名: #  async关键字为前缀的函数被称为'协程函数'
    pass

2.协程对象 coroutine object
协程函数() # 协程函数+()得到的就是协程对象,当前函数并不会执行


3.协程函数与普通定义函数区别
最大的区别就是,普通函数()当前函数就会进行执行,协程函数()只能获取协程对象[内部代码不会执行]

协程函数的使用(async关键字)

async def func()
	print('我是协程函数')

asy = func() # 不会执行
print(asy) # 获取当前协程函数的对象[协程对象]


怎么才能让协程函数执行?
需要借助 asyncio 模块中得事件循环 event loop

import asyncio

async def func():
    print('123456')

# 执行方式
    1.python3.7之前的执行操作    
    # 事件循环
    loop = asyncio.get_event_loop()
    # 使用事件循环执行协程函数
    loop.run_until_complete(func())

    
    2.python3.7后执行的方式
    asyncio.run(func())


# 关于asyncio.run
内部帮助我们创建获取事件循环,并让事件循环执行传入的函数,当前run()作为一个异步编程的入口进行操作
run 的内部还是完成了原来版本的操作 执行创建或者获取event loop 将任务执行


# 注意:
	异步函数一定要不事件循环进行一起使用,没有事件循环异步函数是没办法执行的

协程函数的使用(await关键字)

# await 等待对象[需要等待对象执行完毕后才会执行下面的代码]
# await +  代表的可等待的对象(协程对象,future,task对象[这三个对象可以理解为io等待对象])

await最重要的最用: 会将协程对象包装为一个task任务,告知event loop。


import asyncio


案例1:

async def func():
    print('123456') 1.先执行当前打印
    # 模拟io等待
    res = await asyncio.sleep(1)  2.await会将当前协程对象对象包装为task任务,添加到event loop中,并告知当前task任务需要等待(会记录当前执行的位置),将权限还给event loop,去执行其他不需要等待(等待完成的任务)的task任务
    print(res) 3.获取io等待返回值,当前等待结束变为可执行任务(已经有了执行的结果),event loop在下次循环中 会将权限给func()函数,从当前记录位置开始执行(并接受返回值)

asyncio.run(func())




案例2:

async def show():
    print('正在下载中...')  4.打印
    await asyncio.sleep(2) 5.等待asyncio.sleep(2)协程方法
    print('下载完成...') 5. 打印
    return '文件.txt' 6. 返回值

async def func():
    print('我要开始下载了')  2.打印
    # 执行的协程函数的执行结果的返回值
    res = await show() 3.包装携程对象 并进去show()进行执行 7.接受返回值
    print(res)  8.打印返回值

asyncio.run(func()) # 1.执行func() 协程对象



案例3

import asyncio
async def show():
    print('正在下载中...')
    await asyncio.sleep(2)
    print('下载完成...')
    return '文件.txt'

async def func():
    print('我要开始下载了')
    res1 = await show()
    print(res1)
    res2 = await show() # 等待第一个await等待对象的值得到结果后才会执行
    print(res2)

asyncio.run(func())

只有第一个await协程对象(将这对象包装为一个event loop中得一个任务)执行完毕后,并且获取执行结果后,才会执行下一个协程函数
原因: 因为第二个协程对象并未被await包装为任务,告知event loop。代码是从上而下的执行,并没有执行到第二个await

Task对象

task作用:
    1.帮助我们在事件循环(event loop)中添加多个任务
    例如:
        任务列表 = [task1,task2,task3] # 添加多个任务到event loop 中
        当某个任务出现io操作就会就会进行任务切换
    
    2.tasks 用于并发的调度协程,通过asyncio.create_task(协程对象)[3.7上使用]的方式创建task对象,这样可以将协程对象添加到event loop(事件循环)。其他的方式(更低级) loop.create_task() 或者asyncio.evsur_future()[3.7下使用]不建议手动实例化task对象
    

# 重点:

import asyncio
import time


1.实例1通过create_task将协程对象包装为task任务[2个任务执行2秒]

# 使用的较少create_task创建任务 需要大量的await进行通知event loop调度

async def show(num):
    print('show函数正在执行...%s'%num)
    await asyncio.sleep(2) # 模拟io阻塞
    print('show函数执行完毕...%s'%num)
    return 'ok'

async def func():
    print('执行func函数')
    # 创建task对象,将当前执行show函数添加到事件循环中(event loop)
    task1 = asyncio.create_task(show(1)) 
    task2 = asyncio.create_task(show(2)) 
    #  当执行到某个遇到io操作会自动化切换执行其他任务
    #  此处的awati是等待响应的协程全都执行完毕后获取的结果

    task1_return = await task1 
    task2_return = await task2
    print(task1_return,task2_return)
   
print(time.strftime('%S'))
asyncio.run(func())
print(time.strftime('%S'))

执行过程说明:
1.现在event loop中存在3个任务: [func(),task1,task2]

2.执行过程模拟(未必详细,大概过程)
	2.1 执行脚本时,先执行asyncio.run()执行func()对象,并将任务添加到event loop中
    
    2.2 执行到asyncio.create_task() 添加任务到event loop中[将task1-task2添加],任务列表中已经存在了3个任务
    
    2.3 await task1 就会执行task1任务内部代码,遇到 asyncio.sleep(2) task1任务阻塞,'开始任务切换' 
    
    2.4 切换到task2任务(为什么不会切换到func任务中,因为当前的任务是在func任务内,task1在阻塞,那func也在阻塞),遇到 asyncio.sleep(2) task2任务阻塞,'开始任务切换' 
    
    2.5.切换到task1任务执行完成 获取返回值
    
    2.6.切换到task2任务执行完成 获取返回值
    
    2.7.func()任务执行完成,整段程序执行完毕
    
    

2.实例代码2

# 使用的较多,节省await编写调度

asyncio.gather(*任务列表)批量调度 # 需要通过*解构列表
asyncio.wait(任务列表)批量调度 # 直接添加列表



async def show(num):
    print('show函数正在执行...%s'%num)
    await asyncio.sleep(2)
    print('show函数执行完毕...%s'%num)
    return 'ok'

async def func():
    print('执行func函数')
    # 执行的协程函数的执行结果的返回值
    task_list = []
    
    # 循环将任务呢添加到任务列表中task_list
    for i in range(5):
        task = asyncio.create_task(show(i))
        task_list.append(task)
    
    # 方式1:通过gather方法批量通知event loop进行调度 asyncio.gather
     res = await asyncio.gather(*task_list) 
    print(res) # 接受任务执行返回的结果 [task1,task2] 存放结果为添加任务的顺序
    
    # 方式2:直接将列表添加 asyncio.wait
    done,pending = await asyncio.wait(task_list,timeout=1) # timeout=None
    print(done) # 返回的结果的集合 
    print(pending) # 返回timeout超时未完成任务集合 与参数timeout(设置后,timeout=2,任务最大执行时间为2秒,超出存入当前集合中)
    
   
print(time.strftime('%S'))
asyncio.run(func())
print(time.strftime('%S'))


注意:
# 不是用函数调用方式添加任务需要一下写法    
async def show(num):
    print('show函数正在执行...%s'%num)
    await asyncio.sleep(2)
    print('show函数执行完毕...%s'%num)
    return 'ok'

task_list = [show(1),show(2)]
'''
不能
task_list = [
asyncio.create_task(show(1)),
asyncio.create_task(show(2))
]
因为当前操作不直接将协程对象添加到事件循环中 event loop 但是当前还没有创建事件循环就会报错

async def func():
	pass
asyncio.run(func()) 会先创建事件循环将func()对象包装为任务进行添加并且执行
'''
asyncio.run(asyncio.wait(task_list)) # 当前内部代码如果发现并事件循环就会进行创建

asyncio.Future对象

future是底层对象,是task类的基类

Task继承Future,task对象内部await结果的处理基于future对象的

import asyncio
实例1:

# 在这种情况下创建future对象毫无意义因为没有用返回结果就处于等待
async def func():
    # asyncio.get_event_loop()  获取当前执行 run 创建的事件循环对象
    loop = asyncio.get_event_loop()
    print(loop)

    # 创建一个任务(future对象),这个任务什么都不干
    fut = loop.create_future()
    # 当前就会在等待任务(future对象) 等待结果出现,如果没有就一直等待
    await fut

asyncio.run(func())


案例2
# 如果有返回值那么future对象就会执行完毕,如果没有就会阻塞等待返回值
async def func1(fut):
    # 4.执行任务
    await asyncio.sleep(2)
    # 5.给future对象进行设置值
    fut.set_result('666')


async def func2():
    # 1.获取当前事件循环
    loop = asyncio.get_event_loop()
    # 2.创建一个future对象
    fut = loop.create_future()
    # 3.添加一个任务到携程对象中同时将future对象传入做参数
    await loop.create_task(func1(fut))

    # 6.等待fut对象如果向下执行,没有值阻塞等待值
    data = await fut
    print(data)

asyncio.run(func2())

####
await asyncio.create_task(函数对象) 创建任务,任务完成后自动执行set_result进行设置返回值 等待的状态不会等待

coucurrent.futures.future对象

使用线程池与进程池实现异步函数时使用的对象与asynio没有关系

# concurrent 项目中得第三方模块不支持异步,只支持线程异步或者进程异步使用当前模块

import time
import asyncio
from concurrent.futures import Future
from concurrent.futures.thread import ThreadPoolExecutor  # 线程池
from concurrent.futures.process import ProcessPoolExecutor  # 进程池

1.基本使用了解
def func(value):
    time.sleep(1)
    print(value)
# 创建线程池
pool = ThreadPoolExecutor(max_workers=5)
# 创建进程池
# pool = ProcessPoolExecutor(max_workers=5)
for i in range(10):
    # 返回值会赋值给fut
    fut = pool.submit(func,i)
    print(fut)
    

2. 使用场景: # 重点
	项目中进行异步时80%为异步编程,但是20%使用项目使用的模块不支持异步编程可以使用当前方式
 
def func1():
    print('我是一个普通函数')
    return '666'


async def main():
    # 获取事件循环
    loop = asyncio.get_event_loop()
    # 创建一个Future对象并且将函数包装为可以被async执行的协程对象
    # 默认使用的是线程模式 run_in_executor(线程池对象或者进程池对象[如果参数为none那么默认使用线程池],普通函数(第三方模块不支持异步方式))
    
    # 1.内部调用ThreadPoolExecutor的 submit 方法申请一个线程 去执行func1函数并返回一个concurrent.futures.Future对象
    
    # 2. asyncio.wrap_future 将 concurrent.futures.Future对象包装为 asyncio.Future的对象
    
    # 因为concurrent.futures.Future对象不支持 await方法所以需要包装
    fut = loop.run_in_executor(None, func1)
    res = await fut
    print(res)

    # 使用线程池: 可以传入max_workers 固定线程数量
    with ThreadPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, func1)
        print(res)

    # 使用进程池 可以传入max_workers 固定进程数量
    with ProcessPoolExecutor() as pool:
        res = await loop.run_in_executor(pool, func1)
        print(res)


if __name__ == '__main__':
    asyncio.run(main())

案例:asyncio+不支持异步模块

import asyncio
import requests
import random


# asyncio + 不支持异步的模块下载


async def download_img(url):
    print('开始下载图片', url)
    loop = asyncio.get_event_loop()
    # requests模块默认不支持一步操作,所以使用线程池方式配合
    # run_in_executor方式非异步转为asyncio.future进行异步执行
    # 内部默认是线程方式执行,3个人任务就是3个线程,那么资源上比普通的asyncio要大
    future = loop.run_in_executor(None, requests.get, url)

    ret = await future

    file_img_name = str(random.randint(1, 10)) + '.jpg'
    with open(file_img_name, mode='wb') as f:
        f.write(ret.content)


if __name__ == '__main__':
    url_list = [
        'https://ts1.cn.mm.bing.net/th/id/R-C.df4462fabf18edd07195679a5f8a37e5?rik=FnNvr9jWWjHCVQ&riu=http%3a%2f%2fseopic.699pic.com%2fphoto%2f50059%2f8720.jpg_wh1200.jpg&ehk=ofb4q76uCls2S07aIlc8%2bab3H5zwrmj%2bhqiZ%2fyw3Ghw%3d&risl=&pid=ImgRaw&r=0',
        'https://pic3.zhimg.com/v2-58d652598269710fa67ec8d1c88d8f03_r.jpg?source=1940ef5c',
        'https://tse1-mm.cn.bing.net/th/id/OIP-C.xq6cOv82ubIhJY9qkFd5AgHaEK?pid=ImgDet&rs=1',
    ]

    loop = asyncio.get_event_loop()

    task_list = [download_img(i) for i in url_list]
    loop.run_until_complete(asyncio.wait(task_list))

异步迭代器(不重要)

异步迭代器:
实现了 __aiter__() 和 __anext__() 方法的对象。__anext__必须返回一个awaitable对象 async for会处理异步迭代器 __anext__() 方法返回可执行等待对象,直到引发 stopasynciteration异常

异步可迭代对象
可在 async for 语句中使用的对象,必须通过__aiter__()方法返回一个asynchronous.iterator


import asyncio


# 无太大的应用场景 
# 除非使用循环取每一个值,但是又不想使用 内部循环时可以使用
class Reader:
    # 自定义异步迭代器(同时可以异步迭代对象)
    def __init__(self):
        self.count = 0

    async def readline(self):
        self.count += 1
        if self.count == 100:
            return None
        return self.count

    def __aiter__(self):
        return self

    async def __anext__(self):
        val = await self.readline()
        if val is None:
            raise StopAsyncIteration
        return val


# 正常的迭代器
# r = Reader()
# for i in r: print(i)

# 异步的迭代器
# async fro 必须写在异步函数内
async def func():
    r = Reader()
    async for i in r:
        print(i)


if __name__ == '__main__':
    asyncio.run(func())

异步上下文管理器(重要)

上下文管理器:with 上下文 自动帮打开文件写入信息并且在内容写入完毕后自动关闭文件

异步上下文管理器是一样的概念
使用对象定义 __aenter__() 和 __aexit__() 方法 对async with语句中的环境进行控制

# 在类中定义了:
	__aenter__() 和 __aexit__() 支持async with上下文方式 [在正常定义的类中如果使用这个方法也可以使用这种方式]
    
    
import asyncio


# 作用例如: 链接数据库 关闭数据
# 正常的非异步 只要类中定义了这类方法都可以进行上下文
class AsyncioM:

    def __init__(self):
        self.count = 0

    async def func(self):
        return 666

    async def __aenter__(self):
        # 异步链接数据库操作
        self.conn = asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # 异步关闭数据库链接
        await asyncio.sleep(1)


# async with 方法需要在异步函数内进行使用
# 当使用是会执行当前类中得__aenter__ 这个方法返回什么那么f就是什么[可以进行设置数据库链接]
# 当上下文完成后 就会自动使用__aexit__方法[关闭数据库链接]
async def func1():
    async with AsyncioM() as f:
        res = await f.func()
        print(res)


asyncio.run(func1())

uvloop事件循环

事件循环的替换方案
效率: 默认event loop < uvloop(接近go语言效率)


1.安装
pip3 install uvloop # 当前模块暂时不支持win系统(无法安装)

2.需要将默认事件循环替换为 uvloop

import asyncio
import uvloop
# 将以前的event loop替换为uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# 代码与原来的一样

asyncio.run(..)


框架内部使用了asgi -> uviorn(内部使用了unloop)

案例

异步Redis操作

假设:
	后端服务 A服务器
	redis数据库 B服务器
那么后端代码与redis进行交互式的时候,就会产生网络请求(网络io)链接/操作/断开都是网络io请求。
# 语法都是相近的只不过 同步变异步
# 文档;https://aioredis.readthedocs.io/en/latest/migration/



1.安装
pip install aioredis

2.使用
import asyncio
import aioredis

'''
aioredis模块基础了原来的redis的类使用方式是相同
'''


async def execute(address, password=None):
    print('开始执行链接操作', address)

    # 1.创建链接
    # decode_responses 获取参数时解码
    # encoding 用于响应解码的编解码器。
    # 链接方式1: 默认的生成方式需要 传入指定的ip端口链接[io等待]
    redis = await aioredis.Redis(host='127.0.0.1', port=6379, db=0, decode_responses=True, encoding='utf-8')
    # 链接方式2: 通过url进行生成redis 链接对象 cdn链接[io等待]
    redis1 = await aioredis.from_url(address, encoding="utf-8", decode_responses=True)

    # 2.链接对象
    # print(redis)
    # print(redis1)

    # 3.设置值[io等待]
    await redis.set('66789', 'wkx')
    await redis1.set('8899', 'wkx')

    # 4.获取值[io等待]
    key = await redis.get('66789')
    print(key)
    key1 = await redis1.get('8899')
    print(key1)

    # 5.关闭链接[io等待]
    await redis.close()
    await redis1.close()

	await redis.wait_closed()
    await redis1.wait_closed()


asyncio.run(execute('redis://127.0.0.1:6379/1'))

异步Mysql操作

io操作:链接/设置数据/获取数据/关闭mysql
# 语法都是相近的只不过 同步变异步
# 文档地址:https://aiomysql.readthedocs.io/en/latest/
1.安装
pip install aiomysql


2.普通的mysql链接方式
import pymysql
pymysql.install_as_MySQLdb()

# 1.创建服务链接
serve = pymysql.connect(host='127.0.0.1',port=3306,user='root',password='123456',db='db21')

# 2.创建控制mysql权柄光标
conn = serve.cursor() 

# 3.执行sql
sql = conn.execute('show tables')

# 4.获取数据
data = conn.fetchall()
print(data)

# 5.关闭链接
conn.close()
serve.close()


3.使用aiomysql链接与普通的方式相同
内部处理语法相同的
import asyncio
import aiomysql


async def execute():
    # 创建mysql服务链接
    conn = await aiomysql.connect(host='127.0.0.1', port=3306, user='root', password='123456', db='db21')

    # 创建控制mysql权柄光标
    cur = await conn.cursor()

    # 执行命令
    await cur.execute('show tables')

    # 获取数据
    res = await cur.fetchall()
    print(res)

    # 关闭链接
    await cur.close() # 关闭光标
    conn.close() # 关闭链接


asyncio.run(execute())

异步mongodb操作

# mongodb模块使用的异步模块是motor模块
# 语法都是相近的只不过 同步变异步
# 文档地址:https://motor.readthedocs.io/en/stable/

普通的链接方式:
import pymongo
# 设置了密码域账户
url = 'mongodb://root:123456@127.0.0.1:27017/admin'
mon = pymongo.MongoClient(url)
print(mon.list_database_names())


1.安装
pip install motor

2.使用
import asyncio
from motor import motor_asyncio

url = 'mongodb://root:123456@127.0.0.1:27017/admin'


async def func():
    # 创建链接对象
    conn = motor_asyncio.AsyncIOMotorClient(url)
    # 获取数据库的全部名称
    db = await conn.list_database_names()
    print(db)
    # 断开链接
    conn.close()


asyncio.run(func())

FastAPI框架(为例)

# 文档:https://fastapi.tiangolo.com/zh/

性能比较高的框架(使用的事件循环:uvloop)
fastapi: 是一个api的接口异步框架
uvicorn:基于 uvloop 和 httptools 构建的非常快速的 ASGI '服务器'

1.安装fastapi框架
pip install fastapi
pip install uvicorn(asgi是一个支持异步的 uwsgi web服务器,内部基于uvloop)用来启动异步框架的服务器


2.基本使用[无异步功能,排队使用]
import asyncio
import uvicorn
from fastapi import FastAPI

app = FastAPI()


@app.get('/')
def index():
    '''普通的接口程序'''
    print(1234)
    return {'code': 200, 'msg': '你好,欢迎使用', 'err': ''}


if __name__ == '__main__':
    # uvicorn.run(内部的参数)
    # app: 运行出口文件的函数  所在文件.py:fastaspi实例对象 例如:测试13:app[测试13.py文件下的app=FastAPI()]
    # host: 服务启动的url 默认127.0.0.1
    # port: 端口 默认为 8080
    # reload : 热更新,如果内容修改重启服务器相当于falsk debug=True
    # debug: 同reload
    # reload_dirs:设置需要 reload 的目录,List[str] 类型
    # log_level: 设置日志等级 默认为等级为info
    uvicorn.run('测试13:app')


# 第二种启动方式: 在命令行启动
# uvicorn main:app 命令含义如下:
# main:main.py 文件(一个 Python "模块")。
# app:在 main.py 文件中通过 app = FastAPI() 创建的对象。
# --reload:让服务器在更新代码后重新启动。仅在开发时使用该选项。




3.异步使用

import asyncio
import aioredis
import uvicorn

from fastapi import FastAPI

app = FastAPI()
# 创建连接池redis对象,添加了一个max_connections限制链接
# 方式1
conn = aioredis.from_url('redis://127.0.0.1:6379',max_connections=10)

# 方式2
conn1 = aioredis.ConnectionPool.from_url('redis://127.0.0.1:6379',max_connections=10)
redis = aioredis.Redis(connection_pool=conn1)

@app.get('/')
def index():
    '''普通的接口程序'''
    print(1234)
    return {'code': 200, 'msg': '你好,欢迎使用首页接口', 'err': ''}


@app.get('/red')
async def red():
    '''异步接口,用户访问出现io操作,不会进行等待,去接待新的访问用户。'''
    print(456777)

    await redis.execute_command("set", "my-key", "value")
    return {'code': 200, 'msg': '你好,欢迎使用red接口', 'err': ''}


if __name__ == '__main__':
    uvicorn.run('测试13:app')

    
其他的异步框架写法也是相同的  

爬虫

1.安装
pip install aiohttp

2.实例

import asyncio
import aiohttp

async def download_img(session, url, img_name):
    async with session.get(url, verify_ssl=False) as response:  # 发送io请求
        content = await response.content.read()
        with open(f'{img_name}.png', mode='wb', ) as file_obj:
            file_obj.write(content)
            print('本地下载完成' + f'{img_name}.png')


async def main():
    async with aiohttp.ClientSession() as session:
        url_list = [
            'https://ts1.cn.mm.bing.net/th/id/R-C.df4462fabf18edd07195679a5f8a37e5?rik=FnNvr9jWWjHCVQ&riu=http%3a%2f%2fseopic.699pic.com%2fphoto%2f50059%2f8720.jpg_wh1200.jpg&ehk=ofb4q76uCls2S07aIlc8%2bab3H5zwrmj%2bhqiZ%2fyw3Ghw%3d&risl=&pid=ImgRaw&r=0',
            'https://pic3.zhimg.com/v2-58d652598269710fa67ec8d1c88d8f03_r.jpg?source=1940ef5c',
            'https://tse1-mm.cn.bing.net/th/id/OIP-C.xq6cOv82ubIhJY9qkFd5AgHaEK?pid=ImgDet&rs=1',
        ]
        # 协程对象列表
        task_list = [download_img(session, url, index) for index, url in enumerate(url_list)]
        await asyncio.gather(*task_list)  # 将协程对象列表批量添加到event loop 中作为task任务


if __name__ == '__main__':
    asyncio.run(main())

asyncio模块

asyncio

1.单进程单线程的程序,不能提高程序的运算速度
2.作用: 比较适合处理等待的任务,网络通信,不存在系统级的上下文切换
3.async分为: coroutine function(协程函数) coroutine object(协程对象)




例如:
import asyncio [3.7版本以上]

# 1.当前这个函数被称为coroutine function(协程函数)[async开头函数的都叫协程函数]
async def main(): 
    print('hello')
    await asyncio.sleep(2)
    print('world')
    
# 2.正常的调用main函数[只会获取到当前协程对象,不会执行]
coro = main()
# coroutine object(协程对象) 打印出来的函数是携程的对象<coroutine object ..>
print(coro)   # 不会进行运行 提示:运行错误,协程函数main主进程未等待


如果想启动async函数需要进入 event loop(事件循环) 控制程序的状态
1.需要导入asyncio

2.使用asyncio.run()接管整个程序[那么当前的入口程序就需要一个入口函数]

3.asyncio.run()函数: 1.建立起 event loop [事件循环] 2. 会将当前的main函数[coroutine function(协程函数)]作为当前event loop[事件循环]当其中的task[任务]

4.asyncio.run('参数是一个async关键字函数') 函数将程序从 '同步模式' 改变为 '异步模式' 的入口

asyncio使用说明

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return '666'


async def main():
    print(f'当前时间{time.strftime("%X")}')
    a = await say_after(1, 'hello')  # 接受返回值
    print(a)
    await say_after(2, 'word')
    print(f'结束时间{time.strftime("%X")}')


asyncio.run(main())


1.await:
'''
await会将 say_after() 时,变为一个task(任务)
发生:
    1. 当前这个 协程函数对象 被包装为一个task(任务),告诉了event loop (循环事件) 当前这个新任务的存在
    2. 告诉了event loop (循环事件) 这个task(任务)需要等待 say_after(async函数)执行完毕后 才能接着执行
    3.yield出去,告诉event loop(循环事件) 当前这个任务暂时执行不了 请执行其他task(任务)
    4. 当前event loop(循环事件) 再次安排当前task(任务)执行时 会将say_after(async函数)中得真正返回值拿出来进行保存(赋值给变量)
'''

2.整段程序执行过程:
'''

1.asyncio.run(main())  当main函数作为一个task(任务) 给到了event loop(循环事件)中
event loop 在寻找task(任务)时,发现只有一个main task(任务),就运行了main函数
2.main任务再执行先打印了:print(f'当前时间{time.strftime("%X")}')打印
3.执行了 await say_after(1, 'hello')  函数得到了 协程对象(coroutine object)
4.await say_after(1, 'hello') await 将整这个协程对象(coroutine object) 变为一个 task(任务) 放回了event loop 里面 同时告诉event loop需要等待他,将控制权给到了event loop
5.现在event loop 中存在两个任务[main,say_after],main运行不了需要等待say_after运行后才能运行,但是say_after函数内部有await asyncio.sleep(delay)[将当前的sleep作为一个task(任务)添加到event loop中]需要等待,又将控制权转给了event loop
6.say_after需要等待 await asyncio.sleep(delay)任务执行完毕。await asyncio.sleep(delay) 执行完毕后 event loop 让say_after 执行 打印了 print(what)执行完毕 ,将控制权交给了event loop
7.event loop 就会将控制权给main函数执行[main就执行完毕第一个 say_after] 在执行第二个say_after(与上面第一个执行一样)
'''

3. 注意
关于event loop控制权交回的两种方式:1.await 2.函数执行完毕后自动交回控制权 。如果当前任务中是一个死循环那么 event loop就直接卡死了


4.发现问题:当前两个async执行实际时间为3秒钟
因为await做的事情太多,需要将对象变为一个任务,告诉event loop 将控制权交还给event loop还需要等待,那么后面需要执行的协程函数也就需要等待前面的任务(协程函数已经通过awati转为任务后)完成后,后面的协程函数才转为一个任务,才被event loop 调用
问题解决: create_task函数

asyncio中得create_task函数

create_task('coroutine object(协程函数对象)')
create_taskd的作用:将协程对象对象转变为task(任务)注册到event loop中,分担了await一部分功能将协程函数对象包装为一个task(任务)

import asyncio
import time


1.使用create_task

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    # 分担await一部分功能,将协程函数对象包装为task(任务)告诉event loop 这个任务可以执行
    # 但是没办执行,执行权在main手中 需要await将event loop控制权拿到并且执行
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'word'))
    print(f'当前时间{time.strftime("%X")}')
    # 将event loop控制权拿到并且执行
    # 在之前await 协程对象时,需要将整个对象包装为task(任务)在进行执行 在将event loop 控制权拿到执行
    # await task(任务) 直接获取event loop控制权进行执行(我需要这个task任务完成,将控制权交还给event loop) 在控制权回来的时候将当前任务的返回值保存
    await task1
    await task2
    print(f'结束时间{time.strftime("%X")}')


asyncio.run(main())



2.create_task拿返回值
async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return f'{what} - {delay}'

async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'word'))
    print(f'当前时间{time.strftime("%X")}')
    # 获取 任务(协程对象)返回值(返回值会在任务执行完毕后赋值给变量)
    ret1 = await task1
    ret2 = await task2
    # 打印 协程对象返回值
    print(ret1)
    print(ret2)
    print(f'结束时间{time.strftime("%X")}')


asyncio.run(main())



3.批量将协程函数转变为task(任务)交给event loop

使用gather('接受多个协程函数对象')
会将多个协程函数对象包装成为任务,给到event loop 进行执行
返回值处理: 会将全部的任务的返回值添加到一个列表中(返回值顺序与gather(task任务顺序一致))


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return f'{what} - {delay}'


async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))
    task2 = asyncio.create_task(say_after(2, 'word'))
    print(f'当前时间{time.strftime("%X")}')

    task_list = [task1, task2] # 任务列表
    ret = await asyncio.gather(*task_list)
    print(ret)  # 全部的返回值['hello - 1', 'word - 2']
    print(f'结束时间{time.strftime("%X")}')


asyncio.run(main())


4.gather的另一个好处:将全部的协程对象包装为task(任务)
# 省去 cerate_task 的操作

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)
    return f'{what} - {delay}'


async def main():
    print(f'当前时间{time.strftime("%X")}')
    task_list = [say_after(1, 'hello'), say_after(2, 'word')]  # 协程函数对象列表
    ret = await asyncio.gather(*task_list)
    print(ret)  # 全部的返回值['hello - 1', 'word - 2']
    print(f'结束时间{time.strftime("%X")}')

asyncio.run(main())

async理解

event loop 当做大脑,若干个可以执行的task任务,task任务没办控制event loop去到那个task执行的,只能告诉event loop 我在等待那个task执行完毕,由event loop 进行控制权的分配到那个task

将event loop权限给出去的方式: 1.await 2.函数执行完毕

其实可以理解为,只是当前的代码在执行,只不过将等待的事件给利用了起来执行了另外的内容,节约了时间,如果代码中存在等待的操作,那么使用async最好,如果没有等待那么没什么用

要理解:
    1.coroutine function(协程函数) 
    2.coroutine object(协程对象[协程函数加()执行的结果])
    3.task(任务[是由协程对象被asyncio方法包装为任务给到event loop中,变为task才能被执行])
    4.拿到task返回值需要使用await方法,await方法会获取到event loop控制权(告诉它这个任务需要执行)