diff --git a/logicnet/base/validator.py b/logicnet/base/validator.py index 2d33db13..86278dcb 100644 --- a/logicnet/base/validator.py +++ b/logicnet/base/validator.py @@ -2,14 +2,15 @@ import torch import asyncio import threading +import time import bittensor as bt from typing import List from traceback import print_exception - from logicnet.base.neuron import BaseNeuron + class BaseValidatorNeuron(BaseNeuron): """ Base class for Bittensor validators. Your validator should inherit from this class. @@ -21,6 +22,9 @@ def __init__(self, config=None): # Save a copy of the hotkeys to local memory. self.hotkeys = copy.deepcopy(self.metagraph.hotkeys) + # Save a copy of the new hotkeys to local memory. + self.new_hotkeys = [] + # Dendrite lets us send messages to other nodes (axons) in the network. self.dendrite = bt.dendrite(wallet=self.wallet) bt.logging.info(f"\033[1;32m🔗 Dendrite: {self.dendrite}\033[0m") @@ -251,9 +255,10 @@ def resync_metagraph(self): bt.logging.info( "\033[1;32m🔄 Metagraph updated, re-syncing hotkeys, dendrite pool and moving averages\033[0m" ) - # Zero out all hotkeys that have been replaced. + + # Zero out all hotkeys that have been replaced and add them to the new hotkeys list. for uid, hotkey in enumerate(self.hotkeys): - if (hotkey != self.metagraph.hotkeys[uid]): + if hotkey != self.metagraph.hotkeys[uid]: self.scores[uid] = 0 # hotkey has been replaced # Check to see if the metagraph has changed size. diff --git a/logicnet/utils/func_helper.py b/logicnet/utils/func_helper.py new file mode 100644 index 00000000..0f66773a --- /dev/null +++ b/logicnet/utils/func_helper.py @@ -0,0 +1,5 @@ +def linear_function(x, m=1, b=0): + """ + Computes the value of a linear function f(x) = mx + b + """ + return m * x + b \ No newline at end of file diff --git a/logicnet/validator/miner_manager.py b/logicnet/validator/miner_manager.py index 9b214b3e..a263fc57 100644 --- a/logicnet/validator/miner_manager.py +++ b/logicnet/validator/miner_manager.py @@ -60,10 +60,23 @@ class MinerManager: def __init__(self, validator): self.validator = validator self.all_uids = [int(uid.item()) for uid in self.validator.metagraph.uids] + # Ensure all entries are MinerInfo objects self.all_uids_info = {uid: MinerInfo() for uid in self.all_uids} def to_dict(self): - return {uid: info.to_dict() for uid, info in self.all_uids_info.items()} + """Convert miner info to dictionary format, ensuring all entries are MinerInfo objects""" + result = {} + for uid, info in self.all_uids_info.items(): + if isinstance(info, dict): + # Convert dict to MinerInfo object if needed + info = MinerInfo(**info) + self.all_uids_info[uid] = info + elif not isinstance(info, MinerInfo): + # Create new MinerInfo object if invalid type + info = MinerInfo() + self.all_uids_info[uid] = info + result[uid] = info.to_dict() + return result def get_miner_info(self): """ @@ -101,10 +114,7 @@ def update_miners_identity(self): miner_distribution = {} for uid, info in valid_miners_info.items(): # info = self.all_uids_info[int(uid)] if int(uid) in self.all_uids_info else MinerInfo(**info) - miner_state = self.all_uids_info.setdefault( - uid, - {"scores": [], "reward_logs": []}, - ) + miner_state = self.all_uids_info.setdefault(uid, MinerInfo()) miner_state.category = info.get("category", "") miner_state.epoch_volume = info.get("epoch_volume") if info.get("epoch_volume") else 512 info = miner_state @@ -139,9 +149,9 @@ def get_miner_uids(self, category: str): Get miner uids based on category, useful if subnet has multiple categories """ available_uids = [ - int(uid) - for uid in self.all_uids_info.keys() - if self.all_uids_info[uid].category == category + uid for uid in self.all_uids_info.keys() + if isinstance(self.all_uids_info[uid], MinerInfo) and + self.all_uids_info[uid].category == category ] return available_uids diff --git a/logicnet/validator/rewarder.py b/logicnet/validator/rewarder.py index 00e86e13..dfd08319 100644 --- a/logicnet/validator/rewarder.py +++ b/logicnet/validator/rewarder.py @@ -2,6 +2,7 @@ import openai import sympy import random +import time import bittensor as bt from concurrent import futures from logicnet.protocol import LogicSynapse @@ -9,12 +10,19 @@ from logicnet.utils.model_selector import model_selector from logicnet.utils.regex_helper import extract_numbers from logicnet.validator.prompt import DETECT_TRICK_TEMPLATE, CORRECTNESS_TEMPLATE, EXTRACT_ANSWER_PROMPT +from logicnet.utils.func_helper import linear_function SIMILARITY_WEIGHT = 0.3 CORRECTNESS_WEIGHT = 0.7 PROCESSING_TIME_WEIGHT = -0.05 +ELIGIBLE_TIMEOUT = 604800 # 7 days +class MinerInfo: + def __init__(self, uid: str, time: float): + self.uid = uid + self.time = time + class LogicRewarder: def __init__(self, model_rotation_pool: dict): @@ -24,7 +32,7 @@ def __init__(self, model_rotation_pool: dict): self.model_rotation_pool = model_rotation_pool self.embedder = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2") - def __call__(self, uids, responses: list[LogicSynapse], base_synapse: LogicSynapse): + def __call__(self, uids, responses: list[LogicSynapse], base_synapse: LogicSynapse, new_miners: list[MinerInfo]): """Calculate reward for each response using similarity, correctness, and processing time. Args: @@ -68,7 +76,7 @@ def __call__(self, uids, responses: list[LogicSynapse], base_synapse: LogicSynap + CORRECTNESS_WEIGHT * correctness[i] + PROCESSING_TIME_WEIGHT * min(process_times[i] / timeout, 1) ) - + # Scale up the reward reward = reward / 2 + 0.5 valid_rewards.append(reward) @@ -93,8 +101,15 @@ def __call__(self, uids, responses: list[LogicSynapse], base_synapse: LogicSynap except Exception as e: bt.logging.error(f"Error in logging reward for valid miners: {e}") - - + + # Bonus reward for new miners + if not new_miners: + bt.logging.info("No new miners within the eligible timeframe") + else: + # Convert new_miners from list of dicts to list of MinerInfo objects + new_miners = [MinerInfo(uid=miner['uid'], time=miner['time']) for miner in new_miners] + valid_rewards = self.bonus_rewarder(valid_uids, new_miners, valid_rewards) + total_uids = valid_uids + invalid_uids rewards = valid_rewards + invalid_rewards @@ -204,7 +219,6 @@ def clean_response(self, response: str): response = response.replace(char, ' ') return response - def _get_correctness_by_llm(self, question: str, ground_truth: str, response: str, model_name: str, openai_client: openai.OpenAI): """Calculate the correctness score for a single response using LLM. @@ -430,4 +444,49 @@ def _get_ground_truth(self, question: str): except openai.OpenAIError as e: bt.logging.error(f"API request failed after switching: {e}") - return response \ No newline at end of file + return response + + def bonus_rewarder(self, available_miner_uids: list[int], eligible_new_miners: list[MinerInfo], rewards: list[float]): + """Reward new miners with a bonus based on their registration time. + + Args: + available_miner_uids (list[int]): List of miner UIDs that are currently active. + eligible_new_miners (list[MinerInfo]): List of MinerInfo objects containing miner UIDs and registration times. + rewards (list[float]): List of base rewards to be modified with bonuses. + + Returns: + list[float]: Modified rewards list with time-based bonuses applied to eligible new miners. + """ + if not available_miner_uids: + bt.logging.info("No available miners") + return rewards + + new_miners_dict = {int(miner.uid): miner.time for miner in eligible_new_miners} + current_time = time.time() + + bt.logging.debug(f"Processing bonus rewards - Eligible new miners: {new_miners_dict}") + bt.logging.debug(f"Available miner UIDs: {available_miner_uids}") + + # Process each miner exactly once with direct dictionary lookup + for idx, miner_uid in enumerate(available_miner_uids): + if miner_uid in new_miners_dict: + miner_time = new_miners_dict[miner_uid] + time_factor = 1 - (current_time - miner_time) / ELIGIBLE_TIMEOUT + + if time_factor > 0: + original_reward = rewards[idx] + # Calculate bonus as a percentage of original reward, scaled by time factor + bonus = linear_function(time_factor, m=0.1 * original_reward) + rewards[idx] = min(original_reward + bonus, 1.0) + + bt.logging.info( + f"Bonus applied to miner {miner_uid}: " + f"final incentive = {rewards[idx]:.4f} " + f"(original = {original_reward:.4f}, bonus = {bonus:.4f}, " + f"time_factor = {time_factor:.2f})" + ) + else: + bt.logging.debug(f"Miner {miner_uid} bonus period expired " + f"(registered {(current_time - miner_time) / 86400:.1f} days ago)") + + return rewards \ No newline at end of file diff --git a/neurons/validator/validator.py b/neurons/validator/validator.py index 29e47f5f..b40afb8b 100644 --- a/neurons/validator/validator.py +++ b/neurons/validator/validator.py @@ -24,6 +24,9 @@ from neurons.validator.core.serving_queue import QueryQueue from collections import defaultdict import wandb +from bittensor import Subtensor + +ELIGIBLE_TIMEOUT = 604800 # 7 days def init_category(config=None, model_rotation_pool=None, dataset_weight=None): @@ -150,6 +153,7 @@ def forward(self): Query miners by batched from the serving queue then process challenge-generating -> querying -> rewarding in background by threads DEFAULT: 16 miners per batch, 600 seconds per loop. """ + self.sync() self.store_miner_infomation() bt.logging.info("\033[1;34m🔄 Updating available models & uids\033[0m") async_batch_size = self.config.async_batch_size @@ -214,7 +218,6 @@ def forward(self): ) time.sleep(loop_base_time - actual_time_taken) - def async_query_and_reward( self, category: str, @@ -256,9 +259,12 @@ def async_query_and_reward( uid for uid, should_reward in zip(uids, should_rewards) if should_reward ] + # Get new miners within the ELIGIBLE_TIMEOUT period + self.new_hotkeys = self.get_new_miners() + if reward_uids: uids, rewards, reward_logs = self.categories[category]["rewarder"]( - reward_uids, reward_responses, base_synapse + reward_uids, reward_responses, base_synapse, self.new_hotkeys ) for i, uid in enumerate(reward_uids): @@ -434,6 +440,7 @@ def save_state(self): bt.logging.info("State successfully saved to state.pkl") except Exception as e: bt.logging.error(f"Failed to save state: {e}") + def load_state(self): """Loads state of validator from a file, with fallback to .pt if .pkl is not found.""" # TODO: After a transition period, remove support for the old .pt format. @@ -475,7 +482,6 @@ def load_state(self): self.step = 0 # Default fallback in case of an unknown error bt.logging.error(f"Error loading state: {e}") - def store_miner_infomation(self): miner_informations = self.miner_manager.to_dict() @@ -574,6 +580,65 @@ def _log_wandb(self, log): except Exception as e: bt.logging.error(f"Error logging to wandb: {e}") + def get_new_miners(self): + """ + Get newly registered miners within the ELIGIBLE_TIMEOUT period. + + Returns: + list: List of dicts containing new miner UIDs and their registration timestamps. + Format: [{'uid': int, 'time': float}, ...] + """ + try: + # Get current block number from the chain + current_block_number = bt.subtensor().get_current_block() + + # Query registration blocks for all miners in netuid 78 + netuid = 35 + on_chain_data = [] + new_miners = [] + + with Subtensor() as subtensor: + on_chain_data = subtensor.query_map('SubtensorModule', 'BlockAtRegistration', params=[netuid]) + + if not on_chain_data: + bt.logging.warning(f"No registration data found for netuid {netuid}") + return [] + + # Process registration data + miner_status = [] + + # Get block numbers for all miners + for uid, block_number_ in on_chain_data: + block_number = block_number_.value + miner_status.append({ + 'uid': uid, + 'registered_block_number': block_number + }) + + # Calculate age and filter recent miners + current_time = time.time() + for miner in miner_status: + # Convert blocks to seconds (12 seconds per block) + blocks_age = current_block_number - miner['registered_block_number'] + miner_age_seconds = blocks_age * 12 + if miner_age_seconds < ELIGIBLE_TIMEOUT: + bt.logging.info(f"Miner {miner['uid']} registered {miner_age_seconds / 86400:.1f} days ago") + registered_timestamp = current_time - miner_age_seconds + new_miners.append({ + 'uid': miner['uid'], + 'time': registered_timestamp + }) + + if not new_miners: + bt.logging.debug(f"No new miners found within the last {ELIGIBLE_TIMEOUT / 86400:.1f} days") + else: + bt.logging.info(f"Found {len(new_miners)} new miners within {ELIGIBLE_TIMEOUT / 86400:.1f} days") + + return new_miners + + except Exception as e: + bt.logging.error(f"Error in get_new_miner: {str(e)}") + return [] # The main function parses the configuration and runs the validator. if __name__ == "__main__":