xpra icon
Bug tracker and wiki

Ticket #639: udp-v12.patch

File udp-v12.patch, 66.7 KB (added by Antoine Martin, 3 years ago)

more aggressive control packet scheduling, calculate which chunks should have arrived, option to test missing the first packet - which requires more tricky udp-control packet scheduling

  • xpra/client/client_base.py

     
    232232        raise NotImplementedError()
    233233
    234234    def setup_connection(self, conn):
    235         netlog("setup_connection(%s) timeout=%s, socktype=%s", conn, conn.timeout, conn.socktype)
    236         self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     235        netlog.info("setup_connection(%s) timeout=%s, socktype=%s", conn, conn.timeout, conn.socktype)
     236        if conn.socktype=="udp":
     237            from xpra.net.udp_protocol import UDPClientProtocol
     238            self._protocol = UDPClientProtocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     239            #use a random uuid:
     240            import random
     241            self._protocol.uuid = random.randint(0, 2**64-1)
     242            self.set_packet_handlers(self._packet_handlers, {
     243                "udp-control"   : self._process_udp_control,
     244                })
     245        else:
     246            self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
    237247        self._protocol.large_packets.append("keymap-changed")
    238248        self._protocol.large_packets.append("server-settings")
    239249        self._protocol.large_packets.append("logging")
     
    254264            getChildReaper().add_process(proc, name, command, ignore=True, forget=False)
    255265        netlog("setup_connection(%s) protocol=%s", conn, self._protocol)
    256266
     267    def _process_udp_control(self, packet):
     268        self._protocol.process_control(*packet[1:])
    257269
     270
    258271    def remove_packet_handlers(self, *keys):
    259272        for k in keys:
    260273            for d in (self._packet_handlers, self._ui_packet_handlers):
     
    440453            p.source_has_more()
    441454
    442455    def next_packet(self):
     456        netlog("next_packet() packets in queues: priority=%i, ordinary=%i, mouse=%s", len(self._priority_packets), len(self._ordinary_packets), bool(self._mouse_position))
     457        synchronous = True
    443458        if self._priority_packets:
    444459            packet = self._priority_packets.pop(0)
    445460        elif self._ordinary_packets:
     
    446461            packet = self._ordinary_packets.pop(0)
    447462        elif self._mouse_position is not None:
    448463            packet = self._mouse_position
     464            synchronous = False
    449465            self._mouse_position = None
    450466        else:
    451467            packet = None
     
    452468        has_more = packet is not None and \
    453469                (bool(self._priority_packets) or bool(self._ordinary_packets) \
    454470                 or self._mouse_position is not None)
    455         return packet, None, None, has_more
     471        return packet, None, None, None, synchronous, has_more
    456472
    457473
    458474    def cleanup(self):
     
    815831        if not p or not p.enable_encoder_from_caps(c):
    816832            return False
    817833        p.enable_compressor_from_caps(c)
     834        p.accept()
    818835        return True
    819836
    820837    def parse_encryption_capabilities(self):
     
    867884        self.quit(EXIT_PACKET_FAILURE)
    868885
    869886
    870     def process_packet(self, _proto, packet):
     887    def process_packet(self, proto, packet):
    871888        try:
    872889            handler = None
    873890            packet_type = packet[0]
  • xpra/client/gtk_base/gtk_client_base.py

     
    468468               "configure.pointer"      : True,
    469469               "frame_sizes"            : self.get_window_frame_sizes()
    470470               })
    471         from xpra.client.window_backing_base import DELTA_BUCKETS
    472471        updict(capabilities, "encoding", {
    473472                    "icons.greedy"      : True,         #we don't set a default window icon any more
    474473                    "icons.size"        : (64, 64),     #size we want
    475474                    "icons.max_size"    : (128, 128),   #limit
    476                     "delta_buckets"     : DELTA_BUCKETS,
    477475                    })
     476        from xpra.client import window_backing_base
     477        if self._protocol._conn.socktype=="udp":
     478            #lossy protocol means we can't use delta regions:
     479            log("no delta buckets with udp, since we can drop paint packets")
     480            window_backing_base.DELTA_BUCKETS = 0
     481        updict(capabilities, "encoding", {
     482                    "delta_buckets"     : window_backing_base.DELTA_BUCKETS,
     483                    })
    478484        return capabilities
    479485
    480486
  • xpra/clipboard/clipboard_base.py

     
    236236        synchronous_client = len(packet)>=11 and bool(packet[10])
    237237        proxy.got_token(targets, target_data, claim, synchronous_client)
    238238
    239     def _get_clipboard_from_remote_handler(self, proxy, selection, target):
     239    def _get_clipboard_from_remote_handler(self, _proxy, selection, target):
    240240        if must_discard(target):
    241241            log("invalid target '%s'", target)
    242242            return None
  • xpra/log.py

     
    241241                ("protocol"     , "Packet input and output (formatting, parsing, sending and receiving)"),
    242242                ("websocket"    , "WebSocket layer"),
    243243                ("named-pipe"   , "Named pipe"),
     244                ("udp"          , "UDP"),
    244245                ("crypto"       , "Encryption"),
    245246                ("auth"         , "Authentication"),
    246247                ])),
  • xpra/net/bytestreams.py

     
    277277            i = s
    278278        log("%s.close() for socket=%s", self, i)
    279279        Connection.close(self)
     280        #meaningless for udp:
     281        try:
     282            s.settimeout(0)
     283        except:
     284            pass
    280285        #this is more proper but would break the proxy server:
    281286        #s.shutdown(socket.SHUT_RDWR)
    282287        s.close()
     
    305310        s = self._socket
    306311        if not s:
    307312            return None
    308         return {
    309                 #"class"         : str(type(s)),
    310                 "fileno"        : s.fileno(),
    311                 "timeout"       : int(1000*(s.gettimeout() or 0)),
     313        info = {
    312314                "family"        : FAMILY_STR.get(s.family, s.family),
    313315                "proto"         : s.proto,
    314316                "type"          : PROTOCOL_STR.get(s.type, s.type),
    315317                }
     318        try:
     319            info["timeout"] = int(1000*(s.gettimeout() or 0))
     320        except:
     321            pass
     322        try:
     323            info["fileno"] = s.fileno()
     324        except:
     325            pass
     326        return info
    316327
    317328try:
    318329    #this wrapper class allows us to override the normal ssl.Socket
  • xpra/net/protocol.py

     
    165165        self.enable_compressor(self.compressor)
    166166        self.enable_encoder(self.encoder)
    167167
     168
    168169    def wait_for_io_threads_exit(self, timeout=None):
    169170        io_threads = [x for x in (self._read_thread, self._write_thread) if x is not None]
    170171        for t in io_threads:
     
    205206    def get_threads(self):
    206207        return  [x for x in [self._write_thread, self._read_thread, self._read_parser_thread, self._write_format_thread] if x is not None]
    207208
     209    def accept(self):
     210        pass
    208211
     212
    209213    def get_info(self, alias_info=True):
    210214        info = {
    211215            "large_packets"         : self.large_packets,
     
    311315                return
    312316            self._internal_error("error in network packet write/format", e, exc_info=True)
    313317
    314     def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, has_more=False):
     318    def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, fail_cb=None, synchronous=True, has_more=False):
    315319        if has_more:
    316320            self._source_has_more.set()
    317321        if packet is None:
     
    322326            if self._closed:
    323327                return
    324328            try:
    325                 self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb)
     329                self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb, fail_cb, synchronous)
    326330            except:
    327331                log.error("Error: failed to queue '%s' packet", packet[0])
    328                 log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb), exc_info=True)
     332                log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb, fail_cb), exc_info=True)
    329333                raise
    330334
    331     def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None):
     335    def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None, fail_cb=None, synchronous=True):
    332336        """ the write_lock must be held when calling this function """
    333337        counter = 0
    334338        items = []
    335339        for proto_flags,index,level,data in chunks:
    336             scb, ecb = None, None
    337             #fire the start_send_callback just before the first packet is processed:
    338             if counter==0:
    339                 scb = start_send_cb
    340             #fire the end_send callback when the last packet (index==0) makes it out:
    341             if index==0:
    342                 ecb = end_send_cb
    343340            payload_size = len(data)
    344341            actual_size = payload_size
    345342            if self.cipher_out:
     
    360357                assert not self.cipher_out
    361358                #for plain/text packets (ie: gibberish response)
    362359                log("sending %s bytes without header", payload_size)
    363                 items.append((data, scb, ecb))
     360                items.append(data)
    364361            elif actual_size<PACKET_JOIN_SIZE:
    365                 if type(data) not in JOIN_TYPES:
     362                if not isinstance(data, JOIN_TYPES):
    366363                    data = memoryview_to_bytes(data)
    367364                header_and_data = pack_header(proto_flags, level, index, payload_size) + data
    368365                items.append(header_and_data)
     
    371368                items.append(header)
    372369                items.append(data)
    373370            counter += 1
    374         if self._write_thread is None:
    375             self.start_write_thread()
    376         self._write_queue.put((items, scb, ecb))
    377         self.output_packetcount += 1
     371        self.raw_write(items, start_send_cb, end_send_cb, fail_cb, synchronous)
    378372
    379373    def start_write_thread(self):
    380374        self._write_thread = start_thread(self._write_thread_loop, "write", daemon=True)
    381375
    382     def raw_write(self, contents, start_cb=None, end_cb=None):
     376    def raw_write(self, items, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
    383377        """ Warning: this bypasses the compression and packet encoder! """
    384378        if self._write_thread is None:
    385379            self.start_write_thread()
    386         self._write_queue.put(((contents, start_cb, end_cb), ))
     380        self._write_queue.put((items, start_cb, end_cb, fail_cb, synchronous))
    387381
     382
    388383    def verify_packet(self, packet):
    389384        """ look for None values which may have caused the packet to fail encoding """
    390385        if type(packet)!=list:
     
    604599            return False
    605600        return self.write_items(*items)
    606601
    607     def write_items(self, buf_data, start_cb=None, end_cb=None):
     602    def write_items(self, buf_data, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
    608603        con = self._conn
    609604        if not con:
    610605            return False
     
    614609            except:
    615610                if not self._closed:
    616611                    log.error("Error on write start callback %s", start_cb, exc_info=True)
    617         self.write_buffers(buf_data)
     612        self.write_buffers(buf_data, fail_cb, synchronous)
    618613        if end_cb:
    619614            try:
    620615                end_cb(self._conn.output_bytecount)
     
    623618                    log.error("Error on write end callback %s", end_cb, exc_info=True)
    624619        return True
    625620
    626     def write_buffers(self, buf_data):
     621    def write_buffers(self, buf_data, _fail_cb, _synchronous):
    627622        con = self._conn
    628623        if not con:
    629624            return 0
     
    967962                    if wait_for_packet_sent():
    968963                        #check again every 100ms
    969964                        self.timeout_add(100, wait_for_packet_sent)
    970                 self._add_chunks_to_queue(chunks, start_send_cb=None, end_send_cb=packet_queued)
     965                self._add_chunks_to_queue(chunks, start_send_cb=None, end_send_cb=packet_queued, synchronous=False)
    971966                #just in case wait_for_packet_sent never fires:
    972967                self.timeout_add(5*1000, close_and_release)
    973968
  • xpra/net/subprocess_wrapper.py

     
    428428            item = self.send_queue.get(False)
    429429        except:
    430430            item = None
    431         return (item, None, None, self.send_queue.qsize()>0)
     431        return (item, None, None, None, False, self.send_queue.qsize()>0)
    432432
    433433    def send(self, *packet_data):
    434434        self.send_queue.put(packet_data)
  • xpra/net/udp_protocol.py

     
     1# This file is part of Xpra.
     2# Copyright (C) 2017 Antoine Martin <antoine@devloop.org.uk>
     3# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
     4# later version. See the file COPYING for details.
     5
     6import socket
     7import struct
     8import random
     9try:
     10    import errno
     11    EMSGSIZE = errno.EMSGSIZE
     12except ImportError as e:
     13    EMSGSIZE = None
     14
     15from xpra.log import Logger
     16log = Logger("network", "protocol", "udp")
     17
     18from xpra.os_util import LINUX, monotonic_time, memoryview_to_bytes
     19from xpra.util import envint, repr_ellipsized
     20from xpra.make_thread import start_thread
     21from xpra.net.protocol import Protocol
     22from xpra.net.bytestreams import SocketConnection
     23
     24READ_BUFFER_SIZE = envint("XPRA_READ_BUFFER_SIZE", 65536)
     25DROP_PCT = envint("XPRA_UDP_DROP_PCT", 0)
     26DROP_FIRST = envint("XPRA_UDP_DROP_FIRST", 0)
     27
     28
     29#UUID, seqno, synchronous, chunk, chunks
     30_header_struct = struct.Struct('!QQHHH')
     31_header_size = _header_struct.size
     32
     33
     34class IncompletePacket(object):
     35    def __init__(self, seqno, start_time, chunks=None):
     36        self.seqno = seqno
     37        self.start_time = start_time
     38        self.last_time = start_time
     39        self.chunk_gap = 0
     40        #todo: use numpy array of bytes
     41        self.chunks = chunks
     42    def __repr__(self):
     43        return ("IncompletePacket(%i: %s chunks)" % (self.seqno, len(self.chunks or [])))
     44
     45
     46class UDPListener(object):
     47    """
     48        This class is used by servers to receive UDP packets,
     49        it parses the header and then exposes the data received via process_packet_cb.
     50    """
     51
     52    def __init__(self, sock, process_packet_cb):
     53        assert sock is not None
     54        self._closed = False
     55        self._socket = sock
     56        self._process_packet_cb =  process_packet_cb
     57        self._read_thread = start_thread(self._read_thread_loop, "read", daemon=True)
     58
     59    def __repr__(self):
     60        return "UDPListener(%s)" % self._socket
     61
     62    def _read_thread_loop(self):
     63        log.info("udp read thread loop starting")
     64        try:
     65            while not self._closed:
     66                buf, bfrom = self._socket.recvfrom(READ_BUFFER_SIZE)
     67                if not buf:
     68                    log("read thread: eof")
     69                    break
     70                values = list(_header_struct.unpack_from(buf[:_header_size])) + [buf[_header_size:], bfrom]
     71                try:
     72                    self._process_packet_cb(self, *values)
     73                except Exception as e:
     74                    log("_read_thread_loop() buffer=%s, from=%s", repr_ellipsized(buf), bfrom, exc_info=True)
     75                    if not self._closed:
     76                        log.error("Error: UDP packet processing error:")
     77                        log.error(" %s", e)
     78        except Exception as e:
     79            #can happen during close(), in which case we just ignore:
     80            if not self._closed:
     81                log.error("Error: read on %s failed: %s", self._socket, type(e), exc_info=True)
     82        log("udp read thread loop ended")
     83        self.close()
     84
     85    def close(self):
     86        s = self._socket
     87        log("UDPListener.close() closed=%s, socket=%s", self._closed, s)
     88        if self._closed:
     89            return
     90        self._closed = True
     91        if s:
     92            try:
     93                log("Protocol.close() calling %s", s.close)
     94                s.close()
     95            except:
     96                log.error("error closing %s", s, exc_info=True)
     97            self._socket = None
     98        log("UDPListener.close() done")
     99
     100
     101class UDPProtocol(Protocol):
     102    """
     103        This class extends the Protocol class with UDP encapsulation.
     104        A single packet may end up being fragmented into multiple UDP frames
     105        to fit in the MTU.
     106        We keep track of the function which can be used to handle send failures
     107        (or the packet data if no function is supplied).
     108        "udp-control" packets are used to synchronize both ends.
     109    """
     110
     111    def __init__(self, *args):
     112        Protocol.__init__(self, *args)
     113        self.mtu = 0
     114        self.last_sequence = -1     #the most recent packet sequence we processed in full
     115        self.highest_sequence = -1
     116        self.jitter = 20            #20ms
     117        self.uuid = 0
     118        self.fail_cb = {}
     119        self.resend_cache = {}
     120        self.incomplete_packets = {}
     121        self.can_skip = set()       #processed already, or cancelled
     122        self.cancel = set()         #tell the other end to forget those
     123        self.control_timer = None
     124        self.control_timer_due = 0
     125        self.asynchronous_enabled = False
     126        self._process_read = self.process_read
     127        self.enable_encoder("bencode")
     128
     129    def close(self):
     130        Protocol.close(self)
     131        self.cancel_control_timer()
     132
     133    def accept(self):
     134        log("accept() enabling asynchronous packet handling")
     135        self.asynchronous_enabled = True
     136
     137
     138    def schedule_control(self, delay=1000):
     139        due = monotonic_time()+delay/1000.0
     140        #log("schedule_control(%i) due=%s, current due=%s", delay, due, self.control_timer_due)
     141        if self.control_timer_due and self.control_timer_due<=due:
     142            #due already
     143            return
     144        ct = self.control_timer
     145        if ct:
     146            self.source_remove(ct)
     147        self.control_timer = self.timeout_add(delay, self.send_control)
     148        self.control_timer_due = due
     149
     150    def cancel_control_timer(self):
     151        ct = self.control_timer
     152        if ct:
     153            self.control_timer = None
     154            self.source_remove(ct)
     155
     156    def send_control(self):
     157        self.control_timer = None
     158        self.control_timer_due = 0
     159        if self._closed:
     160            return False
     161        missing = self._get_missing()
     162        packet = ("udp-control", self.mtu, self.last_sequence, self.highest_sequence, missing, tuple(self.cancel))
     163        log("send_control() packet(%s)=%s", self.incomplete_packets, packet)
     164        self.do_send_async(packet, False)
     165        self.cancel = set()
     166        self.schedule_control()
     167        return False
     168
     169    def _get_missing(self):
     170        """ the packets and chunks we are missing """
     171        if not self.incomplete_packets:
     172            return {}
     173        now = monotonic_time()
     174        max_time = now-self.jitter/1000.0
     175        missing = {}
     176        for seqno, ip in self.incomplete_packets.items():
     177            start = ip.start_time
     178            if start>=max_time:
     179                continue        #too recent, may still arrive
     180            missing_chunks = []     #by default, we don't know what is missing
     181            if ip.chunks is not None:
     182                #we have some chunks already,
     183                #so we know how many we are expecting in total,
     184                #and which ones should have arrived by now
     185                chunks = [i for i,x in enumerate(ip.chunks) if x is None]
     186                if not chunks:
     187                    continue
     188                #re-use the chunk_gap calculated previously,
     189                #so re-sent chunks don't skew the value!
     190                chunk_gap = ip.chunk_gap
     191                if chunk_gap==0:
     192                    highest = max(chunks)
     193                    if highest>0:
     194                        chunk_gap = (ip.last_time - start) / highest
     195                        ip.chunk_gap = chunk_gap
     196                for index in chunks:
     197                    #when should it have been received
     198                    eta = start + chunk_gap*index
     199                    if eta<=max_time:
     200                        missing_chunks.append(index)
     201                if not missing_chunks:
     202                    #nothing is overdue yet, so don't request anything:
     203                    continue
     204            missing[seqno] = missing_chunks
     205        return missing
     206
     207    def process_control(self, mtu, last_seq, high_seq, missing, cancel):
     208        log("process_control(%i, %i, %i, %s, %s) current seq=%i", mtu, last_seq, high_seq, missing, cancel, self.output_packetcount)
     209        con = self._conn
     210        if not con:
     211            return
     212        if mtu and self.mtu==0:
     213            self.mtu = mtu
     214        #first, we can free all the packets that have been processed by the other end:
     215        if last_seq>=0:
     216            done = [x for x in self.fail_cb.keys() if x<=last_seq]
     217            for x in done:
     218                try:
     219                    del self.fail_cb[x]
     220                except:
     221                    pass
     222            done = [x for x in self.resend_cache.keys() if x<=last_seq]
     223            for x in done:
     224                try:
     225                    del self.resend_cache[x]
     226                except:
     227                    pass
     228        #next we can forget about sequence numbers that have been cancelled:
     229        if cancel:
     230            for seqno in cancel:
     231                if seqno>self.last_sequence:
     232                    self.can_skip.add(seqno)
     233                try:
     234                    del self.incomplete_packets[seqno]
     235                except:
     236                    pass
     237            #we may now be able to move forward a bit:
     238            if self.incomplete_packets and (self.last_sequence+1) in self.can_skip:
     239                self.process_incomplete()
     240        #re-send the missing ones:
     241        for seqno, missing_chunks in missing.items():
     242            resend_cache = self.resend_cache.get(seqno)
     243            fail_cb_seq = self.fail_cb.get(seqno)
     244            if fail_cb_seq is None and not resend_cache:
     245                log("cannot resend packet sequence %i - assuming we cancelled it already", seqno)
     246                #hope for the best, and tell the other end to stop asking:
     247                self.cancel.add(seqno)
     248                continue
     249            if len(missing_chunks)==0:
     250                #the other end only knows it is missing the seqno,
     251                #not how many chunks are missing, so send them all
     252                missing_chunks = resend_cache.keys()
     253            if fail_cb_seq:
     254                log("fail_cb[%i]=%s, missing_chunks=%s, len(resend_cache)=%i", seqno, repr_ellipsized(str(fail_cb_seq)), missing_chunks, len(resend_cache))
     255                #we have a fail callback for this packet,
     256                #we have to decide if we send the missing chunks or use the callback,
     257                #resend if the other end is missing less than 25% of the chunks:
     258                #TODO: if the latency is low, resending becomes cheaper..
     259                if len(missing_chunks)>=len(resend_cache)//4:
     260                    #too many are missing, forget about it
     261                    try:
     262                        del self.resend_cache[seqno]
     263                    except:
     264                        pass
     265                    try:
     266                        del self.fail_cb[seqno]
     267                    except:
     268                        pass
     269                    self.cancel.add(seqno)
     270                    fail_cb_seq()
     271                    continue
     272            for c in missing_chunks:
     273                data = resend_cache.get(c)
     274                log("resend data[%i][%i]=%s", seqno, c, repr_ellipsized(str(data)))
     275                if data is None:
     276                    log.error("Error: cannot resend chunk %i of packet sequence %i", c, seqno)
     277                    log.error(" data missing from packet resend cache")
     278                    continue
     279                #send it again:
     280                #TODO: if the mtu is now lower, we should re-send the whole packet,
     281                # with the new chunk size..
     282                con.write(data)
     283        #make sure we eventually keep telling the client it has packets to catch up on:
     284        if high_seq<self.output_packetcount:
     285            self.schedule_control()
     286
     287    def send_async(self, packet):
     288        sync = False
     289        if not self.asynchronous_enabled:
     290            sync = True
     291        self.do_send_async(packet, sync)
     292
     293    def do_send_async(self, packet, sync=False):
     294        #log("do_send_async(%s, %s) encoder=%s, compressor=%s", packet, sync, self._encoder, self._compress)
     295        log("do_send_async(%s, %s)", packet, sync)
     296        chunks = self.encode(packet)
     297        if len(chunks)>1:
     298            return Protocol.send_now(packet)
     299        proto_flags,index,level,data = chunks[0]
     300        from xpra.net.header import pack_header
     301        payload_size = len(data)
     302        header_and_data = pack_header(proto_flags, level, index, payload_size) + data
     303        with self._write_lock:
     304            if self._write_thread is None:
     305                self.start_write_thread()
     306            self._write_queue.put((header_and_data, None, None, None, sync))
     307
     308    def process_udp_data(self, uuid, seqno, synchronous, chunk, chunks, data, _bfrom):
     309        #log("process_udp_data%s %i bytes", (uuid, seqno, synchronous, chunk, chunks, repr_ellipsized(data), bfrom), len(data))
     310        assert uuid==self.uuid
     311        if seqno<=self.last_sequence:
     312            log("skipping duplicate packet %5i.%i", seqno, chunk)
     313            return
     314        global DROP_FIRST, DROP_PCT
     315        if DROP_FIRST>0 and seqno==0 and chunk==0:
     316            DROP_FIRST -= 1
     317            log.warn("Warning: dropping first udp packet %5i.%i (%i more times)", seqno, chunk, DROP_FIRST)
     318            return
     319        if DROP_PCT>0:
     320            if random.randint(0, 100) <= DROP_PCT:
     321                log.warn("Warning: dropping udp packet %5i.%i", seqno, chunk)
     322                return
     323        self.highest_sequence = max(self.highest_sequence, seqno)
     324        if self.incomplete_packets or (synchronous and seqno!=self.last_sequence+1) or chunk!=0 or chunks!=1:
     325            assert chunk>=0 and chunks>0 and chunk<chunks, "invalid chunk: %i/%i" % (chunk, chunks)
     326            #slow path: add chunk to incomplete packet
     327            now = monotonic_time()
     328            ip = self.incomplete_packets.get(seqno)
     329            if not ip or not ip.chunks:
     330                chunks_array = [None for _ in range(chunks)]
     331                ip = IncompletePacket(seqno, now, chunks_array)
     332                self.incomplete_packets[seqno] = ip
     333            else:
     334                ip.last_time = now
     335            ip.chunks[chunk] = data
     336            if seqno!=self.last_sequence+1:
     337                #we're waiting for a packet and this is not it,
     338                #make sure any gaps are marked as incomplete:
     339                for i in range(self.last_sequence+1, seqno):
     340                    if i not in self.incomplete_packets and i not in self.can_skip:
     341                        self.incomplete_packets[i] = IncompletePacket(i, now)
     342                #make sure we request the missing packets:
     343                self.schedule_control(self.jitter)
     344                if synchronous:
     345                    #we have to wait for the missing chunks / packets
     346                    log("process_udp_data: we're waiting for %i, not %i", self.last_sequence+1, seqno)
     347                    return
     348            if any(x is None for x in ip.chunks):
     349                #one of the chunks is still missing
     350                log("process_udp_data: sequence %i, got chunk %i but still missing: %s", seqno, chunk, [i for i,x in enumerate(ip.chunks) if x is None])
     351                self.schedule_control(self.jitter)
     352                return
     353            #all the data is here!
     354            del self.incomplete_packets[seqno]
     355            data = b"".join(ip.chunks)
     356        log("process_udp_data: adding packet sequence %5i to read queue (got chunk %i, synchronous=%s)", seqno, chunk, synchronous)
     357        if seqno==self.last_sequence+1:
     358            self.last_sequence = seqno
     359        else:
     360            assert not synchronous
     361            self.can_skip.add(seqno)
     362        self._read_queue_put(data)
     363        #if self.incomplete_packets or (seqno+1) in self.can_skip:
     364        self.process_incomplete()
     365
     366    def process_incomplete(self):
     367        #maybe we can send the next one(s) now?
     368        seqno = self.last_sequence
     369        log("process_incomplete() last_sequence=%i, can skip=%s", seqno, self.can_skip)
     370        while True:
     371            seqno += 1
     372            if seqno in self.can_skip:
     373                try:
     374                    del self.incomplete_packets[seqno]
     375                except KeyError:
     376                    pass
     377                self.can_skip.remove(seqno)
     378                self.last_sequence = seqno
     379                continue
     380            ip = self.incomplete_packets.get(seqno)
     381            if not ip or not ip.chunks:
     382                #it's missing, we just don't know how many chunks
     383                return
     384            if any(x is None for x in ip.chunks):
     385                #one of the chunks is still missing
     386                return
     387            #all the data is here!
     388            del self.incomplete_packets[seqno]
     389            data = b"".join(ip.chunks)
     390            log("process_incomplete: adding packet sequence %i to read queue", seqno)
     391            self.last_sequence = seqno
     392            self._read_queue_put(data)
     393
     394
     395    def write_buffers(self, buf_data, fail_cb, synchronous):
     396        buf = b"".join(memoryview_to_bytes(x) for x in buf_data)
     397        #if not isinstance(buf, JOIN_TYPES):
     398        #    buf = memoryview_to_bytes(buf)
     399        while True:
     400            try:
     401                seqno = self.output_packetcount
     402                return self.send_buf(seqno, buf, fail_cb, synchronous)
     403            except MTUExceeded as e:
     404                log.warn("%s: %s", e, self.mtu)
     405                if self.mtu>576:
     406                    self.mtu //= 2
     407                raise
     408
     409    def send_buf(self, seqno, data, fail_cb, synchronous):
     410        con = self._conn
     411        if not con:
     412            return 0
     413        #TODO: bump to 1280 for IPv6
     414        #mtu = max(576, self.mtu)
     415        mtu = max(1280, self.mtu)
     416        l = len(data)
     417        maxpayload = mtu-_header_size
     418        chunks = l // maxpayload
     419        if l % maxpayload > 0:
     420            chunks += 1
     421        #log("UDP.send_buf(%s, %i bytes, %s, %s) seq=%i, mtu=%s, maxpayload=%i, chunks=%i, data=%s", con, l, fail_cb, synchronous, seqno, mtu, maxpayload, chunks, repr_ellipsized(data))
     422        chunk = 0
     423        offset = 0
     424        if fail_cb:
     425            self.fail_cb[seqno] = fail_cb
     426        chunk_resend_cache = self.resend_cache.setdefault(seqno, {})
     427        while offset<l:
     428            assert chunk<chunks
     429            pl = min(maxpayload, l-offset)
     430            data_chunk = data[offset:offset+pl]
     431            udp_data = _header_struct.pack(self.uuid, seqno, synchronous, chunk, chunks) + data_chunk
     432            assert len(udp_data)<=mtu, "invalid payload size: %i greater than mtu %i" % (len(udp_data), mtu)
     433            con.write(udp_data)
     434            self.output_raw_packetcount += 1
     435            offset += pl
     436            if chunk_resend_cache is not None:
     437                chunk_resend_cache[chunk] = udp_data
     438            chunk += 1
     439        assert chunk==chunks, "wrote %i chunks but expected %i" % (chunk, chunks)
     440        self.output_packetcount += 1
     441        if not self.control_timer:
     442            self.schedule_control()
     443        return offset
     444
     445
     446    def get_info(self, alias_info=True):
     447        i = Protocol.get_info(self, alias_info)
     448        i["mtu"] = self.mtu
     449        return i
     450
     451
     452class UDPServerProtocol(UDPProtocol):
     453
     454    def _read_thread_loop(self):
     455        #server protocol is not used to read,
     456        #we rely on the listener to dispatch packets instead
     457        pass
     458
     459class UDPClientProtocol(UDPProtocol):
     460
     461    def con_write(self, data, fail_cb):
     462        """ After successfully writing some data, update the mtu value """
     463        r = UDPProtocol.con_write(self, data, fail_cb)
     464        if r>0 and LINUX:
     465            IP_MTU = 14
     466            con = self._conn
     467            if con:
     468                try:
     469                    self.mtu = min(32767, con._socket.getsockopt(socket.IPPROTO_IP, IP_MTU))
     470                    #log("mtu=%s", self.mtu)
     471                except IOError as e:
     472                    pass
     473        return r
     474
     475    def process_read(self, buf):
     476        """ Splits and parses the UDP frame header from the packet """
     477        #log.info("UDPClientProtocol.read_queue_put(%s)", repr_ellipsized(buf))
     478        uuid, seqno, synchronous, chunk, chunks = _header_struct.unpack_from(buf[:_header_size])
     479        data = buf[_header_size:]
     480        bfrom = None        #not available here..
     481        self.process_udp_data(uuid, seqno, synchronous, chunk, chunks, data, bfrom)
     482
     483
     484class UDPSocketConnection(SocketConnection):
     485    """
     486        This class extends SocketConnection to use socket.sendto
     487        to send data to the correct destination.
     488        (servers use a single socket to talk to multiple clients,
     489        they do not call connect() and so we have to specify the remote target every time)
     490    """
     491
     492    def __init__(self, *args):
     493        SocketConnection.__init__(self, *args)
     494
     495    def write(self, buf):
     496        #log("UDPSocketConnection: sending %i bytes to %s", len(buf), self.remote)
     497        try:
     498            return self._socket.sendto(buf, self.remote)
     499        except IOError as e:
     500            if e.errno==EMSGSIZE:
     501                raise MTUExceeded("invalid UDP payload size, cannot send %i bytes: %s" % (len(buf), e))
     502            raise
     503
     504    def close(self):
     505        """
     506            don't close the socket, we're don't own it
     507        """
     508        pass
     509
     510class MTUExceeded(IOError):
     511    pass
  • xpra/scripts/config.py

     
    437437                    "auth"              : str,
    438438                    "vsock-auth"        : str,
    439439                    "tcp-auth"          : str,
     440                    "udp-auth"          : str,
    440441                    "ws-auth"           : str,
    441442                    "wss-auth"          : str,
    442443                    "ssl-auth"          : str,
     
    594595                    "bind"              : list,
    595596                    "bind-vsock"        : list,
    596597                    "bind-tcp"          : list,
     598                    "bind-udp"          : list,
    597599                    "bind-ws"           : list,
    598600                    "bind-wss"          : list,
    599601                    "bind-ssl"          : list,
     
    615617    "start-after-connect", "start-child-after-connect",
    616618    "start-on-connect", "start-child-on-connect",
    617619    ]
    618 BIND_OPTIONS = ["bind", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
     620BIND_OPTIONS = ["bind", "bind-tcp", "bind-udp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
    619621
    620622#keep track of the options added since v1,
    621623#so we can generate command lines that work with older supported versions:
     
    679681    "av-sync", "global-menus",
    680682    "printing", "file-transfer", "open-command", "open-files", "start-new-commands",
    681683    "mmap", "mmap-group", "mdns",
    682     "auth", "vsock-auth", "tcp-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
    683     "bind", "bind-vsock", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
     684    "auth", "vsock-auth", "tcp-auth", "udp-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
     685    "bind", "bind-vsock", "bind-tcp", "bind-udp", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
    684686    "start", "start-child",
    685687    "start-after-connect", "start-child-after-connect",
    686688    "start-on-connect", "start-child-on-connect",
     
    799801                    "auth"              : "",
    800802                    "vsock-auth"        : "",
    801803                    "tcp-auth"          : "",
     804                    "udp-auth"          : "",
    802805                    "ws-auth"           : "",
    803806                    "wss-auth"          : "",
    804807                    "ssl-auth"          : "",
     
    946949                    "bind"              : bind_dirs,
    947950                    "bind-vsock"        : [],
    948951                    "bind-tcp"          : [],
     952                    "bind-udp"          : [],
    949953                    "bind-ws"           : [],
    950954                    "bind-wss"          : [],
    951955                    "bind-ssl"          : [],
  • xpra/scripts/main.py

     
    534534                          metavar="[HOST]:[PORT]",
    535535                          help="Listen for connections over TCP (use --tcp-auth to secure it)."
    536536                            + " You may specify this option multiple times with different host and port combinations")
     537        group.add_option("--bind-udp", action="append",
     538                          dest="bind_udp", default=list(defaults.bind_udp or []),
     539                          metavar="[HOST]:[PORT]",
     540                          help="Listen for connections over UDP (use --udp-auth to secure it)."
     541                            + " You may specify this option multiple times with different host and port combinations")
    537542        group.add_option("--bind-ws", action="append",
    538543                          dest="bind_ws", default=list(defaults.bind_ws or []),
    539544                          metavar="[HOST]:[PORT]",
     
    558563        ignore({
    559564            "bind"      : defaults.bind,
    560565            "bind-tcp"  : defaults.bind_tcp,
     566            "bind-udp"  : defaults.bind_udp,
    561567            "bind-ws"   : defaults.bind_ws,
    562568            "bind-wss"  : defaults.bind_wss,
    563569            "bind-ssl"  : defaults.bind_ssl,
     
    974980    group.add_option("--tcp-auth", action="store",
    975981                      dest="tcp_auth", default=defaults.tcp_auth,
    976982                      help="The authentication module to use for TCP sockets (default: '%default')")
     983    group.add_option("--udp-auth", action="store",
     984                      dest="udp_auth", default=defaults.udp_auth,
     985                      help="The authentication module to use for UDP sockets (default: '%default')")
    977986    group.add_option("--ws-auth", action="store",
    978987                      dest="ws_auth", default=defaults.ws_auth,
    979988                      help="The authentication module to use for Websockets (default: '%default')")
     
    11631172    #    elif tcp_encryption:
    11641173    #        raise InitException("tcp-encryption %s should not use the same file as the password authentication file" % tcp_encryption)
    11651174
    1166 def dump_frames(*arsg):
     1175def dump_frames(*_args):
    11671176    frames = sys._current_frames()
    11681177    print("")
    11691178    print("found %s frames:" % len(frames))
     
    12411250
    12421251    #register posix signals for debugging:
    12431252    if POSIX:
    1244         def sigusr1(*args):
     1253        def sigusr1(*_args):
    12451254            dump_frames()
    12461255        signal.signal(signal.SIGUSR1, sigusr1)
    12471256
     
    16861695        if opts.socket_dir:
    16871696            desc["socket_dir"] = opts.socket_dir
    16881697        return desc
    1689     elif display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
    1690             display_name.startswith("ssl:") or display_name.startswith("ssl/"):
    1691         ctype = display_name[:3]        #ie: "ssl" or "tcp"
    1692         separator = display_name[3]     # ":" or "/"
     1698    elif (
     1699        display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
     1700        display_name.startswith("ssl:") or display_name.startswith("ssl/") or \
     1701        display_name.startswith("udp:") or display_name.startswith("udp/")
     1702        ):
     1703        ctype = display_name[:3]                #ie: "ssl" or "tcp"
     1704        separator = display_name[len(ctype)]    # ":" or "/"
    16931705        desc.update({
    16941706                     "type"     : ctype,
    16951707                     })
     
    20732085        from xpra.net.bytestreams import SocketConnection
    20742086        return SocketConnection(sock, "local", "host", (CID_TYPES.get(cid, cid), iport), dtype)
    20752087
    2076     elif dtype in ("tcp", "ssl", "ws", "wss"):
     2088    elif dtype in ("tcp", "ssl", "ws", "wss", "udp"):
    20772089        if display_desc.get("ipv6"):
    20782090            assert socket.has_ipv6, "no IPv6 support"
    20792091            family = socket.AF_INET6
     
    20892101                socket.AF_INET  : "IPv4",
    20902102                }.get(family, family), (host, port), e))
    20912103        sockaddr = addrinfo[0][-1]
    2092         sock = socket.socket(family, socket.SOCK_STREAM)
    2093         sock.settimeout(SOCKET_TIMEOUT)
    2094         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
     2104        if dtype=="udp":
     2105            opts.mmap = False
     2106            sock = socket.socket(family, socket.SOCK_DGRAM)
     2107        else:
     2108            sock = socket.socket(family, socket.SOCK_STREAM)
     2109            sock.settimeout(SOCKET_TIMEOUT)
     2110            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
    20952111        strict_host_check = display_desc.get("strict-host-check")
    20962112        if strict_host_check is False:
    20972113            opts.ssl_server_verify_mode = "none"
     
    24012417                from xpra.codecs.loader import encodings_help
    24022418                encodings = ["auto"] + app.get_encodings()
    24032419                raise InitInfo(info+"%s xpra client supports the following encodings:\n * %s" % (app.client_toolkit(), "\n * ".join(encodings_help(encodings))))
    2404         def handshake_complete(*args):
     2420        def handshake_complete(*_args):
    24052421            from xpra.log import Logger
    24062422            log = Logger()
    24072423            log.info("Attached to %s (press Control-C to detach)\n", conn.target)
  • xpra/scripts/server.py

     
    4444    _cleanups.append(f)
    4545
    4646
    47 def deadly_signal(signum, frame):
     47def deadly_signal(signum, _frame):
    4848    info("got deadly signal %s, exiting\n" % SIGNAMES.get(signum, signum))
    4949    run_cleanups()
    5050    # This works fine in tests, but for some reason if I use it here, then I
     
    357357    if opts.encoding=="help" or "help" in opts.encodings:
    358358        return show_encoding_help(opts)
    359359
    360     from xpra.server.socket_util import parse_bind_tcp, parse_bind_vsock
    361     bind_tcp = parse_bind_tcp(opts.bind_tcp)
    362     bind_ssl = parse_bind_tcp(opts.bind_ssl)
    363     bind_ws = parse_bind_tcp(opts.bind_ws)
    364     bind_wss = parse_bind_tcp(opts.bind_wss)
    365     bind_rfb = parse_bind_tcp(opts.bind_rfb)
     360    from xpra.server.socket_util import parse_bind_ip, parse_bind_vsock
     361    bind_tcp = parse_bind_ip(opts.bind_tcp)
     362    bind_udp = parse_bind_ip(opts.bind_udp)
     363    bind_ssl = parse_bind_ip(opts.bind_ssl)
     364    bind_ws  = parse_bind_ip(opts.bind_ws)
     365    bind_wss = parse_bind_ip(opts.bind_wss)
     366    bind_rfb = parse_bind_ip(opts.bind_rfb)
    366367    bind_vsock = parse_bind_vsock(opts.bind_vsock)
    367368
    368369    assert mode in ("start", "start-desktop", "upgrade", "shadow", "proxy")
     
    589590            cpaths = csv("'%s'" % x for x in (opts.ssl_cert, opts.ssl_key) if x)
    590591            raise InitException("cannot create SSL socket, check your certificate paths (%s): %s" % (cpaths, e))
    591592
     593    from xpra.server.socket_util import setup_tcp_socket, setup_udp_socket, setup_vsock_socket, setup_local_sockets
    592594    def add_mdns(socktype, host, port):
    593595        recs = mdns_recs.setdefault(socktype.lower(), [])
    594596        rec = (host, port)
     
    598600        socket = setup_tcp_socket(host, iport, socktype)
    599601        sockets.append(socket)
    600602        add_mdns(socktype, host, iport)
    601 
     603    def add_udp_socket(socktype, host, iport):
     604        socket = setup_udp_socket(host, iport, socktype)
     605        sockets.append(socket)
     606        add_mdns(socktype, host, iport)
    602607    # Initialize the TCP sockets before the display,
    603608    # That way, errors won't make us kill the Xvfb
    604609    # (which may not be ours to kill at that point)
    605     from xpra.server.socket_util import setup_tcp_socket, setup_vsock_socket, setup_local_sockets
    606610    netlog("setting up SSL sockets: %s", bind_ssl)
    607611    for host, iport in bind_ssl:
    608612        add_tcp_socket("SSL", host, iport)
     
    615619        add_tcp_socket("tcp", host, iport)
    616620        if tcp_ssl:
    617621            add_mdns("ssl", host, iport)
     622    netlog("setting up UDP sockets: %s", bind_udp)
     623    for host, iport in bind_udp:
     624        add_udp_socket("udp", host, iport)
    618625    netlog("setting up http / ws (websockets): %s", bind_ws)
    619626    for host, iport in bind_ws:
    620627        add_tcp_socket("ws", host, iport)
  • xpra/server/server_core.py

     
    151151        #networking bits:
    152152        self._socket_info = []
    153153        self._potential_protocols = []
     154        self._udp_listeners = []
     155        self._udp_protocols = {}
    154156        self._tcp_proxy_clients = []
    155157        self._tcp_proxy = ""
    156         self._ssl_wrap_socket = None
     158        self._ssl_wrap_server_socket = None
     159        self._ssl_wrap_client_socket = None
    157160        self._accept_timeout = SOCKET_TIMEOUT + 1
    158161        self.ssl_mode = None
    159162        self._html = False
     
    285288            self.auth_classes["named-pipes"] = auth
    286289        else:
    287290            self.auth_classes["unix-domain"] = auth
    288         for x in ("tcp", "ws", "wss", "ssl", "rfb", "vsock"):
     291        for x in ("tcp", "ws", "wss", "ssl", "rfb", "vsock", "udp"):
    289292            opts_value = getattr(opts, "%s_auth" % x)
    290293            self.auth_classes[x] = self.get_auth_module(x, opts_value, opts)
    291294        authlog("init_auth(..) auth=%s", self.auth_classes)
     
    377380        self._default_packet_handlers = {
    378381            "hello":                                self._process_hello,
    379382            "disconnect":                           self._process_disconnect,
     383            "udp-control":                          self._process_udp_control,
    380384            Protocol.CONNECTION_LOST:               self._process_connection_lost,
    381385            Protocol.GIBBERISH:                     self._process_gibberish,
    382386            Protocol.INVALID:                       self._process_invalid,
     
    532536        self.do_cleanup()
    533537        self.cleanup_protocols(protocols, reason, True)
    534538        self._potential_protocols = []
     539        self.cleanup_udp_listeners()
    535540
    536541    def do_cleanup(self):
    537542        #allow just a bit of time for the protocol packet flush
     
    538543        sleep(0.1)
    539544
    540545
     546    def cleanup_udp_listeners(self):
     547        for udpl in self._udp_listeners:
     548            udpl.close()
     549        self._udp_listeners = []
     550
    541551    def cleanup_all_protocols(self, reason):
    542552        protocols = self.get_all_protocols()
    543553        self.cleanup_protocols(protocols, reason)
     
    563573            #named pipe listener uses a thread:
    564574            sock.new_connection_cb = self._new_connection
    565575            sock.start()
     576        elif socktype=="udp":
     577            #socket_info = self.socket_info.get(sock)
     578            from xpra.net.udp_protocol import UDPListener
     579            udpl = UDPListener(sock, self.process_udp_packet)
     580            self._udp_listeners.append(udpl)
    566581        else:
    567582            from xpra.gtk_common.gobject_compat import import_glib
    568583            glib = import_glib()
     
    979994
    980995    def verify_connection_accepted(self, protocol):
    981996        if self.is_timedout(protocol):
    982             log.error("connection timedout: %s", protocol)
     997            info = getattr(protocol, "_conn", protocol)
     998            log.error("connection timedout: %s", info)
    983999            self.send_disconnect(protocol, LOGIN_TIMEOUT)
    9841000
    9851001    def send_disconnect(self, proto, *reasons):
     
    10341050            if not proto._closed:
    10351051                self._log_disconnect(proto, "Connection lost")
    10361052            self._potential_protocols.remove(proto)
     1053        #remove from UDP protocol map:
     1054        uuid = getattr(proto, "uuid", None)
     1055        if uuid and uuid in self._udp_protocols:
     1056            del self._udp_protocols[uuid]
    10371057
    10381058    def _process_gibberish(self, proto, packet):
    10391059        (_, message, data) = packet
     
    12391259            self.disconnect_client(proto, SERVER_ERROR, "error accepting new connection")
    12401260
    12411261    def hello_oked(self, proto, _packet, c, _auth_caps):
     1262        proto.accept()
    12421263        ctr = c.strget("connect_test_request")
    12431264        if ctr:
    12441265            response = {"connect_test_response" : ctr}
     
    14621483    def handle_rfb_connection(self, conn):
    14631484        log.error("Error: RFB protocol is not supported by this server")
    14641485        conn.close()
     1486
     1487
     1488    def _process_udp_control(self, proto, packet):
     1489        proto.process_control(*packet[1:])
     1490
     1491    def process_udp_packet(self, udp_listener, uuid, seqno, synchronous, chunk, chunks, data, bfrom):
     1492        #log.info("process_udp_packet%s", (udp_listener, uuid, seqno, synchronous, chunk, chunks, len(data), bfrom))
     1493        protocol = self._udp_protocols.get(uuid)
     1494        if not protocol:
     1495            from xpra.net.udp_protocol import UDPServerProtocol, UDPSocketConnection
     1496            def udp_protocol_class(conn):
     1497                protocol = UDPServerProtocol(self, conn, self.process_packet)
     1498                protocol.uuid = uuid
     1499                protocol.large_packets.append("info-response")
     1500                protocol.receive_aliases.update(self._aliases)
     1501                return protocol
     1502            socktype = "udp"
     1503            host, port = bfrom
     1504            sock = udp_listener._socket
     1505            sockname = sock.getsockname()
     1506            conn = UDPSocketConnection(sock, sockname, (host, port), (host, port), socktype)
     1507            conn.timeout = SOCKET_TIMEOUT
     1508            protocol = self.do_make_protocol(socktype, conn, udp_protocol_class)
     1509            self._udp_protocols[uuid] = protocol
     1510        else:
     1511            #update remote address in case the client is roaming:
     1512            conn = protocol._conn
     1513            if conn:
     1514                conn.remote = bfrom
     1515        protocol.process_udp_data(uuid, seqno, synchronous, chunk, chunks, data, bfrom)
  • xpra/server/socket_util.py

     
    108108        log("create_tcp_socket%s", (host, iport), exc_info=True)
    109109        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
    110110    def cleanup_tcp_socket():
    111         log.info("closing %s socket %s:%s", socktype, host, iport)
     111        log.info("closing %s socket %s:%s", socktype.lower(), host, iport)
    112112        try:
    113113            tcp_socket.close()
    114114        except:
     
    117117    log("%s: %s:%s : %s", socktype, host, iport, socket)
    118118    return socktype, tcp_socket, (host, iport)
    119119
     120def create_udp_socket(host, iport):
     121    if host.find(":")<0:
     122        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     123        sockaddr = (host, iport)
     124    else:
     125        assert socket.has_ipv6, "specified an IPv6 address but this is not supported"
     126        res = socket.getaddrinfo(host, iport, socket.AF_INET6, socket.SOCK_DGRAM)
     127        listener = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
     128        sockaddr = res[0][-1]
     129    listener.bind(sockaddr)
     130    return listener
    120131
    121 def parse_bind_tcp(bind_tcp):
    122     tcp_sockets = set()
    123     if bind_tcp:
    124         for spec in bind_tcp:
     132def setup_udp_socket(host, iport, socktype="udp"):
     133    from xpra.log import Logger
     134    log = Logger("network")
     135    try:
     136        udp_socket = create_udp_socket(host, iport)
     137    except Exception as e:
     138        log("create_udp_socket%s", (host, iport), exc_info=True)
     139        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
     140    def cleanup_udp_socket():
     141        log.info("closing %s socket %s:%s", socktype, host, iport)
     142        try:
     143            udp_socket.close()
     144        except:
     145            pass
     146    add_cleanup(cleanup_udp_socket)
     147    log("%s: %s:%s : %s", socktype, host, iport, socket)
     148    return socktype, udp_socket, (host, iport)
     149
     150
     151def parse_bind_ip(bind_ip):
     152    ip_sockets = set()
     153    if bind_ip:
     154        for spec in bind_ip:
    125155            if ":" not in spec:
    126                 raise InitException("TCP port must be specified as [HOST]:PORT")
     156                raise InitException("port must be specified as [HOST]:PORT")
    127157            host, port = spec.rsplit(":", 1)
    128158            if host == "":
    129159                host = "127.0.0.1"
     
    135165                    assert iport>0 and iport<2**16
    136166                except:
    137167                    raise InitException("invalid port number: %s" % port)
    138             tcp_sockets.add((host, iport))
    139     return tcp_sockets
     168            ip_sockets.add((host, iport))
     169    return ip_sockets
    140170
    141171def setup_vsock_socket(cid, iport):
    142172    from xpra.log import Logger
  • xpra/server/source.py

     
    11201120        self.update_av_sync_delay_total()
    11211121
    11221122    def new_sound_buffer(self, sound_source, data, metadata, packet_metadata=[]):
    1123         soundlog("new_sound_buffer(%s, %s, %s, %s) suspended=%s",
    1124                  sound_source, len(data or []), metadata, [len(x) for x in packet_metadata], self.suspended)
     1123        soundlog("new_sound_buffer(%s, %s, %s, %s) info=%s, suspended=%s",
     1124                 sound_source, len(data or []), metadata, [len(x) for x in packet_metadata], sound_source.info, self.suspended)
    11251125        if self.sound_source!=sound_source or self.is_closed():
    11261126            soundlog("sound buffer dropped: from old source or closed")
    11271127            return
     
    11371137            else:
    11381138                #the packet metadata is compressed already:
    11391139                packet_metadata = Compressed("packet metadata", packet_metadata, can_inline=True)
    1140         self.send_sound_data(sound_source, data, metadata, packet_metadata)
     1140        #don't drop the first 10 buffers
     1141        can_drop_packet = (sound_source.info or {}).get("buffer_count", 0)>10
     1142        self.send_sound_data(sound_source, data, metadata, packet_metadata, can_drop_packet)
    11411143
    1142     def send_sound_data(self, sound_source, data, metadata={}, packet_metadata=()):
     1144    def send_sound_data(self, sound_source, data, metadata={}, packet_metadata=(), can_drop_packet=False):
    11431145        packet_data = [sound_source.codec, Compressed(sound_source.codec, data), metadata]
    11441146        if packet_metadata:
    11451147            assert self.sound_bundle_metadata
    11461148            packet_data.append(packet_metadata)
    1147         if sound_source.sequence>=0:
    1148             metadata["sequence"] = sound_source.sequence
    1149         self.send("sound-data", *packet_data)
     1149        sequence = sound_source.sequence
     1150        if sequence>=0:
     1151            metadata["sequence"] = sequence
     1152        fail_cb = None
     1153        if can_drop_packet:
     1154            def sound_data_fail_cb():
     1155                #ideally we would tell gstreamer to send an audio "key frame"
     1156                #or synchronization point to ensure the stream recovers
     1157                soundlog("a sound data buffer was not received and will not be resent")
     1158            fail_cb = sound_data_fail_cb
     1159        self._send(fail_cb, False, "sound-data", *packet_data)
    11501160
    11511161    def stop_receiving_sound(self):
    11521162        ss = self.sound_sink
     
    13951405            return
    13961406        if self.mouse_last_position!=(x, y, rx, ry):
    13971407            self.mouse_last_position = (x, y, rx, ry)
    1398             self.send("pointer-position", wid, x, y, rx, ry)
     1408            self.send_async("pointer-position", wid, x, y, rx, ry)
    13991409
    14001410#
    14011411# Functions for interacting with the network layer:
     
    14021412#
    14031413    def next_packet(self):
    14041414        """ Called by protocol.py when it is ready to send the next packet """
    1405         packet, start_send_cb, end_send_cb, have_more = None, None, None, False
     1415        packet, start_send_cb, end_send_cb, fail_cb, synchronous, have_more = None, None, None, None, True, False
    14061416        if not self.is_closed():
    14071417            if len(self.ordinary_packets)>0:
    1408                 packet = self.ordinary_packets.pop(0)
     1418                packet, synchronous, fail_cb = self.ordinary_packets.pop(0)
    14091419            elif len(self.packet_queue)>0:
    1410                 packet, _, _, start_send_cb, end_send_cb = self.packet_queue.popleft()
     1420                packet, _, _, start_send_cb, end_send_cb, fail_cb = self.packet_queue.popleft()
    14111421            have_more = packet is not None and (len(self.ordinary_packets)>0 or len(self.packet_queue)>0)
    1412         return packet, start_send_cb, end_send_cb, have_more
     1422        return packet, start_send_cb, end_send_cb, fail_cb, synchronous, have_more
    14131423
    14141424    def send(self, *parts):
    14151425        """ This method queues non-damage packets (higher priority) """
    1416         self.ordinary_packets.append(parts)
     1426        self._send(None, True, *parts)
     1427
     1428    def send_async(self, *parts):
     1429        self._send(None, False, *parts)
     1430
     1431    def _send(self, fail_cb=None, synchronous=True, *parts):
     1432        """ This method queues non-damage packets (higher priority) """
     1433        #log.info("_send%s", (fail_cb, synchronous, parts))
    14171434        p = self.protocol
    14181435        if p:
     1436            self.ordinary_packets.append((parts, synchronous, fail_cb))
    14191437            p.source_has_more()
    14201438
     1439
    14211440#
    14221441# Functions used by the server to request something
    14231442# (window events, stats, user requests, etc)
     
    15101529        if self.last_ping_echoed_time>0:
    15111530            lpe = int(monotonic_time()*1000-self.last_ping_echoed_time)
    15121531        info = {
     1532                "protocol"          : "xpra",
    15131533                "version"           : self.client_version or "unknown",
    15141534                "revision"          : self.client_revision or "unknown",
    15151535                "platform_name"     : platform_name(self.client_platform, self.client_release),
     
    16911711            v = notypedict(info)
    16921712        else:
    16931713            v = flatten_dict(info)
    1694         self.send("info-response", v)
     1714        self.send_async("info-response", v)
    16951715
    16961716
    16971717    def send_server_event(self, *args):
     
    17021722    def send_clipboard_enabled(self, reason=""):
    17031723        if not self.hello_sent:
    17041724            return
    1705         self.send("set-clipboard-enabled", self.clipboard_enabled, reason)
     1725        self.send_async("set-clipboard-enabled", self.clipboard_enabled, reason)
    17061726
    17071727    def send_clipboard_progress(self, count):
    17081728        if not self.clipboard_notifications or not self.hello_sent:
     
    18231843    def bell(self, wid, device, percent, pitch, duration, bell_class, bell_id, bell_name):
    18241844        if not self.send_bell or self.suspended or not self.hello_sent:
    18251845            return
    1826         self.send("bell", wid, device, percent, pitch, duration, bell_class, bell_id, bell_name)
     1846        self.send_async("bell", wid, device, percent, pitch, duration, bell_class, bell_id, bell_name)
    18271847
    18281848    def notify(self, dbus_id, nid, app_name, replaces_nid, app_icon, summary, body, expire_timeout):
    18291849        if not self.send_notifications:
     
    18331853            notifylog("client %s is suspended, notification not sent", self)
    18341854            return False
    18351855        if self.hello_sent:
    1836             self.send("notify_show", dbus_id, int(nid), str(app_name), int(replaces_nid), str(app_icon), str(summary), str(body), int(expire_timeout))
     1856            self.send_async("notify_show", dbus_id, int(nid), str(app_name), int(replaces_nid), str(app_icon), str(summary), str(body), int(expire_timeout))
    18371857        return True
    18381858
    18391859    def notify_close(self, nid):
     
    18471867
    18481868    def send_webcam_ack(self, device, frame, *args):
    18491869        if self.hello_sent:
    1850             self.send("webcam-ack", device, frame, *args)
     1870            self.send_async("webcam-ack", device, frame, *args)
    18511871
    18521872    def send_webcam_stop(self, device, message):
    18531873        if self.hello_sent:
    1854             self.send("webcam-stop", device, message)
     1874            self.send_async("webcam-stop", device, message)
    18551875
    18561876
    18571877    def set_printers(self, printers, password_file, auth, encryption, encryption_keyfile):
     
    19721992        #NOTE: all ping time/echo time/load avg values are in milliseconds
    19731993        now_ms = int(1000*monotonic_time())
    19741994        log("sending ping to %s with time=%s", self.protocol, now_ms)
    1975         self.send("ping", now_ms)
     1995        self.send_async("ping", now_ms)
    19761996        timeout = 60
    19771997        def check_echo_timeout():
    19781998            if self.last_ping_echoed_time<now_ms and not self.is_closed():
     
    19922012            cl = int(1000.0*cl)
    19932013        else:
    19942014            cl = -1
    1995         self.send("ping_echo", time_to_echo, l1, l2, l3, cl)
     2015        self.send_async("ping_echo", time_to_echo, l1, l2, l3, cl)
    19962016        #if the client is pinging us, ping it too:
    19972017        self.timeout_add(500, self.ping)
    19982018
     
    20192039
    20202040    def show_desktop(self, show):
    20212041        if self.show_desktop_allowed and self.hello_sent:
    2022             self.send("show-desktop", show)
     2042            self.send_async("show-desktop", show)
    20232043
    20242044    def initiate_moveresize(self, wid, window, x_root, y_root, direction, button, source_indication):
    20252045        if not self.can_send_window(window) or not self.window_initiate_moveresize:
     
    20872107        metadata = {}
    20882108        for propname in list(window.get_property_names()):
    20892109            metadata.update(self._make_metadata(window, propname))
    2090         self.send("new-tray", wid, w, h, metadata)
     2110        self.send_async("new-tray", wid, w, h, metadata)
    20912111
    20922112    def new_window(self, ptype, wid, window, x, y, w, h, client_properties):
    20932113        if not self.can_send_window(window):
     
    21052125                metalog("make_metadata(%s, %s, %s)=%s", wid, window, prop, v)
    21062126            metadata.update(v)
    21072127        log("new_window(%s, %s, %s, %s, %s, %s, %s, %s) metadata(%s)=%s", ptype, window, wid, x, y, w, h, client_properties, send_props, metadata)
    2108         self.send(ptype, wid, x, y, w, h, metadata, client_properties or {})
     2128        self.send_async(ptype, wid, x, y, w, h, metadata, client_properties or {})
    21092129        if send_raw_icon:
    21102130            self.send_window_icon(wid, window)
    21112131
     
    21552175    def raise_window(self, wid, window):
    21562176        if not self.can_send_window(window):
    21572177            return
    2158         self.send("raise-window", wid)
     2178        self.send_async("raise-window", wid)
    21592179
    21602180    def remove_window(self, wid, window):
    21612181        """ The given window is gone, ensure we free all the related resources """
     
    22942314        self.statistics.compression_work_qsizes.append((monotonic_time(), self.encode_work_queue.qsize()))
    22952315        self.encode_work_queue.put(fn_and_args)
    22962316
    2297     def queue_packet(self, packet, wid=0, pixels=0, start_send_cb=None, end_send_cb=None):
     2317    def queue_packet(self, packet, wid=0, pixels=0, start_send_cb=None, end_send_cb=None, fail_cb=None):
    22982318        """
    22992319            Add a new 'draw' packet to the 'packet_queue'.
    23002320            Note: this code runs in the non-ui thread
     
    23032323        self.statistics.packet_qsizes.append((now, len(self.packet_queue)))
    23042324        if wid>0:
    23052325            self.statistics.damage_packet_qpixels.append((now, wid, sum(x[2] for x in list(self.packet_queue) if x[1]==wid)))
    2306         self.packet_queue.append((packet, wid, pixels, start_send_cb, end_send_cb))
     2326        self.packet_queue.append((packet, wid, pixels, start_send_cb, end_send_cb, fail_cb))
    23072327        p = self.protocol
    23082328        if p:
    23092329            p.source_has_more()
  • xpra/server/window/window_source.py

     
    836836                return x
    837837        return self.common_encodings[0]
    838838
    839     def get_auto_encoding(self, pixel_count, ww, wh, speed, quality, *args):
     839    def get_auto_encoding(self, pixel_count, ww, wh, speed, quality, *_args):
    840840        if pixel_count<self._rgb_auto_threshold:
    841841            return "rgb24"
    842842        if "png" in self.common_encodings and ((quality>=80 and speed<80) or self.image_depth<=16):
     
    845845            return "jpeg"
    846846        return [x for x in self.common_encodings if x!="rgb"][0]
    847847
    848     def get_current_or_rgb(self, pixel_count, ww, wh, speed, quality, *args):
     848    def get_current_or_rgb(self, pixel_count, ww, wh, speed, quality, *_args):
    849849        if pixel_count<self._rgb_auto_threshold:
    850850            return "rgb24"
    851851        return self.encoding
     
    17261726            now = monotonic_time()
    17271727            damage_in_latency = now-process_damage_time
    17281728            self.statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
    1729         self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent)
     1729        fail_cb = self.get_fail_cb(packet)
     1730        #log.info("queuing %s packet with fail_cb=%s", coding, fail_cb)
     1731        self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent, fail_cb)
    17301732
     1733    def get_fail_cb(self, packet):
     1734        def resend():
     1735            log.warn("paint packet failure, resending")
     1736            x,y,width,height = packet[2:6]
     1737            damage_packet_sequence = packet[8]
     1738            self.damage_packet_acked(damage_packet_sequence, width, height, 0, "")
     1739            self.idle_add(self.damage, x, y, width, height)
     1740        return resend
     1741
    17311742    def damage_packet_acked(self, damage_packet_sequence, width, height, decode_time, message):
    17321743        """
    17331744            The client is acknowledging a damage packet,
  • xpra/server/window/window_video_source.py

     
    14261426        return {}
    14271427
    14281428
     1429    def get_fail_cb(self, packet):
     1430        coding = packet[6]
     1431        if coding in self.common_video_encodings:
     1432            return None
     1433        return WindowSource.get_fail_cb(self, packet)
     1434
     1435
    14291436    def make_draw_packet(self, x, y, w, h, coding, data, outstride, client_options={}, options={}):
    14301437        #overriden so we can invalidate the scroll data:
    14311438        #log.error("make_draw_packet%s", (x, y, w, h, coding, "..", outstride, client_options)