博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ学习总结(3)——入门实例教程详解
阅读量:5765 次
发布时间:2019-06-18

本文共 5643 字,大约阅读时间需要 18 分钟。

hot3.png

一、起航

       本章节,柯南君将从几个层面,用官网例子讲解一下RabbitMQ的实操经典程序案例,让大家重新回到经典“Hello world!”(The simplest thing that does something )时代,RabbitMQ 支持N多种客户端(client),这里无法一一讲解,暂定java client,有时间的情况下,在弥补一下。

事先,先普及一下图标(我们会在下面的事例中,会大量用到,所以先普及一下,便于识别,最终更好理解事例的含义)

1、图标概念

① producting(生产者):在程序中 发送消息的一端,我们暂且称之为 生产者,在这里用“p”表示

② queue(队列):队列是一个邮箱的名字。它住在RabbitMQ。尽管消息流经RabbitMQ和您的应用程序,他们只可以存储在一个队列中。队列是不受任何限制,它可以储存尽可能多的信息(按你兴趣来了),它本质上是一个无限缓冲区。许多生产商可以发送消息到一个队列,许多消费者可以尝试接收数据从一个队列。

③ consuming(消费者):消费者和生产者是对应的,较为相似的意思;在这里,我用“C”表示

2、The Java client library

RabbitMQ 中
AMQP这是一个开放的、通用的协议消息。有许多客户AMQP在许多不同的语言。我们将使用提供的Java客户机RabbitMQ。
 

我们需要下载(Download) client library package,并要核实每个jar包,解压到相应位置,如下图所示:

第一步:点击 ,然后找到相应的lib下载位置

第二步:选择合适的下载,比如我下载了zip包,如图所示:

