xpra icon
Bug tracker and wiki

Ticket #999: bandwidth-congestion-handling-v7.patch

File bandwidth-congestion-handling-v7.patch, 21.7 KB (added by Antoine Martin, 21 months ago)

Updated patch to apply to r17452.

  • xpra/server/source_stats.py

     
    5454                                                            #(event_time, elapsed_time_in_seconds)
    5555        self.congestion_send_speed = deque(maxlen=4*NRECS)  #when we are being throttled, record what speed we are sending at
    5656                                                            #last NRECS: (event_time, no of pixels, duration)
     57        self.late_packets = set()
    5758        self.client_load = None
    5859        self.damage_events_count = 0
    5960        self.packet_count = 0
    6061        self.decode_errors = 0
     62        self.last_congestion_time = 0
    6163        #these values are calculated from the values above (see update_averages)
    6264        self.min_client_latency = self.DEFAULT_LATENCY
    6365        self.avg_client_latency = self.DEFAULT_LATENCY
     
    178180                "encoding" : {"decode_errors"   : self.decode_errors},
    179181                "congestion" : {
    180182                    "avg-send-speed"        : self.avg_congestion_send_speed,
     183                    "last-congestion-event" : self.last_congestion_time,
     184                    "late-packets-per-minute" : sum(1 for x in self.late_packets if x>time_limit),
    181185                                },
    182186            }
    183187        #client pixels per second:
  • xpra/server/window/batch_delay_calculator.py

     
    169169
    170170    #combine factors: use the highest one:
    171171    target = min(1.0, max(dam_lat_abs, dam_lat_rel, dec_lat, pps, 0.0))
     172    #discount for congestion by up to 40%:
     173    lp = len(global_statistics.late_packets)
     174    target = target*max(40, 100-lp*20)/100
    172175
    173176    #scale target between min_speed and 100:
    174177    ms = min(100.0, max(min_speed, 0.0))
     
    191194                                           "target"   : int(target_decode_speed),
    192195                                           "factor"   : int(100.0*dec_lat),
    193196                                           },
     197            "late-packets"              : lp,
    194198            }
    195199    return info, target_speed
    196200
     
    272276            target = min(1.0, target + (1.0-pctpixdamaged*2))
    273277        if pixl5<pixn5:
    274278            target = sqrt(target)
     279    #discount for congestion by up to 40%:
     280    lp = len(global_statistics.late_packets)
     281    target = target*max(40, 100-lp*20)/100
     282    info["late-packets"] = lp
     283
    275284    #apply min-quality:
    276285    mq = min(100.0, max(min_quality, 0.0))
    277286    target_quality = mq + (100.0-mq) * target
  • xpra/server/window/window_source.py

     
    1111import hashlib
    1212import threading
    1313from collections import deque
     14from math import sqrt
    1415
    1516from xpra.os_util import monotonic_time
    1617from xpra.util import envint, envbool, csv
    1718from xpra.log import Logger
    1819log = Logger("window", "encoding")
     20log.enable_debug()
    1921refreshlog = Logger("window", "refresh")
    2022compresslog = Logger("window", "compress")
    2123damagelog = Logger("window", "damage")
     
    3335
    3436MAX_PIXELS_PREFER_RGB = envint("XPRA_MAX_PIXELS_PREFER_RGB", 4096)
    3537
     38CONGESTION_AVOIDANCE = envbool("XPRA_CONGESTION_AVOIDANCE", True)
     39
    3640DELTA = envbool("XPRA_DELTA", True)
    3741MIN_DELTA_SIZE = envint("XPRA_MIN_DELTA_SIZE", 1024)
    3842MAX_DELTA_SIZE = envint("XPRA_MAX_DELTA_SIZE", 32768)
     
    273277        self.suspended = False
    274278        self.strict = STRICT_MODE
    275279        #
     280        self.congestion_timers = {}
    276281        self.may_send_timer = None
    277282        self.auto_refresh_delay = 0
    278283        self.video_helper = None
     
    359364                                               "pixel_boost"    : self._lossless_threshold_pixel_boost
    360365                                               },
    361366                      })
     367        info["congestion"] = {
     368            "timers"                : len(self.congestion_timers),
     369            }
    362370        try:
    363371            #ie: get_strict_encoding -> "strict_encoding"
    364372            einfo["selection"] = self.get_best_encoding.__name__.replace("get_", "")
     
    890898        self.cancel_refresh_timer()
    891899        self.cancel_timeout_timer()
    892900        self.cancel_av_sync_timer()
     901        self.cancel_congestion_timers()
    893902        #if a region was delayed, we can just drop it now:
    894903        self.refresh_regions = []
    895904        self._damage_delayed = None
     
    11061115        if self.full_frames_only:
    11071116            x, y, w, h = 0, 0, ww, wh
    11081117
     1118        nlate_packets = len(self.global_statistics.late_packets)
    11091119        delayed = self._damage_delayed
    11101120        if delayed:
    11111121            #use existing delayed region:
     
    11241134                        existing_options[k] = options[k]
    11251135            damagelog("damage%-24s wid=%s, using existing delayed %s regions created %.1fms ago",
    11261136                (x, y, w, h, options), self.wid, delayed[3], now-delayed[0])
    1127             if not self.expire_timer and not self.soft_timer and self.soft_expired==0:
    1128                 log.error("Error: bug, found a delayed region without a timer!")
     1137            if not self.expire_timer and not self.soft_timer and self.soft_expired==0 and not nlate_packets and not self.congestion_timers:
     1138                #this can happen after congestion recovery
     1139                log.warn("Warning: found a delayed region without a timer")
    11291140                self.expire_timer = self.timeout_add(0, self.expire_delayed_region, 0)
    11301141            return
    11311142        elif self.batch_config.delay <= self.batch_config.min_delay and not self.batch_config.always:
     
    11491160            delay = max(10, min(self.batch_config.min_delay, delay)) * (qsize/4.0)
    11501161        delay = max(delay, options.get("min_delay", 0))
    11511162        delay = min(delay, options.get("max_delay", self.batch_config.max_delay))
    1152         delay = int(delay)
     1163        delay = int(delay*sqrt(1+nlate_packets))
    11531164        packets_backlog = self.statistics.get_packets_backlog()
    11541165        pixels_encoding_backlog, enc_backlog_count = self.statistics.get_pixels_encoding_backlog()
    11551166        #only send without batching when things are going well:
     
    11561167        # - no packets backlog from the client
    11571168        # - the amount of pixels waiting to be encoded is less than one full frame refresh
    11581169        # - no more than 10 regions waiting to be encoded
    1159         if not self.must_batch(delay) and (packets_backlog==0 and pixels_encoding_backlog<=ww*wh and enc_backlog_count<=10):
     1170        if not self.must_batch(delay) and (packets_backlog==0 and pixels_encoding_backlog<=ww*wh and enc_backlog_count<=10 and nlate_packets==0):
    11601171            #send without batching:
    11611172            damagelog("damage%-24s wid=%s, sending now with sequence %s", (x, y, w, h, options), self.wid, self._sequence)
    11621173            actual_encoding = options.get("encoding")
     
    12091220            #region has been sent
    12101221            return False
    12111222        self.cancel_may_send_timer()
    1212         self.may_send_delayed()
    1213         delayed = self._damage_delayed
    1214         if not delayed:
    1215             #got sent
    1216             return False
    1217         #the region has not been sent yet because we are waiting for damage ACKs from the client
     1223        lp = len(self.global_statistics.late_packets)
     1224        if lp==0:
     1225            self.may_send_delayed()
     1226            delayed = self._damage_delayed
     1227            if not delayed:
     1228                #got sent
     1229                return False
     1230        #the region has not been sent yet,
     1231        #because we are waiting for damage ACKs from the client,
     1232        #or even waiting to send some packets
    12181233        if self.soft_expired<self.max_soft_expired:
    12191234            #there aren't too many regions soft expired yet
    12201235            #so use the "soft timer":
    12211236            self.soft_expired += 1
    12221237            #we have already waited for "delay" to get here, wait more as we soft expire more regions:
    1223             self.soft_timer = self.timeout_add(int(self.soft_expired*delay), self.delayed_region_soft_timeout)
     1238            soft_delay = int(delay*max(self.soft_expired, sqrt(1+lp)))
     1239            log("expire_delayed_region() soft delay=%i, soft_expired=%i, late packets=%i", soft_delay, self.soft_expired, lp)
     1240            self.soft_timer = self.timeout_add(soft_delay, self.delayed_region_soft_timeout, lp)
    12241241        else:
    12251242            #NOTE: this should never happen...
    12261243            #the region should now get sent when we eventually receive the pending ACKs
     
    12291246            self.timeout_timer = self.timeout_add(self.batch_config.timeout_delay, self.delayed_region_timeout, delayed_region_time)
    12301247        return False
    12311248
    1232     def delayed_region_soft_timeout(self):
     1249    def delayed_region_soft_timeout(self, lp):
     1250        #FIXME: if we have many late packets, re-schedule
     1251        # but need to know if they belong to this window to know how...
     1252        # (wait for ack vs timer)
     1253        new_lp = len(self.global_statistics.late_packets)
     1254        log("delayed_region_soft_timeout(%i) new lp=%i, late packets=%s", lp, new_lp, self.global_statistics.late_packets)
     1255        if lp>0 and new_lp>=lp:
     1256            #not getting better - re-schedule
     1257            return True
    12331258        self.soft_timer = None
    12341259        self.do_send_delayed()
    12351260        return False
     
    12781303        if not dd:
    12791304            log("window %s delayed region already sent", self.wid)
    12801305            return
     1306        stats = self.statistics
     1307        if not stats:
     1308            #closing
     1309            return
    12811310        damage_time = dd[0]
    1282         packets_backlog = self.statistics.get_packets_backlog()
    12831311        now = monotonic_time()
    12841312        actual_delay = int(1000.0 * (now-damage_time))
     1313        def check_again(delay=actual_delay/10.0):
     1314            #schedules a call to check again:
     1315            delay = int(min(self.batch_config.max_delay, max(10, delay)))
     1316            self.may_send_timer = self.timeout_add(delay, self._may_send_delayed)
     1317            return
     1318        lp = len(self.global_statistics.late_packets)
     1319        if lp:
     1320           
     1321            #reduce the backlog tolerance:
     1322            pass
     1323        latency_tolerance_pct = 100//(1+lp)
     1324        packets_backlog = self.statistics.get_packets_backlog(latency_tolerance_pct)
    12851325        if packets_backlog>0:
    12861326            if actual_delay>self.batch_config.timeout_delay:
    12871327                log.warn("send_delayed for wid %s, elapsed time %ims is above limit of %.1f", self.wid, actual_delay, self.batch_config.max_delay)
     
    12931333        #if we're here, there is no packet backlog, but there may be damage acks pending or a bandwidth limit to honour,
    12941334        #if there are acks pending, may_send_delayed() should be called again from damage_packet_acked,
    12951335        #if not, we must either process the region now or set a timer to check again later
    1296         def check_again(delay=actual_delay/10.0):
    1297             #schedules a call to check again:
    1298             delay = int(min(self.batch_config.max_delay, max(10, delay)))
    1299             self.may_send_timer = self.timeout_add(delay, self._may_send_delayed)
    1300             return
    13011336        #locked means a fixed delay we try to honour,
    13021337        #this code ensures that we don't fire too early if called from damage_packet_acked
    13031338        if self.batch_config.locked:
     
    13081343            else:
    13091344                self.do_send_delayed()
    13101345            return
    1311         if self.bandwidth_limit>0:
    1312             used = self.statistics.get_bitrate()
    1313             log("may_send_delayed() bandwidth limit=%i, used=%i : %i%%", self.bandwidth_limit, used, 100*used//self.bandwidth_limit)
    1314             if used>=self.bandwidth_limit:
     1346        bwl = self.bandwidth_limit
     1347        if bwl>0:
     1348            #log("may_send_delayed() last 5 seconds: bandwidth limit=%i, used=%i : %i%%", bwl, used5, 100*used5//bwl)
     1349            used = self.statistics.get_bitrate(1) or self.statistics.get_bitrate(5)
     1350            log("may_send_delayed() last 1 second:  bandwidth limit=%i, used=%i : %i%%", bwl, used, 100*used//bwl)
     1351            if used>=bwl:
    13151352                check_again(50)
    13161353                return
    13171354        pixels_encoding_backlog, enc_backlog_count = self.statistics.get_pixels_encoding_backlog()
     
    13321369        self.do_send_delayed()
    13331370
    13341371    def do_send_delayed(self):
     1372        import traceback
     1373        traceback.print_stack(limit=4)
    13351374        self.cancel_timeout_timer()
    13361375        self.cancel_soft_timer()
    13371376        delayed = self._damage_delayed
     
    13691408
    13701409        def send_full_window_update():
    13711410            actual_encoding = get_encoding(ww*wh)
     1411            if actual_encoding=="png":
     1412                log.error("send_full_window_update() %s(%i)=%s, get_best_encoding=%s", get_encoding, ww*wh, actual_encoding, get_best_encoding)
     1413                import traceback
     1414                traceback.print_stack(limit=5)
    13721415            log("send_delayed_regions: using full window update %sx%s with %s", ww, wh, actual_encoding)
    13731416            assert actual_encoding is not None
    13741417            self.process_damage_region(damage_time, 0, 0, ww, wh, actual_encoding, options)
     
    15971640                #delay less when speed is high
    15981641                #(the resulting range is 100*100 to 200*200)
    15991642                qsmult = (200-self._current_quality) * (100+self._current_speed)
     1643                #when there is congestion, the auto-refresh can kill the bandwidth:
     1644                qsmult *= sqrt(1+len(self.global_statistics.late_packets))
    16001645                #important: must check both, I think:
    16011646                if target_time==0 or not self.refresh_timer:
    16021647                    #this means we must schedule the refresh
     
    16481693            We figure out if now is the right time to do the refresh,
    16491694            and if not re-schedule.
    16501695        """
     1696        #FIXME: delay some more when we detect congestion
    16511697        #timer is running now, clear so we don't try to cancel it somewhere else:
    16521698        self.refresh_timer = None
    16531699        #re-do some checks that may have changed:
     
    17311777            - damage latency (via a callback once the packet is actually sent)
    17321778        """
    17331779        #packet = ["draw", wid, x, y, w, h, coding, data, self._damage_packet_sequence, rowstride, client_options]
     1780        now = monotonic_time()
    17341781        width = packet[4]
    17351782        height = packet[5]
    17361783        coding = packet[6]
     
    17371784        ldata = len(packet[7])
    17381785        damage_packet_sequence = packet[8]
    17391786        actual_batch_delay = process_damage_time-damage_time
     1787        queue_time = now
    17401788        ack_pending = [0, coding, 0, 0, 0, width*height]
    17411789        statistics = self.statistics
    1742         statistics.damage_ack_pending[damage_packet_sequence] = ack_pending
    17431790        gs = self.global_statistics
     1791        if statistics:
     1792            statistics.damage_ack_pending[damage_packet_sequence] = ack_pending
    17441793        max_send_delay = int(5*logp(ldata/1024.0))
    17451794        def start_send(bytecount):
    17461795            ack_pending[0] = monotonic_time()
    17471796            ack_pending[2] = bytecount
    1748             statistics.last_sequence_sending = damage_packet_sequence
     1797            log.info("queue to send delay (start): %4ims (max=%3i, damage_packet_sequence=%5i)", 1000*(now-queue_time), max_send_delay, damage_packet_sequence)
     1798            if statistics:
     1799                statistics.last_sequence_sending = damage_packet_sequence
    17491800        def damage_packet_sent(bytecount):
     1801            if CONGESTION_AVOIDANCE:
     1802                #use idle_add to prevent race conditions accessing the list of late packets:
     1803                self.idle_add(self.cancel_congestion_timer, damage_packet_sequence)
    17501804            now = monotonic_time()
    17511805            ack_pending[3] = now
    17521806            ack_pending[4] = bytecount
    1753             if process_damage_time>0:
    1754                 damage_out_latency = now-process_damage_time
    1755                 statistics.damage_out_latency.append((now, width*height, actual_batch_delay, damage_out_latency))
     1807            if process_damage_time and statistics:
     1808                statistics.damage_out_latency.append((now, width*height, actual_batch_delay, now-process_damage_time))
    17561809            if gs:
    17571810                elapsed = now-ack_pending[0]
    17581811                send_speed = 0
    17591812                if elapsed>0:
    17601813                    send_speed = int(8*ldata/elapsed)
     1814                log.info("queue to send delay (end)  : %4ims (max=%3i, damage_packet_sequence=%5i) for %4iKB using %6s: %6iMbps, elapsed=%.2fms", 1000*elapsed, max_send_delay, damage_packet_sequence, ldata//1024, coding, send_speed//1024//1024, elapsed*1000)
    17611815                if elapsed*1000>max_send_delay and send_speed<100*1024*1024:
    1762                     log("recording congestion send speed=%i", send_speed)
     1816                    log.warn("recording congestion send speed=%i", send_speed)
    17631817                    gs.congestion_send_speed.append((now, ldata, send_speed))
    1764         if process_damage_time>0:
    1765             now = monotonic_time()
    1766             damage_in_latency = now-process_damage_time
    1767             statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
    1768         #log.info("queuing %s packet with fail_cb=%s", coding, fail_cb)
     1818        if statistics:
     1819            statistics.last_sequence_queued = damage_packet_sequence
     1820            if process_damage_time:
     1821                statistics.damage_in_latency.append((now, width*height, actual_batch_delay, now-process_damage_time))
     1822        if CONGESTION_AVOIDANCE:
     1823            #  1KB -> 5ms, 10KB -> 17ms, 100KB -> 33ms, 1MB -> 50ms, 10MB -> 66ms
     1824            log.info("queueing %4i, size=%i, max_send_delay=%4i", damage_packet_sequence, ldata, max_send_delay)
     1825            t = self.timeout_add(max_send_delay, self.congestion_timer_event, damage_packet_sequence)
     1826            self.congestion_timers[damage_packet_sequence] = t
    17691827        self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent, self.get_fail_cb(packet))
    17701828
     1829    def congestion_timer_event(self, damage_packet_sequence):
     1830        log.info("congestion_timer_event() damage_packet_sequence=%s, late packets=%s", damage_packet_sequence, self.global_statistics.late_packets)
     1831        try:
     1832            del self.congestion_timers[damage_packet_sequence]
     1833        except:
     1834            pass
     1835        self.global_statistics.late_packets.add((self.wid, damage_packet_sequence))
     1836        self.global_statistics.last_congestion_time = monotonic_time()
     1837        return False
     1838
     1839    def cancel_congestion_timers(self):
     1840        timers = list(self.congestion_timers.values())
     1841        self.congestion_timers = {}
     1842        for t in timers:
     1843            self.source_remove(t)
     1844
     1845    def cancel_congestion_timer(self, damage_packet_sequence):
     1846        t = self.congestion_timers.get(damage_packet_sequence)
     1847        log("cancel_congestion_timer(%i) timer=%s", damage_packet_sequence, t)
     1848        if t:
     1849            self.source_remove(t)
     1850            try:
     1851                del self.congestion_timers[damage_packet_sequence]
     1852            except:
     1853                pass
     1854        #record last_congestion_time before removing
     1855        #what is potentially the last item from late_packets
     1856        self.global_statistics.last_congestion_time = monotonic_time()
     1857        late_packets = self.global_statistics.late_packets
     1858        l = len(late_packets)
     1859        try:
     1860            late_packets.remove((self.wid, damage_packet_sequence))
     1861        except KeyError:
     1862            #congestion event had not fired yet
     1863            return
     1864        if l>0 and len(late_packets)==0:
     1865            log.info("cancel_congestion_timer(%i) last congestion sequence cleared, trying to send delayed region", damage_packet_sequence)
     1866            self.may_send_delayed()
     1867        else:
     1868            log.info("cancel_congestion_timer(%i) remaining congestion sequences=%s", damage_packet_sequence, late_packets)
     1869
    17711870    def get_fail_cb(self, packet):
    17721871        def resend():
    17731872            log("paint packet failure, resending")
  • xpra/server/window/window_stats.py

     
    6262        self.last_recalculate = 0
    6363        self.damage_events_count = 0
    6464        self.packet_count = 0
     65        self.last_sequence_queued = 0
     66        self.last_sequence_sending = 0
    6567
    6668        self.last_resized = 0
    6769
     
    153155        info = {"damage"    : {"events"         : self.damage_events_count,
    154156                               "packets_sent"   : self.packet_count,
    155157                               "target-latency" : int(1000*self.target_latency),
     158                               "last-queued"    : self.last_sequence_queued,
     159                               "last-sending"   : self.last_sequence_sending,
    156160                               }
    157161                }
    158162        #encoding stats:
  • xpra/server/window/window_video_source.py

     
    463463        #take into account how many pixels need to be encoded:
    464464        #more pixels means we switch to lossless more easily
    465465        lossless_q = min(100, self._lossless_threshold_base + self._lossless_threshold_pixel_boost * pixel_count / (ww*wh))
    466         if quality<lossless_q and self.image_depth>16 and "jpeg" in options:
     466        lce = monotonic_time()-self.global_statistics.last_congestion_time
     467        if (quality<lossless_q or self.global_statistics.late_packets or lce<1.0) and self.image_depth>16 and "jpeg" in options:
    467468            #assume that we have "turbojpeg",
    468469            #which beats everything in terms of efficiency for lossy compression:
    469470            return "jpeg"