当前位置:首页 > 科技  > 软件

如何在 Asyncio 中使用 Socket

来源: 责编: 时间:2024-01-18 09:38:54 131观看
导读楔子本次我们来聊一聊 Socket,以及它如何与 asyncio 搭配使用。阻塞 SocketSocket 是对 TCP/IP 协议的一个封装,可以让我们更方便地使用 TCP/IP 协议,而不用关注背后的原理。并且我们经常使用的 Web 框架,本质上也是一个

楔子

pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

本次我们来聊一聊 Socket,以及它如何与 asyncio 搭配使用。pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

阻塞 Socket

pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

Socket 是对 TCP/IP 协议的一个封装,可以让我们更方便地使用 TCP/IP 协议,而不用关注背后的原理。并且我们经常使用的 Web 框架,本质上也是一个 Socket。pxI28资讯网——每日最新资讯28at.com

所以 Socket 是操作系统对 TCP/IP 网络协议栈的封装,并提供了一系列的接口,我们通过这些接口可以实现网络通信,而不用关注网络协议的具体细节。pxI28资讯网——每日最新资讯28at.com

图片图片pxI28资讯网——每日最新资讯28at.com

按照现有的网络模型,Socket 并不属于其中的任何一层,但我们可以简单地将 Socket 理解为传输层之上的抽象层,负责连接应用层和传输层。Socket 提供了大量的 API,基于这些 API 我们可以非常方便地使用网络协议栈,在不同主机间进行网络通信。pxI28资讯网——每日最新资讯28at.com

Linux 一切皆文件,Socket 也不例外,它被称为套接字文件,在使用上和普通文件是类似的。pxI28资讯网——每日最新资讯28at.com

Socket 是什么我们已经知道了,下面来看看如何使用 Socket 进行编程。pxI28资讯网——每日最新资讯28at.com

图片图片pxI28资讯网——每日最新资讯28at.com

整个过程如下:pxI28资讯网——每日最新资讯28at.com

  • 服务端初始化 socket,此时会得到「主动套接字」;
  • 服务端调用 bind 方法,将套接字绑定在某个 IP 和端口上;
  • 服务端调用 listen 进行监听,此时「主动套接字」会变成「监听套接字」;
  • 服务端调用 accept,等待客户端连接,此时服务端会阻塞在这里(调用的是阻塞的 API);
  • 客户端同样初始化 socket,得到主动套接字;
  • 客户端调用主动套接字的 connect,向服务器端发起连接请求,如果连接成功,后续客户端就用这个主动套接字进行数据的传输;
  • 当客户端来连接时,那么服务端的 accept 将不再阻塞,并返回「已连接套接字」,后续服务端便用这个已连接套接字和客户端进行数据传输;
  • 当客户端来连接时,那么服务端的 accept 将不再阻塞,并返回「已连接套接字」,后续服务端便用这个已连接套接字和客户端进行数据传输;

我们使用来编写代码演示一下这个过程,首先是服务端:pxI28资讯网——每日最新资讯28at.com

import socket# socket.socket() 会返回一个「主动套接字」server = socket.socket(    # 表示使用 IPv4,如果是 socket.AF_INET6    # 则表示使用 IPv6    socket.AF_INET,    # 表示建立 TCP 连接,如果是 socket.SOCK_DGRAM    # 则表示建立 UDP 连接    socket.SOCK_STREAM)# 当然这两个参数也可以不传,因为默认就是它# 设置套接字属性,这里让端口释放后立刻就能再次使用server.setsockopt(socket.SOL_SOCKET,                  socket.SO_REUSEADDR, True)# 将「主动套接字」绑定在某个 IP 和端口上server.bind(("localhost", 12345))# 监听,此时「主动套接字」会变成「监听套接字」server.listen(5)# 调用 accept,等待客户端连接,此时会阻塞在这里# 如果客户端连接到来,那么会返回「已连接套接字」,也就是这里的 conn# 至于 addr 则是一个元组,保存了客户端连接的信息(IP 和端口)conn, addr = server.accept()# 下面我们通过「已连接套接字」conn 和客户端进行消息的收发# 收消息使用 recv、发消息使用 send,和 read、write 本质是一样的while True:    msg = conn.recv(1024)    # 当客户端断开连接时,msg 会收到一个空字节串    if not msg:        print("客户端已经断开连接")        conn.close()        break    print("客户端发来消息:", msg.decode("utf-8"))    # 然后我们加点内容之后,再给客户端发过去    conn.send("服务端收到, 你发的消息是: ".encode("utf-8") + msg)

