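I've recently been looking into NetMQ, with the idea of using it in a distributed web crawler. NetMQ is an open-source messaging library that wraps socket-based message queues. It is the .NET port of ZeroMQ, whose core library (libzmq) is written in C++ and exposes a C API. People who have benchmarked it report that it beats almost every other message queue (MSMQ, RabbitMQ and the like are no match for it). It does have one weakness, though: messages are not persisted. You can of course implement persistence yourself, but here I only care about performance, so persistence isn't needed.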
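The example below is my modification of the example on the NetMQ website. It has three parts: the Ventilator, which distributes the messages; the Worker, which processes them; and the Sink, which receives the results the Workers send back after processing. The time-consuming computation is handed to the Worker, so running several copies of Worker.exe speeds up processing. The ultimate aim for the Worker is distributed computing: deploy it on several PCs and let them do the computation (in the distributed crawler, each Worker corresponds to one crawler).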
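To make the three roles concrete without any networking, here is a minimal in-process sketch of the same Ventilator -> Worker -> Sink flow. It is an analogy, not NetMQ: two `BlockingCollection<int>` instances stand in for the PUSH/PULL links on ports 5557 and 5558, and the names `PipelineSketch`, `Pipeline.Run`, `taskCount` and `workerCount` are hypothetical. Much as a PUSH socket hands each message to exactly one connected PULL peer, each task id here is taken by exactly one worker.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace PipelineSketch
{
    // In-process analogy of Ventilator -> Workers -> Sink.
    // BlockingCollection<T> replaces the two PUSH/PULL links; there is no NetMQ here.
    public static class Pipeline
    {
        public static List<int> Run(int taskCount, int workerCount)
        {
            using var tasks = new BlockingCollection<int>();   // role of tcp://*:5557
            using var results = new BlockingCollection<int>(); // role of tcp://*:5558

            // Workers: each task id is consumed by exactly one worker, which
            // "processes" it and forwards the id to the sink (like sending person.Id).
            Task[] workers = Enumerable.Range(0, workerCount)
                .Select(_ => Task.Run(() =>
                {
                    foreach (int id in tasks.GetConsumingEnumerable())
                        results.Add(id);
                }))
                .ToArray();

            // Ventilator: push every task, then mark the batch as complete.
            for (int i = 0; i < taskCount; i++)
                tasks.Add(i);
            tasks.CompleteAdding();

            // Sink: collect exactly taskCount results, like the for loop in the Sink.
            var received = new List<int>(taskCount);
            for (int i = 0; i < taskCount; i++)
                received.Add(results.Take());

            Task.WaitAll(workers);
            return received;
        }
    }
}
```

`Pipeline.Run(1000, 4)` returns all 1000 ids, each exactly once; with several workers the order in which the sink receives them is not guaranteed. Raising `workerCount` is the in-process counterpart of starting more Worker.exe processes.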
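Enough talk, here's the code. (I had originally planned to use protobuf-net as the serialization format, but in a multithreaded setting it kept throwing an error whose cause I haven't figured out yet, so that part is commented out and BinaryFormatter is used instead.)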
First, define the objects that will be sent in the messages
using System;
using ProtoBuf;

namespace Model
{
    [Serializable]   // required by BinaryFormatter
    [ProtoContract]  // required by protobuf-net
    public class Person
    {
        [ProtoMember(1)]
        public int Id { get; set; }

        [ProtoMember(2)]
        public string Name { get; set; }

        [ProtoMember(3)]
        public DateTime BirthDay { get; set; }

        [ProtoMember(4)]
        public Address Address { get; set; }
    }
}
using System;
using ProtoBuf;

namespace Model
{
    [Serializable]
    [ProtoContract]
    public class Address
    {
        [ProtoMember(1)]
        public string Line1 { get; set; }

        [ProtoMember(2)]
        public string Line2 { get; set; }
    }
}
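The Ventilator and Worker code in this article falls back to `BinaryFormatter`. Be aware that `BinaryFormatter` has been marked obsolete since .NET 5 (warning SYSLIB0011), is considered unsafe for untrusted input, and no longer works out of the box in .NET 9. If protobuf-net keeps causing trouble, one alternative that ships with .NET is `System.Text.Json`; this is a swapped-in technique, not the author's code. The sketch uses stand-in copies of `Person`/`Address` without the protobuf-net attributes so it compiles with the base library alone, and `JsonSketch`/`PersonCodec` are hypothetical names.

```csharp
using System;
using System.Text.Json;

namespace JsonSketch
{
    // Stand-ins for Model.Person / Model.Address, minus the protobuf-net attributes.
    public class Address
    {
        public string Line1 { get; set; }
        public string Line2 { get; set; }
    }

    public class Person
    {
        public int Id { get; set; }
        public string Name { get; set; }
        public DateTime BirthDay { get; set; }
        public Address Address { get; set; }
    }

    public static class PersonCodec
    {
        // Produces the byte[] that would be handed to sender.Send(...).
        public static byte[] Encode(Person person) =>
            JsonSerializer.SerializeToUtf8Bytes(person);

        // Parses the byte[] returned by receiver.Receive().
        public static Person Decode(byte[] bytes) =>
            JsonSerializer.Deserialize<Person>(bytes)
            ?? throw new InvalidOperationException("message payload was JSON null");
    }
}
```

The encoded bytes could go to `sender.Send(...)` where `sm.ToArray()` is used now, and `PersonCodec.Decode(receiver.Receive())` would replace the `BinaryFormatter` call in the Worker. JSON is a text format, so messages will generally be larger than protobuf-net's binary encoding.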
Next, the message sender (Ventilator)
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading.Tasks;
using Model;
using NetMQ;
using ProtoBuf;
using ProtoBuf.Meta;

namespace Ventilator
{
    // Uses the NetMQ 3.x API; NetMQ 4 removed NetMQContext and creates sockets
    // directly (e.g. new PushSocket())
    sealed class Ventilator
    {
        public void Run()
        {
            Task.Run(() =>
            {
                using (var ctx = NetMQContext.Create())
                using (var sender = ctx.CreatePushSocket())
                using (var sink = ctx.CreatePushSocket())
                {
                    // Workers connect their PULL sockets to this port
                    sender.Bind("tcp://*:5557");

                    // The first message to the Sink is "0"; it signals the start of the batch
                    sink.Connect("tcp://localhost:5558");
                    sink.Send("0");

                    Console.WriteLine("Sending tasks to workers");

                    // protobuf-net only: lengthen the metadata-lock timeout (5 minutes)
                    // behind the "Timeout while inspecting metadata" error
                    RuntimeTypeModel.Default.MetadataTimeoutMilliseconds = 300000;

                    var binaryFormatter = new BinaryFormatter();

                    // Send 10000 tasks. The original NetMQ sample sent a random sleep time
                    // as the workload; here each task carries a serialized Person
                    for (int taskNumber = 0; taskNumber < 10000; taskNumber++)
                    {
                        Console.WriteLine("Workload : {0}", taskNumber);
                        var person = new Person
                        {
                            Id = taskNumber,
                            Name = "First",
                            BirthDay = DateTime.Parse("1981-11-15"),
                            Address = new Address { Line1 = "Line1", Line2 = "Line2" }
                        };

                        using (var sm = new MemoryStream())
                        {
                            // protobuf-net version (disabled: it threw a metadata
                            // timeout error under multithreading)
                            //Serializer.PrepareSerializer<Person>();
                            //Serializer.Serialize(sm, person);
                            //sender.Send(sm.ToArray());

                            binaryFormatter.Serialize(sm, person);
                            sender.Send(sm.ToArray());
                        }
                    }
                }
            });
        }
    }
}
using System;

namespace Ventilator
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Ventilator
            // Binds a PUSH socket to tcp://*:5557
            // Sends a batch of tasks to the workers via that socket
            Console.WriteLine("====== VENTILATOR ======");

            // Start the workers first, so that the tasks are spread across all of them
            Console.WriteLine("Press Enter when the workers are ready");
            Console.ReadLine();

            // The first message is "0" and signals the start of the batch;
            // see Program.cs in Sink.csproj for where this is used
            Console.WriteLine("Sending start of batch to Sink");
            var ventilator = new Ventilator();
            ventilator.Run();

            Console.WriteLine("Press Enter to quit");
            Console.ReadLine();
        }
    }
}
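The Ventilator waits for Enter before sending because a PUSH socket distributes messages round-robin over the PULL peers that are connected at that moment, so a worker that connects late can miss most of the batch. The sketch below is a deliberately simplified, hypothetical model of that behavior: `RoundRobinPush<T>`, `Connect` and `Send` are illustrative names, not NetMQ's API. It ignores high-water marks and buffering, and it throws when no peer is connected, whereas a real PUSH socket would block.

```csharp
using System;
using System.Collections.Generic;

namespace PushSketch
{
    // Simplified model of PUSH distribution: round-robin over connected peers.
    // Not NetMQ; no high-water marks, buffering or reconnection.
    public sealed class RoundRobinPush<T>
    {
        private readonly List<Queue<T>> _peers = new List<Queue<T>>();
        private int _next;

        // Models a worker's PULL socket connecting; returns that worker's inbox.
        public Queue<T> Connect()
        {
            var inbox = new Queue<T>();
            _peers.Add(inbox);
            return inbox;
        }

        // Models sender.Send(...): each message goes to exactly one connected peer.
        public void Send(T message)
        {
            if (_peers.Count == 0)
                throw new InvalidOperationException("no connected peers (a real PUSH socket would block)");
            _peers[_next].Enqueue(message);
            _next = (_next + 1) % _peers.Count;
        }
    }
}
```

For example, if one worker connects, messages 0 to 3 are sent, and a second worker connects before 4 to 7 are sent, the first worker ends up with six of the eight messages (0, 1, 2, 3, 4, 6) and the second only with 5 and 7. That lopsided split is what pausing until all workers are up tries to avoid.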
The message processor (Worker)
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Tasks;
using Model;
using NetMQ;
using ProtoBuf;

namespace Worker
{
    sealed class Worker
    {
        public void Run()
        {
            Task.Run(() =>
            {
                using (var ctx = NetMQContext.Create())
                using (var receiver = ctx.CreatePullSocket())
                using (var sender = ctx.CreatePushSocket())
                {
                    // Socket to receive tasks from the Ventilator on
                    receiver.Connect("tcp://localhost:5557");
                    // Socket to send results to the Sink on
                    sender.Connect("tcp://localhost:5558");

                    var binaryFormatter = new BinaryFormatter();

                    // Process tasks forever
                    while (true)
                    {
                        // In the original NetMQ sample the workload was a simple delay that
                        // simulated work (see Program.cs in Ventilator.csproj);
                        // here the workload is a serialized Person
                        //string workload = receiver.ReceiveString();
                        var receivedBytes = receiver.Receive();
                        using (var sm = new MemoryStream(receivedBytes))
                        {
                            // protobuf-net deserialization throws this error when many threads run it:
                            /*
                            Timeout while inspecting metadata; this may indicate a deadlock.
                            This can often be avoided by preparing necessary serializers during application initialization,
                            rather than allowing multiple threads to perform the initial metadata inspection;
                            please also see the LockContended event
                            */
                            //var person = Serializer.Deserialize<Person>(sm);

                            // Use BinaryFormatter instead
                            var person = (Person)binaryFormatter.Deserialize(sm);
                            Console.WriteLine("Person {Id:" + person.Id + ",Name:" + person.Name + ",BirthDay:" +
                                person.BirthDay + ",Address:{Line1:" + person.Address.Line1 +
                                ",Line2:" + person.Address.Line2 + "}}");

                            Console.WriteLine("Sending to Sink:" + person.Id);
                            sender.Send(person.Id.ToString());
                        }

                        // Simulate some work being done
                        //Thread.Sleep(int.Parse(workload));
                    }
                }
            });
        }
    }
}
using System;
using System.Linq;
using System.Threading.Tasks;

namespace Worker
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Worker
            // Connects a PULL socket to tcp://localhost:5557
            // and collects workloads from the Ventilator via that socket
            // Connects a PUSH socket to tcp://localhost:5558
            // and sends results to the Sink via that socket
            Console.WriteLine("====== WORKER ======");

            // If you switch back to protobuf-net, prepare the serializer here, once,
            // before any worker thread starts, as the timeout message suggests:
            //Serializer.PrepareSerializer<Person>();

            // Multithreading with plain Tasks
            //foreach (Worker client in Enumerable.Range(0, 1000).Select(x => new Worker()))
            //{
            //    client.Run();
            //}

            // Multithreading with Parallel.Invoke. Note: Worker.Run() returns as soon as
            // its loop is queued, so MaxDegreeOfParallelism does not cap the 50 loops
            var actList = Enumerable.Range(0, 50)
                .Select(x => new Worker())
                .Select(client => (Action)client.Run)
                .ToList();
            var paraOption = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
            Parallel.Invoke(paraOption, actList.ToArray());

            // Keep the process alive while the workers run
            Console.ReadLine();
        }
    }
}
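In the Worker's `Main`, each `Worker.Run()` only hands its receive loop to `Task.Run` and returns, so `Parallel.Invoke` completes almost immediately and `MaxDegreeOfParallelism = Environment.ProcessorCount` throttles only the 50 start calls, not the loops themselves; how many loops actually run at once is left to the thread pool. The sketch below checks this with the base library only. `WorkerStartDemo`, `StartLoop` and `StartAll` are hypothetical names, and the fake loop awaits `Task.Delay` where the real worker blocks in `receiver.Receive()`, so that the sketch does not itself starve the thread pool.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ParallelSketch
{
    public static class WorkerStartDemo
    {
        // Hypothetical stand-in for Worker.Run(): queue an endless loop with Task.Run
        // and return at once. Awaiting Task.Delay replaces the blocking Receive() call.
        private static Task StartLoop(CancellationToken token) =>
            Task.Run(async () =>
            {
                while (!token.IsCancellationRequested)
                    await Task.Delay(10);
            });

        // Starts workerCount loops through Parallel.Invoke, as the Worker's Main does,
        // and returns how long Parallel.Invoke itself blocked.
        public static TimeSpan StartAll(int workerCount, CancellationToken token, out Task[] loops)
        {
            var started = new Task[workerCount];
            Action[] actions = Enumerable.Range(0, workerCount)
                .Select(i => (Action)(() => started[i] = StartLoop(token)))
                .ToArray();

            var watch = Stopwatch.StartNew();
            Parallel.Invoke(
                new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount },
                actions);
            watch.Stop();

            loops = started;
            return watch.Elapsed;
        }
    }
}
```

`StartAll(50, token, out var loops)` returns almost immediately while all 50 loops are still running. Because every real worker loop blocks in `receiver.Receive()` for the life of the process, one option is to start each loop with `Task.Factory.StartNew(..., TaskCreationOptions.LongRunning)`, which asks the default scheduler for a dedicated thread instead of tying up thread-pool threads.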
The component that receives the processing results (Sink)
using System;
using System.Diagnostics;
using NetMQ;

namespace Sink
{
    public class Program
    {
        public static void Main(string[] args)
        {
            // Task Sink
            // Binds a PULL socket to tcp://localhost:5558
            // Collects results from the workers via that socket
            Console.WriteLine("====== SINK ======");

            using (var ctx = NetMQContext.Create())
            using (var receiver = ctx.CreatePullSocket())
            {
                // Socket to receive results on
                receiver.Bind("tcp://localhost:5558");

                // Wait for the start of the batch (see Program.cs in Ventilator.csproj)
                var startOfBatchTrigger = receiver.ReceiveString();
                Console.WriteLine("Seen start of batch");

                // Start our clock now
                var watch = Stopwatch.StartNew();

                // Expect one result per task sent by the Ventilator (10000);
                // printing each one adds console overhead to the measured time
                for (int taskNumber = 0; taskNumber < 10000; taskNumber++)
                {
                    var workerDoneTrigger = receiver.ReceiveString();
                    Console.WriteLine(workerDoneTrigger);
                }

                watch.Stop();

                // Calculate and report the duration of the batch
                Console.WriteLine();
                Console.WriteLine("Total elapsed time {0} msec", watch.ElapsedMilliseconds);
                Console.ReadLine();
            }
        }
    }
}
One more reminder: you can run several instances of Worker.exe to increase throughput.