xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #639: udp-v6.patch

File udp-v6.patch, 89.4 KB (added by Antoine Martin, 4 years ago)

add some rfb refactoring

  • xpra/client/client_base.py

     
    232232        raise NotImplementedError()
    233233
    234234    def setup_connection(self, conn):
    235         netlog("setup_connection(%s) timeout=%s", conn, conn.timeout)
    236         self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     235        netlog.info("setup_connection(%s) timeout=%s, socktype=%s", conn, conn.timeout, conn.socktype)
     236        if conn.socktype in ("udp", "dtls"):
     237            from xpra.net.udp_protocol import UDPClientProtocol
     238            self._protocol = UDPClientProtocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
     239            self.set_packet_handlers(self._packet_handlers, {
     240                "udp-control"   : self._process_udp_control,
     241                })
     242        else:
     243            self._protocol = Protocol(self.get_scheduler(), conn, self.process_packet, self.next_packet)
    237244        self._protocol.large_packets.append("keymap-changed")
    238245        self._protocol.large_packets.append("server-settings")
    239246        self._protocol.large_packets.append("logging")
     
    254261            getChildReaper().add_process(proc, name, command, ignore=True, forget=False)
    255262        netlog("setup_connection(%s) protocol=%s", conn, self._protocol)
    256263
     264    def _process_udp_control(self, packet):
     265        self._protocol.process_control(*packet[1:])
    257266
     267
    258268    def remove_packet_handlers(self, *keys):
    259269        for k in keys:
    260270            for d in (self._packet_handlers, self._ui_packet_handlers):
     
    440450            p.source_has_more()
    441451
    442452    def next_packet(self):
     453        netlog("next_packet() packets in queues: priority=%i, ordinary=%i, mouse=%s", len(self._priority_packets), len(self._ordinary_packets), bool(self._mouse_position))
    443454        if self._priority_packets:
    444455            packet = self._priority_packets.pop(0)
    445456        elif self._ordinary_packets:
     
    452463        has_more = packet is not None and \
    453464                (bool(self._priority_packets) or bool(self._ordinary_packets) \
    454465                 or self._mouse_position is not None)
    455         return packet, None, None, has_more
     466        return packet, None, None, None, has_more
    456467
    457468
    458469    def cleanup(self):
  • xpra/client/gtk_base/gtk_client_window_base.py

     
    550550        def do_set_shape():
    551551            xid = get_xid(self.get_window())
    552552            x_off, y_off = shape.get("x", 0), shape.get("y", 0)
    553             for kind, name in SHAPE_KIND.items():
     553            for kind, name in SHAPE_KIND.items():       #@UndefinedVariable
    554554                rectangles = shape.get("%s.rectangles" % name)      #ie: Bounding.rectangles = [(0, 0, 150, 100)]
    555555                if rectangles:
    556556                    #adjust for scaling:
  • xpra/client/ui_client_base.py

     
    3939webcamlog = Logger("webcam")
    4040notifylog = Logger("notify")
    4141cursorlog = Logger("cursor")
     42netlog = Logger("network")
    4243
    4344
    4445from xpra.gtk_common.gobject_util import no_arg_signal
     
    16971698        if FAKE_BROKEN_CONNECTION>0:
    16981699            self._server_ok = (int(monotonic_time()) % FAKE_BROKEN_CONNECTION) <= (FAKE_BROKEN_CONNECTION//2)
    16991700        else:
    1700             self._server_ok = not FAKE_BROKEN_CONNECTION and self.last_ping_echoed_time>=ping_sent_time
    1701         log("check_server_echo(%s) last=%s, server_ok=%s", ping_sent_time, last, self._server_ok)
     1701            self._server_ok = self.last_ping_echoed_time>=ping_sent_time
     1702        log("check_server_echo(%s) last=%s, server_ok=%s (last_ping_echoed_time=%s)", ping_sent_time, last, self._server_ok, self.last_ping_echoed_time)
    17021703        if last!=self._server_ok and not self._server_ok:
    17031704            log.info("server is not responding, drawing spinners over the windows")
    17041705            def timer_redraw():
     
    17241725                w.spinner(ok)
    17251726
    17261727    def check_echo_timeout(self, ping_time):
    1727         log("check_echo_timeout(%s) last_ping_echoed_time=%s", ping_time, self.last_ping_echoed_time)
     1728        netlog("check_echo_timeout(%s) last_ping_echoed_time=%s", ping_time, self.last_ping_echoed_time)
    17281729        if self.last_ping_echoed_time<ping_time:
    17291730            #no point trying to use disconnect_and_quit() to tell the server here..
    17301731            self.warn_and_quit(EXIT_TIMEOUT, "server ping timeout - waited %s seconds without a response" % PING_TIMEOUT)
     
    17381739            l = [x for _,x in list(self.server_ping_latency)]
    17391740            avg = sum(l) / len(l)
    17401741            wait = min(5, 1.0+avg*2.0)
    1741             log("average server latency=%.1f, using max wait %.2fs", 1000.0*avg, wait)
     1742            netlog("send_ping() timestamp=%s, average server latency=%.1f, using max wait %.2fs", now_ms, 1000.0*avg, wait)
    17421743        self.timeout_add(int(1000.0*wait), self.check_server_echo, now_ms)
    17431744        return True
    17441745
     
    17511752        self.server_load = l1, l2, l3
    17521753        if cl>=0:
    17531754            self.client_ping_latency.append((monotonic_time(), cl/1000.0))
    1754         log("ping echo server load=%s, measured client latency=%sms", self.server_load, cl)
     1755        netlog("ping echo server load=%s, measured client latency=%sms", self.server_load, cl)
    17551756
    17561757    def _process_ping(self, packet):
    17571758        echotime = packet[1]
  • xpra/net/bytestreams.py

     
    9898            return f(*a, **kw)
    9999        except Exception as e:
    100100            retry = can_retry(e)
     101            log("untilConcludes", exc_info=True)
    101102            log("untilConcludes(%s, %s, %s, %s, %s) %s, retry=%s", is_active_cb, can_retry, f, a, kw, e, retry)
    102103            if retry:
    103104                if wait>0:
     
    277278            i = s
    278279        log("%s.close() for socket=%s", self, i)
    279280        Connection.close(self)
    280         s.settimeout(0)
     281        try:
     282            s.settimeout(0)
     283        except:
     284            pass
    281285        #this is more proper but would break the proxy server:
    282286        #s.shutdown(socket.SHUT_RDWR)
    283287        s.close()
     
    306310        s = self._socket
    307311        if not s:
    308312            return None
    309         return {
    310                 #"class"         : str(type(s)),
    311                 "fileno"        : s.fileno(),
    312                 "timeout"       : int(1000*(s.gettimeout() or 0)),
     313        info = {
    313314                "family"        : FAMILY_STR.get(s.family, s.family),
    314315                "proto"         : s.proto,
    315316                "type"          : PROTOCOL_STR.get(s.type, s.type),
    316317                }
     318        try:
     319            info["timeout"] = int(1000*(s.gettimeout() or 0))
     320        except:
     321            pass
     322        if hasattr(s, "fileno"):
     323            info["fileno"] = s.fileno()
     324        return info
    317325
    318 
    319326try:
    320327    #this wrapper class allows us to override the normal ssl.Socket
    321328    #class so that we can fake peek() support by actually reading from the socket
  • xpra/net/protocol.py

     
    5959    packet_encoding_sanity_checks()
    6060
    6161
     62def exit_queue():
     63    queue = Queue()
     64    for _ in range(10):     #just 2 should be enough!
     65        queue.put(None)
     66    return queue
     67
     68def force_flush_queue(q):
     69    try:
     70        #discard all elements in the old queue and push the None marker:
     71        try:
     72            while q.qsize()>0:
     73                q.read(False)
     74        except:
     75            pass
     76        q.put_nowait(None)
     77    except:
     78        pass
     79
    6280class Protocol(object):
     81    """
     82        This class handles sending and receiving packets,
     83        it will encode and compress them before sending,
     84        and decompress and decode when receiving.
     85    """
     86
    6387    CONNECTION_LOST = "connection-lost"
    6488    GIBBERISH = "gibberish"
    6589    INVALID = "invalid"
     
    7296        assert conn is not None
    7397        self.timeout_add = scheduler.timeout_add
    7498        self.idle_add = scheduler.idle_add
     99        self.source_remove = scheduler.source_remove
    75100        self._conn = conn
    76101        if FAKE_JITTER>0:
    77102            from xpra.net.fake_jitter import FakeJitter
     
    81106            self._process_packet_cb = process_packet_cb
    82107        self._write_queue = Queue(1)
    83108        self._read_queue = Queue(20)
     109        self._process_read = self.read_queue_put
    84110        self._read_queue_put = self.read_queue_put
    85111        # Invariant: if .source is None, then _source_has_more == False
    86112        self._get_packet_cb = get_packet_cb
     
    239265        if SEND_INVALID_PACKET:
    240266            self.timeout_add(SEND_INVALID_PACKET*1000, self.raw_write, SEND_INVALID_PACKET_DATA)
    241267
     268
     269    def send_disconnect(self, reason, *extra):
     270        self.send_now(["disconnect", reason]+list(extra))
     271
    242272    def send_now(self, packet):
    243273        if self._closed:
    244274            log("send_now(%s ...) connection is closed already, not sending", packet[0])
     
    279309                return
    280310            self._internal_error("error in network packet write/format", e, exc_info=True)
    281311
    282     def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, has_more=False):
     312    def _add_packet_to_queue(self, packet, start_send_cb=None, end_send_cb=None, fail_cb=None, has_more=False):
    283313        if has_more:
    284314            self._source_has_more.set()
    285315        if packet is None:
     
    290320            if self._closed:
    291321                return
    292322            try:
    293                 self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb)
     323                self._add_chunks_to_queue(chunks, start_send_cb, end_send_cb, fail_cb)
    294324            except:
    295325                log.error("Error: failed to queue '%s' packet", packet[0])
    296                 log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb), exc_info=True)
     326                log("add_chunks_to_queue%s", (chunks, start_send_cb, end_send_cb, fail_cb), exc_info=True)
    297327                raise
    298328
    299     def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None):
     329    def _add_chunks_to_queue(self, chunks, start_send_cb=None, end_send_cb=None, fail_cb=None, synchronous=True):
    300330        """ the write_lock must be held when calling this function """
    301331        counter = 0
    302332        items = []
    303333        for proto_flags,index,level,data in chunks:
    304             scb, ecb = None, None
    305334            #fire the start_send_callback just before the first packet is processed:
    306             if counter==0:
    307                 scb = start_send_cb
    308335            #fire the end_send callback when the last packet (index==0) makes it out:
    309             if index==0:
    310                 ecb = end_send_cb
    311336            payload_size = len(data)
    312337            actual_size = payload_size
    313338            if self.cipher_out:
     
    328353                assert not self.cipher_out
    329354                #for plain/text packets (ie: gibberish response)
    330355                log("sending %s bytes without header", payload_size)
    331                 items.append((data, scb, ecb))
     356                items.append(data)
    332357            elif actual_size<PACKET_JOIN_SIZE:
    333                 if type(data) not in JOIN_TYPES:
     358                if not isinstance(data, JOIN_TYPES):
    334359                    data = memoryview_to_bytes(data)
    335360                header_and_data = pack_header(proto_flags, level, index, payload_size) + data
    336                 items.append((header_and_data, scb, ecb))
     361                items.append(header_and_data)
    337362            else:
    338363                header = pack_header(proto_flags, level, index, payload_size)
    339                 items.append((header, scb, None))
    340                 items.append((data, None, ecb))
     364                items.append(header)
     365                items.append(data)
    341366            counter += 1
    342367        if self._write_thread is None:
    343368            self.start_write_thread()
    344         self._write_queue.put(items)
    345         self.output_packetcount += 1
     369        self._write_queue.put((items, start_send_cb, end_send_cb, fail_cb, synchronous))
    346370
     371
    347372    def start_write_thread(self):
    348373        self._write_thread = start_thread(self._write_thread_loop, "write", daemon=True)
    349374
    350     def raw_write(self, contents, start_cb=None, end_cb=None):
     375    def raw_write(self, contents, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
    351376        """ Warning: this bypasses the compression and packet encoder! """
    352377        if self._write_thread is None:
    353378            self.start_write_thread()
    354         self._write_queue.put(((contents, start_cb, end_cb), ))
     379        self._write_queue.put((contents, start_cb, end_cb, fail_cb, synchronous))
    355380
    356381    def verify_packet(self, packet):
    357382        """ look for None values which may have caused the packet to fail encoding """
     
    539564        assert level>=0 and level<=10, "invalid compression level: %s (must be between 0 and 10" % level
    540565        self.compression_level = level
    541566
     567
    542568    def _io_thread_loop(self, name, callback):
    543569        try:
    544570            log("io_thread_loop(%s, %s) loop starting", name, callback)
     
    559585                log.error("Error: %s on %s failed: %s", name, self._conn, type(e), exc_info=True)
    560586                self.close()
    561587
     588
    562589    def _write_thread_loop(self):
    563590        self._io_thread_loop("write", self._write)
    564591    def _write(self):
     
    568595            log("write thread: empty marker, exiting")
    569596            self.close()
    570597            return False
    571         for buf, start_cb, end_cb in items:
    572             con = self._conn
    573             if not con:
    574                 return False
    575             if start_cb:
    576                 try:
    577                     start_cb(con.output_bytecount)
    578                 except:
    579                     if not self._closed:
    580                         log.error("Error on write start callback %s", start_cb, exc_info=True)
     598        return self.write_items(*items)
     599   
     600    def write_items(self, buf_data, start_cb=None, end_cb=None, fail_cb=None, synchronous=True):
     601        con = self._conn
     602        if not con:
     603            return False
     604        if start_cb:
     605            try:
     606                start_cb(con.output_bytecount)
     607            except:
     608                if not self._closed:
     609                    log.error("Error on write start callback %s", start_cb, exc_info=True)
     610        self.write_buffers(buf_data, fail_cb, synchronous)
     611        if end_cb:
     612            try:
     613                end_cb(self._conn.output_bytecount)
     614            except:
     615                if not self._closed:
     616                    log.error("Error on write end callback %s", end_cb, exc_info=True)
     617        return True
     618
     619    def write_buffers(self, buf_data, _fail_cb, _synchronous):
     620        con = self._conn
     621        if not con:
     622            return 0
     623        for buf in buf_data:
    581624            while buf and not self._closed:
    582625                written = con.write(buf)
    583626                #example test code, for sending small chunks very slowly:
     
    587630                if written:
    588631                    buf = buf[written:]
    589632                    self.output_raw_packetcount += 1
    590             if end_cb:
    591                 try:
    592                     end_cb(self._conn.output_bytecount)
    593                 except:
    594                     if not self._closed:
    595                         log.error("Error on write end callback %s", end_cb, exc_info=True)
    596         return True
     633        self.output_packetcount += 1
    597634
     635
    598636    def _read_thread_loop(self):
    599637        self._io_thread_loop("read", self._read)
    600638    def _read(self):
     
    601639        buf = self._conn.read(READ_BUFFER_SIZE)
    602640        #log("read thread: got data of size %s: %s", len(buf), repr_ellipsized(buf))
    603641        #add to the read queue (or whatever takes its place - see steal_connection)
    604         self._read_queue_put(buf)
     642        self._process_read(buf)
    605643        if not buf:
    606644            log("read thread: eof")
    607645            #give time to the parse thread to call close itself
     
    646684    def _invalid_header(self, data, msg=""):
    647685        self.invalid_header(self, data, msg)
    648686
    649     def invalid_header(self, proto, data, msg="invalid packet header"):
     687    def invalid_header(self, _proto, data, msg="invalid packet header"):
    650688        err = "%s: '%s'" % (msg, binascii.hexlify(data[:8]))
    651689        if len(data)>1:
    652690            err += " read buffer=%s (%i bytes)" % (repr_ellipsized(data), len(data))
     
    653691        self.gibberish(err, data)
    654692
    655693
     694    def process_read(self, data):
     695        self._read_queue_put(data)
     696
    656697    def read_queue_put(self, data):
    657698        #start the parse thread if needed:
    658699        if not self._read_parser_thread and not self._closed:
     
    660701                log("empty marker in read queue, exiting")
    661702                self.idle_add(self.close)
    662703                return
    663             self._read_parser_thread = make_thread(self._read_parse_thread_loop, "parse", daemon=True)
    664             self._read_parser_thread.start()
     704            self.start_read_parser_thread()
    665705        self._read_queue.put(data)
    666706        #from now on, take shortcut:
    667707        if self._read_queue_put==self.read_queue_put:
    668708            self._read_queue_put = self._read_queue.put
    669709
     710    def start_read_parser_thread(self):
     711        self._read_parser_thread = start_thread(self._read_parse_thread_loop, "parse", daemon=True)
     712
    670713    def _read_parse_thread_loop(self):
    671714        log("read_parse_thread_loop starting")
    672715        try:
     
    911954                        close_and_release()
    912955                        return False
    913956                    return not self._closed     #run until we manage to close (here or via the timeout)
    914                 def packet_queued(*args):
     957                def packet_queued(*_args):
    915958                    #if we're here, we have the lock and the packet is in the write queue
    916959                    log("flush_then_close: packet_queued() closed=%s", self._closed)
    917960                    if wait_for_packet_sent():
     
    952995        self.idle_add(self._process_packet_cb, self, [Protocol.CONNECTION_LOST])
    953996        c = self._conn
    954997        if c:
     998            self._conn = None
    955999            try:
    9561000                log("Protocol.close() calling %s", c.close)
    9571001                c.close()
    958                 if self._log_stats is None and self._conn.input_bytecount==0 and self._conn.output_bytecount==0:
     1002                if self._log_stats is None and c.input_bytecount==0 and c.output_bytecount==0:
    9591003                    #no data sent or received, skip logging of stats:
    9601004                    self._log_stats = False
    9611005                if self._log_stats:
    9621006                    from xpra.simple_stats import std_unit, std_unit_dec
    9631007                    log.info("connection closed after %s packets received (%s bytes) and %s packets sent (%s bytes)",
    964                          std_unit(self.input_packetcount), std_unit_dec(self._conn.input_bytecount),
    965                          std_unit(self.output_packetcount), std_unit_dec(self._conn.output_bytecount)
     1008                         std_unit(self.input_packetcount), std_unit_dec(c.input_bytecount),
     1009                         std_unit(self.output_packetcount), std_unit_dec(c.output_bytecount)
    9661010                         )
    9671011            except:
    968                 log.error("error closing %s", self._conn, exc_info=True)
    969             self._conn = None
     1012                log.error("error closing %s", c, exc_info=True)
    9701013        self.terminate_queue_threads()
    9711014        self.idle_add(self.clean)
    9721015        log("Protocol.close() done")
     
    10071050        self._get_packet_cb = None
    10081051        self._source_has_more.set()
    10091052        #make all the queue based threads exit by adding the empty marker:
    1010         exit_queue = Queue()
    1011         for _ in range(10):     #just 2 should be enough!
    1012             exit_queue.put(None)
    1013         try:
    1014             owq = self._write_queue
    1015             self._write_queue = exit_queue
    1016             #discard all elements in the old queue and push the None marker:
    1017             try:
    1018                 while owq.qsize()>0:
    1019                     owq.read(False)
    1020             except:
    1021                 pass
    1022             owq.put_nowait(None)
    1023         except:
    1024             pass
    1025         try:
    1026             orq = self._read_queue
    1027             self._read_queue = exit_queue
    1028             #discard all elements in the old queue and push the None marker:
    1029             try:
    1030                 while orq.qsize()>0:
    1031                     orq.read(False)
    1032             except:
    1033                 pass
    1034             orq.put_nowait(None)
    1035         except:
    1036             pass
     1053        #write queue:
     1054        owq = self._write_queue
     1055        self._write_queue = exit_queue()
     1056        force_flush_queue(owq)
     1057        #read queue:
     1058        orq = self._read_queue
     1059        self._read_queue = exit_queue()
     1060        force_flush_queue(orq)
    10371061        #just in case the read thread is waiting again:
    10381062        self._source_has_more.set()
  • xpra/net/rfb.py

     
    1313log = Logger("network", "protocol", "rfb")
    1414
    1515from xpra.os_util import Queue
    16 from xpra.util import repr_ellipsized, envint
     16from xpra.util import repr_ellipsized, envint, nonl
    1717from xpra.make_thread import make_thread, start_thread
     18from xpra.net.protocol import force_flush_queue, exit_queue
    1819from xpra.net.common import ConnectionClosedException          #@UndefinedVariable (pydev false positive)
    1920from xpra.net.bytestreams import ABORT
     21from xpra.net.rfb_const import RFBClientMessage, RFBAuth, PIXEL_FORMAT
    2022
    2123READ_BUFFER_SIZE = envint("XPRA_READ_BUFFER_SIZE", 65536)
    2224#merge header and packet if packet is smaller than:
    23 PIXEL_FORMAT = "BBBBHHHBBBBBB"
    2425
    25 RFB_SETPIXELFORMAT = 0
    26 RFB_SETENCODINGS = 2
    27 RFB_FRAMEBUFFERUPDATEREQUEST = 3
    28 RFB_KEYEVENT = 4
    29 RFB_POINTEREVENT = 5
    30 RFB_CLIENTCUTTEXT = 6
    31 PACKET_TYPE = {
    32     RFB_SETPIXELFORMAT              : "SetPixelFormat",
    33     RFB_SETENCODINGS                : "SetEncodings",
    34     RFB_FRAMEBUFFERUPDATEREQUEST    : "FramebufferUpdateRequest",
    35     RFB_KEYEVENT                    : "KeyEvent",
    36     RFB_POINTEREVENT                : "PointerEvent",
    37     RFB_CLIENTCUTTEXT               : "ClientCutText",
    38     }
    39 PACKET_FMT = {
    40     RFB_SETPIXELFORMAT              : "!BBBB"+PIXEL_FORMAT,
    41     RFB_SETENCODINGS                : "!BBH",
    42     RFB_FRAMEBUFFERUPDATEREQUEST    : "!BBHHHH",
    43     RFB_KEYEVENT                    : "!BBBBi",
    44     RFB_POINTEREVENT                : "!BBHH",
    45     RFB_CLIENTCUTTEXT               : "!BBBBi",
    46     }
    47 PACKET_STRUCT = {}
    48 for ptype, fmt in PACKET_FMT.items():
    49     PACKET_STRUCT[ptype] = struct.Struct(fmt)
    5026
    51 
    5227class RFBProtocol(object):
    5328    CONNECTION_LOST = "connection-lost"
    5429    INVALID = "invalid"
     
    8762        return len(packet)
    8863
    8964    def _parse_protocol_handshake(self, packet):
     65        log("parse_protocol_handshake(%s)", nonl(packet))
    9066        if len(packet)<12:
    9167            return 0
    9268        if not packet.startswith(b'RFB '):
     
    9571        #ie: packet==b'RFB 003.008\n'
    9672        self._protocol_version = tuple(int(x) for x in packet[4:11].split("."))
    9773        log.info("RFB version %s", b".".join(str(x) for x in self._protocol_version))
     74        if self._protocol_version!=(3, 8):
     75            msg = "unsupported protocol version"
     76            log.error("Error: %s", msg)
     77            self.send(struct.pack("!BI", 0, len(msg))+msg)
     78            self.invalid(msg, packet)
     79            return 0
    9880        #reply with Security Handshake:
    9981        self._packet_parser = self._parse_security_handshake
    100         self.send(struct.pack("BB", 1, 1))
     82        security_types = [RFBAuth.NONE]
     83        packet = struct.pack("B", len(security_types))
     84        for x in security_types:
     85            packet += struct.pack("B", x)
     86        self.send(packet)
    10187        return 12
    10288
    10389    def _parse_security_handshake(self, packet):
    104         if packet!=b"\1":
    105             self._invalid_header(packet, "invalid security handshake response")
     90        log("parse_security_handshake(%s)", binascii.hexlify(packet))
     91        try:
     92            auth = struct.unpack("B", packet)[0]
     93        except:
     94            self._internal_error(packet, "cannot parse security handshake response '%s'" % binascii.hexlify(packet))
    10695            return 0
     96        auth_str = RFBAuth.AUTH_STR.get(auth, auth)
     97        if auth!=RFBAuth.NONE:
     98            self._invalid_header(packet, "invalid security handshake response: %s" % auth_str)
     99            return 0
     100        log("parse_security_handshake: auth=%s, sending SecurityResult", auth_str)
    107101        #Security Handshake, send SecurityResult Handshake
    108102        self._packet_parser = self._parse_security_result
    109103        self.send(struct.pack("BBBB", 0, 0, 0, 0))
     
    110104        return 1
    111105
    112106    def _parse_security_result(self, packet):
    113         if packet!=b"\0":
    114             self._invalid_header(packet, "invalid security result")
    115             return 0
     107        log("parse_security_result(%s)", binascii.hexlify(packet))
     108        sharing  = packet != b"\0"
     109        log("_parse_security_result: sharing=%s, sending ClientInit", sharing)
    116110        #send ClientInit
    117111        self._packet_parser = self._parse_rfb
    118112        w, h, bpp, depth, bigendian, truecolor, rmax, gmax, bmax, rshift, bshift, gshift = self._get_rfb_pixelformat()
     
    126120            ptype = ord(packet[0])
    127121        except:
    128122            ptype = packet[0]
    129         packet_type = PACKET_TYPE.get(ptype)
     123        packet_type = RFBClientMessage.PACKET_TYPE_STR.get(ptype)
    130124        if not packet_type:
    131125            self.invalid("unknown RFB packet type: %#x" % ptype, packet)
    132126            return 0
    133         s = PACKET_STRUCT[ptype]        #ie: Struct("!BBBB")
     127        s = RFBClientMessage.PACKET_STRUCT.get(ptype)     #ie: Struct("!BBBB")
     128        if not s:
     129            self.invalid("RFB packet type '%s' is not supported" % packet_type, packet)
     130            return 0
    134131        if len(packet)<s.size:
    135132            return 0
    136133        size = s.size
     
    137134        values = list(s.unpack(packet[:size]))
    138135        values[0] = packet_type
    139136        #some packets require parsing extra data:
    140         if ptype==RFB_SETENCODINGS:
     137        if ptype==RFBClientMessage.SETENCODINGS:
    141138            N = values[2]
    142139            estruct = struct.Struct("!"+"i"*N)
    143140            size += estruct.size
     
    145142                return 0
    146143            encodings = estruct.unpack(packet[s.size:size])
    147144            values.append(encodings)
    148         elif ptype==RFB_CLIENTCUTTEXT:
     145        elif ptype==RFBClientMessage.CLIENTCUTTEXT:
    149146            l = values[4]
    150147            size += l
    151148            if len(packet)<size:
     
    153150            text = packet[s.size:size]
    154151            values.append(text)
    155152        self.input_packetcount += 1
    156         #log("RFB packet: %s", values)
     153        log("RFB packet: %s: %s", packet_type, values[1:])
    157154        #now trigger the callback:
    158155        self._process_packet_cb(self, values)
    159156        #return part of packet not consumed:
     
    193190                self._read_thread.start()
    194191        self.idle_add(start_network_read_thread)
    195192
     193
     194    def gibberish(self, msg, data):
     195        GIBBERISH = "gibberish"
     196        self.idle_add(self._process_packet_cb, self, [GIBBERISH, msg, data])
     197        # Then hang up:
     198        self.timeout_add(1000, self._connection_lost, msg)
     199
     200
     201    def send_disconnect(self, _reason, *_extra):
     202        #no such packet in RFB, just close
     203        self.close()
     204
     205
    196206    def send(self, packet):
    197207        if self._closed:
    198             log("send(%s ...) connection is closed already, not sending", packet[0])
     208            log("connection is closed already, not sending packet")
    199209            return
    200         log("send(%s ...)", packet[0])
     210        log("send(%i bytes: %s..)", len(packet), binascii.hexlify(packet[:16]))
    201211        with self._write_lock:
    202212            if self._closed:
    203213                return
     
    255265    def _read_thread_loop(self):
    256266        self._io_thread_loop("read", self._read)
    257267    def _read(self):
    258         buf = self._conn.read(READ_BUFFER_SIZE)
     268        c = self._conn
     269        if not c:
     270            return None
     271        buf = c.read(READ_BUFFER_SIZE)
    259272        #log("read()=%s", repr_ellipsized(buf))
    260273        if not buf:
    261274            log("read thread: eof")
     
    386399
    387400
    388401    def close(self):
    389         log("Protocol.close() closed=%s, connection=%s", self._closed, self._conn)
     402        log("RFBProtocol.close() closed=%s, connection=%s", self._closed, self._conn)
    390403        if self._closed:
    391404            return
    392405        self._closed = True
     
    394407        c = self._conn
    395408        if c:
    396409            try:
    397                 log("Protocol.close() calling %s", c.close)
     410                log("RFBProtocol.close() calling %s", c.close)
    398411                c.close()
    399412            except:
    400413                log.error("error closing %s", self._conn, exc_info=True)
     
    401414            self._conn = None
    402415        self.terminate_queue_threads()
    403416        self.idle_add(self.clean)
    404         log("Protocol.close() done")
     417        log("RFBProtocol.close() done")
    405418
    406419    def clean(self):
    407420        #clear all references to ensure we can get garbage collected quickly:
     
    412425    def terminate_queue_threads(self):
    413426        log("terminate_queue_threads()")
    414427        #make all the queue based threads exit by adding the empty marker:
    415         exit_queue = Queue()
    416         for _ in range(10):     #just 2 should be enough!
    417             exit_queue.put(None)
    418         try:
    419             owq = self._write_queue
    420             self._write_queue = exit_queue
    421             #discard all elements in the old queue and push the None marker:
    422             try:
    423                 while owq.qsize()>0:
    424                     owq.read(False)
    425             except:
    426                 pass
    427             owq.put_nowait(None)
    428         except:
    429             pass
    430         try:
    431             orq = self._read_queue
    432             self._read_queue = exit_queue
    433             #discard all elements in the old queue and push the None marker:
    434             try:
    435                 while orq.qsize()>0:
    436                     orq.read(False)
    437             except:
    438                 pass
    439             orq.put_nowait(None)
    440         except:
    441             pass
     428        owq = self._write_queue
     429        self._write_queue = exit_queue()
     430        force_flush_queue(owq)
  • xpra/net/rfb_const.py

     
     1# This file is part of Xpra.
     2# Copyright (C) 2017 Antoine Martin <antoine@devloop.org.uk>
     3# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
     4# later version. See the file COPYING for details.
     5
     6import struct
     7
     8#merge header and packet if packet is smaller than:
     9PIXEL_FORMAT = "BBBBHHHBBBBBB"
     10
     11
     12class RFBClientMessage(object):
     13    """ client to server messages """
     14    SETPIXELFORMAT = 0
     15    SETENCODINGS = 2
     16    FRAMEBUFFERUPDATEREQUEST = 3
     17    KEYEVENT = 4
     18    POINTEREVENT = 5
     19    CLIENTCUTTEXT = 6
     20    #optional:
     21    FILETRANSFER = 7
     22    SETSCALE = 8
     23    SETSERVERINPUT = 9
     24    SETSW = 10
     25    TEXTCHAT = 11
     26    KEYFRAMEREQUEST = 12
     27    KEEPALIVE = 13
     28    SETSCALEFACTOR = 15
     29    REQUESTSESSION = 20
     30    SETSESSION = 21
     31    NOTIFYPLUGINSTREAMING = 80
     32    VMWARE = 127
     33    CARCONNECTIVITY = 128
     34    ENABLECONTINUOUSUPDATES = 150
     35    CLIENTFENCE = 248
     36    OLIVECALLCONTROL = 249
     37    XVPCLIENTMESSAGE = 250
     38    SETDESKTOPSIZE = 251
     39    TIGHT = 252
     40    GIICLIENTMESSAGE = 253
     41    VMWARE = 254
     42    QEMUCLIENTMESSAGE = 255
     43
     44    PACKET_TYPE_STR = {
     45        SETPIXELFORMAT               : "SetPixelFormat",
     46        SETENCODINGS                 : "SetEncodings",
     47        FRAMEBUFFERUPDATEREQUEST     : "FramebufferUpdateRequest",
     48        KEYEVENT                     : "KeyEvent",
     49        POINTEREVENT                 : "PointerEvent",
     50        CLIENTCUTTEXT                : "ClientCutText",
     51        #optional:
     52        FILETRANSFER                 : "FileTransfer",
     53        SETSCALE                     : "SetScale",
     54        SETSERVERINPUT               : "SetServerInput",
     55        SETSW                        : "SetSW",
     56        TEXTCHAT                     : "TextChat",
     57        KEYFRAMEREQUEST              : "KeyFrameRequest",
     58        KEEPALIVE                    : "KeepAlive",
     59        SETSCALEFACTOR               : "SetScaleFactor",
     60        REQUESTSESSION               : "RequestSession",
     61        SETSESSION                   : "SetSession",
     62        NOTIFYPLUGINSTREAMING        : "NotifiyPluginStreaming",
     63        VMWARE                       : "VMWare",
     64        CARCONNECTIVITY              : "CarConnectivity",
     65        ENABLECONTINUOUSUPDATES      : "EnableContiniousUpdates",
     66        CLIENTFENCE                  : "ClientFence",
     67        OLIVECALLCONTROL             : "OliveCallControl",
     68        XVPCLIENTMESSAGE             : "XvpClientMessage",
     69        SETDESKTOPSIZE               : "SetDesktopSize",
     70        TIGHT                        : "Tight",
     71        GIICLIENTMESSAGE             : "GIIClientMessage",
     72        VMWARE                       : "VMWare",
     73        QEMUCLIENTMESSAGE            : "QEMUClientMessage",
     74    }
     75    PACKET_FMT = {
     76        SETPIXELFORMAT               : "!BBBB"+PIXEL_FORMAT,
     77        SETENCODINGS                 : "!BBH",
     78        FRAMEBUFFERUPDATEREQUEST     : "!BBHHHH",
     79        KEYEVENT                     : "!BBBBi",
     80        POINTEREVENT                 : "!BBHH",
     81        CLIENTCUTTEXT                : "!BBBBi",
     82        }
     83    PACKET_STRUCT = {}
     84    for ptype, fmt in PACKET_FMT.items():
     85        PACKET_STRUCT[ptype] = struct.Struct(fmt)
     86
     87
     88class RFBServerMessage(object):
     89    #server to client messages:
     90    FRAMEBUFFERUPDATE = 0
     91    SETCOLORMAPENTRIES = 1
     92    BELL = 2
     93    SERVERCUTTEXT = 3
     94    #optional:
     95    RESIZEFRAMEBUFFER1 = 4
     96    KEYFRAMEUPDATE = 4
     97    FILETRANSFER = 7
     98    TEXTCHAT = 11
     99    KEEPALIVE = 13
     100    RESIZEFRAMEBUFFER2 = 15
     101    VMWARE1 = 127
     102    CARCONNECTIVITY = 128
     103    ENDOFCONTINOUSUPDATES = 150
     104    SERVERSTATE = 173
     105    SERVERFENCE = 248
     106    OLIVECALLCONTROL = 249
     107    XVPSERVERMESSAGE = 250
     108    TIGHT = 252
     109    GIISERVERMESSAGE = 253
     110    VMWARE2 = 254
     111    QEMUSERVERMESSAGE = 255
     112
     113    PACKET_TYPE_STR = {
     114        FRAMEBUFFERUPDATE        : "FramebufferUpdate",
     115        SETCOLORMAPENTRIES       : "SetColorMapEntries",
     116        BELL                     : "Bell",
     117        SERVERCUTTEXT            : "ServerCutText",
     118        #optional:
     119        RESIZEFRAMEBUFFER1       : "ResizeFrameBuffer1",
     120        KEYFRAMEUPDATE           : "KeyFrameUpdate",
     121        FILETRANSFER             : "FileTransfer",
     122        TEXTCHAT                 : "TextChat",
     123        KEEPALIVE                : "KeepAlive",
     124        RESIZEFRAMEBUFFER2       : "ResizeFrameBuffer2",
     125        VMWARE1                  : "VMWare1",
     126        CARCONNECTIVITY          : "CarConnectivity",
     127        ENDOFCONTINOUSUPDATES    : "EndOfContinousUpdates",
     128        SERVERSTATE              : "ServerState",
     129        SERVERFENCE              : "ServerFence",
     130        OLIVECALLCONTROL         : "OliveCallControl",
     131        XVPSERVERMESSAGE         : "XvpServerMessage",
     132        TIGHT                    : "Tight",
     133        GIISERVERMESSAGE         : "GIIServerMessage",
     134        VMWARE2                  : "VMWare2",
     135        QEMUSERVERMESSAGE        : "QEMUServerMessage",
     136        }
     137
     138class RFBEncoding(object):
     139    RAW = 0
     140    COPYRECT = 1
     141    RRE = 2
     142    CORRE = 4
     143    HEXTILE = 5
     144    ZLIB = 6
     145    TIGHT = 7
     146    ZLIBHEX = 8
     147    TRLE = 15
     148    ZRLE = 16
     149    H264 = 20
     150    JPEG = 21
     151    JRLE = 22
     152    HITACHI_ZYWRLE = 17
     153    DESKTOPSIZE = -223
     154    LASTRECT = -224
     155    CURSOR = -239
     156    XCURSOR = -240
     157    QEMU_POINTER = -257
     158    QEMU_KEY = -258
     159    QEMU_AUDIO = -259
     160    GII = -305
     161    DESKTOPNAME = -307
     162    EXTENDEDDESKTOPSIZE = -308
     163    XVP = -309
     164    FENCE = -312
     165    CONTINUOUSUPDATES = -313
     166    CURSORWITHALPHA = -314
     167    VA_H264 = 0x48323634
     168
     169    #-23 to -32    JPEG Quality Level Pseudo-encoding
     170    #-247 to -256    Compression Level Pseudo-encoding
     171    #-412 to -512    JPEG Fine-Grained Quality Level Pseudo-encoding
     172    #-763 to -768    JPEG Subsampling Level Pseudo-encoding
     173   
     174    ENCODING_STR = {
     175        RAW                 : "Raw",
     176        COPYRECT            : "CopyRect",
     177        RRE                 : "RRE",
     178        CORRE               : "CoRRE",
     179        HEXTILE             : "Hextile",
     180        ZLIB                : "Zlib",
     181        TIGHT               : "Tight",
     182        ZLIBHEX             : "ZlibHex",
     183        TRLE                : "TRLE",
     184        ZRLE                : "ZRLE",
     185        H264                : "H264",
     186        JPEG                : "JPEG",
     187        JRLE                : "JRLE",
     188        HITACHI_ZYWRLE      : "HITACHI_ZYWRLE",
     189        DESKTOPSIZE         : "DesktopSize",
     190        LASTRECT            : "LastRect",
     191        CURSOR              : "Cursor",
     192        XCURSOR             : "XCursor",
     193        QEMU_POINTER        : "QEMU Pointer",
     194        QEMU_KEY            : "QEMU Key",
     195        QEMU_AUDIO          : "QEMU Audio",
     196        GII                 : "GII",
     197        DESKTOPNAME         : "DesktopName",
     198        EXTENDEDDESKTOPSIZE : "ExtendedDesktopSize",
     199        XVP                 : "Xvp",
     200        FENCE               : "Fence",
     201        CONTINUOUSUPDATES   : "ContinuousUpdates",
     202        CURSORWITHALPHA     : "CursorWithAlpha",
     203        VA_H264             : "VA_H264",
     204        }
     205
     206
     207class RFBAuth(object):
     208    INVALID = 0
     209    NONE = 1
     210    VNC = 2
     211    TIGHT = 16
     212    AUTH_STR = {
     213        INVALID    : "Invalid",
     214        NONE       : "None",
     215        VNC        : "VNC",
     216        TIGHT      : "Tight",
     217        5                   : "RA2",
     218        6                   : "RA2ne",
     219        17                  : "Ultra",
     220        18                  : "TLS",
     221        19                  : "VeNCrypt",
     222        20                  : "SASL",
     223        21                  : "MD5",
     224        22                  : "xvp",
     225        }
     226    for i in (3, 4):
     227        AUTH_STR[i] = "RealVNC"
     228    for i in range(7, 16):
     229        AUTH_STR[i] = "RealVNC"
     230    for i in range(128, 255):
     231        AUTH_STR[i] = "RealVNC"
     232    for i in range(30, 35):
     233        AUTH_STR[i] = "Apple"
  • xpra/net/udp_protocol.py

     
     1# This file is part of Xpra.
     2# Copyright (C) 2017 Antoine Martin <antoine@devloop.org.uk>
     3# Xpra is released under the terms of the GNU GPL v2, or, at your option, any
     4# later version. See the file COPYING for details.
     5
     6import socket
     7import struct
     8import random
     9try:
     10    import errno
     11    EMSGSIZE = errno.EMSGSIZE
     12except ImportError as e:
     13    EMSGSIZE = None
     14
     15from xpra.log import Logger
     16log = Logger("network", "protocol")
     17
     18from xpra.os_util import LINUX, monotonic_time
     19from xpra.util import envint, repr_ellipsized
     20from xpra.make_thread import start_thread
     21from xpra.net.protocol import Protocol
     22from xpra.net.bytestreams import SocketConnection
     23
     24READ_BUFFER_SIZE = envint("XPRA_READ_BUFFER_SIZE", 65536)
     25DROP_PCT = envint("XPRA_UDP_DROP_PCT", 0)
     26
     27
     28#UUID, seqno, chunk, chunks
     29_header_struct = struct.Struct('!QQHHH')
     30_header_size = _header_struct.size
     31
     32
     33class IncompletePacket(object):
     34    def __init__(self, seqno, start_time, chunks=None):
     35        self.seqno = seqno
     36        self.start_time = start_time
     37        self.last_time = start_time
     38        #todo: use numpy array of bytes
     39        self.chunks = chunks
     40    def __repr__(self):
     41        return ("IncompletePacket(%i: %s chunks)" % (self.seqno, len(self.chunks or [])))
     42
     43
     44class UDPListener(object):
     45    """
     46        This class is used by servers to receive UDP packets,
     47        it parses the header and then exposes the data received via process_packet_cb.
     48    """
     49
     50    def __init__(self, sock, process_packet_cb):
     51        assert sock is not None
     52        self._closed = False
     53        self._socket = sock
     54        self._process_packet_cb =  process_packet_cb
     55        self._read_thread = start_thread(self._read_thread_loop, "read", daemon=True)
     56
     57    def __repr__(self):
     58        return "UDPListener(%s)" % self._socket
     59
     60    def _read_thread_loop(self):
     61        log.info("udp read thread loop starting")
     62        try:
     63            while not self._closed:
     64                buf, bfrom = self._socket.recvfrom(READ_BUFFER_SIZE)
     65                if not buf:
     66                    log("read thread: eof")
     67                    break
     68                values = list(_header_struct.unpack_from(buf[:_header_size])) + [buf[_header_size:], bfrom]
     69                try:
     70                    self._process_packet_cb(self, *values)
     71                except Exception as e:
     72                    log("_read_thread_loop() buffer=%s, from=%s", repr_ellipsized(buf), bfrom, exc_info=True)
     73                    if not self._closed:
     74                        log.error("Error: UDP packet processing error:")
     75                        log.error(" %s", e)
     76        except Exception as e:
     77            #can happen during close(), in which case we just ignore:
     78            if not self._closed:
     79                log.error("Error: read on %s failed: %s", self._socket, type(e), exc_info=True)
     80        log("udp read thread loop ended")
     81        self.close()
     82
     83    def close(self):
     84        s = self._socket
     85        log("UDPListener.close() closed=%s, socket=%s", self._closed, s)
     86        if self._closed:
     87            return
     88        self._closed = True
     89        if s:
     90            try:
     91                log("Protocol.close() calling %s", s.close)
     92                s.close()
     93            except:
     94                log.error("error closing %s", s, exc_info=True)
     95            self._socket = None
     96        log("UDPListener.close() done")
     97
     98
     99class UDPProtocol(Protocol):
     100    """
     101        This class extends the Protocol class with UDP encapsulation.
     102        A single packet may end up being fragmented into multiple UDP frames
     103        to fit in the MTU.
     104        We keep track of the function which can be used to handle send failures
     105        (or the packet data if no function is supplied).
     106        "udp-control" packets are used to synchronize both ends.
     107    """
     108
     109    def __init__(self, *args):
     110        Protocol.__init__(self, *args)
     111        self.mtu = 0
     112        self.last_sequence = -1     #the most recent packet sequence we processed in full
     113        self.highest_sequence = -1
     114        self.jitter = 20            #20ms
     115        self.fail_cb = {}
     116        self.incomplete_packets = {}
     117        self.can_skip = set()       #processed already, or cancelled
     118        self.cancel = set()         #tell the other end to forget those
     119        self.control_timer = None
     120        self.control_timer_due = 0
     121        self._process_read = self.process_read
     122
     123    def close(self):
     124        Protocol.close(self)
     125        self.cancel_control_timer()
     126
     127
     128    def schedule_control(self, delay=1000):
     129        due = monotonic_time()+delay/1000.0
     130        if self.control_timer_due<=due:
     131            #due already
     132            return
     133        ct = self.control_timer
     134        if ct:
     135            self.source_remove(ct)
     136        self.control_timer = self.timeout_add(delay, self.send_control)
     137        self.control_timer_due = due
     138
     139    def cancel_control_timer(self):
     140        ct = self.control_timer
     141        if ct:
     142            self.control_timer = None
     143            self.source_remove(ct)
     144
     145    def send_control(self):
     146        self.control_timer = None
     147        self.control_timer_due = 0
     148        if self._closed:
     149            return False
     150        missing = self._get_missing()
     151        packet = ("udp-control", self.mtu, self.last_sequence, self.highest_sequence, missing, tuple(self.cancel))
     152        log("send_control() packet(%s)=%s", self.incomplete_packets, packet)
     153        self.send_async(packet)
     154        self.cancel = set()
     155        self.schedule_control()
     156        return False
     157
     158    def _get_missing(self):
     159        """ the packets and chunks we are missing """
     160        if not self.incomplete_packets:
     161            return {}
     162        now = monotonic_time()
     163        max_start_time = now-self.jitter/1000.0
     164        late_start_time = now-2
     165        not_recent = now-0.5
     166        missing = {}
     167        for seqno, ip in self.incomplete_packets.items():
     168            st = ip.start_time
     169            if st>=max_start_time:
     170                continue        #too recent, may still arrive
     171            if st<late_start_time or ip.last_time<not_recent:
     172                if ip.chunks is None:
     173                    missing[seqno] = []
     174                else:
     175                    #TODO: use bitmap instead?
     176                    missing_chunks = [i for i,x in enumerate(ip.chunks) if x is None]
     177                    if missing_chunks:
     178                        missing[seqno] = missing_chunks
     179        return missing
     180
     181    def process_control(self, mtu, last_seq, high_seq, missing, cancel):
     182        log("process_control(%i, %i, %i, %s, %s)", mtu, last_seq, high_seq, missing, cancel)
     183        con = self._conn
     184        if not con:
     185            return
     186        if mtu and self.mtu==0:
     187            self.mtu = mtu
     188        #first, we can free all the packets that have been processed by the other end:
     189        if last_seq>=0:
     190            done = [x for x in self.fail_cb.keys() if x<=last_seq]
     191            for x in done:
     192                try:
     193                    del self.fail_cb[x]
     194                except:
     195                    pass
     196        #next we can forget about sequence numbers that have been cancelled:
     197        if cancel:
     198            for seqno in cancel:
     199                if seqno>self.last_sequence:
     200                    self.can_skip.add(seqno)
     201                try:
     202                    del self.incomplete_packets[seqno]
     203                except:
     204                    pass
     205            #we may now be able to move forward a bit:
     206            if self.incomplete_packets and (self.last_sequence+1) in self.can_skip:
     207                self.process_incomplete()
     208        #re-send the missing ones:
     209        for seqno, missing_chunks in missing.items():
     210            fail_cb_seq = self.fail_cb.get(seqno)
     211            if fail_cb_seq is None:
     212                log.error("Error: cannot resend packet sequence %i", seqno)
     213                #hope for the best, and tell the other end to stop asking:
     214                self.cancel.add(seqno)
     215                continue
     216            log("fail_cb[%i]=%s", seqno, repr_ellipsized(str(fail_cb_seq)))
     217            if callable(fail_cb_seq):
     218                self.cancel.add(seqno)
     219                fail_cb_seq()
     220                continue
     221            if len(missing_chunks)==0:
     222                #the other end only knows it is missing the seqno,
     223                #not how many chunks are missing, so send them all
     224                missing_chunks = fail_cb_seq.keys()
     225            for c in missing_chunks:
     226                data = fail_cb_seq.get(c)
     227                log("resend data[%i][%i]=%s", seqno, c, repr_ellipsized(str(data)))
     228                if data is None:
     229                    log.error("Error: cannot resend chunk %i of packet sequence %i", c, seqno)
     230                    log.error(" data missing from packet resend cache")
     231                    continue
     232                #send it again:
     233                #TODO: if the mtu is now lower, we should re-send the whole packet,
     234                # with the new chunk size..
     235                con.write(data)
     236
     237
     238    def send_async(self, packet):
     239        chunks = self.encode(packet)
     240        if len(chunks)>1:
     241            return Protocol.send_now(packet)
     242        proto_flags,index,level,data = chunks[0]
     243        from xpra.net.header import pack_header
     244        payload_size = len(data)
     245        header_and_data = pack_header(proto_flags, level, index, payload_size) + data
     246        with self._write_lock:
     247            if self._write_thread is None:
     248                self.start_write_thread()
     249            self._write_queue.put((header_and_data, None, None, None, False))
     250
     251    def process_udp_data(self, seqno, synchronous, chunk, chunks, data, bfrom):
     252        log("process_udp_data%s %i bytes", (seqno, synchronous, chunk, chunks, repr_ellipsized(data), bfrom), len(data))
     253        if DROP_PCT>0:
     254            if random.randint(0, 100) <= DROP_PCT:
     255                log.warn("Warning: dropping udp packet %i.%i", seqno, chunk)
     256                return
     257        if seqno<=self.last_sequence:
     258            #must be a duplicate, we've already processed it!
     259            return
     260        self.highest_sequence = max(self.highest_sequence, seqno)
     261        if self.incomplete_packets or (synchronous and seqno!=self.last_sequence+1) or chunk!=0 or chunks!=1:
     262            assert chunk>=0 and chunks>0 and chunk<chunks, "invalid chunk: %i/%i" % (chunk, chunks)
     263            #slow path: add chunk to incomplete packet
     264            now = monotonic_time()
     265            ip = self.incomplete_packets.get(seqno)
     266            if not ip or not ip.chunks:
     267                chunks_array = [None for _ in range(chunks)]
     268                ip = IncompletePacket(seqno, now, chunks_array)
     269                self.incomplete_packets[seqno] = ip
     270            else:
     271                ip.last_time = now
     272            ip.chunks[chunk] = data
     273            if seqno!=self.last_sequence+1:
     274                #we're waiting for a packet and this is not it,
     275                #make sure any gaps are marked as incomplete:
     276                for i in range(self.last_sequence+1, seqno):
     277                    if i not in self.incomplete_packets and i not in self.can_skip:
     278                        self.incomplete_packets[i] = IncompletePacket(i, now)
     279                #make sure we request the missing packets:
     280                self.schedule_control(self.jitter)
     281                if synchronous:
     282                    #we have to wait for the missing chunks / packets
     283                    log("process_udp_data: we're waiting for %i, not %i", self.last_sequence+1, seqno)
     284                    return
     285            if any(x is None for x in ip.chunks):
     286                #one of the chunks is still missing
     287                log("process_udp_data: sequence %i is still missing some chunks: %s", seqno, [i for i,x in enumerate(ip.chunks) if x is None])
     288                self.schedule_control(self.jitter)
     289                return
     290            #all the data is here!
     291            del self.incomplete_packets[seqno]
     292            data = b"".join(ip.chunks)
     293        #log("process_udp_data: adding packet sequence %i to read queue", seqno)
     294        if seqno==self.last_sequence+1:
     295            self.last_sequence = seqno
     296        else:
     297            assert not synchronous
     298            self.can_skip.add(seqno)
     299        self._read_queue_put(data)
     300        if self.incomplete_packets:
     301            self.process_incomplete()
     302
     303    def process_incomplete(self):
     304        #maybe we can send the next one(s) now?
     305        seqno = self.last_sequence
     306        log("process_incomplete() last_sequence=%i, can skip=%s", seqno, self.can_skip)
     307        while True:
     308            seqno += 1
     309            if seqno in self.can_skip:
     310                try:
     311                    del self.incomplete_packets[seqno]
     312                except KeyError:
     313                    pass
     314                self.can_skip.remove(seqno)
     315                self.last_sequence = seqno
     316                continue
     317            ip = self.incomplete_packets.get(seqno)
     318            if not ip or not ip.chunks:
     319                #it's missing, we just don't know how many chunks
     320                return
     321            if any(x is None for x in ip.chunks):
     322                #one of the chunks is still missing
     323                return
     324            #all the data is here!
     325            del self.incomplete_packets[seqno]
     326            data = b"".join(ip.chunks)
     327            log("process_incomplete: adding packet sequence %i to read queue", seqno)
     328            self.last_sequence = seqno
     329            self._read_queue_put(data)
     330
     331
     332    def write_buffers(self, buf_data, fail_cb, synchronous):
     333        buf = b"".join(buf_data)
     334        #if not isinstance(buf, JOIN_TYPES):
     335        #    buf = memoryview_to_bytes(buf)
     336        while True:
     337            try:
     338                seqno = self.output_packetcount
     339                return self.send_buf(seqno, buf, fail_cb, synchronous)
     340            except MTUExceeded as e:
     341                log.warn("%s: %s", e, self.mtu)
     342                if self.mtu>576:
     343                    self.mtu //= 2
     344                raise
     345
     346    def send_buf(self, seqno, data, fail_cb, synchronous):
     347        con = self._conn
     348        if not con:
     349            return 0
     350        #TODO: bump to 1280 for IPv6
     351        #mtu = max(576, self.mtu)
     352        mtu = max(1280, self.mtu)
     353        uuid = 0 #todo!
     354        l = len(data)
     355        maxpayload = mtu-_header_size
     356        chunks = l // maxpayload
     357        if l % maxpayload > 0:
     358            chunks += 1
     359        log("UDP.send_buf(%s, %i bytes, %s, %s) seq=%i, mtu=%s, maxpayload=%i, chunks=%i, data=%s", con, l, fail_cb, synchronous, seqno, mtu, maxpayload, chunks, repr_ellipsized(data))
     360        chunk = 0
     361        offset = 0
     362        chunk_resend_cache = None
     363        if fail_cb:
     364            self.fail_cb[seqno] = fail_cb
     365        else:
     366            chunk_resend_cache = {}
     367            self.fail_cb[seqno] = chunk_resend_cache
     368        while offset<l:
     369            assert chunk<chunks
     370            pl = min(maxpayload, l-offset)
     371            data_chunk = data[offset:offset+pl]
     372            udp_data = _header_struct.pack(uuid, seqno, synchronous, chunk, chunks) + data_chunk
     373            assert len(udp_data)<=mtu, "invalid payload size: %i greater than mtu %i" % (len(udp_data), mtu)
     374            con.write(udp_data)
     375            self.output_raw_packetcount += 1
     376            offset += pl
     377            if chunk_resend_cache is not None:
     378                chunk_resend_cache[chunk] = udp_data
     379            chunk += 1
     380        assert chunk==chunks, "wrote %i chunks but expected %i" % (chunk, chunks)
     381        self.output_packetcount += 1
     382        if not self.control_timer:
     383            self.control_timer = self.timeout_add(1000, self.send_control)
     384        return offset
     385
     386
     387    def get_info(self, alias_info=True):
     388        i = Protocol.get_info(self, alias_info)
     389        i["mtu"] = self.mtu
     390        return i
     391
     392
     393class UDPServerProtocol(UDPProtocol):
     394
     395    def _read_thread_loop(self):
     396        #server protocol is not used to read,
     397        #we rely on the listener to dispatch packets instead
     398        pass
     399
     400class UDPClientProtocol(UDPProtocol):
     401
     402    def con_write(self, data, fail_cb):
     403        """ After successfully writing some data, update the mtu value """
     404        r = UDPProtocol.con_write(self, data, fail_cb)
     405        if r>0 and LINUX:
     406            IP_MTU = 14
     407            con = self._conn
     408            if con:
     409                try:
     410                    self.mtu = min(32767, con._socket.getsockopt(socket.IPPROTO_IP, IP_MTU))
     411                    #log("mtu=%s", self.mtu)
     412                except IOError as e:
     413                    pass
     414        return r
     415
     416    def process_read(self, buf):
     417        """ Splits and parses the UDP frame header from the packet """
     418        #log.info("UDPClientProtocol.read_queue_put(%s)", repr_ellipsized(buf))
     419        _, seqno, synchronous, chunk, chunks = _header_struct.unpack_from(buf[:_header_size])
     420        data = buf[_header_size:]
     421        bfrom = None        #not available here..
     422        self.process_udp_data(seqno, synchronous, chunk, chunks, data, bfrom)
     423
     424
     425class UDPSocketConnection(SocketConnection):
     426    """
     427        This class extends SocketConnection to use socket.sendto
     428        to send data to the correct destination.
     429        (servers use a single socket to talk to multiple clients,
     430        they do not call connect() and so we have to specify the remote target every time)
     431    """
     432
     433    def __init__(self, *args):
     434        SocketConnection.__init__(self, *args)
     435
     436    def write(self, buf):
     437        #log("UDPSocketConnection: sending %i bytes to %s", len(buf), self.remote)
     438        try:
     439            return self._socket.sendto(buf, self.remote)
     440        except IOError as e:
     441            if e.errno==EMSGSIZE:
     442                raise MTUExceeded("invalid UDP payload size, cannot send %i bytes: %s" % (len(buf), e))
     443            raise
     444
     445    def close(self):
     446        """
     447            don't close the socket, we're don't own it
     448        """
     449        pass
     450
     451class MTUExceeded(IOError):
     452    pass
  • xpra/scripts/config.py

     
    437437                    "auth"              : str,
    438438                    "vsock-auth"        : str,
    439439                    "tcp-auth"          : str,
     440                    "udp-auth"          : str,
     441                    "dtls-auth"         : str,
    440442                    "ws-auth"           : str,
    441443                    "wss-auth"          : str,
    442444                    "ssl-auth"          : str,
     
    594596                    "bind"              : list,
    595597                    "bind-vsock"        : list,
    596598                    "bind-tcp"          : list,
     599                    "bind-udp"          : list,
     600                    "bind-dtls"         : list,
    597601                    "bind-ws"           : list,
    598602                    "bind-wss"          : list,
    599603                    "bind-ssl"          : list,
     
    615619    "start-after-connect", "start-child-after-connect",
    616620    "start-on-connect", "start-child-on-connect",
    617621    ]
    618 BIND_OPTIONS = ["bind", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
     622BIND_OPTIONS = ["bind", "bind-tcp", "bind-udp", "bind-ssl", "bind-ws", "bind-wss", "bind-vsock"]
    619623
    620624#keep track of the options added since v1,
    621625#so we can generate command lines that work with older supported versions:
     
    679683    "av-sync", "global-menus",
    680684    "printing", "file-transfer", "open-command", "open-files", "start-new-commands",
    681685    "mmap", "mmap-group", "mdns",
    682     "auth", "vsock-auth", "tcp-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
    683     "bind", "bind-vsock", "bind-tcp", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
     686    "auth", "vsock-auth", "tcp-auth", "udp-auth", "dtls-auth", "ws-auth", "wss-auth", "ssl-auth", "rfb-auth",
     687    "bind", "bind-vsock", "bind-tcp", "bind-udp", "bind-dtls", "bind-ssl", "bind-ws", "bind-wss", "bind-rfb",
    684688    "start", "start-child",
    685689    "start-after-connect", "start-child-after-connect",
    686690    "start-on-connect", "start-child-on-connect",
     
    799803                    "auth"              : "",
    800804                    "vsock-auth"        : "",
    801805                    "tcp-auth"          : "",
     806                    "udp-auth"          : "",
     807                    "dtls-auth"         : "",
    802808                    "ws-auth"           : "",
    803809                    "wss-auth"          : "",
    804810                    "ssl-auth"          : "",
     
    946952                    "bind"              : bind_dirs,
    947953                    "bind-vsock"        : [],
    948954                    "bind-tcp"          : [],
     955                    "bind-udp"          : [],
     956                    "bind-dtls"         : [],
    949957                    "bind-ws"           : [],
    950958                    "bind-wss"          : [],
    951959                    "bind-ssl"          : [],
  • xpra/scripts/main.py

     
    534534                          metavar="[HOST]:[PORT]",
    535535                          help="Listen for connections over TCP (use --tcp-auth to secure it)."
    536536                            + " You may specify this option multiple times with different host and port combinations")
     537        group.add_option("--bind-udp", action="append",
     538                          dest="bind_udp", default=list(defaults.bind_udp or []),
     539                          metavar="[HOST]:[PORT]",
     540                          help="Listen for connections over UDP (use --udp-auth to secure it)."
     541                            + " You may specify this option multiple times with different host and port combinations")
     542        group.add_option("--bind-dtls", action="append",
     543                          dest="bind_dtls", default=list(defaults.bind_dtls or []),
     544                          metavar="[HOST]:[PORT]",
     545                          help="Listen for connections over UDP + DTLS (use --dtls-auth to secure it)."
     546                            + " You may specify this option multiple times with different host and port combinations")
    537547        group.add_option("--bind-ws", action="append",
    538548                          dest="bind_ws", default=list(defaults.bind_ws or []),
    539549                          metavar="[HOST]:[PORT]",
     
    558568        ignore({
    559569            "bind"      : defaults.bind,
    560570            "bind-tcp"  : defaults.bind_tcp,
     571            "bind-udp"  : defaults.bind_udp,
    561572            "bind-ws"   : defaults.bind_ws,
    562573            "bind-wss"  : defaults.bind_wss,
    563574            "bind-ssl"  : defaults.bind_ssl,
     
    974985    group.add_option("--tcp-auth", action="store",
    975986                      dest="tcp_auth", default=defaults.tcp_auth,
    976987                      help="The authentication module to use for TCP sockets (default: '%default')")
     988    group.add_option("--udp-auth", action="store",
     989                      dest="udp_auth", default=defaults.udp_auth,
     990                      help="The authentication module to use for UDP sockets (default: '%default')")
    977991    group.add_option("--ws-auth", action="store",
    978992                      dest="ws_auth", default=defaults.ws_auth,
    979993                      help="The authentication module to use for Websockets (default: '%default')")
     
    16861700        if opts.socket_dir:
    16871701            desc["socket_dir"] = opts.socket_dir
    16881702        return desc
    1689     elif display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
    1690             display_name.startswith("ssl:") or display_name.startswith("ssl/"):
    1691         ctype = display_name[:3]        #ie: "ssl" or "tcp"
    1692         separator = display_name[3]     # ":" or "/"
     1703    elif (
     1704        display_name.startswith("tcp:") or display_name.startswith("tcp/") or \
     1705        display_name.startswith("ssl:") or display_name.startswith("ssl/") or \
     1706        display_name.startswith("udp:") or display_name.startswith("udp/") or \
     1707        display_name.startswith("dtls:") or display_name.startswith("dtls/")
     1708        ):
     1709        ctype = display_name[:4].rstrip(":/")   #ie: "ssl" or "tcp"
     1710        separator = display_name[len(ctype)]     # ":" or "/"
    16931711        desc.update({
    16941712                     "type"     : ctype,
    16951713                     })
     
    20732091        from xpra.net.bytestreams import SocketConnection
    20742092        return SocketConnection(sock, "local", "host", (CID_TYPES.get(cid, cid), iport), dtype)
    20752093
    2076     elif dtype in ("tcp", "ssl", "ws", "wss"):
     2094    elif dtype in ("tcp", "ssl", "ws", "wss", "udp", "dtls"):
    20772095        if display_desc.get("ipv6"):
    20782096            assert socket.has_ipv6, "no IPv6 support"
    20792097            family = socket.AF_INET6
     
    20892107                socket.AF_INET  : "IPv4",
    20902108                }.get(family, family), (host, port), e))
    20912109        sockaddr = addrinfo[0][-1]
    2092         sock = socket.socket(family, socket.SOCK_STREAM)
    2093         sock.settimeout(SOCKET_TIMEOUT)
    2094         sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
     2110        if dtype in ("udp", "dtls"):
     2111            sock = socket.socket(family, socket.SOCK_DGRAM)
     2112        else:
     2113            sock = socket.socket(family, socket.SOCK_STREAM)
     2114            sock.settimeout(SOCKET_TIMEOUT)
     2115            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, TCP_NODELAY)
    20952116        strict_host_check = display_desc.get("strict-host-check")
    20962117        if strict_host_check is False:
    20972118            opts.ssl_server_verify_mode = "none"
    20982119        conn = _socket_connect(sock, sockaddr, display_name, dtype)
    2099         if dtype in ("ssl", "wss"):
     2120        if dtype in ("ssl", "wss", "dtls"):
     2121            if dtype=="dtls":
     2122                from dtls import do_patch   #@UnresolvedImport
     2123                do_patch()
    21002124            wrap_socket = ssl_wrap_socket_fn(opts, server_side=False)
    21012125            sock = wrap_socket(sock)
    21022126            assert sock, "failed to wrap socket %s" % sock
     
    22712295        raise InitException("cannot check hostname with verify mode %s" % verify_mode)
    22722296    wrap_socket = context.wrap_socket
    22732297    del opts
    2274     def do_wrap_socket(tcp_socket):
     2298    def do_wrap_socket(tcp_socket, handshake=None):
    22752299        from xpra.log import Logger
    22762300        try:
    22772301            ssl_sock = wrap_socket(tcp_socket, **kwargs)
     
    22812305            if SSLEOFError and isinstance(e, SSLEOFError):
    22822306                return None
    22832307            raise InitExit(EXIT_SSL_FAILURE, "Cannot wrap socket %s: %s" (tcp_socket, e))
    2284         if not server_side:
     2308        if not server_side and handshake is not False:
    22852309            try:
    22862310                ssl_sock.do_handshake(True)
    22872311            except Exception as e:
  • xpra/scripts/server.py

     
    357357    if opts.encoding=="help" or "help" in opts.encodings:
    358358        return show_encoding_help(opts)
    359359
    360     from xpra.server.socket_util import parse_bind_tcp, parse_bind_vsock
    361     bind_tcp = parse_bind_tcp(opts.bind_tcp)
    362     bind_ssl = parse_bind_tcp(opts.bind_ssl)
    363     bind_ws = parse_bind_tcp(opts.bind_ws)
    364     bind_wss = parse_bind_tcp(opts.bind_wss)
    365     bind_rfb = parse_bind_tcp(opts.bind_rfb)
     360    from xpra.server.socket_util import parse_bind_ip, parse_bind_vsock
     361    bind_tcp = parse_bind_ip(opts.bind_tcp)
     362    bind_udp = parse_bind_ip(opts.bind_udp)
     363    bind_dtls= parse_bind_ip(opts.bind_dtls)
     364    bind_ssl = parse_bind_ip(opts.bind_ssl)
     365    bind_ws  = parse_bind_ip(opts.bind_ws)
     366    bind_wss = parse_bind_ip(opts.bind_wss)
     367    bind_rfb = parse_bind_ip(opts.bind_rfb)
    366368    bind_vsock = parse_bind_vsock(opts.bind_vsock)
    367369
    368370    assert mode in ("start", "start-desktop", "upgrade", "shadow", "proxy")
     
    567569    sockets = []
    568570
    569571    #SSL sockets:
    570     wrap_socket_fn = None
     572    wrap_server_socket_fn = None
     573    wrap_client_socket_fn = None
    571574    need_ssl = False
    572575    ssl_opt = opts.ssl.lower()
    573576    if ssl_opt in TRUE_OPTIONS or bind_ssl or bind_wss:
     
    582585    if need_ssl:
    583586        from xpra.scripts.main import ssl_wrap_socket_fn
    584587        try:
    585             wrap_socket_fn = ssl_wrap_socket_fn(opts, server_side=True)
    586             netlog("wrap_socket_fn=%s", wrap_socket_fn)
     588            wrap_server_socket_fn = ssl_wrap_socket_fn(opts, server_side=True)
     589            wrap_client_socket_fn = ssl_wrap_socket_fn(opts, server_side=False)
     590            netlog("wrap socket functions: %s, %s", wrap_server_socket_fn, wrap_client_socket_fn)
    587591        except Exception as e:
    588592            netlog("SSL error", exc_info=True)
    589593            cpaths = csv("'%s'" % x for x in (opts.ssl_cert, opts.ssl_key) if x)
    590594            raise InitException("cannot create SSL socket, check your certificate paths (%s): %s" % (cpaths, e))
    591595
     596    from xpra.server.socket_util import setup_tcp_socket, setup_udp_socket, setup_vsock_socket, setup_local_sockets
    592597    def add_mdns(socktype, host, port):
    593598        recs = mdns_recs.setdefault(socktype.lower(), [])
    594599        rec = (host, port)
     
    598603        socket = setup_tcp_socket(host, iport, socktype)
    599604        sockets.append(socket)
    600605        add_mdns(socktype, host, iport)
    601 
     606    def add_udp_socket(socktype, host, iport):
     607        socket = setup_udp_socket(host, iport, socktype)
     608        if socktype=="dtls":
     609            from dtls import do_patch   #@UnresolvedImport
     610            do_patch()
     611        sockets.append(socket)
     612        add_mdns(socktype, host, iport)
    602613    # Initialize the TCP sockets before the display,
    603614    # That way, errors won't make us kill the Xvfb
    604615    # (which may not be ours to kill at that point)
    605     from xpra.server.socket_util import setup_tcp_socket, setup_vsock_socket, setup_local_sockets
    606616    netlog("setting up SSL sockets: %s", bind_ssl)
    607617    for host, iport in bind_ssl:
    608618        add_tcp_socket("SSL", host, iport)
     
    615625        add_tcp_socket("tcp", host, iport)
    616626        if tcp_ssl:
    617627            add_mdns("ssl", host, iport)
     628    netlog("setting up UDP sockets: %s", bind_udp)
     629    for host, iport in bind_udp:
     630        add_udp_socket("udp", host, iport)
     631    netlog("setting up UDP+DTLS sockets: %s", bind_dtls)
     632    for host, iport in bind_dtls:
     633        add_udp_socket("dtls", host, iport)
    618634    netlog("setting up http / ws (websockets): %s", bind_ws)
    619635    for host, iport in bind_ws:
    620636        add_tcp_socket("ws", host, iport)
     
    908924            mdns_publish(display_name, mode, listen_on, mdns_info)
    909925
    910926    try:
    911         app._ssl_wrap_socket = wrap_socket_fn
     927        app._ssl_wrap_server_socket = wrap_server_socket_fn
     928        app._ssl_wrap_client_socket = wrap_client_socket_fn
    912929        app.original_desktop_display = desktop_display
    913930        app.exec_cwd = opts.chdir or cwd
    914931        app.init(opts)
  • xpra/server/server_core.py

     
    146146        self.ws_auth_class = None
    147147        self.wss_auth_class = None
    148148        self.ssl_auth_class = None
     149        self.udp_auth_class = None
     150        self.dtls_auth_class = None
    149151        self.rfb_auth_class = None
    150152        self.vsock_auth_class = None
    151153        self._when_ready = []
     
    157159        #networking bits:
    158160        self._socket_info = []
    159161        self._potential_protocols = []
     162        self._udp_listeners = []
     163        self._udp_protocols = {}
    160164        self._tcp_proxy_clients = []
    161165        self._tcp_proxy = ""
    162         self._ssl_wrap_socket = None
     166        self._ssl_wrap_server_socket = None
     167        self._ssl_wrap_client_socket = None
    163168        self._accept_timeout = SOCKET_TIMEOUT + 1
    164169        self.ssl_mode = None
    165170        self._html = False
     
    383388        self._default_packet_handlers = {
    384389            "hello":                                self._process_hello,
    385390            "disconnect":                           self._process_disconnect,
     391            "udp-control":                          self._process_udp_control,
    386392            Protocol.CONNECTION_LOST:               self._process_connection_lost,
    387393            Protocol.GIBBERISH:                     self._process_gibberish,
    388394            Protocol.INVALID:                       self._process_invalid,
     
    538544        self.do_cleanup()
    539545        self.cleanup_protocols(protocols, reason, True)
    540546        self._potential_protocols = []
     547        self.cleanup_udp_listeners()
    541548
    542549    def do_cleanup(self):
    543550        #allow just a bit of time for the protocol packet flush
     
    544551        sleep(0.1)
    545552
    546553
     554    def cleanup_udp_listeners(self):
     555        for udpl in self._udp_listeners:
     556            udpl.close()
     557        self._udp_listeners = []
     558
    547559    def cleanup_all_protocols(self, reason):
    548560        protocols = self.get_all_protocols()
    549561        self.cleanup_protocols(protocols, reason)
     
    569581            #named pipe listener uses a thread:
    570582            sock.new_connection_cb = self._new_connection
    571583            sock.start()
     584        elif socktype in ("udp", "dtls"):
     585            #socket_info = self.socket_info.get(sock)
     586            from xpra.net.udp_protocol import UDPListener
     587            udpl = UDPListener(sock, self.process_udp_packet)
     588            self._udp_listeners.append(udpl)
    572589        else:
    573590            from xpra.gtk_common.gobject_compat import import_glib
    574591            glib = import_glib()
     
    673690        peek_data, line1 = self.peek_connection(conn)
    674691
    675692        def ssl_wrap():
    676             ssl_sock = self._ssl_wrap_socket(sock)
     693            ssl_sock = self._ssl_wrap_server_socket(sock)
    677694            netlog("ssl wrapped socket(%s)=%s", sock, ssl_sock)
    678695            if ssl_sock is None:
    679696                #None means EOF! (we don't want to import ssl bits here)
     
    729746            self.handle_rfb_connection(conn)
    730747            return
    731748
    732         elif socktype=="tcp" and peek_data and (self._html or self._tcp_proxy or self._ssl_wrap_socket):
     749        elif peek_data and (
     750            (socktype=="tcp" and  (self._html or self._tcp_proxy or self._ssl_wrap_server_socket)) or
     751            (socktype=="udp" and self._ssl_wrap_server_socket and self._ssl_wrap_client_socket)
     752            ):
    733753            #see if the packet data is actually xpra or something else
    734754            #that we need to handle via a tcp proxy, ssl wrapper or the websockify adapter:
    735755            try:
     
    763783        netlog("make_protocol(%s, %s)", socktype, conn)
    764784        socktype = socktype.lower()
    765785        protocol = protocol_class(conn)
     786        protocol.socket_type = socktype
    766787        self._potential_protocols.append(protocol)
    767788        protocol.challenge_sent = False
    768789        protocol.authenticator = None
    769790        protocol.encryption = None
    770791        protocol.keyfile = None
     792        protocol.auth_class = {
     793            "tcp"           : self.tcp_auth_class,
     794            "ssl"           : self.ssl_auth_class,
     795            "udp"           : self.udp_auth_class,
     796            "dtls"          : self.dtls_auth_class,
     797            "ws"            : self.ws_auth_class,
     798            "wss"           : self.wss_auth_class,
     799            "rfb"           : self.rfb_auth_class,
     800            "vsock"         : self.vsock_auth_class,
     801            "unix-domain"   : self.auth_class,
     802            "named-pipe"    : self.auth_class,
     803            }[socktype]
    771804        if socktype=="tcp":
    772             protocol.auth_class = self.tcp_auth_class
     805            #special case for legacy encryption code:
    773806            protocol.encryption = self.tcp_encryption
    774807            protocol.keyfile = self.tcp_encryption_keyfile
    775808            if protocol.encryption and ENCRYPT_FIRST_PACKET:
     
    776809                authlog("encryption=%s, keyfile=%s", protocol.encryption, protocol.keyfile)
    777810                password = self.get_encryption_key(None, protocol.keyfile)
    778811                protocol.set_cipher_in(protocol.encryption, DEFAULT_IV, password, DEFAULT_SALT, DEFAULT_ITERATIONS, INITIAL_PADDING)
    779         elif socktype=="ssl":
    780             protocol.auth_class = self.ssl_auth_class
    781         elif socktype=="ws":
    782             protocol.auth_class = self.ws_auth_class
    783         elif socktype=="wss":
    784             protocol.auth_class = self.wss_auth_class
    785         elif socktype=="rfb":
    786             protocol.auth_class = self.rfb_auth_class
    787         elif socktype=="vsock":
    788             protocol.auth_class = self.vsock_auth_class
    789         else:
    790             protocol.auth_class = self.auth_class
    791         protocol.socket_type = socktype
    792812        protocol.invalid_header = self.invalid_header
    793813        authlog("socktype=%s, auth class=%s, encryption=%s, keyfile=%s", socktype, protocol.auth_class, protocol.encryption, protocol.keyfile)
    794814        protocol.start()
     
    811831            #xpra packet header, no need to wrap this connection
    812832            return True, conn, peek_data
    813833        frominfo = pretty_socket(conn.remote)
    814         if self._ssl_wrap_socket and peek_data[0] in (chr(0x16), 0x16):
    815             socktype = "SSL"
     834        if self._ssl_wrap_server_socket and peek_data[0] in (chr(0x16), 0x16):
     835            socktype = {
     836                "udp"   : "dtls",
     837                "tcp"   : "SSL",
     838                }[socktype]
    816839            sock, sockname, address, target = conn._socket, conn.local, conn.remote, conn.target
    817             sock = self._ssl_wrap_socket(sock)
     840            sock = self._ssl_wrap_server_socket(sock)
    818841            if sock is None:
    819842                #None means EOF! (we don't want to import ssl bits here)
    820843                netlog("ignoring SSL EOF error")
     
    850873        return True, conn, peek_data
    851874
    852875    def invalid_header(self, proto, data, msg=""):
    853         netlog("invalid_header(%s, %s bytes: '%s', %s) input_packetcount=%s, tcp_proxy=%s, html=%s, ssl=%s", proto, len(data or ""), msg, repr_ellipsized(data), proto.input_packetcount, self._tcp_proxy, self._html, bool(self._ssl_wrap_socket))
     876        netlog("invalid_header(%s, %s bytes: '%s', %s) input_packetcount=%s, tcp_proxy=%s, html=%s, ssl=%s",
     877               proto, len(data or ""), msg, repr_ellipsized(data), proto.input_packetcount, self._tcp_proxy, self._html, bool(self._ssl_wrap_server_socket))
    854878        err = "invalid packet format, %s" % self.guess_header_protocol(data)
    855879        proto.gibberish(err, data)
    856880
     
    10031027        netlog("send_disconnect(%s, %s, %s)", proto, reason, extra)
    10041028        if proto._closed:
    10051029            return
    1006         proto.send_now(["disconnect", reason]+list(extra))
     1030        proto.send_disconnect(reason, *extra)
    10071031        self.timeout_add(1000, self.force_disconnect, proto)
    10081032
    10091033    def force_disconnect(self, proto):
     
    14441468        for socktype, auth_class in {
    14451469                                     "tcp"          : self.tcp_auth_class,
    14461470                                     "ssl"          : self.ssl_auth_class,
     1471                                     "ws"           : self.ws_auth_class,
     1472                                     "wss"          : self.wss_auth_class,
     1473                                     "udp"          : self.udp_auth_class,
     1474                                     "dtls"         : self.dtls_auth_class,
     1475                                     "rfb"          : self.rfb_auth_class,
    14471476                                     "unix-domain"  : self.auth_class,
    14481477                                     "vsock"        : self.vsock_auth_class,
    14491478                                     }.items():
     
    14711500        except:
    14721501            netlog.error("Unhandled error while processing a '%s' packet from peer using %s", packet_type, handler, exc_info=True)
    14731502
     1503
    14741504    def handle_rfb_connection(self, conn):
    14751505        log.error("Error: RFB protocol is not supported by this server")
    14761506        conn.close()
     1507
     1508
     1509    def _process_udp_control(self, proto, packet):
     1510        proto.process_control(*packet[1:])
     1511
     1512    def process_udp_packet(self, udp_listener, uuid, seqno, synchronous, chunk, chunks, data, bfrom):
     1513        #log.info("process_udp_packet%s", (udp_listener, uuid, seqno, synchronous, chunk, chunks, len(data), bfrom))
     1514        protocol = self._udp_protocols.get(uuid)
     1515        if not protocol:
     1516            from xpra.net.udp_protocol import UDPServerProtocol, UDPSocketConnection
     1517            def udp_protocol_class(conn):
     1518                protocol = UDPServerProtocol(self, conn, self.process_packet)
     1519                protocol.large_packets.append("info-response")
     1520                protocol.receive_aliases.update(self._aliases)
     1521                return protocol
     1522            socktype = "udp"        #breaks dtls...
     1523            host, port = bfrom
     1524            sock = udp_listener._socket
     1525            if socktype=="dtls":
     1526                from dtls import do_patch   #@UnresolvedImport
     1527                do_patch()
     1528                if False:
     1529                    family = socket.AF_INET6
     1530                else:
     1531                    family = socket.AF_INET
     1532                #sock = socket.socket(family, socket.SOCK_DGRAM)
     1533                #sock.bind(("0.0.0.0", 10003))
     1534                sock.connect(bfrom)
     1535                sock  = self._ssl_wrap_client_socket(sock, handshake=False)
     1536                assert sock, "failed to wrap socket %s" % sock
     1537                sockname = sock.getsockname()
     1538                conn = SocketConnection(sock, sockname, (host, port), (host, port), socktype)
     1539            else:
     1540                sockname = sock.getsockname()
     1541                conn = UDPSocketConnection(sock, sockname, (host, port), (host, port), socktype)
     1542            conn.timeout = SOCKET_TIMEOUT
     1543            protocol = self.do_make_protocol(socktype, conn, udp_protocol_class)
     1544            self._udp_protocols[uuid] = protocol
     1545        #assert packetsize==datalen, "expected datalen=packetsize, but got %i!=%i" % (datalen, packetsize)
     1546        #assert len(data)==datalen, "expected %i bytes but got %i" % (datalen, len(data))
     1547        protocol.process_udp_data(seqno, synchronous, chunk, chunks, data, bfrom)
  • xpra/server/socket_util.py

     
    108108        log("create_tcp_socket%s", (host, iport), exc_info=True)
    109109        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
    110110    def cleanup_tcp_socket():
    111         log.info("closing %s socket %s:%s", socktype, host, iport)
     111        log.info("closing %s socket %s:%s", socktype.lower(), host, iport)
    112112        try:
    113113            tcp_socket.close()
    114114        except:
     
    117117    log("%s: %s:%s : %s", socktype, host, iport, socket)
    118118    return socktype, tcp_socket, (host, iport)
    119119
     120def create_udp_socket(host, iport):
     121    if host.find(":")<0:
     122        listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
     123        sockaddr = (host, iport)
     124    else:
     125        assert socket.has_ipv6, "specified an IPv6 address but this is not supported"
     126        res = socket.getaddrinfo(host, iport, socket.AF_INET6, socket.SOCK_DGRAM)
     127        listener = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
     128        sockaddr = res[0][-1]
     129    listener.bind(sockaddr)
     130    return listener
    120131
    121 def parse_bind_tcp(bind_tcp):
    122     tcp_sockets = set()
    123     if bind_tcp:
    124         for spec in bind_tcp:
     132def setup_udp_socket(host, iport, socktype="udp"):
     133    from xpra.log import Logger
     134    log = Logger("network")
     135    try:
     136        udp_socket = create_udp_socket(host, iport)
     137    except Exception as e:
     138        log("create_udp_socket%s", (host, iport), exc_info=True)
     139        raise InitException("failed to setup %s socket on %s:%s %s" % (socktype, host, iport, e))
     140    def cleanup_udp_socket():
     141        log.info("closing %s socket %s:%s", socktype, host, iport)
     142        try:
     143            udp_socket.close()
     144        except:
     145            pass
     146    add_cleanup(cleanup_udp_socket)
     147    log("%s: %s:%s : %s", socktype, host, iport, socket)
     148    return socktype, udp_socket, (host, iport)
     149
     150
     151def parse_bind_ip(bind_ip):
     152    ip_sockets = set()
     153    if bind_ip:
     154        for spec in bind_ip:
    125155            if ":" not in spec:
    126                 raise InitException("TCP port must be specified as [HOST]:PORT")
     156                raise InitException("port must be specified as [HOST]:PORT")
    127157            host, port = spec.rsplit(":", 1)
    128158            if host == "":
    129159                host = "127.0.0.1"
     
    135165                    assert iport>0 and iport<2**16
    136166                except:
    137167                    raise InitException("invalid port number: %s" % port)
    138             tcp_sockets.add((host, iport))
    139     return tcp_sockets
     168            ip_sockets.add((host, iport))
     169    return ip_sockets
    140170
    141171def setup_vsock_socket(cid, iport):
    142172    from xpra.log import Logger
  • xpra/server/source.py

     
    14021402#
    14031403    def next_packet(self):
    14041404        """ Called by protocol.py when it is ready to send the next packet """
    1405         packet, start_send_cb, end_send_cb, have_more = None, None, None, False
     1405        packet, start_send_cb, end_send_cb, fail_cb, have_more = None, None, None, None, False
    14061406        if not self.is_closed():
    14071407            if len(self.ordinary_packets)>0:
    14081408                packet = self.ordinary_packets.pop(0)
    14091409            elif len(self.packet_queue)>0:
    1410                 packet, _, _, start_send_cb, end_send_cb = self.packet_queue.popleft()
     1410                packet, _, _, start_send_cb, end_send_cb, fail_cb = self.packet_queue.popleft()
    14111411            have_more = packet is not None and (len(self.ordinary_packets)>0 or len(self.packet_queue)>0)
    1412         return packet, start_send_cb, end_send_cb, have_more
     1412        return packet, start_send_cb, end_send_cb, fail_cb, have_more
    14131413
    14141414    def send(self, *parts):
    14151415        """ This method queues non-damage packets (higher priority) """
     
    22942294        self.statistics.compression_work_qsizes.append((monotonic_time(), self.encode_work_queue.qsize()))
    22952295        self.encode_work_queue.put(fn_and_args)
    22962296
    2297     def queue_packet(self, packet, wid=0, pixels=0, start_send_cb=None, end_send_cb=None):
     2297    def queue_packet(self, packet, wid=0, pixels=0, start_send_cb=None, end_send_cb=None, fail_cb=None):
    22982298        """
    22992299            Add a new 'draw' packet to the 'packet_queue'.
    23002300            Note: this code runs in the non-ui thread
     
    23032303        self.statistics.packet_qsizes.append((now, len(self.packet_queue)))
    23042304        if wid>0:
    23052305            self.statistics.damage_packet_qpixels.append((now, wid, sum(x[2] for x in list(self.packet_queue) if x[1]==wid)))
    2306         self.packet_queue.append((packet, wid, pixels, start_send_cb, end_send_cb))
     2306        self.packet_queue.append((packet, wid, pixels, start_send_cb, end_send_cb, fail_cb))
    23072307        p = self.protocol
    23082308        if p:
    23092309            p.source_has_more()
  • xpra/server/window/window_source.py

     
    17261726            now = monotonic_time()
    17271727            damage_in_latency = now-process_damage_time
    17281728            self.statistics.damage_in_latency.append((now, width*height, actual_batch_delay, damage_in_latency))
    1729         self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent)
     1729        fail_cb = self.get_fail_cb(packet)
     1730        #log.info("queuing %s packet with fail_cb=%s", coding, fail_cb)
     1731        self.queue_packet(packet, self.wid, width*height, start_send, damage_packet_sent, fail_cb)
    17301732
     1733    def get_fail_cb(self, packet):
     1734        def resend():
     1735            log.warn("paint packet failure, resending")
     1736            x,y,width,height = packet[2:6]
     1737            damage_packet_sequence = packet[8]
     1738            self.damage_packet_acked(damage_packet_sequence, width, height, 0, "")
     1739            self.idle_add(self.damage, x, y, width, height)
     1740        return resend
     1741
    17311742    def damage_packet_acked(self, damage_packet_sequence, width, height, decode_time, message):
    17321743        """
    17331744            The client is acknowledging a damage packet,
  • xpra/server/window/window_video_source.py

     
    14261426        return {}
    14271427
    14281428
     1429    def get_fail_cb(self, packet):
     1430        coding = packet[6]
     1431        if coding in self.common_video_encodings:
     1432            return None
     1433        return WindowSource.get_fail_cb(self, packet)
     1434
     1435
    14291436    def make_draw_packet(self, x, y, w, h, coding, data, outstride, client_options={}, options={}):
    14301437        #overriden so we can invalidate the scroll data:
    14311438        #log.error("make_draw_packet%s", (x, y, w, h, coding, "..", outstride, client_options)
  • xpra/x11/desktop_server.py

     
    1111import struct
    1212from threading import Event
    1313
    14 from xpra.util import updict, log_screen_sizes, envbool, nonl
     14from xpra.util import updict, log_screen_sizes, envbool, nonl, csv
    1515from xpra.os_util import get_generic_os_name, memoryview_to_bytes
    1616from xpra.platform.paths import get_icon
    1717from xpra.platform.gui import get_wm_name
     
    599599        return w, h, 32, 32, False, True, 255, 255, 255, 16, 8, 0
    600600
    601601    def process_rfb_packet(self, proto, packet):
    602         rfblog("RFB packet: '%s'", nonl(packet))
     602        #rfblog("RFB packet: '%s'", nonl(packet))
    603603        fn_name = "_process_rfb_%s" % packet[0]
    604604        fn = getattr(self, fn_name, None)
    605605        if not fn:
     
    663663            modifiers = []
    664664            self._handle_key(wid, bool(pressed), name, keyval, keycode, modifiers)
    665665
     666    def _process_rfb_SetEncodings(self, _proto, packet):
     667        from xpra.net.rfb_const import RFBEncoding
     668        rfblog("RFB: SetEncodings %s", packet)
     669        n, encodings = packet[2:4]
     670        known_encodings = [RFBEncoding.ENCODING_STR.get(x) for x in encodings if x in RFBEncoding.ENCODING_STR]
     671        unknown_encodings = [x for x in encodings if x not in RFBEncoding.ENCODING_STR]
     672        rfblog("%i encodings: %s, unknown: %s", n, csv(known_encodings), csv(unknown_encodings))
     673
     674    def _process_rfb_SetPixelFormat(self, _proto, packet):
     675        rfblog("RFB: SetPixelFormat %s", packet)
     676        #w, h, bpp, depth, bigendian, truecolor, rmax, gmax, bmax, rshift, bshift, gshift = packet
     677
    666678    def _process_rfb_FramebufferUpdateRequest(self, _proto, packet):
    667679        #pressed, _, _, keycode = packet[1:5]
    668680        inc, x, y, w, h = packet[1:6]
     
    727739    def keys_changed(self):
    728740        pass
    729741
     742
    730743    def send_server_event(self, *_args):
    731744        pass
    732745
     
    735748
    736749
    737750    def damage(self, _wid, window, x, y, w, h, _options=None):
     751        from xpra.net.protocol import PACKET_JOIN_SIZE
    738752        img = window.get_image(x, y, w, h)
    739753        rfblog("damage: %s", img)
    740754        fbupdate = struct.pack("!BBH", 0, 0, 1)
     
    745759        pixels = img.get_pixels()
    746760        assert len(pixels)>=4*w*h
    747761        pixels = pixels[:4*w*h]
    748         if len(pixels)<=4096:
     762        if len(pixels)<=PACKET_JOIN_SIZE:
    749763            self.send(fbupdate+rect+memoryview_to_bytes(pixels))
    750764        else:
    751765            self.send(fbupdate+rect)