文章标签 ‘Redis’
2014年4月8日

Nutz:结合Jedis实现Redis消息订阅和缓存队列(支持自动重连)

仅作参考。
编辑 Nutz 的 Redis 配置文件(IoC 配置):

// Nutz IoC definition of the Redis settings bean; each field is injected into
// the matching property of cn.xuetang.common.redis.JedisConfig.
var ioc = {
        jedisConfig : {
            type : 'cn.xuetang.common.redis.JedisConfig',
            fields : {
                maxTotal : 200,          // passed to JedisPoolConfig.setMaxTotal
                maxIdle : 10,            // passed to JedisPoolConfig.setMaxIdle
                maxWaitMillis:10001,     // passed to JedisPoolConfig.setMaxWaitMillis
                testOnBorrow:true,       // passed to JedisPoolConfig.setTestOnBorrow
                redisUrl:'127.0.0.1',    // Redis host used when building the pools
                redisPort:6379,          // Redis port used when building the pools
                redisTimeout:1000        // pause (ms) between reconnection attempts
            }
        }
}
package cn.xuetang.common.redis;

/**
 * Plain settings holder for the Redis connection pools. The values are
 * populated through the setters (for example by an IoC container) and read
 * back through the getters.
 *
 * Created by Wizzer on 14-4-8.
 */
public class JedisConfig {

    /** Value handed to the pool configuration as "maxTotal". */
    private int maxTotal;

    public int getMaxTotal() {
        return this.maxTotal;
    }

    public void setMaxTotal(int maxTotal) {
        this.maxTotal = maxTotal;
    }

    /** Value handed to the pool configuration as "maxIdle". */
    private int maxIdle;

    public int getMaxIdle() {
        return this.maxIdle;
    }

    public void setMaxIdle(int maxIdle) {
        this.maxIdle = maxIdle;
    }

    /** Value handed to the pool configuration as "maxWaitMillis". */
    private int maxWaitMillis;

    public int getMaxWaitMillis() {
        return this.maxWaitMillis;
    }

    public void setMaxWaitMillis(int maxWaitMillis) {
        this.maxWaitMillis = maxWaitMillis;
    }

    /** Value handed to the pool configuration as "testOnBorrow". */
    private boolean testOnBorrow;

    public boolean isTestOnBorrow() {
        return this.testOnBorrow;
    }

    public void setTestOnBorrow(boolean testOnBorrow) {
        this.testOnBorrow = testOnBorrow;
    }

    /** Redis server host. */
    private String redisUrl;

    public String getRedisUrl() {
        return this.redisUrl;
    }

    public void setRedisUrl(String redisUrl) {
        this.redisUrl = redisUrl;
    }

    /** Redis server port. */
    private int redisPort;

    public int getRedisPort() {
        return this.redisPort;
    }

    public void setRedisPort(int redisPort) {
        this.redisPort = redisPort;
    }

    /** Interval between automatic reconnection attempts after Redis disconnects. */
    private int redisTimeout;

    public int getRedisTimeout() {
        return this.redisTimeout;
    }

    public void setRedisTimeout(int redisTimeout) {
        this.redisTimeout = redisTimeout;
    }
}

在 Nutz 入口类中加载上述配置文件(JsonLoader 的参数 "config" 即配置文件所在路径):

/**
 * Nutz MVC main module. It carries no code of its own; the annotations wire up
 * module scanning, the default views, the IoC loaders (JSON definitions under
 * "config" plus annotated beans under "cn.xuetang"), the setup hook and the
 * URL mapping implementation.
 */
@Modules(scanPackage = true)
@Ok("raw")
@Fail("http:500")
@IocBy(
	type = ComboIocProvider.class,
	args = {
		"*org.nutz.ioc.loader.json.JsonLoader", "config",
		"*org.nutz.ioc.loader.annotation.AnnotationIocLoader", "cn.xuetang"
	})
@SetupBy(StartSetup.class)
@UrlMappingBy(UrlMappingSet.class)
public class MainModule {
}

初始化配置参数,新建订阅消息处理线程:

    /** Redis settings bean; assigned by InitRedisConfig(). */
    public static JedisConfig REDIS_CONFIG;

    // Application-wide Redis connection pools (null until they are created).
    public static ShardedJedisPool SHARDEDJEDIS_POOL;
    public static JedisPool JEDIS_POOL;

    /**
     * Initializes Redis support: fetches the JedisConfig bean from the default
     * IoC container, then obtains both connection pools through JedisPoolUtil
     * and stores them in the global pool fields.
     */
    public static void InitRedisConfig() {
        REDIS_CONFIG = Mvcs.ctx.getDefaultIoc().get(JedisConfig.class);

        final JedisPoolUtil pools = new JedisPoolUtil();
        SHARDEDJEDIS_POOL = pools.getShardedJedisPool();
        JEDIS_POOL = pools.getJedisPool();
    }

    new Thread(Mvcs.getIoc().get(ImageTask.class)).start();

 