接下来编写客户端:pxI28资讯网——每日最新资讯28at.com

import socket# 返回主动套接字client = socket.socket(socket.AF_INET,                       socket.SOCK_STREAM)# 连接服务端client.connect(("localhost", 12345))while True:    # 发送消息    data = input("请输入内容: ")    if data.strip().lower() in ("q", "quit", "exit"):        client.close()        print("Bye~~~")        break    client.send(data.encode("utf-8"))    print(client.recv(1024).decode("utf-8"))

启动服务端和客户端进行测试:pxI28资讯网——每日最新资讯28at.com

图片图片pxI28资讯网——每日最新资讯28at.com

还是比较简单的,当然我们这里的服务端每次只能和一个客户端通信,如果想服务多个客户端的话,那么需要为已连接套接字单独开一个线程和客户端进行通信,然后主线程继续调用 accept 方法等待下一个客户端。pxI28资讯网——每日最新资讯28at.com

下面来编写一下多线程的版本,这里只需要编写服务端即可,客户端代码不变。pxI28资讯网——每日最新资讯28at.com

import socketimport threadingserver = socket.socket()server.setsockopt(socket.SOL_SOCKET,                  socket.SO_REUSEADDR, True)server.bind(("localhost", 12345))server.listen(5)def handle_message(conn, addr):    while True:        msg = conn.recv(1024)        if not msg:            print(f"客户端(ip: {addr[0]}, port: {addr[1]}) 已经断开连接")            conn.close()            break        print(f"客户端(ip: {addr[0]}, port: {addr[1]}) 发来消息:",              msg.decode("utf-8"))        conn.send("服务端收到, 你发的消息是: ".encode("utf-8") + msg)while True:    conn, addr = server.accept()    threading.Thread(        target=handle_message,        args=(conn, addr)    ).start()

代码很简单,就是把已连接套接字和客户端的通信逻辑写在了单独的函数中,每来一个客户端,服务端都会启动一个新的线程去执行该函数,然后继续监听,等待下一个客户端连接到来。pxI28资讯网——每日最新资讯28at.com

然后客户端代码不变,我们启动三个客户端去和服务端通信,看看结果如何。pxI28资讯网——每日最新资讯28at.com

图片图片pxI28资讯网——每日最新资讯28at.com

结果一切正常,当然我们这里的代码比较简单,就是普通的消息收发。你也可以实现一个更复杂的功能,比如文件下载器,把服务端当成网盘,支持客户端上传和下载文件,并不难。pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

非阻塞 Socket

pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

先回顾一下 socket 模型:pxI28资讯网——每日最新资讯28at.com

图片图片pxI28资讯网——每日最新资讯28at.com

但是注意:我们说在 listen() 这一步,会将主动套接字转化为监听套接字,但此时的监听套接字的类型是阻塞的。阻塞类型的监听套接字在调用 accept() 方法时,如果没有客户端来连接的话,就会一直处于阻塞状态,那么此时主线程就没法干其它事情了。pxI28资讯网——每日最新资讯28at.com

所以要设置为非阻塞,而非阻塞的监听套接字在调用 accept() 时,如果没有客户端来连接,那么主线程不会傻傻地等待,而是会直接返回,然后去做其它的事情。pxI28资讯网——每日最新资讯28at.com

