広告
Pythonのサンプルコードです。
ネットワーク系が中心となります。
書かなくなるとすぐ忘れてしまうので備忘録として残しておきます。
全てクライアント-サーバの通信のサンプルです。
TCP
tcp_client.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
"""TCP client sample: connect to the local server and send one greeting."""
import os
import sys
import socket

if __name__ == '__main__':
    # Loopback endpoint that the tcp_server.py sample binds to.
    address = ('127.0.0.1', 50000)
    msg = 'Hello TCP'

    # The context manager closes the socket even when connect()/sendall()
    # raises, so the descriptor is never leaked.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(address)
        # send() may transmit only part of the buffer on a stream socket;
        # sendall() keeps writing until every byte has been handed over.
        s.sendall(msg.encode())
tcp_server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
"""TCP server sample: accept one connection, print one message, exit."""
import os
import sys
import socket

if __name__ == '__main__':
    address = ('127.0.0.1', 50000)
    # Maximum number of bytes read by the single recv() call below.
    datasize = 4096

    # Both the listening socket and the accepted connection are closed by
    # their context managers, including when an exception is raised.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(address)
        s.listen(1)
        client, addr = s.accept()
        # accept() returns a new socket object; closing the listener does not
        # close it, so it needs its own cleanup.
        with client:
            data = client.recv(datasize)
            sys.stdout.write("receive : {}\n".format(data.decode()))
UDP
udp_client.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
"""UDP client sample: send a single datagram to the local server."""
import os
import sys
import socket

if __name__ == '__main__':
    server_address = ('127.0.0.1', 50001)
    # Not used when sending; the udp_server.py sample uses the same value as
    # its receive buffer size.
    datasize = 4096

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    # UDP is connectionless, so the destination is given per datagram.
    payload = 'Hello UDP'.encode()
    sock.sendto(payload, server_address)

    sock.close()
udp_server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
"""UDP server sample: wait for one datagram, print it, exit."""
import os
import sys
import socket

if __name__ == '__main__':
    # Maximum number of bytes read from the datagram.
    datasize = 4096
    bind_address = ('127.0.0.1', 50001)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(bind_address)

    # recvfrom() blocks until a datagram arrives and also reports the sender.
    payload, sender = sock.recvfrom(datasize)
    text = payload.decode()
    sys.stdout.write("receive : {}\n".format(text))

    sock.close()
1 |
UNIXドメイン(DATAGRAM型)
unix_dgram_client.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
"""UNIX-domain datagram client sample: send one message to the server."""
import os
import sys
import socket

if __name__ == '__main__':
    # Filesystem path of the server's socket.
    socket_path = './unix_dgram'

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    # connect() on a datagram socket sets the default destination, which is
    # why plain send() can be used below.
    sock.connect(socket_path)

    payload = 'Hello unix domain (dgram)'.encode()
    sock.send(payload)

    sock.close()
unix_dgram_server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
"""UNIX-domain datagram server sample: receive one message, print it, exit."""
import os
import sys
import socket

if __name__ == '__main__':
    socket_path = './unix_dgram'
    # Maximum number of bytes read from the datagram.
    datasize = 4096

    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
        s.bind(socket_path)
        # From here on the socket file exists on disk.  Remove it in a
        # finally block so that an error or Ctrl-C does not leave a stale
        # file behind, which would make the next bind() fail.
        try:
            data, client = s.recvfrom(datasize)
            sys.stdout.write("received : {}\n".format(data.decode()))
        finally:
            os.unlink(socket_path)
1 |
UNIXドメイン(STREAM型)
unix_stream_client.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
"""UNIX-domain stream client sample: connect and send one message."""
import os
import sys
import socket

if __name__ == '__main__':
    # Filesystem path of the server's listening socket.
    socket_path = './unix_stream'
    msg = 'Hello unix domain (stream)'

    # The context manager closes the socket even when connect()/sendall()
    # raises.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect(socket_path)
        # send() may transmit only part of the buffer on a stream socket;
        # sendall() keeps writing until every byte has been handed over.
        s.sendall(msg.encode())
1 |
unix_stream_server.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
"""UNIX-domain stream server sample: accept one connection, print one message."""
import os
import sys
import socket

if __name__ == '__main__':
    socket_path = './unix_stream'
    # Maximum number of bytes read by the single recv() call below.
    datasize = 4096

    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.bind(socket_path)
        # From here on the socket file exists on disk.  Remove it in a
        # finally block so that an error or Ctrl-C does not leave a stale
        # file behind, which would make the next bind() fail.
        try:
            s.listen(1)
            client, addr = s.accept()
            # The accepted connection is a separate socket and needs its own
            # cleanup.
            with client:
                data = client.recv(datasize)
                sys.stdout.write("received : {}\n".format(data.decode()))
        finally:
            os.unlink(socket_path)
