下面我来献上完整的服务器代码,为了简单起见,我们在 Slave 进程中处理 RPC 请求使用同步模型。
- # coding: utf
- # sendmsg recvmsg python3.5+才可以支持
-
- import os
- import json
- import struct
- import socket
-
-
- def handle_conn(conn, addr, handlers):
- print(addr, "comes")
- while True:
- # 简单起见,这里就没有使用循环读取了
- length_prefix = conn.recv(4)
- if not length_prefix:
- print(addr, "bye")
- conn.close()
- break # 关闭连接,继续处理下一个连接
- length, = struct.unpack("I", length_prefix)
- body = conn.recv(length)
- request = json.loads(body)
- in_ = request['in']
- params = request['params']
- print(in_, params)
- handler = handlers[in_]
- handler(conn, params)
-
-
- def loop_slave(pr, handlers):
- while True:
- bufsize = 1
- ancsize = socket.CMSG_LEN(struct.calcsize('i'))
- msg, ancdata, flags, addr = pr.recvmsg(bufsize, ancsize)
- cmsg_level, cmsg_type, cmsg_data = ancdata[0]
- fd = struct.unpack('i', cmsg_data)[0]
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd)
- handle_conn(sock, sock.getpeername(), handlers)
-
-
- def ping(conn, params):
- send_result(conn, "pong", params)
-
-
- def send_result(conn, out, result):
- response = json.dumps({"out": out, "result": result}).encode('utf-8')
- length_prefix = struct.pack("I", len(response))
- conn.sendall(length_prefix)
- conn.sendall(response)
-
-
- def loop_master(serv_sock, pws):
- idx = 0
- while True:
- sock, addr = serv_sock.accept()
- pw = pws[idx % len(pws)]
- # 消息数据,whatever
- msg = [b'x']
- # 辅助数据,携带描述符
- ancdata = [(
- socket.SOL_SOCKET,
- socket.SCM_RIGHTS,
- struct.pack('i', sock.fileno()))]
- pw.sendmsg(msg, ancdata)
- sock.close() # 关闭引用
- idx += 1
-
-
- def prefork(serv_sock, n):
- pws = []
- for i in range(n):
- # 开辟父子进程通信「管道」
- pr, pw = socket.socketpair()
- pid = os.fork()
- if pid < 0: # fork error
- return pws
- if pid > 0:
- # 父进程
- pr.close() # 父进程不用读
- pws.append(pw)
- continue
- if pid == 0:
- # 子进程
- serv_sock.close() # 关闭引用
- pw.close() # 子进程不用写
- return pr
- return pws
-
-
- if __name__ == '__main__':
- serv_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv_sock.bind(("localhost", 8080))
- serv_sock.listen(1)
- pws_or_pr = prefork(serv_sock, 10)
- if hasattr(pws_or_pr, '__len__'):
- if pws_or_pr:
- loop_master(serv_sock, pws_or_pr)
- else:
- # fork 全部失败,没有子进程,Game Over
- serv_sock.close()
- else:
- handlers = {
- "ping": ping
- }
- loop_slave(pws_or_pr, handlers)
(编辑:鞍山站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|