使用Hive做数据分析

在大规模推广streaming方式的数据分析后,我们发现这个模式虽然入门成本低,但是执行效率也一样低。
每一个map task都要在TaskTracker上启动两个进程,一个java和一个perl/bash/python。
输入输出都多复制一次。

经过了一系列调研后,我们开始将部分streaming任务改写为Hive。

Hive是什么?

  1. Hive是单机运行的SQL解析引擎,本身并不运行在Hadoop上。
  2. SQL经过Hive解析为MapReduce任务,在Hadoop上运行。
  3. 使用Hive可以降低沟通成本,因为SQL语法的普及度较高。
  4. Hive翻译的任务效率不错,但是依然不如优化过的纯MapReduce任务。

数据准备

原始日志文件是这样的:
1323431269786 202911262 RE_223500512 AT_BLOG_788514510 REPLY BLOG_788514510_202911262

分别对应的字段是 <时间> <操作人> [[说明] [说明]……] <操作> <实体>
上面的例子对应的含义是:
  • <时间>: 1323431269786
  • <操作人>: 202911262
  • [说明]: RE_223500512
  • [说明]: AT_BLOG_788514510
  • <操作>: REPLY
  • <实体>: BLOG_788514510_202911262

扩展Hive的Deserializer

要用SQL分析数据,Hive必须知道如何切分整行的日志。Hive提供了一个接口,留给我们扩展自己的序列化和反序列化方法。

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.Writable;

public class RawActionDeserializer implements Deserializer {

@Override
public Object deserialize(Writable obj) throws SerDeException {
// TODO Auto-generated method stub
return null;
}

@Override
public ObjectInspector getObjectInspector() throws SerDeException {
// TODO Auto-generated method stub
return null;
}

@Override
public void initialize(Configuration conf, Properties props)
throws SerDeException {
// TODO Auto-generated method stub

}

}

三个函数作用分别是:

  • initialize:在启动时调用,根据运行时参数调整行为或者分配资源。
  • getObjectInspector:返回字段定义名称和类型。
  • deserialize:对每一行数据进行反序列化,返回结果。

定义表结构

在我们这个例子中,字段是固定的含义,不需要在initialize方法配置运行期参数。我们把字段的定义写成static,如下。

private static List structFieldNames = new ArrayList();

private static List structFieldObjectInspectors = new ArrayList();
static {
structFieldNames.add("time");
structFieldObjectInspectors.add(ObjectInspectorFactory
.getReflectionObjectInspector(Long.TYPE, ObjectInspectorOptions.JAVA));

structFieldNames.add("id");
structFieldObjectInspectors.add(ObjectInspectorFactory
.getReflectionObjectInspector(
java.lang.Integer.TYPE, ObjectInspectorOptions.JAVA));

structFieldNames.add("adv");
structFieldObjectInspectors.add(ObjectInspectorFactory
.getStandardListObjectInspector(
ObjectInspectorFactory.getReflectionObjectInspector(
String.class, ObjectInspectorOptions.JAVA)));

structFieldNames.add("verb");
structFieldObjectInspectors
.add(ObjectInspectorFactory.getReflectionObjectInspector(
String.class, ObjectInspectorOptions.JAVA));

structFieldNames.add("obj");
structFieldObjectInspectors
.add(ObjectInspectorFactory.getReflectionObjectInspector(
String.class, ObjectInspectorOptions.JAVA));
}

@Override
public ObjectInspector getObjectInspector() throws SerDeException {
return ObjectInspectorFactory.getStandardStructObjectInspector(
structFieldNames, structFieldObjectInspectors);
}

定义解析函数

为了能够让Java MapReduce任务复用代码,我们在外部实现了一个与Hive无关的类,这里不再贴代码。这个类定义了与日志字段相同的成员变量,并且提供一个static的valueOf方法用于从字符串构造自己。

@Override
public Object deserialize(Writable blob) throws SerDeException {
if (blob instanceof Text) {
String line = ((Text) blob).toString();
RawAction act = RawAction.valueOf(line);
List result = new ArrayList();
if (act == null)
return null;
result.add(act.getTime());
result.add(act.getUserId());
result.add(act.getAdv());
result.add(act.getVerb());
result.add(act.getObj());
return result;
}
return null;
}

建表

把上面程序编译并传到hive部署目录后,进入hive:

$ ./hive --auxpath /home/bochun.bai/dp-base-1.0-SNAPSHOT.jar


