a simple example rabbitMQ

布满荆棘的人生 2022-09-20 15:09 265阅读 0赞

此rabbitMQ 版本3.2.2:

  1. package com.miracle.queue;
  2. import java.io.IOException;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.Connection;
  5. import com.rabbitmq.client.ConnectionFactory;
  6. /**
  7. *
  8. * @author lianan
  9. * note:init facetory connection
  10. */
  11. public abstract class EndPoint {
  12. protected Channel channel;
  13. protected Connection connection;
  14. protected String endPointName;
  15. public EndPoint(String endPointName){
  16. try {
  17. this.endPointName = endPointName;
  18. ConnectionFactory factory = new ConnectionFactory();
  19. factory.setHost("127.0.0.1");
  20. factory.setUsername("guest");
  21. factory.setPassword("guest");
  22. factory.setVirtualHost("/");
  23. connection = factory.newConnection();
  24. channel = connection.createChannel();
  25. /**
  26. * queue - the name of the queuedurable
  27. * true - if we are declaring a durable queue (the queue will survive a server restart)exclusive
  28. * true - if we are declaring an exclusive queue (restricted to this connection)autoDelete
  29. * true - if we are declaring an autodelete queue (server will delete it when no longer in use)arguments
  30. * other - properties (construction arguments) for the queue
  31. */
  32. channel.queueDeclare(endPointName, true, false, false, null);
  33. } catch (Exception e) {
  34. System.out.println("出错了");
  35. }
  36. }
  37. public void close()throws IOException{
  38. this.channel.close();
  39. this.connection.close();
  40. }
  41. }
  42. package com.miracle.queue;
  43. import java.io.IOException;
  44. import java.io.Serializable;
  45. import org.apache.commons.lang.SerializationUtils;
  46. /**
  47. *
  48. * @author lianan
  49. * note:生产者,发送消息
  50. */
  51. public class Producer extends EndPoint{
  52. public Producer(String endPointName){
  53. super(endPointName);
  54. // TODO Auto-generated constructor stub
  55. }
  56. public void sendMessage(Serializable serializable)throws IOException{
  57. channel.basicPublish("", endPointName, null, SerializationUtils.serialize(serializable));
  58. }
  59. }
  60. package com.miracle.queue;
  61. import java.io.IOException;
  62. import java.util.HashMap;
  63. import java.util.Map;
  64. import org.apache.commons.lang.SerializationUtils;
  65. import com.rabbitmq.client.AMQP.BasicProperties;
  66. import com.rabbitmq.client.Consumer;
  67. import com.rabbitmq.client.Envelope;
  68. import com.rabbitmq.client.ShutdownSignalException;
  69. /**
  70. *
  71. * @author lianan
  72. * note:消费者,接收消息
  73. */
  74. public class QueueConsumer extends EndPoint implements Consumer{
  75. public QueueConsumer(String endPointName)throws IOException{
  76. super(endPointName);
  77. }
  78. public void consumer() throws IOException{
  79. channel.basicConsume(endPointName, true, this);
  80. }
  81. @Override
  82. public void handleCancel(String arg0) throws IOException {
  83. }
  84. @Override
  85. public void handleCancelOk(String arg0) {
  86. System.out.println("consumer"+arg0+"registered");
  87. }
  88. @Override
  89. public void handleConsumeOk(String arg0) {
  90. // TODO Auto-generated method stub
  91. }
  92. @Override
  93. public void handleDelivery(String arg0, Envelope arg1,
  94. BasicProperties arg2, byte[] arg3) throws IOException {
  95. Map map=(HashMap)SerializationUtils.deserialize(arg3);
  96. System.out.println("Message Number "+ map.get("message number") + " received.");
  97. }
  98. @Override
  99. public void handleRecoverOk(String arg0) {
  100. // TODO Auto-generated method stub
  101. }
  102. @Override
  103. public void handleShutdownSignal(String arg0, ShutdownSignalException arg1) {
  104. // TODO Auto-generated method stub
  105. }
  106. }
  107. package com.miracle.queue;
  108. import java.io.IOException;
  109. import java.io.Serializable;
  110. import java.util.Date;
  111. import java.util.HashMap;
  112. import java.util.Random;
  113. /**
  114. *
  115. * @author lianan
  116. * note:测试类
  117. */
  118. public class Test {
  119. public static void main(String arg0[]) throws IOException{
  120. //read queue information,and can thread
  121. long t1 = new Date().getTime();
  122. QueueConsumer consumer = new QueueConsumer("river");
  123. consumer.consumer();
  124. //根据queue名称,把info放入队列
  125. /*Producer producer = new Producer("river");
  126. for (int i = 0; i < 100000; i++) {
  127. HashMap message = new HashMap();
  128. message.put("message number", i);
  129. producer.sendMessage(message);
  130. System.out.println("Message Number "+ i +" sent.");
  131. }
  132. producer.close();*/
  133. long t2 = new Date().getTime();
  134. System.out.println(t2-t1);
  135. }
  136. }

发表评论

表情:
评论列表 (有 0 条评论,265人围观)

还没有评论,来说两句吧...

相关阅读