-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
235 lines (207 loc) · 8.28 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
import os
import time
import socket
import select
import logging
from threading import Thread
from const import *
# Go-Back-N switch: when the receiver's window is 1, pipelining several
# blocks ahead can greatly improve channel utilisation.
GBN_ENABLE = True

# Module-wide logging: timestamped messages at DEBUG level and above.
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.DEBUG)
log = logging.getLogger(__name__)
class TftpSession(Thread):
    """One TFTP session: answers a single request on its own UDP socket.

    A session parses the request, negotiates the ``blksize`` /
    ``windowsize`` / ``tsize`` options, then streams the file as DATA
    packets while waiting for ACKs.  Only read requests in ``octet`` mode
    are served; everything else is answered with an ERROR packet.

    The opcodes, size limits, ``DEF_TIMEOUT`` and ``MAX_RETRY`` used here
    are module-level names (expected to come from ``const``); ``log`` and
    ``GBN_ENABLE`` are module-level names of this file.
    """

    # Number of blocks kept in flight when Go-Back-N is used for a client
    # whose own window is 1.
    GBN_WINDOW = 8

    def __init__(self, tftp_dir, client_addr, req_data: bytes):
        """Create the session socket, connected to *client_addr*.

        Args:
            tftp_dir: directory files are served from.
            client_addr: ``(host, port)`` of the requesting client.
            req_data: the raw request packet received by the server.
        """
        super().__init__(name=f'{client_addr}', daemon=True)
        self.tftp_dir = tftp_dir
        self.req_data = req_data
        self.blksize = DEF_BLOCK_SIZE
        self.windowsize = DEF_WINDOW_SIZE
        # Each session talks from its own ephemeral port.
        self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.s.bind(('0.0.0.0', 0))
        self.s.connect(client_addr)
        self.s.settimeout(DEF_TIMEOUT)

    def run(self):
        """Thread body: handshake, transfer, then log timing statistics.

        A failed session ends by raising ``SystemExit`` from a helper; the
        socket is closed in every case.
        """
        try:
            t0 = time.time()
            file_path = self.request_parse()
            t1 = time.time()
            self.transfer(file_path)
            t2 = time.time()
            # Guard against a zero interval on very small files / coarse clocks.
            elapsed = max(t2 - t0, 1e-9)
            speed = os.path.getsize(file_path) / 1024 / 1024 / elapsed
            log.info(f'传输完成,握手时间:{t1-t0:.3f}s,传输时间:{t2-t1:.3f}s,速度:{speed:.2f}M/s')
        finally:
            self.s.close()

    def send(self, opcode, **kwargs):
        """Build and send one packet.

        Keyword arguments by opcode:
            ERROR: ``errcode`` (int) and optional ``errmsg`` (str).
            ACK:   ``block`` (int).
            OACK:  ``option`` (bytes, already NUL-separated).
            DATA:  ``block`` (int) and ``data`` (bytes).

        Unknown opcodes are silently ignored.
        """
        if opcode == ERROR:
            # The error message field is always NUL-terminated, even when
            # empty (RFC 1350 ERROR packet layout).
            body = (kwargs['errcode'].to_bytes(2, 'big')
                    + kwargs.get('errmsg', '').encode() + b'\x00')
        elif opcode == ACK:
            body = kwargs['block'].to_bytes(2, 'big')
        elif opcode == OACK:
            body = kwargs['option']
        elif opcode == DATA:
            body = kwargs['block'].to_bytes(2, 'big') + kwargs['data']
        else:
            return
        self.s.send(opcode + body)

    def recv(self):
        """Wait for the next ACK and return its 16-bit block number.

        Packets that are neither ACK nor ERROR are skipped.

        Raises:
            SystemExit: the client sent an ERROR packet.
            socket.timeout: nothing arrived within the socket timeout.
        """
        # Loop instead of recursing so a stream of stray packets cannot
        # exhaust the call stack.
        while True:
            data = self.s.recv(65536)
            opcode = data[:2]
            if opcode == ACK:
                return int.from_bytes(data[2:4], 'big')
            if opcode == ERROR:
                errcode = int.from_bytes(data[2:4], 'big')
                errmsg = data[4:].decode(errors='replace').rstrip('\x00')
                log.error(f'传输失败,错误码:{errcode} {errmsg}')
                raise SystemExit()

    def _abort(self, errcode, errmsg):
        """Send an ERROR packet, log *errmsg* and terminate the session."""
        self.send(ERROR, errcode=errcode, errmsg=errmsg)
        log.error(errmsg)
        raise SystemExit()

    @staticmethod
    def _split_request(req_data):
        """Split a raw request into ``(file_name, mode, option_fields)``.

        The file name keeps its original case; the mode and the option
        fields are lower-cased.  Returns ``None`` when the packet does not
        contain at least a file name and a mode.
        """
        fields = req_data[2:].decode(errors='replace').split('\x00')
        if fields and fields[-1] == '':
            fields.pop()  # artefact of the terminating NUL
        if len(fields) < 2:
            return None
        return fields[0], fields[1].lower(), [x.lower() for x in fields[2:]]

    @staticmethod
    def _resolve_path(tftp_dir, file_name):
        """Return the real path of *file_name* inside *tftp_dir*.

        Returns ``None`` when the name (absolute path, ``..`` components or
        a symlink) would resolve to a location outside *tftp_dir*.
        """
        root = os.path.realpath(tftp_dir)
        candidate = os.path.realpath(os.path.join(root, file_name))
        try:
            inside = os.path.commonpath([root, candidate]) == root
        except ValueError:
            # commonpath refuses paths that cannot share a prefix.
            inside = False
        return candidate if inside else None

    @staticmethod
    def _int_option(opts, name):
        """Return option *name* as an int, or ``None`` if absent/invalid."""
        try:
            return int(opts[name])
        except (KeyError, ValueError):
            return None

    def request_parse(self):
        """Handshake: validate the request and negotiate options.

        Returns:
            The resolved path of the file to send.

        Raises:
            SystemExit: the request is malformed, is a write request, uses
                a mode other than ``octet``, names a file outside the
                served directory or a missing file, or the OACK was not
                acknowledged within ``MAX_RETRY`` retries.
        """
        opcode = self.req_data[:2]
        parsed = self._split_request(self.req_data)
        if parsed is None:
            self._abort(4, '传输失败,请求格式错误')
        file_name, mode, options = parsed

        log.info('')
        req = '下载' if opcode == RRQ else '上传'
        log.info(f'{req}请求,文件名:{file_name},选项:{options}')

        if opcode == WRQ:
            self._abort(0, '传输失败,暂未支持上传请求')
        if mode != 'octet':
            self._abort(0, f'传输失败,不支持的传输模式:{mode}')

        # The file name is client-controlled: never serve outside tftp_dir.
        file_path = self._resolve_path(self.tftp_dir, file_name)
        if file_path is None:
            self._abort(2, f'传输失败,非法的文件路径:{file_name}')
        if not os.path.isfile(file_path):
            self.send(ERROR, errcode=1)
            log.error(f'传输失败,{file_path} 不存在')
            raise SystemExit()

        # Option negotiation: options arrive as name/value pairs.
        opts = dict(zip(options[0::2], options[1::2]))
        nego = ''
        blksize = self._int_option(opts, 'blksize')
        if blksize is not None:
            self.blksize = max(min(MAX_BLOCK_SIZE, blksize), MIN_BLOCK_SIZE)
            nego += f'blksize\x00{self.blksize}\x00'
        windowsize = self._int_option(opts, 'windowsize')
        if windowsize is not None:
            self.windowsize = max(min(MAX_WINDOW_SIZE, windowsize), MIN_WINDOW_SIZE)
            nego += f'windowsize\x00{self.windowsize}\x00'
        if 'tsize' in opts:
            tsize = os.path.getsize(file_path)
            nego += f'tsize\x00{tsize}\x00'

        if nego:
            retry = 0
            while True:
                self.send(OACK, option=nego.encode())
                try:
                    self.recv()
                    break
                except socket.timeout:
                    if retry == MAX_RETRY:
                        self._abort(0, f'传输失败,超过最大重传次数{MAX_RETRY}')
                    retry += 1
                    log.error(f'选项协商超时{retry}次,超时时间:{self.s.gettimeout()}s')
            # Computed outside the f-string so the line parses on
            # interpreters older than 3.12 as well.
            agreed = nego.split('\x00')[:-1]
            log.info(f'协商选项:{agreed}')
        return file_path

    def transfer(self, file_path):
        """Send *file_path* to the client block by block.

        Blocks are tracked with absolute (unwrapped) counters and the file
        is re-positioned from them, so the data sent always matches the
        block number on the wire, including after a retransmission.

        When ``GBN_ENABLE`` is set and the negotiated window is 1, up to
        ``GBN_WINDOW`` blocks are kept in flight and the window slides on
        every ACK.  Otherwise a window of ``self.windowsize`` blocks is
        sent and an ACK for an earlier block restarts the window after it.

        Raises:
            SystemExit: no progress after ``MAX_RETRY`` retransmissions,
                or the client sent an ERROR packet.
        """
        # Decided per session; the module-level switch is only read so one
        # session cannot change the behaviour of another.
        gbn = GBN_ENABLE and self.windowsize == 1
        window = self.GBN_WINDOW if gbn else self.windowsize
        blksize = self.blksize
        timeout = self.s.gettimeout()

        with open(file_path, 'rb') as f:
            # The transfer ends with a block shorter than blksize, so an
            # exact multiple still needs one trailing empty block.
            total = os.fstat(f.fileno()).st_size // blksize + 1
            acked = 0  # blocks confirmed by the client
            sent = 0   # highest block sent since the last go-back
            high = 0   # highest block ever sent (validates late ACKs)
            retry = 0
            progress_at = time.monotonic()

            while acked < total:
                # Fill the window, starting right after block `sent`.
                f.seek(sent * blksize)
                while sent < total and sent - acked < window:
                    if select.select([self.s], [], [], 0)[0]:
                        break  # something arrived mid-window: handle it first
                    data = f.read(blksize)
                    sent += 1
                    self.send(DATA, block=sent & 0xffff, data=data)  # uint16
                high = max(high, sent)

                try:
                    ack = self.recv()
                    timed_out = False
                except socket.timeout:
                    timed_out = True

                if not timed_out:
                    # Distance from the last confirmed block, modulo 2**16.
                    delta = (ack - acked) & 0xffff
                    if 0 < delta <= high - acked:
                        acked += delta
                        retry = 0
                        progress_at = time.monotonic()
                        if sent < acked:
                            # Late ACK for blocks sent before a go-back.
                            sent = acked
                        elif sent > acked and not gbn:
                            # Windowed receiver acknowledged only part of
                            # the window: resume right after that block.
                            log.error(f'重传块:{(acked + 1) & 0xffff}')
                            sent = acked
                        continue
                    # A repeated ACK in windowed mode reports a gap at the
                    # start of the window.  In Go-Back-N mode duplicates
                    # are ignored (retransmitting on each one would echo
                    # duplicates back and forth) unless nothing has
                    # progressed for a whole timeout period.
                    gap_report = delta == 0 and sent > acked and not gbn
                    stalled = (timeout is not None
                               and time.monotonic() - progress_at >= timeout)
                    if not (gap_report or stalled):
                        continue
                    timed_out = stalled and not gap_report

                # Go back: resend everything after the last confirmed block.
                if retry == MAX_RETRY:
                    self._abort(0, f'传输失败,超过最大重传次数{MAX_RETRY}')
                retry += 1
                if timed_out:
                    log.error(f'块:{sent & 0xffff},ACK超时{retry}次,超时时间:{timeout}s')
                log.error(f'重传块:{(acked + 1) & 0xffff}')
                sent = acked
                progress_at = time.monotonic()
