xpra icon
Bug tracker and wiki

Ticket #1174: named-pipes-ctypes-v3.patch

File named-pipes-ctypes-v3.patch, 14.0 KB (added by Antoine Martin, 2 years ago)

can connect and read the packet.. but we need overlapped IO to both read and write without blocking, oh joy

  • xpra/log.py

     
    237237                ("mmap"         , "mmap transfers"),
    238238                ("protocol"     , "Packet input and output (formatting, parsing, sending and receiving)"),
    239239                ("websocket"    , "Websocket layer"),
     240                ("named-pipe"   , "Named pipe"),
    240241                ("crypto"       , "Encryption"),
    241242                ("auth"         , "Authentication"),
    242243                ])),
  • xpra/net/bytestreams.py

     
    1414from xpra.log import Logger
    1515log = Logger("network", "protocol")
    1616from xpra.net import ConnectionClosedException
    17 from xpra.util import envint, envbool
     17from xpra.util import envint, envbool, repr_ellipsized
    1818from xpra.os_util import WIN32
    1919
    2020
     
    128128            return f(*a, **kw)
    129129        except Exception as e:
    130130            retry = can_retry(e)
    131             log("untilConcludes(%s, %s, %s, %s) %s, retry=%s", is_active_cb, f, a, kw, e, retry)
     131            log("untilConcludes(%s, %s, %s, %s) %s, retry=%s", is_active_cb, f, repr_ellipsized(str(a)), kw, e, retry)
    132132            if retry:
    133133                if wait>0:
    134134                    time.sleep(wait/1000.0)     #wait is in milliseconds, sleep takes seconds
  • xpra/platform/win32/namedpipes/connection.py

     
    66
    77#@PydevCodeAnalysisIgnore
    88
    9 import ctypes
    10 import binascii
     9from ctypes import windll, addressof, byref, c_ulong, c_char_p, c_char, c_void_p, cast, string_at
    1110
    1211from xpra.net.bytestreams import Connection
    1312
    1413from xpra.log import Logger
    15 log = Logger("network", "win32")
     14log = Logger("network", "named-pipe", "win32")
    1615
    17 kernel32 = ctypes.windll.kernel32
     16kernel32 = windll.kernel32
     17CreateFileA = kernel32.CreateFileA
    1818ReadFile = kernel32.ReadFile
    1919WriteFile = kernel32.WriteFile
    2020CloseHandle = kernel32.CloseHandle
    2121DisconnectNamedPipe = kernel32.DisconnectNamedPipe
    2222FlushFileBuffers = kernel32.FlushFileBuffers
     23WaitNamedPipeA = kernel32.WaitNamedPipeA
     24GetLastError = kernel32.GetLastError
     25SetNamedPipeHandleState = kernel32.SetNamedPipeHandleState
    2326
    2427ERROR_PIPE_NOT_CONNECTED = 233
    2528ERROR_MORE_DATA = 234
     
    2730
    2831class NamedPipeConnection(Connection):
    2932    def __init__(self, name, pipe_handle):
     33        log("NamedPipeConnection(%s, %i)", name, pipe_handle)
    3034        Connection.__init__(self, name, "named-pipe")
    3135        self.pipe_handle = pipe_handle
    3236
    33     def untilConcludes(self, *args):
     37    def untilConcludes(self, fn, *args, **kwargs):
    3438        try:
    35             return Connection.untilConcludes(self, *args)
     39            return Connection.untilConcludes(self, fn, *args, **kwargs)
    3640        except Exception as e:
    37             code = ctypes.get_last_error()
     41            code = GetLastError()
     42            log("untilConcludes(%s, ) exception: %s, error code=%s", fn, e, code)
    3843            if code==ERROR_PIPE_NOT_CONNECTED:
    3944                return None
    4045            raise IOError("%s: %s" % (e, code))
     
    4348        return self._read(self._pipe_read, n)
    4449
    4550    def _pipe_read(self, buf):
     51        BUFSIZE = 4096
     52        buf = (c_char*BUFSIZE)()
     53        p = cast(addressof(buf), c_void_p)
     54        read = c_ulong(0)
     55        r = ERROR_MORE_DATA
    4656        data = []
    47         hr = ERROR_MORE_DATA
    48         while hr==ERROR_MORE_DATA:
    49             hr, d = ReadFile(self.pipe_handle, 65536)
    50             data.append(d)
     57        while r==ERROR_MORE_DATA:
     58            log("ReadFile(..)")
     59            r = ReadFile(self.pipe_handle, p, BUFSIZE, byref(read), None)
     60            log("ReadFile(..)=%i, len=%s", r, read.value)
     61            if r==1 or read.value!=0:
     62                data.append(string_at(p, read.value))
    5163        s = b"".join(data)
    52         log("pipe_read: %i / %s", hr, binascii.hexlify(s))
     64        log("pipe_read: %i bytes in %i chunks", len(s), len(data))          #, binascii.hexlify(s))
    5365        return s
    5466
    5567    def write(self, buf):
     
    5668        return self._write(self._pipe_write, buf)
    5769
    5870    def _pipe_write(self, buf):
    59         log("pipe_write: %s", binascii.hexlify(buf))
    60         WriteFile(self.pipe_handle, buf)
     71        size = len(buf)
     72        log("pipe_write: %i bytes", size)   #binascii.hexlify(buf))
     73        written = c_ulong(0)
     74        log("WriteFile(..)")
     75        r = WriteFile(self.pipe_handle, c_char_p(buf), len(buf), byref(written), None)
     76        log("WriteFile(..)=%s, len=%i", r, written.value)
     77        if not r:
     78            raise Exception("failed to write buffer to named pipe handle %i" % self.pipe_handle)
     79        log("pipe_write: %i bytes written, flushing", written.value)
    6180        FlushFileBuffers(self.pipe_handle)
    6281        #SetFilePointer(self.pipe_handle, 0, FILE_BEGIN)
    63         return len(buf)
     82        return written.value
    6483
    6584    def close(self):
    6685        def _close_err(fn, e):
     
    7089                l = log.debug
    7190            l("Error: %s(%s) %i: %s", fn, self.pipe_handle, code, e)
    7291        try:
     92            FlushFileBuffers(self.pipe_handle)
     93        except Exception as e:
     94            _close_err("FlushFileBuffers", e)
     95        try:
    7396            DisconnectNamedPipe(self.pipe_handle)
    7497        except Exception as e:
    7598            _close_err("DisconnectNamedPipe", e)
     
    87110        d["type"] = "named-pipe"
    88111        d["closed"] = self.pipe_handle is None
    89112        return d
     113
     114
     115GENERIC_READ = 0x80000000
     116GENERIC_WRITE = 0x40000000
     117OPEN_EXISTING = 0x3
     118INVALID_HANDLE_VALUE = -1
     119ERROR_PIPE_BUSY = 231
     120PIPE_READMODE_MESSAGE = 0x2
     121
     122def connect_to_namedpipe(pipe_name, timeout=10):
     123    log("connect_to_namedpipe(%s, %i)", pipe_name, timeout)
     124    import time
     125    start = time.time()
     126    while True:
     127        if time.time()-start>=timeout:
     128            raise Exception("timeout waiting for named pipe '%s'" % pipe_name)
     129        pipe_handle = CreateFileA(pipe_name, GENERIC_READ | GENERIC_WRITE, 0, None, OPEN_EXISTING, 0, None)
     130        log("CreateFileA(%s)=%s", pipe_name, pipe_handle)
     131        if pipe_handle!=INVALID_HANDLE_VALUE:
     132            break
     133        if GetLastError()!=ERROR_PIPE_BUSY:
     134            raise Exception("cannot open named pipe '%s'" % pipe_name)
     135        if WaitNamedPipeA(pipe_name, timeout*10000)==0:
     136            raise Exception("timeout waiting for named pipe '%s'" % pipe_name)
     137    #we have a valid handle!
     138    dwMode = c_ulong(PIPE_READMODE_MESSAGE)
     139    r = SetNamedPipeHandleState(pipe_handle, byref(dwMode), None, None);
     140    log("SetNamedPipeHandleState(..)=%i", r)
     141    if not r:
     142        log.warn("Warning: SetNamedPipeHandleState failed")
     143    return pipe_handle
  • xpra/platform/win32/namedpipes/listener.py

     
    1010from threading import Thread
    1111
    1212from xpra.log import Logger
    13 log = Logger("network", "win32")
     13log = Logger("network", "named-pipe", "win32")
    1414
    1515
    1616kernel32 = ctypes.windll.kernel32
     
    1717ReadFile = kernel32.ReadFile
    1818WriteFile = kernel32.WriteFile
    1919CloseHandle = kernel32.CloseHandle
    20 CreateNamedPipe = kernel32.CreateNamedPipeW
     20CreateNamedPipeA = kernel32.CreateNamedPipeA
    2121ConnectNamedPipe = kernel32.ConnectNamedPipe
    2222DisconnectNamedPipe = kernel32.DisconnectNamedPipe
    2323FlushFileBuffers = kernel32.FlushFileBuffers
     24GetLastError = kernel32.GetLastError
    2425
    2526FILE_GENERIC_READ = 0x120089
    2627FILE_GENERIC_WRITE = 0x120116
     
    3031PIPE_ACCESS_DUPLEX = 0x3
    3132PIPE_READMODE_BYTE = 0
    3233PIPE_UNLIMITED_INSTANCES = 0xff
    33 
     34PIPE_ACCESS_DUPLEX = 0x3
     35PIPE_TYPE_BYTE=0
     36PIPE_TYPE_MESSAGE = 0x4
     37PIPE_READMODE_MESSAGE = 0x2
     38PIPE_WAIT = 0
     39PIPE_UNLIMITED_INSTANCES = 255
     40NMPWAIT_USE_DEFAULT_WAIT = 0
     41INVALID_HANDLE_VALUE = -1
    3442ERROR_PIPE_CONNECTED = 535
    3543
    36 SECURITY_CREATOR_SID_AUTHORITY = (0, 0, 0, 0, 0, 3)
    37 SECURITY_WORLD_SID_AUTHORITY = (0, 0, 0, 0, 0, 1)
    38 SECURITY_WORLD_RID = 0
    39 SECURITY_CREATOR_OWNER_RID = 0
    40 
    4144TIMEOUT = 6000
    4245MAX_INSTANCES = PIPE_UNLIMITED_INSTANCES
    4346
    44 class SECURITY_ATTRIBUTES(ctypes.Structure):
    45     _fields_ = [
    46         ("nLength", ctypes.c_int),
    47         ("lpSecurityDescriptor", ctypes.c_void_p),
    48         ("bInheritHandle", ctypes.c_int),
    49         ]
    5047
    51 
    5248class NamedPipeListener(Thread):
    5349    def __init__(self, pipe_name, new_connection_cb=None):
    5450        self.pipe_name = pipe_name
    5551        self.new_connection_cb = new_connection_cb
    5652        self.exit_loop = False
    57         Thread.__init__(self, name="NamedPipeListener")
     53        Thread.__init__(self, name="NamedPipeListener-%s" % pipe_name)
    5854        self.daemon = True
    5955
    6056    def __repr__(self):
     
    6561        self.exit_loop = True
    6662
    6763    def run(self):
     64        log("%s.run()", self)
    6865        try:
    6966            self.do_run()
    7067        except Exception:
     
    7269
    7370    def do_run(self):
    7471        while not self.exit_loop:
    75             pipe_handle = self.CreatePipeHandle()
     72            pipe_handle = None
    7673            try:
    77                 hr = ConnectNamedPipe(pipe_handle)
    78                 assert hr in (0, ERROR_PIPE_CONNECTED), "ConnectNamedPipe returned %i" % hr
    79             except Exception as e:
    80                 log.error("Error: connecting pipe handle %s:", pipe_handle)
    81                 log.error(" %s", e)
     74                pipe_handle = self.CreatePipeHandle()
     75            except Exception:
     76                log.error("Error: failed to create named pipe '%s'", self.pipe_name)
     77                return
     78            log("CreatePipeHandle()=%s", pipe_handle)
     79            if pipe_handle==INVALID_HANDLE_VALUE:
     80                log.error("Error: invalid handle for named pipe '%s'", self.pipe_name)
     81                return
     82            hr = ConnectNamedPipe(pipe_handle, None)
     83            log("ConnectNamedPipe()=%s", hr)
     84            if self.exit_loop:
    8285                CloseHandle(pipe_handle)
    8386                break
    84             log("new client connected to pipe: %s", hr)
    85             if self.exit_loop:
    86                 break
    87             if self.new_connection_cb:
    88                 self.new_connection_cb(self, pipe_handle)
    89             else:
    90                 log.warn("Warning: no callback defined for new named pipe connection on %s", self.pipe_name)
    91                 CloseHandle(pipe_handle)
     87            if hr==0:
     88                if GetLastError()==ERROR_PIPE_CONNECTED:
     89                    pass
     90                else:
     91                    log.error("Error: cannot connect to named pipe '%s'", self.pipe_name)
     92                    CloseHandle(pipe_handle)
     93                    continue
     94            #from now on, the pipe_handle will be managed elsewhere:
     95            self.new_connection_cb(self, pipe_handle)
    9296
    9397    def CreatePipeHandle(self):
     98        BUFSIZE = 4096
    9499        sa = self.CreatePipeSecurityObject()
    95         try:
    96             return CreateNamedPipe(self.pipe_name,
    97                     PIPE_ACCESS_DUPLEX| FILE_FLAG_OVERLAPPED,
    98                     PIPE_READMODE_BYTE,
    99                     MAX_INSTANCES,
    100                     0, 0, TIMEOUT, sa)
    101         except Exception:
    102             log("failed to create named pipe '%s'", self.pipe_name)
    103             raise
     100        return CreateNamedPipeA(self.pipe_name, PIPE_ACCESS_DUPLEX,
     101                                PIPE_TYPE_BYTE | PIPE_READMODE_BYTE| PIPE_WAIT,
     102                                PIPE_UNLIMITED_INSTANCES, BUFSIZE, BUFSIZE, NMPWAIT_USE_DEFAULT_WAIT, sa)
    104103
    105104    def CreatePipeSecurityObject(self):
     105        #TODO: re-implement using ctypes
    106106        return None
    107         # Create a security object giving World read/write access,
    108         # but only "Owner" modify access.
    109         #sa = SECURITY_ATTRIBUTES()
    110         #sidEveryone = pywintypes.SID()
    111         #sidEveryone.Initialize(SECURITY_WORLD_SID_AUTHORITY,1)
    112         #sidEveryone.SetSubAuthority(0, SECURITY_WORLD_RID)
    113         #sidCreator = pywintypes.SID()
    114         #sidCreator.Initialize(SECURITY_CREATOR_SID_AUTHORITY,1)
    115         #sidCreator.SetSubAuthority(0, SECURITY_CREATOR_OWNER_RID)
    116         #acl = pywintypes.ACL()
    117         #acl.AddAccessAllowedAce(FILE_GENERIC_READ|FILE_GENERIC_WRITE, sidEveryone)
    118         #acl.AddAccessAllowedAce(FILE_ALL_ACCESS, sidCreator)
    119         #sa.SetSecurityDescriptorDacl(1, acl, 0)
    120         #return sa
  • xpra/platform/win32/shadow_server.py

     
    348348
    349349    def _new_connection(self, listener, *args):
    350350        socktype = self.socket_types.get(listener)
    351         log("_new_connection(%s) socktype=%s", listener, socktype)
     351        netlog("_new_connection(%s) socktype=%s", listener, socktype)
    352352        if socktype!="named-pipe":
    353353            return GTKServerBase._new_connection(self, listener)
    354354        pipe_handle = args[0]
  • xpra/scripts/main.py

     
    17561756            raise InitException("named pipes are only supported on MS Windows")
    17571757        import errno
    17581758        from xpra.platform.win32.dotxpra import PIPE_PATH
    1759         from xpra.platform.win32.namedpipes.connection import NamedPipeConnection
    1760         from win32file import CreateFile, GENERIC_READ, GENERIC_WRITE, OPEN_EXISTING    #@UnresolvedImport
     1759        from xpra.platform.win32.namedpipes.connection import NamedPipeConnection, connect_to_namedpipe
    17611760        path = PIPE_PATH+pipe_name
    17621761        try:
    1763             pipe_handle = CreateFile(path, GENERIC_READ | GENERIC_WRITE, 0, None, OPEN_EXISTING, 0, None)
     1762            pipe_handle = connect_to_namedpipe(path)
    17641763        except Exception as e:
    17651764            if e[0]==errno.ENOENT:
    17661765                raise InitException("the named pipe '%s' does not exist" % pipe_name)