遠(yuǎn)程過程調(diào)用,簡(jiǎn)稱為RPC,是一個(gè)計(jì)算機(jī)通信協(xié)議,它允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的子程序,而無需額外地為這個(gè)交互作用編程。
RPC與傳統(tǒng)的HTTP對(duì)比
優(yōu)點(diǎn):
? 1. 傳輸效率高(二進(jìn)制傳輸)
? 2. 發(fā)起調(diào)用的一方無需知道RPC的具體實(shí)現(xiàn),如同調(diào)用本地函數(shù)般調(diào)用
缺點(diǎn):
? 1. 通用性不如HTTP好(HTTP是標(biāo)準(zhǔn)協(xié)議)
總結(jié):RPC適合內(nèi)部服務(wù)間的通信調(diào)用;HTTP適合面向用戶與服務(wù)間的通信調(diào)用
?
RPC調(diào)用過程如下:
?
1. 調(diào)用者(客戶端Client)以本地調(diào)用的方式發(fā)起調(diào)用;
2. Client stub(客戶端存根)收到調(diào)用后,負(fù)責(zé)將被調(diào)用的方法名、參數(shù)等打包編碼成特定格式的能進(jìn)行網(wǎng)絡(luò)傳輸?shù)南Ⅲw;
3. Client stub將消息體通過網(wǎng)絡(luò)發(fā)送給服務(wù)端;
4. Server stub(服務(wù)端存根)收到通過網(wǎng)絡(luò)接收到消息后按照相應(yīng)格式進(jìn)行拆包解碼,獲取方法名和參數(shù);
5. Server stub根據(jù)方法名和參數(shù)進(jìn)行本地調(diào)用;
6. 被調(diào)用者(Server)本地調(diào)用執(zhí)行后將結(jié)果返回給server stub;
7. Server stub將返回值打包編碼成消息,并通過網(wǎng)絡(luò)發(fā)送給客戶端;
8. Client stub收到消息后,進(jìn)行拆包解碼,返回給Client;
9. Client得到本次RPC調(diào)用的最終結(jié)果。
?
對(duì)于RPC調(diào)用流程的實(shí)現(xiàn),拋開調(diào)用方與被調(diào)用方,其核心主要是:消息協(xié)議和傳輸控制的實(shí)現(xiàn)?
(1). RPC消息協(xié)議:客戶端調(diào)用的參數(shù)和服務(wù)端的返回值這些在網(wǎng)絡(luò)上傳輸?shù)臄?shù)據(jù)以何種方式打包編碼和拆包解碼
?RPC的消息協(xié)議在設(shè)計(jì)時(shí)主要要考慮,消息轉(zhuǎn)換及傳輸?shù)男剩?br>為解決消息轉(zhuǎn)換及傳輸?shù)男?,可以以二進(jìn)制的方式傳輸消息,使用原始的二進(jìn)制傳輸可以省去中間轉(zhuǎn)換的環(huán)節(jié)并減少傳輸?shù)臄?shù)據(jù)量。
在Python中可以使用struct模塊對(duì)二進(jìn)制進(jìn)行編碼和解碼:
struct.pact將其它類型轉(zhuǎn)換為二進(jìn)制,通常用于消息長(zhǎng)度的轉(zhuǎn)換:
import struct # 將整數(shù)2轉(zhuǎn)換成適用網(wǎng)絡(luò)傳輸?shù)臒o符號(hào)的4個(gè)字節(jié)整數(shù) >>> struct.pack('!I', 2) '
\x00\x00\x00\x02'
?struct.unpack將二進(jìn)制類型轉(zhuǎn)換為其它類型:
byte_data = '\x00\x00\x00\x02' # 將2對(duì)應(yīng)的二進(jìn)制 轉(zhuǎn)換為十進(jìn)制整數(shù) 返回結(jié)果是個(gè)元組 >>> struct.unpack('
!I',byte_data) (2,)
?
?(2). RPC傳輸控制
?對(duì)于消息數(shù)據(jù)的傳輸,主要有HTTP傳輸和TCP傳輸,鑒于TCP傳輸?shù)目煽啃?,RPC的傳輸一般使用TCP作為傳輸協(xié)議
在TCP傳輸中客戶端與服務(wù)端通過socket進(jìn)行通信,通信流程:
?
RPC使用TCP進(jìn)行傳輸控制的實(shí)現(xiàn)
1 import socket 2 3 class Channel(object): 4 """ 5 與客戶端建立網(wǎng)絡(luò)連接 6 """ 7
8 def __init__(self, host, port): 9 self.host = host # 服務(wù)器地址 10 self.port =
port# 服務(wù)器端口 11 12 def get_connection(self): 13 """ 14 獲取一個(gè)tcp連接 15 """ 16 sock
= socket.socket(socket.AF_INET, socket.SOCK_STREAM) 17
sock.connect((self.host, self.port))18 return sock 19 20 21 class
Server(object):22 def __init__(self, host, port, handlers): 23 self.sock =
socket.socket(socket.AF_INET, socket.SOCK_STREAM)24
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 25 self.host =
host26 self.port = port 27 self.sock.bind((host, port)) 28 self.handlers =
handlers29 30 def serve(self): 31 """ 32 開啟服務(wù)器運(yùn)行,提供RPC服務(wù) 33 """ 34 #
開啟服務(wù)監(jiān)聽,等待客戶端連接 35 self.sock.listen(128) 36 print("開始監(jiān)聽") 37 while True: 38 #
接收客戶端的連接請(qǐng)求 39 conn, addr = self.sock.accept() 40 print("建立連接{}"
.format(str(addr)))41 42 # 創(chuàng)建ServerStub對(duì)象,完成客戶端具體的RPC調(diào)用 43 stub =
ServerStub(conn, self.handlers)44 try: 45 while True: 46 stub.process() 47
except EOFError: 48 # 表示客戶端關(guān)閉了連接 49 print("客戶端關(guān)閉連接") 50 # 關(guān)閉服務(wù)端連接 51
conn.close()
?
通過Socket使得RPC客戶端與服務(wù)器進(jìn)行通信
1 TCP服務(wù)端 2 sock = socket.socket() # 創(chuàng)建一個(gè)套接字 3 sock.bind() # 綁定端口 4
sock.listen()# 監(jiān)聽連接 5 sock.accept() # 接受新連接 6 sock.close() # 關(guān)閉服務(wù)器套接字 7 8 9
TCP客戶端10 sock = socket.socket() # 創(chuàng)建一個(gè)套接字 11 sock.connect() # 連接遠(yuǎn)程服務(wù)器 12
sock.recv()# 讀 13 sock.send() # 寫 14 sock.sendall() # 完全寫 15 sock.close() # 關(guān)閉
?
RPC服務(wù)的實(shí)現(xiàn):為了能讓RPC服務(wù)器同時(shí)處理多個(gè)客戶端的請(qǐng)求,提升性能,可以采用多線程、多進(jìn)程方式實(shí)現(xiàn),下面分別介紹這兩種方式實(shí)現(xiàn)的RPC服務(wù)
多線程RPC服務(wù):
1 class ThreadServer(object): 2 """ 3 多線程RPC服務(wù)器 4 """ 5 6 def __init__
(self, host, port, handlers): 7 self.sock = socket.socket(socket.AF_INET,
socket.SOCK_STREAM) 8 self.sock.setsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR, 1) 9 self.host = host 10 self.port = port 11
self.sock.bind((host, port))12 self.handlers = handlers 13 14 def serve(self):
15 """ 16 開始服務(wù) 17 """ 18 self.sock.listen(128) 19 print("開始監(jiān)聽") 20 while True:
21 conn, addr = self.sock.accept() 22 print("建立鏈接{}".format(str(addr))) 23 t =
threading.Thread(target=self.handle, args=(conn,)) 24 t.start() 25 26 def
handle(self, client):27 stub = ServerStub(client, self.handlers) 28 try: 29
while True: 30 stub.process() 31 except EOFError: 32 print("客戶端關(guān)閉連接") 33 34
client.close()
多進(jìn)程RPC服務(wù):
1 class MultiProcessServer(object): 2 """ 3 多進(jìn)程服務(wù)器 4 """ 5 6 def
__init__(self, host, port, handlers): 7 self.sock =
socket.socket(socket.AF_INET, socket.SOCK_STREAM) 8
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 9 self.host =
host10 self.port = port 11 self.sock.bind((host, port)) 12 self.handlers =
handlers13 14 def serve(self): 15 """ 16 開始服務(wù) 17 """ 18 self.sock.listen(128)
19 print("開始監(jiān)聽") 20 while True: 21 conn, addr = self.sock.accept() 22 print("
建立鏈接{}".format(str(addr))) 23 t = Process(target=self.handle, args=(conn,)) 24
t.start()25 26 def handle(self, client): 27 stub = ServerStub(client,
self.handlers)28 try: 29 while True: 30 stub.process() 31 except EOFError: 32
print("客戶端關(guān)閉連接") 33 34 client.close()
熱門工具 換一換