class TftpServer(Thread):
    """TFTP server: listens for requests and starts one session per request.

    The thread is a daemon.  Call ``stop()`` to make ``run()`` leave its
    receive loop and close the listening socket.
    """

    # Seconds between checks of the stop flag while waiting for requests.
    POLL_INTERVAL = 0.5

    def __init__(self, tftp_dir='.', ip='0.0.0.0', port=69):
        """Remember the served directory and the address to listen on.

        Args:
            tftp_dir: directory files are served from (made absolute).
            ip: local address to bind.
            port: local UDP port to bind.
        """
        super().__init__()
        self.daemon = True
        self.tftp_dir = os.path.abspath(tftp_dir)
        self.server_addr = (ip, port)
        # Defined up front so stop() and readers never hit a missing
        # attribute, whatever the ordering relative to run().
        self.is_running = False
        self._stop_requested = False

    def stop(self):
        """Ask the receive loop to exit; takes effect within POLL_INTERVAL."""
        self._stop_requested = True
        self.is_running = False

    def run(self):
        """Receive requests and hand RRQ/WRQ packets to new sessions."""
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            s.bind(self.server_addr)
            # A finite timeout lets the loop notice stop() even when no
            # packet ever arrives.
            s.settimeout(self.POLL_INTERVAL)
            # Honour a stop() that was issued before the thread got here.
            self.is_running = not self._stop_requested
            log.info(f'TFTP服务端已启动:{s.getsockname()},文件目录:{self.tftp_dir}')
            while self.is_running:
                try:
                    data, addr = s.recvfrom(1024)
                except socket.timeout:
                    continue
                opcode = data[:2]
                # Anything that is not a plausible request is ignored.
                if len(data) >= 4 and opcode in [RRQ, WRQ]:
                    session = TftpSession(self.tftp_dir, addr, data)
                    session.start()
        finally:
            s.close()
if __name__ == '__main__':
    # Serve the current directory until the server thread ends or the
    # user interrupts the process.
    server = TftpServer('.')
    server.start()
    try:
        # Join in short slices so the main thread wakes regularly and a
        # KeyboardInterrupt is handled here instead of ending in a traceback.
        while server.is_alive():
            server.join(0.5)
    except KeyboardInterrupt:
        server.stop()