`
mayday85
  • 浏览: 46380 次
  • 性别: Icon_minigender_1
  • 来自: 污点星狗屎国
文章分类
社区版块
存档分类
最新评论

Java 异步消息处理

阅读更多
一.    它要能适应不同类型的请求:
本节用 makeString来说明要求有返回值的请求.用displayString来说明不需要返回值的请求.
二.    要能同时并发处理多个请求,并能按一定机制调度:
本节将用一个队列来存放请求,所以只能按FIFO机制调度,你可以改用LinkedList,就可以简单实现一个优先级(优先级高的addFirst,低的addLast).
三.    有能力将调用的边界从线程扩展到机器间(RMI)
四.    分离过度耦合,如分离调用句柄(取货凭证)和真实数据的实现.分离调用和执行的过程,可以尽快地将调返回.

现在看具体的实现:
public interface Axman {
  Result resultTest(int count,char c);
  void noResultTest(String str);
}

这个接口有两个方法要实现,就是有返回值的调用resultTest和不需要返回值的调用
noResultTest, 我们把这个接口用一个代理类来实现,目的是将方法调用转化为对象,这样就可以将多个请求(多个方法调)放到一个容器中缓存起来,然后统一处理,因为 Java不支持方法指针,所以把方法调用转换为对象,然后在这个对象上统一执行它们的方法,不仅可以做到异步处理,而且可以将代表方法调用的请求对象序列 化后通过网络传递到另一个机器上执行(RMI).这也是Java回调机制最有力的实现.
    一个简单的例子.
    如果 1: 做A
    如果 2: 做B
如果 3: 做C
如果有1000个情况,你不至于用1000个case吧?以后再增加呢?
所以如果C/C++程序员,会这样实现: (c和c++定义结构不同)

type define struct MyStruct{
int mark;
(*fn) ();
} MyList;
    
    然后你可以声明这个结构数据:
    {1,A,
     2,B
     3,C
}

做一个循环:
for(i=0;i<length;i++) {
    if(数据组[i].mark == 传入的值) (数据组[i].*fn)();
}

简单说c/c++中将要被调用的涵数可以被保存起来,然后去访问,调用,而Java中,我们无法将一个方法保存,除了直接调用,所以将要调用的方法用子类来实现,然后把这些子类实例保存起来,然后在这些子类的实现上调用方法:
interface My{
    void test();
}

class A implements My{
    public void test(){
        System.out.println(“A”):
}
}
class B implements My{
    public void test(){
        System.out.println(“B”):
}
}

class C implements My{
    public void test(){
        System.out.println(“C”):
}
}

class MyStruct {
    
    int mark;
    My m;
    public MyStruct(int mark,My m){this.mark = amrk;this.m = m}
}

数组:
{ new MyStruct(1,new A()),new MyStruct(2,new B()),new MyStruct(3,new C())}
for(xxxxxxxxx) if(参数 ==数组[i].mark) 数组[i].m.test();


这样把要调用的方法转换为对象的保程不仅仅是可以对要调用的方法进行调度,而且可以把对象序列化后在另一台机器上执行,这样就把调用边界从线程扩展到了机器.

回到我们的例子:
class Proxy implements Axman{
  private final Scheduler scheduler;
  private final Servant servant;

  public Proxy(Scheduler scheduler,Servant servant){
    this.scheduler = scheduler;
    this.servant = servant;
  }
  public Result resultTest(int count,char c){
    FutureResult futrue = new FutureResult();
    this.scheduler.invoke(new ResultRequest(servant,futrue,count,c));
    return futrue;
  }

  public void noResultTest(String str){
    this.scheduler.invoke(new NoResultRequest(this.servant,str));
  }
}


其中scheduler是管理对调用的调度, servant是真正的对方法的执行:

Servant就是去真实地实现方法:

class Servant implements Axman{
  public Result resultTest(int count,char c){
    char[] buf = new char[count];
    for(int i = 0;i < count;i++){
      buf[i] = c;
      try{
        Thread.sleep(100);
      }catch(Throwable t){}
    }
    return new RealResult(new String(buf));
  }

  public void noResultTest(String str){
    try{
      System.out.println("displayString :" + str);
      Thread.sleep(10);
    }catch(Throwable t){}
  }
}

在scheduler 将方法的调用(invkoe)和执行(execute)进行了分离,调用就是开始”注册”方法到要执行的容器中,这样就可以立即返回出来.真正执行多久就 是execute的事了,就象一个人点燃爆竹的引信就跑了,至于那个爆竹什么时候爆炸就不是他能控制的了.
public class Scheduler extends Thread {
  private final ActivationQueue queue;
  public Scheduler(ActivationQueue queue){
    this.queue = queue;
  }

  public void invoke(MethodRequest request){
    this.queue.putRequest(request);
  }

  public void run(){
    while(true){

      //如果队列中有请求线程,测开始执行请求
      MethodRequest request = this.queue.takeRequest();
      request.execute();
    }
  }
}

在scheduler中只用一个队列来保存代表方法和请求对象,实行简单的FIFO调用,你要实更复杂的调度就要在这里重新实现:
class ActivationQueue{
  private static final int MAX_METHOD_REQUEST = 100;
  private final MethodRequest[] requestQueue;
  private int tail;
  private int head;
  private int count;

  public ActivationQueue(){
    this.requestQueue = new MethodRequest[MAX_METHOD_REQUEST];
    this.head = this.count = this.tail = 0;
  }

  public synchronized void putRequest(MethodRequest request){
    while(this.count >= this.requestQueue.length){
      try {
        this.wait();
      }
      catch (Throwable t) {}
    }
    this.requestQueue[this.tail] = request;
    tail = (tail + 1)%this.requestQueue.length;
    count ++ ;
    this.notifyAll();

  }


  public synchronized MethodRequest takeRequest(){
    while(this.count <= 0){
      try {
        this.wait();
      }
      catch (Throwable t) {}
 
    }

    MethodRequest request = this.requestQueue[this.head];
    this.head = (this.head + 1) % this.requestQueue.length;
    count --;
    this.notifyAll();
    return request;
  }
}


为了将方法调用转化为对象,我们通过实现MethodRequest对象的execute方法来方法具体方法转换成具体对象:
abstract class MethodRequest{
  protected final Servant servant;
  protected final FutureResult future;

  protected MethodRequest(Servant servant,FutureResult future){
    this.servant = servant;
    this.future = future;
  }

  public abstract void execute();
}

class ResultRequest extends MethodRequest{
  private final int count;
  private final char c;
  public ResultRequest(Servant servant,FutureResult future,int count,char c){
    super(servant,future);
    this.count = count;
    this.c = c;
  }
  public void execute(){
    Result result = servant.resultTest(this.count,this.c);
    this.future.setResult(result);
  }
}

class NoResultRequest extends MethodRequest{
  private String str;
  public NoResultRequest(Servant servant,String str){
    super(servant,null);
    this.str = str;
  }

  public void execute(){
    this.servant.noResultTest(str);
  }
}


而返回的数据我们也将真实数据的获取和取货凭证逻辑分离:
package com.axman.jasync;

public abstract class Result {
  public abstract Object getResult();
}

class FutureResult extends Result{
  private Result result;
  private boolean completed;

  public synchronized void setResult(Result result){
    this.result = result;
    this.completed = true;
    this.notifyAll();
  }

  public synchronized Object getResult(){
    while(!this.completed){
      try{
        this.wait();
      }catch(Throwable t){}
    }
    return this.result.getResult();
  }
}

class RealResult extends Result{
  private final Object result;

  public RealResult(Object result){
    this.result = result;
  }
  public Object getResult(){
    return this.result;
  }
}

OK,现在这个异步消息处理器已经有了模型,这个异步处理器中有昭雪些对象参与呢?
    Servant 忠心做真实的事务
    ActivationQueue将请求缓存起来以便调度
    Scheduler对容器中的请求根据一定原则进行调度执行
    Proxy将特定方法请求转换为特定对象
所有这些都是这个异步处理器的核心部件,虽然是核心部件,我们就要进行封装而不能随便让调用者来修改,所以我们用工厂模式(我KAO,我实在不想提模式但有时找不到其它词来表述)来产生处理器Axman对象:
package com.axman.jasync;

public class AxmanFactory {
  public static Axman createAxman() {
    Servant s = new Servant();
    ActivationQueue queue = new ActivationQueue();
    Scheduler st = new Scheduler(queue);
    Proxy p = new Proxy(st,s);
    st.start();
    return p;
  }
}

好了,我们现在用两个请求的产生者不停产生请求:
ResultInvokeThreadv 发送有返回值的请求:
package com.axman.jasync;

public class ResultInvokeThread extends Thread{
  private final Axman ao;
  private final char c;
  public ResultInvokeThread(String name,Axman ao){
    this.ao = ao;
    this.c = name.charAt(0);
  }

  public void run(){
    try{
      int i = 0;
      while(true){
        Result result  = this.ao.resultTest(i++,c);
        Thread.sleep(10);
        String  = (String)result.getResult();
        System.out.println(Thread.currentThread().getName() + "  = " + );
      }
    }
    catch(Throwable t){}
  }
}


NoResultInvokeThread发送无返回值的请求:
package com.axman.jasync;

public class NoResultInvokeThread extends Thread{
  private final Axman ao;
  public NoResultInvokeThread(String name,Axman ao){
    super(name);
    this.ao = ao;
  }

  public void run(){
    try{
      int i = 0;
      while(true){
        String s = Thread.currentThread().getName() + i++;
        ao.noResultTest(s);
        Thread.sleep(20);
      }
    }
    catch(Throwable t){}
  }
}


对了,我们还需要一个什么东西来产生一个演示:
package com.axman.jasync;

public class Program {
  public static void main(String[] args) {
    Axman ao = AxmanFactory.createAxman();
    new ResultInvokeThread("Axman",ao).start();
    new ResultInvokeThread("Sager",ao).start();
    new NoResultInvokeThread("Macke",ao).start();
  }
}

看看结果吧.你可以把不同类型的请求不断地向处理器发送,处理器会不断地接收请求,放到队列中,并同时不断从队列中提出请求进行处理.
分享到:
评论
1 楼 snail_gtt 2009-07-08  
这个例子很好,目前我遇到的一个问题正好和这个方案相当吻合。受教了~

不过有一个问题,在这里我先提出来。

首先我发了很多请求过来了,然后由ActivationQueue缓存起来了。
如果在这些请求尚未处理完成时,服务器停了,再启动后,这些未完成的请求就没有了。

这种情况是否意味着,需要将请求持久化?

相关推荐

    Java_异步消息处理

    ### Java异步消息处理知识点详解 #### 一、适应不同类型的请求 在Java异步消息处理中,系统设计需要能够适应不同类型的请求。这通常包括两种类型:一种是有返回值的请求,另一种是没有返回值的请求。 - **有...

    javaEE 异步消息处理

    总结起来,JavaEE异步消息处理通过JMS和消息队列实现了客户端和服务端之间的解耦和高效通信,利用线程池和异步处理提升了系统的并发性和响应能力。开发者可以利用各种框架和工具,如EJB的MDB或Spring的JMS支持,轻松...

    异步处理(JAVA)

    能同时并发处理多个请求,并能按一定机制调度: 用一个队列来存放请求,所以只能按FIFO机制调度,你可以改用LinkedList,就可以简单实现一个优先级(优先级高的addFirst,低的addLast). 三.有能力将调用的边界从线程扩展到...

    异步消息处理线程

    在Android开发中,异步消息处理是一种常见的多线程编程技术,主要用于解决UI线程(主线程)与后台任务之间的通信问题。标题“异步消息处理线程”着重强调了这种处理方式,它允许开发者在不阻塞主线程的情况下执行...

    Java异步编程框架之Promise介绍

    Callback是最原始的异步处理方式,通过为被调用函数设置回调函数,当被调用函数执行完成后,调用这个回调函数来处理结果。这种方式的缺点是控制权反转,增加了代码的复杂性。 Java中使用Future来处理异步操作,但它...

    java异步通信示例

    Java异步通信是一种提高应用程序效率和响应速度的技术,它允许程序在等待IO操作完成时执行其他任务,而不是阻塞等待。在这个示例中,我们主要关注的是Java NIO(非阻塞I/O)和异步Socket。NIO是Java SE 1.4引入的一...

    Java异步调用转同步方法实例详解

    Java异步调用转同步方法实例详解 Java异步调用转同步方法实例详解是指在Java中将异步调用转换为同步调用的技术,主要用于解决异步调用过程中的阻塞问题。异步调用是一种非阻塞的调用方式,调用方在调用过程中,不...

    java文件异步上传

    总结起来,Java文件异步上传是一个涉及前端JavaScript插件和后端Java服务器处理的复杂过程。`jQuery.uploadify`提供了一个方便的工具来实现这一功能,而Java端则需要编写相应的接口来接收并处理上传的文件。理解并...

    javaHttp异步请求

    Java中的HTTP异步请求是一种高效的网络通信方式,它允许程序在发送HTTP请求后不等待响应,而是立即继续执行其他任务,当服务器响应时,通过回调函数处理结果。这种方式避免了同步请求时线程阻塞的问题,提高了应用的...

    Java异步处理机制实例详解

    Java异步处理机制实例详解 Java异步处理机制是一种常见的编程技术,用于提高程序的性能和响应速度。异步处理机制的核心思想是将某个处理过程分解成多个线程同时处理,以提高处理速度和效率。下面将对Java异步处理...

    Java使用starling分布式消息队列异步处理事务

    在IT行业中,尤其是在Java开发领域,异步处理和事务管理是两个重要的概念,它们对于提升系统性能和保证数据一致性起着关键作用。本篇将详细探讨如何使用Starling这一分布式消息队列来实现异步处理事务。 首先,...

    android中异步消息处理

    ### Android中异步消息处理...Looper负责创建消息循环并在循环中处理消息,MessageQueue用于存储消息,而Handler则作为消息的发送者和接收者。掌握这些基本概念对于理解和解决Android应用程序中的多线程问题至关重要。

    JAVA异步通信教程

    ### JAVA异步通信教程 #### 一、JAVA NIO2 异步通道概述 随着JAVA 7的发布,异步通信成为了开发高性能服务器程序的重要工具之一。为了更好地理解和使用JAVA 7提供的NIO2(New Input/Output)异步通道功能,本教程...

    电商异步消息系统的实践

    3. 消息积压:大量未处理消息可能导致队列积压,需监控并采取扩容、优化处理逻辑等措施。 总结,电商领域的异步消息系统是提高系统效率和稳定性的重要手段,理解其原理和实践是构建高效电商平台的关键。在设计和...

    Java异步技术原理和实践

    ### Java异步技术原理与实践 #### 一、引言 在现代软件开发尤其是微服务架构下,系统之间复杂的依赖关系和技术挑战日益增加。本文旨在深入探讨Java中的异步技术,包括其背后的原理以及如何在实际场景中运用这些...

    Java异步技术原理和实践.rar_Java异步开发

    Java异步技术是现代软件开发中的重要组成部分,尤其在高并发、高性能的系统设计中扮演着关键角色。本文将深入探讨Java异步技术的原理,包括阻塞与非阻塞I/O、轮询机制以及I/O多路复用,并结合实践进行详细讲解。 一...

    _java中异步socket类的实现和源代码.doc

    在 Java 中实现异步 socket 通讯是一种高效的方式,尤其是在需要实时处理大量数据时。下面我们将详细介绍异步 socket 类的实现和源代码。 首先,我们需要了解 Java 中的 socket 类是如何工作的。通常情况下,Java ...

    Java Http异步加载图片

    在Java编程中,异步加载图片是一项常见的任务,特别是在开发Android或者Web应用时,为了提高用户体验,我们需要在后台线程中加载图片,而不是阻塞主线程。本篇将深入讲解如何利用Java原生类实现Http异步加载图片的...

    java异步server

    Java异步服务器是一种高效处理多个客户端连接的技术,它与传统的同步服务器模型相比,具有更高的并发性和资源利用率。在Java中实现异步服务器,通常会利用NIO(Non-blocking Input/Output)或者Java 7引入的AIO...

    java android业务异步编程小技巧

    1. **AsyncTask**:Android提供的一个轻量级异步处理类,适合用于短时间的后台任务。AsyncTask有三个泛型参数,分别代表Progress、Params和Result类型。它的执行流程包括onPreExecute()(预执行),doInBackground...

Global site tag (gtag.js) - Google Analytics