Hello again.
Not much typing today; maybe later.
We run Python code here, but you could do anything.

import asyncio
from websockets.server import serve
import json
import base64
# Identifiers attached to every "characteristicDidChange" notification.
SERVICE_ID = 0xF005  # 61445 in decimal
CHARACTERISTIC_ID = "5261da01-fa7e-42ab-850b-7c80220097cc"
class Client():
def __init__(self, ws):
self.ws = ws
self.val = "AAAA"
self.vals = bytearray()
self.code = ""
def set_val(self, val):
b = bytearray(3)
b[2] = val
self.val = str(base64.b64encode(b), "utf8")
async def result(self, result):
await self.ws.send(json.dumps({"jsonrpc": "2.0", "id": self.data["id"], "result": result}))
async def ok(self):
await self.result(None)
async def params(self, method, **params):
await self.ws.send(json.dumps({"jsonrpc": "2.0", "method": method, "params": params}))
async def discover(self):
await self.params("didDiscoverPeripheral", peripheralId=0, name="sys", rssi=None)
await self.ok()
async def update_val(self):
await self.params("characteristicDidChange", serviceId=SERVICE_ID, characteristicId=CHARACTERISTIC_ID, message=self.val, encoding="base64")
async def notifications(self):
while True:
await self.update_val()
await asyncio.sleep(1)
async def read(self):
await self.result({"message": self.val, "encoding": "base64"})
if self.data["params"].get("startNotifications"):
asyncio.create_task(self.notifications())
def print(self, *v, **kv):
for e in v:
for c in str(e):
self.vals.append(ord(c))
self.vals.append(ord(" "))
async def write(self):
message = self.data["params"]["message"]
if message == "ggAAAAAA":
if self.code:
self.vals = bytearray()
print = self.print
exec(self.code)
self.code = ""
else:
if self.vals: self.vals = self.vals[1:]
else:
d = base64.b64decode(message)
self.code = self.code + str(d[1:], "utf8").replace(";", "\n")
await self.result(0)
self.set_val(self.vals[0] if self.vals and len(self.vals) > 0 else 0)
await self.update_val()
async def run(self):
async for msg in self.ws:
self.data = json.loads(msg)
print(self.data)
match self.data.get("method"):
case "discover": await self.discover()
case "read": await self.read()
case "write": await self.write()
case _: await self.ok()
class Server():
    """Websocket server that hands every incoming connection to a ``Client``."""

    def __init__(self, host="127.0.0.1", port=20111):
        """Remember the address to listen on.

        The defaults keep the server on the loopback interface. Clients can
        make this process execute code, so think twice before binding to an
        address reachable from other machines.
        """
        self.host = host
        self.port = port

    async def connection(self, ws):
        """Serve one websocket connection until it ends."""
        await Client(ws).run()

    async def serve(self):
        """Listen on ``host:port`` and keep serving until cancelled."""
        async with serve(self.connection, self.host, self.port):
            # A future that is never resolved keeps the server context open.
            await asyncio.Future()
if __name__ == "__main__":
    # Script entry point: build the server and block in its serve loop.
    server = Server()
    asyncio.run(server.serve())