Go 语言操作与扫描 Hbase 实例



记录纯go语言的gohbase客户端的扫描操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package main

import (
"github.com/tsuna/gohbase"
"github.com/tsuna/gohbase/hrpc"
"context"
"io"
"fmt"
"github.com/tsuna/gohbase/filter"
"strconv"
"time"
)

const table = "user"

func beforeMiniTimeStamps(beforeMini int) string {
//当前时刻某分钟之前的时间戳
return strconv.Itoa(int(time.Now().Add(- time.Duration(beforeMini) * time.Minute).UnixNano() / 1000000))
}
func fetch() []*hrpc.Result {
client := gohbase.NewClient("hmaster.shise.com,rm.shise.com,nn.shise.com")
//client := gohbase.NewClient("wwj.shise.com,czn.shise.com,czn.shise.com")
// 列族
family := hrpc.Families(map[string][]string{"c": nil})

// 全局hbase filter时间间隔
//timeRange := hrpc.TimeRange(time.Now().Add(- time.Duration(minute)*time.Minute), time.Now())

// 某列value的filter
notRecommendFilter := filter.NewSingleColumnValueFilter([]byte("c"),
[]byte("notRecommend"),
filter.NotEqual,
filter.NewBinaryComparator(filter.NewByteArrayComparable([]byte("true"))),
true,
true)
violationFilter := filter.NewSingleColumnValueFilter([]byte("c"),
[]byte("violation"),
filter.NotEqual,
filter.NewBinaryComparator(filter.NewByteArrayComparable([]byte("true"))),
true,
true)
// filter某列value的时间戳
timeStartFilter := filter.NewSingleColumnValueFilter([]byte("c"),
[]byte("createDate"),
filter.Greater,
filter.NewBinaryComparator(filter.NewByteArrayComparable([]byte(beforeMiniTimeStamps(4*60)))),
true,
true)
timeEndFilter := filter.NewSingleColumnValueFilter([]byte("c"),
[]byte("createDate"),
filter.Less,
filter.NewBinaryComparator(filter.NewByteArrayComparable([]byte(beforeMiniTimeStamps(2*60)))),
true,
true)

//filter 列表
filters := filter.NewList(filter.MustPassAll, notRecommendFilter, violationFilter, timeStartFilter, timeEndFilter)
//创建scan对象
scan, _ := hrpc.NewScanStr(context.Background(), table, family, hrpc.Filters(filters))

var rsp []*hrpc.Result
scanner := client.Scan(scan)
for {
res, err := scanner.Next()
if err == io.EOF {
break
}
if err != nil {
print(err)
}
if hasHeadImage(res) {
rsp = append(rsp, res)
}
}
return rsp
}
func hasHeadImage(res *hrpc.Result) bool {
return true
}
func main() {
rsp := fetch()
for _, item := range rsp {
fmt.Println(*item)
break
}
fmt.Println(len(rsp))
}