import hashlib import json import os import re import uuid from datetime import datetime, timezone import requests from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding SIGNABLE_FIELDS = ( "agent_id", "target_agent_id", "timestamp", "nonce", "input", "output", "alert_threshold", ) EXPECTED_HASH_RE = re.compile(r"expected:\s*([0-9a-f]{64})", re.IGNORECASE) def normalize_a2spa_payload(payload): normalized = {field: payload[field] for field in SIGNABLE_FIELDS if field in payload} normalized.setdefault("alert_threshold", 10) return normalized def canonicalize_a2spa_payload(payload): return json.dumps(normalize_a2spa_payload(payload), sort_keys=True) def compute_payload_hash(payload): return hashlib.sha256(canonicalize_a2spa_payload(payload).encode("utf-8")).hexdigest() def prepare_signed_bytes(payload): return canonicalize_a2spa_payload(payload).encode("utf-8") def finalize_payload(payload_before_hash): finalized = {k: v for k, v in payload_before_hash.items() if k not in ("hash", "signature")} finalized["hash"] = compute_payload_hash(payload_before_hash) return finalized def build_request_body(payload, signature): return {"payload": payload, "signature": signature} def extract_expected_hash(error_text): if not error_text: return None match = EXPECTED_HASH_RE.search(str(error_text)) return match.group(1) if match else None def build_debug_snapshot(payload_before_hash, signature=None, final_payload=None, server_error_text=None): canonical_string = canonicalize_a2spa_payload(payload_before_hash) canonical_bytes = prepare_signed_bytes(payload_before_hash) request_payload = final_payload or finalize_payload(payload_before_hash) snapshot = { "payload_before_hash": payload_before_hash, "canonical_string": canonical_string, "canonical_bytes_hex": canonical_bytes.hex(), "client_computed_hash": compute_payload_hash(payload_before_hash), "final_request_body": build_request_body(request_payload, signature) if signature else {"payload": request_payload}, "server_expected_hash": extract_expected_hash(server_error_text), } if server_error_text is not None: snapshot["server_error"] = server_error_text return snapshot def sign_payload(private_key, payload): return private_key.sign( prepare_signed_bytes(payload), padding.PKCS1v15(), hashes.SHA256(), ).hex() class A2SPAClient: def __init__(self, api_base, api_key, agent_id, private_key_path, timeout=20): self.api_base = api_base.rstrip("/") self.api_key = api_key self.agent_id = agent_id self.private_key_path = private_key_path self.timeout = timeout self._private_key = None @classmethod def from_env(cls): return cls( api_base=os.environ.get("A2SPA_API_BASE", "https://aimodularity.com/A2SPA"), api_key=os.environ["A2SPA_API_KEY"], agent_id=os.environ["A2SPA_AGENT_ID"], private_key_path=os.environ["A2SPA_PRIVATE_KEY_PATH"], ) def _load_private_key(self): if self._private_key is None: with open(self.private_key_path, "rb") as handle: self._private_key = serialization.load_pem_private_key(handle.read(), password=None) return self._private_key def build_payload_fields(self, target_agent_id, input_data, output_data, alert_threshold=10, extra=None): payload = { "agent_id": self.agent_id, "target_agent_id": target_agent_id, "timestamp": datetime.now(timezone.utc).isoformat(), "nonce": str(uuid.uuid4()), "input": input_data, "output": output_data, "alert_threshold": alert_threshold, } if extra: payload.update(extra) return payload def build_payload(self, target_agent_id, input_data, output_data, alert_threshold=10, extra=None): return finalize_payload( self.build_payload_fields(target_agent_id, input_data, output_data, alert_threshold, extra) ) def build_signed_request(self, target_agent_id, input_data, output_data, alert_threshold=10, extra=None): payload_before_hash = self.build_payload_fields( target_agent_id, input_data, output_data, alert_threshold, extra, ) payload = finalize_payload(payload_before_hash) signature = sign_payload(self._load_private_key(), payload_before_hash) return { "payload_before_hash": payload_before_hash, "payload": payload, "signature": signature, "request_body": build_request_body(payload, signature), } def post_request_body(self, request_body): return requests.post( f"{self.api_base}/api/verify_payload", headers={ "Content-Type": "application/json", "x-api-key": self.api_key, }, json=request_body, timeout=self.timeout, ) def send_payload(self, target_agent_id, input_data, output_data, alert_threshold=10, extra=None, debug=False): signed_request = self.build_signed_request( target_agent_id, input_data, output_data, alert_threshold, extra, ) response = self.post_request_body(signed_request["request_body"]) if debug and response.status_code >= 400: print( json.dumps( build_debug_snapshot( signed_request["payload_before_hash"], signed_request["signature"], signed_request["payload"], response.text, ), indent=2, sort_keys=True, ) ) response.raise_for_status() return response.json() def fetch_logs(self, agent_id=None): response = requests.get( f"{self.api_base}/api/logs_for_agent", headers={"x-api-key": self.api_key}, params={"agent_id": agent_id or self.agent_id}, timeout=self.timeout, ) response.raise_for_status() return response.json().get("logs", []) def fetch_inbox(self, agent_id=None, after=None, limit=50): params = {"agent_id": agent_id or self.agent_id, "limit": limit} if after: params["after"] = after response = requests.get( f"{self.api_base}/api/inbox_for_agent", headers={"x-api-key": self.api_key}, params=params, timeout=self.timeout, ) response.raise_for_status() return response.json().get("messages", []) def run_and_send(self, target_agent_id, input_data, runner, alert_threshold=10): output_data = runner(input_data) return self.send_payload(target_agent_id, input_data, output_data, alert_threshold) def existing_agent(input_data): return { "status": "ready", "summary": f"Processed: {input_data.get('message', 'no message')}", } if __name__ == "__main__": client = A2SPAClient.from_env() target_agent_id = os.environ["A2SPA_TARGET_AGENT_ID"] input_data = {"message": "Hello from my existing Python agent"} output_data = existing_agent(input_data) signed_request = client.build_signed_request( target_agent_id, input_data, output_data, ) first_response = client.post_request_body(signed_request["request_body"]) print(f"First request: {first_response.status_code} {first_response.text}") second_response = client.post_request_body(signed_request["request_body"]) print(f"Second request: {second_response.status_code} {second_response.text}")