MongoDB practice:基于MongoDB的好友消息动态的实现思路(How to build activity-streaming with MongoDb)
好友动态是SNS最常见的功能。在设计“视觉中国原创榜”的好友动态时,也遇到如何实现的问题。和普通的SNS不同,
视觉中国原创榜用户不仅仅关注好友的动态,而且也要关注自己的作品和自己曾经关注过的作品的动态。
这样,就需要给用户分别push 3种不同的动态: 我的作品的动态,我关注过的作品(包括收藏过,评论过,评分过)的动态,
以及我关注的人(followed)的动态,未来还有好友的动态。这些动态,用户都应该可以取消关注。如何实现?
使用传统的数据库会面临很多问题,比如如何sharding。 幸运的是,我们用的MongoDB,这给我们解决问题带来了极大的方便。
首先,对每个用户,分别设计以下 collection
//by nightsailer.com
activity_streaming.feed, 属性分别为:
$schema = array(
_id, 用户id
followed_art => array(), 用户关注的作品id列表
followed_people=>array(), 用户关注的人id列表
my_art => array() 用户的作品id 列表
)
//by nightsailer.com
activity_stream.user:
_id
type=> 动态steam的类型(我关注的作品,我的作品,我关注的人)
stream_target => 对应动态stream的对象(作品id,人id)
stream => array() FIFO数组,存放activity的DBRef
time => int 最后一次activity的时间戳
activity_stream.site
_id uuid
type: 动态类型
data: hash 动态数据
time
activity_stream.queue
同 activity_stream.site
存放待处理的动态队列
我们使用异步处理方式,按照以下流程:
1. 当用户某些动作产生一个事件后,将事件push到activity_stream.queue, 通知worker进行处理
2. worker 被唤醒,从activity_stream.queue中提取未处理的事件。
3. worker 将事件放入activity_stream.site,作为全站动态广播
4. worker 从activity_streaming.feed中反向查找事件的作品或作者是否有对应的人,如果有,则将此事件id
push到activity_stream.user 的对应fifo数组中。
(这是MongoDB最兴奋的地方,可以直接查询数组中的值,只要对数组进行索引)
这样,用户可以:
- 从activity_stream.user 中删除某个事件
- 从activity_stream.feed 中删除某个关注的对象(实现类似忽略这个人的动态,忽略这个作品的动态等等)
- 当用户关注好友后,将其加入activity_stream.feed
- 当用户上传作品后,将作品加入activity_stream.feed
- 当用收藏、评分、评论后,将其作品加入activity_stream.feed
以上是MongoDB实践的第二篇,待续。
MongoDB practice: My Perl GridFS Wrapper
简单写了一个Perl版本的GriFS的wrapper:
package CZone::GridFS;
use strict;
use MongoDB::GridFS;
use Path::Class;
use Digest::file qw(digest_file_hex);
use Digest::MD5 qw(md5_hex);
use IO::File;
use Data::Dumper;
use Any::Moose;
has database => (
isa => ‘MongoDB::Database’,
is => ‘ro’,
required => 1
);
has _gridfs => (
isa => ‘MongoDB::GridFS’,
is => ‘ro’,
lazy => 1,
builder => ‘_build__gridfs’,
);
has _file_collection => (
isa => ‘MongoDB::Collection’,
is => ‘ro’,
lazy => 1,
builder => ‘_build__file_collection’
);
sub _build__gridfs {
my $self = shift;
return $self->database->get_gridfs;
}
sub _build__file_collection {
my $self = shift;
return $self->database->get_collection(’fs.files’);
}
sub get_bytes {
my ($self, $id ) = @_;
my $file = $self->_gridfs->find_one({_id => $id });
my $bytes;
my $fh = new IO::File \$bytes,’>';
$file->print($fh);
return $bytes;
}
sub store_file {
my ($self, $file_path) = @_;
my $file = file($file_path)->absolute;
return undef unless -e $file;
my $md5 = digest_file_hex($file,’MD5′);
my $fh = $file->open(’r') or return undef;
return $self->_store_fh($fh,$md5);
}
sub _store_fh {
my ($self,$fh,$md5) = @_;
# $grid_file isa MongoDB::GridFS::File
my $grid_file = $self->_gridfs->find_one({ ‘md5′ => $md5});
if ($grid_file) {
$self->_inc_refs($grid_file->info->{_id});
return $grid_file->info->{_id};
}
else {
my $oid = $self->_gridfs->insert($fh,{
refs => 1,
md5 => $md5,
});
return $oid;
}
}
sub store_bytes {
my ($self, $bytes) = @_;
my $md5 = md5_hex($bytes);
my $fh = new IO::File \$bytes,’<';
# my $fh = FileHandle->new;
# $fh->open(\$bytes,’<');
return $self->_store_fh($fh,$md5);
}
sub unlink {
my ($self, $id ) = @_;
$self->_dec_refs(MongoDB::OID->new(value =>”$id”));
}
sub _inc_refs {
my ($self,$id) = @_;
$self->_file_collection->update({_id => $id },{ ‘$inc’ => { refs => 1}});
}
sub _dec_refs {
my ($self,$id) = @_;
$self->_file_collection->update({_id => $id },{ ‘$inc’ => { refs => -1}});
}
sub gc {
my $self = shift;
$self->_gridfs->remove({refs => 0});
}
no Any::Moose;
__PACKAGE__->meta->make_immutable;
1;
__END__
这是从czone项目中的PHP代码移植过来的。
方便将gridfs中的文件读写到scalar中。同时,通过检查存储文件的md5值,并记录相同文件的引用计数,相同文件只存储一个copy,节省空间。(BSON格式对于空间的需求是非常大的)
(updated)MongoDB:PHP中存储和调用server side 自定义函数
在MongoDB 从1.1.x版本开始可以将server side code存储,这样可以一次性导入或者存储函数定义后,
就可以在$where等中使用这些函数.
在PHP driver中如何存储和定义这些js 函数? 目前似乎没有直接的简单方法. 如果调用MongoDb::execute是不行的.
我的解决方法使用曲线救国,通过将代码save到system.js进行存储,通过execute js closure来调用.,
UPDATED:
使用MongoCode的scope方式来简介传递命名参数. 实现命名参数传递, 更加简洁.
例子如下:
public function store_server_function($fun_name,$fun_body) {
$code = sprintf(’
var _fun = %s;
db.system.js.save({_id:”%s”, value: _fun });
‘,$fun_body,$fun_name);
self::$_db->execute($code);
}
public function call_function($function,array $args = array()) {
$closure = “function(){ return $function.apply
(this,arguments); }”;
// echo $closure,”\n”;
$result = self::$_db->execute($closure,$args);
// var_dump($result);
return $result['retval'];
}
public function call_function($fun_name, array $named_args = array()) {
$response = $this->db->execute(new MongoCode(”$fun_name()”,$named_args));
return isset($response['retval'])?$response['retval']:$response;
}
使用例子:
$base->store_server_function(’x',’function(i){ return i+5; }’);
$i = 10;
$ok = $base->call_function(’x',array($i));
ok = $base->call_function(’x',array(’i'=>5));
is($ok,$i+5,’store_server_function’);
这里,store_server_function用处不大,因为可以直接写成js然后用mongo导入.
但是call_function还是很有用的. 通过调用这些函数,可以简化很多工作.
比如一个简单的例子是sequence(js):
function(seq) {
db.sequence.update({name:seq},{$inc:{val:1}},true);
var row = db.sequence.findOne({name:seq});
return row.val;
};
//php
public function next_seq_id($seq_name) {
return $this->call_function(’next_seq_id’,array(’seq’=>$seq_name));
}
MongoDB的Perl driver的中文乱码问题
Perl下面向mongodb插入中文字符串会出现乱码.
根据MongoDB的文档, MongoDB支持UTF-8的编码. 但在Perl中,
如果直接使用utf8的字符串,也会出现问题.
测试代码:
my $mongo_dbh = $mongo_connection->get_database( $mongo_db );
my $t = $mongo_dbh->get_collection(’test’);
my $word = ‘测试’;
$t->insert({ title => $word });
my $row = $t->find_one();
say “title:”,$row->{title};
$t->remove();
输出结果是乱码. 在mongo shell和PHP中得到的也是乱码.
我初步判断是perl driver没有能够识别utf8编码而是强制encode成utf8编码后存储.
修改如下:
my $mongo_dbh = $mongo_connection->get_database( $mongo_db );
my $t = $mongo_dbh->get_collection(’test’);
my $word = ‘测试’;
$t->insert({ title => decode_utf8($word) });
my $row = $t->find_one();
say “title:”,$row->{title};
$t->remove();
输出正常. 判断正确, 问题解决. 希望Kristina能够修改就无须多此一举(当然,如果是非utf8编码还是需要转换的),
也许并不是bug而是个feature?
UPDATE: Kristina的回复很迅速, 一觉醒来, master里已经加入判断是否为utf8的代码. CPAN .27(下周2发布)以上不会存在这个问题.
但是, 其他格式的编码仍然需要转换为utf8编码,因为BSON只支持UTF8编码.
MongoDb Replication
MongoDb的Replication支持:
1. master-slave:
slave可以有多个.
2. Replica Pairs
实际上是一个failover的master-slave模式. 启动时,2个node的mongo会协商,其中1个成为master,另一个为slave. 当master down了,那么slave会自动接管成为master.
不过,这种模式需要driver支持. 需要在driver connect时候
选择pairs 模式.
3. 有限的master-master
可忽略
问题是,我希望是 replica pairs + slave(s) 模式.
不幸的是, 目前版本不支持. mailinglist说是在开发中.
主要的一个限制就是slave的source只能在启动时候指定,
虽然支持多个upstream的source,但是无法中途修改.
如果source改变,需要shutdown然后restart.
此外,一个缺陷是,需要client端链接时指定host.
我理想的模式是使用虚拟ip, 一个是writer,一个是reader.
当某个node down了,则通过arp 转到另一个实际的node的真实ip.
这是writer, 如果是reader,那么可以通过LVS来负载均衡
到不同的节点.
粗粗想一下,实现这个解决方案的难度倒不大. 可以参考mysql的mmm. 等等看,如果未来mongodb没有出类似的方案,可以考虑实现一个.