{ ConcurrentHashMap,HBaseConnectionEntity idelConnections; ConcurrentHashMap,HBaseConnectionEntity activeConnections; initSize; maxSize; AtomicInteger idelSize AtomicInteger(); AtomicInteger activeSize AtomicInteger(); HBaseConnectionPool instance; Lock lock ReentrantLock(); object (); isShutdown; HBaseConnectionPool( initSize, maxSize){ .initSizeinitSize; .maxSizemaxSize; idelConnections ConcurrentHashMap,HBaseConnectionEntity(); activeConnections ConcurrentHashMap,HBaseConnectionEntity(); initConnections(); HBaseDetectFailConnection().start(); } HBaseConnectionEntity getConnection(){ (isShutdown){ RuntimeException(); } lock.lock(); { (idelSize.get()){ (idelConnections.size()){ RuntimeException(); } Entry,HBaseConnectionEntity entryidelConnections.entrySet().iterator().next(); keyentry.getKey(); HBaseConnectionEntity entityentry.getValue(); entity.setStatus(HBase_Connection_Status.active); idelConnections.remove(key); idelSize.decrementAndGet(); (entity.getConnection().isClosed()){ getConnection(); } activeConnections.put(key, entity); activeSize.incrementAndGet(); entity; } }{ lock.unlock(); } (activeSize.get()maxSize){ RuntimeException(); } (activeSize.get()maxSize){ (object) { { object.wait(); } (InterruptedException e) { e.printStackTrace(); } } getConnection(); } (isShutdown){ RuntimeException(); } Connection connHBaseUtils.getConnection(); idUUID.randomUUID().toString(); HBaseConnectionEntity entity HBaseConnectionEntity(); entity.setId(id); entity.setConnection(conn); entity.setStatus(HBase_Connection_Status.active); activeConnections.put(id, entity); activeSize.incrementAndGet(); entity; } initConnections(){ ( i;i.initSize;i){ HBaseConnectionEntity entity HBaseConnectionEntity(); idUUID.randomUUID().toString(); entity.setId(id); Connection connHBaseUtils.getConnection(); (conn){ ; } entity.setConnection(conn); entity.setStatus(HBase_Connection_Status.idel); idelConnections.put(id, entity); idelSize.getAndAdd(); } } HBaseConnectionPool getInstance(){ (isShutdown){ RuntimeException(); } (instance){ instance; } getInstance(,); } HBaseConnectionPool getInstance( initSize, maxSize){ (isShutdown){ RuntimeException(); } (initSizemaxSize){ RuntimeException(); } (initSizemaxSize){ initSizemaxSize; } (ConnectionPool.) { (instance){ instance HBaseConnectionPool(initSize,maxSize); } } instance; } releaseConnection( id){ (isShutdown){ RuntimeException(); } (idelSize.get()maxSize){ HBaseUtils.closeConnection(activeConnections.remove(id).getConnection()); }{ HBaseConnectionEntity entityactiveConnections.remove(id); entity.setStatus(HBase_Connection_Status.idel); idelConnections.put(id, entity); idelSize.incrementAndGet(); activeSize.decrementAndGet(); (object) { object.notify(); } } } shutdown(){ isShutdown; (object) { object.notifyAll(); } Iterator idelItidelConnections.keySet().iterator(); (idelIt.hasNext()){ keyidelIt.next(); HBaseConnectionEntity entityidelConnections.get(key); HBaseUtils.closeConnection(entity.getConnection()); } IteratoractiveItactiveConnections.keySet().iterator(); (activeIt.hasNext()){ keyactiveIt.next(); HBaseConnectionEntity entityactiveConnections.get(key); HBaseUtils.closeConnection(entity.getConnection()); } initSize; maxSize; idelSize AtomicInteger(); activeSize AtomicInteger(); } getIdelSize(){ .idelSize.get(); } getActiveSize(){ .activeSize.get(); } { id; Connection connection; HBase_Connection_Status status; Connection getConnection() { connection; } setConnection(Connection connection) { .connection connection; } HBase_Connection_Status getStatus() { status; } setStatus(HBase_Connection_Status status) { .status status; } getId() { id; } setId( id) { .id id; } } { idel,active,close } Thread{ run() { Iterator itIdelidelConnections.keySet().iterator(); (itIdel.hasNext()){ key itIdel.next(); HBaseConnectionEntity entityidelConnections.get(key); (entity.getConnection().isClosed()){ idelConnections.remove(key); idelSize.decrementAndGet(); } } Iterator itActiveactiveConnections.keySet().iterator(); (itActive.hasNext()){ keyitActive.next(); HBaseConnectionEntity entityactiveConnections.get(key); (entity.getConnection().isClosed()){ activeConnections.remove(key); activeSize.decrementAndGet(); } } { Thread.sleep(); } (InterruptedException e) { e.printStackTrace(); } } } }
{ Configuration conf HBaseConfiguration.create(); ExecutorService poolxExecutors.newFixedThreadPool(); Connection getConnection(){ i; Connection conn; { { connConnectionFactory.createConnection(conf, poolx); (conn){ ; } Thread.sleep(); i; } (InterruptedException e){ e.printStackTrace(); } (IOException e) { e.printStackTrace(); } }(conni); conn; } closeConnection(Connection connection){ { connection.close(); poolx.shutdownNow(); } (IOException e) { e.printStackTrace(); } } createTable(Connection connection, tableName, [] columns) IOException { Admin admin ; { admin connection.getAdmin(); TableName nameTableName.valueOf(tableName); (admin.tableExists(name)) { admin.disableTable(name); admin.deleteTable(name); } { HTableDescriptor desc HTableDescriptor(); desc.setName(TableName.valueOf(tableName)); ( column : columns) { desc.addFamily( HColumnDescriptor(column)); } admin.createTable(desc); } }{ admin.close(); } } createTable(Connection connection,Class entityClass){ hbase_nameEntityUtils.getTargetTableName(entityClass); Field[]fieldsentityClass.getDeclaredFields(); Set fimalySet HashSet(); (Field field:fields){ Column columnfield.getAnnotation(Column.); fimalySet.add(column.family()); } [] familysfimalySet.toArray( []{}); { createTable(connection,hbase_name,familys); } (IOException e) { e.printStackTrace(); } } insertRecord(Connection connection, tableName, row, columnFamily, [] columns, [] values) { Table table; { TableName nameTableName.valueOf(tableName); table connection.getTable(name); Put put Put(Bytes.toBytes(row)); ( i ; i columns.length; i) { put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(.valueOf(columns[i])), Bytes.toBytes(values[i])); table.put(put); } } (IOException e) { e.printStackTrace(); } { { (table){ table.close(); } } (IOException e) { e.printStackTrace(); } } } Put convertEntityToPut(BaseEntity obj) IllegalArgumentException, IllegalAccessException{ Field[]fieldsobj.getClass().getDeclaredFields(); Field rowKeyField; (Field field:fields){ (field.isAnnotationPresent(RowKey.)){ rowKeyFieldfield; rowKeyField.setAccessible(); } } (rowKeyField){ RuntimeException(); } Put p Put(Bytes.toBytes(.valueOf(rowKeyField.get(obj)))); (Field field:fields){ (fieldrowKeyField){ Column columnfield.getAnnotation(Column.); familycolumn.family(); colcolumn.target_column(); (col.equals()){ colfield.getName(); } field.setAccessible(); valuefield.get(obj); typefield.getType().toString(); typetype.substring(type.lastIndexOf()); (type.equals()){ rdbTypecolumn.rdb_type(); Date tmpValue(Date)value; (tmpValue){ p.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), ); ; } transValue; (rdbType.equals()){ SimpleDateFormat sdf SimpleDateFormat(); transValuesdf.format(tmpValue); } (rdbType.equals()){ SimpleDateFormat sdf SimpleDateFormat(); transValuesdf.format(tmpValue); } (rdbType.equals()){ SimpleDateFormat sdf SimpleDateFormat(); transValuesdf.format(tmpValue); }{ RuntimeException(); } p.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), Bytes.toBytes(transValue)); } (type.equals()){ p.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), ([])value); }{ p.addColumn(Bytes.toBytes(family), Bytes.toBytes(col), EntityUtils.convertJavaTypeToByteArray(value)); } } } p; } ListBaseEntity convertResultToEntity(ListResult resultList,Class cls) Exception, IllegalAccessException{ ListBaseEntity entityList ArrayListBaseEntity(); (Result result : resultList){ BaseEntity entity(BaseEntity)cls.newInstance(); EntityUtils.setRowKeyToEntity(result.getRow(),entity); (Cell cell : result.rawCells()) { EntityUtils.setHBaseCellToEntity(cell,entity); } entityList.add(entity); } entityList; } batchInsertRecord(Connection connection, tableName,ListBaseEntity entityList) IOException, IllegalArgumentException, IllegalAccessException{ BufferedMutator table; { table connection.getBufferedMutator(TableName.valueOf(tableName)); ListMutation mutations ArrayListMutation(); (BaseEntity entity:entityList){ mutations.add(convertEntityToPut(entity)); } table.mutate(mutations); table.flush(); } { { (table){ table.close(); } } (IOException e) { e.printStackTrace(); } } } deleteRow(Connection connection, tablename, rowkey) IOException { Table table; { TableName nameTableName.valueOf(tablename); table connection.getTable(name); ListDelete list ArrayListDelete(); Delete d1 Delete(rowkey.getBytes()); list.add(d1); table.delete(list); } { { (table){ table.close(); } } (IOException e) { e.printStackTrace(); } } } BaseEntity selectRow(Connection connection, tablename, rowKey,Class cls) IOException { TableName nameTableName.valueOf(tablename); Table table connection.getTable(name); Get g Get(rowKey.getBytes()); Result rs table.get(g); BaseEntity entity; { EntityUtils.setRowKeyToEntity(rs.getRow(),entity); entity (BaseEntity)cls.newInstance(); (Cell cell : rs.rawCells()) { EntityUtils.setHBaseCellToEntity(cell,entity); } entity; } (InstantiationException e) { e.printStackTrace(); } (IllegalAccessException e) { e.printStackTrace(); } { { (table){ table.close(); } } (IOException e) { e.printStackTrace(); } } entity; } ListBaseEntity scanAllRecord(Connection connection, tablename,Class cls) { Table table; { ListBaseEntity entityList ArrayListBaseEntity(); TableName nameTableName.valueOf(tablename); table connection.getTable(name); Scan s Scan(); ResultScanner rs table.getScanner(s); (Result result : rs){ BaseEntity entity(BaseEntity)cls.newInstance(); EntityUtils.setRowKeyToEntity(result.getRow(),entity); (Cell cell : result.rawCells()) { EntityUtils.setHBaseCellToEntity(cell,entity); entityList.add(entity); } } entityList; } (IOException e) { e.printStackTrace(); } (InstantiationException e) { e.printStackTrace(); } (IllegalAccessException e) { e.printStackTrace(); } { { (table){ table.close(); } } (IOException e) { e.printStackTrace(); } } ; } deleteTable(Connection connection,Class cls) { Admin admin; { tablenameEntityUtils.getTargetTableName(cls); TableName nameTableName.valueOf(tablename); admin connection.getAdmin(); (admin.tableExists(name)){ admin.disableTable(name); admin.deleteTable(name); } } (MasterNotRunningException e) { e.printStackTrace(); } (ZooKeeperConnectionException e) { e.printStackTrace(); } (IOException e) { e.printStackTrace(); } { { (admin){ admin.close(); } } (IOException e) { e.printStackTrace(); } } } HBasePageModel scanResultByPageFilter(Connection connection, tableName, [] startRowKey, [] endRowKey, FilterList filterList, maxVersions, HBasePageModel pageModel) { (pageModel ) { pageModel HBasePageModel(); } (maxVersions ) { maxVersions .MIN_VALUE; } pageModel.initStartTime(); pageModel.initEndTime(); (StringUtils.isBlank(tableName)) { pageModel; } Table table ; { table connection.getTable(TableName.valueOf(tableName)); tempPageSize pageModel.getPageSize(); isEmptyStartRowKey ; (startRowKey ) { Result firstResult selectFirstResultRow(connection,tableName, filterList); (firstResult.isEmpty()) { pageModel; } startRowKey firstResult.getRow(); } (pageModel.getPageStartRowKey() ) { isEmptyStartRowKey ; pageModel.setPageStartRowKey(startRowKey); } { (pageModel.getPageEndRowKey() ) { pageModel.setPageStartRowKey(pageModel.getPageEndRowKey()); } tempPageSize ; } Scan scan Scan(); scan.setStartRow(pageModel.getPageStartRowKey()); (endRowKey ) { scan.setStopRow(endRowKey); } PageFilter pageFilter PageFilter(pageModel.getPageSize() ); (filterList ) { filterList.addFilter(pageFilter); scan.setFilter(filterList); } { scan.setFilter(pageFilter); } (maxVersions .MAX_VALUE) { scan.setMaxVersions(); } (maxVersions .MIN_VALUE) { } { scan.setMaxVersions(maxVersions); } ResultScanner scanner table.getScanner(scan); ListResult resultList ArrayListResult(); index ; (Result rs : scanner.next(tempPageSize)) { (isEmptyStartRowKey index ) { index ; ; } (rs.isEmpty()) { resultList.add(rs); } index ; } scanner.close(); pageModel.setResultList(resultList); } (Exception e) { e.printStackTrace(); } { { (table){ table.close(); } } (IOException e) { e.printStackTrace(); } } pageIndex pageModel.getPageIndex() ; pageModel.setPageIndex(pageIndex); (pageModel.getResultList().size() ) { [] pageStartRowKey pageModel.getResultList().get().getRow(); [] pageEndRowKey pageModel.getResultList().get(pageModel.getResultList().size() ).getRow(); pageModel.setPageStartRowKey(pageStartRowKey); pageModel.setPageEndRowKey(pageEndRowKey); } queryTotalCount pageModel.getQueryTotalCount() pageModel.getResultList().size(); pageModel.setQueryTotalCount(queryTotalCount); pageModel.initEndTime(); pageModel.printTimeInfo(); pageModel; } Result selectFirstResultRow(Connection connection, tableName,FilterList filterList) { (StringUtils.isBlank(tableName)) ; Table table ; { table connection.getTable(TableName.valueOf(tableName)); Scan scan Scan(); (filterList ) { scan.setFilter(filterList); } ResultScanner scanner table.getScanner(scan); IteratorResult iterator scanner.iterator(); index ; (iterator.hasNext()) { Result rs iterator.next(); (index ) { scanner.close(); rs; } } } (IOException e) { e.printStackTrace(); } { { (table){ table.close(); } } (IOException e) { e.printStackTrace(); } } ; } rowCountByFilter(Connection connection,Class cls ) { tableNameEntityUtils.getSourceTableName(cls); rowCount ; Table table ; { table connection.getTable(TableName.valueOf(tableName)); Scan scan Scan(); scan.setFilter( FirstKeyOnlyFilter()); ResultScanner resultScanner table.getScanner(scan); (Result result : resultScanner) { rowCount result.size(); } } (IOException e) { e.printStackTrace(); }{ (table){ { table.close(); } (IOException e) { e.printStackTrace(); } } } rowCount; } addTableCoprocessor(Connection connection, table, coprocessorClassName) { Admin admin; { TableName tableNameTableName.valueOf(table); admin connection.getAdmin(); admin.disableTable(tableName); HTableDescriptor htd admin.getTableDescriptor(tableName); htd.addCoprocessor(coprocessorClassName); admin.modifyTable(tableName, htd); admin.enableTable(tableName); } (IOException e) { e.printStackTrace(); }{ (admin){ { admin.close(); } (IOException e) { e.printStackTrace(); } } } } rowCountByAggregation( tableName, family) { AggregationClient ac; rowCount ; { Scan scan Scan(); scan.addFamily(Bytes.toBytes(family)); ac AggregationClient(HBaseConfiguration.create()); rowCount ac.rowCount(TableName.valueOf(tableName), LongColumnInterpreter(), scan); } (Throwable e) { e.printStackTrace(); }{ (ac){ { ac.close(); } (IOException e) { e.printStackTrace(); } } } rowCount; } testTableRowCount() { HBaseConnectionEntity entity; { entityHBaseConnectionPool.getInstance().getConnection(); Connection connentity.getConnection(); coprocessorClassName ; addTableCoprocessor(conn,, coprocessorClassName); rowCountByAggregation(, ); }{ HBaseConnectionPool.getInstance().releaseConnection(entity.getId()); } } }