xpra icon
Bug tracker and wiki

Ticket #999: bandwidth-congestion-handling.patch

File bandwidth-congestion-handling.patch, 12.2 KB (added by Antoine Martin, 13 months ago)

work in progress patch

  • xpra/server/window/window_source.py

     
    1111import hashlib
    1212import threading
    1313from collections import deque
     14from math import sqrt
    1415
    1516from xpra.os_util import monotonic_time
    1617from xpra.util import envint, envbool, csv
     
    3334
    3435MAX_PIXELS_PREFER_RGB = envint("XPRA_MAX_PIXELS_PREFER_RGB", 4096)
    3536
     37CONGESTION_AVOIDANCE = envbool("XPRA_CONGESTION_AVOIDANCE", True)
     38
    3639DELTA = envbool("XPRA_DELTA", True)
    3740MIN_DELTA_SIZE = envint("XPRA_MIN_DELTA_SIZE", 1024)
    3841MAX_DELTA_SIZE = envint("XPRA_MAX_DELTA_SIZE", 32768)
     
    6164from xpra.server.window.batch_config import DamageBatchConfig
    6265from xpra.simple_stats import get_list_stats
    6366from xpra.server.window.batch_delay_calculator import calculate_batch_delay, get_target_speed, get_target_quality
    64 from xpra.server.cystats import time_weighted_average  #@UnresolvedImport
     67from xpra.server.cystats import time_weighted_average, logp #@UnresolvedImport
    6568from xpra.server.window.region import rectangle, add_rectangle, remove_rectangle, merge_all   #@UnresolvedImport
    6669from xpra.codecs.xor.cyxor import xor_str           #@UnresolvedImport
    6770from xpra.server.picture_encode import rgb_encode, mmap_send, argb_swap
     
    273276        self.suspended = False
    274277        self.strict = STRICT_MODE
    275278        #
     279        self.last_congestion_time = 0
     280        self.congestion_sequences = set()
     281        self.congestion_timers = {}
    276282        self.may_send_timer = None
    277283        self.auto_refresh_delay = 0
    278284        self.video_helper = None
     
    890896        self.cancel_refresh_timer()
    891897        self.cancel_timeout_timer()
    892898        self.cancel_av_sync_timer()
     899        self.cancel_congestion_timers()
    893900        #if a region was delayed, we can just drop it now:
    894901        self.refresh_regions = []
    895902        self._damage_delayed = None
     
    11241131                        existing_options[k] = options[k]
    11251132            damagelog("damage%-24s wid=%s, using existing delayed %s regions created %.1fms ago",
    11261133                (x, y, w, h, options), self.wid, delayed[3], now-delayed[0])
    1127             if not self.expire_timer and not self.soft_timer and self.soft_expired==0:
    1128                 log.error("Error: bug, found a delayed region without a timer!")
     1134            if not self.expire_timer and not self.soft_timer and self.soft_expired==0 and not self.congestion_sequences:
     1135                #this can happen after congestion recovery
     1136                log("found a delayed region without a timer")
    11291137                self.expire_timer = self.timeout_add(0, self.expire_delayed_region, 0)
    11301138            return
    11311139        elif self.batch_config.delay <= self.batch_config.min_delay and not self.batch_config.always:
     
    11491157            delay = max(10, min(self.batch_config.min_delay, delay)) * (qsize/4.0)
    11501158        delay = max(delay, options.get("min_delay", 0))
    11511159        delay = min(delay, options.get("max_delay", self.batch_config.max_delay))
    1152         delay = int(delay)
     1160        delay = int(delay*sqrt(1+len(self.congestion_sequences)))
    11531161        packets_backlog = self.statistics.get_packets_backlog()
    11541162        pixels_encoding_backlog, enc_backlog_count = self.statistics.get_pixels_encoding_backlog()
    11551163        #only send without batching when things are going well:
     
    11871195        self.expire_timer = self.timeout_add(delay, self.expire_delayed_region, delay)
    11881196
    11891197    def must_batch(self, delay):
    1190         if FORCE_BATCH or self.batch_config.always or delay>self.batch_config.min_delay or self.bandwidth_limit>0:
     1198        if FORCE_BATCH or self.batch_config.always or delay>self.batch_config.min_delay or self.bandwidth_limit>0 or self.congestion_sequences:
    11911199            return True
    11921200        try:
    11931201            t, _ = self.batch_config.last_delays[-5]
     
    12091217            #region has been sent
    12101218            return
    12111219        self.cancel_may_send_timer()
     1220        if self.congestion_sequences:
     1221            #network congestion, we will fire may_send_delayed from damage_packet_sent
     1222            return
    12121223        self.may_send_delayed()
    12131224        delayed = self._damage_delayed
    12141225        if not delayed:
     
    12161227            return
    12171228        #the region has not been sent yet because we are waiting for damage ACKs from the client
    12181229        if self.soft_expired<self.max_soft_expired:
    1219             #there aren't too many regions soft expired yet
     1230            #there aren't too many regions soft expired yet,
     1231            #and no congestion either,
    12201232            #so use the "soft timer":
    12211233            self.soft_expired += 1
    12221234            #we have already waited for "delay" to get here, wait more as we soft expire more regions:
     
    12771289        if not dd:
    12781290            log("window %s delayed region already sent", self.wid)
    12791291            return
     1292        if self.congestion_sequences:
     1293            log("window %s, network congestion: %s", self.wid, self.congestion_sequences)
     1294            #we will get an ack and fire again
     1295            return
    12801296        damage_time = dd[0]
    12811297        packets_backlog = self.statistics.get_packets_backlog()
    12821298        now = monotonic_time()
     
    17301746            - damage latency (via a callback once the packet is actually sent)
    17311747        """
    17321748        #packet = ["draw", wid, x, y, w, h, coding, data, self._damage_packet_sequence, rowstride, client_options]
     1749        now = monotonic_time()
     1750        statistics = self.statistics
     1751        #we may have to throttle the encode thread a bit
     1752        #ideally, this would be done in a common place for all windows
     1753        if CONGESTION_AVOIDANCE:
     1754            wait = 0
     1755            if len(self.congestion_sequences)>0:
     1756                wait = 50*sqrt(len(self.congestion_sequences))
     1757            elif self.last_congestion_time>0:
     1758                dct = now-self.last_congestion_time
     1759                if dct<1:
     1760                    #congestion in the last second
     1761                    #wait gradually less from 50ms down to 0ms:
     1762                    wait = 50*(1-dct)
     1763            if statistics:
     1764                notsending = statistics.last_sequence_queued - statistics.last_sequence_sending
     1765                if notsending:
     1766                    #1 -> 20, 2 -> 28, 5 -> 44, 10 -> 63
     1767                    wait = max(wait, 20*sqrt(notsending))
     1768            log.info("queue wait=%3ims", wait)
     1769            if wait>0:
     1770                time.sleep(wait/1000.0)
     1771            del wait
    17331772        width = packet[4]
    17341773        height = packet[5]
    17351774        coding = packet[6]
     1775        ldata = len(packet[7])
    17361776        damage_packet_sequence = packet[8]
    17371777        actual_batch_delay = process_damage_time-damage_time
     1778        queue_time = now
    17381779        ack_pending = [0, coding, 0, 0, 0, width*height]
    1739         statistics = self.statistics
    17401780        statistics.damage_ack_pending[damage_packet_sequence] = ack_pending
    17411781        def start_send(bytecount):
    17421782            ack_pending[0] = monotonic_time()
    17431783            ack_pending[2] = bytecount
     1784            log.info("queue to send delay (start): %4ims (damage_packet_sequence=%5i)", 1000*(now-queue_time), damage_packet_sequence)
    17441785            statistics.last_sequence_sending = damage_packet_sequence
    17451786        def damage_packet_sent(bytecount):
     1787            #use idle_add to prevent race conditions accessing the self.congestion_sequences
     1788            if CONGESTION_AVOIDANCE:
     1789                self.idle_add(self.cancel_congestion_timer, damage_packet_sequence)
    17461790            now = monotonic_time()
    17471791            ack_pending[3] = now
    17481792            ack_pending[4] = bytecount
     
    17491793            if process_damage_time>0:
    17501794                damage_out_latency = now-process_damage_time
    17511795                statistics.damage_out_latency.append((now, width*height, actual_batch_delay, damage_out_latency))
    1752         if process_damage_time>0:
    1753             now = monotonic_time()
    1754             damage_in_latency = now-process_damage_time
    1755             statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
    1756         #log.info("queuing %s packet with fail_cb=%s", coding, fail_cb)
     1796            elapsed = now-queue_time
     1797            log.info("queue to send delay (end)  : %4ims (damage_packet_sequence=%5i) for %4iKB using %6s: %6iKB/s", 1000*elapsed, damage_packet_sequence, ldata//1024, coding, ldata//elapsed//1024)
     1798        if statistics:
     1799            statistics.last_sequence_queued = damage_packet_sequence
     1800            if process_damage_time>0:
     1801                damage_in_latency = now-process_damage_time
     1802                statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
    17571803        self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent, self.get_fail_cb(packet))
     1804        if CONGESTION_AVOIDANCE:
     1805            max_send_delay = int(5*logp(ldata/1024.0))
     1806            #  1KB -> 5ms, 10KB -> 17ms, 100KB -> 33ms, 1MB -> 50ms, 10MB -> 66ms
     1807            log.info("queueing %4i, size=%i, max_send_delay=%4i", damage_packet_sequence, ldata, max_send_delay)
     1808            t = self.timeout_add(max_send_delay, self.congestion_timer_event, damage_packet_sequence)
     1809            self.congestion_timers[damage_packet_sequence] = t
    17581810
     1811    def congestion_timer_event(self, damage_packet_sequence):
     1812        log.info("congestion_timer_event() damage_packet_sequence=%s, congestion_sequences=%s", damage_packet_sequence, self.congestion_sequences)
     1813        try:
     1814            del self.congestion_timers[damage_packet_sequence]
     1815        except:
     1816            pass
     1817        self.congestion_sequences.add(damage_packet_sequence)
     1818        return False
     1819
     1820    def cancel_congestion_timers(self):
     1821        timers = list(self.congestion_timers.values())
     1822        self.congestion_timers = {}
     1823        for t in timers:
     1824            self.source_remove(t)
     1825
     1826    def cancel_congestion_timer(self, damage_packet_sequence):
     1827        t = self.congestion_timers.get(damage_packet_sequence)
     1828        if t:
     1829            self.source_remove(t)
     1830            try:
     1831                del self.congestion_timers[damage_packet_sequence]
     1832            except:
     1833                pass
     1834        l = len(self.congestion_sequences)
     1835        try:
     1836            self.congestion_sequences.remove(damage_packet_sequence)
     1837        except:
     1838            #timer had not fired yet
     1839            return
     1840        if l>0 and len(self.congestion_sequences)==0:
     1841            log.info("cancel_congestion_timer(%i) last congestion sequence cleared, trying to send delayed region", damage_packet_sequence)
     1842            self.last_congestion_time = monotonic_time()
     1843            self.may_send_delayed()
     1844        else:
     1845            log.info("cancel_congestion_timer(%i) remaining congestion sequences=%s", damage_packet_sequence, self.congestion_sequences)
     1846
    17591847    def get_fail_cb(self, packet):
    17601848        def resend():
    17611849            log("paint packet failure, resending")
  • xpra/server/window/window_stats.py

     
    6262        self.last_recalculate = 0
    6363        self.damage_events_count = 0
    6464        self.packet_count = 0
     65        self.last_sequence_queued = 0
     66        self.last_sequence_sending = 0
    6567
    6668        self.last_resized = 0
    6769
  • xpra/server/window/window_video_source.py

     
    463463        #take into account how many pixels need to be encoded:
    464464        #more pixels means we switch to lossless more easily
    465465        lossless_q = min(100, self._lossless_threshold_base + self._lossless_threshold_pixel_boost * pixel_count / (ww*wh))
    466         if quality<lossless_q and self.image_depth>16 and "jpeg" in options:
     466        lce = monotonic_time()-self.last_congestion_time
     467        if (quality<lossless_q or self.congestion_sequences or lce<1.0) and self.image_depth>16 and "jpeg" in options:
    467468            #assume that we have "turbojpeg",
    468469            #which beats everything in terms of efficiency for lossy compression:
    469470            return "jpeg"