1.安装环境
pip install pymongo locust
2.设置测试环境
开启MongoDB服务
打开Navicat,新建MongoDB连接
新建test数据库和sample集合
3.编写脚本
load_mongo.py
# coding=utf-8
"""Locust load test that inserts random documents into a MongoDB collection.

A single shared ``MongoDBClient`` is created when the test starts and closed
when the test stops. Each simulated user repeatedly inserts one random
document and reports the outcome and latency to Locust's statistics.
"""
from locust import User, task, between, events
from pymongo import MongoClient
import random
import string
import logging
import time

# Logging setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("MongoDB Load Test")

# MongoDB connection settings
MONGO_URI = "mongodb://admin:admin@localhost:27017/?authSource=admin"
DATABASE_NAME = "test"
COLLECTION_NAME = "sample"


class MongoDBClient:
    """Thin wrapper around a PyMongo client bound to one collection.

    Attributes are always defined; all three are ``None`` when the client
    could not be constructed.
    """

    def __init__(self):
        # Define every attribute up front so a failed setup leaves the object
        # in a consistent state (the other methods test these against None).
        self.client = None
        self.db = None
        self.collection = None
        try:
            client = MongoClient(MONGO_URI)
            db = client[DATABASE_NAME]
            collection = db[COLLECTION_NAME]
        except Exception as e:
            # Best effort: the load test keeps running and every insert is
            # then reported as a failure instead of crashing at start-up.
            logger.error(f"Failed to connect to MongoDB: {e}")
        else:
            self.client = client
            self.db = db
            self.collection = collection
            logger.info("MongoDB client initialized successfully.")

    def insert_document(self, document):
        """Insert ``document`` into the configured collection.

        Args:
            document: The dict to insert; it is passed to ``insert_one`` as is.

        Returns:
            ``True`` when the insert was issued, ``False`` when no collection
            is available (client setup failed).

        Raises:
            Whatever ``insert_one`` raises; errors are not swallowed here.
        """
        if self.collection is None:
            logger.error("MongoDB collection is not available.")
            return False
        self.collection.insert_one(document)
        logger.info("Document inserted successfully.")
        return True

    def close_connection(self):
        """Close the underlying client if one was created."""
        if self.client is not None:
            self.client.close()
            logger.info("MongoDB client connection closed.")


class MongoDBUser(User):
    """Locust user that inserts random documents through a shared client."""

    wait_time = between(1, 2)
    # Shared by all simulated users; set on test start, cleared on test stop.
    mongo_client = None

    @staticmethod
    def initialize_mongo_client():
        """Create the shared client once; later calls are no-ops."""
        if MongoDBUser.mongo_client is None:
            logger.info("Initializing MongoDB client...")
            MongoDBUser.mongo_client = MongoDBClient()

    @staticmethod
    def cleanup_mongo_client():
        """Close and drop the shared client if it exists."""
        if MongoDBUser.mongo_client is not None:
            MongoDBUser.mongo_client.close_connection()
            MongoDBUser.mongo_client = None
            logger.info("MongoDB client cleaned up.")

    # Listen for test start. The listener is registered on the plain function
    # first, then wrapped as a staticmethod for access via the class.
    @staticmethod
    @events.test_start.add_listener
    def on_test_start(environment, **kwargs):
        """Set up the shared MongoDB client when the test starts."""
        MongoDBUser.initialize_mongo_client()

    # Listen for test stop.
    @staticmethod
    @events.test_stop.add_listener
    def on_test_stop(environment, **kwargs):
        """Tear down the shared MongoDB client when the test stops."""
        MongoDBUser.cleanup_mongo_client()

    @task(2)
    def insert_data(self):
        """Insert one random document and report the result to Locust."""
        # Take a local reference so a concurrent test_stop that clears the
        # class attribute cannot turn it into None halfway through the task.
        client = MongoDBUser.mongo_client
        if client is None:
            logger.error("MongoDB client is not initialized.")
            return

        document = {
            "name": ''.join(random.choices(string.ascii_letters, k=10)),
            "age": random.randint(18, 65),
            "address": ''.join(
                random.choices(string.ascii_letters + string.digits, k=20)
            ),
        }

        error = None
        started = time.perf_counter()
        try:
            if not client.insert_document(document):
                error = RuntimeError("MongoDB collection is not available.")
        except Exception as exc:
            # Boundary of the measured operation: record the failure in the
            # statistics rather than letting it escape the task uncounted.
            error = exc
        elapsed_ms = (time.perf_counter() - started) * 1000

        # A plain User reports nothing on its own; fire the request event so
        # the insert shows up in Locust's request/failure statistics.
        # NOTE(review): events.request is the Locust 2.x API - confirm the
        # installed version.
        self.environment.events.request.fire(
            request_type="mongodb",
            name="insert_one",
            response_time=elapsed_ms,
            response_length=0,
            exception=error,
            context={},
        )

        if error is None:
            logger.info(f"Inserted document: {document}")
        else:
            logger.error(f"Failed to insert document: {error}")
4.运行测试
打开终端执行命令
locust -f load_mongo.py --headless -u 5 -r 1 -t 5s
测试结果
打开Navicat查看sample集合,可以看到文档数量增多
以上就是使用Locust对MongoDB进行负载测试的操作步骤的详细内容,更多关于Locust对MongoDB负载测试的资料请关注IT俱乐部其它相关文章!