xpra icon
Bug tracker and wiki

Ticket #999: bandwidth-congestion-handling-v6.patch

File bandwidth-congestion-handling-v6.patch, 33.9 KB (added by Antoine Martin, 22 months ago)

updated patch

  • xpra/server/cystats.pyx

     
    107107        rw += w
    108108    return int(tv / tw), int(rv / rw)
    109109
    110 def calculate_timesize_weighted_average(data, float sizeunit=1.0):
     110def calculate_timesize_weighted_average(data, float unit=1.0):
     111    #the value is elapsed time,
     112    #so we want to divide by the value:
     113    recs = tuple((a,b,unit/c) for a,b,c in data)
     114    return calculate_size_weighted_average(recs)
     115
     116def calculate_size_weighted_average(data):
    111117    """
    112118        This is a time weighted average where the size
    113119        of each record also gives it a weight boost.
    114120        This is to prevent small packets from skewing the average.
    115         Data format: (event_time, size, elapsed_time)
     121        Data format: (event_time, size, value)
    116122    """
    117123    cdef double size_avg = sum(x for _, x, _ in data)/len(data)
    118124    cdef double now = monotonic_time()              #@DuplicatedSignature
     
    123129    cdef double event_time                          #@DuplicatedSignature
    124130    cdef double size
    125131    cdef double size_ps
    126     cdef double elapsed_time
     132    cdef double value
    127133    cdef double pw
    128134    cdef double w                                   #@DuplicatedSignature
    129135    cdef double delta                               #@DuplicatedSignature
    130     for event_time, size, elapsed_time in data:
    131         if elapsed_time<=0:
     136    for event_time, size, value in data:
     137        if value<=0:
    132138            continue        #invalid record
    133139        delta = now-event_time
    134140        pw = clogp(size/size_avg)
    135         size_ps = max(1, size*sizeunit/elapsed_time)
     141        size_ps = max(1, size*value)
    136142        w = pw/(1.0+delta)
    137143        tv += w*size_ps
    138         tw += w
     144        tw += w*size
    139145        w = pw/(0.1+delta**2)
    140146        rv += w*size_ps
    141         rw += w
     147        rw += w*size
    142148    return float(tv / tw), float(rv / rw)
    143149
    144150def calculate_for_target(metric, float target_value, float avg_value, float recent_value, float aim=0.5, float div=1.0, float slope=0.1, smoothing=logp, float weight_multiplier=1.0):
  • xpra/server/source.py

     
    6464NEW_STREAM_SOUND = envbool("XPRA_NEW_STREAM_SOUND", True)
    6565PING_DETAILS = envbool("XPRA_PING_DETAILS", True)
    6666PING_TIMEOUT = envint("XPRA_PING_TIMEOUT", 60)
     67CONGESTION_AVOIDANCE = envbool("XPRA_CONGESTION_AVOIDANCE", True)
    6768
    6869PRINTER_LOCATION_STRING = os.environ.get("XPRA_PRINTER_LOCATION_STRING", "via xpra")
    6970PROPERTIES_DEBUG = [x.strip() for x in os.environ.get("XPRA_WINDOW_PROPERTIES_DEBUG", "").split(",")]
     
    442443        self.double_click_time  = -1
    443444        self.double_click_distance = -1, -1
    444445        self.bandwidth_limit = self.server_bandwidth_limit
     446        self.soft_bandwidth_limit = self.bandwidth_limit
    445447        #what we send back in hello packet:
    446448        self.ui_client = True
    447449        self.wants_aliases = True
     
    508510
    509511
    510512    def update_bandwidth_limits(self):
    511         if self.bandwidth_limit<=0 or self.mmap_size>0:
     513        if self.mmap_size>0:
    512514            return
     515        #calculate soft bandwidth limit based on send congestion data:
     516        bandwidth_limit = 0
     517        if CONGESTION_AVOIDANCE:
     518            bandwidth_limit = self.statistics.avg_congestion_send_speed
     519            log.warn("avg_congestion_send_speed=%s", bandwidth_limit)
     520            if bandwidth_limit>20*1024*1024:
     521                #ignore congestion speed if greater 20Mbps
     522                bandwidth_limit = 0
     523        if self.bandwidth_limit>0:
     524            #command line options could overrule what we detect?
     525            bandwidth_limit = min(self.bandwidth_limit, bandwidth_limit)
     526        self.soft_bandwidth_limit = bandwidth_limit
     527        log.warn("update_bandwidth_limits() bandwidth_limit=%s, soft bandwidth limit=%s", self.bandwidth_limit, bandwidth_limit)
     528        if self.soft_bandwidth_limit<=0:
     529            return
    513530        #figure out how to distribute the bandwidth amongst the windows,
    514531        #we use the window size,
    515532        #(we should actually use the number of bytes actually sent: framerate, compression, etc..)
     
    527544        for wid, ws in self.window_sources.items():
    528545            weight = window_weight.get(wid)
    529546            if weight is not None:
    530                 ws.bandwidth_limit = max(1, self.bandwidth_limit*weight/total_weight)
     547                ws.bandwidth_limit = max(1, bandwidth_limit*weight/total_weight)
    531548
    532549    def recalculate_delays(self):
    533550        """ calls update_averages() on ServerSource.statistics (GlobalStatistics)
     
    538555        if self.is_closed():
    539556            return
    540557        self.calculate_last_time = monotonic_time()
     558        self.statistics.update_averages()
    541559        self.update_bandwidth_limits()
    542         self.statistics.update_averages()
    543560        wids = list(self.calculate_window_ids)  #make a copy so we don't clobber new wids
    544561        focus = self.get_focus()
    545562        sources = self.window_sources.items()
     
    16201637                "suspended"         : self.suspended,
    16211638                "counter"           : self.counter,
    16221639                "hello-sent"        : self.hello_sent,
    1623                 "bandwidth-limit"   : self.bandwidth_limit,
     1640                "bandwidth-limit"   : {
     1641                    "setting"       : self.bandwidth_limit,
     1642                    "actual"        : self.soft_bandwidth_limit,
     1643                    }
    16241644                }
    16251645        if self.desktop_mode_size:
    16261646            info["desktop_mode_size"] = self.desktop_mode_size
  • xpra/server/source_stats.py

     
    1212from xpra.log import Logger
    1313log = Logger("stats")
    1414
    15 from xpra.server.cystats import logp, calculate_time_weighted_average, calculate_for_target, queue_inspect  #@UnresolvedImport
     15from xpra.server.cystats import logp, calculate_time_weighted_average, calculate_size_weighted_average, calculate_for_target, queue_inspect  #@UnresolvedImport
    1616from xpra.simple_stats import get_list_stats
    1717from xpra.os_util import monotonic_time
    1818
     
    5252                                                            #(event_time, elapsed_time_in_seconds)
    5353        self.server_ping_latency = deque(maxlen=NRECS)      #time it took for the client to get a ping_echo back from us:
    5454                                                            #(event_time, elapsed_time_in_seconds)
     55        self.congestion_send_speed = deque(maxlen=4*NRECS)  #when we are being throttled, record what speed we are sending at
     56                                                            #last NRECS: (event_time, no of pixels, duration)
     57        self.late_packets = set()
    5558        self.client_load = None
    5659        self.damage_events_count = 0
    5760        self.packet_count = 0
    5861        self.decode_errors = 0
     62        self.last_congestion_time = 0
    5963        #these values are calculated from the values above (see update_averages)
    6064        self.min_client_latency = self.DEFAULT_LATENCY
    6165        self.avg_client_latency = self.DEFAULT_LATENCY
     
    6670        self.min_server_ping_latency = self.DEFAULT_LATENCY
    6771        self.avg_server_ping_latency = self.DEFAULT_LATENCY
    6872        self.recent_server_ping_latency = self.DEFAULT_LATENCY
     73        self.avg_congestion_send_speed = 0
    6974
    7075    def record_latency(self, wid, decode_time, start_send_at, end_send_at, pixels, bytecount):
    7176        now = monotonic_time()
     
    98103            data = list(self.server_ping_latency)
    99104            self.min_server_ping_latency = min([x for _,x in data])
    100105            self.avg_server_ping_latency, self.recent_server_ping_latency = calculate_time_weighted_average(data)
     106        #set to 0 if we have less than 2 events in the last 60 seconds:
     107        min_time = monotonic_time()-60
     108        css = tuple(x for x in self.congestion_send_speed if x[0]>min_time)
     109        if len(css)<=1:
     110            self.avg_congestion_send_speed = 0
     111        else:
     112            self.avg_congestion_send_speed = int(calculate_size_weighted_average(list(self.congestion_send_speed))[0])
    101113
    102114    def get_factors(self, pixel_count):
    103115        factors = []
     
    153165    def get_info(self):
    154166        cwqsizes = [x for _,x in list(self.compression_work_qsizes)]
    155167        pqsizes = [x for _,x in list(self.packet_qsizes)]
     168        now = monotonic_time()
     169        time_limit = now-60             #ignore old records (60s)
    156170        info = {"damage" : {
    157171                            "events"        : self.damage_events_count,
    158172                            "packets_sent"  : self.packet_count,
     
    164178                                               },
    165179                            },
    166180                "encoding" : {"decode_errors"   : self.decode_errors},
     181                "congestion" : {
     182                    "avg-send-speed"        : self.avg_congestion_send_speed,
     183                    "last-congestion-event" : self.last_congestion_time,
     184                    "late-packets-per-minute" : sum(1 for x in self.late_packets if x>time_limit),
     185                                },
    167186            }
    168187        #client pixels per second:
    169         now = monotonic_time()
    170         time_limit = now-30             #ignore old records (30s)
    171188        #pixels per second: decode time and overall
    172189        total_pixels = 0                #total number of pixels processed
    173190        total_time = 0                  #total decoding time
  • xpra/server/window/batch_delay_calculator.py

     
    169169
    170170    #combine factors: use the highest one:
    171171    target = min(1.0, max(dam_lat_abs, dam_lat_rel, dec_lat, pps, 0.0))
     172    #discount for congestion by up to 40%:
     173    lp = len(global_statistics.late_packets)
     174    target = target*max(40, 100-lp*20)/100
    172175
    173176    #scale target between min_speed and 100:
    174177    ms = min(100.0, max(min_speed, 0.0))
     
    191194                                           "target"   : int(target_decode_speed),
    192195                                           "factor"   : int(100.0*dec_lat),
    193196                                           },
     197            "late-packets"              : lp,
    194198            }
    195199    return info, target_speed
    196200
     
    272276            target = min(1.0, target + (1.0-pctpixdamaged*2))
    273277        if pixl5<pixn5:
    274278            target = sqrt(target)
     279    #discount for congestion by up to 40%:
     280    lp = len(global_statistics.late_packets)
     281    target = target*max(40, 100-lp*20)/100
     282    info["late-packets"] = lp
     283
    275284    #apply min-quality:
    276285    mq = min(100.0, max(min_quality, 0.0))
    277286    target_quality = mq + (100.0-mq) * target
  • xpra/server/window/window_source.py

     
    1111import hashlib
    1212import threading
    1313from collections import deque
     14from math import sqrt
    1415
    1516from xpra.os_util import monotonic_time
    1617from xpra.util import envint, envbool, csv
    1718from xpra.log import Logger
    1819log = Logger("window", "encoding")
     20log.enable_debug()
    1921refreshlog = Logger("window", "refresh")
    2022compresslog = Logger("window", "compress")
    2123damagelog = Logger("window", "damage")
     
    3335
    3436MAX_PIXELS_PREFER_RGB = envint("XPRA_MAX_PIXELS_PREFER_RGB", 4096)
    3537
     38CONGESTION_AVOIDANCE = envbool("XPRA_CONGESTION_AVOIDANCE", True)
     39
    3640DELTA = envbool("XPRA_DELTA", True)
    3741MIN_DELTA_SIZE = envint("XPRA_MIN_DELTA_SIZE", 1024)
    3842MAX_DELTA_SIZE = envint("XPRA_MAX_DELTA_SIZE", 32768)
     
    6165from xpra.server.window.batch_config import DamageBatchConfig
    6266from xpra.simple_stats import get_list_stats
    6367from xpra.server.window.batch_delay_calculator import calculate_batch_delay, get_target_speed, get_target_quality
    64 from xpra.server.cystats import time_weighted_average  #@UnresolvedImport
     68from xpra.server.cystats import time_weighted_average, logp #@UnresolvedImport
    6569from xpra.server.window.region import rectangle, add_rectangle, remove_rectangle, merge_all   #@UnresolvedImport
    6670from xpra.codecs.xor.cyxor import xor_str           #@UnresolvedImport
    6771from xpra.server.picture_encode import rgb_encode, mmap_send, argb_swap
     
    273277        self.suspended = False
    274278        self.strict = STRICT_MODE
    275279        #
     280        self.congestion_timers = {}
    276281        self.may_send_timer = None
    277282        self.auto_refresh_delay = 0
    278283        self.video_helper = None
     
    359364                                               "pixel_boost"    : self._lossless_threshold_pixel_boost
    360365                                               },
    361366                      })
     367        info["congestion"] = {
     368            "timers"                : len(self.congestion_timers),
     369            }
    362370        try:
    363371            #ie: get_strict_encoding -> "strict_encoding"
    364372            einfo["selection"] = self.get_best_encoding.__name__.replace("get_", "")
     
    890898        self.cancel_refresh_timer()
    891899        self.cancel_timeout_timer()
    892900        self.cancel_av_sync_timer()
     901        self.cancel_congestion_timers()
    893902        #if a region was delayed, we can just drop it now:
    894903        self.refresh_regions = []
    895904        self._damage_delayed = None
     
    11061115        if self.full_frames_only:
    11071116            x, y, w, h = 0, 0, ww, wh
    11081117
     1118        nlate_packets = len(self.global_statistics.late_packets)
    11091119        delayed = self._damage_delayed
    11101120        if delayed:
    11111121            #use existing delayed region:
     
    11241134                        existing_options[k] = options[k]
    11251135            damagelog("damage%-24s wid=%s, using existing delayed %s regions created %.1fms ago",
    11261136                (x, y, w, h, options), self.wid, delayed[3], now-delayed[0])
    1127             if not self.expire_timer and not self.soft_timer and self.soft_expired==0:
    1128                 log.error("Error: bug, found a delayed region without a timer!")
     1137            if not self.expire_timer and not self.soft_timer and self.soft_expired==0 and not nlate_packets and not self.congestion_timers:
     1138                #this can happen after congestion recovery
     1139                log.warn("Warning: found a delayed region without a timer")
    11291140                self.expire_timer = self.timeout_add(0, self.expire_delayed_region, 0)
    11301141            return
    11311142        elif self.batch_config.delay <= self.batch_config.min_delay and not self.batch_config.always:
     
    11491160            delay = max(10, min(self.batch_config.min_delay, delay)) * (qsize/4.0)
    11501161        delay = max(delay, options.get("min_delay", 0))
    11511162        delay = min(delay, options.get("max_delay", self.batch_config.max_delay))
    1152         delay = int(delay)
     1163        delay = int(delay*sqrt(1+nlate_packets))
    11531164        packets_backlog = self.statistics.get_packets_backlog()
    11541165        pixels_encoding_backlog, enc_backlog_count = self.statistics.get_pixels_encoding_backlog()
    11551166        #only send without batching when things are going well:
     
    11561167        # - no packets backlog from the client
    11571168        # - the amount of pixels waiting to be encoded is less than one full frame refresh
    11581169        # - no more than 10 regions waiting to be encoded
    1159         if not self.must_batch(delay) and (packets_backlog==0 and pixels_encoding_backlog<=ww*wh and enc_backlog_count<=10):
     1170        if not self.must_batch(delay) and (packets_backlog==0 and pixels_encoding_backlog<=ww*wh and enc_backlog_count<=10 and nlate_packets==0):
    11601171            #send without batching:
    11611172            damagelog("damage%-24s wid=%s, sending now with sequence %s", (x, y, w, h, options), self.wid, self._sequence)
    11621173            actual_encoding = options.get("encoding")
     
    12071218        delayed = self._damage_delayed
    12081219        if not delayed:
    12091220            #region has been sent
    1210             return
     1221            return False
    12111222        self.cancel_may_send_timer()
    1212         self.may_send_delayed()
    1213         delayed = self._damage_delayed
    1214         if not delayed:
    1215             #got sent
    1216             return
    1217         #the region has not been sent yet because we are waiting for damage ACKs from the client
     1223        lp = len(self.global_statistics.late_packets)
     1224        if lp==0:
     1225            self.may_send_delayed()
     1226            delayed = self._damage_delayed
     1227            if not delayed:
     1228                #got sent
     1229                return False
     1230        #the region has not been sent yet,
     1231        #because we are waiting for damage ACKs from the client,
     1232        #or even waiting to send some packets
    12181233        if self.soft_expired<self.max_soft_expired:
    1219             #there aren't too many regions soft expired yet
     1234            #there aren't too many regions soft expired yet,
    12201235            #so use the "soft timer":
    12211236            self.soft_expired += 1
    12221237            #we have already waited for "delay" to get here, wait more as we soft expire more regions:
    1223             self.soft_timer = self.timeout_add(int(self.soft_expired*delay), self.delayed_region_soft_timeout)
     1238            soft_delay = int(delay*max(self.soft_expired, sqrt(1+lp)))
     1239            log("expire_delayed_region() soft delay=%i, soft_expired=%i, late packets=%i", soft_delay, self.soft_expired, lp)
     1240            self.soft_timer = self.timeout_add(soft_delay, self.delayed_region_soft_timeout, lp)
    12241241        else:
    12251242            #NOTE: this should never happen...
    12261243            #the region should now get sent when we eventually receive the pending ACKs
     
    12271244            #but if somehow they go missing... clean it up from a timeout:
    12281245            delayed_region_time = delayed[0]
    12291246            self.timeout_timer = self.timeout_add(self.batch_config.timeout_delay, self.delayed_region_timeout, delayed_region_time)
     1247        return False
    12301248
    1231     def delayed_region_soft_timeout(self):
     1249    def delayed_region_soft_timeout(self, lp):
     1250        #FIXME: if we have many late packets, re-schedule
     1251        # but need to know if they belong to this window to know how...
     1252        # (wait for ack vs timer)
     1253        new_lp = len(self.global_statistics.late_packets)
     1254        log("delayed_region_soft_timeout(%i) new lp=%i, late packets=%s", lp, new_lp, self.global_statistics.late_packets)
     1255        if lp>0 and new_lp>=lp:
     1256            #not getting better - re-schedule
     1257            return True
    12321258        self.soft_timer = None
    12331259        self.do_send_delayed()
    12341260        return False
     
    12531279        if dap:
    12541280            log.warn(" %i late responses:", len(dap))
    12551281            for seq in sorted(dap.keys()):
    1256                 ack_data = dap.get(seq)
    1257                 if ack_data and ack_data[0]>0:
     1282                ack_data = dap[seq]
     1283                if ack_data[0]>0:
    12581284                    log.warn(" %6i %-5s: %3is", seq, ack_data[1], now-ack_data[3])
    12591285        #re-try: cancel anything pending and do a full quality refresh
    12601286        self.cancel_damage()
     
    12771303        if not dd:
    12781304            log("window %s delayed region already sent", self.wid)
    12791305            return
     1306        stats = self.statistics
     1307        if not stats:
     1308            #closing
     1309            return
    12801310        damage_time = dd[0]
    1281         packets_backlog = self.statistics.get_packets_backlog()
    12821311        now = monotonic_time()
    12831312        actual_delay = int(1000.0 * (now-damage_time))
     1313        def check_again(delay=actual_delay/10.0):
     1314            #schedules a call to check again:
     1315            delay = int(min(self.batch_config.max_delay, max(10, delay)))
     1316            self.may_send_timer = self.timeout_add(delay, self._may_send_delayed)
     1317            return
     1318        lp = len(self.global_statistics.late_packets)
     1319        if lp:
     1320           
     1321            #reduce the backlog tolerance:
     1322            pass
     1323        latency_tolerance_pct = 100//(1+lp)
     1324        packets_backlog = self.statistics.get_packets_backlog(latency_tolerance_pct)
    12841325        if packets_backlog>0:
    12851326            if actual_delay>self.batch_config.timeout_delay:
    12861327                log.warn("send_delayed for wid %s, elapsed time %ims is above limit of %.1f", self.wid, actual_delay, self.batch_config.max_delay)
     
    12921333        #if we're here, there is no packet backlog, but there may be damage acks pending or a bandwidth limit to honour,
    12931334        #if there are acks pending, may_send_delayed() should be called again from damage_packet_acked,
    12941335        #if not, we must either process the region now or set a timer to check again later
    1295         def check_again(delay=actual_delay/10.0):
    1296             #schedules a call to check again:
    1297             delay = int(min(self.batch_config.max_delay, max(10, delay)))
    1298             self.may_send_timer = self.timeout_add(delay, self._may_send_delayed)
    1299             return
    13001336        #locked means a fixed delay we try to honour,
    13011337        #this code ensures that we don't fire too early if called from damage_packet_acked
    13021338        if self.batch_config.locked:
     
    13071343            else:
    13081344                self.do_send_delayed()
    13091345            return
    1310         if self.bandwidth_limit>0:
    1311             used = self.statistics.get_bits_encoded()
    1312             log("may_send_delayed() bandwidth limit=%i, used=%i : %i%%", self.bandwidth_limit, used, 100*used//self.bandwidth_limit)
    1313             if used>=self.bandwidth_limit:
     1346        bwl = self.bandwidth_limit
     1347        if bwl>0:
     1348            #log("may_send_delayed() last 5 seconds: bandwidth limit=%i, used=%i : %i%%", bwl, used5, 100*used5//bwl)
     1349            used = self.statistics.get_bitrate(1) or self.statistics.get_bitrate(5)
     1350            log("may_send_delayed() last 1 second:  bandwidth limit=%i, used=%i : %i%%", bwl, used, 100*used//bwl)
     1351            if used>=bwl:
    13141352                check_again(50)
    13151353                return
    13161354        pixels_encoding_backlog, enc_backlog_count = self.statistics.get_pixels_encoding_backlog()
     
    13311369        self.do_send_delayed()
    13321370
    13331371    def do_send_delayed(self):
     1372        import traceback
     1373        traceback.print_stack(limit=4)
    13341374        self.cancel_timeout_timer()
    13351375        self.cancel_soft_timer()
    13361376        delayed = self._damage_delayed
     
    13681408
    13691409        def send_full_window_update():
    13701410            actual_encoding = get_encoding(ww*wh)
     1411            if actual_encoding=="png":
     1412                log.error("send_full_window_update() %s(%i)=%s, get_best_encoding=%s", get_encoding, ww*wh, actual_encoding, get_best_encoding)
     1413                import traceback
     1414                traceback.print_stack(limit=5)
    13711415            log("send_delayed_regions: using full window update %sx%s with %s", ww, wh, actual_encoding)
    13721416            assert actual_encoding is not None
    13731417            self.process_damage_region(damage_time, 0, 0, ww, wh, actual_encoding, options)
     
    15961640                #delay less when speed is high
    15971641                #(the resulting range is 100*100 to 200*200)
    15981642                qsmult = (200-self._current_quality) * (100+self._current_speed)
     1643                #when there is congestion, the auto-refresh can kill the bandwidth:
     1644                qsmult *= sqrt(1+len(self.global_statistics.late_packets))
    15991645                #important: must check both, I think:
    16001646                if target_time==0 or not self.refresh_timer:
    16011647                    #this means we must schedule the refresh
     
    16471693            We figure out if now is the right time to do the refresh,
    16481694            and if not re-schedule.
    16491695        """
     1696        #FIXME: delay some more when we detect congestion
    16501697        #timer is running now, clear so we don't try to cancel it somewhere else:
    16511698        self.refresh_timer = None
    16521699        #re-do some checks that may have changed:
     
    17301777            - damage latency (via a callback once the packet is actually sent)
    17311778        """
    17321779        #packet = ["draw", wid, x, y, w, h, coding, data, self._damage_packet_sequence, rowstride, client_options]
     1780        statistics = self.statistics
     1781        gs = self.global_statistics
     1782        now = monotonic_time()
    17331783        width = packet[4]
    17341784        height = packet[5]
    17351785        coding = packet[6]
     1786        ldata = len(packet[7])
    17361787        damage_packet_sequence = packet[8]
    17371788        actual_batch_delay = process_damage_time-damage_time
     1789        queue_time = now
    17381790        ack_pending = [0, coding, 0, 0, 0, width*height]
    1739         statistics = self.statistics
    1740         statistics.damage_ack_pending[damage_packet_sequence] = ack_pending
     1791        if statistics:
     1792            statistics.damage_ack_pending[damage_packet_sequence] = ack_pending
     1793        max_send_delay = int(5*logp(ldata/1024.0))
    17411794        def start_send(bytecount):
    17421795            ack_pending[0] = monotonic_time()
    17431796            ack_pending[2] = bytecount
    1744             statistics.last_sequence_sending = damage_packet_sequence
     1797            log.info("queue to send delay (start): %4ims (max=%3i, damage_packet_sequence=%5i)", 1000*(now-queue_time), max_send_delay, damage_packet_sequence)
     1798            if statistics:
     1799                statistics.last_sequence_sending = damage_packet_sequence
    17451800        def damage_packet_sent(bytecount):
     1801            if CONGESTION_AVOIDANCE:
     1802                #use idle_add to prevent race conditions accessing the list of late packets:
     1803                self.idle_add(self.cancel_congestion_timer, damage_packet_sequence)
    17461804            now = monotonic_time()
    17471805            ack_pending[3] = now
    17481806            ack_pending[4] = bytecount
    1749             if process_damage_time>0:
    1750                 damage_out_latency = now-process_damage_time
    1751                 statistics.damage_out_latency.append((now, width*height, actual_batch_delay, damage_out_latency))
    1752         if process_damage_time>0:
    1753             now = monotonic_time()
    1754             damage_in_latency = now-process_damage_time
    1755             statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
    1756         #log.info("queuing %s packet with fail_cb=%s", coding, fail_cb)
     1807            if process_damage_time and statistics:
     1808                statistics.damage_out_latency.append((now, width*height, actual_batch_delay, now-process_damage_time))
     1809            if gs:
     1810                elapsed = now-ack_pending[0]
     1811                send_speed = 0
     1812                if elapsed>0:
     1813                    send_speed = int(8*ldata/elapsed)
     1814                log.info("queue to send delay (end)  : %4ims (max=%3i, damage_packet_sequence=%5i) for %4iKB using %6s: %6iMbps, elapsed=%.2fms", 1000*elapsed, max_send_delay, damage_packet_sequence, ldata//1024, coding, send_speed//1024//1024, elapsed*1000)
     1815                if elapsed*1000>max_send_delay and send_speed<100*1024*1024:
     1816                    log.warn("recording congestion send speed=%i", send_speed)
     1817                    gs.congestion_send_speed.append((now, ldata, send_speed))
     1818        if statistics:
     1819            statistics.last_sequence_queued = damage_packet_sequence
     1820            if process_damage_time:
     1821                statistics.damage_in_latency.append((now, width*height, actual_batch_delay, now-process_damage_time))
     1822        if CONGESTION_AVOIDANCE:
     1823            #  1KB -> 5ms, 10KB -> 17ms, 100KB -> 33ms, 1MB -> 50ms, 10MB -> 66ms
     1824            log.info("queueing %4i, size=%i, max_send_delay=%4i", damage_packet_sequence, ldata, max_send_delay)
     1825            t = self.timeout_add(max_send_delay, self.congestion_timer_event, damage_packet_sequence)
     1826            self.congestion_timers[damage_packet_sequence] = t
    17571827        self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent, self.get_fail_cb(packet))
    17581828
     1829    def congestion_timer_event(self, damage_packet_sequence):
     1830        log.info("congestion_timer_event() damage_packet_sequence=%s, late packets=%s", damage_packet_sequence, self.global_statistics.late_packets)
     1831        try:
     1832            del self.congestion_timers[damage_packet_sequence]
     1833        except:
     1834            pass
     1835        self.global_statistics.late_packets.add((self.wid, damage_packet_sequence))
     1836        self.global_statistics.last_congestion_time = monotonic_time()
     1837        return False
     1838
     1839    def cancel_congestion_timers(self):
     1840        timers = list(self.congestion_timers.values())
     1841        self.congestion_timers = {}
     1842        for t in timers:
     1843            self.source_remove(t)
     1844
     1845    def cancel_congestion_timer(self, damage_packet_sequence):
     1846        t = self.congestion_timers.get(damage_packet_sequence)
     1847        log("cancel_congestion_timer(%i) timer=%s", damage_packet_sequence, t)
     1848        if t:
     1849            self.source_remove(t)
     1850            try:
     1851                del self.congestion_timers[damage_packet_sequence]
     1852            except:
     1853                pass
     1854        #record last_congestion_time before removing
     1855        #what is potentially the last item from late_packets
     1856        self.global_statistics.last_congestion_time = monotonic_time()
     1857        late_packets = self.global_statistics.late_packets
     1858        l = len(late_packets)
     1859        try:
     1860            late_packets.remove((self.wid, damage_packet_sequence))
     1861        except KeyError:
     1862            #congestion event had not fired yet
     1863            return
     1864        if l>0 and len(late_packets)==0:
     1865            log.info("cancel_congestion_timer(%i) last congestion sequence cleared, trying to send delayed region", damage_packet_sequence)
     1866            self.may_send_delayed()
     1867        else:
     1868            log.info("cancel_congestion_timer(%i) remaining congestion sequences=%s", damage_packet_sequence, late_packets)
     1869
    17591870    def get_fail_cb(self, packet):
    17601871        def resend():
    17611872            log("paint packet failure, resending")
     
    19602071        end = monotonic_time()
    19612072        compresslog("compress: %5.1fms for %4ix%-4i pixels at %4i,%-4i for wid=%-5i using %6s with ratio %5.1f%%  (%5iKB to %5iKB), sequence %5i, client_options=%s",
    19622073                 (end-start)*1000.0, outw, outh, x, y, self.wid, coding, 100.0*csize/psize, psize/1024, csize/1024, self._damage_packet_sequence, client_options)
    1963         self.statistics.encoding_stats.append((end, coding, w*h, bpp, len(data), end-start))
     2074        self.statistics.encoding_stats.append((end, coding, w*h, bpp, csize, end-start))
    19642075        return self.make_draw_packet(x, y, outw, outh, coding, data, outstride, client_options, options)
    19652076
    19662077    def make_draw_packet(self, x, y, outw, outh, coding, data, outstride, client_options={}, options={}):
  • xpra/server/window/window_stats.py

     
    6262        self.last_recalculate = 0
    6363        self.damage_events_count = 0
    6464        self.packet_count = 0
     65        self.last_sequence_queued = 0
     66        self.last_sequence_sending = 0
    6567
    6668        self.last_resized = 0
    6769
     
    8789        #client decode speed:
    8890        if len(self.client_decode_time)>0:
    8991            #the elapsed time recorded is in microseconds, so multiply by 1000*1000 to get the real value:
    90             self.avg_decode_speed, self.recent_decode_speed = calculate_timesize_weighted_average(list(self.client_decode_time), sizeunit=1000*1000)
     92            self.avg_decode_speed, self.recent_decode_speed = calculate_timesize_weighted_average(list(self.client_decode_time), unit=1000*1000)
    9193        #network send speed:
    9294        all_l = [0.1,
    9395                 self.avg_damage_in_latency, self.recent_damage_in_latency,
     
    153155        info = {"damage"    : {"events"         : self.damage_events_count,
    154156                               "packets_sent"   : self.packet_count,
    155157                               "target-latency" : int(1000*self.target_latency),
     158                               "last-queued"    : self.last_sequence_queued,
     159                               "last-sending"   : self.last_sequence_sending,
    156160                               }
    157161                }
    158162        #encoding stats:
     
    244248    def get_acks_pending(self):
    245249        return sum(1 for x in self.damage_ack_pending.values() if x[0]!=0)
    246250
    247     def get_packets_backlog(self):
     251    def get_packets_backlog(self, latency_tolerance_pct=100):
    248252        packets_backlog = 0
    249253        if len(self.damage_ack_pending)>0:
    250             sent_before = monotonic_time()-(self.target_latency+0.020)
     254            sent_before = monotonic_time()-((self.target_latency+0.020)*100.0/latency_tolerance_pct)
    251255            for _, (start_send_at, _, _, end_send_at, _, _) in self.damage_ack_pending.items():
    252256                if end_send_at>0 and start_send_at<=sent_before:
    253257                    packets_backlog += 1
     
    260264            count += 1
    261265        return pixels, count
    262266
    263     def get_bits_encoded(self, elapsed=1):
    264         cutoff = monotonic_time()-elapsed
    265         return sum(v[4] for v in self.encoding_stats if v[0]>cutoff) * 8
     267    def get_bitrate(self, max_elapsed=1):
     268        cutoff = monotonic_time()-max_elapsed
     269        recs = tuple((v[0], v[4]) for v in self.encoding_stats if v[0]>=cutoff)
     270        if len(recs)<2:
     271            return 0
     272        bits = sum(v[1] for v in recs) * 8
     273        elapsed = recs[-1][0]-recs[0][0]
     274        return bits/elapsed
    266275
    267276    def get_damage_pixels(self, elapsed=1):
    268277        cutoff = monotonic_time()-elapsed
  • xpra/server/window/window_video_source.py

     
    463463        #take into account how many pixels need to be encoded:
    464464        #more pixels means we switch to lossless more easily
    465465        lossless_q = min(100, self._lossless_threshold_base + self._lossless_threshold_pixel_boost * pixel_count / (ww*wh))
    466         if quality<lossless_q and self.image_depth>16 and "jpeg" in options:
     466        lce = monotonic_time()-self.global_statistics.last_congestion_time
     467        if (quality<lossless_q or self.global_statistics.late_packets or lce<1.0) and self.image_depth>16 and "jpeg" in options:
    467468            #assume that we have "turbojpeg",
    468469            #which beats everything in terms of efficiency for lossy compression:
    469470            return "jpeg"