1 |
TCP(selectで待つ)
tcp_select.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
"""TCP server sample that multiplexes many clients with select().

Runs until an exception (for example Ctrl-C) stops it; every socket it still
holds is closed on the way out.
"""
import os
import sys
import socket
import select

if __name__ == '__main__':
    address = ('127.0.0.1', 50000)
    # Maximum number of bytes read per recv() call.
    datasize = 4096
    # Backlog passed to listen().
    maxconn = 10

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Everything select() watches: the listener plus all accepted clients.
    readfds = [listener]

    try:
        listener.bind(address)
        listener.listen(maxconn)

        while True:
            sys.stdout.write("waiting...\n")
            r, w, x = select.select(readfds, [], [])
            # The loop variable must not reuse the listener's name, otherwise
            # the cleanup below would close the wrong socket.
            for sock in r:
                if sock is listener:
                    conn, addr = sock.accept()
                    readfds.append(conn)
                    sys.stdout.write("connected.\n")
                    continue

                data = sock.recv(datasize)
                if not data:
                    # An empty read means the peer closed the connection.
                    sys.stdout.write("disconnected.\n")
                    readfds.remove(sock)
                    sock.close()
                    # Keep serving the other sockets that are ready now.
                    continue
                sys.stdout.write("received {}\n".format(data.decode()))
    finally:
        # Close the listener and every client that is still connected.
        for sock in readfds:
            sock.close()
1 |
TCP-UDP-UNIXを全てselectで待つ
all_select.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 |
"""Server sample that waits on TCP, UDP and both UNIX-domain socket types.

A single select() loop serves a TCP listener, a UDP socket, a UNIX stream
listener and a UNIX datagram socket, plus every stream connection accepted
from the two listeners.  Ctrl-C stops the server quietly; any other exception
propagates.  In both cases all sockets are closed and the UNIX socket files
created by this process are removed.
"""
import os
import sys
import socket
import select
import signal

if __name__ == '__main__':
    address1 = ('127.0.0.1', 50000)
    address2 = ('127.0.0.1', 50001)
    socket_path1 = "./unix_stream"
    socket_path2 = "./unix_dgram"
    # Maximum number of bytes read per recv()/recvfrom() call.
    datasize = 4096
    # Backlog passed to listen().
    maxconn = 1

    # The four server-side sockets (listeners and datagram endpoints).
    sockets = []
    # Accepted stream connection -> label ("tcp" / "unix") used in messages.
    # Tracking every connection, not just the latest one per type, ensures
    # that any readable connection is actually read.
    connections = {}
    # UNIX socket files that bind() has created and that must be removed.
    bound_paths = []

    try:
        tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sockets.append(tcp)
        tcp.bind(address1)
        tcp.listen(maxconn)

        udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sockets.append(udp)
        udp.bind(address2)

        unix_s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sockets.append(unix_s)
        unix_s.bind(socket_path1)
        bound_paths.append(socket_path1)
        unix_s.listen(maxconn)

        unix_d = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
        sockets.append(unix_d)
        unix_d.bind(socket_path2)
        bound_paths.append(socket_path2)

        # Listening stream sockets and the label given to their connections.
        listeners = {tcp: "tcp", unix_s: "unix"}

        while True:
            sys.stdout.write("waiting...\n")
            r, w, x = select.select(sockets + list(connections), [], [])
            for s in r:
                if s in listeners:
                    label = listeners[s]
                    conn, addr = s.accept()
                    connections[conn] = label
                    sys.stdout.write("{} connected.\n".format(label))
                elif s in connections:
                    data = s.recv(datasize)
                    if not data:
                        # An empty read means the peer closed the connection.
                        label = connections.pop(s)
                        sys.stdout.write("{} disconnected.\n".format(label))
                        s.close()
                        continue
                    sys.stdout.write("received {}\n".format(data.decode()))
                else:
                    # Remaining case: one of the two datagram sockets.
                    data, client = s.recvfrom(datasize)
                    sys.stdout.write("received {}\n".format(data.decode()))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this server; exit without a
        # traceback.
        pass
    finally:
        for s in list(connections) + sockets:
            s.close()
        # Only remove the files this process actually created.
        for path in bound_paths:
            os.unlink(path)
1 |
エラー処理は上手く書けないのでごめんなさい。
広告
広告