Python3サンプルコード集(その2)
Pythonのサンプルコードです。
ネットワーク系が中心となります。
書かなくなるとすぐ忘れてしまうので備忘録として残しておきます。
全てクライアント-サーバ間の通信のサンプルです。
[rtoc_mokuji title="" title_display="" heading="h3" list_h2_type="" list_h3_type="" display="" frame_design="" animation=""]
目次
TCP
tcp_client.py
import os
import sys
import socket
if __name__ == '__main__':
    # TCP client: connect to the local server and send one greeting.
    address = ('127.0.0.1', 50000)
    msg = 'Hello TCP'

    # The context manager closes the socket even if connect/sendall raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(address)
        # send() may transmit only part of the buffer on a stream socket;
        # sendall() keeps sending until everything is written or an error occurs.
        s.sendall(msg.encode())
tcp_server.py
import os
import sys
import socket
if __name__ == '__main__':
    # TCP server: accept a single connection, print one received chunk, exit.
    address = ('127.0.0.1', 50000)
    datasize = 4096

    # Both sockets are context-managed so they are closed on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(address)
        s.listen(1)
        client, addr = s.accept()
        # The accepted socket is separate from the listening one and must be
        # closed as well.
        with client:
            data = client.recv(datasize)
        sys.stdout.write("receive : {}\n".format(data.decode()))
UDP
udp_client.py
import os
import sys
import socket
if __name__ == '__main__':
    # UDP client: fire a single datagram at the local server.
    datasize = 4096
    server_address = ('127.0.0.1', 50001)
    payload = 'Hello UDP'.encode()

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.sendto(payload, server_address)
    sock.close()
udp_server.py
import os
import sys
import socket
if __name__ == '__main__':
    # UDP server: wait for one datagram, print its text, exit.
    datasize = 4096
    bind_address = ('127.0.0.1', 50001)

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind(bind_address)
    payload, sender = sock.recvfrom(datasize)
    text = payload.decode()
    sys.stdout.write("receive : {}\n".format(text))
    sock.close()
UNIXドメイン(DATAGRAM型)
unix_dgram_client.py
import os
import sys
import socket
if __name__ == '__main__':
    # UNIX-domain datagram client: send one message to the server's socket file.
    socket_path = './unix_dgram'
    payload = 'Hello unix domain (dgram)'.encode()

    sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    # connect() on a datagram socket just fixes the default destination.
    sock.connect(socket_path)
    sock.send(payload)
    sock.close()
unix_dgram_server.py
import os
import sys
import socket
if __name__ == '__main__':
    # UNIX-domain datagram server: receive one datagram, print it, exit.
    socket_path = './unix_dgram'
    datasize = 4096

    with socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) as s:
        # bind() creates the socket file; it stays outside the try block so a
        # failed bind never removes a file this process did not create.
        s.bind(socket_path)
        try:
            data, client = s.recvfrom(datasize)
            sys.stdout.write("received : {}\n".format(data.decode()))
        finally:
            # Always remove the socket file, otherwise the next run fails on
            # bind() because the path already exists.
            os.unlink(socket_path)
UNIXドメイン(STREAM型)
unix_stream_client.py
import os
import sys
import socket
if __name__ == '__main__':
    # UNIX-domain stream client: connect to the server's socket file and send
    # one greeting.
    socket_path = './unix_stream'
    msg = 'Hello unix domain (stream)'

    # The context manager closes the socket even if connect/sendall raises.
    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        s.connect(socket_path)
        # Stream sockets may accept only part of the buffer in send();
        # sendall() retries until the whole message is written.
        s.sendall(msg.encode())
unix_stream_server.py
import os
import sys
import socket
if __name__ == '__main__':
    # UNIX-domain stream server: accept one connection, print one received
    # chunk, exit.
    socket_path = './unix_stream'
    datasize = 4096

    with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
        # bind() creates the socket file; it stays outside the try block so a
        # failed bind never removes a file this process did not create.
        s.bind(socket_path)
        try:
            s.listen(1)
            client, addr = s.accept()
            # The accepted socket must be closed separately from the listener.
            with client:
                data = client.recv(datasize)
            sys.stdout.write("received : {}\n".format(data.decode()))
        finally:
            # Always remove the socket file, otherwise the next run fails on
            # bind() because the path already exists.
            os.unlink(socket_path)
