Ticket #1424: html5-zerocopy.patch
File html5-zerocopy.patch, 7.9 KB (added by , 5 years ago) |
---|
-
html5/js/Protocol.js
91 91 this.cipher_out = null; 92 92 this.mode = 'binary'; // Current WebSocket mode: 'binary', 'base64' 93 93 this.rQ = []; // Receive queue 94 this.rQi = 0; // Receive queue index95 this.rQmax = 10000; // Max receive queue size before compacting96 94 this.sQ = []; // Send queue 97 95 this.mQ = []; // Worker message queue 96 this.header = []; 98 97 99 98 //Queue processing via intervals 100 99 this.process_interval = 4; //milliseconds … … 107 106 var me = this; 108 107 // init 109 108 this.rQ = []; 110 this.rQi = 0;111 109 this.sQ = []; 112 110 this.websocket = null; 113 111 // connect the socket … … 124 122 }; 125 123 this.websocket.onmessage = function (e) { 126 124 // push arraybuffer values onto the end 127 var u8 = new Uint8Array(e.data); 128 for (var i = 0; i < u8.length; i++) { 129 me.rQ.push(u8[i]); 130 } 131 // wait for 8 bytes 132 //if (me.rQ.length >= 8) { 133 //me._process(); 134 //} 125 me.rQ.push(new Uint8Array(e.data)); 135 126 }; 136 127 this.start_processing(); 137 128 } … … 152 143 var me = this; 153 144 if(this.rQ_interval_id === null){ 154 145 this.rQ_interval_id = setInterval(function(){ 155 if (me.rQ.length > = 8) {146 if (me.rQ.length > 0) { 156 147 me.process_receive_queue(); 157 148 } 158 149 }, this.process_interval); … … 185 176 this.mQ_interval_id = null; 186 177 } 187 178 179 188 180 XpraProtocol.prototype.process_receive_queue = function() { 189 // peek at first 8 bytes of buffer 190 var buf = this._buffer_peek(8); 181 var i = 0, j = 0; 182 if (this.header.length<8 && this.rQ.length>0) { 183 //add from receive queue data to header until we get the 8 bytes we need: 184 while (this.header.length<8 && this.rQ.length>0) { 185 var slice = this.rQ[0]; 186 var needed = 8-this.header.length; 187 var n = Math.min(needed, slice.length); 188 //console.log("header size", this.header.length, ", adding", n, "bytes from", slice.length); 189 //copy at most n characters: 190 for (i = 0; i < n; i++) { 191 this.header.push(slice[i]); 192 } 193 if (slice.length>needed) { 194 //replace the slice with what is left over: 195 this.rQ[0] = slice.subarray(n); 196 } 197 else { 198 //this slice has been fully consumed already: 199 this.rQ.shift(); 200 } 201 } 191 202 192 if (buf[0] !== ord("P")) { 193 msg = "invalid packet header format: " + buf[0]; 194 if (buf.length>1) { 195 msg += ": "; 196 for (c in buf) { 197 msg += String.fromCharCode(c); 203 //verify the header format: 204 if (this.header[0] !== ord("P")) { 205 msg = "invalid packet header format: " + this.header[0]; 206 if (this.header.length>1) { 207 msg += ": "; 208 for (c in this.header) { 209 msg += String.fromCharCode(c); 210 } 198 211 } 212 throw msg; 199 213 } 200 throw msg;201 214 } 202 215 203 var proto_flags = buf[1]; 216 if (this.header.length<8) { 217 //we need more data to continue 218 return; 219 } 220 221 var proto_flags = this.header[1]; 204 222 var proto_crypto = proto_flags & 0x2; 205 206 223 if (proto_flags!=0) { 207 224 // check for crypto protocol flag 208 225 if (!(proto_crypto)) { … … 210 227 } 211 228 } 212 229 213 var level = buf[2]; 214 var index = buf[3]; 230 var level = this.header[2]; 231 if (level & 0x20) { 232 throw "lzo compression is not supported"; 233 } 234 var index = this.header[3]; 235 if (index>=20) { 236 throw "invalid packet index: "+index; 237 } 215 238 var packet_size = 0; 216 for (var i=0; i<4; i++) { 217 //debug("size header["+i+"]="+buf[4+i]); 239 for (i=0; i<4; i++) { 218 240 packet_size = packet_size*0x100; 219 packet_size += buf[4+i];241 packet_size += this.header[4+i]; 220 242 } 243 221 244 // work out padding if necessary 222 245 var padding = 0 223 246 if (proto_crypto) { … … 224 247 padding = (this.cipher_in_block_size - packet_size % this.cipher_in_block_size); 225 248 packet_size += padding; 226 249 } 227 //debug("packet_size="+packet_size+", level="+level+", index="+index);228 250 229 // wait for packet to be complete 230 // the header is still on the buffer so wait for packetsize+headersize bytes! 231 if (this.rQ.length < packet_size+8) { 232 // we already shifted the header off the buffer? 233 //debug("packet is not complete yet"); 251 // verify that we have enough data for the full payload: 252 var rsize = 0; 253 for (i=0,j=this.rQ.length;i<j;++i) { 254 rsize += this.rQ[i].length; 255 } 256 if (rsize<packet_size) { 234 257 return; 235 258 } 236 259 237 // packet is complete but header is still on buffer 238 this._buffer_shift(8); 239 //debug("got a full packet, shifting off "+packet_size); 240 var packet_data = this._buffer_shift(packet_size); 260 // done parsing the header, the next packet will need a new one: 261 this.header = [] 241 262 263 var packet_data = null; 264 if (this.rQ[0].length==packet_size) { 265 //exact match: the payload is in a buffer already: 266 packet_data = this.rQ.shift(); 267 } 268 else { 269 //aggregate all the buffers into "packet_data" until we get exactly "packet_size" bytes: 270 packet_data = new Uint8Array(packet_size); 271 rsize = 0; 272 while (rsize < packet_size) { 273 var slice = this.rQ[0]; 274 var needed = packet_size - rsize; 275 //console.log("slice:", slice.length, "bytes, needed", needed); 276 if (slice.length>needed) { 277 //add part of this slice: 278 packet_data.set(slice.subarray(0, needed), rsize); 279 rsize += needed; 280 this.rQ[0] = slice.subarray(needed); 281 } 282 else { 283 //add this slice in full: 284 packet_data.set(slice, rsize); 285 rsize += slice.length; 286 this.rQ.shift(); 287 } 288 } 289 } 290 242 291 // decrypt if needed 243 292 if (proto_crypto) { 244 293 this.cipher_in.update(forge.util.createBuffer(uintToString(packet_data))); 245 294 var decrypted = this.cipher_in.output.getBytes(); 246 295 packet_data = []; 247 for ( vari=0; i<decrypted.length; i++)296 for (i=0; i<decrypted.length; i++) 248 297 packet_data.push(decrypted[i].charCodeAt(0)); 249 298 packet_data = packet_data.slice(0, -1 * padding); 250 299 } … … 255 304 // lz4 256 305 // python-lz4 inserts the length of the uncompressed data as an int 257 306 // at the start of the stream 258 var d = packet_data.s plice(0, 4);259 // will always belittle endian307 var d = packet_data.subarray(0, 4); 308 // output buffer length is stored as little endian 260 309 var length = d[0] | (d[1] << 8) | (d[2] << 16) | (d[3] << 24); 261 310 // decode the LZ4 block 262 var inflated = new Buffer(length); 263 var uncompressedSize = LZ4.decodeBlock(packet_data, inflated); 264 if(!proto_crypto) 265 inflated = inflated.slice(0, uncompressedSize); 266 } else if (level & 0x20) { 267 // lzo 311 console.log("lz4 decompress packet size", packet_size, ", lz4 length=", length); 312 var inflated = new Uint8Array(length); 313 var uncompressedSize = LZ4.decodeBlock(packet_data, inflated, 4); 314 // if lz4 errors out at the end of the buffer, ignore it: 315 if (uncompressedSize<=0 && packet_size+uncompressedSize!=0) { 316 console.error("failed to decompress lz4 data, error code:", uncompressedSize); 317 return; 318 } 268 319 } else { 269 320 // zlib 270 321 var inflated = new Zlib.Inflate(packet_data).decompress(); … … 287 338 } 288 339 this.raw_packets = {} 289 340 // pass to our packet handler 290 291 341 if((packet[0] === 'draw') && (packet[6] !== 'scroll')){ 292 342 var img_data = packet[7]; 293 343 if (typeof img_data === 'string') { 294 344 var uint = new Uint8Array(img_data.length); 295 for( vari=0,j=img_data.length;i<j;++i) {345 for(i=0,j=img_data.length;i<j;++i) { 296 346 uint[i] = img_data.charCodeAt(i); 297 347 } 298 348 packet[7] = uint; 299 349 } 300 else {301 packet[7] = new Uint8Array(packet[7]);302 }303 350 } 304 if (this.is_worker){351 if (this.is_worker){ 305 352 this.mQ[this.mQ.length] = packet; 306 353 } else { 307 354 this.packet_handler(packet, this.packet_ctx); … … 314 361 } 315 362 316 363 // see if buffer still has unread packets 317 if (this.rQ.length > = 8) {364 if (this.rQ.length > 0) { 318 365 this.process_receive_queue(); 319 366 } 320 367 } … … 405 452 this.cipher_out.start({iv: caps['cipher.iv']}); 406 453 } 407 454 408 XpraProtocol.prototype._buffer_peek = function(bytes) {409 return this.rQ.slice(0, 0+bytes);410 }411 455 412 XpraProtocol.prototype._buffer_shift = function(bytes) {413 return this.rQ.splice(0, 0+bytes);;414 }415 416 417 456 /* 418 457 If we are in a web worker, set up an instance of the protocol 419 458 */