创建Jedis连接池工具类:

package cn.xuetang.common.redis;

import cn.xuetang.common.config.Globals;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedisPool;

import java.util.ArrayList;
import java.util.List;

/**
 * Factory for the Jedis objects used by the application. Every value is read
 * from {@code Globals.REDIS_CONFIG} at call time, so each factory method can be
 * called on a fresh instance in any order.
 *
 * Created by Wizzer on 14-4-8.
 */
public class MyJedis {

    /**
     * Builds a pool configuration from the global Redis settings.
     *
     * @return a new JedisPoolConfig carrying maxTotal, maxIdle, maxWaitMillis
     *         and testOnBorrow from {@code Globals.REDIS_CONFIG}
     */
    public JedisPoolConfig jedisPoolConfig(){
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        jedisPoolConfig.setMaxTotal(Globals.REDIS_CONFIG.getMaxTotal());
        jedisPoolConfig.setMaxIdle(Globals.REDIS_CONFIG.getMaxIdle());
        jedisPoolConfig.setMaxWaitMillis(Globals.REDIS_CONFIG.getMaxWaitMillis());
        jedisPoolConfig.setTestOnBorrow(Globals.REDIS_CONFIG.isTestOnBorrow());
        return jedisPoolConfig;
    }

    /**
     * @return shard descriptor for the configured Redis host and port
     */
    public JedisShardInfo jedisShardInfo(){
        return new JedisShardInfo(configuredHost(), configuredPort());
    }

    /**
     * Creates a plain (non-sharded) pool for the configured host and port.
     * Previously the host/port were only initialized as a side effect of
     * {@link #shardedJedisPool()}, so calling this method on a new instance
     * produced a pool pointing at host {@code null}, port 0.
     *
     * @return a new JedisPool
     */
    public JedisPool jedisPool(){
        return new JedisPool(jedisPoolConfig(), configuredHost(), configuredPort());
    }

    /**
     * Creates a sharded pool containing a single shard for the configured
     * host and port.
     *
     * @return a new ShardedJedisPool
     */
    public ShardedJedisPool shardedJedisPool(){
        List<JedisShardInfo> jedisList = new ArrayList<JedisShardInfo>();
        jedisList.add(jedisShardInfo());
        return new ShardedJedisPool(jedisPoolConfig(), jedisList);
    }

    // Redis host from the global configuration.
    private String configuredHost() {
        return Globals.REDIS_CONFIG.getRedisUrl();
    }

    // Redis port from the global configuration.
    private int configuredPort() {
        return Globals.REDIS_CONFIG.getRedisPort();
    }

}

实现自己的监听类:

package cn.xuetang.common.redis;

import org.nutz.log.Log;
import org.nutz.log.Logs;
import redis.clients.jedis.JedisPubSub;

/**
 * 订阅监听类
 * Created by Wizzer on 14-4-8.
 */
public class MyJedisListenter extends JedisPubSub {
    private final static Log log = Logs.get();
    // 取得订阅的消息后的处理
    public void onMessage(String channel, String message) {
        log.info(channel + "=" + message);
    }

    // 初始化订阅时候的处理
    public void onSubscribe(String channel, int subscribedChannels) {
        log.info(channel + "=" + subscribedChannels);
    }

    // 取消订阅时候的处理
    public void onUnsubscribe(String channel, int subscribedChannels) {
        log.info(channel + "=" + subscribedChannels);
    }

    // 初始化按表达式的方式订阅时候的处理
    public void onPSubscribe(String pattern, int subscribedChannels) {
        log.info(pattern + "=" + subscribedChannels);
    }

    // 取消按表达式的方式订阅时候的处理
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        log.info(pattern + "=" + subscribedChannels);
    }

    // 取得按表达式的方式订阅的消息后的处理
    public void onPMessage(String pattern, String channel, String message) {
        log.info(pattern + "=" + channel + "=" + message);
    }
}

 
获取单例Jedis连接池:

package cn.xuetang.common.redis;

import cn.xuetang.common.config.Globals;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ShardedJedisPool;

import java.util.Date;