类似的,我们在创建已连接套接字的时候默认也是阻塞的,阻塞类型的已连接套接字在调用 send() 和 recv() 的时候也会处于阻塞状态。比如当客户端一直不发数据的时候,已连接套接字就会一直阻塞在 recv() 这一步。如果是非阻塞类型的已连接套接字,那么当调用 recv() 但却收不到数据时,也不用处于阻塞状态,同样可以直接返回去做其它事情。pxI28资讯网——每日最新资讯28at.com

import socketserver = socket.socket()server.bind(("localhost", 12345))# 调用 setblocking 方法,传入 False# 表示将监听套接字和已连接套接字的类型设置为非阻塞server.setblocking(False)server.listen(5)while True:    try:        # 非阻塞的监听套接字调用 accept() 时        # 如果发现没有客户端连接,则会立刻抛出 BlockingIOError        # 因此这里写了个死循环        conn, addr = server.accept()    except BlockingIOError:        pass    else:        breakwhile True:    try:        # 同理,非阻塞的已连接套接字在调用 recv() 时        # 如果发现客户端没有发数据,那么同样会报错        msg = conn.recv(1024)    except BlockingIOError:        pass    else:        print(msg.decode("utf-8"))        conn.send(b"data from server")

很明显,虽然上面的代码在运行的时候正常,但存在两个问题:pxI28资讯网——每日最新资讯28at.com

1)虽然 accept() 不阻塞了,在没有客户端连接时主线程可以去做其它事情,但如果后续有客户端连接,主线程要如何得知呢?因此必须要有一种机制,能够继续在监听套接字上等待后续连接请求,并在请求到来时通知主线程。我们上面的做法是写了一个死循环,但很明显这是没有意义的,这种做法还不如使用阻塞的套接字。pxI28资讯网——每日最新资讯28at.com

2)send() / recv() 不阻塞了,相当于 I/O 读写流程不再是阻塞的,读写方法都会瞬间完成并返回,也就是说它会采用能读多少就读多少、能写多少就写多少的策略来执行 I/O 操作,这显然更符合我们对性能的追求。pxI28资讯网——每日最新资讯28at.com

图片图片pxI28资讯网——每日最新资讯28at.com

显然对于非阻塞套接字而言,会面临一个问题,那就是当我们执行读取操作时,有可能只读了一部分数据,剩余的数据客户端还没发过来,那么这些数据何时可读呢?同理写数据也是这种情况,当缓冲区满了,而我们的数据还没有写完,那么剩下的数据又何时可写呢?因此同样要有一种机制,能够在主线程做别的事情的时候继续监听已连接套接字,并且在有数据可读写的时候通知主线程。pxI28资讯网——每日最新资讯28at.com

这样才能保证主线程既不会像基本 IO 模型一样,一直在阻塞点等待,也不会无法处理实际到达的客户端连接请求和可读写的数据,而上面所提到的机制便是 I/O 多路复用。pxI28资讯网——每日最新资讯28at.com

早期的所有框架都是非阻塞 + 回调 + 基于 IO 多路复用的事件循环,这种模式的性能也非常高,Redis 和 Nginx 都是基于这种方式实现了高并发。只是这种编码方式非常痛苦,它将好端端的自上而下的逻辑分割的四分五裂,而且也不好维护,它使得开发人员在编写业务逻辑的同时,还要关注并发细节。pxI28资讯网——每日最新资讯28at.com

因此使用多路复用 + 回调的方式编写异步化代码,虽然并发量能上去,但是对开发者很不友好;而使用同步的方式编写同步代码,虽然很容易理解,可并发量却又上不去。那么问题来了,有没有一种办法,能够让我们在享受异步化带来的高并发的同时,又能以同步的方式去编写代码呢?也就是我们能不能以同步的方式去编写异步化的代码呢?pxI28资讯网——每日最新资讯28at.com

