`
dcdc723
  • 浏览: 187345 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

cassandra CONNECT

    博客分类:
  • PHP
阅读更多

<?php

// Setting up nodes:
//
// CassandraConn::add_node('192.168.1.1', 9160);
// CassandraConn::add_node('192.168.1.2', 5000);
//

// Querying:
//
// $users = new CassandraCF('Keyspace1', 'Users');
// $users->insert('1', array('email' => 'hoan.tonthat@gmail.com', 'password' => 'test'));
// $users->get('1');
// $users->multiget(array(1, 2));
// $users->get_count('1');
// $users->get_range('1', '10');
// $users->remove('1');
// $users->remove('1', 'password');
//

/**
 * Static registry of Thrift connections to Cassandra nodes.
 *
 * Nodes are registered with add_node(); get_client() then hands back the
 * client of a node whose transport is (or can be) opened.
 */
class CassandraConn {
    const DEFAULT_THRIFT_PORT = 9160;

    // Each entry is array('transport' => ..., 'client' => ...), appended by add_node().
    static private $connections = array();
    // Message of the most recent TException caught by this class, if any.
    static private $last_error;

    /**
     * Registers a node. The transport is only constructed here; it is opened
     * lazily by get_client().
     *
     * @param string   $host             Node host name or address.
     * @param int      $port             Thrift port (defaults to DEFAULT_THRIFT_PORT).
     * @param bool     $framed_transport Use TFramedTransport instead of TBufferedTransport.
     * @param int|null $send_timeout     Passed to TSocket::setSendTimeout() when truthy.
     * @param int|null $recv_timeout     Passed to TSocket::setRecvTimeout() when truthy.
     * @param bool     $persist          Passed through to the TSocket constructor.
     * @return bool TRUE when the node was registered, FALSE when a TException
     *              was raised (see get_last_error()).
     */
    static public function add_node($host,
                                    $port=self::DEFAULT_THRIFT_PORT,
                                    $framed_transport=false,
                                    $send_timeout=null,
                                    $recv_timeout=null,
                                    $persist=false) {
        try {
            // Create Thrift transport and binary protocol cassandra client
            $socket = new TSocket($host, $port, $persist);
            if($send_timeout) $socket->setSendTimeout($send_timeout);
            if($recv_timeout) $socket->setRecvTimeout($recv_timeout);

            if($framed_transport) {
                $transport = new TFramedTransport($socket, true, true);
            } else {
                $transport = new TBufferedTransport($socket, 1024, 1024);
            }

            $client = new CassandraClient(new TBinaryProtocolAccelerated($transport));

            // Store it in the connections
            self::$connections[] = array(
                'transport' => $transport,
                'client'    => $client
            );

            // Done
            return TRUE;
        } catch (TException $tx) {
            self::$last_error = 'TException: '.$tx->getMessage() . "\n";
        }
        return FALSE;
    }

    /**
     * Returns the message recorded for the most recent TException caught by
     * add_node() or get_client(), or NULL when none has been recorded.
     *
     * @return string|null
     */
    static public function get_last_error() {
        return self::$last_error;
    }

    /**
     * Returns the client of the first registered node whose transport is
     * already open or can be opened.
     *
     * Nodes are tried in random order (the connection list is shuffled on
     * every call). A node that fails to open is skipped and will be retried
     * on a later call; a transport that was opened is left open.
     *
     * TODO: add round robin order
     * TODO: add write-preferred and read-preferred nodes
     *
     * @param bool $write_mode Currently unused; reserved for the TODO above.
     * @return CassandraClient
     * @throws Exception When no node is registered or none could be opened.
     *                   The last TException, if any, is attached as the
     *                   previous exception.
     */
    static public function get_client($write_mode = false) {
        $last_exception = null;

        shuffle(self::$connections);
        foreach(self::$connections as $connection) {
            try {
                $transport = $connection['transport'];

                if(!$transport->isOpen()) {
                    $transport->open();
                }

                return $connection['client'];
            } catch (TException $tx) {
                // Remember why this node failed, then fall through to the next one.
                self::$last_error = 'TException: '.$tx->getMessage() . "\n";
                $last_exception = $tx;
            }
        }

        throw new Exception("Could not connect to a cassandra server", 0, $last_exception);
    }
}

/**
 * Small static helpers used when building Cassandra requests.
 */
class CassandraUtil {
    /**
     * Generates a time-based UUID in string format by delegating to
     * UUID::generate() with UUID::UUID_TIME and UUID::FMT_STRING.
     *
     * @param string $node Forwarded unchanged to UUID::generate().
     * @param string $ns   Forwarded unchanged to UUID::generate().
     * @return mixed Whatever UUID::generate() returns for FMT_STRING.
     */
    static public function uuid1($node="", $ns="") {
        return UUID::generate(UUID::UUID_TIME,UUID::FMT_STRING, $node, $ns);
    }

    /**
     * Returns the current Unix time in microseconds.
     *
     * Based on the original routine by Zach Buller (zachbuller@gmail.com),
     * rewritten to use integer arithmetic instead of string concatenation
     * and a regex.
     *
     * @return int|float Microseconds since the epoch; an int wherever PHP's
     *                   integer type is wide enough to hold the value.
     */
    static public function get_time() {
        // microtime() yields "0.uuuuuu00 seconds"; keep both halves as
        // strings so no float rounding touches the microsecond digits.
        list($fraction, $seconds) = explode(' ', microtime());

        // Skip the leading "0." and take the six microsecond digits.
        $micros = (int) substr($fraction, 2, 6);

        return (int) $seconds * 1000000 + $micros;
    }
}

/**
 * Array-oriented wrapper around one Cassandra column family.
 *
 * Reads return plain PHP arrays keyed by column name; writes accept the same
 * shape and convert it into Cassandra's thrift objects. Column names are
 * converted to and from their wire form according to $column_type
 * (top-level columns / super column names) and $subcolumn_type (columns
 * inside a super column) unless $parse_columns is switched off.
 *
 * Most read helpers return NULL, not an empty array, when nothing was found.
 */
class CassandraCF {
    const DEFAULT_ROW_LIMIT = 100; // default max # of rows for get_range()
    const DEFAULT_COLUMN_TYPE = "UTF8Type";
    const DEFAULT_SUBCOLUMN_TYPE = null;

    public $keyspace;
    public $column_family;
    public $is_super;
    public $read_consistency_level;
    public $write_consistency_level;
    public $column_type; // CompareWith
    public $subcolumn_type; // CompareSubcolumnsWith
    public $parse_columns; // when false, column names are passed through untouched

    /*
    BytesType: Simple sort by byte value. No validation is performed.
    AsciiType: Like BytesType, but validates that the input can be parsed as US-ASCII.
    UTF8Type: A string encoded as UTF8
    LongType: A 64bit long
    LexicalUUIDType: A 128bit UUID, compared lexically (by byte value)
    TimeUUIDType: a 128bit version 1 UUID, compared by timestamp
    */

    /**
     * @param string      $keyspace                Keyspace name sent with every request.
     * @param string      $column_family           Column family name.
     * @param bool        $is_super                Whether the column family holds super columns.
     * @param string|null $column_type             Comparator name for top-level column names.
     * @param string|null $subcolumn_type          Comparator name for subcolumn names.
     * @param int         $read_consistency_level  Consistency level used by read calls.
     * @param int         $write_consistency_level Consistency level used by insert()/remove().
     */
    public function __construct($keyspace, $column_family,
                                $is_super=false,
                                $column_type=self::DEFAULT_COLUMN_TYPE,
                                $subcolumn_type=self::DEFAULT_SUBCOLUMN_TYPE,
                                $read_consistency_level=cassandra_ConsistencyLevel::ONE,
                                $write_consistency_level=cassandra_ConsistencyLevel::ZERO) {
        // Vars
        $this->keyspace = $keyspace;
        $this->column_family = $column_family;

        $this->is_super = $is_super;

        $this->column_type = $column_type;
        $this->subcolumn_type = $subcolumn_type;

        $this->read_consistency_level = $read_consistency_level;
        $this->write_consistency_level = $write_consistency_level;

        // Toggles parsing columns
        $this->parse_columns = true;
    }

    /**
     * Fetches a slice of one row.
     *
     * @param string      $key             Row key.
     * @param string|null $super_column    When given, the slice is taken inside this super column.
     * @param mixed       $slice_start     First column name of the slice ("" = unbounded).
     * @param mixed       $slice_finish    Last column name of the slice ("" = unbounded).
     * @param bool        $column_reversed Assigned to the slice range's "reversed" field.
     * @param int         $column_count    Assigned to the slice range's "count" field.
     * @return array|null name => value (or name => array of subcolumns); NULL when empty.
     */
    public function get($key, $super_column=NULL, $slice_start="", $slice_finish="", $column_reversed=False, $column_count=100) {
        // Names inside a super column are subcolumns; otherwise they are
        // top-level names. Bounds and results must use the matching type.
        $top_level = self::is_blank($super_column);

        $column_parent = new cassandra_ColumnParent();
        $column_parent->column_family = $this->column_family;
        $column_parent->super_column = $this->unparse_column_name($super_column, true);

        $slice_range = new cassandra_SliceRange();
        $slice_range->count = $column_count;
        $slice_range->reversed = $column_reversed;
        $slice_range->start  = $this->slice_bound($slice_start,  $top_level);
        $slice_range->finish = $this->slice_bound($slice_finish, $top_level);
        $predicate = new cassandra_SlicePredicate();
        $predicate->slice_range = $slice_range;

        $client = CassandraConn::get_client();
        $resp = $client->get_slice($this->keyspace, $key, $column_parent, $predicate, $this->read_consistency_level);

        return $this->supercolumns_or_columns_to_array($resp, $top_level);
    }

    /**
     * Fetches the same top-level slice from several rows.
     *
     * @param array $keys         Row keys.
     * @param mixed $slice_start  First column name of the slice ("" = unbounded).
     * @param mixed $slice_finish Last column name of the slice ("" = unbounded).
     * @return array|null row key => row array; NULL when the response is empty.
     */
    public function multiget($keys, $slice_start="", $slice_finish="") {
        $column_parent = new cassandra_ColumnParent();
        $column_parent->column_family = $this->column_family;
        $column_parent->super_column = NULL;

        // No super column is selected, so the bounds are top-level names.
        $slice_range = new cassandra_SliceRange();
        $slice_range->start  = $this->slice_bound($slice_start,  true);
        $slice_range->finish = $this->slice_bound($slice_finish, true);
        $predicate = new cassandra_SlicePredicate();
        $predicate->slice_range = $slice_range;

        $client = CassandraConn::get_client();
        $resp = $client->multiget_slice($this->keyspace, $keys, $column_parent, $predicate, $this->read_consistency_level);

        $ret = null;
        foreach($resp as $key => $val) {
            $ret[$key] = $this->supercolumns_or_columns_to_array($val);
        }
        return $ret;
    }

    /**
     * Asks the server for the column count of a row (or of one super column).
     *
     * @param string      $key          Row key.
     * @param string|null $super_column Optional super column to count inside.
     * @return mixed The value returned by the client's get_count() call.
     */
    public function get_count($key, $super_column=null) {
        $column_path = new cassandra_ColumnPath();
        $column_path->column_family = $this->column_family;
        // Convert the name the same way get() does for its super column.
        $column_path->super_column = $this->unparse_column_name($super_column, true);

        $client = CassandraConn::get_client();
        $resp = $client->get_count($this->keyspace, $key, $column_path, $this->read_consistency_level);

        return $resp;
    }

    /**
     * Fetches a range of rows with the same top-level slice applied to each.
     *
     * @param string $start_key    First row key of the range.
     * @param string $end_key      Last row key of the range.
     * @param int    $row_count    Maximum number of rows requested.
     * @param mixed  $slice_start  First column name of the slice ("" = unbounded).
     * @param mixed  $slice_finish Last column name of the slice ("" = unbounded).
     * @return array|null row key => row array; NULL when the response is empty.
     */
    public function get_range($start_key="", $end_key="", $row_count=self::DEFAULT_ROW_LIMIT, $slice_start="", $slice_finish="") {
        $column_parent = new cassandra_ColumnParent();
        $column_parent->column_family = $this->column_family;
        $column_parent->super_column = NULL;

        $slice_range = new cassandra_SliceRange();
        $slice_range->start  = $this->slice_bound($slice_start,  true);
        $slice_range->finish = $this->slice_bound($slice_finish, true);
        $predicate = new cassandra_SlicePredicate();
        $predicate->slice_range = $slice_range;

        $key_range = new cassandra_KeyRange();
        $key_range->start_key = $start_key;
        $key_range->end_key   = $end_key;
        $key_range->count     = $row_count;

        $client = CassandraConn::get_client();
        $resp = $client->get_range_slices($this->keyspace, $column_parent, $predicate, $key_range, $this->read_consistency_level);

        return $this->keyslices_to_array($resp);
    }

    /**
     * Writes columns to a row through a single batch_mutate call.
     *
     * @param string $key     Row key.
     * @param array  $columns name => value, or name => array(subname => value) for super columns.
     * @return mixed The value returned by the client's batch_mutate() call.
     */
    public function insert($key, $columns) {
        $timestamp = CassandraUtil::get_time();

        $cfmap = array();
        $cfmap[$key][$this->column_family] = $this->array_to_mutation($columns, $timestamp);

        $client = CassandraConn::get_client();
        $resp = $client->batch_mutate($this->keyspace, $cfmap, $this->write_consistency_level);

        return $resp;
    }

    /**
     * Removes a row, or one column / super column of it.
     *
     * @param string     $key         Row key.
     * @param mixed|null $column_name Column (or, when $is_super, super column)
     *                                to remove; when NULL the column path
     *                                carries only the column family.
     * @return mixed The value returned by the client's remove() call.
     */
    public function remove($key, $column_name=null) {
        $timestamp = CassandraUtil::get_time();

        // Either way the name is a top-level name, so it is converted with
        // the column comparator -- the same one insert() uses for it.
        $wire_name = $this->unparse_column_name($column_name, true);

        $column_path = new cassandra_ColumnPath();
        $column_path->column_family = $this->column_family;
        if($this->is_super) {
            $column_path->super_column = $wire_name;
        } else {
            $column_path->column = $wire_name;
        }

        $client = CassandraConn::get_client();
        $resp = $client->remove($this->keyspace, $key, $column_path, $timestamp, $this->write_consistency_level);

        return $resp;
    }

    // Wrappers

    /**
     * Like get(), but returns a list in which each entry is the value array
     * with its own name stored under $key_name. Must be used on super columns.
     *
     * @return array Empty when the row has no matching columns.
     */
    public function get_list($key, $key_name='key', $slice_start="", $slice_finish="") {
        // Must be on supercols!
        $resp = $this->get($key, NULL, $slice_start, $slice_finish);
        return $this->rows_to_list($resp, $key_name, false);
    }

    /**
     * Like get_range(), but returns a list of rows with the row key stored
     * under $key_name. Rows without columns are left out.
     *
     * @return array Empty when no row matched.
     */
    public function get_range_list($key_name='key', $start_key="", $end_key="",
                                   $row_count=self::DEFAULT_ROW_LIMIT, $slice_start="", $slice_finish="") {
        $resp = $this->get_range($start_key, $end_key, $row_count, $slice_start, $slice_finish);
        return $this->rows_to_list($resp, $key_name, true); // filter nulls
    }

    /**
     * Like multiget(), but returns a list of rows with the row key stored
     * under $key_name.
     *
     * @return array Empty when the response was empty.
     */
    public function multiget_list($keys, $key_name='key', $slice_start="", $slice_finish="") {
        $resp = $this->multiget($keys, $slice_start, $slice_finish);
        return $this->rows_to_list($resp, $key_name, false);
    }

    // Helpers for parsing Cassandra's thrift objects into PHP arrays

    /**
     * @param array $keyslices Objects exposing ->key and ->columns.
     * @return array|null row key => row array; NULL when $keyslices is empty.
     */
    public function keyslices_to_array($keyslices) {
        $ret = null;
        foreach($keyslices as $keyslice) {
            $key     = $keyslice->key;
            $columns = $keyslice->columns;

            $ret[$key] = $this->supercolumns_or_columns_to_array($columns);
        }
        return $ret;
    }

    /**
     * @param array $array_of_c_or_sc  Objects exposing ->column and ->super_column.
     * @param bool  $parse_as_columns  TRUE to parse names with $column_type,
     *                                 FALSE to parse them with $subcolumn_type.
     * @return array|null name => value, or name => array of subcolumns; NULL when empty.
     */
    public function supercolumns_or_columns_to_array($array_of_c_or_sc, $parse_as_columns=true) {
        $ret = null;
        foreach($array_of_c_or_sc as $c_or_sc) {
            if($c_or_sc->column) { // normal columns
                $name  = $this->parse_column_name($c_or_sc->column->name, $parse_as_columns);
                $value = $c_or_sc->column->value;

                $ret[$name] = $value;
            } else if($c_or_sc->super_column) { // super columns
                $name    = $this->parse_column_name($c_or_sc->super_column->name, $parse_as_columns);
                $columns = $c_or_sc->super_column->columns;

                $ret[$name] = $this->columns_to_array($columns);
            }
        }
        return $ret;
    }

    /**
     * @param array $array_of_c Objects exposing ->name and ->value (subcolumns).
     * @return array|null name => value; NULL when empty.
     */
    public function columns_to_array($array_of_c) {
        $ret = null;
        foreach($array_of_c as $c) {
            $name  = $this->parse_column_name($c->name, false);
            $value = $c->value;

            $ret[$name] = $value;
        }
        return $ret;
    }

    // Helpers for turning PHP arrays into Cassandra's thrift objects

    /**
     * Wraps every column / super column built from $array in a mutation.
     *
     * @param array    $array     Same shape insert() accepts.
     * @param int|null $timestamp Defaults to CassandraUtil::get_time().
     * @return array|null List of cassandra_Mutation; NULL when $array is empty.
     */
    public function array_to_mutation($array, $timestamp=null) {
        if(empty($timestamp)) $timestamp = CassandraUtil::get_time();

        $c_or_sc = $this->array_to_supercolumns_or_columns($array, $timestamp);
        $ret = null;
        if(empty($c_or_sc)) return $ret; // nothing to wrap

        foreach($c_or_sc as $row) {
            $mutation = new cassandra_Mutation();
            $mutation->column_or_supercolumn = $row;
            $ret[] = $mutation;
        }
        return $ret;
    }

    /**
     * Array values become super columns, everything else a plain column.
     *
     * @param array    $array     name => value | name => array(subname => value).
     * @param int|null $timestamp Defaults to CassandraUtil::get_time().
     * @return array|null List of cassandra_ColumnOrSuperColumn; NULL when $array is empty.
     */
    public function array_to_supercolumns_or_columns($array, $timestamp=null) {
        if(empty($timestamp)) $timestamp = CassandraUtil::get_time();

        $ret = null;
        foreach($array as $name => $value) {
            $c_or_sc = new cassandra_ColumnOrSuperColumn();
            if(is_array($value)) {
                $c_or_sc->super_column = new cassandra_SuperColumn();
                $c_or_sc->super_column->name = $this->unparse_column_name($name, true);
                $c_or_sc->super_column->columns = $this->array_to_columns($value, $timestamp);
                $c_or_sc->super_column->timestamp = $timestamp;
            } else {
                $c_or_sc->column = new cassandra_Column();
                $c_or_sc->column->name = $this->unparse_column_name($name, true);
                $c_or_sc->column->value = $this->to_column_value($value);
                $c_or_sc->column->timestamp = $timestamp;
            }
            $ret[] = $c_or_sc;
        }

        return $ret;
    }

    /**
     * Builds the subcolumn list of a super column.
     *
     * @param array    $array     subname => value.
     * @param int|null $timestamp Defaults to CassandraUtil::get_time().
     * @return array|null List of cassandra_Column; NULL when $array is empty.
     */
    public function array_to_columns($array, $timestamp=null) {
        if(empty($timestamp)) $timestamp = CassandraUtil::get_time();

        $ret = null;
        foreach($array as $name => $value) {
            $column = new cassandra_Column();
            $column->name = $this->unparse_column_name($name, false);
            $column->value = $this->to_column_value($value);
            $column->timestamp = $timestamp;

            $ret[] = $column;
        }
        return $ret;
    }

    /**
     * Maps NULL to the empty string; every other value is returned unchanged.
     */
    public function to_column_value($thing) {
        if($thing === null) return "";

        return $thing;
    }

    /**
     * Converts a column name from its wire form to its PHP form.
     *
     * @param mixed $column_name Name as received.
     * @param bool  $is_column   TRUE to use $column_type, FALSE to use $subcolumn_type.
     * @return mixed NULL for a missing name (NULL, FALSE or ""); otherwise the
     *               converted name. Names such as "0" are valid and kept.
     */
    public function parse_column_name($column_name, $is_column=true) {
        if(!$this->parse_columns) return $column_name;
        if(self::is_blank($column_name)) return NULL;

        $type = $is_column ? $this->column_type : $this->subcolumn_type;
        if($type == "LexicalUUIDType" || $type == "TimeUUIDType") {
            return UUID::convert($column_name, UUID::FMT_BINARY, UUID::FMT_STRING);
        } else if($type == "LongType") {
            return $this->unpack_longtype($column_name);
        } else {
            return $column_name;
        }
    }

    /**
     * Converts a column name from its PHP form to its wire form; the inverse
     * of parse_column_name().
     *
     * @param mixed $column_name Name as used in PHP arrays.
     * @param bool  $is_column   TRUE to use $column_type, FALSE to use $subcolumn_type.
     * @return mixed NULL for a missing name (NULL, FALSE or ""); otherwise the
     *               converted name. 0 and "0" are valid names and are converted.
     */
    public function unparse_column_name($column_name, $is_column=true) {
        if(!$this->parse_columns) return $column_name;
        if(self::is_blank($column_name)) return NULL;

        $type = $is_column ? $this->column_type : $this->subcolumn_type;
        if($type == "LexicalUUIDType" || $type == "TimeUUIDType") {
            return UUID::convert($column_name, UUID::FMT_STRING, UUID::FMT_BINARY);
        } else if($type == "LongType") {
            return $this->pack_longtype($column_name);
        } else {
            return $column_name;
        }
    }

    // See http://webcache.googleusercontent.com/search?q=cache:9jjbeSy434UJ:wiki.apache.org/cassandra/FAQ+cassandra+php+%22A+long+is+exactly+8+bytes%22&cd=1&hl=en&ct=clnk&gl=us
    /**
     * Packs an integer into 8 bytes, most significant byte first.
     * NOTE(review): the shifts by 32 and more assume PHP integers are 64 bits wide.
     */
    public function pack_longtype($x) {
        return pack('C8',
            ($x >> 56) & 0xff, ($x >> 48) & 0xff, ($x >> 40) & 0xff, ($x >> 32) & 0xff,
            ($x >> 24) & 0xff, ($x >> 16) & 0xff, ($x >> 8) & 0xff, $x & 0xff
        );
    }

    /**
     * Inverse of pack_longtype(): rebuilds the integer from 8 bytes, most
     * significant byte first.
     */
    public function unpack_longtype($x) {
        $a = unpack('C8', $x);
        return ($a[1] << 56) + ($a[2] << 48) + ($a[3] << 40) + ($a[4] << 32) + ($a[5] << 24) + ($a[6] << 16) + ($a[7] << 8) + $a[8];
    }

    // Private helpers

    // True for values that mean "no name given". 0 and "0" are real names.
    private static function is_blank($value) {
        return $value === null || $value === false || $value === '';
    }

    // Wire form of a slice bound; a missing bound is the empty string.
    private function slice_bound($bound, $is_column) {
        if(self::is_blank($bound)) return "";

        return $this->unparse_column_name($bound, $is_column);
    }

    // Flattens key => array into a list, storing each key under $key_name.
    private function rows_to_list($rows, $key_name, $skip_empty) {
        $ret = array();
        if(empty($rows)) return $ret; // the getters return NULL when nothing matched

        foreach($rows as $_key => $_value) {
            if($skip_empty && empty($_value)) continue;

            $_value[$key_name] = $_key;
            $ret[] = $_value;
        }
        return $ret;
    }
}
 
分享到:
评论

相关推荐

    cassandra jdbc jar包

    根据cassandra 的一个client jdbc源码编译的官方jar包,没有任何修改,官方源码导出,可以使用sql形式进行操作cassandra,使用时请结合Cassandra其他必须jar包测试使用

    connect-cassandra-cql:使用Cassandra CQL3二进制协议进行连接的会话存储

    npm install connect-cassandra-cql 用法 Express 4 var express = require ( 'express' ) , cookieParser = require ( 'cookie-parser' ) , session = require ( 'express-session' ) , CassandraCqlStore = ...

    cassandra cli 命令 大全

    Cassandra CLI是Apache Cassandra数据库系统的一个命令行工具,它提供了与Cassandra集群交互的能力,包括连接到远程节点、创建或更新模式(schema)、设置和检索记录及列,以及查询节点和集群元数据。这个工具主要...

    java NoSql Cassandra hector

    Session session = cluster.connect(); ``` 3. 定义Keyspace和ColumnFamily:Cassandra中的数据存储在Keyspace(类似数据库)和ColumnFamily(类似表)中。Hector提供了相应的API来定义它们。 ```java KeyspaceDef...

    JAVA操作cassandra数据库

    Session session = cluster.connect("keyspace_name"); // 替换为你的键空间名 ``` 这里的`keyspace_name`是Cassandra中的逻辑分区,类似于传统数据库的数据库。 Cassandra的数据模型基于列族(Column Family),在...

    windows下安装cassandra与C#访问配置

    connect localhost/9160 ``` 连接成功后可以进行基本的数据操作,例如: ```bash cassandra> set Keyspace1.Standard1['jsmith']['first']='John' Value inserted. cassandra> set Keyspace1.Standard1['...

    Java连接cassandra实现简单的增删查demo

    在Java编程环境中,连接Cassandra数据库并实现基本的增、删、查操作是常见的任务。Cassandra是一款分布式NoSQL数据库,常用于处理大规模数据。在这个示例中,我们将探讨如何通过Java来操作Cassandra数据库。 首先,...

    java导出cassandra数据

    Session session = cluster.connect("your_keyspace"); ``` 3. **查询数据** 使用`session.execute()`方法执行CQL(Cassandra查询语言)语句,获取表中的数据。例如,导出整个表的数据: ```java String query...

    cassandra-driver-3.11.0.tar.gz

    session = cluster.connect() # 执行CQL查询 rows = session.execute('SELECT * FROM my_keyspace.my_table') # 处理查询结果 for row in rows: print(row) # 关闭连接 session.shutdown() cluster.shutdown() `...

    cassandra数据库 java链接 jar包

    Session session = cluster.connect(); ``` 4. **执行CQL查询**: `Session`对象提供了执行CQL查询的方法。例如,创建一个新的表: ```java session.execute("CREATE KEYSPACE my_keyspace WITH replication = ...

    Cassandra Java Client.zip

    Session session = cluster.connect(); ``` 四、CQL操作 Cassandra使用CQL(Cassandra Query Language)作为其SQL-like语言。通过Session对象,我们可以执行CQL语句,包括数据的插入、查询、更新和删除。 ```java ...

    Python库 | cassandra-driver-3.21.0.tar.gz

    session = cluster.connect() # 执行查询 query = SimpleStatement("SELECT * FROM keyspace.table") rows = session.execute(query) # 获取表元数据 table_metadata = cluster.metadata.keyspaces['keyspace']....

    Python库 | cassandra_driver-3.23.0-cp36-cp36m-win_amd64.whl

    session = cluster.connect() # 执行CQL查询 rows = session.execute('SELECT * FROM my_keyspace.my_table') # 处理查询结果 for row in rows: print(row) # 关闭会话和集群 session.shutdown() cluster....

    cassandra-java-example:一堆使用 Apache Cassandra 和 Datastax Java 驱动程序的 Apache Cassandra 示例

    Session session = cluster.connect(); ``` 4. **创建表** 在 Cassandra 中,数据存储在表(Keyspace 中的 Table)中。可以使用 `session.execute()` 方法执行 CQL 命令来创建表: ```java session.execute( ...

    Python-一个现代功能丰富和高度可调优的Cassandra和DataStax的Python客户端库

    session = cluster.connect('my_keyspace') # 执行CQL查询 rows = session.execute('SELECT * FROM my_table') for row in rows: print(row.column1, row.column2) # 关闭连接 cluster.shutdown() ``` **高级...

    elixir-cassandra:Apache Cassandra的Elixir客户端

    session = Cassandra.Cluster.connect(cluster, "your_keyspace") ``` 一旦有了会话,你就可以执行CQL查询了。 **三、执行CQL查询** Elixir-Cassandra支持直接执行CQL语句,包括插入、更新、删除和选择数据。以下...

    cassandra-java-client:在Java Cassandra集群上执行CRUD活动的简单Java Eclipse Maven项目

    Session session = cluster.connect("your_keyspace"); // 替换为你的Keyspace名称 ``` **执行CRUD操作** 1. **创建(Create)**: 使用`session.execute()`方法执行CQL创建语句,例如创建表: ```java session....

    crystal-cassandra:Crystal的Cassandra驱动程序

    session = client.connect # 执行一个简单的CQL查询 result = session.execute("SELECT * FROM users") result.each do |row| puts "User ID: #{row["id"]}, Name: #{row["name"]}" end session.close ``` **...

Global site tag (gtag.js) - Google Analytics