Source code for avantis_trader_sdk.rpc.category_parameters

import asyncio
from ..types import OpenInterest, OpenInterestLimits, Utilization, Skew


class CategoryParametersRPC:
    """
    This class provides methods to retrieve and calculate various category
    parameters related to open interest, open interest limits, category
    utilization and category skew.
    """

    # Raw integers returned by the contracts are divided by this factor
    # before being handed back to callers.
    # NOTE(review): presumably the values are 6-decimal fixed point --
    # confirm against the PairStorage contract.
    _VALUE_SCALE = 10**6

    def __init__(self, client):
        """
        Constructor for the CategoryParametersRPC class.

        Args:
            client: The TraderClient object.
        """
        self.client = client

    @classmethod
    def _decode_scaled_uints(cls, raw_values):
        """Decode big-endian byte strings to ints and divide by the scale."""
        return [
            int.from_bytes(raw, byteorder="big") / cls._VALUE_SCALE
            for raw in raw_values
        ]

    @staticmethod
    def _first_fee_index_per_group(pairs_info, group_indexes):
        """
        Pair every group index with the ``fee_index`` of the first pair (in
        ``pairs_info`` iteration order) that belongs to that group.

        Groups without any pair are left out, so the returned
        ``(group_index, fee_index)`` tuples can never get out of step with
        the per-call results they are later zipped with.
        """
        first_fee_index = {}
        for pair in pairs_info.values():
            # setdefault keeps the first pair seen for each group.
            first_fee_index.setdefault(pair.group_index, pair.fee_index)
        return [
            (group_index, first_fee_index[group_index])
            for group_index in group_indexes
            if group_index in first_fee_index
        ]

    @staticmethod
    def _utilization_by_group(long_oi, short_oi, limits):
        """
        Return ``(long + short) / limit`` for every key of ``long_oi``.

        A falsy limit (0 or None) yields 0 instead of dividing by zero.
        """
        utilization = {}
        for group_key, long_value in long_oi.items():
            current_oi = long_value + short_oi[group_key]
            limit = limits[group_key]
            utilization[group_key] = current_oi / limit if limit else 0
        return utilization

    @staticmethod
    def _skew_by_group(long_oi, short_oi):
        """
        Return ``long / (long + short)`` for every key of ``long_oi``.

        A group with no open interest on either side yields 0.
        """
        skew = {}
        for group_key, long_value in long_oi.items():
            total_oi = long_value + short_oi[group_key]
            skew[group_key] = long_value / total_oi if total_oi else 0
        return skew

    async def get_oi_limits(self):
        """
        Retrieves the open interest limits for all categories.

        Returns:
            An OpenInterestLimits instance containing the open interest
            limits, keyed by the category (group) index as a string.
        """
        PairStorage = self.client.contracts.get("PairStorage")
        Multicall = self.client.contracts.get("Multicall")
        pairs_info = await self.client.pairs_cache.get_pairs_info()
        group_indexes = await self.client.pairs_cache.get_group_indexes()

        # NOTE(review): the argument passed to groupMaxOI is the pair's
        # ``fee_index`` rather than the key of ``pairs_info`` -- confirm this
        # is the intended lookup value.
        lookups = self._first_fee_index_per_group(pairs_info, group_indexes)

        calls = [
            (
                PairStorage.address,
                PairStorage.encodeABI(fn_name="groupMaxOI", args=[fee_index]),
            )
            for _, fee_index in lookups
        ]
        response = await Multicall.functions.aggregate(calls).call()

        # response[1] holds one return value per call, in call order.
        decoded = self._decode_scaled_uints(response[1])
        limits = {
            str(group_index): value
            for (group_index, _), value in zip(lookups, decoded)
        }
        return OpenInterestLimits(limits=limits)

    async def get_oi(self):
        """
        Retrieves the current open interest for all categories.

        Returns:
            An OpenInterest instance containing the long and short open
            interest, each keyed by the category (group) index as a string.
        """
        PairStorage = self.client.contracts.get("PairStorage")
        Multicall = self.client.contracts.get("Multicall")
        group_indexes = await self.client.pairs_cache.get_group_indexes()

        def build_calls(side):
            # The second groupOIs argument selects the side: 0 is used for
            # the long request and 1 for the short request.
            return [
                (
                    PairStorage.address,
                    PairStorage.encodeABI(
                        fn_name="groupOIs", args=[group_index, side]
                    ),
                )
                for group_index in group_indexes
            ]

        # Both aggregate calls are independent, so run them concurrently.
        long_response, short_response = await asyncio.gather(
            Multicall.functions.aggregate(build_calls(0)).call(),
            Multicall.functions.aggregate(build_calls(1)).call(),
        )

        long_values = self._decode_scaled_uints(long_response[1])
        short_values = self._decode_scaled_uints(short_response[1])

        long_oi = {
            str(group_index): value
            for group_index, value in zip(group_indexes, long_values)
        }
        short_oi = {
            str(group_index): value
            for group_index, value in zip(group_indexes, short_values)
        }
        return OpenInterest(long=long_oi, short=short_oi)

    async def get_utilization(self):
        """
        Calculates the category utilization for all categories.

        Returns:
            A Utilization instance containing the category utilization for
            each category, expressed as the fraction
            ``(long OI + short OI) / limit`` (not a percentage). Categories
            whose limit is zero report 0.
        """
        oi_limits, oi = await asyncio.gather(self.get_oi_limits(), self.get_oi())

        # Iterate the keys actually present in the open-interest result
        # instead of assuming the group indexes are exactly 0..n-1.
        utilization = self._utilization_by_group(
            oi.long, oi.short, oi_limits.limits
        )
        return Utilization(utilization=utilization)

    async def get_category_skew(self):
        """
        Calculates the category skew for all categories.

        Returns:
            A Skew instance containing, for each category, the long share of
            total open interest ``long / (long + short)``. Categories with no
            open interest report 0.
        """
        oi = await self.get_oi()
        return Skew(skew=self._skew_by_group(oi.long, oi.short))