hive> CREATE TABLE ac_raw ROW FORMAT SERDE 'com.renren.dp.hive.RawActionDeserializer';
OK
Time taken: 0.117 seconds
hive> DESC ac_raw;
OK
time bigint from deserializer
id int from deserializer
adv array from deserializer
verb string from deserializer
obj string from deserializer
Time taken: 0.145 seconds


hive> LOAD DATA INPATH '/user/bochun.bai/hivedemo/raw_action' OVERWRITE INTO TABLE ac_raw;
Loading data to table default.ac_raw
Deleted hdfs://NAMENODE/user/bochun.bai/warehouse/ac_raw
OK
Time taken: 0.173 seconds


hive> SELECT count(1) FROM ac_raw;
…...显示很多MapReduce进度之后......
OK
332
Time taken: 15.404 seconds


hive> SELECT count(1) as cnt, verb FROM ac_raw GROUP BY verb;
…...显示很多MapReduce进度之后......
OK
4 ADD_FOOTPRINT
1 REPLY
24 SHARE_BLOG
299 VISIT
4 add_like
Time taken: 15.242 seconds

技术和故障

这是写给内部同事的日志,也有部分概念有通用意义,就不加密了。

做技术开发的人知道,只要写代码就一定会出错,叫bug。
有的错误在上线之前没有检查出来,直到被用户使用了我们才知道。这种bug就是故障。
出现故障是再正常不过的事了,我认为处理过程,可以让故障成为宝贵的财富。

我设计的故障处理流程,分为三个阶段:反馈、处理、反思。
1 反馈阶段
说实话,大部分的故障是类似的。反馈阶段就是要把各种用户描述归一成为同一个技术问题。
这个阶段具体还要分成“用户反馈”,“已沟通”,“技术已确认”,三个状态。分别由“客服”和“技术经理”操作。
2 处理阶段
这个阶段就是写代码的阶段,把造成故障的bug修复。技术都懂得怎么做。
这个阶段具体还要分成“已分派”和“线上已修复”两个状态。这两个状态的执行者分别是“技术经理”和“测试经理”。
换句话说,一线的工程师编码的工作,是在“已分派”状态进行的。然后交给OP和QA。
3 反思阶段
这个阶段是最核心,同时是最欠缺的阶段。
这个阶段具体分成“已确认解决方案”和“已完成解决方案”两个状态。
所谓“解决方案”,一定是切中要害的解决,并且可以保证类似问题可以避免再次发生。
“解决方案”和“修复BUG”的区别在于有没有反思发生的根源。

整个流程的设计,核心目的是自我进化。只要持续犯错,再避免重复犯错,最终一定是伟大的团队。

休假有感

结束了连续11天的假期,团队运转良好,没有发生技术故障,我很高兴。

作为一名工程师,我被遗忘了,很好!总出事别人才会记住你,总延期别人就会担心你,总更包别人就会关注你,可是我认为技术进步到一个阶段,就可以被别人忘记了。尽量早的做优化,尽量多的留余量,尽量高层次的设计接口,就可以做到。
作为一名管理者,我没有授权充分。对大家OA审批的不够及时,对新服务器资源的分配没有执行,对报警跟踪的推进没有坚持。回去第一天都要补上。

五月我想做一些改变,让工程师都和我一样解放出来,被别人遗忘。

中小规模Hadoop集群优化

我们有一个Hadoop集群从上个月开始遇到一系列性能问题,在逐一解决的过程中,积累了以下的优化经验。

1. 网络带宽

Hadoop集群的服务器在规划时就在统一的交换机下,这是在官方文档中建议的部署方式。

但是我们的这台交换机和其他交换机的互联带宽有限,所以在客户端遇到了HDFS访问速度慢的问题。

把操作集群的客户端也联入DataNode的交换机内部,解决了这个问题。

2. 系统参数

对ulimit -c的修改也是官方文档建议的修改,在集群只有10台服务器时,并没有遇到问题。

随着机器增加和任务增加,这个值需要改的更大。

3. 配置文件管理

这个集群用的是Cloudera发行的版本,配置文件默认存在/etc/hadoop/conf位置。这是一个只有root才能修改的位置。

为了修改方便,我把配置文件统一保存在一台机器上,修改后用脚本分发。保证所有服务器都是统一的配置。

4. mapred.tasktracker.map.tasks.maximum

这个参数控制每个TaskTracker同时运行的Map任务数。

以前的设置是和CPU核数相同的,偶尔遇到任务挤占DataNode资源的问题。