TCP(selectで待つ)
tcp_select.py
import os
import sys
import socket
import select
if __name__ == '__main__':
    # TCP server multiplexed with select(): accepts any number of clients on
    # one listening socket and prints whatever each of them sends.
    address = ('127.0.0.1', 50000)
    datasize = 4096
    maxconn = 10

    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Every socket select() should watch: the listener plus accepted clients.
    readfds = [listener]
    try:
        listener.bind(address)
        listener.listen(maxconn)
        while True:
            sys.stdout.write("waiting...\n")
            readable, _, _ = select.select(readfds, [], [])
            # Use a distinct loop variable so the listening socket is never
            # shadowed by a client socket.
            for sock in readable:
                if sock is listener:
                    conn, _addr = sock.accept()
                    readfds.append(conn)
                    sys.stdout.write("connected.\n")
                    continue

                data = sock.recv(datasize)
                if not data:
                    # Empty read: the peer closed its end of the connection.
                    sys.stdout.write("disconnected.\n")
                    readfds.remove(sock)
                    sock.close()
                    # Keep servicing the other ready sockets in this round.
                    continue
                sys.stdout.write("received {}\n".format(data.decode()))
    finally:
        # Close the listener and every client that is still connected.
        for sock in readfds:
            sock.close()
TCP-UDP-UNIXを全てselectで待つ
all_select.py
import os
import sys
import socket
import select
import signal
if __name__ == '__main__':
    # One select() loop that serves four endpoints at once:
    # TCP, UDP, UNIX-domain stream and UNIX-domain datagram.
    address1 = ('127.0.0.1', 50000)
    address2 = ('127.0.0.1', 50001)
    socket_path1 = "./unix_stream"
    socket_path2 = "./unix_dgram"
    datasize = 4096
    maxconn = 1

    tcp = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    unix_s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    unix_d = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)

    # Listening stream sockets mapped to the label used in log messages.
    listeners = {tcp: "tcp", unix_s: "unix"}
    # Connectionless sockets: data is read directly with recvfrom().
    datagram_socks = (udp, unix_d)
    # Every accepted connection mapped to its label. Tracking all of them
    # (not just the latest one) ensures each readable socket is actually read;
    # an unread socket would make select() return immediately forever.
    connections = {}
    # Socket files created by this process, removed again on exit.
    bound_paths = []

    try:
        tcp.bind(address1)
        tcp.listen(maxconn)
        udp.bind(address2)
        unix_s.bind(socket_path1)
        bound_paths.append(socket_path1)
        unix_s.listen(maxconn)
        unix_d.bind(socket_path2)
        bound_paths.append(socket_path2)

        readfds = [tcp, udp, unix_s, unix_d]
        while True:
            sys.stdout.write("waiting...\n")
            readable, _, _ = select.select(readfds, [], [])
            for sock in readable:
                if sock in listeners:
                    label = listeners[sock]
                    conn, _addr = sock.accept()
                    connections[conn] = label
                    readfds.append(conn)
                    sys.stdout.write("{} connected.\n".format(label))
                elif sock in datagram_socks:
                    data, _client = sock.recvfrom(datasize)
                    sys.stdout.write("received {}\n".format(data.decode()))
                else:
                    data = sock.recv(datasize)
                    if not data:
                        # Empty read: the peer closed its end.
                        label = connections.pop(sock)
                        sys.stdout.write("{} disconnected.\n".format(label))
                        readfds.remove(sock)
                        sock.close()
                        # Keep servicing the other ready sockets.
                        continue
                    sys.stdout.write("received {}\n".format(data.decode()))
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop this server; exit quietly.
        pass
    finally:
        # Runs on every exit path so neither sockets nor socket files are
        # left behind after an error.
        for sock in list(connections) + [tcp, udp, unix_s, unix_d]:
            sock.close()
        for path in bound_paths:
            os.unlink(path)
エラー処理は上手く書けないのでごめんなさい。
コメント