Ticket #999: bandwidth-congestion-handling-v6.patch
File bandwidth-congestion-handling-v6.patch, 33.9 KB (added by , 3 years ago)
xpra/server/cystats.pyx
@@ -107,12 +107,18 @@
         rw += w
     return int(tv / tw), int(rv / rw)
 
-def calculate_timesize_weighted_average(data, float sizeunit=1.0):
+def calculate_timesize_weighted_average(data, float unit=1.0):
+    #the value is elapsed time,
+    #so we want to divide by the value:
+    recs = tuple((a,b,unit/c) for a,b,c in data)
+    return calculate_size_weighted_average(recs)
+
+def calculate_size_weighted_average(data):
     """
     This is a time weighted average where the size
     of each record also gives it a weight boost.
     This is to prevent small packets from skewing the average.
-    Data format: (event_time, size, elapsed_time)
+    Data format: (event_time, size, value)
     """
     cdef double size_avg = sum(x for _, x, _ in data)/len(data)
     cdef double now = monotonic_time()      #@DuplicatedSignature
@@ -123,22 +129,22 @@
     cdef double event_time                  #@DuplicatedSignature
     cdef double size
     cdef double size_ps
-    cdef double elapsed_time
+    cdef double value
     cdef double pw
     cdef double w                           #@DuplicatedSignature
     cdef double delta                       #@DuplicatedSignature
-    for event_time, size, elapsed_time in data:
-        if elapsed_time<=0:
+    for event_time, size, value in data:
+        if value<=0:
             continue        #invalid record
         delta = now-event_time
         pw = clogp(size/size_avg)
-        size_ps = max(1, size*sizeunit/elapsed_time)
+        size_ps = max(1, size*value)
         w = pw/(1.0+delta)
         tv += w*size_ps
-        tw += w
+        tw += w*size
         w = pw/(0.1+delta**2)
         rv += w*size_ps
-        rw += w
+        rw += w*size
     return float(tv / tw), float(rv / rw)
 
 def calculate_for_target(metric, float target_value, float avg_value, float recent_value, float aim=0.5, float div=1.0, float slope=0.1, smoothing=logp, float weight_multiplier=1.0):
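For reference, a plain-Python sketch of the new calculate_size_weighted_average() logic above (the real code is Cython): log1p stands in for the patch's clogp smoothing and time.monotonic for monotonic_time(); both substitutions, and the function name, are assumptions made to keep the example self-contained.

from math import log1p
from time import monotonic

def size_weighted_average(data):
    #data: sequence of (event_time, size, value) records, as in the patch
    size_avg = sum(size for _, size, _ in data) / len(data)
    now = monotonic()
    tv = tw = rv = rw = 0.0
    for event_time, size, value in data:
        if value <= 0:
            continue                        #invalid record
        delta = now - event_time            #age of the record
        pw = log1p(size / size_avg)         #larger records get a weight boost
        size_ps = max(1, size * value)
        w = pw / (1.0 + delta)              #long-term weight: decays slowly with age
        tv += w * size_ps
        tw += w * size
        w = pw / (0.1 + delta ** 2)         #recent weight: decays quickly with age
        rv += w * size_ps
        rw += w * size
    return tv / tw, rv / rw                 #(average, recent average)

#example: two send-speed samples of (timestamp, bytes, bits-per-second):
print(size_weighted_average([(monotonic() - 5, 10240, 2e6), (monotonic() - 1, 204800, 8e6)]))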
xpra/server/source.py
@@ -64,6 +64,7 @@
 NEW_STREAM_SOUND = envbool("XPRA_NEW_STREAM_SOUND", True)
 PING_DETAILS = envbool("XPRA_PING_DETAILS", True)
 PING_TIMEOUT = envint("XPRA_PING_TIMEOUT", 60)
+CONGESTION_AVOIDANCE = envbool("XPRA_CONGESTION_AVOIDANCE", True)
 
 PRINTER_LOCATION_STRING = os.environ.get("XPRA_PRINTER_LOCATION_STRING", "via xpra")
 PROPERTIES_DEBUG = [x.strip() for x in os.environ.get("XPRA_WINDOW_PROPERTIES_DEBUG", "").split(",")]
@@ -442,6 +443,7 @@
         self.double_click_time = -1
         self.double_click_distance = -1, -1
         self.bandwidth_limit = self.server_bandwidth_limit
+        self.soft_bandwidth_limit = self.bandwidth_limit
         #what we send back in hello packet:
         self.ui_client = True
         self.wants_aliases = True
@@ -508,8 +510,23 @@
 
 
     def update_bandwidth_limits(self):
-        if self.bandwidth_limit<=0 or self.mmap_size>0:
+        if self.mmap_size>0:
             return
+        #calculate soft bandwidth limit based on send congestion data:
+        bandwidth_limit = 0
+        if CONGESTION_AVOIDANCE:
+            bandwidth_limit = self.statistics.avg_congestion_send_speed
+            log.warn("avg_congestion_send_speed=%s", bandwidth_limit)
+            if bandwidth_limit>20*1024*1024:
+                #ignore congestion speed if greater 20Mbps
+                bandwidth_limit = 0
+        if self.bandwidth_limit>0:
+            #command line options could overrule what we detect?
+            bandwidth_limit = min(self.bandwidth_limit, bandwidth_limit)
+        self.soft_bandwidth_limit = bandwidth_limit
+        log.warn("update_bandwidth_limits() bandwidth_limit=%s, soft bandwidth limit=%s", self.bandwidth_limit, bandwidth_limit)
+        if self.soft_bandwidth_limit<=0:
+            return
         #figure out how to distribute the bandwidth amongst the windows,
         #we use the window size,
         #(we should actually use the number of bytes actually sent: framerate, compression, etc..)
@@ -527,7 +544,7 @@
         for wid, ws in self.window_sources.items():
             weight = window_weight.get(wid)
             if weight is not None:
-                ws.bandwidth_limit = max(1, self.bandwidth_limit*weight/total_weight)
+                ws.bandwidth_limit = max(1, bandwidth_limit*weight/total_weight)
 
     def recalculate_delays(self):
         """ calls update_averages() on ServerSource.statistics (GlobalStatistics)
@@ -538,8 +555,8 @@
         if self.is_closed():
             return
         self.calculate_last_time = monotonic_time()
+        self.statistics.update_averages()
         self.update_bandwidth_limits()
-        self.statistics.update_averages()
         wids = list(self.calculate_window_ids)  #make a copy so we don't clobber new wids
         focus = self.get_focus()
         sources = self.window_sources.items()
@@ -1620,7 +1637,10 @@
             "suspended"         : self.suspended,
             "counter"           : self.counter,
             "hello-sent"        : self.hello_sent,
-            "bandwidth-limit"   : self.bandwidth_limit,
+            "bandwidth-limit"   : {
+                "setting"       : self.bandwidth_limit,
+                "actual"        : self.soft_bandwidth_limit,
+                }
             }
         if self.desktop_mode_size:
             info["desktop_mode_size"] = self.desktop_mode_size
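To make the new soft-limit logic easier to follow, here is an illustrative stand-alone sketch of the two steps update_bandwidth_limits() now performs: derive a soft limit from the measured congestion send speed (discarded above 20Mbps and never exceeding any configured limit), then split it across the windows weighted by pixel count. The helper names and the sample window sizes are invented for the example; a result of 0 means "no usable estimate", in which case the patch simply returns early and keeps the previous per-window limits.

def soft_bandwidth_limit(avg_congestion_send_speed, setting=0):
    #mirror the patch: drop implausibly high measurements, then honour the configured setting
    limit = avg_congestion_send_speed
    if limit > 20*1024*1024:
        limit = 0
    if setting > 0:
        limit = min(setting, limit)
    return limit

def distribute(limit, window_sizes):
    #window_sizes: {wid: (width, height)} - weight each window by its pixel count
    weights = {wid: w*h for wid, (w, h) in window_sizes.items()}
    total = sum(weights.values())
    return {wid: max(1, limit*weight//total) for wid, weight in weights.items()}

limit = soft_bandwidth_limit(avg_congestion_send_speed=4*1024*1024, setting=8*1024*1024)
print(distribute(limit, {1: (800, 600), 2: (1920, 1080)}))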
xpra/server/source_stats.py
@@ -12,7 +12,7 @@
 from xpra.log import Logger
 log = Logger("stats")
 
-from xpra.server.cystats import logp, calculate_time_weighted_average, calculate_for_target, queue_inspect   #@UnresolvedImport
+from xpra.server.cystats import logp, calculate_time_weighted_average, calculate_size_weighted_average, calculate_for_target, queue_inspect   #@UnresolvedImport
 from xpra.simple_stats import get_list_stats
 from xpra.os_util import monotonic_time
 
@@ -52,10 +52,14 @@
                                                             #(event_time, elapsed_time_in_seconds)
         self.server_ping_latency = deque(maxlen=NRECS)      #time it took for the client to get a ping_echo back from us:
                                                             #(event_time, elapsed_time_in_seconds)
+        self.congestion_send_speed = deque(maxlen=4*NRECS)  #when we are being throttled, record what speed we are sending at
+                                                            #last NRECS: (event_time, no of pixels, duration)
+        self.late_packets = set()
         self.client_load = None
         self.damage_events_count = 0
         self.packet_count = 0
         self.decode_errors = 0
+        self.last_congestion_time = 0
         #these values are calculated from the values above (see update_averages)
         self.min_client_latency = self.DEFAULT_LATENCY
         self.avg_client_latency = self.DEFAULT_LATENCY
@@ -66,6 +70,7 @@
         self.min_server_ping_latency = self.DEFAULT_LATENCY
         self.avg_server_ping_latency = self.DEFAULT_LATENCY
         self.recent_server_ping_latency = self.DEFAULT_LATENCY
+        self.avg_congestion_send_speed = 0
 
     def record_latency(self, wid, decode_time, start_send_at, end_send_at, pixels, bytecount):
         now = monotonic_time()
@@ -98,6 +103,13 @@
             data = list(self.server_ping_latency)
             self.min_server_ping_latency = min([x for _,x in data])
             self.avg_server_ping_latency, self.recent_server_ping_latency = calculate_time_weighted_average(data)
+        #set to 0 if we have less than 2 events in the last 60 seconds:
+        min_time = monotonic_time()-60
+        css = tuple(x for x in self.congestion_send_speed if x[0]>min_time)
+        if len(css)<=1:
+            self.avg_congestion_send_speed = 0
+        else:
+            self.avg_congestion_send_speed = int(calculate_size_weighted_average(list(self.congestion_send_speed))[0])
 
     def get_factors(self, pixel_count):
         factors = []
@@ -153,6 +165,8 @@
     def get_info(self):
         cwqsizes = [x for _,x in list(self.compression_work_qsizes)]
         pqsizes = [x for _,x in list(self.packet_qsizes)]
+        now = monotonic_time()
+        time_limit = now-60             #ignore old records (60s)
         info = {"damage" : {
                             "events"        : self.damage_events_count,
                             "packets_sent"  : self.packet_count,
@@ -164,10 +178,13 @@
                            },
                 },
                 "encoding" : {"decode_errors" : self.decode_errors},
+                "congestion" : {
+                                "avg-send-speed"            : self.avg_congestion_send_speed,
+                                "last-congestion-event"     : self.last_congestion_time,
+                                "late-packets-per-minute"   : sum(1 for x in self.late_packets if x>time_limit),
+                                },
                 }
         #client pixels per second:
-        now = monotonic_time()
-        time_limit = now-30             #ignore old records (30s)
         #pixels per second: decode time and overall
         total_pixels = 0                #total number of pixels processed
         total_time = 0                  #total decoding time
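The update_averages() change above only trusts the congestion data when at least two samples were recorded in the last minute. A stand-alone sketch of that guard, using a plain mean where the patch uses calculate_size_weighted_average (the record layout follows the patch, the helper name is made up):

from collections import deque
from time import monotonic

congestion_send_speed = deque(maxlen=100)   #(event_time, bytes, send_speed_in_bps) records

def avg_congestion_send_speed(window=60):
    #require at least two events in the last `window` seconds before trusting an average:
    min_time = monotonic() - window
    recent = [speed for event_time, _, speed in congestion_send_speed if event_time > min_time]
    if len(recent) <= 1:
        return 0
    return int(sum(recent) / len(recent))   #plain mean stands in for the size-weighted average

congestion_send_speed.append((monotonic(), 65536, 3000000))
congestion_send_speed.append((monotonic(), 131072, 2000000))
print(avg_congestion_send_speed())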
xpra/server/window/batch_delay_calculator.py
@@ -169,6 +169,9 @@
 
     #combine factors: use the highest one:
     target = min(1.0, max(dam_lat_abs, dam_lat_rel, dec_lat, pps, 0.0))
+    #discount for congestion by up to 40%:
+    lp = len(global_statistics.late_packets)
+    target = target*max(40, 100-lp*20)/100
 
     #scale target between min_speed and 100:
     ms = min(100.0, max(min_speed, 0.0))
@@ -191,6 +194,7 @@
                 "target"    : int(target_decode_speed),
                 "factor"    : int(100.0*dec_lat),
                 },
+            "late-packets"  : lp,
         }
     return info, target_speed
 
@@ -272,6 +276,11 @@
         target = min(1.0, target + (1.0-pctpixdamaged*2))
     if pixl5<pixn5:
         target = sqrt(target)
+    #discount for congestion by up to 40%:
+    lp = len(global_statistics.late_packets)
+    target = target*max(40, 100-lp*20)/100
+    info["late-packets"] = lp
+
     #apply min-quality:
     mq = min(100.0, max(min_quality, 0.0))
     target_quality = mq + (100.0-mq) * target
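The congestion discount applied to both the speed and the quality targets is the same simple formula: each late packet removes 20% of the target, with a floor at 40% of the original value. A small worked example:

def congestion_discount(target, late_packets):
    #each late packet shaves 20% off the target, but never go below 40% of it:
    return target * max(40, 100 - late_packets*20) / 100.0

for lp in range(5):
    print(lp, congestion_discount(1.0, lp))
#0 late packets -> 1.0, 1 -> 0.8, 2 -> 0.6, 3 or more -> 0.4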
xpra/server/window/window_source.py
@@ -11,11 +11,13 @@
 import hashlib
 import threading
 from collections import deque
+from math import sqrt
 
 from xpra.os_util import monotonic_time
 from xpra.util import envint, envbool, csv
 from xpra.log import Logger
 log = Logger("window", "encoding")
+log.enable_debug()
 refreshlog = Logger("window", "refresh")
 compresslog = Logger("window", "compress")
 damagelog = Logger("window", "damage")
@@ -33,6 +35,8 @@
 
 MAX_PIXELS_PREFER_RGB = envint("XPRA_MAX_PIXELS_PREFER_RGB", 4096)
 
+CONGESTION_AVOIDANCE = envbool("XPRA_CONGESTION_AVOIDANCE", True)
+
 DELTA = envbool("XPRA_DELTA", True)
 MIN_DELTA_SIZE = envint("XPRA_MIN_DELTA_SIZE", 1024)
 MAX_DELTA_SIZE = envint("XPRA_MAX_DELTA_SIZE", 32768)
@@ -61,7 +65,7 @@
 from xpra.server.window.batch_config import DamageBatchConfig
 from xpra.simple_stats import get_list_stats
 from xpra.server.window.batch_delay_calculator import calculate_batch_delay, get_target_speed, get_target_quality
-from xpra.server.cystats import time_weighted_average
+from xpra.server.cystats import time_weighted_average, logp    #@UnresolvedImport
 from xpra.server.window.region import rectangle, add_rectangle, remove_rectangle, merge_all    #@UnresolvedImport
 from xpra.codecs.xor.cyxor import xor_str                      #@UnresolvedImport
 from xpra.server.picture_encode import rgb_encode, mmap_send, argb_swap
@@ -273,6 +277,7 @@
         self.suspended = False
         self.strict = STRICT_MODE
         #
+        self.congestion_timers = {}
         self.may_send_timer = None
         self.auto_refresh_delay = 0
         self.video_helper = None
@@ -359,6 +364,9 @@
                 "pixel_boost"   : self._lossless_threshold_pixel_boost
                 },
             })
+        info["congestion"] = {
+            "timers"    : len(self.congestion_timers),
+            }
         try:
             #ie: get_strict_encoding -> "strict_encoding"
             einfo["selection"] = self.get_best_encoding.__name__.replace("get_", "")
@@ -890,6 +898,7 @@
         self.cancel_refresh_timer()
         self.cancel_timeout_timer()
         self.cancel_av_sync_timer()
+        self.cancel_congestion_timers()
         #if a region was delayed, we can just drop it now:
         self.refresh_regions = []
         self._damage_delayed = None
@@ -1106,6 +1115,7 @@
         if self.full_frames_only:
             x, y, w, h = 0, 0, ww, wh
 
+        nlate_packets = len(self.global_statistics.late_packets)
         delayed = self._damage_delayed
         if delayed:
             #use existing delayed region:
@@ -1124,8 +1134,9 @@
                     existing_options[k] = options[k]
             damagelog("damage%-24s wid=%s, using existing delayed %s regions created %.1fms ago",
                 (x, y, w, h, options), self.wid, delayed[3], now-delayed[0])
-            if not self.expire_timer and not self.soft_timer and self.soft_expired==0:
-                log.error("Error: bug, found a delayed region without a timer!")
+            if not self.expire_timer and not self.soft_timer and self.soft_expired==0 and not nlate_packets and not self.congestion_timers:
+                #this can happen after congestion recovery
+                log.warn("Warning: found a delayed region without a timer")
                 self.expire_timer = self.timeout_add(0, self.expire_delayed_region, 0)
             return
         elif self.batch_config.delay <= self.batch_config.min_delay and not self.batch_config.always:
@@ -1149,14 +1160,14 @@
             delay = max(10, min(self.batch_config.min_delay, delay)) * (qsize/4.0)
         delay = max(delay, options.get("min_delay", 0))
         delay = min(delay, options.get("max_delay", self.batch_config.max_delay))
-        delay = int(delay)
+        delay = int(delay*sqrt(1+nlate_packets))
        packets_backlog = self.statistics.get_packets_backlog()
         pixels_encoding_backlog, enc_backlog_count = self.statistics.get_pixels_encoding_backlog()
         #only send without batching when things are going well:
         # - no packets backlog from the client
         # - the amount of pixels waiting to be encoded is less than one full frame refresh
         # - no more than 10 regions waiting to be encoded
-        if not self.must_batch(delay) and (packets_backlog==0 and pixels_encoding_backlog<=ww*wh and enc_backlog_count<=10):
+        if not self.must_batch(delay) and (packets_backlog==0 and pixels_encoding_backlog<=ww*wh and enc_backlog_count<=10 and nlate_packets==0):
             #send without batching:
             damagelog("damage%-24s wid=%s, sending now with sequence %s", (x, y, w, h, options), self.wid, self._sequence)
             actual_encoding = options.get("encoding")
@@ -1207,28 +1218,43 @@
         delayed = self._damage_delayed
         if not delayed:
             #region has been sent
-            return
+            return False
         self.cancel_may_send_timer()
-        self.may_send_delayed()
-        delayed = self._damage_delayed
-        if not delayed:
-            #got sent
-            return
-        #the region has not been sent yet because we are waiting for damage ACKs from the client
+        lp = len(self.global_statistics.late_packets)
+        if lp==0:
+            self.may_send_delayed()
+            delayed = self._damage_delayed
+            if not delayed:
+                #got sent
+                return False
+        #the region has not been sent yet,
+        #because we are waiting for damage ACKs from the client,
+        #or even waiting to send some packets
         if self.soft_expired<self.max_soft_expired:
-            #there aren't too many regions soft expired yet
+            #there aren't too many regions soft expired yet,
             #so use the "soft timer":
             self.soft_expired += 1
             #we have already waited for "delay" to get here, wait more as we soft expire more regions:
-            self.soft_timer = self.timeout_add(int(self.soft_expired*delay), self.delayed_region_soft_timeout)
+            soft_delay = int(delay*max(self.soft_expired, sqrt(1+lp)))
+            log("expire_delayed_region() soft delay=%i, soft_expired=%i, late packets=%i", soft_delay, self.soft_expired, lp)
+            self.soft_timer = self.timeout_add(soft_delay, self.delayed_region_soft_timeout, lp)
         else:
             #NOTE: this should never happen...
             #the region should now get sent when we eventually receive the pending ACKs
             #but if somehow they go missing... clean it up from a timeout:
             delayed_region_time = delayed[0]
             self.timeout_timer = self.timeout_add(self.batch_config.timeout_delay, self.delayed_region_timeout, delayed_region_time)
+        return False
 
-    def delayed_region_soft_timeout(self):
+    def delayed_region_soft_timeout(self, lp):
+        #FIXME: if we have many late packets, re-schedule
+        #       but need to know if they belong to this window to know how...
+        #       (wait for ack vs timer)
+        new_lp = len(self.global_statistics.late_packets)
+        log("delayed_region_soft_timeout(%i) new lp=%i, late packets=%s", lp, new_lp, self.global_statistics.late_packets)
+        if lp>0 and new_lp>=lp:
+            #not getting better - re-schedule
+            return True
         self.soft_timer = None
         self.do_send_delayed()
         return False
@@ -1253,8 +1279,8 @@
             if dap:
                 log.warn(" %i late responses:", len(dap))
                 for seq in sorted(dap.keys()):
-                    ack_data = dap.get(seq)
-                    if ack_data and ack_data[0]>0:
+                    ack_data = dap[seq]
+                    if ack_data[0]>0:
                         log.warn(" %6i %-5s: %3is", seq, ack_data[1], now-ack_data[3])
         #re-try: cancel anything pending and do a full quality refresh
         self.cancel_damage()
@@ -1277,10 +1303,25 @@
         if not dd:
             log("window %s delayed region already sent", self.wid)
             return
+        stats = self.statistics
+        if not stats:
+            #closing
+            return
         damage_time = dd[0]
-        packets_backlog = self.statistics.get_packets_backlog()
         now = monotonic_time()
         actual_delay = int(1000.0 * (now-damage_time))
+        def check_again(delay=actual_delay/10.0):
+            #schedules a call to check again:
+            delay = int(min(self.batch_config.max_delay, max(10, delay)))
+            self.may_send_timer = self.timeout_add(delay, self._may_send_delayed)
+            return
+        lp = len(self.global_statistics.late_packets)
+        if lp:
+
+            #reduce the backlog tolerance:
+            pass
+        latency_tolerance_pct = 100//(1+lp)
+        packets_backlog = self.statistics.get_packets_backlog(latency_tolerance_pct)
         if packets_backlog>0:
             if actual_delay>self.batch_config.timeout_delay:
                 log.warn("send_delayed for wid %s, elapsed time %ims is above limit of %.1f", self.wid, actual_delay, self.batch_config.max_delay)
@@ -1292,11 +1333,6 @@
         #if we're here, there is no packet backlog, but there may be damage acks pending or a bandwidth limit to honour,
         #if there are acks pending, may_send_delayed() should be called again from damage_packet_acked,
         #if not, we must either process the region now or set a timer to check again later
-        def check_again(delay=actual_delay/10.0):
-            #schedules a call to check again:
-            delay = int(min(self.batch_config.max_delay, max(10, delay)))
-            self.may_send_timer = self.timeout_add(delay, self._may_send_delayed)
-            return
         #locked means a fixed delay we try to honour,
         #this code ensures that we don't fire too early if called from damage_packet_acked
         if self.batch_config.locked:
@@ -1307,10 +1343,12 @@
             else:
                 self.do_send_delayed()
             return
-        if self.bandwidth_limit>0:
-            used = self.statistics.get_bits_encoded()
-            log("may_send_delayed() bandwidth limit=%i, used=%i : %i%%", self.bandwidth_limit, used, 100*used//self.bandwidth_limit)
-            if used>=self.bandwidth_limit:
+        bwl = self.bandwidth_limit
+        if bwl>0:
+            #log("may_send_delayed() last 5 seconds: bandwidth limit=%i, used=%i : %i%%", bwl, used5, 100*used5//bwl)
+            used = self.statistics.get_bitrate(1) or self.statistics.get_bitrate(5)
+            log("may_send_delayed() last 1 second: bandwidth limit=%i, used=%i : %i%%", bwl, used, 100*used//bwl)
+            if used>=bwl:
                 check_again(50)
                 return
         pixels_encoding_backlog, enc_backlog_count = self.statistics.get_pixels_encoding_backlog()
@@ -1331,6 +1369,8 @@
         self.do_send_delayed()
 
     def do_send_delayed(self):
+        import traceback
+        traceback.print_stack(limit=4)
         self.cancel_timeout_timer()
         self.cancel_soft_timer()
         delayed = self._damage_delayed
@@ -1368,6 +1408,10 @@
 
         def send_full_window_update():
             actual_encoding = get_encoding(ww*wh)
+            if actual_encoding=="png":
+                log.error("send_full_window_update() %s(%i)=%s, get_best_encoding=%s", get_encoding, ww*wh, actual_encoding, get_best_encoding)
+                import traceback
+                traceback.print_stack(limit=5)
             log("send_delayed_regions: using full window update %sx%s with %s", ww, wh, actual_encoding)
             assert actual_encoding is not None
             self.process_damage_region(damage_time, 0, 0, ww, wh, actual_encoding, options)
@@ -1596,6 +1640,8 @@
         #delay less when speed is high
         #(the resulting range is 100*100 to 200*200)
         qsmult = (200-self._current_quality) * (100+self._current_speed)
+        #when there is congestion, the auto-refresh can kill the bandwidth:
+        qsmult *= sqrt(1+len(self.global_statistics.late_packets))
         #important: must check both, I think:
         if target_time==0 or not self.refresh_timer:
             #this means we must schedule the refresh
@@ -1647,6 +1693,7 @@
         We figure out if now is the right time to do the refresh,
         and if not re-schedule.
         """
+        #FIXME: delay some more when we detect congestion
         #timer is running now, clear so we don't try to cancel it somewhere else:
         self.refresh_timer = None
         #re-do some checks that may have changed:
@@ -1730,32 +1777,96 @@
         - damage latency (via a callback once the packet is actually sent)
         """
         #packet = ["draw", wid, x, y, w, h, coding, data, self._damage_packet_sequence, rowstride, client_options]
+        statistics = self.statistics
+        gs = self.global_statistics
+        now = monotonic_time()
         width = packet[4]
         height = packet[5]
         coding = packet[6]
+        ldata = len(packet[7])
         damage_packet_sequence = packet[8]
         actual_batch_delay = process_damage_time-damage_time
+        queue_time = now
         ack_pending = [0, coding, 0, 0, 0, width*height]
-        statistics = self.statistics
-        statistics.damage_ack_pending[damage_packet_sequence] = ack_pending
+        if statistics:
+            statistics.damage_ack_pending[damage_packet_sequence] = ack_pending
+        max_send_delay = int(5*logp(ldata/1024.0))
         def start_send(bytecount):
             ack_pending[0] = monotonic_time()
             ack_pending[2] = bytecount
-            statistics.last_sequence_sending = damage_packet_sequence
+            log.info("queue to send delay (start): %4ims (max=%3i, damage_packet_sequence=%5i)", 1000*(now-queue_time), max_send_delay, damage_packet_sequence)
+            if statistics:
+                statistics.last_sequence_sending = damage_packet_sequence
         def damage_packet_sent(bytecount):
+            if CONGESTION_AVOIDANCE:
+                #use idle_add to prevent race conditions accessing the list of late packets:
+                self.idle_add(self.cancel_congestion_timer, damage_packet_sequence)
             now = monotonic_time()
             ack_pending[3] = now
             ack_pending[4] = bytecount
-            if process_damage_time>0:
-                damage_out_latency = now-process_damage_time
-                statistics.damage_out_latency.append((now, width*height, actual_batch_delay, damage_out_latency))
-            if process_damage_time>0:
-                now = monotonic_time()
-                damage_in_latency = now-process_damage_time
-                statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
-            #log.info("queuing %s packet with fail_cb=%s", coding, fail_cb)
+            if process_damage_time and statistics:
+                statistics.damage_out_latency.append((now, width*height, actual_batch_delay, now-process_damage_time))
+            if gs:
+                elapsed = now-ack_pending[0]
+                send_speed = 0
+                if elapsed>0:
+                    send_speed = int(8*ldata/elapsed)
+                log.info("queue to send delay (end)  : %4ims (max=%3i, damage_packet_sequence=%5i) for %4iKB using %6s: %6iMbps, elapsed=%.2fms", 1000*elapsed, max_send_delay, damage_packet_sequence, ldata//1024, coding, send_speed//1024//1024, elapsed*1000)
+                if elapsed*1000>max_send_delay and send_speed<100*1024*1024:
+                    log.warn("recording congestion send speed=%i", send_speed)
+                    gs.congestion_send_speed.append((now, ldata, send_speed))
+        if statistics:
+            statistics.last_sequence_queued = damage_packet_sequence
+            if process_damage_time:
+                statistics.damage_in_latency.append((now, width*height, actual_batch_delay, now-process_damage_time))
+        if CONGESTION_AVOIDANCE:
+            # 1KB -> 5ms, 10KB -> 17ms, 100KB -> 33ms, 1MB -> 50ms, 10MB -> 66ms
+            log.info("queueing %4i, size=%i, max_send_delay=%4i", damage_packet_sequence, ldata, max_send_delay)
+            t = self.timeout_add(max_send_delay, self.congestion_timer_event, damage_packet_sequence)
+            self.congestion_timers[damage_packet_sequence] = t
         self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent, self.get_fail_cb(packet))
 
+    def congestion_timer_event(self, damage_packet_sequence):
+        log.info("congestion_timer_event() damage_packet_sequence=%s, late packets=%s", damage_packet_sequence, self.global_statistics.late_packets)
+        try:
+            del self.congestion_timers[damage_packet_sequence]
+        except:
+            pass
+        self.global_statistics.late_packets.add((self.wid, damage_packet_sequence))
+        self.global_statistics.last_congestion_time = monotonic_time()
+        return False
+
+    def cancel_congestion_timers(self):
+        timers = list(self.congestion_timers.values())
+        self.congestion_timers = {}
+        for t in timers:
+            self.source_remove(t)
+
+    def cancel_congestion_timer(self, damage_packet_sequence):
+        t = self.congestion_timers.get(damage_packet_sequence)
+        log("cancel_congestion_timer(%i) timer=%s", damage_packet_sequence, t)
+        if t:
+            self.source_remove(t)
+        try:
+            del self.congestion_timers[damage_packet_sequence]
+        except:
+            pass
+        #record last_congestion_time before removing
+        #what is potentially the last item from late_packets
+        self.global_statistics.last_congestion_time = monotonic_time()
+        late_packets = self.global_statistics.late_packets
+        l = len(late_packets)
+        try:
+            late_packets.remove((self.wid, damage_packet_sequence))
+        except KeyError:
+            #congestion event had not fired yet
+            return
+        if l>0 and len(late_packets)==0:
+            log.info("cancel_congestion_timer(%i) last congestion sequence cleared, trying to send delayed region", damage_packet_sequence)
+            self.may_send_delayed()
+        else:
+            log.info("cancel_congestion_timer(%i) remaining congestion sequences=%s", damage_packet_sequence, late_packets)
+
     def get_fail_cb(self, packet):
         def resend():
             log("paint packet failure, resending")
@@ -1960,7 +2071,7 @@
         end = monotonic_time()
         compresslog("compress: %5.1fms for %4ix%-4i pixels at %4i,%-4i for wid=%-5i using %6s with ratio %5.1f%%  (%5iKB to %5iKB), sequence %5i, client_options=%s",
                 (end-start)*1000.0, outw, outh, x, y, self.wid, coding, 100.0*csize/psize, psize/1024, csize/1024, self._damage_packet_sequence, client_options)
-        self.statistics.encoding_stats.append((end, coding, w*h, bpp, len(data), end-start))
+        self.statistics.encoding_stats.append((end, coding, w*h, bpp, csize, end-start))
         return self.make_draw_packet(x, y, outw, outh, coding, data, outstride, client_options, options)
 
     def make_draw_packet(self, x, y, outw, outh, coding, data, outstride, client_options={}, options={}):
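The core of the congestion detection in this file is the pair of timers armed when a draw packet is queued and cancelled once it has actually been sent. A stand-alone sketch of that pattern, using threading.Timer instead of the GLib timeout_add / source_remove / idle_add machinery the real code relies on (that substitution, and the helper names, are assumptions made for the example; the delay formula matches the "1KB -> 5ms ... 10MB -> 66ms" comment above):

import threading
from math import log

late_packets = set()        #(wid, sequence) pairs shared by all windows (global statistics)
congestion_timers = {}      #sequence -> Timer, per window

def max_send_delay_ms(size_bytes):
    #roughly int(5*logp(size/1024)): 1KB -> ~5ms, 100KB -> ~33ms, 1MB -> ~50ms
    return int(5 * log(1 + size_bytes/1024.0, 2))

def packet_queued(wid, seq, size_bytes):
    #arm a timer when the draw packet is queued for sending:
    def late():
        congestion_timers.pop(seq, None)
        late_packets.add((wid, seq))        #still not sent in time: record a congestion event
    t = threading.Timer(max_send_delay_ms(size_bytes)/1000.0, late)
    congestion_timers[seq] = t
    t.start()

def packet_sent(wid, seq):
    #cancel the timer as soon as the packet actually hits the network layer:
    t = congestion_timers.pop(seq, None)
    if t:
        t.cancel()
    late_packets.discard((wid, seq))        #clear the flag if the timer had already fired

Everything else in the patch that reacts to congestion (batch delay, soft timers, speed and quality targets, encoding selection) only ever looks at the length or membership of late_packets, which is why a simple shared set of (wid, sequence) pairs is enough.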
xpra/server/window/window_stats.py
@@ -62,6 +62,8 @@
         self.last_recalculate = 0
         self.damage_events_count = 0
         self.packet_count = 0
+        self.last_sequence_queued = 0
+        self.last_sequence_sending = 0
 
         self.last_resized = 0
 
@@ -87,7 +89,7 @@
         #client decode speed:
         if len(self.client_decode_time)>0:
             #the elapsed time recorded is in microseconds, so multiply by 1000*1000 to get the real value:
-            self.avg_decode_speed, self.recent_decode_speed = calculate_timesize_weighted_average(list(self.client_decode_time), sizeunit=1000*1000)
+            self.avg_decode_speed, self.recent_decode_speed = calculate_timesize_weighted_average(list(self.client_decode_time), unit=1000*1000)
         #network send speed:
         all_l = [0.1,
                  self.avg_damage_in_latency, self.recent_damage_in_latency,
@@ -153,6 +155,8 @@
         info = {"damage"    : {"events"         : self.damage_events_count,
                                "packets_sent"   : self.packet_count,
                                "target-latency" : int(1000*self.target_latency),
+                               "last-queued"    : self.last_sequence_queued,
+                               "last-sending"   : self.last_sequence_sending,
                                }
                 }
         #encoding stats:
@@ -244,10 +248,10 @@
     def get_acks_pending(self):
         return sum(1 for x in self.damage_ack_pending.values() if x[0]!=0)
 
-    def get_packets_backlog(self):
+    def get_packets_backlog(self, latency_tolerance_pct=100):
         packets_backlog = 0
         if len(self.damage_ack_pending)>0:
-            sent_before = monotonic_time()-(self.target_latency+0.020)
+            sent_before = monotonic_time()-((self.target_latency+0.020)*100.0/latency_tolerance_pct)
             for _, (start_send_at, _, _, end_send_at, _, _) in self.damage_ack_pending.items():
                 if end_send_at>0 and start_send_at<=sent_before:
                     packets_backlog += 1
@@ -260,9 +264,14 @@
                 count += 1
         return pixels, count
 
-    def get_bits_encoded(self, elapsed=1):
-        cutoff = monotonic_time()-elapsed
-        return sum(v[4] for v in self.encoding_stats if v[0]>cutoff) * 8
+    def get_bitrate(self, max_elapsed=1):
+        cutoff = monotonic_time()-max_elapsed
+        recs = tuple((v[0], v[4]) for v in self.encoding_stats if v[0]>=cutoff)
+        if len(recs)<2:
+            return 0
+        bits = sum(v[1] for v in recs) * 8
+        elapsed = recs[-1][0]-recs[0][0]
+        return bits/elapsed
 
     def get_damage_pixels(self, elapsed=1):
         cutoff = monotonic_time()-elapsed
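get_bitrate() replaces get_bits_encoded() so that the rate is computed over the time span actually covered by the recent records rather than assuming a full second of data. An equivalent stand-alone version, with an extra guard against a zero time span that the patch itself does not include:

from time import monotonic

def get_bitrate(encoding_stats, max_elapsed=1):
    #encoding_stats: (event_time, compressed_size_in_bytes) records
    cutoff = monotonic() - max_elapsed
    recs = [(t, size) for t, size in encoding_stats if t >= cutoff]
    if len(recs) < 2:
        return 0                            #not enough recent data to compute a rate
    elapsed = recs[-1][0] - recs[0][0]
    if elapsed <= 0:
        return 0
    bits = sum(size for _, size in recs) * 8
    return bits / elapsed                   #bits per second

now = monotonic()
print(get_bitrate([(now-0.8, 50000), (now-0.4, 80000), (now-0.1, 20000)]))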
xpra/server/window/window_video_source.py
@@ -463,7 +463,8 @@
         #take into account how many pixels need to be encoded:
         #more pixels means we switch to lossless more easily
         lossless_q = min(100, self._lossless_threshold_base + self._lossless_threshold_pixel_boost * pixel_count / (ww*wh))
-        if quality<lossless_q and self.image_depth>16 and "jpeg" in options:
+        lce = monotonic_time()-self.global_statistics.last_congestion_time
+        if (quality<lossless_q or self.global_statistics.late_packets or lce<1.0) and self.image_depth>16 and "jpeg" in options:
             #assume that we have "turbojpeg",
             #which beats everything in terms of efficiency for lossy compression:
             return "jpeg"
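A condensed view of the new encoding decision: jpeg is now preferred not only when the quality target is below the lossless threshold, but also while there are outstanding late packets or a congestion event happened less than a second ago. The function below is illustrative only, not the patch's actual method:

from time import monotonic

def prefer_jpeg(quality, lossless_q, late_packets, last_congestion_time,
                image_depth=24, options=("jpeg", "rgb", "png")):
    #lce: time since the last recorded congestion event
    lce = monotonic() - last_congestion_time
    return ((quality < lossless_q or late_packets or lce < 1.0)
            and image_depth > 16 and "jpeg" in options)

#recent congestion forces jpeg even though the quality target would allow lossless:
print(prefer_jpeg(quality=90, lossless_q=80, late_packets=set(), last_congestion_time=monotonic()-0.2))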