xpra icon
Bug tracker and wiki

This bug tracker and wiki are being discontinued
please use https://github.com/Xpra-org/xpra instead.


Ticket #2121: websockify-zerocopy.patch

File websockify-zerocopy.patch, 6.2 KB (added by Antoine Martin, 3 years ago)

working zerocopy patch

  • xpra/server/websocket.py

     
    337337            return None
    338338        return content
    339339
    340 
    341340try:
    342341    #websockify version > 0.8
    343342    #patch WebSocket class so we always choose binary
     
    349348            SocketConnection.__init__(self, socket, local, remote, target, socktype)
    350349            self.protocol_type = "websocket"
    351350            self.request = ws_handler.request
    352        
     351
    353352        def close(self):
    354353            SocketConnection.close(self)
    355354            request = self.request
     
    358357                    request.close()
    359358                except Exception:
    360359                    log("error closing %s", request, exc_info=True)
    361        
     360
    362361        def read(self, n):
    363362            #FIXME: we should try to honour n
    364363            #from websockify.websocket import WebSocketWantReadError, WebSocketWantWriteError
     
    380379                        self.input_bytecount += len(buf)
    381380                        return buf
    382381            return None
    383        
     382
    384383        def write(self, buf):
    385384            #log("write(%i bytes)", len(buf))
    386385            from websockify.websocket import WebSocketWantWriteError
     
    395394                    continue
    396395            return None
    397396
     397    from xpra.util import envint
     398    JOIN_SIZE = envint("XPRA_WEBSOCKET_JOIN_SIZE", 16*1024)
     399
     400    import sys
     401    import errno
     402    import socket
     403    import random
     404    import struct
     405    from websockify.websocket import WebSocketWantWriteError
     406    class ZeroCopyWebSocket(WebSocket):
     407
     408        def __init__(self):
     409            log.info("ZeroCopyWebSocket")
     410            WebSocket.__init__(self)
     411            #use an array:
     412            self._send_buffers = []
     413            delattr(self, "_send_buffer")
     414
     415        def _flush(self):
     416            log("_flush() %i buffers", len(self._send_buffers))
     417            # Writes pending data to the socket
     418            if not self._send_buffers:
     419                return
     420
     421            assert self.socket is not None
     422
     423            while self._send_buffers:
     424                buf = self._send_buffers[0]
     425                log("_flush() first buffer: %i bytes", len(buf))
     426                try:
     427                    sent = self.socket.send(buf)
     428                except (socket.error, OSError):
     429                    exc = sys.exc_info()[1]
     430                    if hasattr(exc, 'errno'):
     431                        err = exc.errno
     432                    else:
     433                        err = exc[0]
     434
     435                    if err == errno.EWOULDBLOCK:
     436                        raise WebSocketWantWriteError
     437
     438                    raise
     439
     440                log("_flush() sent %i from %i", sent, len(buf))
     441                if sent==len(buf):
     442                    self._send_buffers.pop(0)
     443                else:
     444                    self._send_buffers[0] = buf[sent:]
     445
     446            # We had a pending close and we've flushed the buffer,
     447            # time to end things
     448            if self._received_close and self._sent_close:
     449                self._close()
     450
     451        def _send(self, data):
     452            # Queues data and attempts to send it
     453            self._send_buffers.append(data)
     454            self._flush()
     455
     456        def _queue_str(self, string):
     457            # Queue some data to be sent later.
     458            # Only used by the connecting methods.
     459            self._send_buffers.append(string.encode("latin-1"))
     460
     461        def _sendmsg(self, opcode, msg):
     462            # Sends a standard data message
     463            if self.client:
     464                mask = ''
     465                for _ in range(4):
     466                    mask += chr(random.randrange(256))
     467                if sys.hexversion >= 0x3000000:
     468                    mask = bytes(mask, "latin-1")
     469                buf = self._mask(msg, mask)
     470                l = len(buf)
     471                frames = (self._encode_hybi_header(opcode, l, True), mask, buf)
     472            else:
     473                l = len(msg)
     474                if l<JOIN_SIZE:
     475                    #log("websocket: joined")
     476                    frames = (self._encode_hybi_header(opcode, l) + msg, )
     477                else:
     478                    #log("websocket: split")
     479                    frames = (self._encode_hybi_header(opcode, l), msg)
     480
     481            for frame in frames:
     482                self._send_buffers.append(frame)
     483            self._flush()
     484
     485        def _close(self):
     486            # Close the underlying socket
     487            self.socket.close()
     488            self.socket = None
     489
     490        def _encode_hybi_header(self, opcode, payload_len, has_mask=False, fin=True):
     491            """ Encode a HyBi style WebSocket frame.
     492            Optional opcode:
     493                0x0 - continuation
     494                0x1 - text frame
     495                0x2 - binary frame
     496                0x8 - connection close
     497                0x9 - ping
     498                0xA - pong
     499            """
     500            assert (opcode & 0x0f)==opcode, "invalid opcode %#x" % opcode
     501            mask_bit = 0x80*has_mask
     502            b1 = opcode | (0x80 * fin)
     503            if payload_len <= 125:
     504                return struct.pack('>BB', b1, payload_len | mask_bit)
     505            if payload_len > 125 and payload_len < 65536:
     506                return struct.pack('>BBH', b1, 126 | mask_bit, payload_len)
     507            return struct.pack('>BBQ', b1, 127 | mask_bit, payload_len)
     508
     509    WSRequestHandler.SocketClass = ZeroCopyWebSocket
     510
    398511except ImportError:
    399512    #websockify version 0.8 or older:
    400513    class WebSocketConnection(SocketConnection):
     
    403516            self.protocol_type = "websocket"
    404517            self.ws_handler = ws_handler
    405518            self.pending_read = Queue()
    406    
     519
    407520        def close(self):
    408521            self.pending_read = Queue()
    409522            SocketConnection.close(self)
    410    
     523
    411524        def read(self, n):
    412525            #FIXME: we should try to honour n
    413526            while self.is_active():
     
    428541                            self.pending_read.put(v)
    429542                    self.input_bytecount += len(buf)
    430543                    return buf
    431    
     544
    432545        def write(self, buf):
    433546            self.ws_handler.send_frames((memoryview_to_bytes(buf),))
    434547            self.output_bytecount += len(buf)