现在改成map+reduce+1==num_cpu_cores。

5. 严格控制root权限

Cloudera的发行版会创建一个hadoop用户,各种守护进程都应该以这个用户运行。

曾经有误操作(/usr/lib/hadoop/bin/hadoop datanode &)导致本地的数据目录被root写入新文件,于是正确启动的hadoop用户进程无法读写。

所以现在的集群服务器不提供日常的root权限访问。

6. Java的GC模式

在mapred.child.java.opts和HADOOP_OPTS都增加了-XX:+UseConcMarkSweepGC。

JDK的文档中推荐现代多核处理器系统,采用这种GC方式,可以充分利用CPU的并发能力。

这个改动对性能的积极影响很大。

7. 选择正确的JDK

这个集群有部分服务器的JDK用的是32位版本,不能创建-Xmx4g以上的进程。

统一为x64版本的JDK。

8. mapred.reduce.slowstart.completed.maps

这个参数控制slowstart特性的时机,默认是在5%的map任务完成后,就开始调度reduce进程启动,开始copy过程。

但是我们的机器数量不多,有一次大量的任务堆积在JobTracker里,每个TaskTracker的map和reduce slots都跑满了。

由于map没有足够资源迅速完成,reduce也就无法结束,造成集群的资源互相死锁。

把这个参数改成了0.75,任务堆积的列表从平均10个,变成了3个。

9. mapred.fairscheduler.preemption

这个参数设为了true。以便fairscheduler在用户最小资源不能满足时,kill其他人的任务腾出足够的资源。

集群运行着各种类型的任务,有些map任务需要运行数小时。这个参数会导致这类任务被频繁kill,几乎无法完成。曾经有个任务在7小时内被kill了137次。

可以通过调整fairscheduler的pool配置解决,给这种任务单独配置一个minMap==maxMap的pool。

10. mapred.jobtracker.completeuserjobs.maximum

限制每个用户在JobTracker的内存中保存任务的个数。

因为这个参数过大,我们的JobTracker启动不到24小时就会陷入频繁的FullGC当中。

目前改为5,JT平稳运行一天处理1500个任务,只占用800M内存。

这个参数在>0.21.0已经没有必要设置了,因为0.21版本改造了completeuserjobs的用法,会尽快的写入磁盘,不再内存中长期存在了。

11. mapred.jobtracker.update.faulty.tracker.interval和mapred.jobtracker.max.blacklist.percent

一个写错的任务,会导致一大批TaskTracker进入黑名单,而且要24小时才能恢复。这种状况对中小规模的集群性能影响是非常大的。只能通过手工重启TaskTracker来修复。所以我们就修改了部分JobTracker的代码,暴露了两个参数:

mapred.jobtracker.update.faulty.tracker.interval控制黑名单重置时间,默认是24小时不能改变,我们现在改成了1小时。

mapred.jobtracker.max.blacklist.percent控制进入黑名单TT的比例,我们改成了0.2。

我正在补充这两个参数的TestCase,准备提交到trunk中。

12. 多用hive少用streaming

由于streaming的方便快捷,我们做了很多基于它的开发。但是由于streaming的任务在运行时还要有一个java进程读写stdin/out,有一定的性能开销。

类似的需求最好改用自定义的Deserializer+hive来完成。

Memcache协议

Memcache协议在Web应用很广泛,为高效通信设计,简单而且美丽。
上个季度中间层组织开发了一套使用Memcached做后台的缓存系统,对服务器利用率和维护成本都有很大的改善。这个版本的review结束后,我打算开源以便可以得到更多的应用实践。
也就是在实现上面说到的系统的过程中,我发现各种版本的开源Memcache客户端,都有一些侧重。上周被java-memcached的客户端折腾了一周,虽然最后解决了,但是这类极限问题最近遇到的越来越多。我们现在需要一个侧重可靠性和性能的客户端,功能并不是这个阶段的核心问题。
这个季度的10%课外时间,我会尝试实现一个自己的客户端。

2010Q1沟通总结

在最近的团队沟通中,我认识到明确的个人发展目标,对每个人效率的提高作用显著。
沟通的过程逐渐的行程了这样的一个梯队标准,写出来备忘。

在人人网技术路线可以分为三个阶段,我称为Ver1.0,Ver2.0,Ver3.0三个版本。

在V1.0阶段,接受到的输入是关于“函数”或“类”的描述,日常工作重点使用的技能是“编程语言的特性”,产出的结果是实现部分具体功能;
在V2.0阶段,输入是“功能”和“服务”,技能是“功能设计”,结果是“完整的功能”;
在V3.0阶段,输入是“需求”,技能是“结构设计”,结果是“产品”。

