`

enchance的学习

阅读更多
<ehcache>
<!-- Sets the path to the directory where cache .data files are created.

         If the path is a Java System Property it is replaced by
         its value in the running VM.

         The following properties are translated:
         user.home - User's home directory
         user.dir - User's current working directory
         java.io.tmpdir - Default temp file path -->
<diskStore path="java.io.tmpdir"/>
<!--
        Specifies a CacheManagerEventListenerFactory  which will be used to create a CacheManagerPeerProvider, which
        is notified when Caches are added or removed from the CacheManager.

        The attributes of CacheManagerEventListenerFactory are:
        - class - a fully qualified factory class name
        - properties - comma separated properties having meaning only to the factory.

        Sets the fully qualified class name to be registered as the CacheManager event listener.

    The events include:
    - adding a Cache
    - removing a Cache

    Callbacks to listener methods are synchronous and unsynchronized. It is the responsibility of the implementer
    to safely handle the potential performance and thread safety issues depending on what their listener is doing.

    If no class is specified, no listener is created. There is no default.
    -->
<!--<cacheManagerEventListenerFactory class="" properties=""/>-->
<!--Default Cache configuration. These will applied to caches programmatically created through
    the CacheManager.

    The following attributes are required:

    maxElementsInMemory            - Sets the maximum number of objects that will be created in memory
    eternal                        - Sets whether elements are eternal. If eternal,  timeouts are ignored and the
                                     element is never expired.
    overflowToDisk                 - Sets whether elements can overflow to disk when the in-memory cache
                                     has reached the maxInMemory limit.

    The following attributes are optional:
    timeToIdleSeconds              - Sets the time to idle for an element before it expires.
                                     i.e. The maximum amount of time between accesses before an element expires
                                     Is only used if the element is not eternal.
                                     Optional attribute. A value of 0 means that an Element can idle for infinity.
                                     The default value is 0.
    timeToLiveSeconds              - Sets the time to live for an element before it expires.
                                     i.e. The maximum time between creation time and when an element expires.
                                     Is only used if the element is not eternal.
                                     Optional attribute. A value of 0 means that and Element can live for infinity.
                                     The default value is 0.
    diskPersistent                 - Whether the disk store persists between restarts of the Virtual Machine.
                                     The default value is false.
    diskExpiryThreadIntervalSeconds- The number of seconds between runs of the disk expiry thread. The default value
                                     is 120 seconds.
    memoryStoreEvictionPolicy      - Policy would be enforced upon reaching the maxElementsInMemory limit. Default
                                     policy is Least Recently Used (specified as LRU). Other policies available -
                                     First In First Out (specified as FIFO) and Less Frequently Used
                                     (specified as LFU)
    cacheEventListenerClassNames   - A comma separated list of CacheEventListeners. The fully qualified class name of
                                     the class which implements CacheEventListener is required.
                                     Registered listeners will be notified of cache events such as Element puts, removes and expiries
                                     and Cache status changes. The default if unspecified is an empty notifications list
                                     which does nothing.
    -->
<defaultCache maxElementsInMemory="10000" eternal="false"
timeToIdleSeconds="120" timeToLiveSeconds="120" overflowToDisk="true"
diskPersistent="false" diskExpiryThreadIntervalSeconds="120"
memoryStoreEvictionPolicy="LFU" />
<!--Predefined caches.  Add your cache configuration settings here.
            If you do not have a configuration for your cache a WARNING will be issued when the
            CacheManager starts

            The following attributes are required for defaultCache:

        name                           - Sets the name of the cache. This is used to identify the cache.
                                         It must be unique.
        maxElementsInMemory            - Sets the maximum number of objects that will be created in memory
        eternal                        - Sets whether elements are eternal. If eternal,  timeouts are ignored and the
                                         element is never expired.
        overflowToDisk                 - Sets whether elements can overflow to disk when the in-memory cache
                                         has reached the maxInMemory limit.

        The following attributes are optional:
        timeToIdleSeconds              - Sets the time to idle for an element before it expires.
                                         i.e. The maximum amount of time between accesses before an element expires
                                         Is only used if the element is not eternal.
                                         Optional attribute. A value of 0 means that an Element can idle for infinity.
                                         The default value is 0.
        timeToLiveSeconds              - Sets the time to live for an element before it expires.
                                         i.e. The maximum time between creation time and when an element expires.
                                         Is only used if the element is not eternal.
                                         Optional attribute. A value of 0 means that and Element can live for infinity.
                                         The default value is 0.
        diskPersistent                 - Whether the disk store persists between restarts of the Virtual Machine.
                                         The default value is false.
        diskExpiryThreadIntervalSeconds- The number of seconds between runs of the disk expiry thread. The default value
                                         is 120 seconds.
        memoryStoreEvictionPolicy      - Policy would be enforced upon reaching the maxElementsInMemory limit. Default
                                         policy is Least Recently Used (specified as LRU). Other policies available -
                                         First In First Out (specified as FIFO) and Less Frequently Used
                                         (specified as LFU)
        cacheEventListenerClassNames   - A comma separated list of CacheEventListeners. The fully qualified class name of
                                         the class which implements CacheEventListener is required.
                                         Registered listeners will be notified of cache events such as Element puts, removes and expiries
                                         and Cache status changes. The default if unspecified is an empty notifications list
                                         which does nothing.
        -->

<!-- traceMsgCache -->
<cache name="traceMsgCache" maxElementsInMemory="5000"
overflowToDisk="true" eternal="false" timeToLiveSeconds="0"
timeToIdleSeconds="0" diskExpiryThreadIntervalSeconds="120">
</cache>

</ehcache>




package com.zte.ums.cn.trace.wsf.listener.rpt;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import net.sf.ehcache.Cache;
import net.sf.ehcache.CacheManager;
import net.sf.ehcache.Element;
import net.sf.ehcache.config.Configuration;
import net.sf.ehcache.config.ConfigurationFactory;

import org.apache.commons.io.FileUtils;

public class RptTraceMsgCacheManager
{
  private static final String CACHE_CONFIG_FILE = "cn-trace-wsf-msg-ehcache.xml";

  /**
   * 缓存管理
   */
  private CacheManager cacheManager;

  /**
   * 网元的缓存
   */
  private Cache traceMsgCache;

  // 缓存文件存放的路径
  private String cachePath = null;

  private static DebugPrn dMsg = new DebugPrn( TraceMsgCacheManager.class
          .getName() );

  private static RptTraceMsgCacheManager instance = null;

  /**
   * 获取单例
   *
   * @return
   */
  public synchronized static RptTraceMsgCacheManager getInstance()
  {
    if ( instance == null )
    {
      instance = new RptTraceMsgCacheManager();
    }
    return instance;
  }

  /**
   * 构造函数
   *
   * @throws EmsTraceException
   *
   */
  public RptTraceMsgCacheManager()
  {

    try
    {
      // 获取配置文件

      File file = FileHelper.getFileInWsf( CACHE_CONFIG_FILE );

      String clinetFSessionId = CommonUtil.getClientFSessionID();

      Configuration configuration = ConfigurationFactory
              .parseConfiguration( file );
      // 设置保存在磁盘中的位置路径

      cachePath = FileHelper.getRuntimeDirectory() + "ems"
              + System.getProperty( "file.separator" ) + "cache"
              + System.getProperty( "file.separator" ) + "traceCache"
              + System.getProperty( "file.separator" ) + "traceMsgCache"
              + System.getProperty( "file.separator" ) + clinetFSessionId;

      configuration.getDiskStoreConfiguration().setPath( cachePath );
      cacheManager = new CacheManager( configuration );
      traceMsgCache = cacheManager.getCache( "traceMsgCache" );
      dMsg.info( "TraceMsgCacheManager is starting." );

    }
    catch( Exception e )
    {
      dMsg.error( e.getMessage(), e );

    }
  }

  /**
   * 新增跟踪消息
   *
   * @param id
   *          跟踪消息索引
   * @param traceMsgInfoBlock
   *          跟踪消息对象
   */
  public void addTraceMsg( String id, TraceMsgInfoBlock traceMsgInfoBlock )
  {
    traceMsgCache.put( new Element( id, traceMsgInfoBlock ) );
  }

  /**
   * 删除跟踪消息
   *
   * @param id
   */
  public void removeTraceMsg( String id )
  {
    traceMsgCache.remove( id );
  }

  /**
   * 批量删除跟踪消息
   *
   * @param ids
   *          跟踪消息id数组
   */
  public void removeTraceMsgs( String[] ids )
  {
    for( int i = 0; ids != null && i < ids.length; i++ )
    {
      traceMsgCache.remove( ids[i] );

    }
  }

  /**
   * 删除全部跟踪消息
   *
   * @param ids
   *          跟踪消息id数组
   */
  public void removeTraceMsgs()
  {
    if ( traceMsgCache != null )
    {
      traceMsgCache.removeAll();
    }

  }

  /**
   * 根据id获取跟踪消息对象
   *
   * @param id
   * @return
   */
  public TraceMsgInfoBlock getTraceMsgInfo( String id )
  {
    TraceMsgInfoBlock msgInfo = null;
    Element element = traceMsgCache.get( id );
    if ( element != null )
    {
      msgInfo = (TraceMsgInfoBlock) element.getValue();
    }
    return msgInfo;
  }

  /**
   * 根据id获取跟踪消息对象
   *
   * @param id
   * @return
   */
  public TraceMsgInfoBlock[] getTraceMsgInfos( String[] ids )
  {
    List TraceMsgInfoList = new ArrayList();
    for( int i = 0; ids != null && i < ids.length; i++ )
    {
      TraceMsgInfoBlock msgInfo = null;
      Element element = traceMsgCache.get( ids[i] );
      if ( element != null )
      {
        msgInfo = (TraceMsgInfoBlock) element.getValue();
        TraceMsgInfoList.add( msgInfo );
      }
    }

    return (TraceMsgInfoBlock[]) TraceMsgInfoList
            .toArray( new DataMessage[TraceMsgInfoList.size()] );
  }

  /**
   * 该缓存销毁时的操作
   */
  public void destroy()
  {
    // 1、清除缓存
    if ( traceMsgCache != null )
    {
      try
      {
        traceMsgCache.removeAll();
      }
      catch( Exception e )
      {
        dMsg.info( e.getMessage(), e );
      }
    }
    if ( cacheManager != null )
    {
      try
      {
        cacheManager.removalAll();
        cacheManager.shutdown();
      }
      catch( Exception e )
      {
        dMsg.info( e.getMessage(), e );
      }
    }
    traceMsgCache = null;
    cacheManager = null;

    // 2、删除缓存文件夹
    try
    {
      File dir = new File( cachePath );
      FileUtils.deleteDirectory( dir );
      dMsg.info( "delete file success,file=" + cachePath );
    }
    catch( Exception e )
    {
      dMsg.info( e.getMessage(), e );
    }

  }

}




import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;


public abstract class AbstractThreadProxy
{

  /**
   * 构造线程池管理类
   *
   * @param threadID
   *          具体某个线程的唯一标识,可通过此标识获得一些本线程独有的一些配置信息
   */
  protected AbstractThreadProxy( String threadID, String stSesssionID )
  {
    /** 为线程名称增加前缀 */
    String threadName = EmsTraceConst.THREAD_PREFIX + threadID;
    this.threadID = threadName;
    this.stSesssionID = stSesssionID;
    // 设置队列的容量
    queue = new BoundedBuffer( getMaxQueueSize() );
    debug.info( "queue size is " + queue.capacity() + ",threadName is "
            + threadName );
    int threadCount = getMaxThreadCount();
    for( int i = 0; i < threadCount; i++ )
    {
      execute( threadName + "_" + i );
    }
  }

  /**
   * 根据不同的业务处理线程,获取相应的允许的最大线程数 modify by
   * wchfu:本方法更改为公有类,允许子类覆盖此方法, 由子类来确定这个方法的最终实现,
   * 但没有特殊需求的话可以不进行覆盖。
   *
   * @return
   */
  protected int getMaxThreadCount()
  {
    return MAX_THREAD;
  }

  /**
   * 子类可以覆盖这个方法,设置消息队列存放的最大消息个数
   *
   * @return
   */
  protected int getMaxQueueSize()
  {
    return MAX_QUEUE_SIZE;
  }

  /**
   * 获取当前队列中存放的对象个数
   *
   * @return
   */
  public int getQueueSize()
  {
    return queue.size();
  }

  /**
   * 此方法暂时不使用,为了保证其他调用方法不发生错误,所以暂时保留
   *
   */
  public final void start()
  {
    // DO Nothing。
  }

  /**
   * 启动线程
   *
   * @param threadFlag
   */
  private final void execute( String threadName )
  {
    try
    {
      WorkerThread worker = new WorkerThread( threadName );

      worker.start();
    }
    catch( Exception ie )
    {
      debug.error( "#pm:start thread error :", ie );
    }
  }

  /**
   * 将数据对象加入到数据队列中,调用此方法时最好不要多线程调用, 如果队列满而无法直接存放
   * ,则需进行多次尝试,以保证最大限度的保证队列中能将数据存放进去, 如果指定时间超过后仍不能存放成功,则失败
   *
   * @param object
   * @return 存放对象是否成果,如果失败,可以等候片刻后再次尝试执行put操作
   * @throws InterruptedException
   */
  public synchronized final boolean offer( Object object )
          throws InterruptedException
  {
    if ( queue.size() >= getMaxQueueSize() )
    {
      debug.info( "The queue is full,please wait.size:" + queue.size()
              + ",thread:" + threadID );
    }

    return this.queue.offer( object, WAIT_TIME );

  }

  /**
   * 将数据对象加入到数据队列中,调用此方法时最好不要多线程调用, 此方法将最大程度的保证入队列成功
   * ,如果队列满则一直等待,直到队列允许插入数据为止, 注意:但是此方法因为要一直等待
   * ,所以使用时要特别注意,防止等待时间过长影响其他业务流程处理, 主要适用于对象必须存入队列成功的这种情况
   *
   * @param object
   * @return 存放对象是否成果,如果失败,可以等候片刻后再次尝试执行put操作
   * @throws InterruptedException
   */
  public synchronized final boolean put( Object object )
          throws InterruptedException
  {
    if ( queue.size() >= getMaxQueueSize() )
    {
      debug.info( "The queue is full,please wait.size:" + queue.size()
              + ",thread:" + threadID );
    }
    else
    {
      this.queue.put( object );
    }

    return true;
  }

  /**
   * 判断tab页是否存在
   *
   * @param stSesssionID
   * @return
   */
  private boolean isTabExist( String stSesssionID )
  {
    TraceTaskInfo[] traceTaskInfos = WsfApplicationContext
            .getTraceCacheSevice().getTraceTaskInfos( stSesssionID );
    if ( traceTaskInfos == null || traceTaskInfos.length == 0 )
    {
      return false;
    }
    else
    {
      return true;
    }
  }

  /**
   * 判断跟踪是否是暂停状态
   *
   * @param stSesssionID
   * @return true
   */
  private boolean isSuspend( String stSesssionID )
  {
    TraceTaskInfo[] traceTaskInfos = WsfApplicationContext
            .getTraceCacheSevice().getTraceTaskInfos( stSesssionID );

    for( int i = 0; i < traceTaskInfos.length; i++ )
    {
      // if ( !"2".equalsIgnoreCase(
      // traceTaskInfos[i].getReservC() ) )
      // {
      // return false;
      // }

      /** 判断是否为挂起 **/
      if ( traceTaskInfos[i].getStatus() != TraceTaskInfo.STATUS_SUSPEND )
      {
        return false;
      }

    }
    return true;

  }

  /**
   * 处理数据对象的方法,子类必须进行实现
   *
   * @param object
   */
  protected abstract void processMessage( Object object );

  /**
   * 工作者类 在其线程中不断地从消息队列中去消息,然后处理
   */
  private class WorkerThread extends Thread
  {
    /**
     * 构造函数
     */
    private WorkerThread( String threadName )
    {
      this.setName( threadName );
    }

    /**
     * 在其线程中不断地从消息队列中取消息,然后处理
     *
     * @concurrency
     */
    public void run()
    {
      Object msg = null;
      int count = 0;
      while( true )
      {
        try
        {
          // 如果tab页已经关闭,则
          if ( !isTabExist( stSesssionID ) )
          {
            // 休眠1秒,防止queue中某些消息没有存入完成
            Thread.sleep( 1000 );
            /** 清除对应的队列维护 */
            debug
                    .info( "Tracetask is close, so remove tabviewqueen,stSesssionID= "
                            + stSesssionID );
            RptTraceMsgTreadManager.getInstance().removeTraceMsgThread(
                    stSesssionID );

            debug
                    .info( "Tracetask is close, clear cache begining,stSesssionID="
                            + stSesssionID );
            while( queue.size() > 0 )
            {
              msg = queue.take();
              String index = (String) msg;
              /** 清空一下缓存 */
              RptTraceMsgCacheManager.getInstance().removeTraceMsg( index );
            }

            debug.info( "Tracetask is close, clear cache finnish." );
            return;
          }

          // 如果跟踪会话已经挂起,则删除缓存中剩下的数据
          if ( isSuspend( stSesssionID ) )
          {

            debug
                    .info( "Tracetask is suspend, clear cache begining,stSesssionID="
                            + stSesssionID );
            int tmpCounter = 0;
            while( queue.size() > 0 )
            {
              msg = queue.take();

              if ( msg == null || !(msg instanceof String) )
              {
                continue;
              }
              String index = (String) msg; // 清空一下缓存
              RptTraceMsgCacheManager.getInstance().removeTraceMsg( index );
              tmpCounter++;
            }
            debug
                    .info( "Tracetask is suspend, clear cache finnish. record num :"
                            + tmpCounter );

          }

          // 其他情况,跟踪会话应该是正常激活状态
          msg = queue.take();

          /**
           * 如果为stop,并且长度刚好是0的时候, 则需要继续走下去,否则这边可能一直不能释放
           **/

          if ( msg == null || !(msg instanceof String) )
          {
            continue;
          }

          // 调用子类的处理方法进行数据处理
          processMessage( msg );
          count++;
          if ( count >= 10 )
          {
            Thread.sleep( 10 );
            count = 0;
          }

        }
        catch( Throwable e )
        {
          debug.error( e.getMessage(), e );

          continue;
        }
      }
    }
  }

  /**
   * 消息队列最大消息个数,这里默认为500个,有些消息对象占用内存比较大, 就需要子类减少这个值,并覆盖这个值,
   * 有些消息对象比较小,也可以覆盖这个值将其增大
   */
  private static final int MAX_QUEUE_SIZE = 500;

  /**
   * 在向队列中存放数据时,如果队列满而无法直接存放,则需进行多次尝试,
   * 以保证最大限度的保证队列中能将数据存放进去, 如果指定时间超过后仍不能存放成功,则失败
   */
  private static final int WAIT_TIME = 1000;

  /**
   * 最大线程数,此值应该在配置文件中配置,并为不同线程配置不同的线程个数, 目前暂时统一在这里指定 modify
   * by wchfu,2006-12- 11,将线程数从10个改为3个,10个线程可能会影响系统性能
   */
  private static final int MAX_THREAD = 1;

  private BoundedBuffer queue = null;

  private String threadID = "";

  private String stSesssionID = null;

  // 调试打印
  private static DebugPrn debug = new DebugPrn( AbstractThreadProxy.class
          .getName() );
}




public class CodeDecompressService
{
  /**
   * 日志打印
   */
  private static DebugPrn dMsg = new DebugPrn( CodeDecompressService.class
          .getName() );

  /**
   * 构造函数私有化
   *
   */
  private CodeDecompressService()
  {

  }

  /**
   * gzip解压缩byte[]流
   *
   * @param sourceBytes
   * @return
   * @throws EmsTraceException
   */
  public static byte[] unGzipBytes( byte[] sourceBytes )
          throws EmsTraceException
  {
    byte[] destBytes = new byte[0];
    ByteArrayInputStream fin = null;
    GZIPInputStream gzin = null;
    ByteArrayOutputStream fout = null;
    try
    {
      // 建立gzip压缩文件输入流
      fin = new ByteArrayInputStream( sourceBytes );
      // 建立gzip解压工作流
      gzin = new GZIPInputStream( fin );
      // 建立解压文件输出流
      fout = new ByteArrayOutputStream();
      byte[] buf = new byte[1024];
      int num;
      while( (num = gzin.read( buf, 0, buf.length )) != -1 )
      {
        fout.write( buf, 0, num );
      }
      destBytes = fout.toByteArray();
    }
    catch( Exception e )
    {
      throw new EmsTraceException( "Fail to unGzip bytes. " + e.getMessage(), e );
    }
    finally
    {
      // 释放压缩文件输入流
      if ( fin != null )
      {
        try
        {
          fin.close();
        }
        catch( IOException e )
        {
          dMsg.error( e.getMessage() );
        }
        fin = null;
      }
      // 释放gzip解压工作流
      if ( gzin != null )
      {
        try
        {
          gzin.close();
        }
        catch( IOException e )
        {
          dMsg.error( e.getMessage() );
        }
        gzin = null;
      }
      // 释放解压文件输出流
      if ( fout != null )
      {
        try
        {
          fout.close();
        }
        catch( IOException e )
        {
          dMsg.error( e.getMessage() );
        }
        fout = null;
      }
    }

    return destBytes;
  }

  /**
   * zlib解压缩byte[]流
   *
   * @param sourceBytes
   * @return
   * @throws EmsTraceException
   */
  public static byte[] unZlibBytes( byte[] sourceBytes )
          throws EmsTraceException
  {
    byte[] destBytes = null;
    // 初始化一个zlib解压器
    Inflater decompresser = new Inflater();
    // 初始化一个长度为1000的字节数组
    byte[] result = new byte[1024];
    ByteArrayOutputStream fout = null;
    try
    {
      // 将压缩字节数组放入解压器中
      decompresser.setInput( sourceBytes );
      // 建立解压文件输出流
      fout = new ByteArrayOutputStream();
      // 如果没到达压缩数据流的结尾,则继续解压缩
      while( !decompresser.finished() )
      {
        // 解压缩,并将结果存入result字节数组中
        int resultLength = decompresser.inflate( result );
        fout.write( result, 0, resultLength );
      }
      destBytes = fout.toByteArray();
    }
    catch( Exception e )
    {
      throw new EmsTraceException( "Fail to unZlib bytes. " + e.getMessage(), e );
    }
    finally
    {
      // 关闭解压缩器并放弃所有未处理的输入
      if ( decompresser != null )
      {
        decompresser.end();
        decompresser = null;
      }
      // 将字节数组置空
      result = null;
      // 释放解压文件输出流
      if ( fout != null )
      {
        try
        {
          fout.close();
        }
        catch( IOException e )
        {
          dMsg.error( e.getMessage() );
        }
        fout = null;
      }
    }

    return destBytes;
  }

}




public class RptTraceMsgProcessThread extends AbstractThreadProxy
{
  // 调试打印
  private static DebugPrn dMsg = new DebugPrn( RptTraceMsgProcessThread.class
          .getName() );

  private static int TRACEMSG_MAX_QUEUE_SIZE = 100000;

  private final static int MSG_HEAD_LENGTH = 4;

  private static final String threadID = RptTraceMsgProcessThread.class
          .getName();

  public RptTraceMsgProcessThread( String stSessionID )
  {
    super( threadID, stSessionID );
  }

  protected void processMessage( Object object )
  {
    String index = (String) object;

    TraceMsgInfoBlock msgInfoBlock = RptTraceMsgCacheManager.getInstance()
            .getTraceMsgInfo( index );

    // 获取跟踪消息包
    TraceRptMsgEmbInfo[] msgEmbInfos = msgInfoBlock.getMsgInfos();

    // 解析跟踪消息,并发送至概要解码框架处理
    try
    {
      parseTraceMsgInfos( msgEmbInfos );
    }
    catch( EmsTraceException e )
    {
      dMsg.error( e.getMessage(), e );
    }
    catch( Exception e )
    {
      dMsg.error( e.getMessage(), e );
    }

  }

  /**
   * 解析跟踪消息
   *
   * @param msgEmbInfos
   * @throws EmsTraceException
   */
  private void parseTraceMsgInfos( TraceRptMsgEmbInfo[] msgEmbInfos )
          throws EmsTraceException
  {
    //
    String stID = null;
    byte rptDataType = 0;
    for( int i = 0; i < msgEmbInfos.length; i++ )
    {
      // 跟踪任务号
      stID = msgEmbInfos[i].getStID();
      // 压缩类型的标志
      String reservA = msgEmbInfos[i].getReservA();
      byte isCompress = (reservA == null
              ? TraceRptMsgEmbInfo.COMPRESS_TYPE_UNDO
              : Byte.valueOf( reservA ));
      // 从0开始的字节下标
      String reservB = msgEmbInfos[i].getReservB();
      int startIndex = (reservB == null ? 0 : Integer.valueOf( reservB ));
      // 消息码流
      byte[] binData = msgEmbInfos[i].getBinData();
      // 字节序
      byte byteOrder = msgEmbInfos[i].getByteOrder();

      // 李玉鹏和OMM的se确认了一下:
      // 1)码流都按照BinData格式上报,如果是按照charData上报,也需要转换成byte[]数组;
      // 2)目前V3/V4都不分包,因此分包逻辑可以暂不实现;
      // 因此下面只需要处理二进制原始码流的解压缩即可
      // 按照压缩类型,对byte[]流进行解压缩
      binData = deCompressBytes( binData, isCompress );
      // 根据字节下标获取字节数组码流的有效字节数组
      binData = getValidBytes( binData, startIndex );

      // 消息类型byte数组 or char字符串
      rptDataType = msgEmbInfos[i].getRptDataType();

      switch( rptDataType )
      {
        // 二进制,BINDATA保存
        case TraceRptMsgEmbInfo.DATA_TYPE_BINDATA:
          // 解析byte数组消息
          parseByteMsg( stID, binData, byteOrder );
          break;
        case TraceRptMsgEmbInfo.DATA_TYPE_CHARDATA:
          // 预留,暂不处理
          break;
        default:
          throw new EmsTraceException( "Not support rptDataType, type="
                  + rptDataType );
      }
    }
  }

  /**
   * 解析byte数组消息, 注意:每条消息包含消息头和消息体两个部分,
   * 其中消息头为4字节长度的小字节序,表示消息体长度
   *
   * @param binData
   */
  private void parseByteMsg( String stID, byte[] binData, byte byteOrder )
  {
    dMsg.info( "stID=" + stID + ", binData length is " + binData.length );
    // 字节流长度至少要为4字节以上才有意义,才需要处理
    if ( binData.length <= MSG_HEAD_LENGTH )
    {
      dMsg.info( "binData length is less than " + MSG_HEAD_LENGTH
              + ", so neednot deal." );
      return;
    }

    InputStream in = null;
    try
    {
      in = new BufferedInputStream( new ByteArrayInputStream( binData ) );
      // 消息头,长度为4
      byte[] msgHead = new byte[MSG_HEAD_LENGTH];
      // 消息体
      byte[] msgBody = null;
      // 消息体长度
      int msgBodyLength = 0;
      FileInfoBean fileInfoBean = null;
      boolean isSimpleDecodeSucc = true;
      while( in.read( msgHead ) != -1 )
      {
        // 计算消息体的长度
        msgBodyLength = TraceCommonService.lBytesToInt( msgHead );
        //
        msgBody = new byte[msgBodyLength];
        in.read( msgBody );

        //
        DataMessage message = new DataMessage();
        message.setStID( stID );
        message.setMessage( msgBody );
        message.setByteOrder( byteOrder );

        // 将跟踪会话ID和字节码流,一起发送消息至概要解码框架
        isSimpleDecodeSucc = notifyDataReport( message );

        // 概要解码成功了,再写文件,否则不写文件
        if ( !isSimpleDecodeSucc )
        {
          continue;
        }

        // 将跟踪消息写文件
        try
        {
          fileInfoBean = new FileInfoBean();
          fileInfoBean.setStID( stID );
          fileInfoBean.setByteOrder( byteOrder );
          fileInfoBean.setMsgHead( msgHead );
          fileInfoBean.setBinData( msgBody );
          AbstractTraceMsgsFileService chainService = (DefaultTraceMsgsFileServiceImpl) WsfApplicationContext
                  .getService( DefaultTraceMsgsFileServiceImpl.SERVICE_ID );
          chainService.writeTraceMsgsToFile( fileInfoBean );
        }
        catch( EmsTraceException e )
        {
          // 写文件失败也不管了
          dMsg.error( "Fail to Write msgInfoBlock to file, stID=" + stID + ". "
                  + e.getMessage(), e );
        }
        catch( Throwable e )
        {
          // 写文件失败也不管了
          dMsg.error( "Fail to Write msgInfoBlock to file, stID=" + stID + ". "
                  + e.getMessage(), e );
        }
      }
    }
    catch( Exception e )
    {
      dMsg.error( e.getMessage(), e );
    }
    finally
    {
      if ( in != null )
      {
        try
        {
          in.close();
        }
        catch( IOException e )
        {
          dMsg.error( e.getMessage(), e );
        }
        in = null;
      }
    }
  }

  /**
   * 按照压缩类型,对byte[]流进行解压缩
   *
   * @param binData
   *          源byte[]流(压缩前的byte[]流)
   * @param isCompress
   *          压缩类型,参见COMPRESS_TYPE_常量
   * @return
   * @throws EmsTraceException
   */
  private byte[] deCompressBytes( byte[] binData, byte isCompress )
          throws EmsTraceException
  {
    byte[] deCompBinData = binData;
    // gzip解压缩byte[]流
    if ( isCompress == TraceRptMsgEmbInfo.COMPRESS_TYPE_GZIP )
    {
      deCompBinData = CodeDecompressService.unGzipBytes( binData );
    }
    // zlib解压缩byte[]流
    else if ( isCompress == TraceRptMsgEmbInfo.COMPRESS_TYPE_ZLIB )
    {
      deCompBinData = CodeDecompressService.unZlibBytes( binData );
    }

    return deCompBinData;
  }

  /**
   * 根据字节下标获取字节数组码流的有效字节数组
   *
   * @param binData
   *          字节数组码流
   * @param startIndex
   *          有效字节下标
   * @return
   */
  private byte[] getValidBytes( byte[] binData, int startIndex )
  {
    // 保护
    if ( startIndex < 0 )
    {
      startIndex = 0;
    }
    else if ( startIndex > binData.length - 1 )
    {
      startIndex = binData.length - 1;
    }

    // 如果为0,则全部字节数组有效
    if ( startIndex == 0 )
    {
      return binData;
    }

    // 取有效字节数组
    byte[] validBytes = new byte[binData.length - startIndex];
    System.arraycopy( binData, startIndex, validBytes, 0, binData.length
            - startIndex );

    return validBytes;
  }

  /**
   * 将跟踪会话ID和字节码流,一起发送消息至概要解码框架
   *
   * @param stID
   * @param binData
   */

  private boolean notifyDataReport( DataMessage message )
  {
    try
    {

      String stID = message.getStID();

      dMsg.info( "receive msg in frame,stID= " + stID );

      ISimpleDecoderView simpleDecoderView = WsfApplicationContext
              .getSimpleDecoderViewImpl( stID );

      // 下面这个循环主要是因为
      // 客户端先接收到码流消息,后接收到创建的确认成功消息,中间差了大概100ms;这样由于还没有打开跟踪结果界面,
      // 会导致码流被丢弃,原先设计也是这么考虑的,但是现在由于只有一条跟踪消息,也丢弃的话,则会误导用户以为没有跟踪到消息;
      // 所以,暂时用下面这个方法规避,循环等待5次,每次100ms,如果还是没有打开跟踪结果界面,则丢弃休息。
      int count = TraceBaseInfoConf.getInstance().getView_judge_count();
      int sleepTime = TraceBaseInfoConf.getInstance().getView_judge_sleep();

      for( int i = 0; i < count; i++ )
      {

        simpleDecoderView = WsfApplicationContext
                .getSimpleDecoderViewImpl( stID );

        /** 如果已经关闭,taskinfo则为空,则不需要再等待 ***/
        if ( simpleDecoderView == null
                && WsfApplicationContext.getTraceCacheSevice()
                        .getTraceTaskInfo( stID ) != null )
        {
          try
          {
            dMsg.info( "simple view(" + stID
                    + ") is not existent in workspace, so sleep 100ms" );
            Thread.sleep( sleepTime );
          }
          catch( InterruptedException e )
          {
            dMsg.info( e.getMessage() );
          }
        }
        else
        {
          break;
        }
      }

      if ( simpleDecoderView != null )
      {
        dMsg.info( "add data to simpledecoderview,stID= " + stID );

        /** 如果任务已经挂起,则不再转发到概要解码框架 ***/
        if ( "2".equalsIgnoreCase( WsfApplicationContext.getTraceCacheSevice()
                .getTraceTaskInfo( stID ).getReservC() ) )
        {
          dMsg.info( "task is suspend,stID= " + stID );
          return false;
        }

        /** 增加到表格 */
        simpleDecoderView.addData( message );
        return true;
      }
      else
      {
        dMsg.info( "simple view can not find ,stID= " + stID );
        return false;
      }

    }
    catch( Exception e )
    {
      dMsg.error( e.getMessage(), e );
      return false;
    }
    finally
    {
      /**
       * 消息处理完之后,清空一下缓存,本来提供有邓洁调用的,
       * 但是等会反馈这边是同步过程,所以由我自己调用
       */
      RptTraceMsgCacheManager.getInstance().removeTraceMsg( message.getIndex() );
    }

  }

  /**
   * 将低字节数组转换为int
   *
   * @param b
   *          byte[]
   * @return int
   */
  public static int lBytesToInt( byte[] b )
  {
    int s = 0;
    for( int i = 0; i < 3; i++ )
    {
      if ( b[3 - i] >= 0 )
      {
        s = s + b[3 - i];
      }
      else
      {
        s = s + 256 + b[3 - i];
      }
      s = s * 256;
    }
    if ( b[0] >= 0 )
    {
      s = s + b[0];
    }
    else
    {
      s = s + 256 + b[0];
    }
    return s;
  }

  /**
   * 子类可以覆盖这个方法,设置消息队列存放的最大消息个数
   *
   * @return
   */
  protected int getMaxQueueSize()
  {
    return TRACEMSG_MAX_QUEUE_SIZE;
  }

}
分享到:
评论
发表评论

文章已被作者锁定,不允许评论。

相关推荐

Global site tag (gtag.js) - Google Analytics