Toggle Navigation
Hatchery
Eggs
obsbadge
protocol.py
Users
Badges
Login
Register
MCH2022 badge?
go to mch2022.badge.team
protocol.py
raw
Content
""" Websockets protocol """ import ure as re import ustruct as struct import urandom as random import usocket as socket from ucollections import namedtuple # Opcodes OP_CONT = const(0x0) OP_TEXT = const(0x1) OP_BYTES = const(0x2) OP_CLOSE = const(0x8) OP_PING = const(0x9) OP_PONG = const(0xa) # Close codes CLOSE_OK = const(1000) CLOSE_GOING_AWAY = const(1001) CLOSE_PROTOCOL_ERROR = const(1002) CLOSE_DATA_NOT_SUPPORTED = const(1003) CLOSE_BAD_DATA = const(1007) CLOSE_POLICY_VIOLATION = const(1008) CLOSE_TOO_BIG = const(1009) CLOSE_MISSING_EXTN = const(1010) CLOSE_BAD_CONDITION = const(1011) URL_RE = re.compile(r'(wss|ws)://([A-Za-z0-9-\.]+)(?:\:([0-9]+))?(/.+)?') URI = namedtuple('URI', ('protocol', 'hostname', 'port', 'path')) class NoDataException(Exception): pass class ConnectionClosed(Exception): pass def urlparse(uri): """Parse ws:// URLs""" match = URL_RE.match(uri) if match: protocol = match.group(1) host = match.group(2) port = match.group(3) path = match.group(4) if protocol == 'wss': if port is None: port = 443 elif protocol == 'ws': if port is None: port = 80 else: raise ValueError('Scheme {} is invalid'.format(protocol)) return URI(protocol, host, int(port), path) class Websocket: """ Basis of the Websocket protocol. This can probably be replaced with the C-based websocket module, but this one currently supports more options. """ is_client = False def __init__(self, sock): self.sock = sock self.open = True def __enter__(self): return self def __exit__(self, exc_type, exc, tb): self.close() def settimeout(self, timeout): self.sock.settimeout(timeout) def read_frame(self, max_size=None): """ Read a frame from the socket. See https://tools.ietf.org/html/rfc6455#section-5.2 for the details. """ # Frame header two_bytes = self.sock.read(2) if not two_bytes: raise NoDataException byte1, byte2 = struct.unpack('!BB', two_bytes) # Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4) fin = bool(byte1 & 0x80) opcode = byte1 & 0x0f # Byte 2: MASK(1) LENGTH(7) mask = bool(byte2 & (1 << 7)) length = byte2 & 0x7f if length == 126: # Magic number, length header is 2 bytes length, = struct.unpack('!H', self.sock.read(2)) elif length == 127: # Magic number, length header is 8 bytes length, = struct.unpack('!Q', self.sock.read(8)) if mask: # Mask is 4 bytes mask_bits = self.sock.read(4) try: data = self.sock.read(length) except MemoryError: # We can't receive this many bytes, close the socket self.close(code=CLOSE_TOO_BIG) return True, OP_CLOSE, None if mask: data = bytes(b ^ mask_bits[i % 4] for i, b in enumerate(data)) return fin, opcode, data def write_frame(self, opcode, data=b''): """ Write a frame to the socket. See https://tools.ietf.org/html/rfc6455#section-5.2 for the details. """ fin = True mask = self.is_client # messages sent by client are masked length = len(data) # Frame header # Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4) byte1 = 0x80 if fin else 0 byte1 |= opcode # Byte 2: MASK(1) LENGTH(7) byte2 = 0x80 if mask else 0 if length < 126: # 126 is magic value to use 2-byte length header byte2 |= length self.sock.write(struct.pack('!BB', byte1, byte2)) elif length < (1 << 16): # Length fits in 2-bytes byte2 |= 126 # Magic code self.sock.write(struct.pack('!BBH', byte1, byte2, length)) elif length < (1 << 64): byte2 |= 127 # Magic code self.sock.write(struct.pack('!BBQ', byte1, byte2, length)) else: raise ValueError() if mask: # Mask is 4 bytes mask_bits = struct.pack('!I', random.getrandbits(32)) self.sock.write(mask_bits) data = bytes(b ^ mask_bits[i % 4] for i, b in enumerate(data)) self.sock.write(data) def recv(self): """ Receive data from the websocket. This is slightly different from 'websockets' in that it doesn't fire off a routine to process frames and put the data in a queue. If you don't call recv() sufficiently often you won't process control frames. """ assert self.open while self.open: try: fin, opcode, data = self.read_frame() except NoDataException: return '' except ValueError: self._close() raise ConnectionClosed() if not fin: raise NotImplementedError() if opcode == OP_TEXT: return data.decode('utf-8') elif opcode == OP_BYTES: return data elif opcode == OP_CLOSE: self._close() return elif opcode == OP_PONG: # Ignore this frame, keep waiting for a data frame continue elif opcode == OP_PING: # We need to send a pong frame self.write_frame(OP_PONG, data) # And then wait to receive continue elif opcode == OP_CONT: # This is a continuation of a previous frame raise NotImplementedError(opcode) else: raise ValueError(opcode) def send(self, buf): """Send data to the websocket.""" assert self.open if isinstance(buf, str): opcode = OP_TEXT buf = buf.encode('utf-8') elif isinstance(buf, bytes): opcode = OP_BYTES else: raise TypeError() self.write_frame(opcode, buf) def close(self, code=CLOSE_OK, reason=''): """Close the websocket.""" if not self.open: return buf = struct.pack('!H', code) + reason.encode('utf-8') self.write_frame(OP_CLOSE, buf) self._close() def _close(self): self.open = False self.sock.close()