You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

53 lines
1.5 KiB

"""
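AmazingData SDK connection test script.

Loads the SDK config row with id=1, attempts to connect through
AmazingDataAdapter, and exits with status 0 on success or 1 on failure.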
"""
import sys
import time
def test_sdk_connection():
    """Check that the AmazingData SDK can be reached with the stored config.

    Loads the ``SDKConfig`` row with ``id == 1``, builds an
    ``AmazingDataAdapter`` from its fields, then calls ``connect()`` and,
    on success, ``disconnect()``. Progress and results are printed to stdout.

    Returns:
        bool: ``True`` when ``connect()`` reported success and
        ``disconnect()`` did not raise; ``False`` when no config row exists,
        ``connect()`` reported failure, or an exception was raised while
        connecting or disconnecting.
    """
    from app.db.session import SessionLocal
    from app.models.config import SDKConfig
    from app.services.amazing_data_adapter import AmazingDataAdapter

    db = SessionLocal()
    try:
        config = db.query(SDKConfig).filter(SDKConfig.id == 1).first()
        if not config:
            print("No SDK config found with id=1")
            return False
        # Copy the values out while the session is still open so nothing
        # below touches the ORM object after the session has been closed.
        settings = {
            "username": config.username,
            "password": config.password,
            "host": config.host,
            "port": config.port,
            "local_path": config.local_path or "./amazing_data_cache/",
        }
    finally:
        # Release the session on every path (including the early return).
        # NOTE(review): assumes SessionLocal() yields a SQLAlchemy-style
        # session exposing close() -- confirm against app.db.session.
        db.close()

    # The password is deliberately left out of the printed summary.
    print("Testing SDK connection with config:")
    print(f" Username: {settings['username']}")
    print(f" Host: {settings['host']}")
    print(f" Port: {settings['port']}")

    adapter = AmazingDataAdapter(settings)

    # perf_counter() is monotonic, so the elapsed time cannot be skewed by
    # system clock adjustments during the connection attempt.
    start = time.perf_counter()
    print("\nConnecting to SDK...")
    try:
        success = adapter.connect()
        elapsed = time.perf_counter() - start
        if not success:
            print(f"✗ SDK connection failed! (took {elapsed:.2f}s)")
            return False
        print(f"✓ SDK connection successful! (took {elapsed:.2f}s)")
        adapter.disconnect()
        print("✓ Disconnected successfully")
        return True
    except Exception as e:
        # Top-level boundary of a diagnostic script: report any failure
        # from connect()/disconnect() and signal it through the return value.
        print(f"✗ Error: {type(e).__name__}: {e}")
        return False
if __name__ == "__main__":
    # Exit status mirrors the check result: 0 when the SDK connection
    # test passed, 1 otherwise.
    exit_code = 0 if test_sdk_connection() else 1
    sys.exit(exit_code)