/**
 * Lazily creates the application-wide Jedis pools stored in
 * {@code Globals.SHARDEDJEDIS_POOL} and {@code Globals.JEDIS_POOL}.
 *
 * Callers create a new JedisPoolUtil each time, so the guard must not be the
 * instance monitor: an instance-level {@code synchronized} would let two
 * instances create two pools concurrently. Both methods therefore lock on the
 * class object, which is shared by every instance.
 *
 * Created by Wizzer on 14-4-8.
 */
public class JedisPoolUtil {
    private final static Log log = Logs.get();

    /**
     * Returns the shared sharded pool, creating it on first use.
     *
     * @return the pool stored in {@code Globals.SHARDEDJEDIS_POOL}
     */
    public ShardedJedisPool getShardedJedisPool() {
        synchronized (JedisPoolUtil.class) {
            if (Globals.SHARDEDJEDIS_POOL == null) {
                MyJedis myJedis=new MyJedis();
                Globals.SHARDEDJEDIS_POOL = myJedis.shardedJedisPool();
            }
            return Globals.SHARDEDJEDIS_POOL;
        }
    }

    /**
     * Returns the shared plain pool, creating it on first use.
     *
     * @return the pool stored in {@code Globals.JEDIS_POOL}
     */
    public JedisPool getJedisPool() {
        synchronized (JedisPoolUtil.class) {
            if (Globals.JEDIS_POOL== null) {
                MyJedis myJedis=new MyJedis();
                Globals.JEDIS_POOL = myJedis.jedisPool();
            }
            return Globals.JEDIS_POOL;
        }
    }
}

数据入列同时发布订阅消息:

// Push the entry onto the "image" list, then notify subscribers on "newimage".
// Each borrowed connection is handed back to its pool in a finally block;
// otherwise the pool runs dry after maxTotal calls.
// NOTE(review): returnResource is the Jedis 2.x pool API; on newer Jedis
// versions call close() on the connection instead - confirm against the
// version in use.
ShardedJedis shardedJedis = Globals.SHARDEDJEDIS_POOL.getResource();
try {
    txt.put("appid", appInfo.getId());
    shardedJedis.lpush("image", Json.toJson(txt));
} finally {
    Globals.SHARDEDJEDIS_POOL.returnResource(shardedJedis);
}
// The pool field is declared as JEDIS_POOL (not JEDISPOOL).
Jedis jedis = Globals.JEDIS_POOL.getResource();
try {
    jedis.publish("newimage", "true");
} finally {
    Globals.JEDIS_POOL.returnResource(jedis);
}

 

处理订阅消息的类:

 

package cn.xuetang.common.task;

import cn.xuetang.common.action.BaseAction;
import cn.xuetang.common.config.Globals;
import cn.xuetang.common.redis.MyJedisListenter;
import cn.xuetang.common.util.DateUtil;
import cn.xuetang.modules.baby.bean.Baby_image;
import cn.xuetang.modules.baby.bean.Baby_info;
import cn.xuetang.modules.user.bean.User_conn_wx;
import org.nutz.dao.Cnd;
import org.nutz.dao.Dao;
import org.nutz.ioc.loader.annotation.Inject;
import org.nutz.ioc.loader.annotation.IocBean;
import org.nutz.json.Json;
import org.nutz.lang.Strings;
import org.nutz.log.Log;
import org.nutz.log.Logs;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ShardedJedis;

import java.util.*;

/**
 * Created by Wizzer on 14-3-31.
 */
@IocBean
public class ImageTask extends BaseAction implements Runnable {
    @Inject
    protected Dao dao;
    private final static Log log = Logs.get();

    public void run() {
        try {
            log.info("ImageTask start!");
            final JedisPoolUtil jedisPoolUtil = new JedisPoolUtil();
            boolean isEnable = false;
            while (!isEnable) {
                try {
                    jedisPoolUtil.getShardedJedisPool().getResource();
                    isEnable = true;
                } catch (Exception e) {
                    log.info("The redis connection is not successful,wait " + Globals.REDIS_CONFIG.getRedisTimeout() + "ms try again.");

                }
                try {
                    wait(Globals.REDIS_CONFIG.getRedisTimeout());
                } catch (Exception e) {

                }
            }
            final ShardedJedis shardedJedis = jedisPoolUtil.getShardedJedisPool().getResource();
            final Jedis jedis = jedisPoolUtil.getJedisPool().getResource();
            MyJedisListenter listenter = new MyJedisListenter() {
                @Override
                public void onMessage(String channel, String message) {
                    //业务代码具体实现
                    while (shardedJedis.exists("image")) {
                        String data = shardedJedis.rpop("image");
                    }
                }
            };
            jedis.subscribe(listenter, "newimage");

        } catch (Exception e) {
            log.error(e);
        }

    }

}