rabbitmq

python rabbitmq: cannot send or receive messages normally over the connection

李魔佛 posted an article in python • 0 comments • 2383 views • 2021-06-28 19:44

A password-protected connection was used:

auth = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host, port, '/', auth))

With this connection, the consumer just sits waiting for the producer, and when the producer does publish a message, it never reaches the consumer.
Yet the corresponding message is clearly visible in the RabbitMQ management page.
 
It later turned out that the connection call above was the problem, specifically the '/' argument: because of how the default parameters are ordered, '/' was not being assigned to virtual_host but to a different parameter. The fix is to pass every argument by keyword:

auth = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=host, port=port, virtual_host='/', credentials=auth))
 
PS: After further hands-on debugging, the real culprit turned out to be the terminal window bundled with Git (Git Bash); running the same code from a cmd prompt does not show the problem.
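For a quick sanity check, a minimal publish/consume round-trip can confirm the keyword-argument connection really works. This is only a sketch: the queue name test_queue and the host/user/password values are placeholders, not part of the original post.

import pika

# Placeholders -- adjust host/user/password to your broker
auth = pika.PlainCredentials('user', 'password')
params = pika.ConnectionParameters(host='localhost', port=5672, virtual_host='/', credentials=auth)

connection = pika.BlockingConnection(params)
channel = connection.channel()
channel.queue_declare(queue='test_queue')  # hypothetical queue, declaration is idempotent
channel.basic_publish(exchange='', routing_key='test_queue', body='hello')

# basic_get fetches a single message without entering a blocking consume loop
method, properties, body = channel.basic_get(queue='test_queue', auto_ack=True)
print(body)  # expect b'hello' if the connection parameters are correct
connection.close()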

scrapy rabbitmq distributed crawler

李魔佛 posted an article in python爬虫 • 0 comments • 6237 views • 2019-07-17 16:59

If you have never used rabbitmq before, this article is a good introduction: https://blog.csdn.net/hellozpc/article/details/81436980
rabbitmq is a solid message-queue service and pairs well with scrapy as a message queue.

Below is a simple demo:
import re
import requests
import scrapy
from scrapy import Request
from rabbit_spider import settings
from scrapy.log import logger
import json
from rabbit_spider.items import RabbitSpiderItem
import datetime
from scrapy.selector import Selector
import pika

# from scrapy_rabbitmq.spiders import RabbitMQMixin
# from scrapy.contrib.spiders import CrawlSpider


class Website(scrapy.Spider):
    name = "rabbit"

    def start_requests(self):
        headers = {'Accept': '*/*',
                   'Accept-Encoding': 'gzip, deflate, br',
                   'Accept-Language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7',
                   'Host': '36kr.com',
                   'Referer': 'https://36kr.com/information/web_news',
                   'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.108 Safari/537.36'
                   }

        url = 'https://36kr.com/information/web_news'

        yield Request(url=url,
                      headers=headers)

    def parse(self, response):
        credentials = pika.PlainCredentials('admin', 'admin')
        connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101', 5672, '/', credentials))

        channel = connection.channel()
        channel.exchange_declare(exchange='direct_log', exchange_type='direct')

        # exclusive queue with a broker-generated name
        result = channel.queue_declare(exclusive=True, queue='')
        queue_name = result.method.queue

        # print(queue_name)
        # infos = sys.argv[1:] if len(sys.argv)>1 else ['info']
        info = 'info'

        # bind the queue to the exchange with the routing key
        channel.queue_bind(
            exchange='direct_log',
            routing_key=info,
            queue=queue_name
        )
        print('start to receive [{}]'.format(info))

        channel.basic_consume(
            on_message_callback=self.callback_func,
            queue=queue_name,
            auto_ack=True,
        )

        channel.start_consuming()

    def callback_func(self, ch, method, properties, body):
        print(body)

Start the spider:
from scrapy import cmdline
cmdline.execute('scrapy crawl rabbit'.split())

Then push some data into rabbitmq:
import pika
import settings

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101', 5672, '/', credentials))

channel = connection.channel()
channel.exchange_declare(exchange='direct_log', exchange_type='direct')  # 'fanout' would broadcast instead

routing_key = 'info'
message = 'https://36kr.com/pp/api/aggregation-entity?type=web_latest_article&b_id=59499&per_page=30'
channel.basic_publish(
    exchange='direct_log',
    routing_key=routing_key,
    body=message
)

print('sending message {}'.format(message))
connection.close()
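To spell out the routing comment above (my explanation, not part of the original post): with a direct exchange the message only reaches queues bound with a matching routing key, whereas a fanout exchange copies it to every bound queue and ignores the key. A small self-contained sketch, with broadcast_log as a made-up exchange name:

import pika

credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101', 5672, '/', credentials))
channel = connection.channel()

# direct: the routing key matters -- only queues bound with 'info' get this message
channel.exchange_declare(exchange='direct_log', exchange_type='direct')
channel.basic_publish(exchange='direct_log', routing_key='info', body='hello direct')

# fanout: the routing key is ignored -- every queue bound to the exchange gets a copy
channel.exchange_declare(exchange='broadcast_log', exchange_type='fanout')  # hypothetical exchange name
channel.basic_publish(exchange='broadcast_log', routing_key='', body='hello fanout')

connection.close()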

 
After the data is pushed, scrapy picks the message up from the queue almost immediately.
Note that the blocking wait-on-queue code must not go in start_requests: start_requests has to return a generator (i.e. yield at least one Request), otherwise the program raises an error.
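To make that constraint concrete, here is a small sketch (mine, not from the original post; the class name is made up) contrasting the two placements:

import scrapy

class RabbitSketchSpider(scrapy.Spider):  # hypothetical name, for illustration only
    name = 'rabbit_sketch'

    # Wrong: putting channel.start_consuming() here blocks before anything is
    # yielded, so scrapy never gets a Request out of start_requests.
    #
    # def start_requests(self):
    #     channel.start_consuming()

    # Right: yield at least one ordinary Request and do the blocking consume
    # inside parse, exactly as the demo above does.
    def start_requests(self):
        yield scrapy.Request('https://36kr.com/information/web_news')

    def parse(self, response):
        pass  # connect to rabbitmq and call channel.start_consuming() here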
 
To be continued...
###### 2019-08-29 update ######
Found a gotcha: after rabbitMQ delivers a message, you cannot use yield (i.e. behave as a generator) inside the consume callback.
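One way around it (my sketch under assumptions, not the author's solution): have the callback only record the message body and call stop_consuming(), then yield Requests from parse after start_consuming() returns. The spider name, the queue name 'urls' and the _pending attribute below are all made up for illustration.

import pika
import scrapy


class RabbitYieldSpider(scrapy.Spider):  # hypothetical spider, illustration only
    name = 'rabbit_yield'
    start_urls = ['https://36kr.com/information/web_news']

    def parse(self, response):
        credentials = pika.PlainCredentials('admin', 'admin')
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='192.168.1.101', port=5672,
                                      virtual_host='/', credentials=credentials))
        channel = connection.channel()
        channel.queue_declare(queue='urls')  # hypothetical queue name
        self._pending = []                   # bodies collected by the callback

        channel.basic_consume(queue='urls', auto_ack=True,
                              on_message_callback=self.callback_func)
        channel.start_consuming()            # returns once stop_consuming() is called
        connection.close()

        # Back in a normal generator context, so yielding works again.
        for body in self._pending:
            yield scrapy.Request(body.decode(), callback=self.parse_detail)

    def callback_func(self, ch, method, properties, body):
        # No yield in here -- just stash the message and break out of the loop.
        self._pending.append(body)
        ch.stop_consuming()

    def parse_detail(self, response):
        print(response.url)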
 

exchange_declare() got an unexpected keyword argument 'type'

李魔佛 posted an article in python • 0 comments • 3112 views • 2019-07-16 14:40

In newer versions of pika, the exchange_declare() keyword argument is exchange_type instead of type:
credentials = pika.PlainCredentials('admin', 'admin')
connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.1.101', 5672, '/', credentials))

channel = connection.channel()

channel.exchange_declare(exchange='logs', exchange_type='fanout')
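For quick reference, my understanding of the old versus new spelling of the call (same 'logs' exchange as above); the first form is what triggers the error in the title on current pika releases:

# Older pika releases accepted:
#     channel.exchange_declare(exchange='logs', type='fanout')
# On current releases that raises:
#     exchange_declare() got an unexpected keyword argument 'type'

# Current spelling:
channel.exchange_declare(exchange='logs', exchange_type='fanout')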
