`
seandeng888
  • 浏览: 158153 次
  • 性别: Icon_minigender_1
  • 来自: 厦门
社区版块
存档分类
最新评论

大数据框架hadoop之Observe设计模式应用

阅读更多

       Observer观察者设计模式是行为模式的一种,它的作用是当一个对象的状态发生变化时,能够自动通知其他关联对象,自动刷新对象状态

       Observer模式提供给关联对象一种同步通信的手段,使某个对象与依赖它的其他对象之间保持状态同步。如下用代码的形式来展现被观察者(新闻出版社)和观察者(与之关联的订户观察者对象)是如何保持信息同步的。

1       被观察者-新闻出版社

/**

 * NewsPublisher: 新闻出版社

 */

class NewsPublisher extends Observable {

    public void publishNews(String newsTitle, String newsBody) {

        News news = new News(newsTitle, newsBody);

        setChanged();   //通过setChanged()方法标明对象的状态已发生变化

        this.notifyObservers(news);   //通知各Observer,并发送一个名为news对象的消息

 

        // ... ...

    }

}

2       观察者-订户观察者

/**

 * 订户观察者。

 * Created by myuser on 2014/11/29.

 */

public class SubscriberObserver implements Observer {

    // 新闻出版社调用notifyObservers(news)方法,自动调用如下方法以保持信息同步。

    public void update(Observable observee, Object param) {

        if (param instanceof News) {

            mail2Subscriber((News)param);

        }

    }

 

    private void mail2Subscriber(News news) {

        System.out.println("Mail to subscriber. A news published with title:" + news.getTitle());

    }

}

3       构造者

    被观察者调用notifyObservers()方法后,为什么观察者就能接收到呢?那是因为有构造者这个角色,它将观察者添加到被观察者的依赖对象里面,代码如下:

public class Client {

    /**

     * Test Observer Pattern

     */

    public static void main(String[] args) {

        NewsPublisher publisher = new NewsPublisher();

        // 添加订户观察者依赖对象

        publisher.addObserver(new SubscriberObserver());

        //发布新闻,触发通知事件

        publisher.publishNews("Hello news", "news body");

    }

}

上面是一个简单的Observer观察者设计模式的实例。接下来看看大数据框架hadoop是如何应用该设计模式的。

应用场景如下:JobTracker收到作业后,并不会马上对其初始化,而是交给调度器,由它按照一定的策略对作业进行初始化。

4       被观察者-JobTracker

    JobTracker进行作业添加(执行addJob()方法)时,会同步该消息到对应的观察者(JobInProgressListener )那里,代码如下:

publicclass JobTracker {

  privatefinal List<JobInProgressListener> jobInProgressListeners =

    new CopyOnWriteArrayList<JobInProgressListener>();

  private synchronized JobStatus addJob(JobID jobId, JobInProgress

job) {

      ... ...

      for (JobInProgressListener listener : jobInProgressListeners) {

        listener.jobAdded(job);  //通知各Observer,并发送job消息

      }

      ... ...

  }

}

5       观察者-JobQueueJobInProgressListener

class JobQueueJobInProgressListener extends JobInProgressListener {

  private Map<JobSchedulingInfo, JobInProgress> jobQueue;

  @Override

  publicvoid jobAdded(JobInProgress job) {

    // 将作业添加到作业队列里。

    jobQueue.put(new JobSchedulingInfo(job.getStatus()), job);

  }

}

6       构造者

    接下来看一下JobTrackerJobQueueJobInProgressListener的依赖关系是如何建立起来的。

    先看一下JobTracker类的一些信息,代码如下所示:

publicclass JobTracker implements MRConstants, InterTrackerProtocol,

    JobSubmissionProtocol, TaskTrackerManager, RefreshUserMappingsProtocol,

    RefreshAuthorizationPolicyProtocol, AdminOperationsProtocol,

JobTrackerMXBean {

  // 该类实例化的时候,会从配置文件的属性mapred.jobtracker.taskScheduler获取调度器的类名,然后实例化一个调度器作为JobTracker的一个属性。代码如下所示:

  JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm) {

... ...

    Class<? extends TaskScheduler> schedulerClass

      = conf.getClass("mapred.jobtracker.taskScheduler",

          JobQueueTaskScheduler.class, TaskScheduler.class);

taskScheduler = (TaskScheduler) ReflectionUtils.newInstance(schedulerClass, conf);

... ...

  }

  // JobTracker类被运行的时候会去调用startTracker()静态方法和offerService(),代码如下所示:

  publicstaticvoid main(String argv[]

                          ) throws IOException, InterruptedException {

... ...

    JobTracker tracker = startTracker(new JobConf());

tracker.offerService();

... ...

  }

// startTracker()静态方法首先实例一个JobTracker类,然后将当前实例赋给调度器的taskTrackerManager属性。

publicstatic JobTracker startTracker(JobConf conf, String identifier)

  throws IOException, InterruptedException {

... ...

    result = new JobTracker(conf, identifier);

result.taskScheduler.setTaskTrackerManager(result);

... ...

returnresult;

  }

  // offerService()方法调用调度器的start()方法。

  publicvoid offerService() throws InterruptedException, IOException {

    ... ...

taskScheduler.start();

... ...

  }

}

// 如下是调度器的start()方法,该方法将JobQueueJobInProgressListener类添加到JobTracker的依赖属性里,也即构造了JobTrackerJobQueueJobInProgressListener的被观察者与观察者关系。

class JobQueueTaskScheduler extends TaskScheduler {

  public JobQueueTaskScheduler() {

    this.jobQueueJobInProgressListener=new JobQueueJobInProgressListener();

  }

  publicsynchronizedvoid start() throws IOException {

    ... ...

taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);

... ...

  }

 

}

1
0
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics