1.安装环境
1 | pip install pymongo locust |
2.设置测试环境
开启MongoDB服务
打开Navicat,新建MongoDB连接
新建test数据库和sample集合
3.编写脚本
load_mongo.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | # coding=utf-8 from locust import User, task, between, events from pymongo import MongoClient import random import string import logging # 设置日志 logging.basicConfig(level = logging.INFO) logger = logging.getLogger( "MongoDB Load Test" ) # MongoDB连接配置 MONGO_URI = "mongodb://admin:admin@localhost:27017/?authSource=admin" DATABASE_NAME = "test" COLLECTION_NAME = "sample" # 自定义MongoDB客户端 class MongoDBClient: def __init__( self ): try : self .client = MongoClient(MONGO_URI) self .db = self .client[DATABASE_NAME] self .collection = self .db[COLLECTION_NAME] logger.info( "MongoDB client initialized successfully." ) except Exception as e: logger.error(f "Failed to connect to MongoDB: {e}" ) self .client = None def insert_document( self , document): if self .collection is not None : self .collection.insert_one(document) logger.info( "Document inserted successfully." ) else : logger.error( "MongoDB collection is not available." ) def close_connection( self ): if self .client is not None : self .client.close() logger.info( "MongoDB client connection closed." ) # Locust用户类 class MongoDBUser(User): wait_time = between( 1 , 2 ) mongo_client = None @staticmethod def initialize_mongo_client(): if MongoDBUser.mongo_client is None : logger.info( "Initializing MongoDB client..." ) MongoDBUser.mongo_client = MongoDBClient() @staticmethod def cleanup_mongo_client(): if MongoDBUser.mongo_client: MongoDBUser.mongo_client.close_connection() MongoDBUser.mongo_client = None logger.info( "MongoDB client cleaned up." ) # 监听测试开始 @staticmethod @events .test_start.add_listener def on_test_start(environment, * * kwargs): MongoDBUser.initialize_mongo_client() # 监听测试结束 @staticmethod @events .test_stop.add_listener def on_test_stop(environment, * * kwargs): MongoDBUser.cleanup_mongo_client() @task ( 2 ) def insert_data( self ): """模拟插入数据的任务""" if MongoDBUser.mongo_client is None : logger.error( "MongoDB client is not initialized." ) return document = { "name" : ''.join(random.choices(string.ascii_letters, k = 10 )), "age" : random.randint( 18 , 65 ), "address" : ''.join(random.choices(string.ascii_letters + string.digits, k = 20 )), } MongoDBUser.mongo_client.insert_document(document) logger.info(f "Inserted document: {document}" ) |
4.运行测试
打开终端执行命令
1 | locust -f load_mongo.py --headless -u 5 -r 1 -t 5s |
测试结果
打开Navicat查看sample表可以看到数据增多
以上就是使用Locust对MongoDB进行负载测试的操作步骤的详细内容,更多关于Locust对MongoDB负载测试的资料请关注IT俱乐部其它相关文章!