Get Pools Info
This page shows how to retrieve pool data from Aquarius AMM.
Last updated
This page shows how to retrieve pool data from Aquarius AMM.
Last updated
You can retrieve information about pools in two ways: by calling the smart contract methods directly, or by using the Aquarius AMM REST API.
Pool details are also displayed on the Aquarius website, on each pool's info page.
Since calling contract methods does not alter the Stellar network and is executed through transaction simulation, a transaction signature is not required, and the call can be made using a public key. Scroll to see the complete code.
Specify user public key and tokens
# Source account for the simulated transactions. The AQUA issuer is used here,
# but no user-related information is requested in the simulations, so any
# public key is fine.
USER_PUBLIC_KEY = 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
# First token of the pair: XLM (the native asset)
TOKEN_A = Asset.native()
# Second token of the pair: AQUA (asset code + issuer)
TOKEN_B = Asset('AQUA', 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA')
// Source account for the simulated transactions. The AQUA issuer is used here,
// but no user-related information is requested in the simulations, so any
// public key is fine.
const userPublicKey = 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
// First token of the pair: XLM (the native asset)
const tokenA = Asset.native();
// Second token of the pair: AQUA (asset code + issuer)
const tokenB = new Asset('AQUA','GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA');
Retrieve a list of pools with specified tokens.
def get_pools(server, address, router_contract_id, tokens: list[Asset]) -> dict:
    """Return the pools the router knows for the given set of tokens.

    The router's ``get_pools`` method is only simulated, never submitted, so
    no signature is needed and ``address`` can be any public key.

    Args:
        server: Soroban RPC server used to load the account and simulate.
        address: Public key used as the transaction source account.
        router_contract_id: Contract id of the router contract.
        tokens: Assets that make up the pool; they are converted to contract
            addresses and ordered with ``order_token_ids`` before the call.

    Returns:
        dict: Pool index hash (hex string) mapped to the pool contract
        address (strkey-encoded).

    Raises:
        RuntimeError: If the simulation reports an error or has no result.
    """
    token_addresses = order_token_ids([
        Address(token.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val()
        for token in tokens
    ])
    tx = (
        TransactionBuilder(
            source_account=server.load_account(address),
            network_passphrase=NETWORK_PASSPHRASE,
            base_fee=1000000,  # Set base fee; adjust as necessary
        )
        .set_timeout(3600)  # Set transaction timeout
        .append_invoke_contract_function_op(
            contract_id=router_contract_id,
            function_name="get_pools",
            parameters=[scval.to_vec(token_addresses)],
        )
        .build()
    )

    simulation = server.simulate_transaction(tx)
    # A failed simulation carries an error and no results; report it
    # explicitly instead of failing later with TypeError/IndexError.
    if simulation.error or not simulation.results:
        raise RuntimeError(f"get_pools simulation failed: {simulation.error}")
    tx_result = xdr.SCVal.from_xdr(simulation.results[0].xdr)

    return {
        # The contract id hash is already raw bytes, so it can be
        # strkey-encoded directly.
        entry.key.bytes.sc_bytes.hex(): StrKey.encode_contract(
            entry.val.address.contract_id.hash
        )
        for entry in tx_result.map.sc_map
    }
async function getPools(server, tokensContactIds) {
  /**
   * Retrieves the pools associated with specific token contract IDs by simulating
   * the router's `get_pools` method (nothing is submitted to the network).
   *
   * @async
   * @param {rpc.Server} server - The Soroban RPC server instance to connect to.
   * @param {string[]} tokensContactIds - Token contract IDs for which pools are being queried.
   * @returns {Promise<Array<[string, string]>>} - Resolves to an array of pairs, each containing:
   *   - The pool contract ID as a strkey-encoded string.
   *   - The pool index hash as a hex string.
   *   An empty array is returned when the simulation yields no result or no pools.
   */
  const contract = new StellarSdk.Contract(routerContractId);
  const account = await server.getAccount(userPublicKey);
  const orderedTokens = orderTokensIds(
    tokensContactIds.map((id) => contractIdToScVal(id))
  );

  const tx = new TransactionBuilder(account, {
    fee: BASE_FEE,
    networkPassphrase: Networks.PUBLIC,
  })
    .addOperation(contract.call('get_pools', xdr.ScVal.scvVec(orderedTokens)))
    .setTimeout(TimeoutInfinite)
    .build();

  const simulateResult = await server.simulateTransaction(tx);
  if (!simulateResult.result) {
    return [];
  }

  const poolEntries = simulateResult.result.retval.value();
  if (!poolEntries.length) {
    return [];
  }

  return poolEntries.map((entry) => [
    // The address payload is passed to the encoder as-is; the former
    // hex -> unhexlify -> ascii round trip produced the same bytes.
    StrKey.encodeContract(entry.val().value().value()),
    entry.key().value().toString('hex'),
  ]);
}
Retrieve information about each pool obtained in the previous step.
def get_pools_info():
    """Print the details of every pool that exists for TOKEN_A / TOKEN_B.

    The pools are looked up with ``get_pools``; for each of them the router's
    ``get_info`` method is simulated and the decoded fields are printed.

    Raises:
        RuntimeError: If a ``get_info`` simulation reports an error or has
            no result.
    """
    server = SorobanServer(SOROBAN_SERVER_RPC)
    pools = get_pools(
        server,
        USER_PUBLIC_KEY,
        ROUTER_CONTRACT_ID,
        [TOKEN_A, TOKEN_B],
    )

    # The token vector is the same for every pool, so build it once.
    token_addresses = scval.to_vec(order_token_ids([
        Address(TOKEN_A.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val(),
        Address(TOKEN_B.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val(),
    ]))

    for pool_hash, pool_id in pools.items():
        print(f"pool '{pool_hash}', '{pool_id}'")
        tx = (
            TransactionBuilder(
                source_account=server.load_account(USER_PUBLIC_KEY),
                network_passphrase=NETWORK_PASSPHRASE,
                base_fee=1000000,
            )
            .set_timeout(3600)
            .append_invoke_contract_function_op(
                contract_id=ROUTER_CONTRACT_ID,
                function_name="get_info",
                parameters=[
                    token_addresses,
                    scval.to_bytes(bytes.fromhex(pool_hash)),  # Known pool hash as bytes
                ],
            )
            .build()
        )

        simulation = server.simulate_transaction(tx)
        # A failed simulation carries an error and no results; report it
        # explicitly instead of failing later with TypeError/IndexError.
        if simulation.error or not simulation.results:
            raise RuntimeError(
                f"get_info simulation failed for pool {pool_hash}: {simulation.error}"
            )
        tx_result = xdr.SCVal.from_xdr(simulation.results[0].xdr)

        pool_info = {
            entry.key.sym.sc_symbol.decode(): entry.val
            for entry in tx_result.map.sc_map
        }
        pool_info['pool_type'] = pool_info['pool_type'].sym.sc_symbol.decode()
        # The fee is expressed in 1/10_000 units; one division by 100 gives
        # the percentage without the rounding noise of "/ 10_000 * 100".
        fee_units = int(pool_info['fee'].u32.uint32)
        pool_info['fee'] = f"{fee_units / 100}%"
        # 'a' and 'n_tokens' are only decoded when the pool reports them.
        if 'a' in pool_info:
            pool_info['a'] = u128_to_int(pool_info['a'].u128)
        if 'n_tokens' in pool_info:
            pool_info['n_tokens'] = int(pool_info['n_tokens'].u32.uint32)

        print('pool hash', pool_hash)
        for key, value in pool_info.items():
            print(key, value)
        print('-------')
async function getPoolInfo(server, poolId) {
  /**
   * Fetches the info map of a single pool by simulating its `get_info` method.
   *
   * @async
   * @param {rpc.Server} server - The Soroban RPC server instance to connect to.
   * @param {string} poolId - Contract ID of the pool to inspect.
   * @returns {Promise<Object>} - Resolves to an object with one property per entry of the
   *   returned map: numbers are kept as-is, values exposing `hi` go through `u128ToInt`,
   *   everything else is converted with `toString()`.
   * @throws {Error} When the simulation yields no result.
   */
  const poolContract = new Contract(poolId);
  const sourceAccount = await server.getAccount(userPublicKey);

  const builder = new TransactionBuilder(sourceAccount, {
    fee: BASE_FEE,
    networkPassphrase: Networks.PUBLIC,
  });
  builder.addOperation(poolContract.call('get_info'));
  builder.setTimeout(TimeoutInfinite);
  const tx = builder.build();

  const simulation = await server.simulateTransaction(tx);
  if (!simulation.result) {
    throw new Error('Something went wrong');
  }

  const info = {};
  for (const entry of simulation.result.retval.value()) {
    const raw = entry.val().value();
    let parsed;
    if (typeof raw === 'number') {
      parsed = raw;
    } else if (raw.hi) {
      parsed = u128ToInt(raw);
    } else {
      parsed = raw.toString();
    }
    info[entry.key().value().toString()] = parsed;
  }
  return info;
}
async function getPoolsInfo() {
  // Looks up every pool for the tokenA/tokenB pair, then logs the pool list
  // and the info of each pool.
  const server = new rpc.Server(sorobanServer);
  const tokenContractIds = [tokenA, tokenB].map((token) =>
    token.contractId(Networks.PUBLIC)
  );

  const pools = await getPools(server, tokenContractIds);
  console.log(pools);
  // [
  //   [
  //     'CCSY43EHJAHT3NQDYKAMJXRFBEEH7OXDL3J3VNGO33UUSEXWNN27GBIZ',
  //     '37b955f3708dae748cad465441fcad70b26ffe272385227a4a737234ae429bdf'
  //   ]
  // ]

  const infoRequests = pools.map(([poolAddress]) => getPoolInfo(server, poolAddress));
  const poolsInfo = await Promise.all(infoRequests);
  console.log(poolsInfo);
  // [ { fee: 100, pool_type: 'constant_product' } ]
}
# get_pools.py
import binascii
from typing import List
from stellar_sdk import Address, Keypair, scval, SorobanServer, xdr, TransactionBuilder, Server, StrKey, Network, Asset
from stellar_sdk.xdr import UInt128Parts
# ==========================
# Configuration Variables
# ==========================
# Step 1. Specify user public key and tokens
# Source account for the simulated transactions. The AQUA issuer is used here,
# but no user-related information is requested in the simulations, so any
# public key is fine.
USER_PUBLIC_KEY = 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
# First token of the pair: XLM (the native asset)
TOKEN_A = Asset.native()
# Second token of the pair: AQUA (asset code + issuer)
TOKEN_B = Asset('AQUA', 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA')
# Soroban RPC server endpoint and router contract id
SOROBAN_SERVER_RPC = 'https://mainnet.sorobanrpc.com'
ROUTER_CONTRACT_ID = 'CBQDHNBFBZYE4MKPWBSJOPIYLW4SFSXAXUTSXJN76GNKYVYPCKWC6QUK'
# Stellar network passphrase (public network)
NETWORK_PASSPHRASE = Network.PUBLIC_NETWORK_PASSPHRASE
# ==========================
# Utility Functions
# ==========================
def order_token_ids(tokens: List[xdr.SCVal]) -> List[xdr.SCVal]:
    """
    Return the token addresses sorted by the numeric value of their contract id.

    Args:
        tokens (List[xdr.SCVal]): Token addresses as SCVal objects.

    Returns:
        List[xdr.SCVal]: New list with the same SCVal objects in ascending
        contract-id order; the input list is left untouched.
    """
    def contract_id_as_int(token):
        # Interpret the contract id hash as one big hexadecimal number.
        hex_digits = token.address.contract_id.hash.hex()
        return int(hex_digits, 16)

    ordered = list(tokens)
    ordered.sort(key=contract_id_as_int)
    return ordered
def u128_to_int(value: UInt128Parts) -> int:
    """
    Combine the two 64-bit halves of a UInt128Parts into one Python integer.

    Args:
        value (UInt128Parts): XDR value exposing ``hi.uint64`` and ``lo.uint64``.

    Returns:
        int: ``hi`` shifted left by 64 bits plus ``lo``.
    """
    high_half = value.hi.uint64
    low_half = value.lo.uint64
    shifted_high = int(high_half << 64)
    return shifted_high + low_half
# Step 2. Retrieve a list of pools with specified tokens.
def get_pools(server, address, router_contract_id, tokens: list[Asset]) -> dict:
    """Return the pools the router knows for the given set of tokens.

    The router's ``get_pools`` method is only simulated, never submitted, so
    no signature is needed and ``address`` can be any public key.

    Args:
        server: Soroban RPC server used to load the account and simulate.
        address: Public key used as the transaction source account.
        router_contract_id: Contract id of the router contract.
        tokens: Assets that make up the pool; they are converted to contract
            addresses and ordered with ``order_token_ids`` before the call.

    Returns:
        dict: Pool index hash (hex string) mapped to the pool contract
        address (strkey-encoded).

    Raises:
        RuntimeError: If the simulation reports an error or has no result.
    """
    token_addresses = order_token_ids([
        Address(token.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val()
        for token in tokens
    ])
    tx = (
        TransactionBuilder(
            source_account=server.load_account(address),
            network_passphrase=NETWORK_PASSPHRASE,
            base_fee=1000000,  # Set base fee; adjust as necessary
        )
        .set_timeout(3600)  # Set transaction timeout
        .append_invoke_contract_function_op(
            contract_id=router_contract_id,
            function_name="get_pools",
            parameters=[scval.to_vec(token_addresses)],
        )
        .build()
    )

    simulation = server.simulate_transaction(tx)
    # A failed simulation carries an error and no results; report it
    # explicitly instead of failing later with TypeError/IndexError.
    if simulation.error or not simulation.results:
        raise RuntimeError(f"get_pools simulation failed: {simulation.error}")
    tx_result = xdr.SCVal.from_xdr(simulation.results[0].xdr)

    return {
        # The contract id hash is already raw bytes, so it can be
        # strkey-encoded directly.
        entry.key.bytes.sc_bytes.hex(): StrKey.encode_contract(
            entry.val.address.contract_id.hash
        )
        for entry in tx_result.map.sc_map
    }
# Step 3. Retrieve information about each pool obtained in the previous step.
def get_pools_info():
    """Print the details of every pool that exists for TOKEN_A / TOKEN_B.

    The pools are looked up with ``get_pools``; for each of them the router's
    ``get_info`` method is simulated and the decoded fields are printed.

    Raises:
        RuntimeError: If a ``get_info`` simulation reports an error or has
            no result.
    """
    server = SorobanServer(SOROBAN_SERVER_RPC)
    pools = get_pools(
        server,
        USER_PUBLIC_KEY,
        ROUTER_CONTRACT_ID,
        [TOKEN_A, TOKEN_B],
    )

    # The token vector is the same for every pool, so build it once.
    token_addresses = scval.to_vec(order_token_ids([
        Address(TOKEN_A.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val(),
        Address(TOKEN_B.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val(),
    ]))

    for pool_hash, pool_id in pools.items():
        print(f"pool '{pool_hash}', '{pool_id}'")
        tx = (
            TransactionBuilder(
                source_account=server.load_account(USER_PUBLIC_KEY),
                network_passphrase=NETWORK_PASSPHRASE,
                base_fee=1000000,
            )
            .set_timeout(3600)
            .append_invoke_contract_function_op(
                contract_id=ROUTER_CONTRACT_ID,
                function_name="get_info",
                parameters=[
                    token_addresses,
                    scval.to_bytes(bytes.fromhex(pool_hash)),  # Known pool hash as bytes
                ],
            )
            .build()
        )

        simulation = server.simulate_transaction(tx)
        # A failed simulation carries an error and no results; report it
        # explicitly instead of failing later with TypeError/IndexError.
        if simulation.error or not simulation.results:
            raise RuntimeError(
                f"get_info simulation failed for pool {pool_hash}: {simulation.error}"
            )
        tx_result = xdr.SCVal.from_xdr(simulation.results[0].xdr)

        pool_info = {
            entry.key.sym.sc_symbol.decode(): entry.val
            for entry in tx_result.map.sc_map
        }
        pool_info['pool_type'] = pool_info['pool_type'].sym.sc_symbol.decode()
        # The fee is expressed in 1/10_000 units; one division by 100 gives
        # the percentage without the rounding noise of "/ 10_000 * 100".
        fee_units = int(pool_info['fee'].u32.uint32)
        pool_info['fee'] = f"{fee_units / 100}%"
        # 'a' and 'n_tokens' are only decoded when the pool reports them.
        if 'a' in pool_info:
            pool_info['a'] = u128_to_int(pool_info['a'].u128)
        if 'n_tokens' in pool_info:
            pool_info['n_tokens'] = int(pool_info['n_tokens'].u32.uint32)

        print('pool hash', pool_hash)
        for key, value in pool_info.items():
            print(key, value)
        print('-------')
# Run the example only when the file is executed as a script.
if __name__ == "__main__":
    get_pools_info()
"""
pool '37b955f3708dae748cad465441fcad70b26ffe272385227a4a737234ae429bdf', 'CCSY43EHJAHT3NQDYKAMJXRFBEEH7OXDL3J3VNGO33UUSEXWNN27GBIZ'
pool hash 37b955f3708dae748cad465441fcad70b26ffe272385227a4a737234ae429bdf
fee 1.0%
pool_type constant_product
-------
pool '9ac7a9cde23ac2ada11105eeaa42e43c2ea8332ca0aa8f41f58d7160274d718e', 'CCY2PXGMKNQHO7WNYXEWX76L2C5BH3JUW3RCATGUYKY7QQTRILBZIFWV'
pool hash 9ac7a9cde23ac2ada11105eeaa42e43c2ea8332ca0aa8f41f58d7160274d718e
fee 0.3%
pool_type constant_product
-------
pool 'b2e02fcfca6c96f8ad5cbd84e7784a777b36d9c96a2459402c4f458462aab7f0', 'CDE57N6XTUPBKYYDGQMXX7E7SLNOLFY3JEQB4MULSMR2AKTSAENGX2HC'
pool hash b2e02fcfca6c96f8ad5cbd84e7784a777b36d9c96a2459402c4f458462aab7f0
fee 0.1%
pool_type constant_product
-------
"""
const StellarSdk = require('@stellar/stellar-sdk');
const binascii = require('binascii');
const {
Asset,
Contract,
TransactionBuilder,
rpc,
BASE_FEE,
Networks,
xdr,
TimeoutInfinite,
StrKey,
} = StellarSdk;
// ==========================
// Configuration Variables
// ==========================
// Step 1. Specify user public key and tokens
// Source account for the simulated transactions. The AQUA issuer is used here,
// but no user-related information is requested in the simulations, so any
// public key is fine.
const userPublicKey = 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
// First token of the pair: XLM (the native asset)
const tokenA = Asset.native();
// Second token of the pair: AQUA (asset code + issuer)
const tokenB = new Asset('AQUA','GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA');
// Soroban RPC server endpoint and router contract id
const sorobanServer = 'https://mainnet.sorobanrpc.com';
const routerContractId = 'CBQDHNBFBZYE4MKPWBSJOPIYLW4SFSXAXUTSXJN76GNKYVYPCKWC6QUK';
// ==========================
// Utility Functions
// ==========================
function orderTokensIds(tokensIds) {
  /**
   * Orders token IDs based on their contract ID to maintain consistency.
   *
   * The input array is not modified; a sorted copy is returned.
   *
   * @param {Array} tokensIds - List of token addresses as SCVal objects.
   * @returns {Array} New array with the same SCVal objects in ascending contract-id order.
   */
  const contractIdAsBigInt = (token) =>
    BigInt('0x' + token.address().contractId().toString('hex'));

  // Array.prototype.sort works in place, so sort a copy to leave the
  // caller's array untouched.
  return [...tokensIds].sort((a, b) => {
    const aHash = contractIdAsBigInt(a);
    const bHash = contractIdAsBigInt(b);
    // Compare BigInts directly without converting to number
    if (aHash < bHash) return -1;
    if (aHash > bHash) return 1;
    return 0;
  });
}
function u128ToInt(value) {
  /**
   * Converts UInt128Parts from Stellar's XDR to a JavaScript number.
   *
   * The halves are read through their public `toString()` representation
   * rather than the private `_value` field of the XDR integer wrappers.
   *
   * @param {Object} value - UInt128Parts object from Stellar SDK, with `hi()` and `lo()` accessors.
   * @returns {number|null} Corresponding JavaScript number, or null if the number is too large.
   */
  const hi = BigInt(value.hi().toString());
  const lo = BigInt(value.lo().toString());
  const result = (hi << 64n) + lo;

  // Check if the result is within the safe integer range for JavaScript numbers
  if (result <= BigInt(Number.MAX_SAFE_INTEGER)) {
    return Number(result);
  }
  console.warn("Value exceeds JavaScript's safe integer range");
  return null;
}
function contractIdToScVal(contractId) {
  /**
   * Builds the SCVal (Smart Contract Value) address for a strkey-encoded contract ID.
   *
   * @param {string} contractId - The contract ID to convert.
   * @returns {StellarSdk.xdr.ScVal} - The SCVal representation of the contract address.
   */
  const rawContractId = StrKey.decodeContract(contractId);
  const contractAddress = StellarSdk.Address.contract(rawContractId);
  return contractAddress.toScVal();
}
// Step 2. Retrieve a list of pools with specified tokens.
async function getPools(server, tokensContactIds) {
  /**
   * Retrieves the pools associated with specific token contract IDs by simulating
   * the router's `get_pools` method (nothing is submitted to the network).
   *
   * @async
   * @param {rpc.Server} server - The Soroban RPC server instance to connect to.
   * @param {string[]} tokensContactIds - Token contract IDs for which pools are being queried.
   * @returns {Promise<Array<[string, string]>>} - Resolves to an array of pairs, each containing:
   *   - The pool contract ID as a strkey-encoded string.
   *   - The pool index hash as a hex string.
   *   An empty array is returned when the simulation yields no result or no pools.
   */
  const contract = new StellarSdk.Contract(routerContractId);
  const account = await server.getAccount(userPublicKey);
  const orderedTokens = orderTokensIds(
    tokensContactIds.map((id) => contractIdToScVal(id))
  );

  const tx = new TransactionBuilder(account, {
    fee: BASE_FEE,
    networkPassphrase: Networks.PUBLIC,
  })
    .addOperation(contract.call('get_pools', xdr.ScVal.scvVec(orderedTokens)))
    .setTimeout(TimeoutInfinite)
    .build();

  const simulateResult = await server.simulateTransaction(tx);
  if (!simulateResult.result) {
    return [];
  }

  const poolEntries = simulateResult.result.retval.value();
  if (!poolEntries.length) {
    return [];
  }

  return poolEntries.map((entry) => [
    // The address payload is passed to the encoder as-is; the former
    // hex -> unhexlify -> ascii round trip produced the same bytes.
    StrKey.encodeContract(entry.val().value().value()),
    entry.key().value().toString('hex'),
  ]);
}
// Step 3. Retrieve information about each pool obtained in the previous step.
async function getPoolInfo(server, poolId) {
  /**
   * Fetches the info map of a single pool by simulating its `get_info` method.
   *
   * @async
   * @param {rpc.Server} server - The Soroban RPC server instance to connect to.
   * @param {string} poolId - Contract ID of the pool to inspect.
   * @returns {Promise<Object>} - Resolves to an object with one property per entry of the
   *   returned map: numbers are kept as-is, values exposing `hi` go through `u128ToInt`,
   *   everything else is converted with `toString()`.
   * @throws {Error} When the simulation yields no result.
   */
  const poolContract = new Contract(poolId);
  const sourceAccount = await server.getAccount(userPublicKey);

  const builder = new TransactionBuilder(sourceAccount, {
    fee: BASE_FEE,
    networkPassphrase: Networks.PUBLIC,
  });
  builder.addOperation(poolContract.call('get_info'));
  builder.setTimeout(TimeoutInfinite);
  const tx = builder.build();

  const simulation = await server.simulateTransaction(tx);
  if (!simulation.result) {
    throw new Error('Something went wrong');
  }

  const info = {};
  for (const entry of simulation.result.retval.value()) {
    const raw = entry.val().value();
    let parsed;
    if (typeof raw === 'number') {
      parsed = raw;
    } else if (raw.hi) {
      parsed = u128ToInt(raw);
    } else {
      parsed = raw.toString();
    }
    info[entry.key().value().toString()] = parsed;
  }
  return info;
}
async function getPoolsInfo() {
  // Looks up every pool for the tokenA/tokenB pair, then logs the pool list
  // and the info of each pool.
  const server = new rpc.Server(sorobanServer);
  const tokenContractIds = [tokenA, tokenB].map((token) =>
    token.contractId(Networks.PUBLIC)
  );

  const pools = await getPools(server, tokenContractIds);
  console.log(pools);
  // [
  //   [
  //     'CD6P4U2BR4KGRGICBUEZHV4RFAU6365Y4LS7VESVGTKEE73SVFKODOPP',
  //     '9ac7a9cde23ac2ada11105eeaa42e43c2ea8332ca0aa8f41f58d7160274d718e'
  //   ]
  // ]

  const infoRequests = pools.map(([poolAddress]) => getPoolInfo(server, poolAddress));
  const poolsInfo = await Promise.all(infoRequests);
  console.log(poolsInfo);
  // [ { fee: 30, pool_type: 'constant_product' } ]
}
getPoolsInfo();
import requests
# Base URL of the Aquarius AMM REST API used by the requests below.
base_api = 'https://amm-api.aqua.network/api/external/v1'
# You can skip using the address__in filter and retrieve a list of all pools with pagination.
def get_pools_info(base_api: str, pools: list[str]) -> dict:
    """Fetch pool details from the REST API for the given pool addresses.

    Args:
        base_api: Base URL of the API, without a trailing slash.
        pools: Pool contract addresses; they are joined with commas into the
            ``address__in`` filter.

    Returns:
        dict: The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within 30 seconds.
    """
    response = requests.get(
        f"{base_api}/pools/?address__in={','.join(pools)}",
        timeout=30,  # requests waits indefinitely unless a timeout is given
    )
    # Surface HTTP errors instead of decoding an error page as pool data.
    response.raise_for_status()
    return response.json()
# Example: fetch one pool by its contract address and print the decoded
# response (sample output is shown in the string below).
print(
get_pools_info(
base_api,
[
'CDE57N6XTUPBKYYDGQMXX7E7SLNOLFY3JEQB4MULSMR2AKTSAENGX2HC'
],
)
)
"""
{
'count': 1,
'next': None,
'previous': None,
'results': [
{
'index': 'b2e02fcfca6c96f8ad5cbd84e7784a777b36d9c96a2459402c4f458462aab7f0',
'address': 'CDE57N6XTUPBKYYDGQMXX7E7SLNOLFY3JEQB4MULSMR2AKTSAENGX2HC',
'tokens_addresses': [
'CAS3J7GYLGXMF6TDJBBYYSE3HQ6BBSMLNUQ34T6TZMYMW2EVH34XOWMA',
'CAUIKL3IYGMERDRUN6YSCLWVAKIFG5Q4YJHUKM4S4NJZQIA3BAS6OJPK'
],
'tokens_str': [
'native',
'AQUA:GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
],
'pool_type': 'constant_product',
'fee': '0.0010',
'a': None
}
]
}
"""
const baseApi = 'https://amm-api.aqua.network/api/external/v1';
async function getPoolsInfo(pools) {
  /**
   * Fetches pool details from the REST API for the given pool addresses and logs them.
   *
   * @async
   * @param {string[]} pools - Pool contract addresses, joined with commas into the `address__in` filter.
   * @returns {Promise<Object>} - Resolves to the decoded JSON body of the response.
   * @throws {Error} When the server answers with a non-2xx status.
   */
  const poolsAddresses = pools.join(',');

  // You can skip using the address__in filter and retrieve a list of all pools with pagination.
  const poolsResponse = await fetch(`${baseApi}/pools/?address__in=${poolsAddresses}`);
  // fetch() only rejects on network failures, so HTTP errors are checked explicitly.
  if (!poolsResponse.ok) {
    throw new Error(`Pools request failed with status ${poolsResponse.status}`);
  }

  const poolsInfo = await poolsResponse.json();
  console.log(poolsInfo);
  //{
  //  count: 1,
  //  next: null,
  //  previous: null,
  //  results: [
  //    {
  //      index: '37b955f3708dae748cad465441fcad70b26ffe272385227a4a737234ae429bdf',
  //      address: 'CCSY43EHJAHT3NQDYKAMJXRFBEEH7OXDL3J3VNGO33UUSEXWNN27GBIZ',
  //      tokens_addresses: [
  //        'CAS3J7GYLGXMF6TDJBBYYSE3HQ6BBSMLNUQ34T6TZMYMW2EVH34XOWMA',
  //        'CAUIKL3IYGMERDRUN6YSCLWVAKIFG5Q4YJHUKM4S4NJZQIA3BAS6OJPK'
  //      ],
  //      tokens_str: ['native', 'AQUA:GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'],
  //      pool_type: 'constant_product',
  //      fee: '0.0010',
  //      a: null
  //    }
  //  ]
  //}
  return poolsInfo;
}
getPoolsInfo(['CCSY43EHJAHT3NQDYKAMJXRFBEEH7OXDL3J3VNGO33UUSEXWNN27GBIZ']);