### Eclipse Workspace Patch 1.0
#P Xpra
|
|
|
155 | 155 | self._flush_one_packet_into_buffer() |
156 | 156 | return False |
157 | 157 | |
158 | | def _queue_write(self, data, flush=False): |
| 158 | def _queue_write(self, data, cb=None, flush=False): |
159 | 159 | """ |
160 | 160 | This method should be called with _write_lock held |
161 | 161 | """ |
… |
… |
|
163 | 163 | return |
164 | 164 | if self.raw_packets or self._compressor is None: |
165 | 165 | #raw packets are compressed individually, without the header |
166 | | self._write_queue.put(data) |
| 166 | self._write_queue.put((data, cb)) |
167 | 167 | return |
168 | 168 | c = self._compressor.compress(data) |
169 | 169 | if c: |
170 | | self._write_queue.put(c) |
| 170 | self._write_queue.put((c, None)) |
171 | 171 | if not flush: |
172 | 172 | return |
173 | 173 | c = self._compressor.flush(zlib.Z_SYNC_FLUSH) |
174 | 174 | if c: |
175 | | self._write_queue.put(c) |
| 175 | self._write_queue.put((c, cb)) |
176 | 176 | |
177 | 177 | def verify_packet(self, packet): |
178 | 178 | """ look for None values which may have caused the packet to fail encoding """ |
… |
… |
|
204 | 204 | def _flush_one_packet_into_buffer(self): |
205 | 205 | if not self.source: |
206 | 206 | return |
207 | | packet, self._source_has_more = self.source.next_packet() |
| 207 | packet, cb, self._source_has_more = self.source.next_packet() |
208 | 208 | if packet is not None: |
209 | | self._add_packet_to_queue(packet) |
| 209 | self._add_packet_to_queue(packet, cb) |
210 | 210 | |
211 | 211 | def encode(self, packet): |
212 | 212 | """ |
… |
… |
|
252 | 252 | packets.append((0, True, main_packet)) |
253 | 253 | return packets |
254 | 254 | |
255 | | def _add_packet_to_queue(self, packet): |
| 255 | def _add_packet_to_queue(self, packet, cb=None): |
256 | 256 | packets = self.encode(packet) |
257 | 257 | if not self.raw_packets: |
258 | 258 | assert len(packets)==1 |
… |
… |
|
276 | 276 | header = ("PS%014d" % l).encode('latin1') |
277 | 277 | if l<4096 and sys.version<'3': |
278 | 278 | #send size and data together (low copy overhead): |
279 | | self._queue_write(header+data, True) |
| 279 | self._queue_write(header+data, cb, True) |
280 | 280 | else: |
281 | 281 | self._queue_write(header) |
282 | | self._queue_write(data, True) |
| 282 | self._queue_write(data, cb, True) |
283 | 283 | finally: |
284 | 284 | if packet[0]=="set_deflate": |
285 | 285 | level = packet[1] |
… |
… |
|
295 | 295 | def _write_thread_loop(self): |
296 | 296 | try: |
297 | 297 | while True: |
298 | | buf = self._write_queue.get() |
| 298 | item = self._write_queue.get() |
299 | 299 | # Used to signal that we should exit: |
300 | | if buf is None: |
| 300 | if item is None: |
301 | 301 | log("write thread: empty marker, exiting") |
302 | 302 | break |
| 303 | buf, cb = item |
303 | 304 | try: |
304 | 305 | while buf and not self._closed: |
305 | 306 | written = untilConcludes(self._conn.write, buf) |
… |
… |
|
307 | 308 | buf = buf[written:] |
308 | 309 | self.output_raw_packetcount += 1 |
309 | 310 | self.output_bytecount += written |
| 311 | if cb: |
| 312 | cb() |
310 | 313 | except (OSError, IOError, socket.error), e: |
311 | 314 | self._call_connection_lost("Error writing to connection: %s" % e) |
312 | 315 | break |