`

dubbo

 
阅读更多

package com.winbons.registry.redis;

 

import com.alibaba.dubbo.common.URL;

import com.alibaba.dubbo.registry.Registry;

import com.alibaba.dubbo.registry.RegistryFactory;

 

public class RedisRegistryFactory implements RegistryFactory {

 

@Override

public Registry getRegistry(URL url) {

return new RedisRegistry(url);

}

}

-----------------------------------------------------------

package com.winbons.registry.redis;

 

import java.io.File;

import java.io.FileWriter;

import java.text.SimpleDateFormat;

import java.util.Arrays;

import java.util.Date;

import java.util.Map;

import java.util.Set;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentMap;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.ScheduledFuture;

import java.util.concurrent.TimeUnit;

 

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

 

import com.alibaba.dubbo.common.Constants;

import com.alibaba.dubbo.common.extension.Activate;

import com.alibaba.dubbo.common.utils.ConcurrentHashSet;

import com.alibaba.dubbo.common.utils.ConfigUtils;

import com.alibaba.dubbo.common.utils.NamedThreadFactory;

import com.alibaba.dubbo.rpc.Filter;

import com.alibaba.dubbo.rpc.Invocation;

import com.alibaba.dubbo.rpc.Invoker;

import com.alibaba.dubbo.rpc.Result;

import com.alibaba.dubbo.rpc.RpcContext;

import com.alibaba.dubbo.rpc.RpcException;

 

@Activate(group = { Constants.CONSUMER, Constants.PROVIDER })

public class LogFilter implements Filter {

 

private static final Logger logger = LoggerFactory.getLogger(LogFilter.class);

 

private static final String FILE_DATE_FORMAT = "yyyyMMdd";

 

private static final String MESSAGE_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";

 

private static final int LOG_MAX_BUFFER = 5000;

 

private static final long LOG_OUTPUT_INTERVAL = 5000;

 

private final ConcurrentMap<String, Set<String>> logQueue = new ConcurrentHashMap<String, Set<String>>();

 

private final ScheduledExecutorService logScheduled = Executors.newScheduledThreadPool(2, new NamedThreadFactory(

"Dubbo-Access-Log", true));

 

private volatile ScheduledFuture<?> logFuture = null;

 

private class LogTask implements Runnable {

public void run() {

try {

if (logQueue != null && logQueue.size() > 0) {

for (Map.Entry<String, Set<String>> entry : logQueue.entrySet()) {

try {

String accesslog = entry.getKey();

Set<String> logSet = entry.getValue();

File file = new File(accesslog);

File dir = file.getParentFile();

if (null != dir && !dir.exists()) {

dir.mkdirs();

}

if (logger.isDebugEnabled()) {

logger.debug("Append log to " + accesslog);

}

if (file.exists()) {

String now = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date());

String last = new SimpleDateFormat(FILE_DATE_FORMAT).format(new Date(file

.lastModified()));

if (!now.equals(last)) {

File archive = new File(file.getAbsolutePath() + "." + last);

file.renameTo(archive);

}

}

FileWriter writer = new FileWriter(file, true);

try {

for (String msg : logSet) {

writer.write(msg);

writer.write("\r\n");

}

writer.flush();

} finally {

writer.close();

}

logSet.clear();

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

}

} catch (Exception e) {

logger.error(e.getMessage(), e);

}

}

}

 

private void init() {

if (logFuture == null) {

synchronized (logScheduled) {

if (logFuture == null) {

logFuture = logScheduled.scheduleWithFixedDelay(new LogTask(), LOG_OUTPUT_INTERVAL,

LOG_OUTPUT_INTERVAL, TimeUnit.MILLISECONDS);

}

}

}

}

 

private void log(String accesslog, String logmessage) {

init();

Set<String> logSet = logQueue.get(accesslog);

if (logSet == null) {

logQueue.putIfAbsent(accesslog, new ConcurrentHashSet<String>());

logSet = logQueue.get(accesslog);

}

if (logSet.size() < LOG_MAX_BUFFER) {

logSet.add(logmessage);

}

}

 

public Result invoke(Invoker<?> invoker, Invocation inv) throws RpcException {

try {

String accesslog = invoker.getUrl().getParameter(Constants.ACCESS_LOG_KEY);

if (ConfigUtils.isNotEmpty(accesslog)) {

RpcContext context = RpcContext.getContext();

String serviceName = invoker.getInterface().getName();

String version = invoker.getUrl().getParameter(Constants.VERSION_KEY);

String group = invoker.getUrl().getParameter(Constants.GROUP_KEY);

StringBuilder sn = new StringBuilder();

sn.append("[").append(new SimpleDateFormat(MESSAGE_DATE_FORMAT).format(new Date())).append("] ")

.append(context.getRemoteHost()).append(":").append(context.getRemotePort()).append(" -> ")

.append(context.getLocalHost()).append(":").append(context.getLocalPort()).append(" - ");

if (null != group && group.length() > 0) {

sn.append(group).append("/");

}

sn.append(serviceName);

if (null != version && version.length() > 0) {

sn.append(":").append(version);

}

sn.append(" ");

sn.append(inv.getMethodName());

sn.append("(");

Class<?>[] types = inv.getParameterTypes();

if (types != null && types.length > 0) {

boolean first = true;

for (Class<?> type : types) {

if (first) {

first = false;

} else {

sn.append(",");

}

sn.append(type.getName());

}

}

sn.append(") ");

Object[] args = inv.getArguments();

if (args != null && args.length > 0) {

sn.append(Arrays.toString(args));

}

String msg = sn.toString();

log(accesslog, msg);

}

} catch (Throwable t) {

logger.warn("Exception in AcessLogFilter of service(" + invoker + " -> " + inv + ")", t);

}

return invoker.invoke(inv);

}

 

}

----------------------------------------------------------------------------------------------------

 * Copyright 1999-2012 Alibaba Group.

package com.winbons.registry.redis;

 

import java.util.ArrayList;

import java.util.Arrays;

import java.util.Collection;

import java.util.Date;

import java.util.HashMap;

import java.util.HashSet;

import java.util.List;

import java.util.Map;

import java.util.Random;

import java.util.Set;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.ConcurrentMap;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledExecutorService;

import java.util.concurrent.ScheduledFuture;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;

 

import redis.clients.jedis.Jedis;

import redis.clients.jedis.JedisPool;

import redis.clients.jedis.JedisPoolConfig;

import redis.clients.jedis.JedisPubSub;

 

import com.alibaba.dubbo.common.Constants;

import com.alibaba.dubbo.common.URL;

import com.alibaba.dubbo.common.logger.Logger;

import com.alibaba.dubbo.common.logger.LoggerFactory;

import com.alibaba.dubbo.common.utils.NamedThreadFactory;

import com.alibaba.dubbo.common.utils.UrlUtils;

import com.alibaba.dubbo.registry.NotifyListener;

import com.alibaba.dubbo.registry.support.FailbackRegistry;

import com.alibaba.dubbo.rpc.RpcException;

 

/**

 * RedisRegistry

 * 

 * @author william.liangf

 */

public class RedisRegistry extends FailbackRegistry {

 

    private static final Logger logger = LoggerFactory.getLogger(RedisRegistry.class);

 

    private static final int DEFAULT_REDIS_PORT = 6379;

 

    private final static String DEFAULT_ROOT = "dubbo";

 

    private final ScheduledExecutorService expireExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryExpireTimer", true));

 

    private final ScheduledFuture<?> expireFuture;

    

    private final String root;

 

    private final Map<String, JedisPool> jedisPools = new ConcurrentHashMap<String, JedisPool>();

 

    private final ConcurrentMap<String, Notifier> notifiers = new ConcurrentHashMap<String, Notifier>();

    

    private final int reconnectPeriod;

 

    private final int expirePeriod;

    

    private volatile boolean admin = false;

    

    private boolean replicate;

 

    public RedisRegistry(URL url) {

        super(url);

        if (url.isAnyHost()) {

    throw new IllegalStateException("registry address == null");

    }

        JedisPoolConfig config = new JedisPoolConfig();

        config.setTestOnBorrow(url.getParameter("test.on.borrow", true));

        config.setTestOnReturn(url.getParameter("test.on.return", false));

        config.setTestWhileIdle(url.getParameter("test.while.idle", false));

        if (url.getParameter("max.idle", 0) > 0)

            config.setMaxIdle(url.getParameter("max.idle", 0));

        if (url.getParameter("min.idle", 0) > 0)

            config.setMinIdle(url.getParameter("min.idle", 0));

        if (url.getParameter("max.active", 0) > 0)

//            config.set = url.getParameter("max.active", 0);

        if (url.getParameter("max.wait", url.getParameter("timeout", 0)) > 0)

            config.setMaxWaitMillis(url.getParameter("max.wait", url.getParameter("timeout", 0)));

        if (url.getParameter("num.tests.per.eviction.run", 0) > 0)

            config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));

        if (url.getParameter("time.between.eviction.runs.millis", 0) > 0)

            config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));

        if (url.getParameter("min.evictable.idle.time.millis", 0) > 0)

            config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));

        String cluster = url.getParameter("cluster", "failover");

        if (! "failover".equals(cluster) && ! "replicate".equals(cluster)) {

        throw new IllegalArgumentException("Unsupported redis cluster: " + cluster + ". The redis cluster only supported failover or replicate.");

        }

        replicate = "replicate".equals(cluster);

        

        List<String> addresses = new ArrayList<String>();

        addresses.add(url.getAddress());

        String[] backups = url.getParameter(Constants.BACKUP_KEY, new String[0]);

        if (backups != null && backups.length > 0) {

            addresses.addAll(Arrays.asList(backups));

        }

        for (String address : addresses) {

            int i = address.indexOf(':');

            String host;

            int port;

            if (i > 0) {

                host = address.substring(0, i);

                port = Integer.parseInt(address.substring(i + 1));

            } else {

                host = address;

                port = DEFAULT_REDIS_PORT;

            }

            this.jedisPools.put(address, new JedisPool(config, host, port,  url.getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT)));

        }

        

        this.reconnectPeriod = url.getParameter(Constants.REGISTRY_RECONNECT_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RECONNECT_PERIOD);

        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);

        if (! group.startsWith(Constants.PATH_SEPARATOR)) {

            group = Constants.PATH_SEPARATOR + group;

        }

        if (! group.endsWith(Constants.PATH_SEPARATOR)) {

            group = group + Constants.PATH_SEPARATOR;

        }

        this.root = group;

        

        this.expirePeriod = url.getParameter(Constants.SESSION_TIMEOUT_KEY, Constants.DEFAULT_SESSION_TIMEOUT);

        this.expireFuture = expireExecutor.scheduleWithFixedDelay(new Runnable() {

        @Override

            public void run() {

                try {

                    deferExpired(); // 延长过期时间

                } catch (Throwable t) { // 防御性容

                    logger.error("Unexpected exception occur at defer expire time, cause: " + t.getMessage(), t);

                }

            }

        }, expirePeriod / 2, expirePeriod / 2, TimeUnit.MILLISECONDS);

    }

    

    private void deferExpired() {

        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {

            JedisPool jedisPool = entry.getValue();

            try {

                Jedis jedis = jedisPool.getResource();

                try {

                    for (URL url : new HashSet<URL>(getRegistered())) {

                        if (url.getParameter(Constants.DYNAMIC_KEY, true)) {

                            String key = toCategoryPath(url);

                            if (jedis.hset(key, url.toFullString(), String.valueOf(System.currentTimeMillis() + expirePeriod)) == 1) {

                                jedis.publish(key, Constants.REGISTER);

                            }

                        }

                    }

                    if (admin) {

                        clean(jedis);

                    }

                    if (! replicate) {

                    break;//  如果服务器端已同步数据,只需写入单台机器

                    }

                } finally {

                    jedisPool.returnResource(jedis);

                }

            } catch (Throwable t) {

                logger.warn("Failed to write provider heartbeat to redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);

            }

        }

    }

    

    // 监控中心负责删除过期脏数�?

    private void clean(Jedis jedis) {

        Set<String> keys = jedis.keys(root + Constants.ANY_VALUE);

        if (keys != null && keys.size() > 0) {

            for (String key : keys) {

                Map<String, String> values = jedis.hgetAll(key);

                if (values != null && values.size() > 0) {

                    boolean delete = false;

                    long now = System.currentTimeMillis();

                    for (Map.Entry<String, String> entry : values.entrySet()) {

                        URL url = URL.valueOf(entry.getKey());

                        if (url.getParameter(Constants.DYNAMIC_KEY, true)) {

                            long expire = Long.parseLong(entry.getValue());

                            if (expire < now) {

                                jedis.hdel(key, entry.getKey());

                                delete = true;

                                if (logger.isWarnEnabled()) {

                                    logger.warn("Delete expired key: " + key + " -> value: " + entry.getKey() + ", expire: " + new Date(expire) + ", now: " + new Date(now));

                                }

                            }

                            }

                    }

                    if (delete) {

                        jedis.publish(key, Constants.UNREGISTER);

                    }

                }

            }

        }

    }

 

    public boolean isAvailable() {

        for (JedisPool jedisPool : jedisPools.values()) {

            try {

                Jedis jedis = jedisPool.getResource();

                try {

                if (jedis.isConnected()) {

                        return true; // 至少�?��台机器可�?

                    }

                } finally {

                    jedisPool.returnResource(jedis);

                }

            } catch (Throwable t) {

            }

        }

        return false;

    }

 

    @Override

    public void destroy() {

        super.destroy();

        try {

            expireFuture.cancel(true);

        } catch (Throwable t) {

            logger.warn(t.getMessage(), t);

        }

        try {

            for (Notifier notifier : notifiers.values()) {

                notifier.shutdown();

            }

        } catch (Throwable t) {

            logger.warn(t.getMessage(), t);

        }

        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {

            JedisPool jedisPool = entry.getValue();

            try {

                jedisPool.destroy();

            } catch (Throwable t) {

                logger.warn("Failed to destroy the redis registry client. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);

            }

        }

    }

 

    @Override

    public void doRegister(URL url) {

        String key = toCategoryPath(url);

        String value = url.toFullString();

        String expire = String.valueOf(System.currentTimeMillis() + expirePeriod);

        boolean success = false;

        RpcException exception = null;

        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {

            JedisPool jedisPool = entry.getValue();

            try {

                Jedis jedis = jedisPool.getResource();

                try {

                    jedis.hset(key, value, expire);

                    jedis.publish(key, Constants.REGISTER);

                    success = true;

                    if (! replicate) {

                    break; //  如果服务器端已同步数据,只需写入单台机器

                    }

                } finally {

                    jedisPool.returnResource(jedis);

                }

            } catch (Throwable t) {

                exception = new RpcException("Failed to register service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);

            }

        }

        if (exception != null) {

            if (success) {

                logger.warn(exception.getMessage(), exception);

            } else {

                throw exception;

            }

        }

    }

 

    @Override

    public void doUnregister(URL url) {

        String key = toCategoryPath(url);

        String value = url.toFullString();

        RpcException exception = null;

        boolean success = false;

        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {

            JedisPool jedisPool = entry.getValue();

            try {

                Jedis jedis = jedisPool.getResource();

                try {

                    jedis.hdel(key, value);

                    jedis.publish(key, Constants.UNREGISTER);

                    success = true;

                    if (! replicate) {

                    break; //  如果服务器端已同步数据,只需写入单台机器

                    }

                } finally {

                    jedisPool.returnResource(jedis);

                }

            } catch (Throwable t) {

                exception = new RpcException("Failed to unregister service to redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);

            }

        }

        if (exception != null) {

            if (success) {

                logger.warn(exception.getMessage(), exception);

            } else {

                throw exception;

            }

        }

    }

    

    @Override

    public void doSubscribe(final URL url, final NotifyListener listener) {

        String service = toServicePath(url);

        Notifier notifier = notifiers.get(service);

        if (notifier == null) {

            Notifier newNotifier = new Notifier(service);

            notifiers.putIfAbsent(service, newNotifier);

            notifier = notifiers.get(service);

            if (notifier == newNotifier) {

                notifier.start();

            }

        }

        boolean success = false;

        RpcException exception = null;

        for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {

            JedisPool jedisPool = entry.getValue();

            try {

                Jedis jedis = jedisPool.getResource();

                try {

                    if (service.endsWith(Constants.ANY_VALUE)) {

                        admin = true;

                        Set<String> keys = jedis.keys(service);

                        if (keys != null && keys.size() > 0) {

                            Map<String, Set<String>> serviceKeys = new HashMap<String, Set<String>>();

                            for (String key : keys) {

                                String serviceKey = toServicePath(key);

                                Set<String> sk = serviceKeys.get(serviceKey);

                                if (sk == null) {

                                    sk = new HashSet<String>();

                                    serviceKeys.put(serviceKey, sk);

                                }

                                sk.add(key);

                            }

                            for (Set<String> sk : serviceKeys.values()) {

                                doNotify(jedis, sk, url, Arrays.asList(listener));

                            }

                        }

                    } else {

                        doNotify(jedis, jedis.keys(service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE), url, Arrays.asList(listener));

                    }

                    success = true;

                    break; 

                } finally {

                    jedisPool.returnResource(jedis);

                }

            } catch(Throwable t) {

                exception = new RpcException("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", service: " + url + ", cause: " + t.getMessage(), t);

            }

        }

        if (exception != null) {

            if (success) {

                logger.warn(exception.getMessage(), exception);

            } else {

                throw exception;

            }

        }

    }

 

    @Override

    public void doUnsubscribe(URL url, NotifyListener listener) {

    }

 

    private void doNotify(Jedis jedis, String key) {

        for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(getSubscribed()).entrySet()) {

            doNotify(jedis, Arrays.asList(key), entry.getKey(), new HashSet<NotifyListener>(entry.getValue()));

        }

    }

 

    private void doNotify(Jedis jedis, Collection<String> keys, URL url, Collection<NotifyListener> listeners) {

        if (keys == null || keys.size() == 0

                || listeners == null || listeners.size() == 0) {

            return;

        }

        long now = System.currentTimeMillis();

        List<URL> result = new ArrayList<URL>();

        List<String> categories = Arrays.asList(url.getParameter(Constants.CATEGORY_KEY, new String[0]));

        String consumerService = url.getServiceInterface();

        for (String key : keys) {

            if (! Constants.ANY_VALUE.equals(consumerService)) {

                String prvoiderService = toServiceName(key);

                if (! prvoiderService.equals(consumerService)) {

                    continue;

                }

            }

            String category = toCategoryName(key);

            if (! categories.contains(Constants.ANY_VALUE) && ! categories.contains(category)) {

                continue;

            }

            List<URL> urls = new ArrayList<URL>();

            Map<String, String> values = jedis.hgetAll(key);

            if (values != null && values.size() > 0) {

                for (Map.Entry<String, String> entry : values.entrySet()) {

                    URL u = URL.valueOf(entry.getKey());

                    if (! u.getParameter(Constants.DYNAMIC_KEY, true)

                            || Long.parseLong(entry.getValue()) >= now) {

                        if (UrlUtils.isMatch(url, u)) {

                            urls.add(u);

                        }

                    }

                }

            }

            if (urls.isEmpty()) {

                urls.add(url.setProtocol(Constants.EMPTY_PROTOCOL)

                        .setAddress(Constants.ANYHOST_VALUE)

                        .setPath(toServiceName(key))

                        .addParameter(Constants.CATEGORY_KEY, category));

            }

            result.addAll(urls);

            if (logger.isInfoEnabled()) {

                logger.info("redis notify: " + key + " = " + urls);

            }

        }

        if (result == null || result.size() == 0) {

            return;

        }

        for (NotifyListener listener : listeners) {

            notify(url, listener, result);

        }

    }

 

    private String toServiceName(String categoryPath) {

        String servicePath = toServicePath(categoryPath);

        return servicePath.startsWith(root) ? servicePath.substring(root.length()) : servicePath;

    }

 

    private String toCategoryName(String categoryPath) {

        int i = categoryPath.lastIndexOf(Constants.PATH_SEPARATOR);

        return i > 0 ? categoryPath.substring(i + 1) : categoryPath;

    }

 

    private String toServicePath(String categoryPath) {

        int i;

        if (categoryPath.startsWith(root)) {

            i = categoryPath.indexOf(Constants.PATH_SEPARATOR, root.length());

        } else {

            i = categoryPath.indexOf(Constants.PATH_SEPARATOR);

        }

        return i > 0 ? categoryPath.substring(0, i) : categoryPath;

    }

 

    private String toServicePath(URL url) {

        return root + url.getServiceInterface();

    }

 

    private String toCategoryPath(URL url) {

        return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);

    }

 

    private class NotifySub extends JedisPubSub {

        

        private final JedisPool jedisPool;

 

        public NotifySub(JedisPool jedisPool) {

            this.jedisPool = jedisPool;

        }

 

        @Override

        public void onMessage(String key, String msg) {

            if (logger.isInfoEnabled()) {

                logger.info("redis event: " + key + " = " + msg);

            }

            if (msg.equals(Constants.REGISTER) 

                    || msg.equals(Constants.UNREGISTER)) {

                try {

                    Jedis jedis = jedisPool.getResource();

                    try {

                        doNotify(jedis, key);

                    } finally {

                        jedisPool.returnResource(jedis);

                    }

                } catch (Throwable t) { // TODO 通知失败没有恢复机制保障

                    logger.error(t.getMessage(), t);

                }

            }

        }

 

        @Override

        public void onPMessage(String pattern, String key, String msg) {

            onMessage(key, msg);

        }

 

        @Override

        public void onSubscribe(String key, int num) {

        }

 

        @Override

        public void onPSubscribe(String pattern, int num) {

        }

 

        @Override

        public void onUnsubscribe(String key, int num) {

        }

 

        @Override

        public void onPUnsubscribe(String pattern, int num) {

        }

 

    }

 

    private class Notifier extends Thread {

 

        private final String service;

 

        private volatile Jedis jedis;

 

        private volatile boolean first = true;

        

        private volatile boolean running = true;

        

        private final AtomicInteger connectSkip = new AtomicInteger();

 

        private final AtomicInteger connectSkiped = new AtomicInteger();

 

        private final Random random = new Random();

        

        private volatile int connectRandom;

 

        private void resetSkip() {

            connectSkip.set(0);

            connectSkiped.set(0);

            connectRandom = 0;

        }

        

        private boolean isSkip() {

            int skip = connectSkip.get(); // 跳过次数增长

            if (skip >= 10) { // 如果跳过次数增长超过10,取随机�?

                if (connectRandom == 0) {

                    connectRandom = random.nextInt(10);

                }

                skip = 10 + connectRandom;

            }

            if (connectSkiped.getAndIncrement() < skip) { // �?��跳过次数

                return true;

            }

            connectSkip.incrementAndGet();

            connectSkiped.set(0);

            connectRandom = 0;

            return false;

        }

        

        public Notifier(String service) {

            super.setDaemon(true);

            super.setName("DubboRedisSubscribe");

            this.service = service;

        }

        

        @Override

        public void run() {

            while (running) {

                try {

                    if (! isSkip()) {

                        try {

                            for (Map.Entry<String, JedisPool> entry : jedisPools.entrySet()) {

                                JedisPool jedisPool = entry.getValue();

                                try {

                                    jedis = jedisPool.getResource();

                                    try {

                                        if (service.endsWith(Constants.ANY_VALUE)) {

                                            if (! first) {

                                                first = false;

                                                Set<String> keys = jedis.keys(service);

                                                if (keys != null && keys.size() > 0) {

                                                    for (String s : keys) {

                                                        doNotify(jedis, s);

                                                    }

                                                }

                                                resetSkip();

                                            }

                                            jedis.psubscribe(new NotifySub(jedisPool), service); // 阻塞

                                        } else {

                                            if (! first) {

                                                first = false;

                                                doNotify(jedis, service);

                                                resetSkip();

                                            }

                                            jedis.psubscribe(new NotifySub(jedisPool), service + Constants.PATH_SEPARATOR + Constants.ANY_VALUE); // 阻塞

                                        }

                                        break;

                                    } finally {

                                        jedisPool.returnBrokenResource(jedis);

                                    }

                                } catch (Throwable t) { // 重试另一�?

                                    logger.warn("Failed to subscribe service from redis registry. registry: " + entry.getKey() + ", cause: " + t.getMessage(), t);

                                }

                            }

                        } catch (Throwable t) {

                            logger.error(t.getMessage(), t);

                            sleep(reconnectPeriod);

                        }

                    }

                } catch (Throwable t) {

                    logger.error(t.getMessage(), t);

                }

            }

        }

        

        public void shutdown() {

            try {

                running = false;

                jedis.disconnect();

            } catch (Throwable t) {

                logger.warn(t.getMessage(), t);

            }

        }

        

    }

 

}

 

 

分享到:
评论

相关推荐

    dubbo示例代码dubbo-sample

    【Dubbo 示例代码详解】 Dubbo 是阿里巴巴开源的一款高性能、轻量级的Java服务治理框架,它主要提供了RPC(远程过程调用)服务,并且包含了服务注册与发现、负载均衡、容错处理、监控等全面的服务治理功能。本示例...

    Dubbo 2.5.3 jar包

    Dubbo 2.5.3 全部jar包下载 [INFO] dubbo-parent ...................................... SUCCESS [1.042s] [INFO] Hessian Lite(Alibaba embed version) ............... SUCCESS [4.438s] [INFO] dubbo-common .....

    incubator-dubbo-dubbo-2.6.1

    【标题】"incubator-dubbo-dubbo-2.6.1" 是一个Apache Incubator项目Dubbo的特定版本,这里的2.6.1表示该版本是Dubbo的稳定分支之一。 【描述】提到的"incubator-dubbo-dubbo-2.6.1"表明这是Apache孵化器中的Dubbo...

    Dubbo入门_实战

    ### Dubbo入门实战详解 #### 一、Dubbo概述与应用场景 ##### 1.1 什么是Dubbo? Dubbo是一款由阿里巴巴开发的分布式服务框架,它致力于提供高性能和透明化的RPC远程服务调用方案。该框架是阿里巴巴SOA服务化治理...

    dubbo资源 dubbo-admin dubbo demo

    【标题】"dubbo资源 dubbo-admin dubbo demo" 提供的是关于Apache Dubbo的相关素材,主要包括了Dubbo-admin的管理和示例项目。Dubbo是一个高性能、轻量级的开源Java RPC框架,它提供了丰富的服务治理功能,是阿里...

    dubbo-demo-consumer、dubbo-demo-provider、dubbo-simple-monitor

    《Dubbo实战:消费者、提供者与简单监控》 Dubbo是阿里巴巴开源的一款高性能、轻量级的服务治理框架,广泛应用于分布式系统中的服务调用。本篇将详细讲解基于dubbo-demo-consumer、dubbo-demo-provider和dubbo-...

    dubbo-admin包

    【标题】"dubbo-admin包"是Dubbo框架的一个重要组成部分,主要用作服务治理的管理界面。这个压缩包包含了运行Dubbo管理控制台所需的所有文件,使得开发者和运维人员可以方便地监控、管理和配置Dubbo服务。 【描述】...

    dubbo源码分析系列

    《Dubbo源码分析系列》是一份深入探讨Java开源框架Dubbo核心原理和技术细节的资料。Dubbo,作为阿里巴巴的一款高性能、轻量级的服务治理框架,它为分布式系统提供了服务发现、调用、负载均衡、容错等关键功能。这份...

    dubbo admin jdk1.8

    【标题】"dubbo admin jdk1.8" 指的是使用Java开发工具包(JDK)1.8版本运行的Dubbo管理控制台。Dubbo是阿里巴巴开源的一个高性能、轻量级的服务治理框架,它提供了服务注册、服务发现、调用监控等功能。在JDK1.8...

    dubbo-admin-2.5.4及dubbo-monitor-2.5.3 安装及配置

    本人实际测试过,这两个包可用。...2.修改dubbo-monitor中的conf目录中的dubbo.properties dubbo.registry.address 与 dubbo-admin中的配置一样 3.到dubbo-monitor中的bin目录下运行 start.sh脚本 ok

    dubbo视频教程|基于Dubbo的分布式系统架构实战

    Dubbo是阿里巴巴开源的分布式服务化治理框架(微服务框架),久经阿里巴巴电商平台的大规模复杂业务的高并发考验,到目前为止Dubbo仍然是开源界中体系最完善的服务化治理框架,因此Dubbo被国内大量的的互联网公司和...

    尚硅谷最新dubbo视频

    本套视频从分布式系统的基本概念出发,由浅入深,讲解了RPC原理,Dubbo基本使用,Dubbo高可用场景以及Dubbo原理,涉及了分布式系统中服务注册、服务发现、负载均衡、灰度发布、集群容错、服务降级等核心概念的讲解及...

    dubbo2.5.6.zip

    《Dubbo 2.5.6与Java 1.8的兼容性问题解析》 Dubbo,作为阿里巴巴开源的一款高性能、轻量级的服务治理框架,广泛应用于分布式系统中。而Java,作为服务端开发的基石,其版本选择直接影响到框架的运行效果。本篇文章...

    dubbo xsd的支持

    在IT行业中,Dubbo是一个非常知名的Java开源框架,主要用于实现分布式服务治理。它由阿里巴巴开发并维护,旨在提供高性能、轻量级的服务间调用方案。"dubbo.xsd"文件是Dubbo框架中用于XML配置文件解析的重要组成部分...

    dubbo捕获自定义异常_dubbo异常捕获_dubbo异常_自定义异常_捕捉异常_

    在分布式服务框架 Dubbo 中,异常处理是必不可少的一部分。Dubbo 提供了强大的异常处理机制,使得服务提供者能够向消费者传递自定义异常,从而帮助消费者更好地理解和处理服务调用中的错误情况。本文将深入探讨如何...

    Dubbo高级视频教程

    ### Dubbo高级视频教程知识点概览 #### 一、Dubbo概述与分布式系统基础 - **Dubbo简介**:Dubbo是一款高性能、轻量级的开源Java RPC框架,旨在为服务治理提供简单、全面的解决方案。 - **分布式系统概念**:分布式...

    dubbo入门例子程序

    【Dubbo入门例子程序】是针对初学者设计的一个简单示例,旨在帮助理解并快速上手Apache Dubbo这一高性能、轻量级的Java远程服务框架。这个例子通过一个"Hello, World!"的应用来演示Dubbo的基本用法,采用Maven作为...

    dubbo 对外提供和使用接口方法

    ### dubbo对外提供和使用接口方法 #### 一、Dubbo简介 Dubbo是一个高性能、轻量级的开源服务框架,旨在提供高性能和透明化的RPC远程服务调用方案,是微服务架构的重要组成部分之一。它提供了面向接口代理的高性能...

    Dubbo监控系统配置

    ### Dubbo监控系统配置详解 #### 一、Dubbo监控系统概述 Dubbo是一款高性能、轻量级的开源服务框架,旨在提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。Dubbo提供了包括服务自动注册与发现、...

    dubbo接口测试调试工具

    前段时间排查某问题的时候,想要快速知道某些dubbo接口(三无)的响应结果,但不想启动项目(因为这些项目不是你负责的,不会部署而且超级笨重),也不想新建一个dubbo客户端项目(占地方),也不想开telnet客户端...

Global site tag (gtag.js) - Google Analytics