import hashlib import json import os import uuid from datetime import datetime, timezone import requests from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding class A2SPAClient: def __init__(self, api_base, api_key, agent_id, private_key_path, timeout=20): self.api_base = api_base.rstrip("/") self.api_key = api_key self.agent_id = agent_id self.private_key_path = private_key_path self.timeout = timeout self._private_key = None @classmethod def from_env(cls): return cls( api_base=os.environ.get("A2SPA_API_BASE", "https://aimodularity.com/A2SPA"), api_key=os.environ["A2SPA_API_KEY"], agent_id=os.environ["A2SPA_AGENT_ID"], private_key_path=os.environ["A2SPA_PRIVATE_KEY_PATH"], ) def _load_private_key(self): if self._private_key is None: with open(self.private_key_path, "rb") as handle: self._private_key = serialization.load_pem_private_key(handle.read(), password=None) return self._private_key def _canonical_json(self, value): return json.dumps(value, sort_keys=True) def _sha256_hex(self, value): clean = {k: v for k, v in value.items() if k not in ("hash", "signature")} return hashlib.sha256(self._canonical_json(clean).encode()).hexdigest() def _sign(self, payload): clean = {k: v for k, v in payload.items() if k not in ("hash", "signature")} message = self._canonical_json(clean).encode() return self._load_private_key().sign( message, padding.PKCS1v15(), hashes.SHA256(), ).hex() def build_payload(self, target_agent_id, input_data, output_data, alert_threshold=10, extra=None): payload = { "agent_id": self.agent_id, "target_agent_id": target_agent_id, "timestamp": datetime.now(timezone.utc).isoformat(), "nonce": str(uuid.uuid4()), "input": input_data, "output": output_data, "alert_threshold": alert_threshold, } if extra: payload.update(extra) payload["hash"] = self._sha256_hex(payload) return payload def send_payload(self, target_agent_id, input_data, output_data, alert_threshold=10, extra=None): payload = self.build_payload(target_agent_id, input_data, output_data, alert_threshold, extra) signature = self._sign(payload) response = requests.post( f"{self.api_base}/api/verify_payload", headers={ "Content-Type": "application/json", "x-api-key": self.api_key, }, json={"payload": payload, "signature": signature}, timeout=self.timeout, ) response.raise_for_status() return response.json() def fetch_logs(self, agent_id=None): response = requests.get( f"{self.api_base}/api/logs_for_agent", headers={"x-api-key": self.api_key}, params={"agent_id": agent_id or self.agent_id}, timeout=self.timeout, ) response.raise_for_status() return response.json().get("logs", []) def fetch_inbox(self, agent_id=None, after=None, limit=50): params = {"agent_id": agent_id or self.agent_id, "limit": limit} if after: params["after"] = after response = requests.get( f"{self.api_base}/api/inbox_for_agent", headers={"x-api-key": self.api_key}, params=params, timeout=self.timeout, ) response.raise_for_status() return response.json().get("messages", []) def run_and_send(self, target_agent_id, input_data, runner, alert_threshold=10): output_data = runner(input_data) return self.send_payload(target_agent_id, input_data, output_data, alert_threshold) def existing_agent(input_data): return { "status": "ready", "summary": f"Processed: {input_data.get('message', 'no message')}", } if __name__ == "__main__": client = A2SPAClient.from_env() target_agent_id = os.environ["A2SPA_TARGET_AGENT_ID"] result = client.run_and_send( target_agent_id, {"message": "Hello from my existing Python agent"}, existing_agent, ) print(result)