`
hbxflihua
  • 浏览: 686836 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

DelayQueue实现支付系统异步通知

阅读更多

支付系统的异步通知实现可以参考支付宝的异步通知,每个订单的异步通知实行分频率发送:10m 20m 30m 40m 50m 1h,具体异步通知频率可根据业务需求做相应调整。本期,笔者将通过java的DelayQueue来实现支付系统的异步通知功能。

 

支付系统异步通知的需求:

1、需要按照既有频率发送异步通知给调用方;

2、回调成功则停止异步通知;

3、回调失败,先判断是否超出既定频次,超出则停止发送,否则按照既有频率继续发送异步通知;

 

下面就通过DelayQueue来实现支付系统异步通知

package com.huatech.common.delay;

/**
 * delay键值对
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 *
 * @param <K>
 * @param <V>
 */
public class Pair<K, V> {
	
    private K first;

    private V second;
    
    public Pair() {}
    
    public Pair(K first, V second) {
        this.first = first;
        this.second = second;
    }

	public K getFirst() {
		return first;
	}

	public void setFirst(K first) {
		this.first = first;
	}

	public V getSecond() {
		return second;
	}

	public void setSecond(V second) {
		this.second = second;
	}
    
    
}

 

package com.huatech.common.delay;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Delayed接口的实现类
 * 
 * <p> 内部实现了getDelay()和compareTo()方法,分别用来获取延迟时间和按两个任务的延迟时间进行排序</p>
 * 
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 *
 * @param <T>
 */
public class DelayItem<T> implements Delayed {

	private static final long NANO_ORIGIN = System.nanoTime();

    final static long now() {
        return System.nanoTime() - NANO_ORIGIN;
    }

    private static final AtomicLong sequencer = new AtomicLong(0);

    private final long sequenceNumber;

    private final long time;

    private final T item;

    public DelayItem(T submit, long timeout) {
        this.time = now() + timeout;
        this.item = submit;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    public T getItem() {
        return this.item;
    }

    public long getDelay(TimeUnit unit) {
        long d = unit.convert(time - now(), TimeUnit.NANOSECONDS);
        return d;
    }

    public int compareTo(Delayed other) {
        if (other == this) 
            return 0;
        if (other instanceof DelayItem) {
            DelayItem<?> x = (DelayItem<?>) other;
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS));
        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
    }
}

 

package com.huatech.common.delay;

/**
 * 响应报文
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 *
 */
public class RetMessage {

	/**
	 * 回调地址
	 */
	private String url;
	/**
	 * 报文
	 */
	private String reqData;
	/**
	 * 已重试次数
	 */
	private int times;
	/**
	 * 是否成功
	 */
	private boolean success;
	
	
	public RetMessage(String url, String reqData) {
		super();
		this.url = url;
		this.reqData = reqData;
		this.times = 1;
		this.success = false;
	}
	
	public RetMessage(String url, String reqData, int times, boolean success) {
		super();
		this.url = url;
		this.reqData = reqData;
		this.times = times;
		this.success = success;
	}

	public String getUrl() {
		return url;
	}
	public void setUrl(String url) {
		this.url = url;
	}
	public String getReqData() {
		return reqData;
	}
	public void setReqData(String reqData) {
		this.reqData = reqData;
	}
	public int getTimes() {
		return times;
	}
	public void setTimes(int times) {
		this.times = times;
	}
	public boolean isSuccess() {
		return success;
	}
	public void setSuccess(boolean success) {
		this.success = success;
	}

}

 

package com.huatech.common.delay;

import java.util.HashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.LoggerFactory;

import com.huatech.common.util.HttpsUtil;

/**
 * 延迟队列发送通知
 * <p>
 * 说明:一共5次 第一次不成功等待10分钟,第二次20分钟...50分钟
 * </p>
 * 
 * @author lh
 * @version 2.0
 * @since 2017-06-23
 *
 */
public class Task {

	private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(Task.class);

	//时间单元:十分钟
	private static final long TIME_UNIT = 10;
	//返回结果
	private static final String RES_SUCCESS = "success";
	
	private static Task instance = new Task();

	public static Task getInstance() {
		return instance;
	}

	// DelayQueue队列没有大小限制,因此向队列插数据不会阻塞
	// DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。否则线程阻塞
	private static DelayQueue<DelayItem<Pair<String, RetMessage>>> queue = new DelayQueue<DelayItem<Pair<String, RetMessage>>>();

	private Thread taskThread;

	private Task() {
		taskThread = new Thread(new Runnable() {
			public void run() {
				execute();
			}
		});
		taskThread.setName("Task Thread");
		taskThread.start();
	}

	private void execute() {
		for (;;) {
			try {
				DelayItem<Pair<String, RetMessage>> delayItem = queue.take();
				if (delayItem != null) {
					// 到期处理
					Pair<String, RetMessage> pair = delayItem.getItem();
					RetMessage msg = pair.getSecond();
					if (!msg.isSuccess() && msg.getTimes() <= 5) {
						HashMap<String, String> paramMap = new HashMap<String, String>();
						paramMap.put("reqData", msg.getReqData());
						try {
							String httpResult = HttpsUtil.getInstance().doPostRetString(msg.getUrl(), null, paramMap);

							LOGGER.info("第{}次异步回调,返回结果{},返回参数:{},响应结果:{}", msg.getTimes(), httpResult,
									paramMap.get("reqData"), RES_SUCCESS.equals(httpResult));
							if (!RES_SUCCESS.equals(httpResult)) {
								msg.setTimes(msg.getTimes() + 1);
								msg.setSuccess(false);
								Task.getInstance().put(pair.getFirst(), msg);
							}
							// TODO 如果需要入库,请在此操作

						} catch (Exception e) {
							LOGGER.warn(e.getMessage(), e);
						}
					}

				}
			} catch (InterruptedException e) {
				LOGGER.warn(e.getMessage(), e);
				break;
			}
		}
	}

	/**
	 * 添加通知对象
	 * 
	 * @param key
	 *            唯一性key值,建议为:merchantNo + orderNo
	 * @param msg
	 *            响应报文
	 */
	public void put(String key, RetMessage msg) {
		if (queue.contains(key)) {
			queue.remove(key);
		}

		long nanoTime = TIME_UNIT + TimeUnit.NANOSECONDS.convert((msg.getTimes() -1) * TIME_UNIT, TimeUnit.MINUTES);
		queue.put(new DelayItem<Pair<String, RetMessage>>(new Pair<String, RetMessage>(key, msg), nanoTime));
	}

	public static void main(String[] args) throws Exception {
		String orderNo = System.currentTimeMillis()+"";
		RetMessage msg = new RetMessage("www.baidu.com", "a=1&b=2");
		Task.getInstance().put(orderNo, msg);
	}

}

 

  里面用到了HttpsUtil工具类,需要先引入httpclient所需的jar

			<dependency>
				<groupId>org.apache.httpcomponents</groupId>
				<artifactId>httpclient</artifactId>
				<version>4.4.1</version>
			</dependency>
			
			<dependency>
				<groupId>org.apache.httpcomponents</groupId>
				<artifactId>httpmime</artifactId>
				<version>4.4.1</version>
			</dependency>

 

package com.huatech.common.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import javax.net.ssl.SSLContext;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpException;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.config.ConnectionConfig;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.config.SocketConfig;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.LayeredConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContexts;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.cookie.Cookie;
import org.apache.http.cookie.CookieOrigin;
import org.apache.http.entity.mime.FormBodyPart;
import org.apache.http.entity.mime.HttpMultipartMode;
import org.apache.http.entity.mime.MultipartEntityBuilder;
import org.apache.http.impl.client.BasicCookieStore;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.impl.cookie.BasicClientCookie;
import org.apache.http.impl.cookie.BestMatchSpec;
import org.apache.http.message.BasicNameValuePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class AnyTrustStrategy implements TrustStrategy {
	@Override
	public boolean isTrusted(X509Certificate[] chain, String authType) throws CertificateException {
		return true;
	}

}

public class HttpsUtil {
	private static final Logger logger = LoggerFactory.getLogger(HttpsUtil.class);

	private static final Log log = LogFactory.getLog(HttpsUtil.class);

	private static int bufferSize = 1024;

	private static final int CONNECT_TIMEOUT = 6 * 1000;
	private static final int REQUEST_TIMEOUT = 3 * 1000;

	private static volatile HttpsUtil instance;

	private ConnectionConfig connConfig;

	private SocketConfig socketConfig;

	private ConnectionSocketFactory plainSF;

	private KeyStore trustStore;

	private SSLContext sslContext;

	private LayeredConnectionSocketFactory sslSF;

	private Registry<ConnectionSocketFactory> registry;

	private PoolingHttpClientConnectionManager connManager;

	private volatile HttpClient client;

	private volatile BasicCookieStore cookieStore;

	public static String defaultEncoding = "utf-8";

	private static List<NameValuePair> paramsConverter(Map<String, String> params) {
		List<NameValuePair> nvps = new LinkedList<NameValuePair>();
		Set<Entry<String, String>> paramsSet = params.entrySet();
		for (Entry<String, String> paramEntry : paramsSet) {
			nvps.add(new BasicNameValuePair(paramEntry.getKey(), paramEntry.getValue()));
		}
		return nvps;
	}

	public static String readStream(InputStream in, String encoding) {
		if (in == null) {
			return null;
		}
		try {
			InputStreamReader inReader = null;
			if (encoding == null) {
				inReader = new InputStreamReader(in, defaultEncoding);
			} else {
				inReader = new InputStreamReader(in, encoding);
			}
			char[] buffer = new char[bufferSize];
			int readLen = 0;
			StringBuffer sb = new StringBuffer();
			while ((readLen = inReader.read(buffer)) != -1) {
				sb.append(buffer, 0, readLen);
			}
			inReader.close();
			return sb.toString();
		} catch (IOException e) {
			log.error("读取返回内容出错", e);
		}
		return null;
	}

	private HttpsUtil() {
		// 设置连接参数
		connConfig = ConnectionConfig.custom().setCharset(Charset.forName(defaultEncoding)).build();
		socketConfig = SocketConfig.custom().setSoTimeout(100000).build();
		RegistryBuilder<ConnectionSocketFactory> registryBuilder = RegistryBuilder.<ConnectionSocketFactory>create();
		plainSF = new PlainConnectionSocketFactory();
		registryBuilder.register("http", plainSF);
		// 指定信任密钥存储对象和连接套接字工厂
		try {
			trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
			sslContext = SSLContexts.custom().useTLS().loadTrustMaterial(trustStore, new AnyTrustStrategy()).build();
			sslSF = new SSLConnectionSocketFactory(sslContext, SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
			registryBuilder.register("https", sslSF);
		} catch (KeyStoreException e) {
			throw new RuntimeException(e);
		} catch (KeyManagementException e) {
			throw new RuntimeException(e);
		} catch (NoSuchAlgorithmException e) {
			throw new RuntimeException(e);
		}
		registry = registryBuilder.build();
		// 设置连接管理器
		connManager = new PoolingHttpClientConnectionManager(registry);
		connManager.setDefaultConnectionConfig(connConfig);
		connManager.setDefaultSocketConfig(socketConfig);
		// 指定cookie存储对象
		cookieStore = new BasicCookieStore();
		// 构建客户端
		client = HttpClientBuilder.create().setDefaultCookieStore(cookieStore).setConnectionManager(connManager)
				.build();
	}

	public static HttpsUtil getInstance() {
		synchronized (HttpsUtil.class) {
			if (HttpsUtil.instance == null) {
				instance = new HttpsUtil();
			}
			return instance;
		}
	}

	public InputStream doGet(String url) throws URISyntaxException, ClientProtocolException, IOException {
		HttpResponse response = this.doGet(url, null);
		return response != null ? response.getEntity().getContent() : null;
	}

	public String doGetForString(String url) throws URISyntaxException, ClientProtocolException, IOException {
		return HttpsUtil.readStream(this.doGet(url), null);
	}

	public InputStream doGetForStream(String url, Map<String, String> queryParams)
			throws URISyntaxException, ClientProtocolException, IOException {
		HttpResponse response = this.doGet(url, queryParams);
		return response != null ? response.getEntity().getContent() : null;
	}

	public String doGetForString(String url, Map<String, String> queryParams)
			throws URISyntaxException, ClientProtocolException, IOException {
		return HttpsUtil.readStream(this.doGetForStream(url, queryParams), null);
	}

	/**
	 * 基本的Get请求
	 * 
	 * @param url
	 *            请求url
	 * @param queryParams
	 *            请求头的查询参数
	 * @return
	 * @throws URISyntaxException
	 * @throws IOException
	 * @throws ClientProtocolException
	 */
	public HttpResponse doGet(String url, Map<String, String> queryParams)
			throws URISyntaxException, ClientProtocolException, IOException {
		HttpGet gm = new HttpGet();
		URIBuilder builder = new URIBuilder(url);
		// 填入查询参数
		if (queryParams != null && !queryParams.isEmpty()) {
			builder.setParameters(HttpsUtil.paramsConverter(queryParams));
		}
		gm.setURI(builder.build());
		return client.execute(gm);
	}

	public InputStream doPostForStream(String url, Map<String, String> queryParams)
			throws URISyntaxException, ClientProtocolException, IOException {
		HttpResponse response = this.doPost(url, queryParams, null);
		return response != null ? response.getEntity().getContent() : null;
	}

	public String doPostForString(String url, Map<String, String> queryParams)
			throws URISyntaxException, ClientProtocolException, IOException {
		return HttpsUtil.readStream(this.doPostForStream(url, queryParams), null);
	}

	public InputStream doPostForStream(String url, Map<String, String> queryParams, Map<String, String> formParams)
			throws URISyntaxException, ClientProtocolException, IOException {
		HttpResponse response = this.doPost(url, queryParams, formParams);
		logger.info("异步响应:{},{}", response.getEntity().getContent(), response.getStatusLine());
		return response != null ? response.getEntity().getContent() : null;
	}

	public String doPostRetString(String url, Map<String, String> queryParams, Map<String, String> formParams)
			throws URISyntaxException, ClientProtocolException, IOException {
		return HttpsUtil.readStream(this.doPostForStream(url, queryParams, formParams), null);
	}

	/**
	 * 基本的Post请求
	 * 
	 * @param url
	 *            请求url
	 * @param queryParams
	 *            请求头的查询参数
	 * @param formParams
	 *            post表单的参数
	 * @return
	 * @throws URISyntaxException
	 * @throws IOException
	 * @throws ClientProtocolException
	 */
	public HttpResponse doPost(String url, Map<String, String> queryParams, Map<String, String> formParams)
			throws URISyntaxException, ClientProtocolException, IOException {
		HttpPost pm = new HttpPost();
		URIBuilder builder = new URIBuilder(url);
		// 填入查询参数
		if (queryParams != null && !queryParams.isEmpty()) {
			builder.setParameters(HttpsUtil.paramsConverter(queryParams));
		}
		pm.setURI(builder.build());
		// 填入表单参数
		if (formParams != null && !formParams.isEmpty()) {
			pm.setEntity(new UrlEncodedFormEntity(HttpsUtil.paramsConverter(formParams), defaultEncoding));
		}

		// pm.setConfig(getRequestConfig());

		return client.execute(pm);
	}

	/**
	 * 多块Post请求
	 * 
	 * @param url
	 *            请求url
	 * @param queryParams
	 *            请求头的查询参数
	 * @param formParts
	 *            post表单的参数,支持字符串-文件(FilePart)和字符串-字符串(StringPart)形式的参数
	 * @throws URISyntaxException
	 * @throws ClientProtocolException
	 * @throws HttpException
	 * @throws IOException
	 */
	public HttpResponse multipartPost(String url, Map<String, String> queryParams, List<FormBodyPart> formParts)
			throws URISyntaxException, ClientProtocolException, IOException {
		HttpPost pm = new HttpPost();
		URIBuilder builder = new URIBuilder(url);
		// 填入查询参数
		if (queryParams != null && !queryParams.isEmpty()) {
			builder.setParameters(HttpsUtil.paramsConverter(queryParams));
		}
		pm.setURI(builder.build());
		// 填入表单参数
		if (formParts != null && !formParts.isEmpty()) {
			MultipartEntityBuilder entityBuilder = MultipartEntityBuilder.create();
			entityBuilder = entityBuilder.setMode(HttpMultipartMode.BROWSER_COMPATIBLE);
			for (FormBodyPart formPart : formParts) {
				entityBuilder = entityBuilder.addPart(formPart.getName(), formPart.getBody());
			}
			pm.setEntity(entityBuilder.build());
		}
		return client.execute(pm);
	}

	/**
	 * 获取当前Http客户端状态中的Cookie
	 * 
	 * @param domain
	 *            作用域
	 * @param port
	 *            端口 传null 默认80
	 * @param path
	 *            Cookie路径 传null 默认"/"
	 * @param useSecure
	 *            Cookie是否采用安全机制 传null 默认false
	 * @return
	 */
	public Map<String, Cookie> getCookie(String domain, Integer port, String path, Boolean useSecure) {
		if (domain == null) {
			return null;
		}
		if (port == null) {
			port = 80;
		}
		if (path == null) {
			path = "/";
		}
		if (useSecure == null) {
			useSecure = false;
		}
		List<Cookie> cookies = cookieStore.getCookies();
		if (cookies == null || cookies.isEmpty()) {
			return null;
		}

		CookieOrigin origin = new CookieOrigin(domain, port, path, useSecure);
		BestMatchSpec cookieSpec = new BestMatchSpec();
		Map<String, Cookie> retVal = new HashMap<String, Cookie>();
		for (Cookie cookie : cookies) {
			if (cookieSpec.match(cookie, origin)) {
				retVal.put(cookie.getName(), cookie);
			}
		}
		return retVal;
	}

	/**
	 * 批量设置Cookie
	 * 
	 * @param cookies
	 *            cookie键值对图
	 * @param domain
	 *            作用域 不可为空
	 * @param path
	 *            路径 传null默认为"/"
	 * @param useSecure
	 *            是否使用安全机制 传null 默认为false
	 * @return 是否成功设置cookie
	 */
	public boolean setCookie(Map<String, String> cookies, String domain, String path, Boolean useSecure) {
		synchronized (cookieStore) {
			if (domain == null) {
				return false;
			}
			if (path == null) {
				path = "/";
			}
			if (useSecure == null) {
				useSecure = false;
			}
			if (cookies == null || cookies.isEmpty()) {
				return true;
			}
			Set<Entry<String, String>> set = cookies.entrySet();
			String key = null;
			String value = null;
			for (Entry<String, String> entry : set) {
				key = entry.getKey();
				if (key == null || key.isEmpty() || value == null || value.isEmpty()) {
					throw new IllegalArgumentException("cookies key and value both can not be empty");
				}
				BasicClientCookie cookie = new BasicClientCookie(key, value);
				cookie.setDomain(domain);
				cookie.setPath(path);
				cookie.setSecure(useSecure);
				cookieStore.addCookie(cookie);
			}
			return true;
		}
	}

	/**
	 * 设置单个Cookie
	 * 
	 * @param key
	 *            Cookie键
	 * @param value
	 *            Cookie值
	 * @param domain
	 *            作用域 不可为空
	 * @param path
	 *            路径 传null默认为"/"
	 * @param useSecure
	 *            是否使用安全机制 传null 默认为false
	 * @return 是否成功设置cookie
	 */
	public boolean setCookie(String key, String value, String domain, String path, Boolean useSecure) {
		Map<String, String> cookies = new HashMap<String, String>();
		cookies.put(key, value);
		return setCookie(cookies, domain, path, useSecure);
	}

	public RequestConfig getRequestConfig() {
		return RequestConfig.custom().setConnectionRequestTimeout(REQUEST_TIMEOUT) // 设置从connect
																					// Manager获取Connection
																					// 超时时间,单位毫秒
				.setConnectTimeout(CONNECT_TIMEOUT) // 设置连接超时时间,单位毫秒
				.setSocketTimeout(CONNECT_TIMEOUT) // 请求获取数据的超时时间,单位毫秒
				.build();
	}

}

 

    over!!!

 

分享到:
评论

相关推荐

    DelayQueue延迟队列和Redis缓存实现订单自动取消功能

    这个特性使得DelayQueue成为实现定时任务和延迟操作的理想工具。在电商系统中,例如订单管理,这种功能非常有用,比如自动取消未付款的订单。 DelayQueue的核心接口是`java.util.concurrent.Delayed`,实现这个...

    DelayQueue、Redis结合使延迟、定时任务使用源代码

    本文将深入探讨如何利用Java中的`DelayQueue`和Redis来实现这一功能。`DelayQueue`是Java并发库`java.util.concurrent`中的一个数据结构,它是一个基于优先级队列的无界阻塞队列,可以用于存储具有延迟时间的元素。...

    springboot执行延时任务之DelayQueue的使用详解

    在Spring Boot中使用DelayQueue可以实现延时任务,例如给客户异步转账操作超时后发通知告知用户,还有客户下单后多长时间内没支付则取消订单等等。 DelayQueue的使用步骤如下: 1. 定义DelayTask类,实现Delayed...

    DelayQueue

    学习视频,可以丰富java知识。能够获得更多的专业技能

    DelayQueue的使用以及注意事项

    DelayQueue的使用以及注意事项,这里需要由BlockingQueue的基本知识,一般的Queue的使用方法poll(),take(),drainTo()和offer(),put()这些应该懂。

    聚合支付系统架构演进.pptx

    聚合支付系统架构演进是支付领域的一个重要话题,它涉及到技术升级、系统优化以及业务的持续发展。在本文中,我们将深入探讨从1.0版本到2.0版本的聚合支付系统架构变化,并讨论可能的3.0版本的未来发展趋势。 1.0 ...

    JDK自带的延迟队列-DelayQueue

    - **消息队列**: 在消息队列系统中,可以使用`DelayQueue`来实现延迟发送消息的功能。 6. **注意点** - `DelayQueue`中的元素延迟时间一旦设定,就无法修改。如果需要动态调整延迟时间,需要重新创建元素对象并...

    Java多线程并发开发之DelayQueue使用示例

    1. 任务调度:DelayQueue可以用于实现任务调度系统,按照任务的延迟时间来排序,以便在合适的时刻执行任务。 2. 网络编程:DelayQueue可以用于实现网络编程中的延迟处理,例如在网络游戏中,DelayQueue可以用于实现...

    java利用delayedQueue实现本地的延迟队列

    DelayedQueue 的实现是基于 Java 中的阻塞队列的接口 BlockingQueue, DelayQueue 是其的一种实现。 DelayQueue 提供了一个无界的阻塞队列,用于存放实现了 Delayed 接口的对象。 DelayQueue 能够保证队列中的对象...

    基于Netty+SpringBoot+LevelDB实现的高性能、高可靠性的消息队列+源代码+文档说明

    1.延迟消息BUG:延时消息基于jdk自带的delayQueue实现,系统宕机重启后服务端读取leveldb中的消息后将消息重新放回延时队列,会重新设置到期时间。例如:设置一条消息5分钟后推送,中途系统宕机,系统重启后会从当前...

    delay-queue:JDK实现的本地delayQueue和基于分布式Redis的两种分布式

    local delayQueue implemented by JDK & two kinds of distributed delayQueue based redis 1. 基本介绍 RedisSynDelayQueue 基于redis,并发情况下会加分布式锁,单线程场景(syn=false)性能较好, 并发场景性能较...

    高效的实现队列

    在IT行业中,队列是一种非常基础且重要的数据结构,它遵循先进先出(FIFO,First In First Out)的原则。本篇文章将详细讲解如何高效地...通过理解和掌握不同的队列实现,开发者能够更好地优化系统性能,解决实际问题。

    Java 实战:电商系统订单超时自动取消多种实现方案与优化探讨

    再往下介绍了使用像RabbitMQ之类的消息中间件来进行异步通知的可能性以及如何应对可能出现的服务阻塞难题;并且提到了运用Quartz这样的批处理工具,这使得我们可以更方便规划复杂的重复活动尽管响应速度稍慢一点;还...

    高效延时队列的设计与实现

    DelayQueue是一个基于优先级队列的数据结构,插入的元素必须实现Delayed接口,通过getDelay方法返回剩余延迟时间。当延迟时间到达零时,元素才能被消费。 RabbitMQ实现延时队列的基本原理: RabbitMQ结合消息的TTL...

    基于DelayQueue的简单的定时任务队列.zip

    基于DelayQueue的简单的定时任务队列.zip Quick Start class Main { public static void main(String[] args) { // 初始化任务队列 JobScheduler scheduler = new JobScheduler("default"); // 向队列中提交任务...

    php-delayqueue:基于redis实现高可用,易拓展,接入方便,生产环境稳定运行的延迟队列

    2.订单支付成功后,5分钟后检测下游环节是否都正常,比如用户购买会员后,各种会员状态是否都设置成功 3.如何定期检查处于退款状态的订单是否已经退款成功? 4.实现通知失败,1,3,5,7分钟重复通知,直到对方回复...

    基于SpringBoot的延迟消息Starter设计源码,支持DelayQueue、Redisson、RabbitMQ三种方式

    该项目是SpringBoot框架下的延迟消息Starter,提供对DelayQueue、Redisson和RabbitMQ三种延迟消息机制的集成支持。项目包含32个文件,涵盖24个Java源文件、4个XML配置文件、1个Git忽略文件、1个Markdown文件、1个...

    Java企业版中性能调节的最佳实践.pdf

    - **应用架构调优**:优化数据访问层、缓存机制、异步处理等,采用微服务架构可提高系统的可伸缩性和可维护性。 #### 五、调优过程中的注意事项 在进行调优过程中,需要注意以下几点: - 在没有明确且共同认可的...

    Java实现商城订单超时取消功能

    在Java中实现商城订单超时取消功能,主要是利用了JDK中的`DelayQueue`数据结构。`DelayQueue`是一个无界的阻塞队列,它仅允许存放实现了`Delayed`接口的元素,这些元素只有在到达指定延迟时间后才能被取出。这种特性...

    基于Redis实现的延迟消息队列

    #### 整体结构 整个延迟队列由4个部分组成: 1. JobPool用来存放所有Job的元信息。 2. DelayBucket是一组以时间为维度的有序队列,用来存放所有需要延迟的Job(这里只存放Job Id)。 3. Timer负责实时扫描各个...

Global site tag (gtag.js) - Google Analytics