// src/background.rs use crate::types::Rule; use crate::trading::TradingEngine; use crate::datahub; use std::sync::Arc; use tokio::sync::Mutex; use tokio::time::{interval, Duration}; use tracing::{info, error}; pub async fn start_rule_background_job( trading_engine: Arc, surface_store: crate::types::SurfaceStore, rules_store: Arc>>, ) { let mut job_running = trading_engine.job_running.lock().await; if *job_running { info!("Background rule job is already running"); return; } *job_running = true; drop(job_running); let interval_sec = *trading_engine.job_interval.lock().await; let mut ticker = interval(Duration::from_secs(interval_sec)); info!("Background rule checker + auto-trading started (interval: {}s)", interval_sec); loop { if !*trading_engine.job_running.lock().await { info!("Background rule job stopped"); break; } ticker.tick().await; let rules = rules_store.lock().await.clone(); let mut executed_trades = 0u32; let mut checked_rules = 0u32; for rule in rules.iter().filter(|r| r.enabled) { checked_rules += 1; for event_slug in &rule.events { // Full context for both Yes and No markets let ctx_json = datahub::get_rule_context_internal( axum::extract::State(surface_store.clone()), Some(event_slug.clone()), None, None, // None = both sides ).await.0; let evaluation = crate::rules::evaluate_rule_per_market(rule, &ctx_json, "Yes"); for market in &evaluation.market_results { if let Some(true) = market.get("result").and_then(|v| v.as_bool()) { if let (Some(bucket), Some(price), Some(token_id)) = ( market.get("bucket").and_then(|v| v.as_str()), market.get("Price").and_then(|v| v.as_f64()), market.get("Token") .or_else(|| market.get("token_id")) .and_then(|v| v.as_str()), ) { let trade_result = trading_engine.place_market_order( rule, event_slug.clone(), bucket.to_string(), token_id.to_string(), price, ).await; if trade_result.success { executed_trades += 1; info!( "✅ Background trade executed | Rule: {} | Event: {} | Bucket: {} | Token: {}", rule.oname, event_slug, 
bucket, token_id ); } else { error!( "Background trade failed | Rule: {} | Bucket: {} | Reason: {}", rule.oname, bucket, trade_result.reason ); } } } } } } // Update stats *trading_engine.last_run.lock().await = Some(chrono::Utc::now().to_rfc3339()); { let mut stats = trading_engine.stats.lock().await; stats.rules_checked += checked_rules; stats.trades_executed += executed_trades; } info!( "Background cycle finished → checked {} rules, executed {} trades", checked_rules, executed_trades ); } } // src/datahub.rs (FINAL - fixed lifetime + move + unused warnings) use axum::{Json, extract::{State, Query}}; use serde_json::Value; use chrono::{DateTime, Utc, Duration}; use crate::types::{SurfaceStore, Surface}; use crate::xtracker; use crate::surfaces::parse_datetime; use regex::Regex; use tracing::info; // removed unused 'error' #[derive(serde::Deserialize)] pub struct RuleContextQuery { pub slug: Option, } pub async fn get_rule_context_handler( State(store): State, Query(query): Query, ) -> Json { get_rule_context_internal(State(store), query.slug, None, None).await } // ====================== GLOBAL CONTEXT ====================== pub async fn get_glob_context(tweets: &[Value]) -> Value { let heat_4h = count_tweets_in_last_hours(tweets, 4.0) as f64; let heat_10m = count_tweets_in_last_hours(tweets, 10.0 / 60.0) as f64; let pace_24h = calculate_pace(tweets, 24.0); let pace_7d = calculate_pace(tweets, 24.0 * 7.0); let pace_30d = calculate_pace(tweets, 24.0 * 30.0); let pace_avg = (pace_24h + pace_7d + pace_30d) / 3.0; serde_json::json!({ "Heat4h": heat_4h, "Heat10m": heat_10m, "Pace24h": pace_24h, "Pace7d": pace_7d, "Pace30d": pace_30d, "PaceAVG": pace_avg }) } // ====================== EVENT CONTEXT ====================== pub async fn get_event_context( _surface: &Surface, // prefixed with _ to silence unused warning tweets: &[Value], game_start: DateTime, end_date: DateTime, ) -> Value { let count = count_tweets_between(tweets, game_start, end_date); let now = 
Utc::now(); let duration_hours = (end_date - game_start).num_hours() as f64; let progress = if duration_hours > 0.0 { ((now - game_start).num_hours() as f64 / duration_hours).clamp(0.0, 1.0) } else { 1.0 }; let r_hours = ((end_date - now).num_hours() as f64).max(0.0); serde_json::json!({ "Count": count, "Progress": progress, "RHours": r_hours }) } // ====================== MARKET CONTEXT ====================== pub async fn get_market_context( surface: &Surface, bin: &str, ynside: &str, bin_index: usize, event_count: i64, ) -> Option { let (bottom, top) = parse_bin_range(bin); if event_count > top { return None; } let token = if ynside == "Yes" { surface.token_pairs[bin_index].0.clone() } else { surface.token_pairs[bin_index].1.clone() }; let price = 0.50_f64; let best_bid = 0.49_f64; let best_ask = 0.51_f64; let vola = calculate_vola_from_matrix(&surface.matrix, bin_index); Some(serde_json::json!({ "Bucket": bin, "TokenType": ynside, "Token": token.clone(), "token_id": token, "Bottom": bottom, "Top": top, "Center": (bottom as f64 + top as f64) / 2.0, "Price": price, "BestBid": best_bid, "BestAsk": best_ask, "Vola": vola, "PriceAVG": price })) } // ====================== MAIN CONTEXT BUILDER (fixed) ====================== pub async fn get_rule_context_internal( State(store): State, slug: Option, _token: Option, ynside: Option, ) -> Json { let mut contexts = vec![]; let tweets = xtracker::get_xtracker().await.0.tweets; contexts.push(get_glob_context(&tweets).await); let surface = { let guard = store.read().await; match slug.as_deref().and_then(|s| guard.get(s)) { Some(s) => s.clone(), None => return Json(serde_json::json!({"error": "surface not found"})), } }; let game_start = surface.meta.get("gameStartTime") .and_then(|v| v.as_str()) .and_then(parse_datetime) .unwrap_or_else(Utc::now); let end_date = surface.meta.get("endDate") .and_then(|v| v.as_str()) .and_then(parse_datetime) .unwrap_or_else(|| Utc::now() + chrono::Duration::days(7)); 
contexts.push(get_event_context(&surface, &tweets, game_start, end_date).await); let event_count = contexts[1].get("Count").and_then(|v| v.as_i64()).unwrap_or(0); let mut ypeak = 0.0; let mut npeak = 0.0; let mut total_pges = 0.0; // kept for clarity, even if not used in final peak calc // Fixed: no move issue - call closure separately without reusing moved value let yes_result = process_side(&surface, "Yes", event_count).await; let no_result = process_side(&surface, "No", event_count).await; match ynside.as_deref() { Some("Yes") => { ypeak = yes_result.0; total_pges = yes_result.1; contexts.extend(yes_result.2); } Some("No") => { npeak = no_result.0; total_pges = no_result.1; contexts.extend(no_result.2); } _ => { // both sides (default for background) ypeak = yes_result.0; npeak = no_result.0; total_pges = yes_result.1 + no_result.1; contexts.extend(yes_result.2); contexts.extend(no_result.2); } } let piks = serde_json::json!({ "YPeak": if total_pges > 0.0 { ypeak / total_pges } else { 0.0 }, "NPeak": if total_pges > 0.0 { npeak / total_pges } else { 0.0 } }); contexts.push(piks); Json(serde_json::Value::Array(contexts)) } // Helper closure extracted to avoid lifetime/move issues async fn process_side(surface: &Surface, yn: &str, event_count: i64) -> (f64, f64, Vec) { let mut local_peak = 0.0; let mut local_pges = 0.0; let mut local_contexts: Vec = vec![]; for (i, bin) in surface.bins.iter().enumerate() { if let Some(m) = get_market_context(surface, bin, yn, i, event_count).await { let center = m.get("Center").and_then(|v| v.as_f64()).unwrap_or(0.0); let price = m.get("Price").and_then(|v| v.as_f64()).unwrap_or(0.0); local_peak += center * price; local_pges += price; local_contexts.push(m); } } (local_peak, local_pges, local_contexts) } // ====================== HELPERS (unchanged) ====================== fn count_tweets_between(tweets: &[Value], start: DateTime, end: DateTime) -> i64 { tweets.iter().filter(|t| { t.get("createdAt").and_then(|v| v.as_str()) 
.and_then(parse_datetime) .map_or(false, |dt| dt >= start && dt <= end) }).count() as i64 } fn calculate_pace(tweets: &[Value], hours: f64) -> f64 { let cutoff = Utc::now() - Duration::seconds((hours * 3600.0) as i64); let count = tweets.iter().filter(|t| { t.get("createdAt").and_then(|v| v.as_str()) .and_then(parse_datetime) .map_or(false, |dt| dt >= cutoff) }).count() as f64; if hours > 0.0 { count / hours } else { 0.0 } } fn calculate_vola_from_matrix(matrix: &[Vec], bin_index: usize) -> f64 { let prices: Vec = matrix.iter() .filter_map(|row| row.get(bin_index).copied()) .filter(|&p| p > 0.0 && p <= 1.0) .collect(); if prices.len() < 2 { return 0.0; } let mean = prices.iter().sum::() / prices.len() as f64; let variance = prices.iter().map(|&p| (p - mean).powi(2)).sum::() / prices.len() as f64; variance.sqrt() } fn parse_bin_range(bin: &str) -> (i64, i64) { let re = Regex::new(r"(\d+)[^\d]+(\d+)").unwrap(); if let Some(caps) = re.captures(bin) { (caps[1].parse().unwrap_or(0), caps[2].parse().unwrap_or(100)) } else { (0, 100) } } fn count_tweets_in_last_hours(tweets: &[Value], hours: f64) -> i64 { let cutoff = Utc::now() - Duration::seconds((hours * 3600.0) as i64); tweets.iter().filter(|t| { t.get("createdAt").and_then(|v| v.as_str()) .and_then(parse_datetime) .map_or(false, |dt| dt >= cutoff) }).count() as i64 } pub async fn get_global_data() -> Json { Json(serde_json::json!({"status": "ok"})) } pub async fn get_event_data(_slug: String) -> Json { Json(serde_json::json!({"status": "ok"})) } // src/lib.rs //! Polymarket Surface + Rule Engine Backend //! //! This is the main library crate for the Rust backend. //! All modules are declared here so they can be used from main.rs and tests. 
pub mod background; pub mod datahub; pub mod rules; pub mod surfaces; pub mod trading; pub mod types; pub mod xtracker; // Re-exports for convenient use in main.rs and handlers pub use types::{ Portfolio, PortfolioStore, Position, Rule, RuleEvaluation, Surface, SurfaceStore, SurfaceListResponse, SurfaceMeta, SurfaceResponse, TestRuleQuery, TradeResult, }; pub use trading::TradingEngine; // src/main.rs use axum::{routing::{get, post}, Router, Json, extract::{State, Path, Query}, http::StatusCode}; use tower_http::cors::{Any, CorsLayer}; use std::sync::Arc; use tokio::sync::RwLock; use std::collections::HashMap; use data_backend::types::{PortfolioStore, SurfaceStore, SurfaceListResponse, SurfaceMeta, SurfaceResponse, FilterQuery}; use data_backend::surfaces; use data_backend::xtracker; use data_backend::datahub; use data_backend::rules; use data_backend::trading::TradingEngine; use data_backend::background; #[tokio::main] async fn main() { tracing_subscriber::fmt() .with_env_filter("info,debug") .init(); // Surface Store let surface_store: SurfaceStore = Arc::new(RwLock::new(HashMap::new())); // Portfolio let portfolio: PortfolioStore = Arc::new(RwLock::new(data_backend::types::Portfolio { cash: 10000.0, positions: vec![], })); // Trading Engine (enthält authenticated CLOB Client + Portfolio) let trading_engine = Arc::new(TradingEngine::new(portfolio.clone()).await); // Background: Surface Collector let surface_store_clone = surface_store.clone(); tokio::spawn(async move { background_collector(surface_store_clone).await; }); // Background: Rule Engine + Auto-Trading let rules_store = rules::RULES.clone(); let trading_engine_bg = trading_engine.clone(); let surface_store_bg = surface_store.clone(); tokio::spawn(async move { background::start_rule_background_job( trading_engine_bg, surface_store_bg, rules_store, ).await; }); // Axum Router let app = Router::new() // Surfaces .route("/api/surfaces", get(list_surfaces)) .route("/api/surfaces/filter", 
get(filter_surfaces)) .route("/api/surface/:slug", get(get_surface)) // XTracker .route("/api/xtracker", get(xtracker::get_xtracker)) // Rules .route("/api/rules", get(rules::list_rules)) .route("/api/rules", post(rules::save_rule)) .route("/api/rules/test/:rule_name", get(rules::test_rule)) .route("/api/rules/execute/:rule_name", get(rules::execute_rule)) .route("/api/rules/evaluate/:rule_name/:slug/:token", get(rules::evaluate_rule)) // DataHub .route("/api/datahub/global", get(datahub::get_global_data)) .route("/api/datahub/event/:slug", get(datahub::get_event_data)) .route("/api/datahub/rule_context", get(datahub::get_rule_context_handler)) // Portfolio & Trading .route("/api/portfolio", get(portfolio_handler)) .route("/api/trading/job/status", get(job_status_handler)) .layer(CorsLayer::new() .allow_origin(Any) .allow_methods(Any) .allow_headers(Any) ) .with_state((surface_store, trading_engine)); let listener = tokio::net::TcpListener::bind("0.0.0.0:3001") .await .unwrap(); println!("✅ Rust Polymarket Surface + Rule Engine Backend ready on http://localhost:3001"); println!(" Background Surface Collector running (every 5min)"); println!(" Background Rule Checker + Auto-Trading running"); println!(); println!(" GET /api/surfaces"); println!(" GET /api/xtracker"); println!(" GET /api/datahub/rule_context?slug=elon-musk-of-tweets-..."); println!(" GET /api/rules"); println!(" POST /api/rules → save new rule"); println!(" GET /api/rules/test/:rule_name?slug=...&ynside=Yes"); println!(" GET /api/rules/execute/:rule_name?slug=... 
→ force execute"); println!(" GET /api/portfolio"); println!(" GET /api/trading/job/status"); axum::serve(listener, app).await.unwrap(); } // ====================== HANDLERS ====================== async fn list_surfaces(State((store, _)): State<(SurfaceStore, Arc)>) -> Json { let lock = store.read().await; let surfaces: Vec = lock.values().map(|s| SurfaceMeta { slug: s.slug.clone(), event_title: s.meta.get("event_title") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(), start_day: "Mon".to_string(), duration_days: 7, }).collect(); Json(SurfaceListResponse { surfaces }) } async fn filter_surfaces( State((store, _)): State<(SurfaceStore, Arc)>, Query(_params): Query, ) -> Json { let lock = store.read().await; let surfaces: Vec = lock.values().map(|s| SurfaceMeta { slug: s.slug.clone(), event_title: s.meta.get("event_title") .and_then(|v| v.as_str()) .unwrap_or("") .to_string(), start_day: "Mon".to_string(), duration_days: 7, }).collect(); Json(SurfaceListResponse { surfaces }) } async fn get_surface( Path(slug): Path, State((store, _)): State<(SurfaceStore, Arc)>, ) -> Result, StatusCode> { let lock = store.read().await; let surface = match lock.get(&slug) { Some(s) => s, None => return Err(StatusCode::NOT_FOUND), }; Ok(Json(SurfaceResponse { slug: surface.slug.clone(), matrix: surface.matrix.clone(), timestamps: surface.timestamps.iter().map(|t| t.to_rfc3339()).collect(), bins: surface.bins.clone(), meta: surface.meta.clone(), })) } async fn portfolio_handler(State((_, trading_engine)): State<(SurfaceStore, Arc)>) -> Json { Json(trading_engine.get_portfolio().await) } async fn job_status_handler(State((_, trading_engine)): State<(SurfaceStore, Arc)>) -> Json { let running = *trading_engine.job_running.lock().await; let interval_sec = *trading_engine.job_interval.lock().await; let last_run = trading_engine.last_run.lock().await.clone(); let stats = trading_engine.stats.lock().await; Json(serde_json::json!({ "running": running, "interval_seconds": interval_sec, 
"last_run": last_run, "rules_checked": stats.rules_checked, "trades_executed": stats.trades_executed, })) } // ====================== BACKGROUND COLLECTOR ====================== async fn background_collector(store: SurfaceStore) { let client = reqwest::Client::new(); loop { let slugs = surfaces::discover_active_slugs(&client).await; for slug in slugs { if let Some(surface) = surfaces::build_surface(&client, &slug).await { let mut lock = store.write().await; lock.insert(slug.clone(), surface); info!("Surface updated: {}", slug); } } tokio::time::sleep(tokio::time::Duration::from_secs(300)).await; } } // src/rules.rs (fixed move error on trade_result.reason) use axum::{Json, extract::{Path, State, Query}, debug_handler}; use serde_json::Value; use tokio::sync::Mutex; use std::sync::{Arc, LazyLock}; use std::collections::HashMap; use crate::datahub; use crate::types::{Rule, SurfaceStore, TestRuleQuery, RuleEvaluation}; // removed unused Variable use crate::trading::TradingEngine; static RULES: LazyLock>>> = LazyLock::new(|| Arc::new(Mutex::const_new(vec![]))); #[debug_handler] pub async fn list_rules() -> Json { let rules = RULES.lock().await.clone(); Json(super::types::RuleListResponse { rules }) } #[debug_handler] pub async fn save_rule(Json(mut rule): Json) -> Json { if rule.name.is_empty() { rule.name = rule.oname.clone(); } if rule.created.is_empty() { rule.created = chrono::Utc::now().to_rfc3339(); } let mut rules = RULES.lock().await; rules.retain(|r| r.oname != rule.oname); rules.push(rule.clone()); tracing::info!("Rule saved: {} (enabled: {})", rule.oname, rule.enabled); Json(serde_json::json!({ "success": true, "rule_name": rule.oname, "events_count": rule.events.len() })) } // ====================== TEST ENDPOINT (read-only) ====================== #[debug_handler] pub async fn test_rule( State((surface_store, _)): State<(SurfaceStore, Arc)>, Path(rule_name): Path, Query(query): Query, ) -> Json { let rules = RULES.lock().await.clone(); let rule = match 
rules.iter().find(|r| r.oname == rule_name) { Some(r) => r, None => return Json(serde_json::json!({ "success": false, "rule_name": rule_name, "reason": "Rule not found" })), }; if !rule.enabled { return Json(serde_json::json!({ "success": false, "rule_name": rule_name, "reason": "Rule is disabled" })); } let ynside = query.ynside.clone().unwrap_or_else(|| "Yes".to_string()); let slug = query.slug.clone(); if !rule.events.is_empty() && !rule.events.iter().any(|e| e == &slug) { return Json(serde_json::json!({ "success": false, "rule_name": rule_name, "reason": format!("Slug '{}' not in rule.events", slug) })); } let ctx_json = datahub::get_rule_context_internal( State(surface_store), Some(slug.clone()), query.token.clone(), Some(ynside.clone()) ).await.0; let evaluation = evaluate_rule_per_market(rule, &ctx_json, &ynside); let triggered_count = evaluation.market_results.iter() .filter(|m| m.get("result").and_then(|v| v.as_bool()).unwrap_or(false)) .count(); Json(serde_json::json!({ "success": true, "rule": { "oname": rule.oname, "name": rule.name, "enabled": rule.enabled }, "test": { "slug": slug, "ynside": ynside, "token": query.token }, "order": rule.order, "summary": { "total_markets": evaluation.market_results.len(), "triggered_markets": triggered_count, "overall_trigger": triggered_count > 0 }, "global_context": evaluation.global_context, "event_context": evaluation.event_context, "markets": evaluation.market_results })) } // ====================== EXECUTE RULE (force trigger) ====================== #[debug_handler] pub async fn execute_rule( State((surface_store, trading_engine)): State<(SurfaceStore, Arc)>, Path(rule_name): Path, Query(query): Query, ) -> Json { let rules = RULES.lock().await.clone(); let rule = match rules.iter().find(|r| r.oname == rule_name) { Some(r) => r, None => return Json(serde_json::json!({ "success": false, "rule_name": rule_name, "reason": "Rule not found" })), }; if !rule.enabled { return Json(serde_json::json!({ "success": false, 
"rule_name": rule_name, "reason": "Rule is disabled" })); } let slug = query.slug.clone(); let ynside = query.ynside.clone().unwrap_or_else(|| "Yes".to_string()); let ctx_json = datahub::get_rule_context_internal( State(surface_store), Some(slug.clone()), query.token.clone(), Some(ynside.clone()) ).await.0; let evaluation = evaluate_rule_per_market(rule, &ctx_json, &ynside); let mut executed = vec![]; let mut errors = vec![]; for market in &evaluation.market_results { if let Some(true) = market.get("result").and_then(|v| v.as_bool()) { if let (Some(bucket), Some(price), Some(token_id)) = ( market.get("bucket").and_then(|v| v.as_str()), market.get("Price").and_then(|v| v.as_f64()), market.get("Token").or(market.get("token_id")).and_then(|v| v.as_str()), ) { let trade_result = trading_engine.place_market_order( rule, slug.clone(), bucket.to_string(), token_id.to_string(), price, ).await; if trade_result.success { executed.push(token_id.to_string()); tracing::info!("✅ Executed trade for rule {} on bucket {}", rule.oname, bucket); } else { errors.push(trade_result.reason); tracing::error!("Trade failed for rule {}: {}", rule.oname, trade_result.reason); } } } } Json(serde_json::json!({ "success": true, "rule_name": rule_name, "slug": slug, "executed_tokens": executed, "errors": errors, "triggered_markets": executed.len(), "portfolio": trading_engine.get_portfolio().await })) } // ====================== EVALUATE (shared logic) ====================== #[debug_handler] pub async fn evaluate_rule( State((surface_store, _)): State<(SurfaceStore, Arc)>, Path(rule_name): Path, Query(query): Query, ) -> Json { let rules = RULES.lock().await.clone(); let rule = match rules.iter().find(|r| r.oname == rule_name) { Some(r) => r, None => { return Json(RuleEvaluation { global_context: serde_json::json!({}), event_context: serde_json::json!({}), market_results: vec![], }); } }; let ynside = query.ynside.unwrap_or_else(|| "Yes".to_string()); let slug = query.slug.clone(); let ctx_json 
= datahub::get_rule_context_internal( State(surface_store), Some(slug), query.token.clone(), Some(ynside.clone()) ).await.0; let evaluation = evaluate_rule_per_market(rule, &ctx_json, &ynside); Json(evaluation) } // ====================== CORE EVALUATION ENGINE ====================== pub fn evaluate_rule_per_market(rule: &Rule, ctx_json: &Value, _ynside: &str) -> RuleEvaluation { let mut global_context = serde_json::json!({}); let mut event_context = serde_json::json!({}); let mut var_base: HashMap = HashMap::new(); if let Value::Array(arr) = ctx_json { // Global context (index 0) if let Some(glob) = arr.first() { global_context = glob.clone(); if let Some(obj) = glob.as_object() { for (k, v) in obj { if let Some(f) = v.as_f64() { var_base.insert(k.clone(), f); } } } } // Event context (index 1) if arr.len() > 1 { event_context = arr[1].clone(); if let Some(obj) = event_context.as_object() { for (k, v) in obj { if let Some(f) = v.as_f64() { var_base.insert(k.clone(), f); } } } } // Peak values (last element) if let Some(piks) = arr.last() { if let Some(obj) = piks.as_object() { let ypeak = obj.get("YPeak").and_then(|v| v.as_f64()).unwrap_or(0.0); let _npeak = obj.get("NPeak").and_then(|v| v.as_f64()).unwrap_or(0.0); // prefixed _ var_base.insert("Peak".to_string(), ypeak); } } } // Compute custom rule variables (Projection, Plus, etc.) 
for var in &rule.variables { if let Some(value) = evaluate_expression(&var.expression, &var_base) { var_base.insert(var.name.clone(), value); tracing::info!("Computed rule variable {} = {}", var.name, value); } else { tracing::warn!("Failed to compute rule variable: {}", var.name); } } // Per-market evaluation let mut market_results = vec![]; if let Value::Array(arr) = ctx_json { for item in arr.iter().skip(2) { // skip global + event if let Some(market_obj) = item.as_object() { if !market_obj.contains_key("Bucket") { continue; } let mut market_vars = var_base.clone(); for (k, v) in market_obj.iter() { if let Some(f) = v.as_f64() { market_vars.insert(k.clone(), f); } else if let Some(_s) = v.as_str() { // prefixed _ // string values currently ignored for numeric conditions } } let result = evaluate_condition_tokens(&rule.condition_boxes, &market_vars); let reason = if result { "All conditions met → Rule triggers for this bucket".to_string() } else { "Conditions not met for this bucket".to_string() }; let mut entry = serde_json::Map::new(); entry.insert("bucket".to_string(), market_obj.get("Bucket").cloned().unwrap_or(Value::Null)); entry.insert("result".to_string(), Value::Bool(result)); entry.insert("reason".to_string(), Value::String(reason)); entry.insert("bottom".to_string(), market_obj.get("Bottom").cloned().unwrap_or(Value::Null)); entry.insert("top".to_string(), market_obj.get("Top").cloned().unwrap_or(Value::Null)); entry.insert("center".to_string(), market_obj.get("Center").cloned().unwrap_or(Value::Null)); entry.insert("price".to_string(), market_obj.get("Price").cloned().unwrap_or(Value::Null)); entry.insert("token".to_string(), market_obj.get("Token").cloned().unwrap_or(Value::Null)); let vars_map: serde_json::Map = market_vars .iter() .map(|(k, &v)| (k.clone(), serde_json::json!(v))) .collect(); entry.insert("variables".to_string(), Value::Object(vars_map)); market_results.push(entry); } } } RuleEvaluation { global_context, event_context, 
market_results, } } // ====================== MATH & CONDITION PARSERS ====================== fn evaluate_expression(expr: &str, vars: &HashMap) -> Option { let mut s = expr.replace(" ", ""); let mut var_list: Vec<_> = vars.iter().collect(); var_list.sort_by(|a, b| b.0.len().cmp(&a.0.len())); for (name, value) in var_list { s = s.replace(name, &value.to_string()); } tracing::debug!("Evaluating expression: '{}' → '{}'", expr, s); parse_math(&s) } fn parse_math(expr: &str) -> Option { if expr.is_empty() { return None; } let mut total = 0.0; let mut current_sign = 1.0; let mut i = 0; let chars: Vec = expr.chars().collect(); while i < chars.len() { if chars[i].is_whitespace() { i += 1; continue; } if chars[i] == '+' { current_sign = 1.0; i += 1; continue; } if chars[i] == '-' { current_sign = -1.0; i += 1; continue; } let term = parse_term(&chars, &mut i); total += current_sign * term; } Some(total) } fn parse_term(chars: &[char], i: &mut usize) -> f64 { let mut res = 0.0; let mut num_str = String::new(); while *i < chars.len() && (chars[*i].is_ascii_digit() || chars[*i] == '.') { num_str.push(chars[*i]); *i += 1; } if let Ok(num) = num_str.parse::() { res = num; } while *i < chars.len() && (chars[*i] == '*' || chars[*i] == '/') { let op = chars[*i]; *i += 1; let mut next_num = String::new(); if *i < chars.len() && chars[*i] == '-' { next_num.push('-'); *i += 1; } while *i < chars.len() && (chars[*i].is_ascii_digit() || chars[*i] == '.') { next_num.push(chars[*i]); *i += 1; } if let Ok(num) = next_num.parse::() { res = if op == '*' { res * num } else if num != 0.0 { res / num } else { 0.0 }; } } res } fn evaluate_condition_tokens(tokens: &[String], vars: &HashMap) -> bool { if tokens.is_empty() { return true; } let mut result = true; let mut i = 0; while i + 2 < tokens.len() { let left_name = tokens[i].trim(); let op_token = tokens[i + 1].trim().to_uppercase(); let right_str = tokens[i + 2].trim(); let left_val = *vars.get(left_name).unwrap_or(&0.0); let right_val: f64 
= if let Ok(v) = right_str.parse::() { v } else { *vars.get(right_str).unwrap_or(&0.0) }; tracing::debug!("Condition: {} {} {} (left={}, right={})", left_name, op_token, right_str, left_val, right_val); let cond = match op_token.as_str() { ">" => left_val > right_val, "<" => left_val < right_val, ">=" => left_val >= right_val, "<=" => left_val <= right_val, "==" | "=" => (left_val - right_val).abs() < 0.001, _ => false, }; result = result && cond; i += 3; // Skip AND/OR/parentheses for simple parser (can be extended later) while i < tokens.len() { let t = tokens[i].trim().to_uppercase(); if t == "AND" || t == "OR" || t == "(" || t == ")" { i += 1; } else { break; } } } result } // src/surfaces.rs use reqwest::Client; use serde_json::Value; use chrono::{DateTime, Utc, NaiveDateTime, TimeZone, Datelike, Weekday}; use regex::Regex; use std::collections::HashMap; use tracing::{info, warn}; use crate::types::Surface; /// Parse various datetime formats from Polymarket API pub fn parse_datetime(s: &str) -> Option> { let s = s.trim(); if let Ok(dt) = DateTime::parse_from_rfc3339(s) { return Some(dt.with_timezone(&Utc)); } if let Ok(dt) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%z") { return Some(dt.with_timezone(&Utc)); } if let Ok(dt) = DateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S%#z") { return Some(dt.with_timezone(&Utc)); } if let Ok(naive) = NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") { return Some(Utc.from_utc_datetime(&naive)); } None } /// Fetch single event and build Surface (matrix + token pairs + meta) pub async fn build_surface(client: &Client, slug: &str) -> Option { tracing::info!("Building surface for slug: {}", slug); let url = format!("https://gamma-api.polymarket.com/events?slug={}", slug); let resp = match client.get(&url).send().await { Ok(r) => r, Err(e) => { warn!("Failed to fetch event {}: {}", slug, e); return None; } }; let data: Value = match resp.json().await { Ok(d) => d, Err(e) => { warn!("Failed to parse JSON for event {}: 
{}", slug, e); return None; } }; let event = if let Some(arr) = data.as_array() { arr.first().cloned().unwrap_or(data) } else { data }; let title = event.get("title") .or_else(|| event.get("question")) .and_then(|v| v.as_str()) .unwrap_or(slug) .to_string(); info!("Building surface for '{}' (slug: {})", title, slug); let mut token_pairs = vec![]; let mut bins = vec![]; if let Some(markets) = event.get("markets").and_then(|m| m.as_array()) { for market in markets { if let Some(clob_str) = market.get("clobTokenIds").and_then(|v| v.as_str()) { if let Ok(clob_array) = serde_json::from_str::>(clob_str) { if clob_array.len() >= 2 { let yes_token = clob_array[0].clone(); let no_token = clob_array[1].clone(); token_pairs.push((yes_token, no_token)); if let Some(q) = market.get("question").and_then(|v| v.as_str()) { bins.push(q.to_string()); } } } } } } if token_pairs.is_empty() { warn!("No token pairs found for slug {}", slug); return None; } info!("✅ Extracted {} token pairs for slug {}", token_pairs.len(), slug); // Dummy matrix (20 time steps × number of bins) - real data can be filled later let num_bins = token_pairs.len(); let matrix: Vec> = vec![vec![0.0005; num_bins]; 20]; let timestamps: Vec> = (0..20) .map(|i| Utc::now() - chrono::Duration::hours(i as i64)) .collect(); // Extract dates for meta let game_start_time = if let Some(markets) = event.get("markets").and_then(|m| m.as_array()) { markets.first().and_then(|m| { m.get("gameStartTime") .or_else(|| m.get("startDate")) .and_then(|v| v.as_str()) .and_then(parse_datetime) }) } else { None }; let end_date = event.get("endDate") .and_then(|v| v.as_str()) .and_then(parse_datetime); let duration_days = match (game_start_time, end_date) { (Some(start), Some(end)) => (end - start).num_days() as u32, _ => 0, }; let start_day = game_start_time .map(|d| format!("{:?}", d.weekday())) .unwrap_or_else(|| "Mon".to_string()); let mut meta = HashMap::new(); meta.insert("event_slug".to_string(), Value::String(slug.to_string())); 
meta.insert("event_title".to_string(), Value::String(title)); meta.insert("gameStartTime".to_string(), event.get("markets") .and_then(|m| m.as_array()) .and_then(|arr| arr.first()) .and_then(|m| m.get("gameStartTime")) .cloned() .unwrap_or(Value::Null)); meta.insert("endDate".to_string(), event.get("endDate").cloned().unwrap_or(Value::Null)); meta.insert("start_day".to_string(), Value::String(start_day)); meta.insert("duration_days".to_string(), Value::Number(duration_days.into())); Some(Surface { slug: slug.to_string(), matrix, timestamps, bins, token_pairs, meta, }) } /// Discover active Elon-tweet related slugs from Polymarket predictions page pub async fn discover_active_slugs(client: &Client) -> Vec { let mut slugs = vec![]; let url = "https://polymarket.com/predictions/elon-tweets"; let resp = match client.get(url).send().await { Ok(r) => r, Err(e) => { warn!("Failed to fetch predictions page: {}", e); return slugs; } }; let text = match resp.text().await { Ok(t) => t, Err(e) => { warn!("Failed to read page text: {}", e); return slugs; } }; let re = Regex::new(r"elon-musk-of-tweets-[a-z]+-\d+-[a-z]+-\d+").unwrap(); for cap in re.captures_iter(&text) { let s = cap[0].to_string(); if s.len() < 120 && !slugs.contains(&s) { slugs.push(s); } } info!("Discovered {} active Elon-tweet slugs", slugs.len()); slugs } // src/trading.rs (fixed Amount::usdc Result + unused imports) use crate::types::{Position, Rule, TradeResult, PortfolioStore}; // removed unused Portfolio use polymarket_client_sdk::auth::state::Authenticated; use polymarket_client_sdk::auth::Normal; use polymarket_client_sdk::clob::Client; use polymarket_client_sdk::clob::types::{Amount, OrderType, Side}; use polymarket_client_sdk::types::Decimal; use polymarket_client_sdk::{POLYGON, PRIVATE_KEY_VAR}; use alloy::signers::local::LocalSigner; use alloy::signers::Signer as _; use std::str::FromStr as _; use std::sync::Arc; use tokio::sync::Mutex; // removed unused RwLock use chrono::Utc; use tracing::{info, 
impl TradingEngine {
    /// Build a trading engine backed by an authenticated CLOB client.
    ///
    /// Reads the private key from the environment variable named by
    /// `PRIVATE_KEY_VAR`, builds a local signer with the `POLYGON` chain id,
    /// creates a client for `https://clob.polymarket.com` configured with
    /// `use_server_time(true)`, and authenticates it with that signer.
    ///
    /// The background-job state starts as: not running, interval `60`
    /// (the background job passes this value to `Duration::from_secs`),
    /// no last run, and default (zeroed) stats.
    ///
    /// # Panics
    ///
    /// Panics (via `expect`) if the environment variable is missing, the key
    /// cannot be parsed, the client cannot be created, or authentication
    /// fails.
    pub async fn new(portfolio: PortfolioStore) -> Self {
        let private_key = std::env::var(PRIVATE_KEY_VAR)
            .expect("PRIVATE_KEY_VAR environment variable is required for trading");

        let signer = LocalSigner::from_str(&private_key)
            .expect("Invalid private key")
            .with_chain_id(Some(POLYGON));

        let config = polymarket_client_sdk::clob::Config::builder()
            .use_server_time(true)
            .build();

        let unauth_client = Client::new("https://clob.polymarket.com", config)
            .expect("Failed to create ClobClient");

        let clob_client = unauth_client
            .authentication_builder(&signer)
            .authenticate()
            .await
            .expect("Failed to authenticate CLOB client");

        // Optional: Log API keys for debugging
        // A failure here is only logged; construction continues either way.
        match clob_client.api_keys().await {
            Ok(keys) => info!(endpoint = "api_keys", result = ?keys),
            Err(e) => error!(endpoint = "api_keys", error = %e),
        }

        Self {
            portfolio,
            clob_client,
            job_running: Arc::new(Mutex::new(false)),
            job_interval: Arc::new(Mutex::new(60)),
            last_run: Arc::new(Mutex::new(None)),
            stats: Arc::new(Mutex::new(JobStats::default())),
            lsigner: signer,
        }
    }

