xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #132: xpra-unthreadeddamage.patch

File xpra-unthreadeddamage.patch, 15.5 KB (added by Antoine Martin, 9 years ago)

removes threading from damage codepath

  • src/xpra/server.py

    ### Eclipse Workspace Patch 1.0
    #P Xpra
     
    242242        self._mmap_size = mmap_size
    243243        self._mmap_bytes_sent = 0
    244244        protocol.source = self
    245         self._damage_request_queue = Queue()
    246245        self._damage_data_queue = Queue()
    247246        self._damage_packet_queue = Queue()
    248247
     
    256255            t.daemon = True
    257256            t.start()
    258257            return t
    259         self._damagedata_thread = start_daemon_thread(self.damage_to_data, "damage_to_data")
    260258        self._datapacket_thread = start_daemon_thread(self.data_to_packet, "data_to_packet")
    261259
    262260    def close(self):
    263261        self._closed = True
    264         self._damage_request_queue.put(None, block=False)
    265262        self._damage_data_queue.put(None, block=False)
    266263        self.video_encoder_cleanup()
    267264
     
    360357            return batch.encoding or self._encoding
    361358        return self._encoding
    362359
     360    def is_cancelled(self, wid, sequence):
     361        return sequence>0 and self._damage_cancelled.get(wid, 0)>=sequence
     362
     363    def get_window_pixmap(self, wid, window, sequence):
     364        # It's important to acknowledge changes *before* we extract them,
     365        # to avoid a race condition.
     366        window.acknowledge_changes()
     367        if self.is_cancelled(wid, sequence):
     368            log("get_window_pixmap: dropping damage request with sequence=%s", sequence)
     369            return  None
     370        pixmap = window.get_property("client-contents")
     371        if pixmap is None and not self.is_cancelled(wid, sequence):
     372            log.error("send_delayed_regions: wtf, pixmap is None for window %s, wid=%s", window, wid)
     373        return pixmap
     374
    363375    def damage(self, wid, window, x, y, w, h, options=None):
    364376        """ decide what to do with the damage area:
    365377            * send it now (if not congested or batch.enabled is off)
     
    378390        def damage_now(reason):
    379391            self._sequence += 1
    380392            log("damage(%s, %s, %s, %s, %s) %s, sending now with sequence %s", wid, x, y, w, h, reason, self._sequence)
    381             region = gtk.gdk.Region()
    382             region.union_with_rect(gtk.gdk.Rectangle(x, y, w, h))
    383             item = wid, window, region, self._sequence, options
    384             self._damage_request_queue.put(item)
    385             batch.last_delays.append(0)
     393            coding = self.get_encoding(wid)
     394            pixmap = self.get_window_pixmap(wid, window, self._sequence)
     395            if pixmap:
     396                self._process_damage_region(pixmap, wid, x, y, w, h, coding, self._sequence, options)
     397                batch.last_delays.append(0)
    386398        #record this damage event in the damage_last_events queue:
    387399        now = time.time()
    388400        last_events = self._damage_last_events.setdefault(wid, maxdeque(100))
     
    418430            delayed = self._damage_delayed.get(wid)
    419431            if delayed:
    420432                del self._damage_delayed[wid]
    421                 self._damage_request_queue.put(delayed)
     433                self.send_delayed_regions(*delayed)
    422434                log("moving region %s to expired list", delayed)
    423435            else:
    424436                log("window %s already removed from delayed list?", wid)
     
    427439        batch.last_delays.append(batch.delay)
    428440        gobject.timeout_add(int(batch.delay), send_delayed)
    429441
     442    def send_delayed_regions(self, wid, window, damage, sequence, options):
     443        log.info("send_delayed_regions: processing sequence=%s", sequence)
     444        if self.is_cancelled(wid, sequence):
     445            log("send_delayed_regions: dropping request with sequence=%s", sequence)
     446            return
     447        regions = []
     448        coding = self.get_encoding(wid)
     449        is_or = isinstance(window, OverrideRedirectWindowModel)
     450        try:
     451            if is_or:
     452                (_, _, ww, wh) = window.get_property("geometry")
     453            else:
     454                ww, wh = window.get_property("actual-size")
     455        except KeyError, e:
     456            ww, wh = 512, 512
     457        def send_full_screen_update():
     458            log.info("send_delayed_regions: using full screen update")
     459            pixmap = self.get_window_pixmap(wid, window, sequence)
     460            if pixmap:
     461                self._process_damage_region(pixmap, wid, 0, 0, ww, wh, coding, sequence, options)
     462
     463        try:
     464            count_threshold = 60
     465            pixels_threshold = ww*wh*9/10
     466            packet_cost = 1024
     467            if self._mmap and self._mmap_size>0:
     468                #with mmap, we can move lots of data around easily
     469                #so favour large screen updates over many small packets
     470                pixels_threshold = ww*wh/2
     471                packet_cost = 4096
     472            pixel_count = 0
     473            while not damage.empty():
     474                try:
     475                    if self.is_cancelled(wid, sequence):
     476                        return
     477                    (x, y, w, h) = get_rectangle_from_region(damage)
     478                    pixel_count += w*h
     479                    #favor full screen updates over many regions:
     480                    #x264 and vpx need full screen updates all the time
     481                    if len(regions)>count_threshold or pixel_count+packet_cost*len(regions)>=pixels_threshold or coding in ["x264", "vpx"]:
     482                        send_full_screen_update()
     483                        return
     484                    regions.append((x, y, w, h))
     485                    rect = gtk.gdk.Rectangle(x, y, w, h)
     486                    damage.subtract(gtk.gdk.region_rectangle(rect))
     487                except ValueError:
     488                    log.error("send_delayed_regions: damage is empty: %s", damage)
     489                    break
     490            log("send_delayed_regions: to regions: %s items, %s pixels", len(regions), pixel_count)
     491        except Exception, e:
     492            log.error("send_delayed_regions: error processing region %s: %s", damage, e)
     493            return
     494        log("send_delayed_regions: regions=%s, sending damage ack", regions)
     495        pixmap = self.get_window_pixmap(wid, window, sequence)
     496        if pixmap is None:
     497            return
     498        log("send_delayed_regions: pixmap size=%s, window size=%s", pixmap.get_size(), (ww, wh))
     499        for region in regions:
     500            x, y, w, h = region
     501            if self.is_cancelled(wid, sequence):
     502                return
     503            self._process_damage_region(pixmap, wid, x, y, w, h, coding, sequence, options)
     504
     505    def _process_damage_region(self, pixmap, wid, x, y, w, h, coding, sequence, options):
     506        data = _get_rgb_rawdata(wid, pixmap, x, y, w, h, coding, sequence, options)
     507        if data:
     508            log("process_damage_regions: adding pixel data %s to queue, queue size=%s", data[:6], self._damage_data_queue.qsize())
     509            self._damage_data_queue.put(data)
     510
     511    def data_to_packet(self):
     512        while not self._closed:
     513            item = self._damage_data_queue.get(True)
     514            if item is None:
     515                return              #empty marker
     516            try:
     517                packet = self.make_data_packet(item)
     518                if packet:
     519                    log("data_to_packet: adding to packet queue, size=%s, full=%s", self._damage_packet_queue.qsize(), self._damage_packet_queue.full())
     520                    if self._damage_packet_queue.full():
     521                        self._protocol.source_has_more()
     522                    self._damage_packet_queue.put(packet)
     523                    self._protocol.source_has_more()
     524            except Exception, e:
     525                log.error("error processing damage data: %s", e, exc_info=True)
     526
     527
    430528    def calculate_batch_delay(self, wid, batch):
    431529        now = time.time()
    432530        if batch.last_updated+0.025>=now:
     
    435533        def update_batch_delay(reason, factor=1, delta=0):
    436534            batch.delay = max(batch.min_delay, min(batch.max_delay, int(100.0*batch.delay*factor)/100.0)-delta)
    437535            batch.last_updated = now
    438             log("update_batch_delay: %s, wid=%s, factor=%s, delta=%s, new batch delay=%s", reason, wid, factor, delta, batch.delay)
     536            log.info("update_batch_delay: %s, wid=%s, factor=%s, delta=%s, new batch delay=%s", reason, wid, factor, delta, batch.delay)
    439537
    440538        last_delta = self.last_client_delta
    441539        delta = self._damage_packet_sequence-self.last_client_packet_sequence
     
    444542            if self._damage_packet_queue.qsize()>3:
    445543                #packets ready for sending by network layer
    446544                update_batch_delay("damage packet queue overflow: %s" % self._damage_packet_queue.qsize(), logp2(self._damage_packet_queue.qsize()-2))
    447             if self._damage_request_queue.qsize()>3:
    448                 #processes damage requests and places them on the damage_data_queue
    449                 update_batch_delay("damage request queue overflow: %s" % self._damage_request_queue.qsize(), logp10(self._damage_request_queue.qsize()-2))
    450545            if self._damage_data_queue.qsize()>3:
    451546                #contains pixmaps before they get converted to a packet that goes to the damage_packet_queue
    452547                update_batch_delay("damage data queue overflow: %s" % self._damage_data_queue.qsize(), logp10(self._damage_data_queue.qsize()-2))
     
    485580            return update_batch_delay("client is %s packets behind, up from %s" % (packets_due, last_packets_due), logp10(1.0*packets_due/(1+last_packets_due)))
    486581        return update_batch_delay("client is %s pixels behind, from %s last time around" % (pixels_behind, last_pixels_behind), min(2.0, logp2(1.0*pixels_behind/last_pixels_behind)))
    487582
    488     def damage_to_data(self):
    489         """ pick items off the damage_request_queue
    490             and places the damage pixel data in the _damage_data_queue.
    491             this method runs in a thread but most of the actual processing
    492             is done in process_regions() which runs in the gtk main thread
    493             via idle_add.
    494         """
    495         while not self._closed:
    496             damage_request = self._damage_request_queue.get(True)
    497             if damage_request is None:
    498                 return              #empty marker
    499             wid, window, damage, sequence, options = damage_request
    500             log("damage_to_data: processing sequence=%s", sequence)
    501             if self._damage_cancelled.get(wid, 0)>=sequence:
    502                 log("damage_to_data: dropping request with sequence=%s", sequence)
    503                 continue
    504             regions = []
    505             coding = self.get_encoding(wid)
    506             is_or = isinstance(window, OverrideRedirectWindowModel)
    507             try:
    508                 if is_or:
    509                     (_, _, ww, wh) = window.get_property("geometry")
    510                 else:
    511                     ww, wh = window.get_property("actual-size")
    512             except KeyError, e:
    513                 ww, wh = 512, 512
    514             try:
    515                 pixels_threshold = ww*wh*9/10
    516                 packet_cost = 1024
    517                 if self._mmap and self._mmap_size>0:
    518                     #with mmap, we can move lots of data around easily
    519                     #so favour large screen updates over many small packets
    520                     pixels_threshold = ww*wh/2
    521                     packet_cost = 4096
    522                 pixel_count = 0
    523                 while not damage.empty():
    524                     try:
    525                         (x, y, w, h) = get_rectangle_from_region(damage)
    526                         pixel_count += w*h
    527                         #favor full screen updates over many regions:
    528                         #x264 and vpx need full screen updates all the time
    529                         if pixel_count+packet_cost*len(regions)>=pixels_threshold or coding in ["x264", "vpx"]:
    530                             regions = [(0, 0, ww, wh, True)]
    531                             break
    532                         regions.append((x, y, w, h, False))
    533                         rect = gtk.gdk.Rectangle(x, y, w, h)
    534                         damage.subtract(gtk.gdk.region_rectangle(rect))
    535                     except ValueError:
    536                         log.error("damage_to_data: damage is empty: %s", damage)
    537                         break
    538                 log("damage_to_data: to regions: %s items, %s pixels", len(regions), pixel_count)
    539             except Exception, e:
    540                 log.error("damage_to_data: error processing region %s: %s", damage, e)
    541                 continue
    542             gobject.idle_add(self._process_damage_regions, wid, window, ww, wh, regions, coding, sequence, options)
    543             time.sleep(0)
    544583
    545     def _process_damage_regions(self, wid, window, ww, wh, regions, coding, sequence, options):
    546         if self._damage_cancelled.get(wid, 0)>=sequence:
    547             log("process_damage_regions: dropping damage request with sequence=%s", sequence)
    548             return
    549         # It's important to acknowledge changes *before* we extract them,
    550         # to avoid a race condition.
    551         log("process_damage_regions: regions=%s, sending damage ack", regions)
    552         window.acknowledge_changes()
    553         pixmap = window.get_property("client-contents")
    554         if pixmap is None:
    555             log.error("wtf, pixmap is None for window %s, wid=%s", window, wid)
    556             return
    557         log("process_damage_regions: pixmap size=%s, window size=%s", pixmap.get_size(), (ww, wh))
    558         for region in regions:
    559             (x, y, w, h, full_window) = region
    560             if full_window:
    561                 log("process_damage_regions: sending full window: %s", pixmap.get_size())
    562                 w, h = pixmap.get_size()
    563             data = _get_rgb_rawdata(wid, pixmap, x, y, w, h, coding, sequence, options)
    564             if data:
    565                 log("process_damage_regions: adding pixel data %s to queue, queue size=%s", data[:6], self._damage_data_queue.qsize())
    566                 self._damage_data_queue.put(data)
    567 
    568     def data_to_packet(self):
    569         while not self._closed:
    570             item = self._damage_data_queue.get(True)
    571             if item is None:
    572                 return              #empty marker
    573             try:
    574                 packet = self.make_data_packet(item)
    575                 if packet:
    576                     log("data_to_packet: adding to packet queue, size=%s, full=%s", self._damage_packet_queue.qsize(), self._damage_packet_queue.full())
    577                     if self._damage_packet_queue.full():
    578                         self._protocol.source_has_more()
    579                     self._damage_packet_queue.put(packet)
    580                     self._protocol.source_has_more()
    581             except Exception, e:
    582                 log.error("error processing damage data: %s", e, exc_info=True)
    583 
    584584    def make_data_packet(self, item):
    585585        wid, x, y, w, h, coding, data, rowstride, sequence, options = item
    586         if sequence>=0 and self._damage_cancelled.get(wid, 0)>=sequence:
     586        if sequence>=0 and self.is_cancelled(wid, sequence):
    587587            log("make_data_packet: dropping data packet for window %s with sequence=%s", wid, sequence)
    588588            return  None
    589589        log("make_data_packet: damage data: %s", (wid, x, y, w, h, coding))
     
    637637
    638638        #check cancellation list again since the code above may take some time:
    639639        #but always send mmap data so we can reclaim the space!
    640         if coding!="mmap" and sequence>=0 and self._damage_cancelled.get(wid, 0)>=sequence:
     640        if coding!="mmap" and sequence>=0 and self.is_cancelled(wid, sequence):
    641641            log("make_data_packet: dropping data packet for window %s with sequence=%s", wid, sequence)
    642642            return  None
    643643        #actual network packet: