PUBLISH指令可用于發(fā)布一條消息,格式 PUBLISH channel message
返回值表示訂閱了該消息的數(shù)量。
2)訂閱
SUBSCRIBE指令用于接收一條消息,格式 SUBSCRIBE channel
可以看到使用SUBSCRIBE指令后進(jìn)入了訂閱模式,但沒(méi)有接收到publish發(fā)送的消息,這是因?yàn)橹挥性谙l(fā)出去前訂閱才會(huì)接收到。在這個(gè)模式下其他指令,只能看到回復(fù)?;貜?fù)分為三種類型:
1、如果為subscribe,第二個(gè)值表示訂閱的頻道,第三個(gè)值表示是第幾個(gè)訂閱的頻道?(理解成序號(hào)?)
2、如果為message(消息),第二個(gè)值為產(chǎn)生該消息的頻道,第三個(gè)值為消息
3、如果為unsubscribe,第二個(gè)值表示取消訂閱的頻道,第三個(gè)值表示當(dāng)前客戶端的訂閱數(shù)量。
可以使用指令UNSUBSCRIBE退訂,如果不加參數(shù),則會(huì)退訂所有由SUBSCRIBE指令訂閱的頻道。
Redis還支持基于通配符的消息訂閱,使用指令PSUBSCRIBE (pattern subscribe),例如:
再試試推送消息會(huì)得到以下結(jié)果:
可以看到publish指令返回的是2,而訂閱端這邊接收了兩次消息。這是因?yàn)镻SUBSCRIBE指令可以重復(fù)訂閱頻道。而使用PSUBSCRIBE指令訂閱的頻道也要使用指令PUNSUBSCRIBE指令退訂,該指令無(wú)法退訂SUBSCRIBE訂閱的頻道,同理UNSUBSCRIBE也不能退訂PSUBSCRIBE指令訂閱的頻道。同時(shí)PUNSUBSCRIBE指令通配符不會(huì)展開(kāi)。
例如:PUNSUBSCRIBE * 不會(huì)匹配到 channel.*, 所以要取消訂閱channel.*就要這樣寫(xiě)PUBSUBSCRIBE channel.*。
代碼示范如下:
package org.yamikaze.redis.messsage.subscribe;
import org.yamikaze.redis.messsage.queue.StringUtils;import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;
/**
* 消息發(fā)布方
* @author yamikaze */public class Publisher {
public static final String CHANNEL_KEY = "channel:message"; private Jedis jedis;
public Publisher() {
jedis = MyJedisFactory.getLocalJedis();
}
public void publishMessage(String message) { if(StringUtils.isBlank(message)) { return;
}
jedis.publish(CHANNEL_KEY, message);
}
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.publishMessage("Hello Redis!");
}
}
簡(jiǎn)單的發(fā)送一個(gè)消息。
消息訂閱方:
package org.yamikaze.redis.messsage.subscribe;
import org.yamikaze.redis.test.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub;
import java.util.concurrent.TimeUnit;
/**
* 消息訂閱方客戶端
* @author yamikaze */public class SubscribeClient {
private Jedis jedis; private static final String EXIT_COMMAND = "exit";
public SubscribeClient() {
jedis = MyJedisFactory.getLocalJedis();
}
public void subscribe(String ...channel) { if(channel == null || channel.length <= 0) { return;
} //消息處理,接收到消息時(shí)如何處理
JedisPubSub jps = new JedisPubSub() { /**
* JedisPubSub類是一個(gè)沒(méi)有抽象方法的抽象類,里面方法都是一些空實(shí)現(xiàn)
* 所以可以選擇需要的方法覆蓋,這兒使用的是SUBSCRIBE指令,所以覆蓋了onMessage
* 如果使用PSUBSCRIBE指令,則覆蓋onPMessage方法
* 當(dāng)然也可以選擇BinaryJedisPubSub,同樣是抽象類,但方法參數(shù)為byte[] */
@Override public void onMessage(String channel, String message) { if(Publisher.CHANNEL_KEY.equals(channel)) {
System.out.println("接收到消息: channel : " message); //接收到exit消息后退出
if(EXIT_COMMAND.equals(message)) {
System.exit(0);
}
}
}
/**
* 訂閱時(shí) */
@Override public void onSubscribe(String channel, int subscribedChannels) { if(Publisher.CHANNEL_KEY.equals(channel)) {
System.out.println("訂閱了頻道:" channel);
}
}
}; //可以訂閱多個(gè)頻道 當(dāng)前線程會(huì)阻塞在這兒 jedis.subscribe(jps, channel);
}
public static void main(String[] args) {
SubscribeClient client = new SubscribeClient();
client.subscribe(Publisher.CHANNEL_KEY); //并沒(méi)有 unsubscribe方法 //相應(yīng)的也沒(méi)有punsubscribe方法 }
}
先運(yùn)行client,再運(yùn)行Publisher進(jìn)行消息發(fā)送,輸出結(jié)果:
Redis的pub/sub也有其缺點(diǎn),那就是如果消費(fèi)者下線,生產(chǎn)者的消息會(huì)丟失。
延時(shí)隊(duì)列背景
在業(yè)務(wù)發(fā)展過(guò)程中,會(huì)出現(xiàn)一些需要延時(shí)處理的場(chǎng)景,比如:
a.訂單下單之后超過(guò)30分鐘用戶未支付,需要取消訂單
b.訂單一些評(píng)論,如果48h用戶未對(duì)商家評(píng)論,系統(tǒng)會(huì)自動(dòng)產(chǎn)生一條默認(rèn)評(píng)論
c.點(diǎn)我達(dá)訂單下單后,超過(guò)一定時(shí)間訂單未派出,需要超時(shí)取消訂單等。。。
處理這類需求,比較直接簡(jiǎn)單的方式就是定時(shí)任務(wù)輪訓(xùn)掃表。這種處理方式在數(shù)據(jù)量不大的場(chǎng)景下是完全沒(méi)問(wèn)題,但是當(dāng)數(shù)據(jù)量大的時(shí)候高頻的輪訓(xùn)數(shù)據(jù)庫(kù)就會(huì)比較的耗資源,導(dǎo)致數(shù)據(jù)庫(kù)的慢查或者查詢超時(shí)。所以在處理這類需求時(shí)候,采用了延時(shí)隊(duì)列來(lái)完成。
幾種延時(shí)隊(duì)列
延時(shí)隊(duì)列就是一種帶有延遲功能的消息隊(duì)列。下面會(huì)介紹幾種目前已有的延時(shí)隊(duì)列:
1.Java中java.util.concurrent.DelayQueue
優(yōu)點(diǎn):JDK自身實(shí)現(xiàn),使用方便,量小適用
缺點(diǎn):隊(duì)列消息處于jvm內(nèi)存,不支持分布式運(yùn)行和消息持久化
2.Rocketmq延時(shí)隊(duì)列
優(yōu)點(diǎn):消息持久化,分布式
缺點(diǎn):不支持任意時(shí)間精度,只支持特定level的延時(shí)消息
3.Rabbitmq延時(shí)隊(duì)列(TTL DLX實(shí)現(xiàn))
優(yōu)點(diǎn):消息持久化,分布式
缺點(diǎn):延時(shí)相同的消息必須扔在同一個(gè)隊(duì)列
Redis實(shí)現(xiàn)的延時(shí)消息隊(duì)列適合的項(xiàng)目特點(diǎn):Spring框架管理對(duì)象有消息需求,但不想維護(hù)mq中間件有使用redis對(duì)消息持久化并沒(méi)有很苛刻的要求Redis實(shí)現(xiàn)的延時(shí)消息隊(duì)列思路
Redis由于其自身的Zset數(shù)據(jù)結(jié)構(gòu),本質(zhì)就是Set結(jié)構(gòu)上加了個(gè)排序的功能,除了添加數(shù)據(jù)value之外,還提供另一屬性score,這一屬性在添加修改元素時(shí)候可以指定,每次指定后,Zset會(huì)自動(dòng)重新按新的值調(diào)整順序??梢岳斫鉃橛袃闪凶侄蔚臄?shù)據(jù)表,一列存value,一列存順序編號(hào)。操作中key理解為zset的名字,那么對(duì)延時(shí)隊(duì)列又有何用呢?
試想如果score代表的是想要執(zhí)行時(shí)間的時(shí)間戳,在某個(gè)時(shí)間將它插入Zset集合中,它變會(huì)按照時(shí)間戳大小進(jìn)行排序,也就是對(duì)執(zhí)行時(shí)間前后進(jìn)行排序,這樣的話,起一個(gè)死循環(huán)線程不斷地進(jìn)行取第一個(gè)key值,如果當(dāng)前時(shí)間戳大于等于該key值的socre就將它取出來(lái)進(jìn)行消費(fèi)刪除,就可以達(dá)到延時(shí)執(zhí)行的目的, 注意不需要遍歷整個(gè)Zset集合,以免造成性能浪費(fèi)。
Zset的排列效果如下圖:
java代碼實(shí)現(xiàn)如下:
package cn.chinotan.service.delayQueueRedis;import org.apache.commons.lang3.StringUtils;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.Tuple;import java.text.SimpleDateFormat;import java.util.Calendar;import java.util.Date;import java.util.Set;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;/**
* @program: test
* @description: redis實(shí)現(xiàn)延時(shí)隊(duì)列
* @author: xingcheng
* @create: 2018-08-19
**/public class AppTest { private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedisPool = new JedisPool(ADDR, PORT); private static CountDownLatch cdl = new CountDownLatch(10); public static Jedis getJedis() { return jedisPool.getResource();
} /**
* 生產(chǎn)者,生成5個(gè)訂單 */
public void productionDelayMessage() { for (int i = 0; i < 5; i ) {
Calendar instance = Calendar.getInstance(); // 3秒后執(zhí)行
instance.add(Calendar.SECOND, 3 i);
AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i 1));
System.out.println("生產(chǎn)訂單: " StringUtils.join("000000000", i 1) " 當(dāng)前時(shí)間:" new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
System.out.println((3 i) "秒后執(zhí)行");
}
} //消費(fèi)者,取訂單
public static void consumerDelayMessage() {
Jedis jedis = AppTest.getJedis(); while (true) {
Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0); if (order == null || order.isEmpty()) {
System.out.println("當(dāng)前沒(méi)有等待的任務(wù)"); try {
TimeUnit.MICROSECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
} continue;
}
Tuple tuple = (Tuple) order.toArray()[0]; double score = tuple.getScore();
Calendar instance = Calendar.getInstance(); long nowTime = instance.getTimeInMillis() / 1000; if (nowTime >= score) {
String element = tuple.getElement();
Long orderId = jedis.zrem("orderId", element); if (orderId > 0) {
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) ":redis消費(fèi)了一個(gè)任務(wù):消費(fèi)的訂單OrderId為" element);
}
}
}
} static class DelayMessage implements Runnable{
@Override public void run() { try {
cdl.await();
consumerDelayMessage();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
AppTest appTest = new AppTest();
appTest.productionDelayMessage(); for (int i = 0; i < 10; i ) { new Thread(new DelayMessage()).start();
cdl.countDown();
}
}
}
實(shí)現(xiàn)效果如下:
更多關(guān)于云服務(wù)器,域名注冊(cè),虛擬主機(jī)的問(wèn)題,請(qǐng)?jiān)L問(wèn)西部數(shù)碼官網(wǎng):m.ps-sw.cn