第三步:Unzip it(解压它) 到你的working directory(工作目录)中 and grab (并且获得)你的jar包文件

 
$ unzip rabbitmq-java-client-bin-*.zip $ cp rabbitmq-java-client-bin-*/*.jar ./

二、程序案例

1) "Hello World" 

在这部分教程中我们将用Java写两个程序,发送一个消息的生产者,消费者接收信息并打印出来。我们会掩盖一些细节的Java API,集中在这个非常简单的东西开始。这是一个“Hello World”的消息。在下面的图中,“P”是我们的生产和“C”是我们的消费者。中间的框是一个队列,消息缓冲区RabbitMQ保持代表消费者。
 ① sending (发送)
首先 让The sender(消息发送者)发送消息并且让我们的receiver (消息接收者)接收消息,The sender(消息发送者)将会connect to(连接)RabbitMQ,发送一个single message(单一的信息),然后exit(退出)。
  • 在send.java 中,我们需要import一些class ,如下所示:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel;
  • set up(设置)类和queue的name
public class Send {
private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException {
... } }
  • then 我们create 一个connection (连接)到server(服务端)
onnectionFactory factory = new ConnectionFactory();    factory.setHost("localhost");    Connection connection = factory.newConnection();    Channel channel = connection.createChannel();
备注
  •  这个connection 是抽象的socket connection链接;
  •  负责协议版本(protocol version negotigation)和身份认证(authentication );
  • 我们在本地机器上连接到一个代理即 localhost ,如果我们想要连接到代理不同机器上我们简单的指定其名称或者IP地址即可;
   接下来,我们创建一个channel(通道),这个通道汇集了大多数的API服务!
为了发送,我们必须先声明一个为我们发送queue(队列),然后,往queue里发送一个message
channel.queueDeclare(QUEUE_NAME, false, false, false, null);    String message = "Hello World!";    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());    System.out.println(" [x] Sent '" + message + "'");
消息内容是一个字节数组,所以你可以编码任何你喜欢的。最后,我们关闭通道和连接;
channel.close();    connection.close();
问题
 如果 sending doesn‘t work! 我们将怎么办?why?
如果这是你第一次使用RabbitMQ并且你看不到“发送的”消息,那么你可能抓耳挠 腮没有足够的空闲磁盘空间(默认情况下它需要至少1 gb免费),因此拒绝接受消息。检查代理日志文件确认,如果有必要减少限制。配置文件的文档将向您展示如何设置disk_free_limit。
接下来的是send.java所有源代码:
[java] 
  1. import com.rabbitmq.client.ConnectionFactory;  
  2. import com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.Channel;  
  4.   
  5. public class Send {  
  6.       
  7.   private final static String QUEUE_NAME = "hello";  
  8.   
  9.   public static void main(String[] argv) throws Exception {  
  10.                 
  11.     ConnectionFactory factory = new ConnectionFactory();  
  12.     factory.setHost("localhost");  
  13.     Connection connection = factory.newConnection();  
  14.     Channel channel = connection.createChannel();  
  15.   
  16.     channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  
  17.     String message = "Hello World!";  
  18.     channel.basicPublish("", QUEUE_NAME, null, message.getBytes());  
  19.     System.out.println(" [x] Sent '" + message + "'");  
  20.       
  21.     channel.close();  
  22.     connection.close();  
  23.   }  
  24. }  
    ② Receiving (接收)
  这就是我们的发送者。我们的接收器是将消息从RabbitMQ,所以不像发送方发布一个消息,我们将保持运行监听消息并打印出来
  • The code (in ) has almost the same imports as Send:
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer;
额外的QueueingConsumer是一个类,我们将使用缓冲消息推到我们的服务器。设置发送者一样,我们打开一个连接和一个通道,并宣布我们将使用的队列。注意这与队列,发送发布。
public class Recv {
private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); ... }}
注意,我们在这里声明队列。因为我们可能会在发送方之前
开始启动接收方,我们要确保队列存在之前我们尝试使用消息。我们要告诉服务器提供我们从队列的消息。因为它将异步消息,我们提供一个回调对象的形式,将缓冲的消息,直到我们准备使用它们。
QueueingConsumer要做什么呢?
QueueingConsumer consumer = new QueueingConsumer(channel);    channel.basicConsume(QUEUE_NAME, true, consumer);    while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); }

QueueingConsumer.nextDelivery()块,直到另一个来自服务器的消息交付。  
下面是Recv.java 源代码:

[java] 
  1. import com.rabbitmq.client.ConnectionFactory;  
  2. import com.rabbitmq.client.Connection;  
  3. import com.rabbitmq.client.Channel;  
  4. import com.rabbitmq.client.QueueingConsumer;  
  5.   
  6. public class Recv {  
  7.       
  8.     private final static String QUEUE_NAME = "hello";  
  9.   
  10.     public static void main(String[] argv) throws Exception {  
  11.   
  12.     ConnectionFactory factory = new ConnectionFactory();  
  13.     factory.setHost("localhost");  
  14.     Connection connection = factory.newConnection();  
  15.     Channel channel = connection.createChannel();  
  16.   
  17.     channel.queueDeclare(QUEUE_NAME, falsefalsefalsenull);  
  18.     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");  
  19.       
  20.     QueueingConsumer consumer = new QueueingConsumer(channel);  
  21.     channel.basicConsume(QUEUE_NAME, true, consumer);  
  22.       
  23.     while (true) {  
  24.       QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  25.       String message = new String(delivery.getBody());  
  26.       System.out.println(" [x] Received '" + message + "'");  
  27.     }  
  28.   }  
  29. }  

转载于:https://my.oschina.net/zhanghaiyang/blog/599407

你可能感兴趣的文章
杠上Spark、Flink?Kafka为何转型流数据平台
查看>>
“为什么中国没有Apache基金会这样的组织?”
查看>>
C#的未来:方法契约
查看>>
PWA即将推向所有Chrome平台
查看>>
使用Prometheus监控Cloudflare的全球网络
查看>>
.NET Core 3.0中的数据库驱动框架System.Data
查看>>
Python数据可视化2018:数据可视化库为什么这么多?
查看>>
Micronaut教程:如何使用基于JVM的框架构建微服务
查看>>
Analytics Zoo:在Apache Spark上实现分布式Tensorflow和BigDL管道的统一分析和AI平台
查看>>
全新Docker Hub发布:提供查找、存储和共享容器镜像单一体验
查看>>
据Progress调查:2018年,70%的客户在使用NoSQL
查看>>
腾讯信鸽海量移动推送服务是如何构建的
查看>>
数组的reduce方法
查看>>
冗余代码检测与分析
查看>>
借Java EE守护者联盟之力拯救Java EE
查看>>
新书问答:Lost and Founder
查看>>
《Java 20年:道路与梦想》迷你书发布
查看>>
13岁女孩因发布JavaScript无限循环代码被捕
查看>>
DCloud王安:HTML5颠覆原生势不可挡
查看>>
Scrum丰田之道
查看>>