
Using coprocessorProxy in an HBase Endpoint Coprocessor: Example and Troubleshooting



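I. Caveats first:

1. A coprocessor can be enabled in three ways: in the configuration file, from the HBase shell, or programmatically. I enable it programmatically: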

 

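    static {
        // Describe the table "epTest" with a single in-memory column family "_tis".
        EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
        HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
        family.setInMemory(true);
        family.setMaxVersions(1);
        EP_TABLE_DISCRIPTOR.addFamily(family);
        try {
            // Attach the endpoint class to this table by its fully qualified name.
            EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
        } catch (IOException ioe) {
            // Fail fast instead of silently ignoring the error.
            throw new IllegalStateException("Failed to attach coprocessor", ioe);
        }
    }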

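The addCoprocessor call in the code above is what enables the coprocessor for this table. The prerequisite is that HBase has been restarted, because the jar is only loaded at startup.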

 

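2. If the client connects and then fails with an error such as "No matching handler **** for protocol in *** region", the jar has not been loaded into HBase yet. Make sure HBase has been restarted, and check that the class name in addCoprocessor("ict.wde.test.RowCountServer"); (here "RowCountServer") is spelled correctly.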
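
One way to rule out a misspelled class name before restarting anything is to try to resolve the name with plain Java reflection. The following is only a minimal sketch: the class CoprocessorClassCheck and its method isLoadable are made up for illustration, and the check only says something about the classpath of the JVM it runs in, so it is meaningful only if it is run with the same jars that sit in HBase's lib directory.

```java
public class CoprocessorClassCheck {

    /** Returns true if the fully qualified class name resolves on the current classpath. */
    public static boolean isLoadable(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers.
            Class.forName(className, false, CoprocessorClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Not found, or found but one of its dependencies is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the class name used in this article; pass another name as the first argument.
        String name = args.length > 0 ? args[0] : "ict.wde.test.RowCountServer";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

If this prints false when run with the HBase lib directory on the classpath, the jar or the class name is the likely culprit rather than the restart.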

II. Steps

2.1 Write the server-side code:

1) The interface (fixed format)

 

package ict.wde.test;

import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/22.
 */
public interface RowCountProtocol extends Coprocessor, CoprocessorProtocol {

    public long getRowCount() throws IOException;

    public long getRowCount(Filter filter) throws IOException;

    public String getStr() throws IOException;

    //public long getKeyValue() throws IOException;
}
2) The class that does the actual work
package ict.wde.test;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/27.
 */
public class RowCountServer implements RowCountProtocol {

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {

    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {

    }

    @Override
    public ProtocolSignature getProtocolSignature(String protocol,
                                                  long clientVersion, int clientMethodsHash) throws IOException {
        return new ProtocolSignature(3, null);
    }

    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return 3;
    }

    @Override
    public long getRowCount() throws IOException {
        return this.getRowCount(new FirstKeyOnlyFilter());
    }

    @Override
    public long getRowCount(Filter filter) throws IOException {
        return this.getRowCount(filter, false);
    }

    @Override
    public String getStr() throws IOException {
        String name = "Hello Doctor Michael Zhang, again!";
        return name;
    }

//    @Override
//    public long getKeyValueCount() {
//        return 0;
//    }

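    public long getRowCount(Filter filter, boolean countKeyValue) throws IOException {
        // Build the scan that a real implementation would run against the region.
        Scan scan = new Scan();
        scan.setMaxVersions(1);
        if (filter != null) {
            scan.setFilter(filter);
        }

        // Placeholder: the scan is never executed, so this always returns 1.
        return 1;
    }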

}

Package the two classes above into a jar and put it in HBase's lib directory.
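
Note that getRowCount(Filter, boolean) in RowCountServer builds a Scan but then returns a fixed 1 without running it. The counting loop that would normally go there can be sketched in plain Java as below. This is an illustration under assumptions, not HBase code: RowScanner is a hypothetical stand-in for a region-level scanner whose next call fills a list with one row's cells and reports whether more rows follow, and cells are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RowCountSketch {

    /** Hypothetical stand-in for a region-level scanner; not an HBase type. */
    public interface RowScanner {
        /** Appends the next row's cells to out and returns true if more rows may follow. */
        boolean next(List<String> out);
    }

    /** Counts rows, or individual cells when countKeyValue is true. */
    public static long count(RowScanner scanner, boolean countKeyValue) {
        long result = 0;
        List<String> cells = new ArrayList<String>();
        boolean more;
        do {
            cells.clear();
            more = scanner.next(cells);
            // Skip empty batches so that an empty region counts as 0 rather than 1.
            if (!cells.isEmpty()) {
                result += countKeyValue ? cells.size() : 1;
            }
        } while (more);
        return result;
    }

    /** In-memory scanner over a fixed list of rows, for demonstration only. */
    public static RowScanner over(List<List<String>> rows) {
        final Iterator<List<String>> it = rows.iterator();
        return new RowScanner() {
            @Override
            public boolean next(List<String> out) {
                if (it.hasNext()) {
                    out.addAll(it.next());
                }
                return it.hasNext();
            }
        };
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("a:1", "a:2"),
                Arrays.asList("b:1"),
                Arrays.asList("c:1", "c:2", "c:3"));
        System.out.println(count(over(rows), false)); // number of rows
        System.out.println(count(over(rows), true));  // number of cells
    }
}
```

With the three sample rows, the first call counts 3 rows and the second counts 6 cells. In a real endpoint the scanner would come from the region the coprocessor is running in, which is outside the scope of this sketch.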

 

2.2 Client code

 

import ict.wde.test.RowCountProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.filter.Filter;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/30.
 */
public class EndpointTestClient {

    private final HTableInterface table;
    private final Configuration conf;
    private final RowCountProtocol server;

    private static final HTableDescriptor EP_TABLE_DISCRIPTOR;

    static {
        EP_TABLE_DISCRIPTOR = new HTableDescriptor("epTest");
        HColumnDescriptor family = new HColumnDescriptor("_tis".getBytes());
        family.setInMemory(true);
        family.setMaxVersions(1);
        EP_TABLE_DISCRIPTOR.addFamily(family);
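        try {
            // Attach the endpoint class to this table by its fully qualified name.
            EP_TABLE_DISCRIPTOR.addCoprocessor("ict.wde.test.RowCountServer");
        } catch (IOException ioe) {
            // Fail fast instead of silently ignoring the error.
            throw new IllegalStateException("Failed to attach coprocessor", ioe);
        }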
    }

    public EndpointTestClient(Configuration config) throws IOException {
        conf = config;
        table = initTidTable();
        // The proxy talks to the endpoint in the single region that contains row key "0".
        server = table.coprocessorProxy(RowCountProtocol.class, "0".getBytes());
    }

    private HTableInterface initTidTable() throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (!admin.tableExists("epTest")) {
            admin.createTable(EP_TABLE_DISCRIPTOR);
        }
        admin.close();
        return new HTable(conf, "epTest");
    }

    public String getStr() throws IOException {
        return server.getStr();
    }
}
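
The second argument to coprocessorProxy in the client above is a row key: the call goes to the endpoint instance in the one region whose key range contains that row. The sketch below illustrates that lookup with a sorted map. It is a simplified model, not HBase's implementation: the class RegionLookupSketch, the region names, and the split point "5" are all hypothetical, and keys are compared as Java strings, whereas HBase compares raw bytes.

```java
import java.util.Map;
import java.util.TreeMap;

public class RegionLookupSketch {

    /**
     * Returns the name of the region whose key range contains rowKey.
     * Assumes the map contains the empty start key, as the first region of a table does.
     */
    public static String regionFor(TreeMap<String, String> regionsByStartKey, String rowKey) {
        // floorEntry picks the greatest start key that is less than or equal to rowKey.
        Map.Entry<String, String> entry = regionsByStartKey.floorEntry(rowKey);
        return entry.getValue();
    }

    public static void main(String[] args) {
        TreeMap<String, String> regions = new TreeMap<String, String>();
        regions.put("", "region-1");  // the first region starts at the empty key
        regions.put("5", "region-2"); // hypothetical split point
        System.out.println(regionFor(regions, "0"));
        System.out.println(regionFor(regions, "7"));
    }
}
```

In this model row "0" lands in region-1 and row "7" in region-2. A freshly created table such as epTest has a single region, so any row key reaches the same endpoint instance.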
Launcher class:
import org.apache.hadoop.conf.Configuration;

import java.io.IOException;

/**
 * Created by Michael on 2015/6/22.
 */
public class EndpointExample {

    public static void main(String[] args) throws IOException {

        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "ccf04:2181");

        EndpointTestClient client = new EndpointTestClient(conf);
        String name = client.getStr();
        System.out.println(name);

    }
}
