这个问题困扰了我估计有一个星期问题,而问题的一开始我没有任何头绪,问题的表现就是调用节点去请求任务时,时常报错:

 

报错信息诸如:

Protocol Error: , b'\x00\x00\x00\x00\x00\x00\x00\x00\x00*3'
Error while reading from socket: (9, 'Bad file descriptor')
'int' object has no attribute 'decode'
name 'self' is not defined
only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context
Protocol Error: ", b'status": "SUCCESS", "result": false, "traceback": null, "children": [], "task_id": "1cbba409-b48a-49a6-b2e3-a5d6d203fc6d"}'

我的celery backend 为redis,一开始,我的排错心酸历程如下:

1.可能是redis所在的服务器出口网速不足,导致客户端从redis取数据延迟,于是调增了系统tcp最大并发数

2.可能是节点服务器性能与网速不足,导致结果写入redis有延迟或错误

于是我花了2000多升级了带宽与内存,发现然并卵

3.可能是线程锁的问题,然并卵

4.请求的网址越慢出错率越高,让我深信为延迟导致。

5.可能是redis中存储的数据格式字节数太多,于是修改“yaml”

app = Celery('finger_module_celery', broker=brokers, backend=backend,task_serializer='yaml')

6.去redis里看了下,存储的结果并没有问题,所以还是从redis取数据过程出现了问题,

于是我打印详细点:

基本上判定,只要取结果出现了错误,推送任务就会出错,并且很多时候单线程也会出错。

处于对celery的信任,我没有怀疑celery中get()方法有问题。

前几天想了下,是不是该用个笨方法:把取数据时间延迟一点,确保能取到数据,处于对技术的完美追求,觉得这个方法有点侮辱自己的代码,没有试,昨天晚上还是决定尝试一下。

代码如下:

def Celery_get(function,list,queue):
    '''
    :param function: 分布式需要推送的函数
    :param list: 函数的args
    :param queue: celery服务端中的任务队列
    :return: 节点处理完的数据
    '''
    time = 0.1
    try:
        res = function.apply_async(args=list, queue=queue)
    except Exception as e:
        print('推送任务错误:', list, e)
        return False
    try:
        while True:
            if res.ready():
                return res.get()
            else:
                sleep(time)
    except Exception as e:
        print('取结果错误:',e)
        return False

单独定义一个推送任务与取结果的函数,用延时来让直到ready()为真时才取结果,如果不行再延迟。

经过测试发现,貌似就差这么0.1秒,完美运行无瑕疵,没有再报错。

2018年12月12日 更新

今天偶然发现文章开头提到的报错,在部分情况下也是会受带宽影响。

如果任务量比较大,分布式节点使用外网链接,一定要观察rabbitmq 服务器的带宽使用情况,是否已经达到了出口带宽瓶颈

如下图:3M左右出口带宽的机器,在celery使用外网的时候,txkb/s 维持在800多,已经达到了瓶颈;在修改成内网链接后 网络吞吐达到2790多txkb/s,报错也少了很多,但还会有,所以带宽问题虽然不是根本问题,但也是可以优化的一个维度。

实时查看流量命令: sar -n DEV 1 100