|
4 | 4 | from traceback import print_exc |
5 | 5 | import json |
6 | 6 | import os |
| 7 | +from typing import Any |
7 | 8 | from config import * |
| 9 | +from urllib import request |
| 10 | + |
8 | 11 | # Set sudo command to blank if in systemd (since then we are root) |
9 | 12 | SUDO = "" if os.geteuid() == 0 else "sudo " |
10 | 13 |
|
| 14 | + |
11 | 15 | def nft_add_table(): |
12 | | - cmd=f"{SUDO}nft add table inet {NFT_TABLE}" |
13 | | - subprocess.check_call(cmd.split(" ")) |
14 | | - cmd=f"{SUDO}nft add chain inet {NFT_TABLE} " + r" input { type filter hook input priority 0 \; }" |
15 | | - subprocess.check_call(cmd, shell=True) |
| 16 | + cmd = f"{SUDO}nft add table inet {NFT_TABLE}" |
| 17 | + _ = subprocess.check_call(cmd.split(" ")) |
| 18 | + cmd = ( |
| 19 | + f"{SUDO}nft add chain inet {NFT_TABLE} " |
| 20 | + + r" input { type filter hook input priority 0 \; }" |
| 21 | + ) |
| 22 | + _ = subprocess.check_call(cmd, shell=True) |
| 23 | + |
16 | 24 |
|
17 | 25 | def nft_drop_table(): |
18 | | - cmd=f"{SUDO}nft delete table inet {NFT_TABLE}" |
19 | | - subprocess.call(cmd.split(" ")) |
| 26 | + cmd = f"{SUDO}nft delete table inet {NFT_TABLE}" |
| 27 | + _ = subprocess.call(cmd.split(" ")) |
| 28 | + |
20 | 29 |
|
21 | | -def get_nft_counters()->dict[ipaddress.IPv4Address, int]: |
22 | | - cmd=f"{SUDO}nft -j list chain inet {NFT_TABLE} input" |
23 | | - counters:dict[ipaddress.IPv4Address, int] = {} |
| 30 | +def get_nft_counters() -> dict[ipaddress.IPv4Address, int]: |
| 31 | + cmd = f"{SUDO}nft -j list chain inet {NFT_TABLE} input" |
| 32 | + counters: dict[ipaddress.IPv4Address, int] = {} |
24 | 33 | try: |
25 | 34 | (_status, output) = subprocess.getstatusoutput(cmd) |
26 | 35 | x = json.loads(output) |
27 | | - for row in x['nftables']: |
28 | | - if 'rule' not in row: |
| 36 | + for row in x["nftables"]: |
| 37 | + if "rule" not in row: |
29 | 38 | continue |
30 | | - expr = row['rule']['expr'] |
31 | | - source = ipaddress.IPv4Address(expr[0]['match']['right']) |
32 | | - counters[source] = expr[1]['counter']['packets'] |
| 39 | + expr = row["rule"]["expr"] |
| 40 | + source = ipaddress.IPv4Address(expr[0]["match"]["right"]) |
| 41 | + counters[source] = expr[1]["counter"]["packets"] |
33 | 42 | except: |
34 | 43 | print_exc() |
35 | 44 | finally: |
36 | 45 | return counters |
37 | 46 |
|
38 | | -def nft_add_counter(ip:ipaddress.IPv4Address): |
| 47 | + |
| 48 | +def nft_add_counter(ip: ipaddress.IPv4Address): |
39 | 49 | cmd = f"{SUDO} nft add rule inet {NFT_TABLE} input ip saddr {ip} counter" |
40 | 50 | (_status, _output) = subprocess.getstatusoutput(cmd) |
41 | 51 |
|
42 | | -async def get_staked_nodes() ->dict[str, int]: |
43 | | - cmd = f"-u{CLUSTER} validators --output json" |
44 | | - proc = await asyncio.create_subprocess_exec("solana", *cmd.split(" "), stdout=asyncio.subprocess.PIPE, |
45 | | - stderr=asyncio.subprocess.PIPE) |
46 | | - stdout, _stderr = await proc.communicate() |
47 | | - output = json.loads(stdout) |
48 | | - return {v['identityPubkey']:v['activatedStake'] for v in output["validators"] if v['activatedStake'] > MIN_STAKE_TO_CARE and not v['delinquent']} |
49 | | - |
50 | | -async def get_contact_infos()->dict[str, ipaddress.IPv4Address]: |
51 | | - cmd = f"-u{CLUSTER} gossip --output json" |
52 | | - proc = await asyncio.create_subprocess_exec("solana", *cmd.split(" "), stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE) |
53 | | - stdout, _stderr = await proc.communicate() |
54 | | - output = json.loads(stdout) |
55 | | - return {v['identityPubkey']:ipaddress.IPv4Address( v['ipAddress']) for v in output if 'tpuPort' in v} |
56 | | - |
57 | | -def kill_dz_interface()->None: |
58 | | - subprocess.call(f"{SUDO}ip link set doublezero0 down", shell=True) |
| 52 | + |
| 53 | +async def get_staked_nodes() -> dict[str, int]: |
| 54 | + output = await get_from_RPC("getVoteAccounts") |
| 55 | + return { |
| 56 | + v["nodePubkey"]: v["activatedStake"] |
| 57 | + for v in output["current"] |
| 58 | + if v["activatedStake"] > MIN_STAKE_TO_CARE |
| 59 | + } |
| 60 | + |
| 61 | + |
| 62 | +async def get_contact_infos() -> dict[str, ipaddress.IPv4Address]: |
| 63 | + output = await get_from_RPC("getClusterNodes") |
| 64 | + return { |
| 65 | + v["pubkey"]: ipaddress.IPv4Address(v["tpuQuic"].split(":")[0]) |
| 66 | + for v in output |
| 67 | + if v.get("tpuQuic") is not None |
| 68 | + } |
| 69 | + |
| 70 | + |
| 71 | +def kill_dz_interface() -> None: |
| 72 | + _ = subprocess.call(f"{SUDO}ip link set doublezero0 down", shell=True) |
| 73 | + |
| 74 | + |
| 75 | +async def get_from_RPC(method: str) -> Any: |
| 76 | + url = f"https://api.{CLUSTER}.solana.com" |
| 77 | + headers = {"Content-Type": "application/json"} |
| 78 | + data = json.dumps( |
| 79 | + { |
| 80 | + "jsonrpc": "2.0", |
| 81 | + "id": 1, |
| 82 | + "method": method, |
| 83 | + "params": [], |
| 84 | + } |
| 85 | + ).encode() |
| 86 | + |
| 87 | + loop = asyncio.get_event_loop() |
| 88 | + |
| 89 | + def fetch() -> dict[str, Any]: |
| 90 | + req = request.Request(url, data=data, headers=headers, method="POST") |
| 91 | + with request.urlopen(req) as resp: |
| 92 | + return json.load(resp) |
| 93 | + |
| 94 | + try: |
| 95 | + result = await loop.run_in_executor(None, fetch) |
| 96 | + return result["result"] |
| 97 | + except Exception as e: |
| 98 | + print(f"Could not fetch data from RPC, error {e}") |
| 99 | + return [] |
0 commit comments