作者:fcb5511 提供两种消息队列服务安装方式: 一、通过apt[color=inherit !important]命令安装 二、通过官方源码以及binary包安装
apt命令安装方式获取的erlang和rabbitmqserver版本一般比较低,erlang版本和rabbitmqserver版本如果匹配不好,在高并发,多连接的情况下,cpu以及内存消耗较大(不知道现在最新安装版本有没有这个问题,有兴趣的可以试一试),优点是安装和使用都很方便,下面是安装和配置的各个步骤:
安装RabbitMQServer需要的语言环境 erlang,执行如下命令 1、apt-get installerlang-nox # 安装erlang 2、erl # 查看relang语言版本,成功执行则说明relang安装成功
安装RabbitMQServer消息队列服务 apt-get install rabbitmq-server #安装成功自动启动 查看RabbitMQServer安装状态 systemctl status rabbitmq-server
启用管理插件(可通过网页登录) rabbitmq-plugins enable rabbitmq_management
在[color=inherit !important]OK3399浏览器中输入localhost:1[color=inherit !important]5672,登陆用户名和密码分别为”guest”、”guest”。
如果要实现局域网的其它计算机可访问OK3399的MQ服务,需要给MQ服务添加管理员以及管理员权限。 新建admin用户并赋予权限 1、rabbitmqctl add_useradmin admin 2、rabbitmqctl set_user_tagsadmin administrator 3、rabbitmqctl set_permissions-p / admin '.*' '.*' '.*'
局域网内计算机浏览器输入192.168.1.109:15672,用户名和密码分别为”admin”、”admin” 源码编译和binary安装需要从erlang官网下载源码自行编译为arm64平台的安装包,rabbitmqserver可直接从官网下载binaray包,手动添加环境变量。这种安装方式优点是可以根据喜好安装最新发布的版本。
从github获取erlang源码包 wget https://www.erlang.org/downloads/otp_src_22.1.tar.gz 解压源码包tar -zvxf otp_src_22.1.tar.gz #编译&安装 编译前先安装ssl相关的依赖库,否则ok3399在启动mq服务时会报错apt-get install libssl-dev 接下来安装erlangcd otp_src_22.1 ./otp_build autoconf./configure && make && sudo make install安装不指定路径会默认安装到/usr/local/lib/erlang/目录下
rabbitmqserver二进制包安装 从github获取安装包 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.0/rabbitmq-server-generic-unix-3.8.0.tar.xz
解压 tar -xvf rabbitmq-server-generic-unix-3.8.0.tar.xz将解压后的包放到/usr/local目录
配置环境变量 vim /etc/profile 在后面添加如下命令:export PATH=$PATH:/usr/local/lib/erlang/bin:/usr/local/rabbitmq_server-3.8.0/sbin让新加的环境变量有效source /etc/profile
用erl查看版本号
启动rabbitmq-server rabbitmq-plugins enable rabbitmq_management//允许网页管理rabbitmq-server 或 rabbitmq-server -detached(后台运行)
新建admin用户并赋予权限 1、rabbitmqctl add_useradmin admin 2、rabbitmqctl set_user_tagsadmin administrator 3、rabbitmqctl set_permissions-p / admin '.*' '.*' '.*'
至此,两种方式安装Rabbitmq服务完成。接下来用python来验证一下功能,python实现一个消费者,一个生产者,生产间隔1秒钟发送一帧数据,消费者从队列取数据并打印。python使用rabbitmq需要引用pika库,pika库是独立于python的第三方库,需要使用pip命令安装。 amqproducer.py程序: import pikaimport threadingimport timeimport signalimport osclass Producer(object): def __init__(self, host='127.0.0.1', port=5672, user='admin', passwd='admin', exchange='test_exchange', routing_key='test_key', exchange_type= 'fanout', exhcange_durable=False, exchange_autodelete=True ): self.host=host self.port=port self.user=user self.passwd=passwd self.exchange=exchange self.routing_key=routing_key self.exchange_type=exchange_type self.exhcange_durable=exhcange_durable self.exchange_autodelete=exchange_autodelete self.connection=None self.channel=None def [color=inherit !important]CreateConnection(self): try: self.credentials=pika.PlainCredentials(self.user, self.passwd) self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.host, port = self.port, virtual_host = '/', credentials=self.credentials)) self.channel = self.connection.channel() self.channel.exchange_declare(exchange_type=self.exchange_type, exchange = self.exchange, durable = self.exhcange_durable, auto_delete=self.exchange_autodelete) except Exception as e: print("occurring an exception when the producer executing the function 'CreateConnection()':", e) def PublisMessage(self, Message): try: self.channel.basic_publish(exchange=self.exchange, routing_key=self.routing_key, properties=pika.BasicProperties(delivery_mode = 2), body=Message) except Exception as e: print("An exception occurred when publishing message:", e)
amqconsumer.py程序: import pikaimport timeclass Consumer(object): def __init__(self, host='127.0.0.1', port=5672, user='admin', passwd='admin', queue='test_queue', exchange='test_exchange', routing_key='test_key', queue_exclusive=False, queue_durable=False, queue_autodelete=True, exchange_type= 'fanout', exhcange_durable=False, exchange_autodelete=True ): self.host=host self.port=port self.user=user self.passwd=passwd self.queue=queue self.exchange=exchange self.routing_key=routing_key self.queue_exclusive=queue_exclusive self.queue_durable=queue_durable self.queue_autodelete=queue_autodelete self.exchange_type=exchange_type self.exhcange_durable=exhcange_durable self.exchange_autodelete=exchange_autodelete self.connection=None self.channel=None def CreateConnection(self): try: self.credentials=pika.PlainCredentials(self.user, self.passwd) self.connection = pika.BlockingConnection(pika.ConnectionParameters(host = self.host, port = self.port, virtual_host = '/', credentials=self.credentials)) self.channel = self.connection.channel() self.channel.queue_declare(queue=self.queue, durable=self.queue_durable, exclusive=self.queue_exclusive, auto_delete=self.queue_autodelete) self.channel.exchange_declare(exchange_type=self.exchange_type, exchange = self.exchange, durable = self.exhcange_durable, auto_delete=self.exchange_autodelete) self.channel.queue_bind(queue = self.queue, exchange = self.exchange, routing_key = self.routing_key) self.channel.basic_consume(on_message_callback = self.on_response, auto_ack=True, queue=self.queue) except Exception as e: print("occurring an exception when the consumer RecvData:", e) def on_response(self, ch, method, props, body): #print(" [%s] Received %r" % (time.time(), body)) print(time.strftime('%H:%M:%S',time.localtime(time.time()))+': Get Message from '+self.host + ':'+body.decode()) def ProcessConsumeEventBlock(self): try: self.channel.start_consuming() except Exception as e: print('An exception occurred when executing consuming, the exception is:',e) def ProcessConsumeEventNoneBlock(self): try: self.connection.process_data_events() except Exception as e: print('An exception occurred when executing consuming, the exception is:',e)
main.py程序: import pikaimport timeimport threadingfrom amqconsumer import Consumerfrom amqproducer import Producerclass ManageMent: def __init__(self): self.producer = Producer(host='localhost',user='guest',passwd='guest') self.consumer = Consumer(host='localhost',user='guest',passwd='guest') def CreateSever(self): self.consumer.CreateConnection() self.producer.CreateConnection() def ProducerRun(self): while True: self.producer.PublisMessage('this message is used for testing RabbitMQ') time.sleep(0.3) def ConsumerRun(self): self.consumer.ProcessConsumeEventBlock() def Run1(self): while True: self.producer.PublisMessage('this message is used for test RabbitMQ') self.consumer.ProcessConsumeEventNoneBlock() time.sleep(1) def Run(self): self.Consumer_thread=threading.[color=inherit !important]Thread(target=self.ConsumerRun) self.Producer_thread=threading.Thread(target=self.ProducerRun) self.Consumer_thread.start() self.Producer_thread.start() if __name__ == '__main__': manage=ManageMent() manage.CreateSever() manage.Run()
最终运行效果如图
此贴为基础功能,下次将mq数据与qt界面结合,实现动态的界面。
点击了解详情 飞凌嵌入式官方网站<<
|