需求:
可以对指定机器异步的执行多个命令
例子:
>>:run "df -h" --hosts 192.168.3.55 10.4.3.4 task id: 45334 >>: check_task 45334 >>:
注意,每执行一条命令,即立刻生成一个任务ID,不需等待结果返回,通过命令check_task TASK_ID来得到任务结果
README
1 基于RabbitMQ rpc实现的主机管理 2 可以对指定机器异步的执行多个命令 3 例子: 4 >>:run "df -h" --hosts 192.168.3.55 10.4.3.4 5 task id: 45334 6 >>: check_task 45334 #查看任务信息 7 8 程序结构: 9 RabbitMQ_PRC/#综合目录10 |- - -PRC_CLIENT/#client程序主目录11 | |- - -__init__.py12 | |- - -bin/#执行程目录13 | | |- - -__init__.py14 | | |- - -clien_start.py #客户端执行文件15 | |16 | |17 | |- - -core #主逻辑程序目录18 | | |- - -__init__.py19 | | |- - -clien_class.py#客户端执行主要逻辑 类20 | |21 | |22 |23 |24 |- - -PRC_SERVER/#服务端程序目录25 | |- - -__init__.py26 | |- - -bin/#执行目录27 | | |- - -__init__.py28 | | |- - -server_start.py#服务端程序执行文件29 | |30 | |31 | |- - -core/##主逻辑程序目录32 | | |- - -server_class.py#主逻辑 相关类33 | |34 |35 |- - -README
程序结构: RabbitMQ_PRC/#综合目录 |- - -PRC_CLIENT/#client程序主目录 | |- - -__init__.py | |- - -bin/#执行程目录 | | |- - -__init__.py | | |- - -clien_start.py #客户端执行文件
1 import os ,sys2 BASE_DIR=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))#获取相对路径转为绝对路径赋于变量3 sys.path.append(BASE_DIR)#增加环境变量4 5 from core.client_class import Threa6 7 if __name__ == '__main__':8 RPCS=Threa()9 response=RPCS.th_start()View Code
| |- - -core #主逻辑程序目录 | | |- - -__init__.py | | |- - -clien_class.py#客户端执行主要逻辑 类
1 import pika 2 import uuid 3 import threading 4 import random 5 6 class FibonacciRpcClient(object): 7 def __init__(self): 8 #self.credentials=pika.PlainCredentials("test","test") 9 self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))#生成连接的服务端 ip 10 #self.connection = pika.BlockingConnection(pika.ConnectionParameters("192.168.11.51",15672,'/',self.credentials))#生成连接的服务端 ip 11 self.channel = self.connection.channel()#创建一个管道 12 13 def get_respon(self,cal_queue,cal_id):#取任务信息 14 self.response=None 15 self.callback_id=cal_id#队列名 16 self.channel.basic_consume(self.on_response,queue=cal_queue)# 使用回调函数 17 while self.response is None: 18 self.connection.process_data_events()#非阻塞模式接收消息 19 return self.response#返回 20 21 def on_response(self, ch, method, props, body):#回调函数 22 if self.callback_id == props.correlation_id:#判断服务端返回的队列名是否与当前所生成的队列名一致 23 self.response = body# 将服务端的结果赋于返回来的结果变量 24 ch.basic_ack(delivery_tag = method.delivery_tag)##确保消息被 接收 25 26 def call(self, queues,n):#发送消息的函数 27 result = self.channel.queue_declare(exclusive=False)#随机生成一个队列,收消息后不删除 28 self.callback_queue = result.method.queue#赋于管道 变量 29 self.corr_id = str(uuid.uuid4())#生成一个服务端返回消息的队列名 30 self.channel.basic_publish(exchange='', 31 routing_key=queues,#队列名 32 properties=pika.BasicProperties( 33 reply_to = self.callback_queue,#发送的管道队列名 34 correlation_id = self.corr_id,#发送给服务端,用于返回消息的队列名 35 ), 36 body=str(n))#发送的内容数据 37 return self.callback_queue,self.corr_id#返回管道名 队列id号 38 39 class Threa(object):#线程 类 40 def __init__(self): 41 self.info={}#生成一个字典 42 self.help_info=''' 指令示例