Lua场景编程示例
乐白机器人Lua场景编程示例
场景任务
被调用的执行场景
-- 执行运动
current_tcp_pose = get_target_tcp_pose() --当前末端坐标
print('current_tcp_pose',current_tcp_pose)
offset_position = {0, 0, 0.1, 0, 0, 0} --相对与末端tcp的位置偏移量,沿着末端tcp坐标系,z轴往前移动0.1m
calculate_location = pose_times(current_tcp_pose, offset_position) --计算的新位置
print('calculate_location',calculate_location)
movej(calculate_location, 1, 0.5, 0, 0)
场景传参解析方式
-- name: asdd.lua
-- author: Lebai
-- version: 1.4
C_POSE = {}
J_POSE = {}
--[[
使用lua API: scene(scene_id, params, dir) 或者 start_task(scene_id, params, dir, is_parallel, loop_to)调用执行场景
或者使用 SDK start_task(scene_id, params, dir, is_parallel, loop_to)调用场景时。
如果传入params参数(字符串数组),因为可能是字典或者数组形式的字符串,需要对参数进行解析,在lua语境下引用使用
然后根据传参内容和约定好的协议,进行逻辑判断执行相应逻辑
]]--
local params =... --table 类型 {"{'x': 0.297070,'y': 0.019994,'z': 0.385210,'rx': -1.694321,'ry': -0.116908,'rz': -0.303152} "}
-- {"[0.297070, 0.019994,0.385210,-0.303152, -0.116908,-1.694321] "}
print("params: ", params)
print('打印参数个数',#params) --打印参数个数
print('打印数组第一个参数类型',type(params[1]))
local json = require("json")
-- python sdk 传字典形式笛卡尔坐标和数组形式关节角坐标 转 lua形式坐标位置 ,解析字符串成lua可用位置形式
if params and next(params) then
for i=1,#params do
----如果传过来得 位置 是字典样式 {'x': 0.297070,'y': 0.019994,'z': 0.385210,'rx': -1.694321,'ry': -0.116908,'rz': -0.303152} 如下处理
-- cpose = json.decode(params[i])
-- print(cpose)
-- lua_cpose={cpose.x,cpose.y,cpose.z,cpose.rz,cpose.ry,cpose.rx}
-- print(lua_cpose)
-- C_POSE[i]= lua_cpose
-- --如果 位置 形式是列表样式 [0.297070, 0.019994,0.385210,-0.303152, -0.116908,-1.694321] 依次序代表
jpose = json.decode(params[i])
lua_jpose = {j1=jpose[1],j2=jpose[2],j3=jpose[3],j4=jpose[4],j5=jpose[5],j6=jpose[6]}
J_POSE[i]= lua_jpose
end
end
-- 除了json 解析方式,还有其他方式解析字符串
local str = "{x = 1}"
local func, err = load("return " .. str)
if not func then
error("Failed to load string: " .. err)
end
local tbl = func()
print(tbl.x)
-- 将字符串"{1,2}" 转换为 Lua 中的 table
local str = "{1,2}"
local table_result = load("return " .. str)()
for key, value in pairs(table_result) do
print(key, value)
end
-- 字符串截取
str = "{'a':2}"
local startIndex = string.find(str, "{") -- 查找字符 '{' 的索引位置
local endIndex = string.find(str, "}") -- 查找字符 '{' 的索引位置
if startIndex then
local substring = string.sub(str, startIndex + 1,endIndex-1) -- 截取
print(substring) -- 输出截取的字符串
end
场景接口调用
-- 场景调用
-- 用lua API接口 scene ,在场景A中调用场景B(示例中假定场景B的id号为 10216)
--场景A的内容
print(scene(10216,{'{1, 2}', 3, 4}))-- 打印场景返回值 4, 5,6
print(var_test) -- 打印全局变量
--场景B内容
local params = ... --赋值传入的参数 {'{1, 2}', 3, 4} 给params 类型table,其中第一个参数 '{1, 2}'是字符串类型
print("params: ", params)
print('打印参数类型',type(params[1]))
local json = require("json")
-- 解析字符串成lua可用位置形式
if params and next(params) then
for i=1,#params do
param = params[i]
if i ==1 then
print('打印参数1类型',type(params[1])) --第一个参数是字符串类型
table_data = load("return " .. param)() -- 转化成table类型
print('打印参数1类型',type(table_data))
print(table_data)
else
print(param)
end
end
end
-- 设置全局变量 ,在A场景中打印,能打印出来
var_test = {
name = "小吴",
age = 16
}
return 4, 5, 6
定时调用某个任务场景
J_POSES = {}
C_POSES = {}
C_POSES[10007] = {-0.003969, 0.425042, 0.126468, 2.738853, 0.046511, 1.507116}
C_POSES["奉客位置"] = {-0.003969, 0.425042, 0.126468, 2.738853, 0.046511, 1.507116}
J_POSES[10007] = {j1 = 4.624948, j2 = -1.094781, j3 = 2.504036, j4 = -1.476085, j5 = 1.882478, j6 = -0.067120}
J_POSES["奉客位置"] = {j1 = 4.624948, j2 = -1.094781, j3 = 2.504036, j4 = -1.476085, j5 = 1.882478, j6 = -0.067120}
C_POSES[10006] = {-0.003969, 0.425042, 0.126468, 2.738853, 0.046511, 1.507116}
C_POSES["2023-02-08 10:55:14 AM"] = {-0.003969, 0.425042, 0.126468, 2.738853, 0.046511, 1.507116}
J_POSES[10006] = {j1 = 4.624948, j2 = -1.094781, j3 = 2.504036, j4 = -1.476085, j5 = 1.882478, j6 = -0.067120}
J_POSES["2023-02-08 10:55:14 AM"] = {j1 = 4.624948, j2 = -1.094781, j3 = 2.504036, j4 = -1.476085, j5 = 1.882478, j6 = -0.067120}
local date = os.date("*t", os.time());
--结果为date = {year = 2017, month = 8, day = 22, yday = 234, wday = 3,hour = 10, min = 46, sec = 52, isdst = false}
-- name: 定时场景.lua
-- author: Lebai
-- version: 1.4
C_POSE = {}
J_POSE = {}
execution_scene_flag=false --是否执行指定的场景A 8:00-21:00
execution_exscene_flag=false --是否执行伸展的场景B 22:00-8:00
exscene_executed_tags=0 --标记是否执行过伸展场景B 标记执行过一次
while true do
wait(100)
local date = os.date("*t", os.time())
if date['hour']>=8 and date['hour']<=21 then
execution_scene_flag=true
exscene_executed_tags=0
elseif (string.find(os.date(), " 22:")) then -- 每天22点结束
execution_scene_flag=false
if exscene_executed_tags == 0 then
execution_exscene_flag=true
else
execution_exscene_flag=false
end
end
if execution_scene_flag==true then
-- 执行场景
print('运行场景')
scene(10103) --场景号根据情况修改,这里阻塞串行执行
end
if execution_exscene_flag==true then
print('运行场景')
scene(10103) -- 执行场景 这里阻塞串行执行
exscene_executed_tags=1
end
end
开机自启任务
-- 目的
-- 实现开机时自启一个场景,启动一个主场景,循环,实时检测输入信号:可以是485 232 TTL串口数据,
-- 也可以是DI输入(DI输入有控制箱 法兰盘 带的DI,以及肩部灯板和法兰按钮按下时的信号输入)
-- 通过检测到相应信号,调取执行相应动作
-- 具体实现方式如下
--新建一个场景A 设置为默认程序(开机自启),内容如下:
start_task('10026', nil, nil, true, 1)
-- 注意:这里调用的'10026'是主场景,有一个无限循环的逻辑,时刻检测输入信号,做出相应动作
-- 第四个参数为true,表示并行执行,这个主任务只做逻辑判断,不执行具体动作指令,不阻塞,动作逻辑和任务,通过再次调用子任务实现
-- 主任务场景 10026 内容如下:
--重新定义底层函数on_robot_stop,急停时只取消任务,不急停下电
function on_robot_stop(is_estop)
print("estop 当前TASK id:", task_id)
cancel_task(task_id)
end
--物理串口通讯读取
local com = serial.open("/dev/ttyS1") --- 使用机箱的RS485串口号,
com:set_timeout(200)
com:set_baud_rate(9600)
function Com_read_data()
local success, rst = pcall(function() return com:read(1) end)
if success then
if rst[1] == 0xAA then
-- 根据串口协议进行读取,这里做了简单处理,发现字节头,读取一定长度
local success, rst = pcall(function() return com:read(11) end)
if success then
table.insert(rst, 1,0xAA )
return success, rst
end
end
end
return false,nil
end
-- CRC16计算 (Modbus)
local function calc_crc(data)
local crc = 0xFFFF
for i = 1, #data do
crc = crc ~ data[i]
for _ = 1, 8 do
local j = crc & 1
crc = crc >> 1
if j == 1 then
crc = crc ~ 0xA001
end
end
end
return crc
end
-- 串口数据解析请求
function parse_request(request)
if #request < 12 then return nil end
-- 检查协议头
if request[1] ~= 0xAA or request[2] ~= 0x55 then return nil end
-- 检查从站ID
if request[3] ~= 0x01 then return nil end
-- 检查命令字
if request[5] ~= 0x37 and request[5] ~= 0x38 then return nil end
-- print('检查命令字')
-- 校验CRC
local crc_low = request[11]
local crc_high = request[12]
local crc_data = {}
for i=1,10 do
crc_data[i] = request[i]
end
local calc_crc_val = calc_crc(crc_data)
if crc_low ~= (calc_crc_val & 0xFF) or crc_high ~= ((calc_crc_val >> 8) & 0xFF) then
return nil
end
-- 返回面篓号
return request[5], request[6] --命令字,面篓号(0-3)
end
--无限循环逻辑
while true do
-- 处理通讯请求和反馈状态
local success, request = Com_read_data()
if success then
if request and #request == 12 then
local cmd_id, basket_id = parse_request(request)
print("com接收 :",request)
if basket_id ~= nil then
response = {0x55, 0xAA}
print("com发送<:", response)
local response_success, response_rst = pcall(function() return com:write(response) end)
if response_success then
print("com发送成功")
else
print("com发送失败")
end
end
end
end
if get_di(0) == 1 then --收到来自控制箱上的di信号
while true do
wait(10) --等待100ms
if get_di(0) == 0 then --判断di信号是否已经变为0
print("收到来自控制箱上的di信号")
break
end
end
if robot_state == 'ESTOP' or robot_state == 'IDLE' or robot_state == 'TEACHING' then
if robot_state == 'ESTOP' then
start_sys()
sleep(2000) -- 等待系统启动完成
end
state = get_task_state(task_id) --获取任务状态 任务执行结束 不在执行了
if state ~= 'WAIT' and state ~= 'RUNNING' and state ~= 'PAUSE'
then
if robot_state == 'TEACHING' then
end_teach_mode()
end
-- 开始一个任务 场景10024 根据情况修改
task_id = start_task('10024', nil, nil, false, 1) -- 串行执行的任务
print(string.format('Robot start task, task_id: %s', task_id))
end
end
end
local button_type = "SHOULDER" --"FLANGE_BTN" 为末端平按钮 "SHOULDER" 为肩部灯按钮控制
if lebai:get_di(button_type, 0)==1 then
while lebai:get_di(button_type, 0)==1 do
wait(20)
end
task_id = get_main_task_id()
cancel_task(task_id) -- 停止任务
end
sleep(10) -- 每次循环等待一段时间以减少CPU使用率
end
任务接口调用
-- 任务调用
--用lua API接口start_task,在场景A中调用场景B(场景id号10216)
--在lua场景中执行,注意start_task接口的第四个参数,必须true(并行执行),因为正在执行的场景本身就是个串行任务,会阻塞程序,死循环
--另外这里不像scene返回场景B的返回值
--注意 :
-- 调用接口的场景A 和 被调用的场景B ,因为并行执行 ,两个不能同时有运动执行命令,会导致动作执行混乱
-- start_task 相当于另起一个并行任务,同时,scene的场景调用相当于调用的一个函数,串行在调用的场景下执行
--场景A的内容
task_id = start_task(10216,{'{1, 2}', 3, 4}, "", true, 1)
print(var_test) -- 打印全局变量
--场景B内容
local params = ... --赋值传入的参数 {'{1, 2}', 3, 4} 给params 类型table,其中第一个参数 '{1, 2}'是字符串类型
print("params: ", params)
print('打印参数类型',type(params[1]))
local json = require("json")
-- 解析字符串 详见场景传参解析方式案例
if params and next(params) then
for i=1,#params do
param = params[i]
if i ==1 then
print('打印参数类型',type(params[1])) --第一个参数是字符串类型
table_data = load("return " .. param)() -- 转化成table类型
print('打印参数类型',type(table_data))
print(table_data)
else
print(param)
end
end
end
-- 设置全局变量 ,在A场景中打印,能打印出来
var_test = {
name = "小吴",
age = 16
}
return 4, 5, 6
串口通讯
串口通讯交互控制
--[[
启动一个lua场景,循环读取串口信息,
当读到不同的数据,对符合协议的数据,进行解析
根据协议约定,收到不同数据,执行不同运动逻辑或者其他任务
并根据协议,进行数据回复
--]]
-- 串口通讯串口数据交互控制案例
com1 = serial.open("/dev/ttyS1")
com1:set_timeout(200)
com1:set_baud_rate(9600)
-- 将字节流转换为16进制表示
function table2Hex(s)
rst = ''
for i = 1, #s do
rst = rst .. string.format('0x%02X ', s[i])
-- print(s[i])
end
return rst
end
-- 将字符串转换为16进制表示
function String2Hex(s)
local rst = ""
for i = 1, #s do
local byte = string.byte(s, i)
rst = rst .. string.format('0x%02X', byte)
if i < #s then
rst = rst .. ' ' -- 仅在字符间添加空格
end
end
return rst
end
--单字节无符号整数转有符号整数
function u2s_8(num)
if num > 127 then
num = -(256 - num)
end
return num
end
--单字节有符号整数转无符号整数
function s2u_8(num)
if num < 0 then
num = 256 + num
end
return num
end
--两个字节无符号整数转有符号整数
function usign2sign(num)
if num > 32767 then
num = -(65536 - num)
end
return num
end
--两个字节有符号整数转无符号整数
function sign2usign(num)
if num < 0 then
num = 65536 + num
end
return num
end
--- crc16_modbus 校验计算
-- data 待校验的数据(字符串或字节数组)
-- 返回 校验值
function crc16_modbus(data)
local crc = 0xFFFF -- 初始值
local poly = 0xA001 -- 多项式 (0x8005 的位反转形式)
-- 检查输入类型(字符串或 Table)
local byte
for i = 1, #data do
if type(data) == "string" then
byte = string.byte(data, i) -- 字符串模式:按字节取 ASCII 码
else
byte = data[i] -- Table 模式:直接取数值
end
crc=((crc ~ byte) & 0xFFFF) -- 异或当前字节
-- 处理 8 位
for _ = 1, 8 do
if (crc & 1) ~= 0 then
crc = ((crc >> 1) ~ poly)& 0xFFFF -- 右移并异或多项式
else
crc = crc >> 1 -- 仅右移
end
end
end
-- 返回低字节和高字节(小端模式)
local crc_low = crc & 0xFF
local crc_high = (crc >> 8) & 0xFF
--ret = ((crc_low << 8) | crc_high)
-- return ret
return crc_low, crc_high
end
-- 计算和校验
function calculateChecksum(data)
local sum = 0
for i = 1, #data do
sum = sum + data[i]
end
-- 取低8位作为校验和
return sum % 256
end
-- 计算异或校验值(XOR Checksum)
-- data 输入数据(字符串或 Table,如 "123" 或 {0x01, 0x02})
-- xor_checksum 异或校验值(1字节,0x00~0xFF)
function xor_checksum(data)
local checksum = 0 -- 初始值
-- 遍历每个字节
for i = 1, #data do
local byte
if type(data) == "string" then
byte = string.byte(data, i) -- 字符串模式:取 ASCII 码
else
byte = data[i] -- Table 模式:直接取数值
end
checksum = checksum ~ byte -- 异或运算
end
-- 返回 1 字节校验值(0x00~0xFF)
return checksum & 0xFF
end
function recv_data()
-- 读取串口,使用pcall捕获错误
success, result = pcall(function() return com1:read() end)
if success and #result>2 then
--这里加校验逻辑 以crc校验为例
local subTable = {}
for i = 1, #result-2 do
subTable[i] = result[i]
end
local crc_low, crc_high = crc16_modbus(subTable)
if result[#result-1] == crc_low and result[#result] ==crc_high then
print(table2Hex(subTable)) -- 打印HEX
local data = string.char(table.unpack(subTable))
return data
else
return nil
end
else
print("Error: ", result)
end
end
while true
do
data = recv_data()
if data == "123456"
then
movej({j1 = 0, j2 = 0, j3 = 0, j4 = 0, j5 = 0, j6 = 0}, 0.1, 0.1, 0, 0)
sync() -- 等待运动结束
set_claw(100,100)
local str = "finish1"
local crc_low, crc_high = crc16_modbus(str)
com1:write({string.byte(str, 1, #str),crc_low,crc_high}) -- 发送字符串
elseif data == "234567"
then
scene(10001) -- 执行场景
set_claw(100,0)
local str = "finish2"
local crc_low, crc_high = crc16_modbus(str)
com1:write({string.byte(str, 1, #str),crc_low,crc_high}) -- 发送字符串
else
--com1:write({0xAA,0x55, 0x01, 0x08, 0xFF ,0x00, 0x00 ,0x00, 0x00, 0x00 ,0xC9 ,0xF5 })
print("unknown data:", data)
end
end
串口控制甄小非咖啡机
-- 串口通讯制冰机协议案例
--crc校验函数 参数为数据字符串
function Crc16(buf)
local init = 0xFFFF
local poly = 0xA001
local ret = init
local byte=0
for j=1,#buf,1 do
byte = string.byte(buf,j)
ret=((ret ~ byte) & 0xFFFF)
for i=1,8,1 do
if((ret & 0x0001)>0) then
ret = (ret >> 1)
ret = ((ret ~ poly) & 0xFFFF)
else
ret= (ret >> 1)
end
end
end
local hi = ((ret >> 8) & 0xFF)
local lo = (ret & 0xFF)
ret = ((lo << 8) | hi)
return ret
end
local com1 = serial.open("/dev/ttyS0") ---串口选择232的port
com1:set_timeout(300) --读取超时时间
com1:set_baud_rate(9600) --设置波特率
com1:set_parity("None") --None: 无奇偶校验
--发送命令到串口的函数
function Send_Cmd_to_com(cmd)
-- print('cmd :',cmd)
com1:write(cmd) -- 发送相应命令
--循环读取直到读到数据校验完成返回数据,或者读取十次也未读到则终止返回false
for i = 1,3 do
wait(100)
-- 读取串口,使用pcall捕获错误
success, result = pcall(function() return com1:read() end)
-- print(success)
if success==true and result~=nil then
-- print('receive:',result) -- 打印HEX
crc_response = Crc16(string.char(table.unpack(result,1,#result-2)))
-- print('crc_response:',crc_response)
-- print( crc_response>>8 ,crc_response&0xFF)
if result[#result-1]==crc_response>>8 and result[#result]==crc_response&0xFF then
return result
end
end
end
return false
end
--出冰函数,参数为出冰时间,单位为0.1秒,例如输入40,则出冰4秒 返回true则出冰成功,返回false则出冰失败,返回其他则是发送命令或者通讯硬件问题
function Make_ice(out_ice_time)
if out_ice_time==nil then
out_ice_time=10
end
makeice_cmd = {0xA5,0x5A,0x01,0x02}
out_time= math.ceil(out_ice_time) --时间取整
table.insert(makeice_cmd, out_time&0xff00)
table.insert(makeice_cmd, out_time&0xff)
crc_result = Crc16(string.char(table.unpack(makeice_cmd))) --获得校验码
table.insert(makeice_cmd, crc_result>>8)
table.insert(makeice_cmd, crc_result&0xFF)
--发送出冰命令
make_ice_result= Send_Cmd_to_com(makeice_cmd)
if make_ice_result ~=false then
--正常命令的出冰返回结果
if #make_ice_result==8 then
status_code=table.unpack(make_ice_result,5,5)
if status_code==0 then --出冰成功
return true
else --出冰失败
return false
end
--错误命令导致的返回结果
elseif #make_ice_result==7 then
abnormal_code= table.unpack(make_ice_result,5,5)
if abnormal_code==0 then --位置错误导致出冰失败
return '未定义错误'
elseif abnormal_code==2 then --发送命令问题导致出冰失败
return '无法识别命令'
elseif abnormal_code==3 then --发送命令校验错误导致出冰失败
return 'CRC校验出错'
elseif abnormal_code==4 then --系统忙题导致出冰失败
return '系统忙'
end
end
else
--通讯故障
print('发送出冰命令时,与制冰机间的通讯异常')
return '通讯异常' --返回false 发送命令无返回值
end
end
--读取制冰机状态
function Read_machine_status()
read_status_cmd = {0xA5,0x5A,0x01,0x01,0x00,0x00,0x10,0xDF}
--发送读取状态命令
read_status_result= Send_Cmd_to_com(read_status_cmd)
--正常命令返回状态
if read_status_result ~=false then
if #read_status_result==8 then
machine_status_code= table.unpack(read_status_result,5,5)
error_code = table.unpack(read_status_result,6,6)
print('制冰机器状态码:', machine_status_code , ' 错误码:',error_code )
if machine_status_code==2 then
return '空闲待机'
elseif machine_status_code==3 then
return '出冰或出水中'
elseif machine_status_code==4 then
return '排冰'
elseif machine_status_code==5 then
if error_code==1 then
return '供水不足'
elseif error_code==2 then
return '制冰机内部温度过低'
elseif error_code==3 then
return '制冰机内部温度过高'
elseif error_code==4 then
return '压缩机温度过高'
elseif error_code==5 then
return '排气孔温度过低'
elseif error_code==6 then
return '电机电流过大'
elseif error_code==7 then
return '通讯异常'
elseif error_code==8 then
return '数据存储出错'
elseif error_code==9 then
return '电机堵转'
elseif error_code==10 then
return '电机警告'
end
elseif machine_status_code==6 then
return '清洗'
elseif machine_status_code==1 or machine_status_code==0 then
return '上电初始化中'
elseif machine_status_code==7 then
return '等待取冰'
end
--非正常命令返回状态
elseif #read_status_result==7 then
abnormal_code= table.unpack(read_status_result,5,5)
if abnormal_code==0 then
return '未定义错误'
elseif abnormal_code==2 then
return '无法识别命令'
elseif abnormal_code==3 then
return 'CRC校验出错'
elseif abnormal_code==4 then
return '系统忙'
end
end
--通讯故障无返回值
else
print('读取制冰机状态时,与制冰机的通讯异常' )
return '通讯异常'
end
return false
end
--检查冰桶状态
function Read_icebucket_status()
read_icebucket_cmd = {0xA5,0x5A,0x01,0x03,0x00,0x00,0xB1,0x1F}
--发送读取状态命令
read_icebucket_result= Send_Cmd_to_com(read_icebucket_cmd)
--正常命令返回状态
if read_icebucket_result~=false then
if #read_icebucket_result==8 then
icebucket_status_code= table.unpack(read_icebucket_result,5,5)
if icebucket_status_code==1 then
return '冰桶满'
else
return '冰未满'
end
--非正常命令返回状态
elseif #read_icebucket_result==7 then
icebucket_abnormal_code= table.unpack(read_icebucket_result,5,5)
if icebucket_abnormal_code==0 then
return '未定义错误'
elseif icebucket_abnormal_code==2 then
return '无法识别命令'
elseif icebucket_abnormal_code==3 then
return 'CRC校验出错'
elseif icebucket_abnormal_code==4 then
return '系统忙'
end
end
--通讯故障无返回值
else
print('读取冰桶状态时,与制冰机的通讯异常' )
return '通讯异常'
end
return false
end
print(Make_ice(10))
print(Read_machine_status())
print(Read_icebucket_status())
机箱modbus
-- 机箱Modbus案例
-- 使用机箱的Modbus/RTU
local com1 = serial.open("/dev/ttyS1")
com1:set_timeout(200)
com1:set_baud_rate(9600)
local mb = modbus.new_rtu(com1)
-- 配置Modbus
mb:set_timeout(500)
mb:set_slave(0x01)
-- 写入单线圈
success, result = pcall(function() mb:write_single_coil(0x0000, true) end)
if not success then
print("Error: "..tostring(result))
else
print("Write successful")
end
-- 写入多线圈
success, result = pcall(function()
mb:write_multiple_coils(0x0000, {true,true,true,true,true})
end)
if not success then
print("Error: ", result)
else
print("Write successful")
end
-- 读取多线圈
success, result = pcall(function() return mb:read_coils(0x0000, 5) end)
if not success then
print("Error: ", result)
else
print(result)
end
-- 写入单个保持寄存器
mb:write_single_register(0x0090, 0x0088)
-- 写入多个个保持寄存器
mb:write_multiple_registers(0x0090, {0x0088, 0x0088})
-- 读取多个保持寄存器
mb:read_holding_registers(0x0090, 2)
-- 读取多个输入寄存器
mb:read_input_registers(0x0080, 2)
modbus控制三碁伺服电机
-- 三碁伺服电机控制案例
C_POSE = {}
J_POSE = {}
disable_auto_sync() -- 关闭自动同步
--可以将读取到的字节转化为十六进制的数字表示
function String2Hex(s)
rst = ''
for i = 1, #s do
rst = rst .. string.format('0x%02X ', s[i])
end
return rst
end
function get_sign32(vx)
if (not vx) or (vx < 0x80000000) then
return vx
end
return vx - 0x100000000
end
--无符号2字节整型转化为有符号
function get_sign16(vx)
if (not vx) or (vx < 0x8000) then
return vx
end
return vx - 0x10000
end
--有符号4字节整型转化为无符号
function signed32_to_unsigned32(vx)
if not (vx<0) then
return vx
end
return vx + 0x100000000
end
--定义串口
com1 = serial.open("/dev/ttyS1")
com1:set_baud_rate(38400)
com1:set_parity("Even")
com1:set_timeout(200)
-- 配置Modbus
mb = modbus.new_rtu(com1)
mb:set_slave(0x01)
EI_status = 0
EI_address = 0x0000 --光电地址
function Servo_ON()
mb:write_multiple_registers(0x4230, {0, 1})
wait(20)
end
function Servo_OFF()
mb:write_multiple_registers(0x4230, {0, 0})
wait(20)
end
function Clear_EI()
mb:write_multiple_registers(EI_address, {0, 0})
wait(20)
end
--启动伺服时位置预置命令 #位置预置(清故障)
function Set_zero_EI()
Clear_EI()
mb:write_multiple_registers(EI_address, {0x0100>>16, 0x0100&0x0000FFFF})
wait(20)
end
--原点复位
function Find_zero_EI()
Clear_EI()
mb:write_multiple_registers(EI_address, {0x0400>>16, 0x0400&0x0000FFFF})
wait(20)
end
--移动命令
function Go_position_EI()
Clear_EI()
mb:write_multiple_registers(EI_address, {0x1>>16, 0x1&0x0000FFFF})
wait(20)
end
--EI2 = -OT,为0触发 ,EI4 = +OT 为0触发 ,EI5 = 原点LS检测 为1触发
-- 读取限位原点光电传感器状态
function Read_EI()
EI_status = mb:read_coils(0x0400, 5)
print(EI_status)
if EI_status ~= false then
local rst = {}
rst['home'] = not EI_status[5]
rst['OT-'] = not EI_status[2]
rst['OT+'] = not EI_status[4]
return rst
else
return false
end
end
--设置位置参数设置参数输入的是转化后的脉冲数
function Set_position( position_value )
position_value = signed32_to_unsigned32(position_value)
print(position_value)
print({position_value>>16, position_value&0x0000FFFF})
mb:write_multiple_registers(0x5102 , {position_value>>16, position_value&0x0000FFFF})
wait(20)
end
--读取位置 回复的是位置单位毫米
function Read_position()
local rst_position = mb:read_holding_registers(0x100c, 2)
wait(20)
if rst_position == false then
return false
end
local current_position = get_sign32((rst_position[1]<<16) + rst_position[2]) * 32 / 4000
print('Current position:' ,current_position)
return current_position
end
--设置速度参数
function Set_position_speed(speed_value)
mb:write_multiple_registers(0x5104 , {speed_value>>16, speed_value&0x0000FFFF})
wait(20)
end
--读取立即值速度
function Read_position_speed()
speed_rst = mb:read_holding_registers(0x5104, 2)
wait(20)
current_speed = get_sign32((speed_rst[1]<<16) + speed_rst[2] )
print('Current speed:' ,current_speed)
return current_speed
end
--读取当前速度
function Read_vel()
print(444)
local rst = mb:read_holding_registers(0x1000, 2)
wait(20)
print(777)
if rst == false then
return false
end
local current_vel = get_sign32((rst[1]<<16) + rst[2])
print('Current velocity:' , math.modf(current_vel))
return current_vel
end
--设置绝对位置模式
function Set_abs_mode()
mb:write_multiple_registers(0x5100, {0x00FF0000>> 16,0x00FF0000&0x0000FFFF})
wait(20)
end
--初始化回零点
function Init_servo()
if Read_vel() ~= 0 then
print('第七轴运动中, 不能复位.')
return false
end
Set_zero_EI()
Find_zero_EI()
Set_abs_mode()
while true do
wait(500)
if Read_vel() == 0 and math.abs(Read_position()) < 5 then
break
end
end
return true
end
-- pos单位:毫米mm
function Move(pos,move_speed)
if move_speed==nil then
move_speed=80000
end
-- if pos > 1540 or pos < -350 then
-- print('超出直线轴运动范围')
-- return false
-- end
if Read_vel() ~= 0 then
print('第七轴运动中, 不能接收移动指令.')
return false
end
pos_driver = math.modf(pos / 32 * 4000) --#设位置立即数, 4000unit = 9mm ,修正:4000unit = 32mm
print(pos_driver)
Set_position(pos_driver)
Set_position_speed(20000)
Go_position_EI()
while true do
if math.abs(Read_position() - pos) < 4 then
return true
else
wait(500)
end
end
end
末端法兰控制modbus夹爪
-- 法兰Modbus案例
-- 一般夹爪的控制直接调用lua API 或者SDK ,这里举例,使用法兰Modbus的控制方式
-- 方便理解在末端增加其他通讯模块,485线并在夹爪控制线上时,可使用类似接口控制增加的通讯模块
local mb = modbus.new_flange() --因为法兰的串口地址是固定的,这里不需要像机箱modbus实例化要配置串口
-- 配置Modbus
mb:set_timeout(600)
mb:set_slave(0x01)
mb:write_single_register(0x9c40, 100) --幅度控制
wait(100)
mb:write_single_register(0x9c41, 10) --力度控制
amplitude = mb:read_holding_registers(0x9c45, 1) --读取幅度
print(amplitude)
force = mb:read_holding_registers(0x9c46, 1) --读取力度
print(force)
--防止报错 使用pcall捕获错误
success, result = pcall(function() return mb:read_holding_registers(0x9c46, 1) end)
if not success then
print("Error: ", result)
else:
print(result)
end
modbus TCP示例
mb = modbus.new_tcp('192.168.4.85', 22) --机箱作为客户端主站,发送请求
-- 配置Modbus
mb:set_timeout(500)
mb:set_slave(0x01)
function recv_data()
-- 读取
success, result = pcall(function() return mb:read_holding_registers(0x0000, 2) end)
if success then
print('recieve data ',result)
local data = string.char(table.unpack(result))
return data
else
print("Error: ", result)
end
end
function send_data(address,data)
--收到任务后,执行后反馈
success, result = pcall(function() mb:write_multiple_registers(address, data) end)
if success then
return true
else
print("Error: ", result)
return false
end
end
while true
do
wait(50) --50ms读取一次
data = recv_data()
if(data == "1")
then
scene(10001) -- 执行场景
sync() -- 等待运动结束
local str = "finish 1"
local address = 0x0000
send_data(address, {string.byte(str, 1, #str)}) --执行反馈
elseif(data == "2")
then
scene(10001) -- 执行场景
sync() -- 等待运动结束
local str = "finish 2"
local address = 0x0000
send_data(address, {string.byte(str, 1, #str)}) --执行反馈
end
end
空间位置计算
相对点位计算
-- 相对位置点计算
-- 求相对于末端TCP坐标系的位置,在基座坐标系下的位置,进行相对于末端的运动
current_tcp_pose = get_target_tcp_pose() --当前末端坐标
print('current_tcp_pose',current_tcp_pose)
offset_position = {0, 0, 0.1, 0, 0, 0} --相对与末端tcp的位置偏移量,沿着末端tcp坐标系,z轴往前移动0.1m
calculate_location = pose_times(current_tcp_pose, offset_position) --计算的新位置
print('calculate_location',calculate_location)
movej(calculate_location, 1, 0.5, 0, 0)
--如果想让当前位置姿态,以某个坐标系,进行姿态变换。
----例如相对于基座坐标系,沿着基座z轴增加0.1
base = get_target_tcp_pose() --当前末端坐标
print('current_tcp_pose',base)
delta = {0, 0, 0.1,0,0, 0} --沿着frame坐标系z轴方向移动0.1m
frame ={0, 0, 0, 0, 0, 0} -- 基座坐标系 位姿偏移量方向,仅姿态部分有效。
pose = pose_add(base, delta, frame)
print('pose',pose)
movej(pose, 1, 0.5, 0, 0)
----例如,相对于绕y轴旋转45°,使z轴斜向上的新坐标系,沿着基座z轴增加0.1
base = get_target_tcp_pose() --当前末端坐标
print('current_tcp_pose',base)
delta = {0, 0, 0.1, 0, 0, 0} -- 沿着frame坐标系z轴方向移动0.1m
frame = {0, 0, 0, 0 , 0.78, 0} -- 绕y轴旋转45°,使z轴斜向上 的新坐标系
pose = pose_add(base, delta, frame)
print('pose',pose)
movej(pose, 1, 0.5, 0, 0)
运动控制
轨迹复现
--轨迹复现
name = "test" --将此次拖动轨迹命名为test
function on_robot_state(state)
if state == 11
then
-- 进入示教模式时,开始轨迹记录
start_record_trajectory(0.01)
end
if last_state == 11
then
-- 退出示教模式时,完成轨迹记录
end_record_trajectory(name)
-- 复现轨迹
move_trajectory(name)
end
last_state = state
end
while true
do
sleep(1000)
end
--上述场景运行中,按示教按钮开始录制,拖动形成轨迹,释放示教按钮结束录制
--新建场景,调用move_trajectory("test") 复现轨迹
画五角星
-- 画五角星
function clone(x) --克隆点位
return {x[1],x[2],x[3],x[4],x[5],x[6]}
end
local p = math.rad(36)/2
local vel = 0.06
local acc = 0.1
function 画☆(a, r)
local x = r * math.sin(p)
local y = r * math.cos(p)
print('x', x, 'y', y)
print('a', a)
movej(a, 0.4, 1, 0, 1)
wait(1000)
b = clone(a)
b[1] = a[1] - x
b[2] = a[2] - y
print('b', b)
movel(b, vel, acc, 0, 0)
c = clone(a)
c[1] = a[1] + r / 2
c[2] = a[2] - r / 2 * math.cos(math.rad(36))
print('c', c)
movel(c, vel, acc, 0, 0)
d = clone(a)
d[1] = a[1] - r / 2
d[2] = c[2]
print('d', d)
movel(d, vel, acc, 0, 0)
e = clone(a)
e[1] = a[1] + x
e[2] = a[2] - y
print('e', e)
movel(e, vel, acc, 0, 0)
movel(a, vel, acc, 0, 0)
end
-- a = get_actual_tcp_pose()
a = {-0.34, -0.12, 0.4, -1.57, 0, 0.25}
for i=0,2 do
a[3] = a[3] - 0.08
画☆(a, 0.1+i*0.02)
end
a[2] = a[2] + 0.2
a[3] = a[3] - 0.04
for i=0,2 do
a[3] = a[3] + 0.08
画☆(a, 0.1+i*0.02)
end
双机同步
-- 双机同步 让一个手臂跟随一个手臂进行同样的动作
disable_auto_sync()
lebai = lebai_sdk.connect("127.0.0.1", false) -- 本机
target = lebai_sdk.connect("192.168.4.100", false) -- 被控要做跟随动作的机器
target:start_sys() -- 使能被控机器
wait(3000)
-- 让被控机器运动到本机的当前位置
pose = (lebai:get_kin_data())["actual_joint_pose"]
target:movej(pose, 0.1, 0.1, 0, 0)
target:wait_move()
print("start")
last_time = timestamp() --上一次同步的毫秒时间戳
start_flag =0 -- 本机调用一个动作场景,开始标志
while true
do
wait(10)
now = timestamp()
used_time = now-last_time
last_time = now
-- 同步手臂位置
status = lebai:get_kin_data()
target:move_pvat(status["actual_joint_pose"], status["actual_joint_speed"], status["actual_joint_acc"], used_time/1000) --同步运动
-- 检测是否开始运动
if start_flag == 0 then
start_flag = 1
scene_id = 10289 -- 动作场景ID
start_task(scene_id, nil, '', true, 1) -- 本机手臂按场景设定开始运动,另一个手臂开始跟随
end
-- 这是一个无限循环逻辑,当本机执行完场景后,手动示教本机,另一个手臂也会跟随运动
-- 如果想停止,手动停止场景或者急停,或者第三方脚本停止
end
-- 实现多机器同时执行一个场景(所有设备的场景中都有这个要执行的场景id)
C_POSE = {}
J_POSE = {}
--禁用自动同步
disable_auto_sync()
-- IP地址列表 如果有新机器继续增加,没有的删除
local robot_ips = {
"127.0.0.1", -- 主控
"192.168.6.109" ,
"192.168.4.57", -- 被控机器1
"192.168.4.109"
}
-- 存储连接对象的表
local robots = {}
-- 批量创建连接
for i, ip in ipairs(robot_ips) do
success, result = pcall(function() return lebai_sdk.connect(ip, false) end)
if success then
robots[i] = result
print("成功连接: " .. ip)
robots[i]:start_sys()
else
print("连接失败: " .. ip)
end
end
wait(3000)
-- 让被控机器运动到本机的当前位置
local pose = (robots[1]:get_kin_data())["actual_joint_pose"]
for i =1,#robot_ips do
if robots[i] then
robots[i]:movej(pose, 0.5, 0.5, 0, 0)
end
end
for i =1,#robot_ips do
if robots[i] then
robots[i]:wait_move()
end
end
print("开始动作")
for i =1,#robot_ips do
if robots[i] then
robots[i]:start_task(10010, nil, '', true, 2)
end
end
print("已经全部启动")
IO控制
DI交互控制
-- IO和信号量控制
tasks = {}
-- 取消全部任务
function cancel_all_tasks()
for k,v in pairs(tasks) do
print(k,v)
cancel_task(v)
tasks[k] = nil
end
end
button_type = "SHOULDER" --"FLANGE_BTN" 为末端平按钮 "SHOULDER" 为肩部灯按钮控制
while true do
wait(100) -- 0.1s检测一次
if get_di(0) == 1 then
wait_di(0, 0, "=") -- 等待释放按键
cancel_all_tasks()
tasks["0"] = start_task("10160", {}, "", true, 1)
end
if get_di(1) == 1 then
wait_di(1, 0, "=") -- 等待释放按键
cancel_all_tasks()
tasks["1"] = start_task("10160", {}, "", true, 1)
end
if lebai:get_di(button_type, 0)==1 then
while lebai:get_di(button_type, 0)==1 do
wait(20)
end
cancel_all_tasks()
tasks["0"] = start_task("10160", {}, "", true, 1)
end
end
DI交互控制夹爪
set_auto('ARM_POWER' , true)
set_auto('ENABLE_JOINT' , true) --设置上电自启到空闲状态
-- init_claw(true)
claw_status ='close'
button_type = "SHOULDER" --"FLANGE_BTN" 为末端平按钮 "SHOULDER" 为肩部灯按钮控制
button_times = 0 -- 按钮按下的次数 连续按3次 夹爪打开或关闭
interval_flag = timestamp()
while true do
if timestamp()-interval_flag >2000 then
button_times = 0
end
if lebai:get_di(button_type, 0)==1 then
while lebai:get_di(button_type, 0)==1 do
wait(20)
end
button_times = button_times + 1
if button_times ==1 then
interval_flag = timestamp()
elseif button_times == 3 then
button_times = 0
if claw_status =='close' then
claw_status='open'
print(claw_status)
set_claw(100,100)
sync()
elseif claw_status=='open' then
claw_status='close'
set_claw(100,0)
print(claw_status)
sync()
end
end
end
wait(20)
end
DO控制泵
-- DO控制吸泵
-- 实现运动过程中,检测到触碰到物体开始吸,到达特殊位置,释放
-- 具体接线组装方式查看官网
--首先运动到要吸的物体上方
object_pose = {j1=-1.9267757433836,j2=-1.9655087582777,j3=2.2828510337716,j4=-0.34313232749017,j5=-3.9459738292373,j6=0}
object_pose_top = pose_add(object_pose, {0,0,0.15,0,0,0}) --以object_pose点为基础,沿基座坐标系z轴0.15m移动后,形成的新坐标
movej(object_pose_top, 0.5, 0.5, 0, 0)
--从物体上方往下逐渐移动,当触发末端的di时,泵接触到物体,停止移动 打开泵
movel_until(object_pose, 0.2, 0.08, 0, function() return get_flange_di(0) == 1 end)
--开泵吸
set_do(0, 1) -- 打开泵
set_do(1, 0) -- 关闭阀
--回到顶部位置 移动到放置位置
movej(object_pose_top, 0.5, 0.5, 0, 0)
put_pose = {j1=0,j2=-1.9655087582777,j3=2.2828510337716,j4=-0.34313232749017,j5=-3.9459738292373,j6= 0}
movej(put_pose, 0.5, 0.5, 0, 0)
--关泵释放
set_do(0, 0) -- 关闭泵
set_do(1, 1) -- 打开阀
wait(2000)
set_do(1, 0) -- 关闭阀