答案是可以的,使用「协程」便可以办到。协程在这种模式的基础之上又批了一层外衣,兼顾了开发效率与运行效率。pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

在 asyncio 中使用 Socket

pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

asyncio 的事件循环提供了处理套接字的一些方法,我们主要会用到三个:pxI28资讯网——每日最新资讯28at.com

  • sock_accept()
  • sock_recv()
  • sock_sendall()

这些方法类似于前面使用的套接字方法,但不同之处在于,它们需要接收非阻塞套接字作为参数,然后返回协程。我们可以等待协程,直到有数据可供操作。pxI28资讯网——每日最新资讯28at.com

先来看一下 sock_accept(),它类似于 server.accept()。pxI28资讯网——每日最新资讯28at.com

conn,add = await loop.sock_accept(sock)

然后 sock_recv 和 sock_sendall 的调用方式与 sock_accept 类似,它们接收一个套接字,然后返回协程对象。通过 await 表达式,sock_recv 将会阻塞,直到套接字有可以处理的字节;sock_sendall 接收一个套接字和要发送的数据,同样会陷入阻塞,直到要发送给套接字的所有数据都发送完毕,成功时返回 None。pxI28资讯网——每日最新资讯28at.com

data = await loop.sock_recv(sock)await loop.sock_sendall(sock, data)

下面我们就基于 asyncio 设计一个回显服务器。pxI28资讯网——每日最新资讯28at.com

import asyncioimport socketasync def echo(conn: socket.socket):    loop = asyncio.get_running_loop()    # 无限循环等待来自客户端连接的数据    try:        while data := await loop.sock_recv(conn, 1024):            # 收到数据之后再将其发送给客户端            # 为了区分,我们发送的时候在结尾加一个 b"~"            await loop.sock_sendall(conn, data + b"~")    except Exception as e:        print(f"服务出错: {e}")    finally:        conn.close()async def listen_for_conn(server: socket.socket):    loop = asyncio.get_running_loop()    while True:        conn, addr = await loop.sock_accept(server)        conn.setblocking(False)        print(f"收到客户端 {addr} 的连接")        # 每次连接时,都创建一个任务来监听客户端的数据        asyncio.create_task(echo(conn))async def main():    server = socket.socket()    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)    server.setblocking(False)    server.bind(("localhost", 12345))    server.listen()    await listen_for_conn(server)asyncio.run(main())

运行这个应用程序可以同时服务多个客户端,它里面同样使用了 IO 多路复用,只不过事件循环将它封装起来了,我们不需要直接面对。所以这种编程模式就简单多了。pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

小结

pxI28资讯网——每日最新资讯28at.com

pxI28资讯网——每日最新资讯28at.com

如果使用阻塞套接字创建应用程序,那么阻塞套接字将在等待数据时停止整个线程。这阻止了我们实现并发,因为一次只能从一个客户端获取数据。pxI28资讯网——每日最新资讯28at.com

使用非阻塞套接字构建应用程序,这些套接字总是会立即返回,而结果有两种:要么已经准备好了数据,要么因为没有数据而出现异常。pxI28资讯网——每日最新资讯28at.com

使用 asyncio 的事件循环方法来构建具有非阻塞套接字的应用程序,这些方法接收一个套接字并返回一个协程,然后可在 await 表达式中使用它。这将暂停父协程,直到套接字带有数据。事件循环就是基于 IO 多路复用做的一个封装,而 IO 多路复用能够实现的前提之一就是:套接字必须是非阻塞的。pxI28资讯网——每日最新资讯28at.com

本文链接:http://www.28at.com/showinfo-26-64097-0.html如何在 Asyncio 中使用 Socket

声明:本网页内容旨在传播知识,若有侵权等问题请及时与本网联系,我们将在第一时间删除处理。邮件:2376512515@qq.com

上一篇: 你知道.NET中的数组在内存中如何布局的吗?

下一篇: Gorm 框架原理&源码解析

标签:
  • 热门焦点
Top