输入可以由上级评价,技能可以由技术委员会评价,结果可以由实际的产出来评价。

在可以生产“产品”以后,就应该选择技术路线或者管理路线了。

Published
Categorized as 人人网

Untitled

今天在用人人网的时候,突然想到上学时候学的政治课,马克思的《资本论》里面讲到过社会发展的规律,和目前我们在做的SNS有一些指导意义。
翻箱倒柜的找了半天,都没有找到以前的政治课本,于是到网络上搜了一段描述,用来对比。

社会发展的动力与规律
转自维基百科 http://zh.wikipedia.org/wiki/历史唯物主义
历史唯物主义认为:生产力和生产关系之间的矛盾,经济基础和上层建筑之间的矛盾,这是人类社会的基本矛盾。这两对矛盾存在于一切社会形态之中,贯穿于每一个社会形态的始终,决定着其他各种社会矛盾,是推动社会发展的基本动力,决定着社会历史的一般进程。
在我们这里,技术架构是生产力,业务功能是生产关系。
生产力和生产关系的辩证关系是:
生产力决定生产关系:生产力对生产关系起着决定作用、支配作用,其主要表现在两个方面:第一,生产力的性质决定生产关系的性质。第二,生产力的发展变化决定生产关系的改变。
生产关系反作用于生产力:这种反作用表现为两种情况:第一,适合生产力的性质和发展要求的先进的生产关系,促进生产力的发展;第二,不适合生产力的性质和发展要求的落后的生产关系,阻碍生产力的发展。
生产力和生产关系之间的矛盾运动:生产力和生产关系之间的矛盾,在生产发展的不同阶段具有不同的情况。在一种生产关系产生和确立后的一段时间内,它与生产力的性质和发展要求是基本适合的,对生产力的发展具有积极的推动作用,促进生产力以前所未有的速度向前发展。虽然这时生产力和生产关系之间也有矛盾,人们也会自觉或不自觉地对生产关系作某些调整,但却不会引起生产关系的根本变革。
在我们这里,用户行为是经济基础,盈利模式是上层建筑。
经济基础和上层建筑的辩证关系是:
经济基础决定上层建筑。首先,经济基础的性质决定上层建筑的性质,一定的上层建筑总是为了适应一定的经济基础的需要而建立起来的;经济上占统治地位的阶级,必然在国家政权和意识形式上占统治地位。第二,经济基础的变革决定上层建筑的变革,当经济基础发生变革后,上层建筑迟早会发生变革,以求得与经济基础相适应,经济基础的变化发展还规定着上层建筑变化发展的方向。
上层建筑对经济基础具有能动的反作用。这种反作用表现为,上层建筑为经济基础提供政治保障和意识形态形式。这种反作用,取决于上层建筑所服务的经济基础的性质。当上层建筑适合于经济基础的要求时,它就起到巩固经济基础和促进生产力发展的作用。当上层建筑不适应经济基础的要求时,它就起到阻碍和生产力发展的作用。
经济基础和上层建筑的矛盾运动:经济基础和上层建筑的相互作用,表现为经济基础对上层建筑的决定作用和上层建筑对经济基础的反作用。经济基础的决定作用,是第一性的;上层建筑的反作用是第二性的。经济基础的决定作用是根本性的;上层建筑的反作用是派生的和从属的。经济基础的决定作用与上层建筑的反作用,构成二者之间的矛盾运动,体现为上层建筑必须适合经济基础发展的基本规律。

读书笔记 2010.03.06

今天听管理培训课程,收获颇多。

第一个收获是,能力和意愿的循环关系。
第二个收获是,智力工作者掌握生产工具,与传统工人不同。
第三个收获是,要因人因事而异的做出判断,管理没有对错。

讲课的王老师好像住的离我家很近。

人人网中间层:实践篇

之前的问题篇和求解篇描述了人人网在发展过程中遇到的问题,并且介绍了我们采用中间层来提高性能的解决方案。今天的实践篇将通过一个例子来实现一个中间层服务。
这个服务要达到的目的是快速的查询用户是否有效,数据将要使用bitset保存在内存中,每个用户一位,仅保存正整数约21亿,占用内存256M。

开始编码

下面的代码都在这个位置保存:http://gitorious.org/renren/bitserver

接口定义