    /// Real market order using the official polymarket_client_sdk pattern (FOK)
    ///
    /// Signs and posts an order for `token_id` on behalf of `rule`, then
    /// mirrors the outcome in the in-memory portfolio.
    ///
    /// * `rule` - supplies the side (`rule.order.side`), the amount
    ///   (`rule.order.amount`) and the take-profit / stop-loss values stored
    ///   on a new position.
    /// * `slug`, `bucket`, `token_id` - copied into the returned
    ///   [`TradeResult`] and, for buys, into the recorded [`Position`].
    /// * `estimated_price` - used only for portfolio bookkeeping and the
    ///   returned result in the code visible here.
    ///
    /// Returns a [`TradeResult`] with `success: true` and the order id in
    /// `reason` when posting succeeds, or `success: false` with the error text
    /// when signing or posting fails.
    ///
    /// # Panics
    ///
    /// Panics (via `expect`) if `Amount::usdc` rejects the converted amount.
    pub async fn place_market_order(
        &self,
        rule: &Rule,
        slug: String,
        bucket: String,
        token_id: String,
        estimated_price: f64,
    ) -> TradeResult {
        // Only the exact string "BUY" (after trim + uppercase) selects Buy;
        // every other value, including typos, falls through to Sell.
        let side = if rule.order.side.trim().to_uppercase() == "BUY" {
            Side::Buy
        } else {
            Side::Sell
        };

        let amount_usdc = rule.order.amount;

        info!("Placing {} market order for rule '{}' | token={} | amount=${:.2}",
              rule.order.side, rule.oname, token_id, amount_usdc);

        // Convert the f64 amount into a Decimal with two fractional digits
        // (value rounded to whole cents, scale 2).
        let amount = Amount::usdc(
            Decimal::new((amount_usdc * 100.0).round() as i64, 2)
        ).expect("Failed to create USDC Amount"); // fixed: .expect on Result

        // NOTE(review): the statement below is cut off in this copy of the
        // file - the expression that builds `order` (and presumably consumes
        // `amount`, `side` and `token_id`) is missing. Restore it from version
        // control before building; the order type mentioned in the doc comment
        // (FOK) cannot be confirmed from the visible code.
        let order = match self.clob_client

        //Sign the order
        let signed_order = match self.clob_client.sign(&self.lsigner, order.clone()).await {
            Ok(s) => s,
            Err(e) => {
                let reason = format!("Failed to sign order: {}", e);
                error!("{}", reason);
                return TradeResult {
                    success: false,
                    rule_name: rule.oname.clone(),
                    slug,
                    bucket,
                    side: rule.order.side.clone(),
                    amount: amount_usdc,
                    price: estimated_price,
                    token_id,
                    reason,
                };
            }
        };

        // Post the signed order
        match self.clob_client.post_order(signed_order).await {
            Ok(resp) => {
                // Update portfolio on success
                let mut port = self.portfolio.write().await;

                if side == Side::Buy {
                    // NOTE(review): `amount_usdc` is passed to `Amount::usdc`
                    // above, so it looks like a USDC notional; multiplying it
                    // by the price again may understate the cash spent -
                    // confirm the intended units.
                    let cost = amount_usdc * estimated_price; // conservative estimate
                    // Cash is clamped so it never goes below zero.
                    port.cash = (port.cash - cost).max(0.0);

                    port.positions.push(Position {
                        slug: slug.clone(),
                        bucket: bucket.clone(),
                        token_id: token_id.clone(),
                        side: rule.order.side.clone(),
                        amount: amount_usdc,
                        entry_price: estimated_price,
                        current_price: estimated_price,
                        take_profit: rule.order.take_profit,
                        stop_loss: rule.order.stop_loss,
                        timestamp: Utc::now().to_rfc3339(),
                    });
                } else {
                    // Sell: simple cash increase (real PnL would need position matching)
                    // No existing position is removed or reduced here.
                    port.cash += amount_usdc * estimated_price;
                }

                info!(
                    "✅ CLOB MARKET ORDER EXECUTED: {} ${:.2} of {} (token {}) | Order ID: {}",
                    rule.order.side, amount_usdc, bucket, token_id, resp.order_id
                );

                TradeResult {
                    success: true,
                    rule_name: rule.oname.clone(),
                    slug,
                    bucket,
                    side: rule.order.side.clone(),
                    amount: amount_usdc,
                    price: estimated_price,
                    token_id,
                    reason: format!("Order ID: {}", resp.order_id),
                }
            }
            Err(e) => {
                let reason = format!("Failed to post order: {}", e);
                error!("{}", reason);

                TradeResult {
                    success: false,
                    rule_name: rule.oname.clone(),
                    slug,
                    bucket,
                    side: rule.order.side.clone(),
                    amount: amount_usdc,
                    price: estimated_price,
                    token_id,
                    reason,
                }
            }
        }
    }

    /// Snapshot the portfolio as JSON.
    ///
    /// Takes the read guard on the portfolio store and returns an object with
    /// the keys `cash`, `positions_count` (length of the positions list) and
    /// `positions` (the positions themselves, serialized).
    pub async fn get_portfolio(&self) -> Value {
        let port = self.portfolio.read().await;
        serde_json::json!({
            "cash": port.cash,
            "positions_count": port.positions.len(),
            "positions": port.positions
        })
    }
}
RuleListResponse { pub rules: Vec, } #[derive(Serialize)] pub struct SurfaceListResponse { pub surfaces: Vec, } #[derive(Serialize)] pub struct SurfaceMeta { pub slug: String, pub event_title: String, pub start_day: String, pub duration_days: u32, } #[derive(Serialize)] pub struct SurfaceResponse { pub slug: String, pub matrix: Vec>, pub timestamps: Vec, pub bins: Vec, pub meta: HashMap, } #[derive(Serialize)] pub struct XTrackerResponse { pub tweets: Vec, pub last_ts: Option, } #[derive(serde::Deserialize)] pub struct FilterQuery { pub start_day: Option, pub duration: Option, } #[derive(serde::Deserialize)] pub struct TestRuleQuery { pub slug: String, pub token: Option, pub ynside: Option, } #[derive(Default, Debug, Serialize)] pub struct JobStats { pub rules_checked: u32, pub trades_executed: u32, } // src/xtracker.rs use axum::Json; use reqwest::Client; use serde_json::Value; use tracing::{info, warn}; use crate::types::XTrackerResponse; /// Fetch latest tweets from Elon Musk via XTracker API pub async fn get_xtracker() -> Json { let client = Client::new(); let url = "https://xtracker.polymarket.com/api/users/elonmusk/posts"; match client.get(url).send().await { Ok(resp) if resp.status().is_success() => { match resp.json::().await { Ok(data) => { let tweets = data .get("data") .and_then(|d| d.as_array()) .cloned() .unwrap_or_default(); info!("XTracker returned {} tweets", tweets.len()); Json(XTrackerResponse { tweets, last_ts: None, }) } Err(e) => { warn!("Failed to parse XTracker JSON: {}", e); Json(XTrackerResponse { tweets: vec![], last_ts: None, }) } } } _ => { warn!("Failed to fetch from XTracker API"); Json(XTrackerResponse { tweets: vec![], last_ts: None, }) } } }