我正在尝试使用mosquitto接收数据,并使用python pandas将其保存为csv文件。在我停止脚本之前,数据是连续的。
mqtt\U发布。py
import paho.mqtt.client as mqtt
import random
import schedule
import time
mqttc = mqtt.Client("python_pub")
mqttc.connect("localhost", 1883)
def job():
mqttc.publish("hello/world", random.randint(1, 10))
schedule.every(1).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1)
mqttc.loop(2)
mqtt_sub.py代码
import paho.mqtt.client as mqtt
import pandas as pd
def on_connect(client, userdata, rc):
print("Connected with result code "+str(rc))
client.subscribe("hello/world")
def on_message(client, userdata, msg):
datas = map(int, msg.payload)
for num in datas:
df = pd.DataFrame(data=datas, columns=['the_number'])
df.to_csv("testing.csv")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()
从上面的脚本中,我得到了测试。csv
| the _number
0 | 2
2是我在停止脚本之前收到的最后一个数字
Connected with result code 0
[3]
[9]
[5]
[3]
[7]
[2]
...
...
KeyboardInterrupt
我希望得到
测试。像这样的csv
| the_number
0 | 3
1 | 9
2 | 5
...
...
5 | 2
为了实现这一点,我尝试将以下
df=pd. DataFrame(data=datas,列=['the_number'])
更改为df=pd. DataFrame(data=num,列=['the_number'])
并发生以下错误
pandas.core.common.PandasError: DataFrame constructor not properly called!
有人知道如何解决这个错误吗?我也觉得我在这里没有正确使用for循环。
谢谢你的建议和帮助。
[更新]
我在
on_message
方法中添加/更改以下行
def on_message(client, userdata, msg):
datas = map(int, msg.payload)
df = pd.DataFrame(data=datas, columns=['the_number'])
f = open("test.csv", 'a')
df.to_csv(f)
f.close()
在Nulljack的帮助下,我能够在我的CSV文件中得到这样的结果
| the_number
0 | 3
| the_number
0 | 9
| the_number
0 | 5
| the_number
0 | 3
| the_number
0 | 7
我的目标是在CSV文件中实现类似的功能
| the_number
0 | 3
1 | 9
2 | 5
3 | 3
4 | 7
共2个答案
匿名用户
我之前从未使用过mosquitto,如果我的理解有误,我道歉。
在我看来on_message方法在您的mqtt_sub.py是运行每次您的mqtt_pub.py发布消息(即每一秒)这将导致您的testing.csv文件被覆盖每次您发布消息
为了解决这个问题,我将在on\u connect方法中初始化一个数据帧,然后在on\u消息中通过df向数据帧添加新值。追加
至于你终止后给CSV写信,我不确定。
希望这有帮助
匿名用户
其他帖子很拥挤,所以我把我的回复移到这里
尝试使用下面的代码
import paho.mqtt.client as mqtt
import pandas as pd
# Move df here
df = pd.DataFrame(columns=['the_number'])
def on_connect(client, userdata, rc):
print("Connected with result code "+str(rc))
client.subscribe("hello/world")
def on_message(client, userdata, msg):
datas = map(int, msg.payload)
# this adds the data to the dataframe at the correct index
df.iloc[df.size] = datas
# I reverted this line back to what you originally had
# This will overwrite the testing.csv file every time your subscriber
# receives a message, but since the dataframe is formatted like you want
# it shouldn't matter
df.to_csv("testing.csv")
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("localhost", 1883, 60)
client.loop_forever()
相关问题
- 玩笑未解决的promise不会失败
- 包含订阅的方法被多次调用,我应该每次取消订阅旧订阅吗?
- 由于订阅问题,AngularAPI被多次调用
- 如何编写一个函数,使超文本传输协议请求并返回请求的结果?
- “this”在ngOnDestroy中未定义
- 在视图中使用@input可观察,但组件中的管道函数被忽略
- Angular 4每次组件加载时重新加载函数
- 在Angular Service的OnDestroy钩子中取消订阅HTTP客户端调用
- 组件被销毁和重访后,订阅在ngOnInit函数中运行
- 在订阅回调函数中取消订阅?
- 任何需要先调用取消订阅RxJS()
- 我们需要在Angular中取消订阅超文本传输协议调用吗?[重复]
- 为什么C函数可以创建可变长度的数组?
- x86-64上C中的所有函数都需要堆栈帧吗?
- 为什么函数激活记录中需要动态链接?(静态作用域语言)
- 调用栈到底是如何工作的?
- GliBC scanf从不对齐RSP的函数调用时出现分割错误
- 具有隐式转换函数的三路运算符<=>返回结构
- 部分排序时,成员函数模板的原始类型是什么
- 我怎么能有Teamcity艺术家插件调用bootJar而不是JarGradle?