加入收藏 | 设为首页 | 会员中心 | 我要投稿 鞍山站长网 (https://www.0412zz.com/)- 应用安全、运维、云计算、5G、云通信!
当前位置: 首页 > 站长资讯 > 外闻 > 正文

RPC 服务器之【多进程描述符传递】高阶模型

发布时间:2019-07-07 00:25:30 所属栏目:外闻 来源:码洞
导读:今天老师要给大家介绍一个比较特别的 RPC 服务器模型,这个模型不同于 Nginx、不同于 Redis、不同于 Apache、不同于 Tornado、不同于 Netty,它的原型是 Node Cluster 的多进程并发模型。 Nginx 并发模型 我们知道 Nginx 的并发模型是一个多进程并发模型,

下面我来献上完整的服务器代码,为了简单起见,我们在 Slave 进程中处理 RPC 请求使用同步模型。

  1. # coding: utf 
  2. # sendmsg recvmsg python3.5+才可以支持 
  3.  
  4. import os 
  5. import json 
  6. import struct 
  7. import socket 
  8.  
  9.  
  10. def handle_conn(conn, addr, handlers): 
  11.     print(addr, "comes") 
  12.     while True: 
  13.         # 简单起见,这里就没有使用循环读取了 
  14.         length_prefix = conn.recv(4) 
  15.         if not length_prefix: 
  16.             print(addr, "bye") 
  17.             conn.close() 
  18.             break  # 关闭连接,继续处理下一个连接 
  19.         length, = struct.unpack("I", length_prefix) 
  20.         body = conn.recv(length) 
  21.         request = json.loads(body) 
  22.         in_ = request['in'] 
  23.         params = request['params'] 
  24.         print(in_, params) 
  25.         handler = handlers[in_] 
  26.         handler(conn, params) 
  27.  
  28.  
  29. def loop_slave(pr, handlers): 
  30.     while True: 
  31.         bufsize = 1 
  32.         ancsize = socket.CMSG_LEN(struct.calcsize('i')) 
  33.         msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize) 
  34.         cmsg_level, cmsg_type, cmsg_data = ancdata[0] 
  35.         fd = struct.unpack('i', cmsg_data)[0] 
  36.         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd) 
  37.         handle_conn(sock, sock.getpeername(), handlers) 
  38.  
  39.  
  40. def ping(conn, params): 
  41.     send_result(conn, "pong", params) 
  42.  
  43.  
  44. def send_result(conn, out, result): 
  45.     response = json.dumps({"out": out, "result": result}).encode('utf-8') 
  46.     length_prefix = struct.pack("I", len(response)) 
  47.     conn.sendall(length_prefix) 
  48.     conn.sendall(response) 
  49.  
  50.  
  51. def loop_master(serv_sock, pws): 
  52.     idx = 0 
  53.     while True: 
  54.         sock, addr = serv_sock.accept() 
  55.         pw = pws[idx % len(pws)] 
  56.         # 消息数据,whatever 
  57.         msg = [b'x'] 
  58.         # 辅助数据,携带描述符 
  59.         ancdata = [( 
  60.             socket.SOL_SOCKET, 
  61.             socket.SCM_RIGHTS, 
  62.             struct.pack('i', sock.fileno()))] 
  63.         pw.sendmsg(msg, ancdata) 
  64.         sock.close()  # 关闭引用 
  65.         idx += 1 
  66.  
  67.  
  68. def prefork(serv_sock, n): 
  69.     pws = [] 
  70.     for i in range(n): 
  71.         # 开辟父子进程通信「管道」 
  72.         pr, pw = socket.socketpair() 
  73.         pid = os.fork() 
  74.         if pid < 0:  # fork error 
  75.             return pws 
  76.         if pid > 0: 
  77.             # 父进程 
  78.             pr.close()  # 父进程不用读 
  79.             pws.append(pw) 
  80.             continue 
  81.         if pid == 0: 
  82.             # 子进程 
  83.             serv_sock.close()  # 关闭引用 
  84.             pw.close()  # 子进程不用写 
  85.             return pr 
  86.     return pws 
  87.  
  88.  
  89. if __name__ == '__main__': 
  90.     serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
  91.     serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
  92.     serv_sock.bind(("localhost", 8080)) 
  93.     serv_sock.listen(1) 
  94.     pws_or_pr = prefork(serv_sock, 10) 
  95.     if hasattr(pws_or_pr, '__len__'): 
  96.         if pws_or_pr: 
  97.             loop_master(serv_sock, pws_or_pr) 
  98.         else: 
  99.             # fork 全部失败,没有子进程,Game Over 
  100.             serv_sock.close() 
  101.     else: 
  102.         handlers = { 
  103.             "ping": ping 
  104.         } 
  105.         loop_slave(pws_or_pr, handlers) 

(编辑:鞍山站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读