This server exposes a set of tools via Server-Sent Events (SSE) for interacting with wallet behaviour prediction,fraud detection and rug pull prediction.
predictive_behaviour
– AI-powered fraud detection, intention calculation & analytics for Ethereum, Polygon, BSC, TON and HAQQ.predictive_fraud
– AI-powered fraud detection for Ethereum, Polygon, BSC, TON and HAQQ .import os, asyncio, json
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
import openai
from dotenv import load_dotenv
load_dotenv()
openai.api_key = os.getenv("OPENAI_API_KEY")
async def chat_loop(sse_url):
while True:
q = input("You: ").strip()
if not q or q.lower() == "quit":
break
resp = await init_and_process_query(sse_url, q)
print("Assistant:", resp)
async def init_and_process_query(sse_url, query):
async with sse_client(sse_url) as streams:
read_stream, write_stream = streams
async with ClientSession(read_stream, write_stream) as sess:
await sess.initialize()
tools_resp = await sess.list_tools()
functions = [
{"name": t.name, "description": t.description, "parameters": t.inputSchema}
for t in tools_resp.tools
]
chat_resp = openai.chat.completions.create(
model="gpt-4-0613",
messages=[{"role":"user","content":query}],
functions=functions,
function_call="auto"
)
msg = chat_resp.choices[0].message
if msg.function_call:
fn_name = msg.function_call.name
fn_args = json.loads(msg.function_call.arguments)
fn_args["apiKey"] = os.getenv("CA_MCP_API_KEY")
tool_resp = await sess.call_tool(fn_name, fn_args)
output = tool_resp.content[0].text
final_resp = openai.chat.completions.create(
model="gpt-4-0613",
messages=[
{"role":"user","content":query},
{"role":"assistant","function_call":msg.function_call},
{"role":"function","name":fn_name,"content":output}
]
)
return final_resp.choices[0].message.content
return msg.content
# Run it:
asyncio.run(chat_loop("{mcpServerUrl}/sse"))
import asyncio
from mcp.client.session import ClientSession
from mcp.client.sse import sse_client
async def run_client(sse_url):
async with sse_client(sse_url) as streams:
read_stream, write_stream = streams
async with ClientSession(read_stream, write_stream) as sess:
await sess.initialize()
tools = await sess.list_tools()
print("Tools:", [t.name for t in tools.tools])
fraud = await sess.call_tool("check_fraud", {"network": "ETH", "walletAddress": "vitalik.eth"})
print("Fraud result:", fraud.content[0].text)
audit = await sess.call_tool("check_audit", {"network": "ETH", "walletAddress": "vitalik.eth"})
print("Audit result:", audit.content[0].text)
rug = await sess.call_tool("rug_pull_check", {"network": "BNB", "walletAddress": "0x89c5..."})
print("Rug Pull result:", rug.content[0].text)
# Run it:
asyncio.run(run_client("{mcpServerUrl}/sse"))
SSE Endpoint: /sse
Post messages via: /messages/
**Note that to be able to use the tools you need to have an active Enterprisse Account from ChainAware.ai