zookeeper python client

$ git clone https://github.com/rgs1/zk_shell
$ sudo pip install -r zk_shell/requirements.txt
$ export ZKSHELL_SRC=1; bin/zk-shell

Or install directly with pip:

$ pip install zk-shell

Changing the pip source

Use the Aliyun pip mirror:

$ cat ~/.pip/pip.conf
[global]
trusted-host = mirrors.aliyun.com
index-url = http://mirrors.aliyun.com/pypi/simple

greenplum partitioning

Creating a partitioned table

CREATE SCHEMA employee;
CREATE TABLE "employee"."employee" (
    "name" VARCHAR,
    "ei" int4 NOT NULL,
    "employeeid" int4 NOT NULL,
    "department" TEXT,
    "updatetime" TIMESTAMP (0),
    "isstop" int2,
    CONSTRAINT "employee_pk" PRIMARY KEY ("ei", "employeeid")
) WITH (OIDS = FALSE) DISTRIBUTED BY (ei)
PARTITION BY RANGE (ei) (
    START (0) END (500000) EVERY (10000),
    DEFAULT PARTITION extra
);

Performance before and after partitioning

Execution times of a complex SQL query before and after partitioning, in seconds:

Before partitioning    After partitioning
3.666                  2.836
7.105                  2.873
4.763                  2.624
6.018                  3.070
5.024                  2.310
3.101                  1.967
5.357                  3.095

Checking table sizes

  1. Check the size of a given table

    select pg_size_pretty(pg_relation_size('test'));
  2. Check the sizes of all tables in a given schema

    select relname, pg_size_pretty(pg_relation_size(relid)) from pg_stat_user_tables where schemaname='public' order by pg_relation_size(relid) desc;

Appendix

copy

copy (select ei, contactid, isdeleted, name, tel, mobile from contact) to '/data/contact.txt';

Check whether a schema exists

select exists (select * from pg_catalog.pg_namespace where nspname = '$schema') as schema_exists

Check whether a table exists

select exists (select 1 from information_schema.tables where table_schema = '$schema' and table_name = '$table') as table_exists

References

  1. Pivotal Greenplum Docs: CREATE TABLE
  2. Pivotal Greenplum Docs: DELETE
  3. Pivotal Greenplum Docs: Partitioning Large Tables
  4. PostgreSQL: checking database, index, table, and tablespace sizes
  5. StackOverflow: How to check if PostgreSQL public schema exists?

spark custom partitioning strategy (Partitioner)

Spark ships with two partitioning strategies, HashPartitioner and RangePartitioner, but in some scenarios we want to define a custom partitioning strategy based on business requirements. All it takes is extending Partitioner and implementing its methods; a sketch follows the abstract class below.

/**
 * An object that defines how the elements in a key-value pair RDD are partitioned by key.
 * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
 */
abstract class Partitioner extends Serializable {
  def numPartitions: Int
  def getPartition(key: Any): Int
}
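
A minimal sketch of a custom partitioner, assuming string keys whose first character encodes a business category (the class name, prefix rule, and partition count are illustrative assumptions, not part of Spark):

import org.apache.spark.Partitioner

// Sketch: route string keys by their first character so that keys sharing
// a prefix land in the same partition. The prefix rule and the partition
// count are assumptions for illustration only.
class PrefixPartitioner(partitions: Int) extends Partitioner {
  require(partitions > 0, s"Number of partitions ($partitions) must be positive.")

  override def numPartitions: Int = partitions

  override def getPartition(key: Any): Int = key match {
    case null                    => 0
    case s: String if s.nonEmpty => s.charAt(0).toInt % numPartitions
    case other                   => math.abs(other.hashCode) % numPartitions
  }

  // equals/hashCode let Spark detect that two RDDs are partitioned the
  // same way and skip an unnecessary shuffle.
  override def equals(other: Any): Boolean = other match {
    case p: PrefixPartitioner => p.numPartitions == numPartitions
    case _                    => false
  }

  override def hashCode: Int = numPartitions
}

Apply it to a key-value RDD with partitionBy, e.g. rdd.partitionBy(new PrefixPartitioner(16)).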


Connecting spark to databases with the jdbc connector

Testing with spark-shell

SPARK_CLASSPATH=/usr/share/java/mysql-connector-java.jar spark-shell

Using the --driver-class-path parameter instead leads to an error during computation: Did not find registered driver with class com.mysql.jdbc.Driver. My guess is that this happens because YARN does not load mysql-connector-java.jar by default. A possible workaround is sketched below.
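
A workaround sketch, assuming placeholder connection details: Spark's JDBC data source accepts a driver option naming the driver class explicitly, which lets the executors register it themselves:

// Sketch: pin the driver class via the JDBC source's "driver" option.
// The url, user, password, and dbtable values are placeholders.
val df = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:mysql://db_host/db",
  "driver"   -> "com.mysql.jdbc.Driver",
  "dbtable"  -> "db.table",
  "user"     -> "secret",
  "password" -> "secret"
)).load()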

Creating a DataFrame

scala> val df = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://db_host/?user=secret&password=secret","dbtable" -> "db.table")).load()
df: org.apache.spark.sql.DataFrame = [id: string, name: string, age: int]
scala> df.printSchema
root
|-- id: string (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
scala> df.count
res0: Long = 7
scala> df.registerTempTable("people")
scala> sqlContext.sql("select count(*) from people").collect
res2: Array[org.apache.spark.sql.Row] = Array([7])
scala> sqlContext.sql("select name, age from people").collect
res3: Array[org.apache.spark.sql.Row] = Array([name,32], [name,32], [name,32], [name,32], [name,32], [name,32], [name,32])

References

  1. Spark 1.6 official documentation
  2. StackOverflow: how to connect to PostgreSQL from Spark
  3. Spark SQL official documentation (Chinese translation)

kafka manager

A Kafka management tool from Yahoo; repository: https://github.com/yahoo/kafka-manager

Building manually

kafka-manager is written in Scala and is packaged with sbt. Setting up the sbt environment is not covered in detail here, but to speed up dependency downloads you can edit project/plugins.sbt and add or change the resolvers:

// The Typesafe repository
// resolvers += "Typesafe repository" at "http://repo.typesafe.com/typesafe/releases/"
resolvers += "sonatype" at "https://oss.sonatype.org/content/repositories/public/"

Then package it with ./sbt clean dist; the resulting zip package is placed under target/universal/.

You can also directly download the zip package I have built:

Configuration changes

After unpacking, a few settings need to be adjusted before starting:

  1. In conf/application.conf, change kafka-manager.zkhosts to your own ZooKeeper address. This ZooKeeper cluster is used by kafka-manager itself, for example to store kafka-manager state.

    Separate multiple ZooKeeper hosts with commas:

    kafka-manager.zkhosts="my.zookeeper.host.com:2181,other.zookeeper.host.com:2181"
  2. Change the log directory by modifying the appender named FILE in conf/logback.xml.

    <appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- <file>${application.home}/logs/application.log</file> -->
        <encoder>
            <pattern>%date - [%level] - from %logger in %thread %n%message%n%xException%n</pattern>
        </encoder>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>logs/application.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>5</maxHistory>
            <totalSizeCap>5GB</totalSizeCap>
        </rollingPolicy>
    </appender>

Starting

Start it, specifying the Java home and HTTP port:

bin/kafka-manager -java-home /usr/java/jdk1.8.0_66 -Dhttp.port=8909

Adding JDBC jars needed by Sqoop in Oozie

On CDH5, when executing sqoop commands from Oozie, you may need to add some JDBC jars manually.

Place the jars into the lib directory on HDFS:

/user/oozie/share/lib/lib_${timestamp}/sqoop

Then update the sharelib with Oozie's sharelibupdate command:

$ oozie admin -oozie http://192.168.0.100:11000/oozie -sharelibupdate
[ShareLib update status]
sharelibDirOld = hdfs://nameservice/user/oozie/share/lib/lib_20160801151935
host = http://192.168.0.100:11000/oozie
sharelibDirNew = hdfs://nameservice/user/oozie/share/lib/lib_20160801151935
status = Successful

参考

  1. cloudera: How-to: Use the ShareLib in Apache Oozie (CDH 5)
  2. Oozie: Command Line Interface Utilities
  3. StackOverflow: Oozie + Sqoop: JDBC Driver Jar Location

Monitoring IO status with iostat on Linux

When a Linux system runs into performance problems, commands such as top, iostat, free, and vmstat are usually the first step in locating the issue. Among them, iostat provides rich IO status data.

Common usage:

$ iostat -d -k 1 10    # TPS and throughput
$ iostat -d -x -k 1 10 # device utilization (%util) and response time (await)
$ iostat -c 1 10       # CPU status
