Get Pools Info
This page shows how to retrieve pool data from Aquarius AMM.
You can retrieve information about pools in two ways: by calling smart contract methods or by using the Aquarius backend API.
Pool details are also displayed on the Aquarius website, on pool info page - e.g. https://aqua.network/pools/CCY2PXGMKNQHO7WNYXEWX76L2C5BH3JUW3RCATGUYKY7QQTRILBZIFWV/
Using smart contracts: Step by Step Guide
Since calling contract methods does not alter the Stellar network and is executed through transaction simulation, a transaction signature is not required, and the call can be made using a public key. Scroll here to see the complete code.
Specify user public key and tokens
# AQUA issuer - we don't call user related information in our simulations, so any public key is fine
USER_PUBLIC_KEY = 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
# XLM
TOKEN_A = Asset.native()
# AQUA
TOKEN_B = Asset('AQUA', 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA')
// AQUA issuer - we don't call user related information in our simulations, so any public key is fine
const userPublicKey = 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
// XLM
const tokenA = Asset.native();
// AQUA
const tokenB = new Asset('AQUA','GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA');
Retrieve a list of pools with specified tokens.
def get_pools(server, address, router_contract_id, tokens: list[Asset]) -> dict:
tx_builder = TransactionBuilder(
source_account=server.load_account(address),
network_passphrase=NETWORK_PASSPHRASE,
base_fee=1000000 # Set base fee; adjust as necessary
).set_timeout(3600) # Set transaction timeout
tx = (
tx_builder
.append_invoke_contract_function_op(
contract_id=router_contract_id,
function_name="get_pools",
parameters=[
scval.to_vec(order_token_ids([
Address(token.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val()
for token in tokens
])),
],
)
.build()
)
simulation = server.simulate_transaction(tx)
tx_result = [xdr.SCVal.from_xdr(r.xdr) for r in simulation.results][0]
if not tx_result:
raise RuntimeError
return {
entry.key.bytes.sc_bytes.hex(): StrKey.encode_contract(binascii.unhexlify(entry.val.address.contract_id.hash.hex()))
for entry in tx_result.map.sc_map
}
async function getPools(server, tokensContactIds) {
/**
* Retrieves the pools associated with specific token contract IDs from the Stellar network.
*
* @async
* @param {rpc.Server} server - The Stellar server instance to connect to.
* @param {string[]} tokensContactIds - An array of token contract IDs for which pools are being queried.
* @returns {Promise<Array<[string, string]>>} - A promise that resolves to an array of arrays, each containing:
* - The contract ID as an encoded string.
* - The corresponding hash value from the pool.
*/
const contract = new StellarSdk.Contract(routerContractId);
const account = await server.getAccount(userPublicKey);
const tx = new TransactionBuilder(account, {
fee: BASE_FEE,
networkPassphrase: Networks.PUBLIC,
})
.addOperation(
contract.call(
'get_pools',
xdr.ScVal.scvVec(
orderTokensIds(
tokensContactIds.map(id => contractIdToScVal(id))
)
)
)
)
.setTimeout(TimeoutInfinite)
.build();
const simulateResult = await server.simulateTransaction(tx);
if (!simulateResult.result) {
return [];
}
const hashArray = simulateResult.result.retval.value();
if (!hashArray.length) {
return [];
}
return hashArray.map((item) => [
StrKey.encodeContract(Buffer.from(binascii.unhexlify(item.val().value().value().toString('hex')), 'ascii')),
item.key().value().toString('hex'),
]);
}
Retrieve information about each pool obtained in the previous step.
def get_pools_info():
server = SorobanServer(SOROBAN_SERVER_RPC)
pools = get_pools(
server,
USER_PUBLIC_KEY,
ROUTER_CONTRACT_ID,
[TOKEN_A, TOKEN_B],
)
for pool_hash, pool_id in pools.items():
print(f"pool '{pool_hash}', '{pool_id}'")
tx = (
TransactionBuilder(
source_account=server.load_account(USER_PUBLIC_KEY),
network_passphrase=NETWORK_PASSPHRASE,
base_fee=1000000
).set_timeout(3600)
.append_invoke_contract_function_op(
contract_id=ROUTER_CONTRACT_ID,
function_name="get_info",
parameters=[
scval.to_vec(order_token_ids([
Address(TOKEN_A.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val(),
Address(TOKEN_B.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val()
])),
scval.to_bytes(bytes.fromhex(pool_hash)), # Known pool hash as bytes
],
)
.build()
)
simulation = server.simulate_transaction(tx)
tx_result = [xdr.SCVal.from_xdr(r.xdr) for r in simulation.results][0]
if not tx_result:
raise RuntimeError
pool_info = {
entry.key.sym.sc_symbol.decode(): entry.val
for entry in tx_result.map.sc_map
}
pool_info['pool_type'] = pool_info['pool_type'].sym.sc_symbol.decode()
pool_info['fee'] = "{0}%".format(str(int(pool_info['fee'].u32.uint32) / 10_000 * 100))
if 'a' in pool_info:
pool_info['a'] = u128_to_int(pool_info['a'].u128)
if 'n_tokens' in pool_info:
pool_info['n_tokens'] = int(pool_info['n_tokens'].u32.uint32)
print('pool hash', pool_hash)
for key, value in pool_info.items():
print(key, value)
print('-------')
async function getPoolInfo(server, poolId) {
/**
* Retrieves detailed information about a specific pool from the Stellar network.
*
* @async
* @param {rpc.Server} server - The Stellar server instance to connect to.
* @param {string} poolId - The ID of the pool for which information is being retrieved.
* @returns {Promise<Object>} - A promise that resolves to an object containing key-value pairs of pool information.
*/
const contract = new Contract(poolId);
const account = await server.getAccount(userPublicKey);
const tx = new TransactionBuilder(account, {
fee: BASE_FEE,
networkPassphrase: Networks.PUBLIC,
})
.addOperation(
contract.call('get_info')
)
.setTimeout(TimeoutInfinite)
.build();
const simulateResult = await server.simulateTransaction(tx);
if (!simulateResult.result) {
throw new Error('Something went wrong');
}
return simulateResult.result.retval.value().reduce((acc, val) => {
acc[val.key().value().toString()] =
typeof val.val().value() === 'number'
? val.val().value()
: val.val().value().hi
? u128ToInt(val.val().value())
: val.val().value().toString();
return acc;
}, {});
}
async function getPoolsInfo() {
const server = new rpc.Server(sorobanServer);
const tokenAContractId = tokenA.contractId(Networks.PUBLIC);
const tokenBContractId = tokenB.contractId(Networks.PUBLIC)
const pools = await getPools(server, [tokenAContractId, tokenBContractId]);
console.log(pools);
// [
// [
// 'CCSY43EHJAHT3NQDYKAMJXRFBEEH7OXDL3J3VNGO33UUSEXWNN27GBIZ',
// '37b955f3708dae748cad465441fcad70b26ffe272385227a4a737234ae429bdf'
// ]
// ]
const poolsInfo = await Promise.all(pools.map(([id]) => getPoolInfo(server, id)));
console.log(poolsInfo);
// [ { fee: 100,, pool_type: 'constant_product' } ]
}
Complete Code Examples
Copy the full code Python
# get_pools.py
import binascii
from typing import List
from stellar_sdk import Address, Keypair, scval, SorobanServer, xdr, TransactionBuilder, Server, StrKey, Network, Asset
from stellar_sdk.xdr import UInt128Parts
# ==========================
# Configuration Variables
# ==========================
# Step 1. Specify user public key and tokens
# AQUA issuer - we don't call user related information in our simulations, so any public key is fine
USER_PUBLIC_KEY = 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
# XLM
TOKEN_A = Asset.native()
# AQUA
TOKEN_B = Asset('AQUA', 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA')
# Sorobanserver RPC endpoint and Router conract id
SOROBAN_SERVER_RPC = 'https://mainnet.sorobanrpc.com'
ROUTER_CONTRACT_ID = 'CBQDHNBFBZYE4MKPWBSJOPIYLW4SFSXAXUTSXJN76GNKYVYPCKWC6QUK'
# Stellar network passphrase
NETWORK_PASSPHRASE = Network.PUBLIC_NETWORK_PASSPHRASE
# ==========================
# Utility Functions
# ==========================
def order_token_ids(tokens: List[xdr.SCVal]) -> List[xdr.SCVal]:
"""
Orders token IDs based on their contract ID to maintain consistency.
Args:
tokens (List[xdr.SCVal]): List of token addresses as SCVal objects.
Returns:
List[xdr.SCVal]: Ordered list of token SCVal objects.
"""
return sorted(tokens, key=lambda token: int(token.address.contract_id.hash.hex(), 16))
def u128_to_int(value: UInt128Parts) -> int:
"""
Converts UInt128Parts from Stellar's XDR to a Python integer.
Args:
value (UInt128Parts): UInt128Parts object from Stellar SDK.
Returns:
int: Corresponding Python integer.
"""
return int(value.hi.uint64 << 64) + value.lo.uint64
# Step 2. Retrieve a list of pools with specified tokens.
def get_pools(server, address, router_contract_id, tokens: list[Asset]) -> dict:
tx_builder = TransactionBuilder(
source_account=server.load_account(address),
network_passphrase=NETWORK_PASSPHRASE,
base_fee=1000000 # Set base fee; adjust as necessary
).set_timeout(3600) # Set transaction timeout
tx = (
tx_builder
.append_invoke_contract_function_op(
contract_id=router_contract_id,
function_name="get_pools",
parameters=[
scval.to_vec(order_token_ids([
Address(token.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val()
for token in tokens
])),
],
)
.build()
)
simulation = server.simulate_transaction(tx)
tx_result = [xdr.SCVal.from_xdr(r.xdr) for r in simulation.results][0]
if not tx_result:
raise RuntimeError
return {
entry.key.bytes.sc_bytes.hex(): StrKey.encode_contract(binascii.unhexlify(entry.val.address.contract_id.hash.hex()))
for entry in tx_result.map.sc_map
}
# Step 3. Retrieve information about each pool obtained in the previous step.
def get_pools_info():
server = SorobanServer(SOROBAN_SERVER_RPC)
pools = get_pools(
server,
USER_PUBLIC_KEY,
ROUTER_CONTRACT_ID,
[TOKEN_A, TOKEN_B],
)
for pool_hash, pool_id in pools.items():
print(f"pool '{pool_hash}', '{pool_id}'")
tx = (
TransactionBuilder(
source_account=server.load_account(USER_PUBLIC_KEY),
network_passphrase=NETWORK_PASSPHRASE,
base_fee=1000000
).set_timeout(3600)
.append_invoke_contract_function_op(
contract_id=ROUTER_CONTRACT_ID,
function_name="get_info",
parameters=[
scval.to_vec(order_token_ids([
Address(TOKEN_A.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val(),
Address(TOKEN_B.contract_id(NETWORK_PASSPHRASE)).to_xdr_sc_val()
])),
scval.to_bytes(bytes.fromhex(pool_hash)), # Known pool hash as bytes
],
)
.build()
)
simulation = server.simulate_transaction(tx)
tx_result = [xdr.SCVal.from_xdr(r.xdr) for r in simulation.results][0]
if not tx_result:
raise RuntimeError
pool_info = {
entry.key.sym.sc_symbol.decode(): entry.val
for entry in tx_result.map.sc_map
}
pool_info['pool_type'] = pool_info['pool_type'].sym.sc_symbol.decode()
pool_info['fee'] = "{0}%".format(str(int(pool_info['fee'].u32.uint32) / 10_000 * 100))
if 'a' in pool_info:
pool_info['a'] = u128_to_int(pool_info['a'].u128)
if 'n_tokens' in pool_info:
pool_info['n_tokens'] = int(pool_info['n_tokens'].u32.uint32)
print('pool hash', pool_hash)
for key, value in pool_info.items():
print(key, value)
print('-------')
if __name__ == "__main__":
get_pools_info()
"""
pool '37b955f3708dae748cad465441fcad70b26ffe272385227a4a737234ae429bdf', 'CCSY43EHJAHT3NQDYKAMJXRFBEEH7OXDL3J3VNGO33UUSEXWNN27GBIZ'
pool hash 37b955f3708dae748cad465441fcad70b26ffe272385227a4a737234ae429bdf
fee 1.0%
pool_type constant_product
-------
pool '9ac7a9cde23ac2ada11105eeaa42e43c2ea8332ca0aa8f41f58d7160274d718e', 'CCY2PXGMKNQHO7WNYXEWX76L2C5BH3JUW3RCATGUYKY7QQTRILBZIFWV'
pool hash 9ac7a9cde23ac2ada11105eeaa42e43c2ea8332ca0aa8f41f58d7160274d718e
fee 0.3%
pool_type constant_product
-------
pool 'b2e02fcfca6c96f8ad5cbd84e7784a777b36d9c96a2459402c4f458462aab7f0', 'CDE57N6XTUPBKYYDGQMXX7E7SLNOLFY3JEQB4MULSMR2AKTSAENGX2HC'
pool hash b2e02fcfca6c96f8ad5cbd84e7784a777b36d9c96a2459402c4f458462aab7f0
fee 0.1%
pool_type constant_product
-------
"""
Copy the full code JavaScript
const StellarSdk = require('@stellar/stellar-sdk');
const binascii = require('binascii');
const {
Asset,
Contract,
TransactionBuilder,
rpc,
BASE_FEE,
Networks,
xdr,
TimeoutInfinite,
StrKey,
} = StellarSdk;
// ==========================
// Configuration Variables
// ==========================
// Step 1. Specify user public key and tokens
// AQUA issuer - we don't call user related information in our simulations, so any public key is fine
const userPublicKey = 'GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
// XLM
const tokenA = Asset.native();
// AQUA
const tokenB = new Asset('AQUA','GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA');
// Sorobanserver RPC endpoint and Router conract id
const sorobanServer = 'https://mainnet.sorobanrpc.com';
const routerContractId = 'CBQDHNBFBZYE4MKPWBSJOPIYLW4SFSXAXUTSXJN76GNKYVYPCKWC6QUK';
// ==========================
// Utility Functions
// ==========================
function orderTokensIds(tokensIds) {
/**
* Orders token IDs based on their contract ID to maintain consistency.
*
* @param {Array} tokensIds - List of token addresses as SCVal objects.
* @returns {Array} Ordered list of token SCVal objects.
*/
return tokensIds.sort((a, b) => {
const aHash = BigInt('0x' + a.address().contractId().toString('hex'));
const bHash = BigInt('0x' + b.address().contractId().toString('hex'));
// Compare BigInts directly without converting to number
if (aHash < bHash) return -1;
if (aHash > bHash) return 1;
return 0;
});
}
function u128ToInt(value) {
/**
* Converts UInt128Parts from Stellar's XDR to a JavaScript number.
*
* @param {Object} value - UInt128Parts object from Stellar SDK, with `hi` and `lo` properties.
* @returns {number|null} Corresponding JavaScript number, or null if the number is too large.
*/
const result = (BigInt(value.hi()._value) << 64n) + BigInt(value.lo()._value);
// Check if the result is within the safe integer range for JavaScript numbers
if (result <= BigInt(Number.MAX_SAFE_INTEGER)) {
return Number(result);
} else {
console.warn("Value exceeds JavaScript's safe integer range");
return null;
}
}
function contractIdToScVal(contractId) {
/**
* Converts a contract ID to a Stellar SCVal (Smart Contract Value) format.
*
* @param {string} contractId - The contract ID to convert.
* @returns {StellarSdk.xdr.ScVal} - The SCVal representation of the contract ID.
*
* @throws {Error} Throws an error if the contract ID is invalid or cannot be decoded.
*/
return StellarSdk.Address.contract(StrKey.decodeContract(contractId)).toScVal();
}
// Step 2. Retrieve a list of pools with specified tokens.
async function getPools(server, tokensContactIds) {
/**
* Retrieves the pools associated with specific token contract IDs from the Stellar network.
*
* @async
* @param {rpc.Server} server - The Stellar server instance to connect to.
* @param {string[]} tokensContactIds - An array of token contract IDs for which pools are being queried.
* @returns {Promise<Array<[string, string]>>} - A promise that resolves to an array of arrays, each containing:
* - The contract ID as an encoded string.
* - The corresponding hash value from the pool.
*/
const contract = new StellarSdk.Contract(routerContractId);
const account = await server.getAccount(userPublicKey);
const tx = new TransactionBuilder(account, {
fee: BASE_FEE,
networkPassphrase: Networks.PUBLIC,
})
.addOperation(
contract.call(
'get_pools',
xdr.ScVal.scvVec(
orderTokensIds(
tokensContactIds.map(id => contractIdToScVal(id))
)
)
)
)
.setTimeout(TimeoutInfinite)
.build();
const simulateResult = await server.simulateTransaction(tx);
if (!simulateResult.result) {
return [];
}
const hashArray = simulateResult.result.retval.value();
if (!hashArray.length) {
return [];
}
return hashArray.map((item) => [
StrKey.encodeContract(Buffer.from(binascii.unhexlify(item.val().value().value().toString('hex')), 'ascii')),
item.key().value().toString('hex'),
]);
}
// Step 3. Retrieve information about each pool obtained in the previous step.
async function getPoolInfo(server, poolId) {
/**
* Retrieves detailed information about a specific pool from the Stellar network.
*
* @async
* @param {rpc.Server} server - The Stellar server instance to connect to.
* @param {string} poolId - The ID of the pool for which information is being retrieved.
* @returns {Promise<Object>} - A promise that resolves to an object containing key-value pairs of pool information.
*/
const contract = new Contract(poolId);
const account = await server.getAccount(userPublicKey);
const tx = new TransactionBuilder(account, {
fee: BASE_FEE,
networkPassphrase: Networks.PUBLIC,
})
.addOperation(
contract.call('get_info')
)
.setTimeout(TimeoutInfinite)
.build();
const simulateResult = await server.simulateTransaction(tx);
if (!simulateResult.result) {
throw new Error('Something went wrong');
}
return simulateResult.result.retval.value().reduce((acc, val) => {
acc[val.key().value().toString()] =
typeof val.val().value() === 'number'
? val.val().value()
: val.val().value().hi
? u128ToInt(val.val().value())
: val.val().value().toString();
return acc;
}, {});
}
async function getPoolsInfo() {
const server = new rpc.Server(sorobanServer);
const tokenAContractId = tokenA.contractId(Networks.PUBLIC);
const tokenBContractId = tokenB.contractId(Networks.PUBLIC)
const pools = await getPools(server, [tokenAContractId, tokenBContractId]);
console.log(pools);
// [
// [
// 'CD6P4U2BR4KGRGICBUEZHV4RFAU6365Y4LS7VESVGTKEE73SVFKODOPP',
// '9ac7a9cde23ac2ada11105eeaa42e43c2ea8332ca0aa8f41f58d7160274d718e'
// ]
// ]
const poolsInfo = await Promise.all(pools.map(([id]) => getPoolInfo(server, id)));
console.log(poolsInfo);
// [ { fee: 30, pool_type: 'constant_product' } ]
}
getPoolsInfo();
Using backend API
import requests
base_api = 'https://amm-api.aqua.network/api/external/v1';
# You can skip using the address__in filter and retrieve a list of all pools with pagination.
def get_pools_info(base_api: str, pools: list[str]) -> dict:
return requests.get(f"{base_api}/pools/?address__in={','.join(pools)}").json()
print(
get_pools_info(
base_api,
[
'CDE57N6XTUPBKYYDGQMXX7E7SLNOLFY3JEQB4MULSMR2AKTSAENGX2HC'
],
)
)
"""
{
'count': 1,
'next': None,
'previous': None,
'results': [
{
'index': 'b2e02fcfca6c96f8ad5cbd84e7784a777b36d9c96a2459402c4f458462aab7f0',
'address': 'CDE57N6XTUPBKYYDGQMXX7E7SLNOLFY3JEQB4MULSMR2AKTSAENGX2HC',
'tokens_addresses': [
'CAS3J7GYLGXMF6TDJBBYYSE3HQ6BBSMLNUQ34T6TZMYMW2EVH34XOWMA',
'CAUIKL3IYGMERDRUN6YSCLWVAKIFG5Q4YJHUKM4S4NJZQIA3BAS6OJPK'
],
'tokens_str': [
'native',
'AQUA:GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'
],
'pool_type': 'constant_product',
'fee': '0.0010',
'a': None
}
]
}
"""
const baseApi = 'https://amm-api.aqua.network/api/external/v1';
async function getPoolsInfo(pools) {
const poolsAddresses = pools.join(',');
// You can skip using the address__in filter and retrieve a list of all pools with pagination.
const poolsResponse = await fetch(`${baseApi}/pools/?address__in=${poolsAddresses}`)
const poolsInfo = await poolsResponse.json();
console.log(poolsInfo);
//{
// count: 1,
// next: null,
// previous: null,
// results: [
// {
// index: '37b955f3708dae748cad465441fcad70b26ffe272385227a4a737234ae429bdf',
// address: 'CCSY43EHJAHT3NQDYKAMJXRFBEEH7OXDL3J3VNGO33UUSEXWNN27GBIZ',
// tokens_addresses: [
// 'CAS3J7GYLGXMF6TDJBBYYSE3HQ6BBSMLNUQ34T6TZMYMW2EVH34XOWMA',
// 'CAUIKL3IYGMERDRUN6YSCLWVAKIFG5Q4YJHUKM4S4NJZQIA3BAS6OJPK'
// ]
// tokens_str: ['native', 'AQUA:GBNZILSTVQZ4R7IKQDGHYGY2QXL5QOFJYQMXPKWRRM5PAV7Y4M67AQUA'],
// pool_type: 'constant_product',
// fee: '0.0010',
// a: null
// }
// ]
//}
}
getPoolsInfo(['CCSY43EHJAHT3NQDYKAMJXRFBEEH7OXDL3J3VNGO33UUSEXWNN27GBIZ']);
Last updated