24
loading...
This website collects cookies to deliver a better user experience
import base64
import os
import socket
import ssl
import uuid
def upgrade(s, conn_info):
    """Perform the client side of the WebSocket opening handshake on ``s``.

    Sends an HTTP/1.1 ``GET`` upgrade request for ``conn_info["path"]`` on
    ``conn_info["host"]`` and consumes the server's response headers up to
    and including the terminating blank line.

    Args:
        s: A connected socket-like object providing ``sendall`` and ``recv``.
        conn_info: Mapping with at least the keys ``"path"`` and ``"host"``.

    Raises:
        ConnectionError: If the peer closes the connection before the
            response headers are complete, the headers are unreasonably
            large, or the status code is not ``101``.

    Note:
        The ``Sec-WebSocket-Accept`` response header is not verified.
    """
    # RFC 6455 requires the key to be a base64-encoded 16-byte random nonce.
    key = base64.b64encode(os.urandom(16)).decode("ascii")
    request = (
        f'GET {conn_info["path"]} HTTP/1.1\r\n'
        f'Host: {conn_info["host"]}\r\n'
        "Connection: Upgrade\r\n"
        "Upgrade: websocket\r\n"
        f"Sec-Websocket-Key: {key}\r\n"
        "Sec-WebSocket-Version: 13\r\n"
        "\r\n"
    )
    s.sendall(request.encode("ISO-8859-1"))

    # Read the response one byte at a time: a buffered reader could pull the
    # first WebSocket frames into its private buffer and drop them, because
    # the server may start sending frames right after the headers.
    max_header_bytes = 65536
    response = bytearray()
    while not response.endswith((b"\r\n\r\n", b"\n\n")):
        if len(response) >= max_header_bytes:
            raise ConnectionError("WebSocket handshake response too large")
        byte = s.recv(1)
        if not byte:
            raise ConnectionError(
                "connection closed during WebSocket handshake"
            )
        response += byte

    status_line = response.splitlines()[0].decode("ISO-8859-1")
    status_parts = status_line.split(None, 2)
    if len(status_parts) < 2 or status_parts[1] != "101":
        raise ConnectionError(f"WebSocket upgrade rejected: {status_line!r}")
def _recv_exact(s, num_of_bytes):
    """Receive exactly ``num_of_bytes`` bytes from ``s``.

    ``recv`` may return fewer bytes than requested, so keep reading until
    the requested amount has arrived.

    Raises:
        ConnectionError: If the peer closes the connection first.
    """
    chunks = []
    remaining = num_of_bytes
    while remaining > 0:
        chunk = s.recv(remaining)
        if not chunk:
            raise ConnectionError(
                "connection closed while reading frame length"
            )
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def read_payload_len(s):
    """Read the payload-length field of a WebSocket frame from ``s``.

    Consumes the second header byte, drops its top (mask) bit, and reads the
    extended length that follows when the 7-bit value is 126 or 127.

    Args:
        s: Socket-like object positioned just after the first header byte.

    Returns:
        The payload length as an ``int``.

    Raises:
        ConnectionError: If the connection closes before the field is read.
    """
    indicator = _recv_exact(s, 1)[0] & 0x7F
    if indicator == 127:
        # RFC 6455: 127 means the next 8 bytes hold a 64-bit length.
        return read_extra_len(s, 8)
    if indicator == 126:
        # 126 means the next 2 bytes hold a 16-bit length.
        return read_extra_len(s, 2)
    return indicator


def read_extra_len(s, num_of_bytes):
    """Read ``num_of_bytes`` bytes from ``s`` as a big-endian unsigned int.

    Raises:
        ConnectionError: If the connection closes before all bytes arrive.
    """
    return int.from_bytes(_recv_exact(s, num_of_bytes), "big")
def read_frame(s):
    """Read one WebSocket frame from ``s`` and print a summary of it.

    Text frames (opcode 0x1) have their raw payload bytes printed; ping
    (0x9) and pong (0xA) frames print ``PING`` / ``PONG``. Frames with any
    other opcode are consumed silently. The payload of every frame is read
    off the socket so the next call starts at a frame boundary.

    Args:
        s: Socket-like object positioned at the start of a frame.

    Raises:
        ConnectionError: If the connection closes before a complete frame
            has been received.

    Note:
        Masked frames are not supported (no masking key is read), and no
        PONG reply is sent in response to a PING.
    """
    header = s.recv(1)
    if not header:
        raise ConnectionError("connection closed while waiting for a frame")
    opcode = header[0] & 0x0F

    # Every frame has a length field and a (possibly empty) payload. Both
    # must be consumed whatever the opcode, otherwise the next call would
    # interpret leftover bytes as a frame header.
    remaining = read_payload_len(s)
    chunks = []
    while remaining > 0:
        # recv may return less than requested; cap the request so a large
        # declared length does not allocate one giant buffer.
        chunk = s.recv(min(remaining, 65536))
        if not chunk:
            raise ConnectionError("connection closed in the middle of a frame")
        chunks.append(chunk)
        remaining -= len(chunk)
    payload = b"".join(chunks)

    if opcode == 0x1:
        print(payload)
    elif opcode == 0x9:
        print("PING")
    elif opcode == 0xA:
        print("PONG")
def connect(conn_info):
    """Connect over TLS, upgrade to WebSocket and print frames indefinitely.

    Opens a TCP connection to ``conn_info["host"]`` on ``conn_info["port"]``,
    wraps it with a default-configured TLS context (using the host as the
    server name), runs the WebSocket handshake via ``upgrade`` and then calls
    ``read_frame`` in an endless loop.

    Args:
        conn_info: Mapping with the keys ``"host"``, ``"port"`` and whatever
            ``upgrade`` reads from it.

    The loop has no exit condition; the function only ends when an exception
    propagates out, at which point both sockets are closed by the ``with``
    statement.
    """
    tls_context = ssl.create_default_context()
    host = conn_info["host"]
    address = (host, conn_info["port"])
    with socket.create_connection(address) as tcp_sock, \
            tls_context.wrap_socket(tcp_sock, server_hostname=host) as tls_sock:
        upgrade(tls_sock, conn_info)
        while True:
            read_frame(tls_sock)
# Script entry: stream the "market.trade.thb_btc" WebSocket channel from
# api.bitkub.com over TLS (port 443) and print every received frame.
connect(
    dict(
        host="api.bitkub.com",
        port=443,
        path="/websocket-api/market.trade.thb_btc",
    )
)
> python3.10 http_ex.py
b'{"amt":0.00004661,"bid":86911896,"rat":2140000,"sid":82187323,"stream":"market.trade.thb_btc","sym":"THB_BTC","ts":1636729677,"txn":"BTCSELL0011078574"}\n{"amt":0.23306074,"bid":86912246,"rat":2140000,"sid":82187325,"stream":"market.trade.thb_btc","sym":"THB_BTC","ts":1636729677,"txn":"BTCSELL0011078576"}'
b'{"amt":0.00466121,"bid":86912014,"rat":2140000,"sid":82187324,"stream":"market.trade.thb_btc","sym":"THB_BTC","ts":1636729677,"txn":"BTCSELL0011078575"}'