xpra icon
Bug tracker and wiki

Ticket #639: udp-v10.patch

File udp-v10.patch, 61.1 KB (added by Antoine Martin, 3 years ago)

updated patch for r16713

  • xpra/client/client_base.py

     
    232232        raise NotImplementedError()
    233233
    234234    def setup_connection(self, conn):
    235         netlog("setup_connection(%s) timeout=%s, socktype=%s", conn, conn.timeout, conn.socktype)
    236         self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     235        netlog.info("setup_connection(%s) timeout=%s, socktype=%s", conn, conn.timeout, conn.socktype)
     236        if conn.socktype=="udp":
     237            from xpra.net.udp_protocol import UDPClientProtocol
     238            self._protocol = UDPClientProtocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     239            #use a random uuid:
     240            import random
     241            self._protocol.uuid = random.randint(0, 2**64-1)
     242            self.set_packet_handlers(self._packet_handlers, {
     243                "udp-control"   : self._process_udp_control,
     244                })
     245        else:
     246            self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
    237247        self._protocol.large_packets.append("keymap-changed")
    238248        self._protocol.large_packets.append("server-settings")
    239249        self._protocol.large_packets.append("logging")
     
    254264            getChildReaper().add_process(proc, name, command, ignore=True, forget=False)
    255265        netlog("setup_connection(%s) protocol=%s", conn, self._protocol)
    256266
     267    def _process_udp_control(self, packet):
     268        self._protocol.process_control(*packet[1:])
    257269
     270
    258271    def remove_packet_handlers(self, *keys):
    259272        for k in keys:
    260273            for d in (self._packet_handlers, self._ui_packet_handlers):
     
    440453            p.source_has_more()
    441454
    442455    def next_packet(self):
     456        netlog("next_packet() packets in queues: priority=%i, ordinary=%i, mouse=%s", len(self._priority_packets), len(self._ordinary_packets), bool(self._mouse_position))
     457        synchronous = True
    443458        if self._priority_packets:
    444459            packet = self._priority_packets.pop(0)
    445460        elif self._ordinary_packets:
     
    446461            packet = self._ordinary_packets.pop(0)
    447462        elif self._mouse_position is not None:
    448463            packet = self._mouse_position
     464            synchronous = False
    449465            self._mouse_position = None
    450466        else:
    451467            packet = None
     
    452468        has_more = packet is not None and \
    453469                (bool(self._priority_packets) or bool(self._ordinary_packets) \
    454470                 or self._mouse_position is not None)
    455         return packet, None, None, has_more
     471        return packet, None, None, None, synchronous, has_more
    456472
    457473
    458474    def cleanup(self):
     
    867883        self.quit(EXIT_PACKET_FAILURE)
    868884
    869885
    870     def process_packet(self, _proto, packet):
     886    def process_packet(self, proto, packet):
    871887        try:
    872888            handler = None
    873889            packet_type = packet[0]
  • xpra/client/gtk_base/gtk_client_base.py

     
    468468               "configure.pointer"      : True,
    469469               "frame_sizes"            : self.get_window_frame_sizes()
    470470               })
    471         from xpra.client.window_backing_base import DELTA_BUCKETS
    472471        updict(capabilities, "encoding", {
    473472                    "icons.greedy"      : True,         #we don't set a default window icon any more
    474473                    "icons.size"        : (64, 64),     #size we want
    475474                    "icons.max_size"    : (128, 128),   #limit
    476                     "delta_buckets"     : DELTA_BUCKETS,
    477475                    })
     476        from xpra.client import window_backing_base
     477        if self._protocol._conn.socktype=="udp":
     478            #lossy protocol means we can't use delta regions:
     479            log("no delta buckets with udp, since we can drop paint packets")
     480            window_backing_base.DELTA_BUCKETS = 0
     481        updict(capabilities, "encoding", {
     482                    "delta_buckets"     : window_backing_base.DELTA_BUCKETS,
     483                    })
    478484        return capabilities
    479485
    480486
  • xpra/log.py

     
    241241                ("protocol"     , "Packet input and output (formatting, parsing, sending and receiving)"),
    242242                ("websocket"    , "WebSocket layer"),
    243243                ("named-pipe"   , "Named pipe"),
     244                ("udp"          , "UDP"),
    244245                ("crypto"       , "Encryption"),
    245246                ("auth"         , "Authentication"),
    246247                ])),
  • xpra/net/bytestreams.py

     
    277277            i = s
    278278        log("%s.close() for socket=%s", self, i)
    279279        Connection.close(self)
     280        #meaningless for udp:
     281        try:
     282            s.settimeout(0)
     283        except:
     284            pass
    280285        #this is more proper but would break the proxy server:
    281286        #s.shutdown(socket.SHUT_RDWR)
    282287        s.close()
     
    305310        s = self._socket
    306311        if not s:
    307312            return None
    308         return {
    309                 #"class"         : str(type(s)),
    310                 "fileno"        : s.fileno(),
    311                 "timeout"       : int(1000*(s.gettimeout() or 0)),
     313        info = {
    312314                "family"        : FAMILY_STR.get(s.family, s.family),
    313315                "proto"         : s.proto,
    314316                "type"          : PROTOCOL_STR.get(s.type, s.type),
    315317                }
     318        try:
     319            info["timeout"] = int(1000*(s.gettimeout() or 0))
     320        except:
     321            pass
     322        try:
     323            info["fileno"] = s.fileno()
     324        except:
     325            pass
     326        return info
    316327
    317328try:
    318329    #this wrapper class allows us to override the normal ssl.Socket
  • xpra/net/protocol.py

     
    311311                return
    312312            self._internal_error("error in network packet write/format", e, exc_info=True)
    313313
    314     def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, has_more=False):
     314    def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, fail_cb=None, synchronous=True, has_more=False):
    315315        if has_more:
    316316            self._source_has_more.set()
    317317        if packet is None:
     
    322322            if self._closed:
    323323                return
    324324            try:
    325                 self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb)
     325                self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb, fail_cb, synchronous)
    326326            except:
    327327                log.error("Error: failed to queue '%s' packet", packet[0])
    328                 log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb), exc_info=True)
     328                log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb, fail_cb), exc_info=True)
    329329                raise
    330330
    331     def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None):
     331    def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None, fail_cb=None, synchronous=True):
    332332        """ the write_lock must be held when calling this function """
    333333        counter = 0
    334334        items = []
    335335        for proto_flags,index,level,data in chunks:
    336             scb, ecb = None, None
    337             #fire the start_send_callback just before the first packet is processed:
    338             if counter==0:
    339                 scb = start_send_cb
    340             #fire the end_send callback when the last packet (index==0) makes it out:
    341             if index==0:
    342                 ecb = end_send_cb
    343336            payload_size = len(data)
    344337            actual_size = payload_size
    345338            if self.cipher_out:
     
    360353                assert not self.cipher_out
    361354                #for plain/text packets (ie: gibberish response)
    362355                log("sending %s bytes without header", payload_size)
    363                 items.append((data, scb, ecb))
     356                items.append(data)
    364357            elif actual_size<PACKET_JOIN_SIZE:
    365                 if type(data) not in JOIN_TYPES:
     358                if not isinstance(data, JOIN_TYPES):
    366359                    data = memoryview_to_bytes(data)
    367360                header_and_data = pack_header(proto_flags, level, index, payload_size) + data
    368361                items.append(header_and_data)
     
    371364                items.append(header)
    372365                items.append(data)
    373366            counter += 1
    374         if self._write_thread is None:
    375             self.start_write_thread()
    376         self._write_queue.put((items, scb, ecb))
    377         self.output_packetcount += 1
     367        self.raw_write(items, start_send_cb, end_send_cb, fail_cb, synchronous)
    378368
    379369    def start_write_thread(self):
    380370        self._write_thread = start_thread(self._write_thread_loop, "write", daemon=True)
    381371
    382     def raw_write(self, contents, start_cb=None, end_cb=None):
     372    def raw_write(self, items, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
    383373        """ Warning: this bypasses the compression and packet encoder! """
    384374        if self._write_thread is None:
    385375            self.start_write_thread()
    386         self._write_queue.put(((contents, start_cb, end_cb), ))
     376        self._write_queue.put((items, start_cb, end_cb, fail_cb, synchronous))
    387377
     378
    388379    def verify_packet(self, packet):
    389380        """ look for None values which may have caused the packet to fail encoding """
    390381        if type(packet)!=list:
     
    604595            return False
    605596        return self.write_items(*items)
    606597
    607     def write_items(self, buf_data, start_cb=None, end_cb=None):
     598    def write_items(self, buf_data, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
    608599        con = self._conn
    609600        if not con:
    610601            return False
     
    614605            except:
    615606                if not self._closed:
    616607                    log.error("Error on write start callback %s", start_cb, exc_info=True)
    617         self.write_buffers(buf_data)
     608        self.write_buffers(buf_data, fail_cb, synchronous)
    618609        if end_cb:
    619610            try:
    620611                end_cb(self._conn.output_bytecount)
     
    623614                    log.error("Error on write end callback %s", end_cb, exc_info=True)
    624615        return True
    625616
    626     def write_buffers(self, buf_data):
     617    def write_buffers(self, buf_data, _fail_cb, _synchronous):
    627618        con = self._conn
    628619        if not con:
    629620            return 0
     
    967958                    if wait_for_packet_sent():
    968959                        #check again every 100ms
    969960                        self.timeout_add(100, wait_for_packet_sent)
    970                 self._add_chunks_to_queue(chunks, start_send_cb=None, end_send_cb=packet_queued)
     961                self._add_chunks_to_queue(chunks, start_send_cb=None, end_send_cb=packet_queued, synchronous=False)
    971962                #just in case wait_for_packet_sent never fires:
    972963                self.timeout_add(5*1000, close_and_release)
    973964
  • xpra/net/udp_protocol.py

     
     1# This file is part of Xpra.
     2# Copyright (C) 2017 Antoine Martin <antoine@devloop.org.uk>
     3# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
     4# later version. See the file COPYING for details.
     5
     6import socket
     7import struct
     8import random
     9try:
     10    import errno
     11    EMSGSIZE = errno.EMSGSIZE
     12except ImportError as e:
     13    EMSGSIZE = None
     14
     15from xpra.log import Logger
     16log = Logger("network", "protocol", "udp")
     17
     18from xpra.os_util import LINUX, monotonic_time, memoryview_to_bytes
     19from xpra.util import envint, repr_ellipsized
     20from xpra.make_thread import start_thread
     21from xpra.net.protocol import Protocol
     22from xpra.net.bytestreams import SocketConnection
     23
     24READ_BUFFER_SIZE = envint("XPRA_READ_BUFFER_SIZE", 65536)
     25DROP_PCT = envint("XPRA_UDP_DROP_PCT", 0)
     26
     27
     28#UUID, seqno, synchronous, chunk, chunks
     29_header_struct = struct.Struct('!QQHHH')
     30_header_size = _header_struct.size
     31
     32
     33class IncompletePacket(object):
     34    def __init__(self, seqno, start_time, chunks=None):
     35        self.seqno = seqno
     36        self.start_time = start_time
     37        self.last_time = start_time
     38        #todo: use numpy array of bytes
     39        self.chunks = chunks
     40    def __repr__(self):
     41        return ("IncompletePacket(%i: %s chunks)" % (self.seqno, len(self.chunks or [])))
     42
     43
     44class UDPListener(object):
     45    """
     46        This class is used by servers to receive UDP packets,
     47        it parses the header and then exposes the data received via process_packet_cb.
     48    """
     49
     50    def __init__(self, sock, process_packet_cb):
     51        assert sock is not None
     52        self._closed = False
     53        self._socket = sock
     54        self._process_packet_cb =  process_packet_cb
     55        self._read_thread = start_thread(self._read_thread_loop, "read", daemon=True)
     56
     57    def __repr__(self):
     58        return "UDPListener(%s)" % self._socket
     59
     60    def _read_thread_loop(self):
     61        log.info("udp read thread loop starting")
     62        try:
     63            while not self._closed:
     64                buf, bfrom = self._socket.recvfrom(READ_BUFFER_SIZE)
     65                if not buf:
     66                    log("read thread: eof")
     67                    break
     68                values = list(_header_struct.unpack_from(buf[:_header_size])) + [buf[_header_size:], bfrom]
     69                try:
     70                    self._process_packet_cb(self, *values)
     71                except Exception as e:
     72                    log("_read_thread_loop() buffer=%s, from=%s", repr_ellipsized(buf), bfrom, exc_info=True)
     73                    if not self._closed:
     74                        log.error("Error: UDP packet processing error:")
     75                        log.error(" %s", e)
     76        except Exception as e:
     77            #can happen during close(), in which case we just ignore:
     78            if not self._closed:
     79                log.error("Error: read on %s failed: %s", self._socket, type(e), exc_info=True)
     80        log("udp read thread loop ended")
     81        self.close()
     82
     83    def close(self):
     84        s = self._socket
     85        log("UDPListener.close() closed=%s, socket=%s", self._closed, s)
     86        if self._closed:
     87            return
     88        self._closed = True
     89        if s:
     90            try:
     91                log("Protocol.close() calling %s", s.close)
     92                s.close()
     93            except:
     94                log.error("error closing %s", s, exc_info=True)
     95            self._socket = None
     96        log("UDPListener.close() done")
     97
     98
     99class UDPProtocol(Protocol):
     100    """
     101        This class extends the Protocol class with UDP encapsulation.
     102        A single packet may end up being fragmented into multiple UDP frames
     103        to fit in the MTU.
     104        We keep track of the function which can be used to handle send failures
     105        (or the packet data if no function is supplied).
     106        "udp-control" packets are used to synchronize both ends.
     107    """
     108
     109    def __init__(self, *args):
     110        Protocol.__init__(self, *args)
     111        self.mtu = 0
     112        self.last_sequence = -1     #the most recent packet sequence we processed in full
     113        self.highest_sequence = -1
     114        self.jitter = 20            #20ms
     115        self.uuid = 0
     116        self.fail_cb = {}
     117        self.resend_cache = {}
     118        self.incomplete_packets = {}
     119        self.can_skip = set()       #processed already, or cancelled
     120        self.cancel = set()         #tell the other end to forget those
     121        self.control_timer = None
     122        self.control_timer_due = 0
     123        self._process_read = self.process_read
     124
     125    def close(self):
     126        Protocol.close(self)
     127        self.cancel_control_timer()
     128
     129
     130    def schedule_control(self, delay=1000):
     131        due = monotonic_time()+delay/1000.0
     132        if self.control_timer_due and self.control_timer_due<=due:
     133            #due already
     134            return
     135        ct = self.control_timer
     136        if ct:
     137            self.source_remove(ct)
     138        self.control_timer = self.timeout_add(delay, self.send_control)
     139        self.control_timer_due = due
     140
     141    def cancel_control_timer(self):
     142        ct = self.control_timer
     143        if ct:
     144            self.control_timer = None
     145            self.source_remove(ct)
     146
     147    def send_control(self):
     148        self.control_timer = None
     149        self.control_timer_due = 0
     150        if self._closed:
     151            return False
     152        missing = self._get_missing()
     153        packet = ("udp-control", self.mtu, self.last_sequence, self.highest_sequence, missing, tuple(self.cancel))
     154        log("send_control() packet(%s)=%s", self.incomplete_packets, packet)
     155        self.send_async(packet)
     156        self.cancel = set()
     157        self.schedule_control()
     158        return False
     159
     160    def _get_missing(self):
     161        """ the packets and chunks we are missing """
     162        if not self.incomplete_packets:
     163            return {}
     164        now = monotonic_time()
     165        max_start_time = now-self.jitter/1000.0
     166        late_start_time = now-2
     167        not_recent = now-0.5
     168        missing = {}
     169        for seqno, ip in self.incomplete_packets.items():
     170            st = ip.start_time
     171            if st>=max_start_time:
     172                continue        #too recent, may still arrive
     173            if st<late_start_time or ip.last_time<not_recent:
     174                if ip.chunks is None:
     175                    missing[seqno] = []
     176                else:
     177                    #TODO: use bitmap instead?
     178                    missing_chunks = [i for i,x in enumerate(ip.chunks) if x is None]
     179                    if missing_chunks:
     180                        missing[seqno] = missing_chunks
     181        return missing
     182
     183    def process_control(self, mtu, last_seq, high_seq, missing, cancel):
     184        log("process_control(%i, %i, %i, %s, %s)", mtu, last_seq, high_seq, missing, cancel)
     185        con = self._conn
     186        if not con:
     187            return
     188        if mtu and self.mtu==0:
     189            self.mtu = mtu
     190        #first, we can free all the packets that have been processed by the other end:
     191        if last_seq>=0:
     192            done = [x for x in self.fail_cb.keys() if x<=last_seq]
     193            for x in done:
     194                try:
     195                    del self.fail_cb[x]
     196                except:
     197                    pass
     198            done = [x for x in self.resend_cache.keys() if x<=last_seq]
     199            for x in done:
     200                try:
     201                    del self.resend_cache[x]
     202                except:
     203                    pass
     204        #next we can forget about sequence numbers that have been cancelled:
     205        if cancel:
     206            for seqno in cancel:
     207                if seqno>self.last_sequence:
     208                    self.can_skip.add(seqno)
     209                try:
     210                    del self.incomplete_packets[seqno]
     211                except:
     212                    pass
     213            #we may now be able to move forward a bit:
     214            if self.incomplete_packets and (self.last_sequence+1) in self.can_skip:
     215                self.process_incomplete()
     216        #re-send the missing ones:
     217        for seqno, missing_chunks in missing.items():
     218            resend_cache = self.resend_cache.get(seqno)
     219            fail_cb_seq = self.fail_cb.get(seqno)
     220            log("fail_cb[%i]=%s", seqno, repr_ellipsized(str(fail_cb_seq)))
     221            if fail_cb_seq is None and not resend_cache:
     222                log("cannot resend packet sequence %i - assuming we cancelled it already", seqno)
     223                #hope for the best, and tell the other end to stop asking:
     224                self.cancel.add(seqno)
     225                continue
     226            if len(missing_chunks)==0:
     227                #the other end only knows it is missing the seqno,
     228                #not how many chunks are missing, so send them all
     229                missing_chunks = resend_cache.keys()
     230            if fail_cb_seq:
     231                #we have a fail callback for this packet,
     232                #we have to decide if we send the missing chunks or use the callback,
     233                #resend if the other end is missing less than 25% of the chunks:
     234                #TODO: if the latency is low, resending is cheaper..
     235                if len(missing_chunks)>=len(resend_cache)//4:
     236                    #too many are missing, forget about it
     237                    try:
     238                        del self.resend_cache[seqno]
     239                    except:
     240                        pass
     241                    try:
     242                        del self.fail_cb[seqno]
     243                    except:
     244                        pass
     245                    self.cancel.add(seqno)
     246                    fail_cb_seq()
     247                    continue
     248            for c in missing_chunks:
     249                data = resend_cache.get(c)
     250                log("resend data[%i][%i]=%s", seqno, c, repr_ellipsized(str(data)))
     251                if data is None:
     252                    log.error("Error: cannot resend chunk %i of packet sequence %i", c, seqno)
     253                    log.error(" data missing from packet resend cache")
     254                    continue
     255                #send it again:
     256                #TODO: if the mtu is now lower, we should re-send the whole packet,
     257                # with the new chunk size..
     258                con.write(data)
     259
     260
     261    def send_async(self, packet):
     262        chunks = self.encode(packet)
     263        if len(chunks)>1:
     264            return Protocol.send_now(packet)
     265        proto_flags,index,level,data = chunks[0]
     266        from xpra.net.header import pack_header
     267        payload_size = len(data)
     268        header_and_data = pack_header(proto_flags, level, index, payload_size) + data
     269        with self._write_lock:
     270            if self._write_thread is None:
     271                self.start_write_thread()
     272            self._write_queue.put((header_and_data, None, None, None, False))
     273
     274    def process_udp_data(self, uuid, seqno, synchronous, chunk, chunks, data, _bfrom):
     275        #log("process_udp_data%s %i bytes", (uuid, seqno, synchronous, chunk, chunks, repr_ellipsized(data), bfrom), len(data))
     276        assert uuid==self.uuid
     277        if DROP_PCT>0:
     278            if random.randint(0, 100) <= DROP_PCT:
     279                log.warn("Warning: dropping udp packet %5i.%i", seqno, chunk)
     280                return
     281        if seqno<=self.last_sequence:
     282            #must be a duplicate, we've already processed it!
     283            return
     284        self.highest_sequence = max(self.highest_sequence, seqno)
     285        if self.incomplete_packets or (synchronous and seqno!=self.last_sequence+1) or chunk!=0 or chunks!=1:
     286            assert chunk>=0 and chunks>0 and chunk<chunks, "invalid chunk: %i/%i" % (chunk, chunks)
     287            #slow path: add chunk to incomplete packet
     288            now = monotonic_time()
     289            ip = self.incomplete_packets.get(seqno)
     290            if not ip or not ip.chunks:
     291                chunks_array = [None for _ in range(chunks)]
     292                ip = IncompletePacket(seqno, now, chunks_array)
     293                self.incomplete_packets[seqno] = ip
     294            else:
     295                ip.last_time = now
     296            ip.chunks[chunk] = data
     297            if seqno!=self.last_sequence+1:
     298                #we're waiting for a packet and this is not it,
     299                #make sure any gaps are marked as incomplete:
     300                for i in range(self.last_sequence+1, seqno):
     301                    if i not in self.incomplete_packets and i not in self.can_skip:
     302                        self.incomplete_packets[i] = IncompletePacket(i, now)
     303                #make sure we request the missing packets:
     304                self.schedule_control(self.jitter)
     305                if synchronous:
     306                    #we have to wait for the missing chunks / packets
     307                    log("process_udp_data: we're waiting for %i, not %i", self.last_sequence+1, seqno)
     308                    return
     309            if any(x is None for x in ip.chunks):
     310                #one of the chunks is still missing
     311                log("process_udp_data: sequence %i is still missing some chunks: %s", seqno, [i for i,x in enumerate(ip.chunks) if x is None])
     312                self.schedule_control(self.jitter)
     313                return
     314            #all the data is here!
     315            del self.incomplete_packets[seqno]
     316            data = b"".join(ip.chunks)
     317        #log("process_udp_data: adding packet sequence %i to read queue", seqno)
     318        if seqno==self.last_sequence+1:
     319            self.last_sequence = seqno
     320        else:
     321            assert not synchronous
     322            self.can_skip.add(seqno)
     323        self._read_queue_put(data)
     324        if self.incomplete_packets:
     325            self.process_incomplete()
     326
     327    def process_incomplete(self):
     328        #maybe we can send the next one(s) now?
     329        seqno = self.last_sequence
     330        log("process_incomplete() last_sequence=%i, can skip=%s", seqno, self.can_skip)
     331        while True:
     332            seqno += 1
     333            if seqno in self.can_skip:
     334                try:
     335                    del self.incomplete_packets[seqno]
     336                except KeyError:
     337                    pass
     338                self.can_skip.remove(seqno)
     339                self.last_sequence = seqno
     340                continue
     341            ip = self.incomplete_packets.get(seqno)
     342            if not ip or not ip.chunks:
     343                #it's missing, we just don't know how many chunks
     344                return
     345            if any(x is None for x in ip.chunks):
     346                #one of the chunks is still missing
     347                return
     348            #all the data is here!
     349            del self.incomplete_packets[seqno]
     350            data = b"".join(ip.chunks)
     351            log("process_incomplete: adding packet sequence %i to read queue", seqno)
     352            self.last_sequence = seqno
     353            self._read_queue_put(data)
     354
     355
     356    def write_buffers(self, buf_data, fail_cb, synchronous):
     357        buf = b"".join(memoryview_to_bytes(x) for x in buf_data)
     358        #if not isinstance(buf, JOIN_TYPES):
     359        #    buf = memoryview_to_bytes(buf)
     360        while True:
     361            try:
     362                seqno = self.output_packetcount
     363                return self.send_buf(seqno, buf, fail_cb, synchronous)
     364            except MTUExceeded as e:
     365                log.warn("%s: %s", e, self.mtu)
     366                if self.mtu>576:
     367                    self.mtu //= 2
     368                raise
     369
     370    def send_buf(self, seqno, data, fail_cb, synchronous):
     371        con = self._conn
     372        if not con:
     373            return 0
     374        #TODO: bump to 1280 for IPv6
     375        #mtu = max(576, self.mtu)
     376        mtu = max(1280, self.mtu)
     377        l = len(data)
     378        maxpayload = mtu-_header_size
     379        chunks = l // maxpayload
     380        if l % maxpayload > 0:
     381            chunks += 1
     382        #log("UDP.send_buf(%s, %i bytes, %s, %s) seq=%i, mtu=%s, maxpayload=%i, chunks=%i, data=%s", con, l, fail_cb, synchronous, seqno, mtu, maxpayload, chunks, repr_ellipsized(data))
     383        chunk = 0
     384        offset = 0
     385        if fail_cb:
     386            self.fail_cb[seqno] = fail_cb
     387        chunk_resend_cache = self.resend_cache.setdefault(seqno, {})
     388        while offset<l:
     389            assert chunk<chunks
     390            pl = min(maxpayload, l-offset)
     391            data_chunk = data[offset:offset+pl]
     392            udp_data = _header_struct.pack(self.uuid, seqno, synchronous, chunk, chunks) + data_chunk
     393            assert len(udp_data)<=mtu, "invalid payload size: %i greater than mtu %i" % (len(udp_data), mtu)
     394            con.write(udp_data)
     395            self.output_raw_packetcount += 1
     396            offset += pl
     397            if chunk_resend_cache is not None:
     398                chunk_resend_cache[chunk] = udp_data
     399            chunk += 1
     400        assert chunk==chunks, "wrote %i chunks but expected %i" % (chunk, chunks)
     401        self.output_packetcount += 1
     402        if not self.control_timer:
     403            self.control_timer = self.timeout_add(1000, self.send_control)
     404        return offset
     405
     406
     407    def get_info(self, alias_info=True):
     408        i = Protocol.get_info(self, alias_info)
     409        i["mtu"] = self.mtu
     410        return i
     411
     412
     413class UDPServerProtocol(UDPProtocol):
     414
     415    def _read_thread_loop(self):
     416        #server protocol is not used to read,
     417        #we rely on the listener to dispatch packets instead
     418        pass
     419
     420class UDPClientProtocol(UDPProtocol):
     421
     422    def con_write(self, data, fail_cb):
     423        """ After successfully writing some data, update the mtu value """
     424        r = UDPProtocol.con_write(self, data, fail_cb)
     425        if r>0 and LINUX:
     426            IP_MTU = 14
     427            con = self._conn
     428            if con:
     429                try:
     430                    self.mtu = min(32767, con._socket.getsockopt(socket.IPPROTO_IP, IP_MTU))
     431                    #log("mtu=%s", self.mtu)
     432                except IOError as e:
     433                    pass
     434        return r
     435
     436    def process_read(self, buf):
     437        """ Splits and parses the UDP frame header from the packet """
     438        #log.info("UDPClientProtocol.read_queue_put(%s)", repr_ellipsized(buf))
     439        uuid, seqno, synchronous, chunk, chunks = _header_struct.unpack_from(buf[:_header_size])
     440        data = buf[_header_size:]
     441        bfrom = None        #not available here..
     442        self.process_udp_data(uuid, seqno, synchronous, chunk, chunks, data, bfrom)
     443
     444
     445class UDPSocketConnection(SocketConnection):
     446    """
     447        This class extends SocketConnection to use socket.sendto
     448        to send data to the correct destination.
     449        (servers use a single socket to talk to multiple clients,
     450        they do not call connect() and so we have to specify the remote target every time)
     451    """
     452
     453    def __init__(self, *args):
     454        SocketConnection.__init__(self, *args)
     455
     456    def write(self, buf):
     457        #log("UDPSocketConnection: sending %i bytes to %s", len(buf), self.remote)
     458        try:
     459            return self._socket.sendto(buf, self.remote)
     460        except IOError as e:
     461            if e.errno==EMSGSIZE:
     462                raise MTUExceeded("invalid UDP payload size, cannot send %i bytes: %s" % (len(buf), e))
     463            raise
     464
     465    def close(self):
     466        """
     467            don't close the socket, we're don't own it
     468        """
     469        pass
     470
     471class MTUExceeded(IOError):
     472    pass
  • xpra/scripts/config.py

     
    437437                    "auth"              : str,
    438438                    "vsock-auth"        : str,
    439439                    "tcp-auth"          : str,
     440                    "udp-auth"          : str,
    440441                    "ws-auth"           : str,
    441442                    "wss-auth"          : str,
    442443                    "ssl-auth"          : str,
     
    594595                    "bind"              : list,
    595596                    "bind-vsock"        : list,
    596597                    "bind-tcp"          : list,
     598                    "bind-udp"          : list,
    597599                    "bind-ws"           : list,
    598600                    "bind-wss"          : list,
    599601                    "bind-ssl"          : list,
     
    615617    "start-after-connect", "start-child-after-connect",
    616618    "start-on-connect", "start-child-on-connect",
    617619    ]
    618 BIND_OPTIONS = ["bind", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
     620BIND_OPTIONS = ["bind", "bind-tcp", "bind-udp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
    619621
    620622#keep track of the options added since v1,
    621623#so we can generate command lines that work with older supported versions:
     
    679681    "av-sync", "global-menus",
    680682    "printing", "file-transfer", "open-command", "open-files", "start-new-commands",
    681683    "mmap", "mmap-group", "mdns",
    682     "auth", "vsock-auth", "tcp-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
    683     "bind", "bind-vsock", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
     684    "auth", "vsock-auth", "tcp-auth", "udp-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
     685    "bind", "bind-vsock", "bind-tcp", "bind-udp", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
    684686    "start", "start-child",
    685687    "start-after-connect", "start-child-after-connect",
    686688    "start-on-connect", "start-child-on-connect",
     
    799801                    "auth"              : "",
    800802                    "vsock-auth"        : "",
    801803                    "tcp-auth"          : "",
     804                    "udp-auth"          : "",
    802805                    "ws-auth"           : "",
    803806                    "wss-auth"          : "",
    804807                    "ssl-auth"          : "",
     
    946949                    "bind"              : bind_dirs,
    947950                    "bind-vsock"        : [],
    948951                    "bind-tcp"          : [],
     952                    "bind-udp"          : [],
    949953                    "bind-ws"           : [],
    950954                    "bind-wss"          : [],
    951955                    "bind-ssl"          : [],
  • xpra/scripts/main.py

     
    534534                          metavar="[HOST]:[PORT]",
    535535                          help="Listen for connections over TCP (use --tcp-auth to secure it)."
    536536                            + " You may specify this option multiple times with different host and port combinations")
     537        group.add_option("--bind-udp", action="append",
     538                          dest="bind_udp", default=list(defaults.bind_udp or []),
     539                          metavar="[HOST]:[PORT]",
     540                          help="Listen for connections over UDP (use --udp-auth to secure it)."
     541                            + " You may specify this option multiple times with different host and port combinations")
    537542        group.add_option("--bind-ws", action="append",
    538543                          dest="bind_ws", default=list(defaults.bind_ws or []),
    539544                          metavar="[HOST]:[PORT]",
     
    558563        ignore({
    559564            "bind"      : defaults.bind,
    560565            "bind-tcp"  : defaults.bind_tcp,
     566            "bind-udp"  : defaults.bind_udp,
    561567            "bind-ws"   : defaults.bind_ws,
    562568            "bind-wss"  : defaults.bind_wss,
    563569            "bind-ssl"  : defaults.bind_ssl,
     
    974980    group.add_option("--tcp-auth", action="store",
    975981                      dest="tcp_auth", default=defaults.tcp_auth,
    976982                      help="The authentication module to use for TCP sockets (default: '%default')")
     983    group.add_option("--udp-auth", action="store",
     984                      dest="udp_auth", default=defaults.udp_auth,
     985                      help="The authentication module to use for UDP sockets (default: '%default')")
    977986    group.add_option("--ws-auth", action="store",
    978987                      dest="ws_auth", default=defaults.ws_auth,
    979988                      help="The authentication module to use for Websockets (default: '%default')")
     
    11631172    #    elif tcp_encryption:
    11641173    #        raise InitException("tcp-encryption %s should not use the same file as the password authentication file" % tcp_encryption)
    11651174
    1166 def dump_frames(*arsg):
     1175def dump_frames(*_args):
    11671176    frames = sys._current_frames()
    11681177    print("")
    11691178    print("found %s frames:" % len(frames))
     
    12411250
    12421251    #register posix signals for debugging:
    12431252    if POSIX:
    1244         def sigusr1(*args):
     1253        def sigusr1(*_args):
    12451254            dump_frames()
    12461255        signal.signal(signal.SIGUSR1, sigusr1)
    12471256
     
    16861695        if opts.socket_dir:
    16871696            desc["socket_dir"] = opts.socket_dir
    16881697        return desc
    1689     elif display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
    1690             display_name.startswith("ssl:") or display_name.startswith("ssl/"):
    1691         ctype = display_name[:3]        #ie: "ssl" or "tcp"
    1692         separator = display_name[3]     # ":" or "/"
     1698    elif (
     1699        display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
     1700        display_name.startswith("ssl:") or display_name.startswith("ssl/") or \
     1701        display_name.startswith("udp:") or display_name.startswith("udp/")
     1702        ):
     1703        ctype = display_name[:3]                #ie: "ssl" or "tcp"
     1704        separator = display_name[len(ctype)]    # ":" or "/"
    16931705        desc.update({
    16941706                     "type"     : ctype,
    16951707                     })
     
    20732085        from xpra.net.bytestreams import SocketConnection
    20742086        return SocketConnection(sock, "local", "host", (CID_TYPES.get(cid, cid), iport), dtype)
    20752087
    2076     elif dtype in ("tcp", "ssl", "ws", "wss"):
     2088    elif dtype in ("tcp", "ssl", "ws", "wss", "udp"):
    20772089        if display_desc.get("ipv6"):
    20782090            assert socket.has_ipv6, "no IPv6 support"
    20792091            family = socket.AF_INET6
     
    20892101                socket.AF_INET  : "IPv4",
    20902102                }.get(family, family), (host, port), e))
    20912103        sockaddr = addrinfo[0][-1]
    2092         sock = socket.socket(family, socket.SOCK_STREAM)
    2093         sock.settimeout(SOCKET_TIMEOUT)
    2094         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
     2104        if dtype=="udp":
     2105            opts.mmap = False
     2106            sock = socket.socket(family, socket.SOCK_DGRAM)
     2107        else:
     2108            sock = socket.socket(family, socket.SOCK_STREAM)
     2109            sock.settimeout(SOCKET_TIMEOUT)
     2110            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
    20952111        strict_host_check = display_desc.get("strict-host-check")
    20962112        if strict_host_check is False:
    20972113            opts.ssl_server_verify_mode = "none"
     
    24012417                from xpra.codecs.loader import encodings_help
    24022418                encodings = ["auto"] + app.get_encodings()
    24032419                raise InitInfo(info+"%s xpra client supports the following encodings:\n * %s" % (app.client_toolkit(), "\n * ".join(encodings_help(encodings))))
    2404         def handshake_complete(*args):
     2420        def handshake_complete(*_args):
    24052421            from xpra.log import Logger
    24062422            log = Logger()
    24072423            log.info("Attached to %s (press Control-C to detach)\n", conn.target)
  • xpra/scripts/server.py

     
    4444    _cleanups.append(f)
    4545
    4646
    47 def deadly_signal(signum, frame):
     47def deadly_signal(signum, _frame):
    4848    info("got deadly signal %s, exiting\n" % SIGNAMES.get(signum, signum))
    4949    run_cleanups()
    5050    # This works fine in tests, but for some reason if I use it here, then I
     
    357357    if opts.encoding=="help" or "help" in opts.encodings:
    358358        return show_encoding_help(opts)
    359359
    360     from xpra.server.socket_util import parse_bind_tcp, parse_bind_vsock
    361     bind_tcp = parse_bind_tcp(opts.bind_tcp)
    362     bind_ssl = parse_bind_tcp(opts.bind_ssl)
    363     bind_ws = parse_bind_tcp(opts.bind_ws)
    364     bind_wss = parse_bind_tcp(opts.bind_wss)
    365     bind_rfb = parse_bind_tcp(opts.bind_rfb)
     360    from xpra.server.socket_util import parse_bind_ip, parse_bind_vsock
     361    bind_tcp = parse_bind_ip(opts.bind_tcp)
     362    bind_udp = parse_bind_ip(opts.bind_udp)
     363    bind_ssl = parse_bind_ip(opts.bind_ssl)
     364    bind_ws  = parse_bind_ip(opts.bind_ws)
     365    bind_wss = parse_bind_ip(opts.bind_wss)
     366    bind_rfb = parse_bind_ip(opts.bind_rfb)
    366367    bind_vsock = parse_bind_vsock(opts.bind_vsock)
    367368
    368369    assert mode in ("start", "start-desktop", "upgrade", "shadow", "proxy")
     
    589590            cpaths = csv("'%s'" % x for x in (opts.ssl_cert, opts.ssl_key) if x)
    590591            raise InitException("cannot create SSL socket, check your certificate paths (%s): %s" % (cpaths, e))
    591592
     593    from xpra.server.socket_util import setup_tcp_socket, setup_udp_socket, setup_vsock_socket, setup_local_sockets
    592594    def add_mdns(socktype, host, port):
    593595        recs = mdns_recs.setdefault(socktype.lower(), [])
    594596        rec = (host, port)
     
    598600        socket = setup_tcp_socket(host, iport, socktype)
    599601        sockets.append(socket)
    600602        add_mdns(socktype, host, iport)
    601 
     603    def add_udp_socket(socktype, host, iport):
     604        socket = setup_udp_socket(host, iport, socktype)
     605        sockets.append(socket)
     606        add_mdns(socktype, host, iport)
    602607    # Initialize the TCP sockets before the display,
    603608    # That way, errors won't make us kill the Xvfb
    604609    # (which may not be ours to kill at that point)
    605     from xpra.server.socket_util import setup_tcp_socket, setup_vsock_socket, setup_local_sockets
    606610    netlog("setting up SSL sockets: %s", bind_ssl)
    607611    for host, iport in bind_ssl:
    608612        add_tcp_socket("SSL", host, iport)
     
    615619        add_tcp_socket("tcp", host, iport)
    616620        if tcp_ssl:
    617621            add_mdns("ssl", host, iport)
     622    netlog("setting up UDP sockets: %s", bind_udp)
     623    for host, iport in bind_udp:
     624        add_udp_socket("udp", host, iport)
    618625    netlog("setting up http / ws (websockets): %s", bind_ws)
    619626    for host, iport in bind_ws:
    620627        add_tcp_socket("ws", host, iport)
  • xpra/server/server_core.py

     
    151151        #networking bits:
    152152        self._socket_info = []
    153153        self._potential_protocols = []
     154        self._udp_listeners = []
     155        self._udp_protocols = {}
    154156        self._tcp_proxy_clients = []
    155157        self._tcp_proxy = ""
    156         self._ssl_wrap_socket = None
     158        self._ssl_wrap_server_socket = None
     159        self._ssl_wrap_client_socket = None
    157160        self._accept_timeout = SOCKET_TIMEOUT + 1
    158161        self.ssl_mode = None
    159162        self._html = False
     
    285288            self.auth_classes["named-pipes"] = auth
    286289        else:
    287290            self.auth_classes["unix-domain"] = auth
    288         for x in ("tcp", "ws", "wss", "ssl", "rfb", "vsock"):
     291        for x in ("tcp", "ws", "wss", "ssl", "rfb", "vsock", "udp"):
    289292            opts_value = getattr(opts, "%s_auth" % x)
    290293            self.auth_classes[x] = self.get_auth_module(x, opts_value, opts)
    291294        authlog("init_auth(..) auth=%s", self.auth_classes)
     
    377380        self._default_packet_handlers = {
    378381            "hello":                                self._process_hello,
    379382            "disconnect":                           self._process_disconnect,
     383            "udp-control":                          self._process_udp_control,
    380384            Protocol.CONNECTION_LOST:               self._process_connection_lost,
    381385            Protocol.GIBBERISH:                     self._process_gibberish,
    382386            Protocol.INVALID:                       self._process_invalid,
     
    532536        self.do_cleanup()
    533537        self.cleanup_protocols(protocols, reason, True)
    534538        self._potential_protocols = []
     539        self.cleanup_udp_listeners()
    535540
    536541    def do_cleanup(self):
    537542        #allow just a bit of time for the protocol packet flush
     
    538543        sleep(0.1)
    539544
    540545
     546    def cleanup_udp_listeners(self):
     547        for udpl in self._udp_listeners:
     548            udpl.close()
     549        self._udp_listeners = []
     550
    541551    def cleanup_all_protocols(self, reason):
    542552        protocols = self.get_all_protocols()
    543553        self.cleanup_protocols(protocols, reason)
     
    563573            #named pipe listener uses a thread:
    564574            sock.new_connection_cb = self._new_connection
    565575            sock.start()
     576        elif socktype=="udp":
     577            #socket_info = self.socket_info.get(sock)
     578            from xpra.net.udp_protocol import UDPListener
     579            udpl = UDPListener(sock, self.process_udp_packet)
     580            self._udp_listeners.append(udpl)
    566581        else:
    567582            from xpra.gtk_common.gobject_compat import import_glib
    568583            glib = import_glib()
     
    10341049            if not proto._closed:
    10351050                self._log_disconnect(proto, "Connection lost")
    10361051            self._potential_protocols.remove(proto)
     1052        #remove from UDP protocol map:
     1053        uuid = getattr(proto, "uuid", None)
     1054        if uuid and uuid in self._udp_protocols:
     1055            del self._udp_protocols[uuid]
    10371056
    10381057    def _process_gibberish(self, proto, packet):
    10391058        (_, message, data) = packet
     
    14621481    def handle_rfb_connection(self, conn):
    14631482        log.error("Error: RFB protocol is not supported by this server")
    14641483        conn.close()
     1484
     1485
     1486    def _process_udp_control(self, proto, packet):
     1487        proto.process_control(*packet[1:])
     1488
     1489    def process_udp_packet(self, udp_listener, uuid, seqno, synchronous, chunk, chunks, data, bfrom):
     1490        #log.info("process_udp_packet%s", (udp_listener, uuid, seqno, synchronous, chunk, chunks, len(data), bfrom))
     1491        protocol = self._udp_protocols.get(uuid)
     1492        if not protocol:
     1493            from xpra.net.udp_protocol import UDPServerProtocol, UDPSocketConnection
     1494            def udp_protocol_class(conn):
     1495                protocol = UDPServerProtocol(self, conn, self.process_packet)
     1496                protocol.uuid = uuid
     1497                protocol.large_packets.append("info-response")
     1498                protocol.receive_aliases.update(self._aliases)
     1499                return protocol
     1500            socktype = "udp"
     1501            host, port = bfrom
     1502            sock = udp_listener._socket
     1503            sockname = sock.getsockname()
     1504            conn = UDPSocketConnection(sock, sockname, (host, port), (host, port), socktype)
     1505            conn.timeout = SOCKET_TIMEOUT
     1506            protocol = self.do_make_protocol(socktype, conn, udp_protocol_class)
     1507            self._udp_protocols[uuid] = protocol
     1508        else:
     1509            #update remote address in case the client is roaming:
     1510            conn = protocol._conn
     1511            if conn:
     1512                conn.remote = bfrom
     1513        protocol.process_udp_data(uuid, seqno, synchronous, chunk, chunks, data, bfrom)
  • xpra/server/socket_util.py

     
    108108        log("create_tcp_socket%s", (host, iport), exc_info=True)
    109109        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
    110110    def cleanup_tcp_socket():
    111         log.info("closing %s socket %s:%s", socktype, host, iport)
     111        log.info("closing %s socket %s:%s", socktype.lower(), host, iport)
    112112        try:
    113113            tcp_socket.close()
    114114        except:
     
    117117    log("%s: %s:%s : %s", socktype, host, iport, socket)
    118118    return socktype, tcp_socket, (host, iport)
    119119
     120def create_udp_socket(host, iport):
     121    if host.find(":")<0:
     122        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     123        sockaddr = (host, iport)
     124    else:
     125        assert socket.has_ipv6, "specified an IPv6 address but this is not supported"
     126        res = socket.getaddrinfo(host, iport, socket.AF_INET6, socket.SOCK_DGRAM)
     127        listener = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
     128        sockaddr = res[0][-1]
     129    listener.bind(sockaddr)
     130    return listener
    120131
    121 def parse_bind_tcp(bind_tcp):
    122     tcp_sockets = set()
    123     if bind_tcp:
    124         for spec in bind_tcp:
     132def setup_udp_socket(host, iport, socktype="udp"):
     133    from xpra.log import Logger
     134    log = Logger("network")
     135    try:
     136        udp_socket = create_udp_socket(host, iport)
     137    except Exception as e:
     138        log("create_udp_socket%s", (host, iport), exc_info=True)
     139        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
     140    def cleanup_udp_socket():
     141        log.info("closing %s socket %s:%s", socktype, host, iport)
     142        try:
     143            udp_socket.close()
     144        except:
     145            pass
     146    add_cleanup(cleanup_udp_socket)
     147    log("%s: %s:%s : %s", socktype, host, iport, socket)
     148    return socktype, udp_socket, (host, iport)
     149
     150
     151def parse_bind_ip(bind_ip):
     152    ip_sockets = set()
     153    if bind_ip:
     154        for spec in bind_ip:
    125155            if ":" not in spec:
    126                 raise InitException("TCP port must be specified as [HOST]:PORT")
     156                raise InitException("port must be specified as [HOST]:PORT")
    127157            host, port = spec.rsplit(":", 1)
    128158            if host == "":
    129159                host = "127.0.0.1"
     
    135165                    assert iport>0 and iport<2**16
    136166                except:
    137167                    raise InitException("invalid port number: %s" % port)
    138             tcp_sockets.add((host, iport))
    139     return tcp_sockets
     168            ip_sockets.add((host, iport))
     169    return ip_sockets
    140170
    141171def setup_vsock_socket(cid, iport):
    142172    from xpra.log import Logger
  • xpra/server/source.py

     
    11201120        self.update_av_sync_delay_total()
    11211121
    11221122    def new_sound_buffer(self, sound_source, data, metadata, packet_metadata=[]):
    1123         soundlog("new_sound_buffer(%s, %s, %s, %s) suspended=%s",
    1124                  sound_source, len(data or []), metadata, [len(x) for x in packet_metadata], self.suspended)
     1123        soundlog("new_sound_buffer(%s, %s, %s, %s) info=%s, suspended=%s",
     1124                 sound_source, len(data or []), metadata, [len(x) for x in packet_metadata], sound_source.info, self.suspended)
    11251125        if self.sound_source!=sound_source or self.is_closed():
    11261126            soundlog("sound buffer dropped: from old source or closed")
    11271127            return
     
    11371137            else:
    11381138                #the packet metadata is compressed already:
    11391139                packet_metadata = Compressed("packet metadata", packet_metadata, can_inline=True)
    1140         self.send_sound_data(sound_source, data, metadata, packet_metadata)
     1140        #don't drop the first 10 buffers
     1141        can_drop_packet = (sound_source.info or {}).get("buffer_count", 0)>10
     1142        self.send_sound_data(sound_source, data, metadata, packet_metadata, can_drop_packet)
    11411143
    1142     def send_sound_data(self, sound_source, data, metadata={}, packet_metadata=()):
     1144    def send_sound_data(self, sound_source, data, metadata={}, packet_metadata=(), can_drop_packet=False):
    11431145        packet_data = [sound_source.codec, Compressed(sound_source.codec, data), metadata]
    11441146        if packet_metadata:
    11451147            assert self.sound_bundle_metadata
    11461148            packet_data.append(packet_metadata)
    1147         if sound_source.sequence>=0:
    1148             metadata["sequence"] = sound_source.sequence
    1149         self.send("sound-data", *packet_data)
     1149        sequence = sound_source.sequence
     1150        if sequence>=0:
     1151            metadata["sequence"] = sequence
     1152        fail_cb = None
     1153        if can_drop_packet:
     1154            def sound_data_fail_cb():
     1155                #ideally we would tell gstreamer to send an audio "key frame"
     1156                #or synchronization point to ensure the stream recovers
     1157                soundlog("a sound data buffer was not received and will not be resent")
     1158            fail_cb = sound_data_fail_cb
     1159        self._send(fail_cb, False, "sound-data", *packet_data)
    11501160
    11511161    def stop_receiving_sound(self):
    11521162        ss = self.sound_sink
     
    13951405            return
    13961406        if self.mouse_last_position!=(x, y, rx, ry):
    13971407            self.mouse_last_position = (x, y, rx, ry)
    1398             self.send("pointer-position", wid, x, y, rx, ry)
     1408            self.send_async("pointer-position", wid, x, y, rx, ry)
    13991409
    14001410#
    14011411# Functions for interacting with the network layer:
     
    14021412#
    14031413    def next_packet(self):
    14041414        """ Called by protocol.py when it is ready to send the next packet """
    1405         packet, start_send_cb, end_send_cb, have_more = None, None, None, False
     1415        packet, start_send_cb, end_send_cb, fail_cb, synchronous, have_more = None, None, None, None, True, False
    14061416        if not self.is_closed():
    14071417            if len(self.ordinary_packets)>0:
    1408                 packet = self.ordinary_packets.pop(0)
     1418                packet, synchronous, fail_cb = self.ordinary_packets.pop(0)
    14091419            elif len(self.packet_queue)>0:
    1410                 packet, _, _, start_send_cb, end_send_cb = self.packet_queue.popleft()
     1420                packet, _, _, start_send_cb, end_send_cb, fail_cb = self.packet_queue.popleft()
    14111421            have_more = packet is not None and (len(self.ordinary_packets)>0 or len(self.packet_queue)>0)
    1412         return packet, start_send_cb, end_send_cb, have_more
     1422        return packet, start_send_cb, end_send_cb, fail_cb, synchronous, have_more
    14131423
    14141424    def send(self, *parts):
    14151425        """ This method queues non-damage packets (higher priority) """
    1416         self.ordinary_packets.append(parts)
     1426        self._send(None, True, *parts)
     1427
     1428    def send_async(self, *parts):
     1429        self._send(None, False, *parts)
     1430
     1431    def _send(self, fail_cb=None, synchronous=True, *parts):
     1432        """ This method queues non-damage packets (higher priority) """
     1433        #log.info("_send%s", (fail_cb, synchronous, parts))
    14171434        p = self.protocol
    14181435        if p:
     1436            self.ordinary_packets.append((parts, synchronous, fail_cb))
    14191437            p.source_has_more()
    14201438
     1439
    14211440#
    14221441# Functions used by the server to request something
    14231442# (window events, stats, user requests, etc)
     
    15101529        if self.last_ping_echoed_time>0:
    15111530            lpe = int(monotonic_time()*1000-self.last_ping_echoed_time)
    15121531        info = {
     1532                "protocol"          : "xpra",
    15131533                "version"           : self.client_version or "unknown",
    15141534                "revision"          : self.client_revision or "unknown",
    15151535                "platform_name"     : platform_name(self.client_platform, self.client_release),
     
    16911711            v = notypedict(info)
    16921712        else:
    16931713            v = flatten_dict(info)
    1694         self.send("info-response", v)
     1714        self.send_async("info-response", v)
    16951715
    16961716
    16971717    def send_server_event(self, *args):
     
    17021722    def send_clipboard_enabled(self, reason=""):
    17031723        if not self.hello_sent:
    17041724            return
    1705         self.send("set-clipboard-enabled", self.clipboard_enabled, reason)
     1725        self.send_async("set-clipboard-enabled", self.clipboard_enabled, reason)
    17061726
    17071727    def send_clipboard_progress(self, count):
    17081728        if not self.clipboard_notifications or not self.hello_sent:
     
    18231843    def bell(self, wid, device, percent, pitch, duration, bell_class, bell_id, bell_name):
    18241844        if not self.send_bell or self.suspended or not self.hello_sent:
    18251845            return
    1826         self.send("bell", wid, device, percent, pitch, duration, bell_class, bell_id, bell_name)
     1846        self.send_async("bell", wid, device, percent, pitch, duration, bell_class, bell_id, bell_name)
    18271847
    18281848    def notify(self, dbus_id, nid, app_name, replaces_nid, app_icon, summary, body, expire_timeout):
    18291849        if not self.send_notifications:
     
    18331853            notifylog("client %s is suspended, notification not sent", self)
    18341854            return False
    18351855        if self.hello_sent:
    1836             self.send("notify_show", dbus_id, int(nid), str(app_name), int(replaces_nid), str(app_icon), str(summary), str(body), int(expire_timeout))
     1856            self.send_async("notify_show", dbus_id, int(nid), str(app_name), int(replaces_nid), str(app_icon), str(summary), str(body), int(expire_timeout))
    18371857        return True
    18381858
    18391859    def notify_close(self, nid):
     
    18471867
    18481868    def send_webcam_ack(self, device, frame, *args):
    18491869        if self.hello_sent:
    1850             self.send("webcam-ack", device, frame, *args)
     1870            self.send_async("webcam-ack", device, frame, *args)
    18511871
    18521872    def send_webcam_stop(self, device, message):
    18531873        if self.hello_sent:
    1854             self.send("webcam-stop", device, message)
     1874            self.send_async("webcam-stop", device, message)
    18551875
    18561876
    18571877    def set_printers(self, printers, password_file, auth, encryption, encryption_keyfile):
     
    19721992        #NOTE: all ping time/echo time/load avg values are in milliseconds
    19731993        now_ms = int(1000*monotonic_time())
    19741994        log("sending ping to %s with time=%s", self.protocol, now_ms)
    1975         self.send("ping", now_ms)
     1995        self.send_async("ping", now_ms)
    19761996        timeout = 60
    19771997        def check_echo_timeout():
    19781998            if self.last_ping_echoed_time<now_ms and not self.is_closed():
     
    19922012            cl = int(1000.0*cl)
    19932013        else:
    19942014            cl = -1
    1995         self.send("ping_echo", time_to_echo, l1, l2, l3, cl)
     2015        self.send_async("ping_echo", time_to_echo, l1, l2, l3, cl)
    19962016        #if the client is pinging us, ping it too:
    19972017        self.timeout_add(500, self.ping)
    19982018
     
    20192039
    20202040    def show_desktop(self, show):
    20212041        if self.show_desktop_allowed and self.hello_sent:
    2022             self.send("show-desktop", show)
     2042            self.send_async("show-desktop", show)
    20232043
    20242044    def initiate_moveresize(self, wid, window, x_root, y_root, direction, button, source_indication):
    20252045        if not self.can_send_window(window) or not self.window_initiate_moveresize:
     
    20872107        metadata = {}
    20882108        for propname in list(window.get_property_names()):
    20892109            metadata.update(self._make_metadata(window, propname))
    2090         self.send("new-tray", wid, w, h, metadata)
     2110        self.send_async("new-tray", wid, w, h, metadata)
    20912111
    20922112    def new_window(self, ptype, wid, window, x, y, w, h, client_properties):
    20932113        if not self.can_send_window(window):
     
    21052125                metalog("make_metadata(%s, %s, %s)=%s", wid, window, prop, v)
    21062126            metadata.update(v)
    21072127        log("new_window(%s, %s, %s, %s, %s, %s, %s, %s) metadata(%s)=%s", ptype, window, wid, x, y, w, h, client_properties, send_props, metadata)
    2108         self.send(ptype, wid, x, y, w, h, metadata, client_properties or {})
     2128        self.send_async(ptype, wid, x, y, w, h, metadata, client_properties or {})
    21092129        if send_raw_icon:
    21102130            self.send_window_icon(wid, window)
    21112131
     
    21552175    def raise_window(self, wid, window):
    21562176        if not self.can_send_window(window):
    21572177            return
    2158         self.send("raise-window", wid)
     2178        self.send_async("raise-window", wid)
    21592179
    21602180    def remove_window(self, wid, window):
    21612181        """ The given window is gone, ensure we free all the related resources """
     
    22942314        self.statistics.compression_work_qsizes.append((monotonic_time(), self.encode_work_queue.qsize()))
    22952315        self.encode_work_queue.put(fn_and_args)
    22962316
    2297     def queue_packet(self, packet, wid=0, pixels=0, start_send_cb=None, end_send_cb=None):
     2317    def queue_packet(self, packet, wid=0, pixels=0, start_send_cb=None, end_send_cb=None, fail_cb=None):
    22982318        """
    22992319            Add a new 'draw' packet to the 'packet_queue'.
    23002320            Note: this code runs in the non-ui thread
     
    23032323        self.statistics.packet_qsizes.append((now, len(self.packet_queue)))
    23042324        if wid>0:
    23052325            self.statistics.damage_packet_qpixels.append((now, wid, sum(x[2] for x in list(self.packet_queue) if x[1]==wid)))
    2306         self.packet_queue.append((packet, wid, pixels, start_send_cb, end_send_cb))
     2326        self.packet_queue.append((packet, wid, pixels, start_send_cb, end_send_cb, fail_cb))
    23072327        p = self.protocol
    23082328        if p:
    23092329            p.source_has_more()
  • xpra/server/window/window_source.py

     
    17261726            now = monotonic_time()
    17271727            damage_in_latency = now-process_damage_time
    17281728            self.statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
    1729         self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent)
     1729        fail_cb = self.get_fail_cb(packet)
     1730        #log.info("queuing %s packet with fail_cb=%s", coding, fail_cb)
     1731        self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent, fail_cb)
    17301732
     1733    def get_fail_cb(self, packet):
     1734        def resend():
     1735            log.warn("paint packet failure, resending")
     1736            x,y,width,height = packet[2:6]
     1737            damage_packet_sequence = packet[8]
     1738            self.damage_packet_acked(damage_packet_sequence, width, height, 0, "")
     1739            self.idle_add(self.damage, x, y, width, height)
     1740        return resend
     1741
    17311742    def damage_packet_acked(self, damage_packet_sequence, width, height, decode_time, message):
    17321743        """
    17331744            The client is acknowledging a damage packet,
  • xpra/server/window/window_video_source.py

     
    14261426        return {}
    14271427
    14281428
     1429    def get_fail_cb(self, packet):
     1430        coding = packet[6]
     1431        if coding in self.common_video_encodings:
     1432            return None
     1433        return WindowSource.get_fail_cb(self, packet)
     1434
     1435
    14291436    def make_draw_packet(self, x, y, w, h, coding, data, outstride, client_options={}, options={}):
    14301437        #overriden so we can invalidate the scroll data:
    14311438        #log.error("make_draw_packet%s", (x, y, w, h, coding, "..", outstride, client_options)