定义接口如下:
[code lang=”c++”]#include
module renren {
     struct BitSegment {
         int begin;
         int end;
         Ice::ByteSeq data;
     };
    interface BitServer {
         bool get(int offset);
         Ice::BoolSeq gets(Ice::IntSeq offsets);
         BitSegment getSegment(int begin, int end);
     };
};[/code]
这个BitServer.ice文件,通过slice2cpp命令编译成为服务端的Skeleton文件:
[code lang=”bash”]slice2cpp -I/opt/Ice-3.3/slice BitServer.ice[/code]

服务端

有了上面生成的服务端文件后,就可以实现我们自己的业务功能了。
BitServerI.h和BitServerI.cpp,暂时只是实现了单个get的接口。
[code lang=”c++”]#ifndef __BitServerI_h__
#define __BitServerI_h__

#include

#define SIZE_OF_BIT 2147483647
#include

namespace renren
{

class BitServerI : virtual public BitServer
{
public:
void initialize();

virtual bool get(::Ice::Int,
const Ice::Current&);

virtual ::Ice::BoolSeq gets(const ::Ice::IntSeq&,
const Ice::Current&);

virtual ::renren::BitSegment getSegment(::Ice::Int,
::Ice::Int,
const Ice::Current&);
private:
std::bitset bits_;
};

}

#endif[/code]
[code lang=”c++”]
#include
#include

int main(int argc, char** argv) {
int status = 0;
Ice::CommunicatorPtr ic;
try{
ic = Ice::initialize(argc, argv);
Ice::ObjectAdapterPtr adapter = ic->createObjectAdapter(“BitServer”);
renren::BitServerI* obj = new renren::BitServerI;
obj->initialize();
adapter->add(obj, ic->stringToIdentity(“BitServer”));
adapter->activate();
ic->waitForShutdown();
} catch (const Ice::Exception& e) {
std::cerr << e << std::endl; status = 1; } catch (const std::exception& e) { std::cerr << e.what() << std::endl; status = 1; } catch (...) { std::cerr << "unknown exception" << std::endl; status = 1; } if (ic) { try { ic->destroy();
} catch (const Ice::Exception& e) {
std::cerr << e << std::endl; status = 1; } catch (const std::exception& e) { std::cerr << e.what() << std::endl; status = 1; } catch (...) { std::cerr << "unknown exception" << std::endl; status = 1; } } return status; } void renren::BitServerI::initialize() { for (int i=0; i<0xFFFFF;i=i+2) { bits_[i]=true; } } bool renren::BitServerI::get(::Ice::Int offset, const Ice::Current& current) { if(offset < 0) return false; return bits_[offset]; } ::Ice::BoolSeq renren::BitServerI::gets(const ::Ice::IntSeq& offsets, const Ice::Current& current) { return ::Ice::BoolSeq(); } ::renren::BitSegment renren::BitServerI::getSegment(::Ice::Int begin, ::Ice::Int end, const Ice::Current& current) { return ::renren::BitSegment(); }[/code]

客户端

我们使用Java作为客户端,首先用slice2java工具生成Java的Proxy类。
[code lang=”bash”]slice2java -I/opt/Ice-3.3/slice BitServer.ice[/code]
然后自己实现客户端代码:
[code lang=”java”]package renren;

class BitServerAdapter {
private final String endpoints_;
private Ice.Communicator ic_;
private renren.BitServerPrx prx_;

public BitServerAdapter(String endpoints) {
this.endpoints_ = endpoints;
}

public void initialize() {
ic_ = Ice.Util.initialize();
prx_ = renren.BitServerPrxHelper.uncheckedCast(ic_.stringToProxy(endpoints_));
}

public boolean get(int id) {
return prx_.get(id);
}

public static void main(String[] args) {
BitServerAdapter adapter = new BitServerAdapter(args[0]);
adapter.initialize();
boolean ret = adapter.get(Integer.valueOf(args[1]));
System.out.println(ret);
System.exit(0);
}
}[/code]

性能测试

完成了代码,来测试一下性能吧。
首先启动服务器
[code lang=”bash”]target/bitserver –Ice.Config=config[/code]
再启动客户端
[code lang=”bash”]java -cp /opt/Ice-3.3/lib/Ice.jar:target/bitclient.jar \
renren.BitServerAdapter “BitServer:default -p 10000” 1022[/code]
在客户端调用增加循环50000次,单线程平均每秒处理一万次。

在多线程的环境下,单个服务器每秒可处理的请求8万次左右,已经超过了目前的需要。