注意

本文档适用于 Ceph 的开发版本。

Librados (Python)

rados 模块是 librados 的一个精简 Python 封装。

安装

要安装 Ceph 的 Python 库,请参阅 获取用于 Python 的 librados

入门指南

您可以使用 Python 创建自己的 Ceph 客户端。以下教程将向您展示如何导入 Ceph Python 模块、连接到 Ceph 集群,以及以 client.admin 用户身份执行对象操作。

注意

要使用 Ceph Python 绑定,您必须能够访问正在运行的 Ceph 集群。要快速设置一个集群,请参阅 入门指南

首先,为您的 Ceph 客户端创建一个 Python 源文件。

vim client.py

导入模块

要使用 rados 模块,请将其导入到您的源文件中。

1     import rados

配置集群句柄

在连接到 Ceph 存储集群之前,请创建一个集群句柄。默认情况下,集群句柄假定集群名称为 ceph(即部署工具和我们的入门指南的默认设置),以及用户名为 client.admin。您可以根据需要更改这些默认设置。

要连接到 Ceph 存储集群,您的应用程序需要知道 Ceph Monitor 的位置。通过指定 Ceph 配置文件(其中包含初始 Ceph monitors 的位置)的路径来向应用程序提供此信息。

1     import rados, sys
2
3     #Create Handle Examples.
4     cluster = rados.Rados(conffile='ceph.conf')
5     cluster = rados.Rados(conffile=sys.argv[1])
6     cluster = rados.Rados(conffile = 'ceph.conf', conf = dict (keyring = '/path/to/keyring'))

确保 conffile 参数提供您的 Ceph 配置文件的路径和文件名。您可以使用 sys 模块来避免硬编码 Ceph 配置路径和文件名。

您的 Python 客户端还需要一个客户端密钥环。对于此示例,我们默认使用 client.admin 密钥。如果您想在创建集群句柄时指定密钥环,可以使用 conf 参数。或者,您可以在 Ceph 配置文件中指定密钥环路径。例如,您可以在 Ceph 配置文件中添加如下一行:

keyring = /path/to/ceph.client.admin.keyring

有关通过 Python 修改配置的更多详细信息,请参阅 配置

连接到集群

配置集群句柄后,即可连接到集群。连接到集群后,您可以执行返回有关集群信息的方法。

 1     import rados, sys
 2
 3     cluster = rados.Rados(conffile='ceph.conf')
 4     print("\nlibrados version: {}".format(str(cluster.version())))
 5     print("Will attempt to connect to: {}".format(str(cluster.conf_get('mon host'))))
 6
 7     cluster.connect()
 8     print("\nCluster ID: {}".format(cluster.get_fsid()))
 9
10     print("\n\nCluster Statistics")
11     print("==================")
12     cluster_stats = cluster.get_cluster_stats()
13
14     for key, value in cluster_stats.items():
15             print(key, value)

默认情况下,Ceph 身份验证处于 on 状态。您的应用程序需要知道密钥环的位置。python-ceph 模块没有默认位置,因此您需要指定密钥环路径。指定密钥环的最简单方法是将其添加到 Ceph 配置文件中。以下 Ceph 配置文件示例使用 client.admin 密钥环。

1     [global]
2     # ... elided configuration
3     keyring = /path/to/keyring/ceph.client.admin.keyring

管理存储池

连接到集群后,Rados API 允许您管理存储池。您可以列出存储池、检查存储池是否存在、创建存储池和删除存储池。

 1     print("\n\nPool Operations")
 2     print("===============")
 3
 4     print("\nAvailable Pools")
 5     print("----------------")
 6     pools = cluster.list_pools()
 7
 8     for pool in pools:
 9             print(pool)
10
11     print("\nCreate 'test' Pool")
12     print("------------------")
13     cluster.create_pool('test')
14
15     print("\nPool named 'test' exists: {}".format(str(cluster.pool_exists('test'))))
16     print("\nVerify 'test' Pool Exists")
17     print("-------------------------")
18     pools = cluster.list_pools()
19
20     for pool in pools:
21             print(pool)
22
23     print("\nDelete 'test' Pool")
24     print("------------------")
25     cluster.delete_pool('test')
26     print("\nPool named 'test' exists: {}".format(str(cluster.pool_exists('test'))))

输入/输出上下文

从 Ceph 存储集群读取和写入需要输入/输出上下文 (ioctx)。您可以使用 Rados 类的 open_ioctx()open_ioctx2() 方法创建 ioctx。ioctx_name 参数是存储池的名称,pool_id 是您希望使用的存储池的 ID。

1     ioctx = cluster.open_ioctx('data')

1     ioctx = cluster.open_ioctx2(pool_id)

获得 I/O 上下文后,您可以读取/写入对象、扩展属性,并执行许多其他操作。完成操作后,请确保关闭连接。例如:

1     print("\nClosing the connection.")
2     ioctx.close()

写入、读取和删除对象

创建 I/O 上下文后,即可将对象写入集群。如果您写入的对象不存在,Ceph 会创建它。如果您写入的对象存在,Ceph 会覆盖它(除非您指定范围,在这种情况下它只会覆盖该范围)。您可以从集群读取对象(和对象范围)。您还可以从集群中删除对象。例如:

1print("\nWriting object 'hw' with contents 'Hello World!' to pool 'data'.")
2ioctx.write_full("hw", "Hello World!")
3
4print("\n\nContents of object 'hw'\n------------------------\n")
5print(ioctx.read("hw"))
6
7print("\nRemoving object 'hw'")
8ioctx.remove_object("hw")

写入和读取 XATTRS

创建对象后,您可以向对象写入扩展属性 (XATTRs),并从对象读取 XATTRs。例如:

1print("\n\nWriting XATTR 'lang' with value 'en_US' to object 'hw'")
2ioctx.set_xattr("hw", "lang", "en_US")
3
4print("\n\nGetting XATTR 'lang' from object 'hw'\n")
5print(ioctx.get_xattr("hw", "lang"))

列出对象

如果您想检查存储池中的对象列表,可以检索对象列表并通过对象迭代器进行迭代。例如:

 1object_iterator = ioctx.list_objects()
 2
 3while True :
 4
 5        try :
 6                rados_object = object_iterator.__next__()
 7                print("Object contents = {}".format(rados_object.read()))
 8
 9        except StopIteration :
10                break
11
12# Or alternatively
13[print("Object contents = {}".format(obj.read())) for obj in ioctx.list_objects()]

Object 类提供了类似文件的对象接口,允许您读取和写入内容以及扩展属性。使用 I/O 上下文执行的对象操作提供了额外的功能和异步能力。

集群句柄 API

Rados 类提供了 Ceph 存储守护程序的接口。

配置

Rados 类提供了用于获取和设置配置值、读取 Ceph 配置文件和解析参数的方法。您无需连接到 Ceph 存储集群即可调用以下方法。有关设置的详细信息,请参阅 存储集群配置

Rados.conf_get(option)

Rados.conf_get(self, str option: str) -> Optional[str]

获取配置选项的值

参数:

option (str) -- 要读取的选项

返回类型:

Optional[str]

返回:

选项的值或 None

引发:

TypeError

Rados.conf_set(option, val)

Rados.conf_set(self, str option: str, str val: str)

设置配置选项的值

参数:
  • option (str) -- 要设置的选项

  • option -- 选项的值

引发:

TypeError, ObjectNotFound

Rados.conf_read_file(path=None)

Rados.conf_read_file(self, str path: Optional[str] = None)

使用 Ceph 配置文件配置集群句柄。

参数:

path (Optional[str]) -- 配置文件的路径

Rados.conf_parse_argv(args)

Rados.conf_parse_argv(self, args: Sequence[str])

从 args 中解析已知参数并将其删除;返回的 args 仅包含 ceph 未知的参数

Rados.version()

Rados.version(self) -> Version

获取 librados C 库的版本号。

返回类型:

Version

返回:

librados 版本的 (major, minor, extra) 组件元组

连接管理

配置集群句柄后,您可以连接到集群、检查集群 fsid、检索集群统计信息以及断开(关闭)集群。您还可以断言集群句柄处于特定状态(例如,“configuring”、“connecting”等)。

Rados.connect(timeout=0)

Rados.connect(self, int timeout: int = 0)

连接到集群。使用 shutdown() 释放资源。

参数:

timeout (int) -- 当前忽略任何提供的超时值。

Rados.shutdown()

Rados.shutdown(self)

断开与集群的连接。当不再使用 Rados.connect()ed 对象时,显式调用此函数。

Rados.get_fsid()

Rados.get_fsid(self) -> str

获取集群的 fsid 作为十六进制字符串。

引发:

Error

返回类型:

str

返回:

集群 fsid

Rados.get_cluster_stats()

Rados.get_cluster_stats(self) -> Dict[str, int]

读取有关集群的使用信息

这会告诉您总空间、已用空间、可用空间和对象数。这些不是在写入数据时立即更新的,它们是最终一致的。 :rtype: Dict[str, int] :returns: 包含以下键

  • kb (int) - 总空间

  • kb_used (int) - 已用空间

  • kb_avail (int) - 可用空闲空间

  • num_objects (int) - 对象数

class rados.Rados
require_state(*args)

检查 Rados 对象是否处于特定状态

参数:

args -- 要检查的任意数量的状态作为单独的参数

引发:

RadosStateError

存储池操作

要使用存储池操作方法,必须先连接到 Ceph 存储集群。您可以列出可用的存储池、创建存储池、检查存储池是否存在以及删除存储池。

Rados.list_pools()

Rados.list_pools(self) -> List[str]

获取存储池名称列表。

返回类型:

List[str]

返回:

存储池名称列表。

Rados.create_pool(pool_name, crush_rule=None, auid=None)

Rados.create_pool(self, str pool_name: str, int crush_rule: Optional[int] = None, int auid: Optional[int] = None)

创建存储池:- 使用默认设置:如果 crush_rule=None 和 auid=None - 使用特定的 CRUSH 规则:给定 crush_rule - 使用特定的 auid:给定 auid - 使用特定的 CRUSH 规则和 auid:给定 crush_rule 和 auid

参数:
  • pool_name (str) -- 要创建的存储池的名称

  • crush_rule (Optional[int]) -- 用于在新存储池中进行放置的规则

  • auid (Optional[int]) -- 新存储池所有者的 ID

引发:

TypeError, Error

Rados.pool_exists(pool_name)

Rados.pool_exists(self, str pool_name: str) -> bool

检查给定的存储池是否存在。

参数:

pool_name (str) -- 要检查的存储池的名称

引发:

TypeError, Error

返回类型:

bool

返回:

如果存储池存在则为 true,否则为 false。

Rados.delete_pool(pool_name)

Rados.delete_pool(self, str pool_name: str)

删除存储池及其内部所有数据。

存储池会立即从集群中删除,但实际数据会在后台删除。

参数:

pool_name (str) -- 要删除的存储池的名称

引发:

TypeError, Error

CLI 命令

Ceph CLI 命令内部使用以下 librados Python 绑定方法。

要发送命令,请选择正确的方法和正确的对象。

Rados.mon_command(cmd, inbuf, timeout=0, target=None)

向 mon 发送命令。

mon_command[_target](cmd, inbuf, outbuf, outbuflen, outs, outslen)

参数:
  • cmd (str) -- JSON 格式的字符串。

  • inbuf (bytes) -- 可选字符串。

  • timeout (int) -- 此参数被忽略。

  • target (Union[int, str, None]) -- 特定 mon 的名称或 rank。可选

返回类型:

Tuple[int, bytes, str]

返回:

(int ret, string outbuf, string outs)

示例

>>> import json
>>> c = Rados(conffile='/etc/ceph/ceph.conf')
>>> c.connect()
>>> cmd = json.dumps({"prefix": "osd safe-to-destroy", "ids": ["2"], "format": "json"})
>>> c.mon_command(cmd, b'')
Rados.osd_command(osdid, cmd, inbuf, timeout=0)

osd_command(osdid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

返回类型:

Tuple[int, bytes, str]

返回:

(int ret, string outbuf, string outs)

Rados.mgr_command(cmd, inbuf, timeout=0, target=None)
返回类型:

Tuple[int, str, bytes]

返回:

(int ret, string outbuf, string outs)

Rados.pg_command(pgid, cmd, inbuf, timeout=0)

pg_command(pgid, cmd, inbuf, outbuf, outbuflen, outs, outslen)

返回类型:

Tuple[int, bytes, str]

返回:

(int ret, string outbuf, string outs)

输入/输出上下文 API

要向 Ceph 对象存储写入数据和从 Ceph 对象存储读取数据,必须创建输入/输出上下文 (ioctx)。Rados 类提供了 open_ioctx()open_ioctx2() 方法。其余的 ioctx 操作涉及调用 Ioctx 和其他类的方法。

Rados.open_ioctx(ioctx_name)

Rados.open_ioctx(self, str ioctx_name: str) -> Ioctx

创建 io 上下文

io 上下文允许您在特定存储池中执行操作。

参数:

ioctx_name (str) -- 存储池的名称

引发:

TypeError, Error

返回类型:

Ioctx

返回:

Rados Ioctx 对象

Ioctx.require_ioctx_open()

Ioctx.require_ioctx_open(self)

检查 rados.Ioctx 对象状态是否为“open”

引发:

IoctxStateError

Ioctx.get_stats()

Ioctx.get_stats(self) -> Dict[str, int]

获取存储池使用情况统计信息

返回类型:

Dict[str, int]

返回:

字典包含以下键

  • num_bytes (int) - 存储池大小(字节)

  • num_kb (int) - 存储池大小(k字节)

  • num_objects (int) - 存储池中的对象数

  • num_object_clones (int) - 对象克隆数

  • num_object_copies (int) - 对象副本数

  • num_objects_missing_on_primary (int) - 对象数

    在 primary 上丢失

  • num_objects_unfound (int) - 未找到的对象数

  • num_objects_degraded (int) - 降级对象数

  • num_rd (int) - 读取的字节数

  • num_rd_kb (int) - 读取的 k字节数

  • num_wr (int) - 写入的字节数

  • num_wr_kb (int) - 写入的 k字节数

Ioctx.get_last_version()

Ioctx.get_last_version(self) -> int

返回读取或写入的最后一个对象的版本。

这会公开通过此 io 上下文读取或写入的最后一个对象的内部版本号

返回类型:

int

返回:

使用的最后一个对象的版本

Ioctx.close()

Ioctx.close(self)

关闭 rados.Ioctx 对象。

这只是告诉 librados 您不再需要使用 io 上下文。如果上面有待处理的异步请求,它可能不会立即释放,但在对此函数调用后,您不应再使用此 io 上下文。

对象操作

Ceph 存储集群将数据存储为对象。您可以同步或异步读取和写入对象。您可以从偏移量读取和写入。对象有一个名称(或键)和数据。

Ioctx.aio_write(object_name, to_write, offset=0, oncomplete=None, onsafe=None)

Ioctx.aio_write(self, str object_name: str, bytes to_write: bytes, int offset: int = 0, oncomplete: Optional[Callable[[Completion], None]] = None, onsafe: Optional[Callable[[Completion], None]] = None) -> Completion

异步写入数据到对象

将写入排队并返回。

参数:
  • object_name (str) -- 对象的名称

  • to_write (bytes) -- 要写入的数据

  • offset (int) -- 在对象中开始写入的字节偏移量

  • oncomplete (Optional[Callable[[Completion], None]]) -- 当写入在所有副本的内存中安全且完成时要执行的操作

  • onsafe (Optional[Callable[[Completion], None]]) -- 当写入在所有副本的存储上安全且完成时要执行的操作

引发:

Error

返回类型:

Completion

返回:

完成对象

Ioctx.aio_write_full(object_name, to_write, oncomplete=None, onsafe=None)

Ioctx.aio_write_full(self, str object_name: str, bytes to_write: bytes, oncomplete: Optional[Callable] = None, onsafe: Optional[Callable] = None) -> Completion

异步写入整个对象

对象将填充所提供的数据。如果对象存在,它会被原子截断然后写入。将写入排队并返回。

参数:
  • object_name (str) -- 对象的名称

  • to_write (bytes) -- 要写入的数据

  • oncomplete (Optional[Callable]) -- 当写入在所有副本的内存中安全且完成时要执行的操作

  • onsafe (Optional[Callable]) -- 当写入在所有副本的存储上安全且完成时要执行的操作

引发:

Error

返回类型:

Completion

返回:

完成对象

Ioctx.aio_append(object_name, to_append, oncomplete=None, onsafe=None)

Ioctx.aio_append(self, str object_name: str, bytes to_append: bytes, oncomplete: Optional[Callable] = None, onsafe: Optional[Callable] = None) -> Completion

异步将数据追加到对象

将写入排队并返回。

参数:
  • object_name (str) -- 对象的名称

  • to_append (bytes) -- 要追加的数据

  • offset -- 在对象中开始写入的字节偏移量

  • oncomplete (Optional[Callable]) -- 当写入在所有副本的内存中安全且完成时要执行的操作

  • onsafe (Optional[Callable]) -- 当写入在所有副本的存储上安全且完成时要执行的操作

引发:

Error

返回类型:

Completion

返回:

完成对象

Ioctx.write(key, data, offset=0)

Ioctx.write(self, str key: str, bytes data: bytes, int offset: int = 0)

同步写入数据到对象

参数:
  • key (str) -- 对象的名称

  • data (bytes) -- 要写入的数据

  • offset (int) -- 在对象中开始写入的字节偏移量

引发:

TypeError

引发:

LogicError

返回:

int - 成功时为 0

Ioctx.write_full(key, data)

Ioctx.write_full(self, str key: str, bytes data: bytes)

同步写入整个对象。

对象将填充所提供的数据。如果对象存在,它会被原子截断然后写入。

参数:
  • key (str) -- 对象的名称

  • data (bytes) -- 要写入的数据

引发:

TypeError

引发:

Error

返回:

int - 成功时为 0

Ioctx.aio_flush()

Ioctx.aio_flush(self)

阻塞直到 io 上下文中所有待处理的写入都安全

引发:

Error

Ioctx.set_locator_key(loc_key)

Ioctx.set_locator_key(self, str loc_key: str)

设置用于在 io 上下文中将对象映射到 pgs 的键。

该键用于代替对象名称来确定对象放入哪个放置组。这会影响 io 上下文的所有后续操作 - 在设置不同的定位器键之前,此 io 上下文中的所有对象都将放置在同一 pg 中。

参数:

loc_key (str) -- 用作对象定位器的键,或 NULL 以丢弃任何先前设置的键

引发:

TypeError

Ioctx.aio_read(object_name, length, offset, oncomplete=None)

Ioctx.aio_read(self, str object_name: str, int length: int, int offset: int, oncomplete: Optional[Callable] = None) -> Completion

异步从对象读取数据

oncomplete 将被调用,并带有返回的读取值以及完成对象

oncomplete(completion, data_read)

参数:
  • object_name (str) -- 要读取的对象的名称

  • length (int) -- 要读取的字节数

  • offset (int) -- 在对象中开始读取的字节偏移量

  • oncomplete (Optional[Callable]) -- 读取完成时要执行的操作

引发:

Error

返回类型:

Completion

返回:

完成对象

Ioctx.read(key, length=8192, offset=0)

Ioctx.read(self, str key: str, int length: int = 8192, int offset: int = 0) -> bytes

同步从对象读取数据

参数:
  • key (str) -- 对象的名称

  • length (int) -- 要读取的字节数(默认值 = 8192)

  • offset (int) -- 在对象中开始读取的字节偏移量

引发:

TypeError

引发:

Error

返回类型:

bytes

返回:

从对象读取的数据

Ioctx.stat(key)

Ioctx.stat(self, str key: str) -> Tuple[int, time.struct_time]

获取对象统计信息(大小/修改时间)

参数:

key (str) -- 要获取统计信息的对象的名称

引发:

TypeError

引发:

Error

返回类型:

Tuple[int, struct_time]

返回:

(大小, 时间戳)

Ioctx.trunc(key, size)

Ioctx.trunc(self, str key: str, int size: int) -> int

调整对象大小

如果这会扩大对象,则新区域在逻辑上填充零。如果这会缩小对象,则多余的数据将被删除。

参数:
  • key (str) -- 要调整大小的对象的名称

  • size (int) -- 对象的新大小(字节)

引发:

TypeError

引发:

Error

返回类型:

int

返回:

成功时为 0,否则引发错误

Ioctx.remove_object(key)

Ioctx.remove_object(self, str key: str) -> bool

删除对象

这不会删除对象的任何快照。

参数:

key (str) -- 要删除的对象的名称

引发:

TypeError

引发:

Error

返回类型:

bool

返回:

成功时为 True

对象扩展属性

您可以设置对象的扩展属性 (XATTRs)。您可以检索对象或 XATTRs 列表并对其进行迭代。

Ioctx.set_xattr(key, xattr_name, xattr_value)

Ioctx.set_xattr(self, str key: str, str xattr_name: str, bytes xattr_value: bytes) -> bool

设置对象的扩展属性。

参数:
  • key (str) -- 要设置 xattr 的对象的名称

  • xattr_name (str) -- 要设置的扩展属性

  • xattr_value (bytes) -- 扩展属性的值

引发:

TypeError

引发:

Error

返回类型:

bool

返回:

成功时为 True,否则引发错误

Ioctx.get_xattrs(oid)

Ioctx.get_xattrs(self, str oid: str) -> XattrIterator

开始迭代对象上的 xattrs。

参数:

oid (str) -- 要获取 xattrs 的对象的名称

引发:

TypeError

引发:

Error

返回类型:

XattrIterator

返回:

XattrIterator

XattrIterator.__next__()

XattrIterator.__next__(self)

获取对象上的下一个 xattr

引发:

StopIteration

返回:

下一个 Xattr 的名称和值对

Ioctx.get_xattr(key, xattr_name)

Ioctx.get_xattr(self, str key: str, str xattr_name: str) -> bytes

获取对象上扩展属性的值。

参数:
  • key (str) -- 要获取 xattr 的对象的名称

  • xattr_name (str) -- 要读取的扩展属性

引发:

TypeError

引发:

Error

返回类型:

bytes

返回:

xattr 的值

Ioctx.rm_xattr(key, xattr_name)

Ioctx.rm_xattr(self, str key: str, str xattr_name: str) -> bool

从对象中删除扩展属性。

参数:
  • key (str) -- 要从中删除 xattr 的对象的名称

  • xattr_name (str) -- 要删除的扩展属性

引发:

TypeError

引发:

Error

返回类型:

bool

返回:

成功时为 True,否则引发错误

对象接口

从 I/O 上下文,您可以从存储池中检索对象列表并对其进行迭代。对象接口使得每个对象看起来像一个文件,您可以对对象执行同步操作。对于异步操作,应使用 I/O 上下文方法。

Ioctx.list_objects()

Ioctx.list_objects(self) -> ObjectIterator

获取 rados.Ioctx 对象上的 ObjectIterator。

返回类型:

ObjectIterator

返回:

ObjectIterator

ObjectIterator.__next__()

ObjectIterator.__next__(self)

获取存储池中的下一个对象名称和定位器

引发:

StopIteration

返回:

下一个 rados.Ioctx 对象

Object.read(length=1024 * 1024)
Object.write(string_to_write)
Object.get_xattrs()
Object.get_xattr(xattr_name)
Object.set_xattr(xattr_name, xattr_value)
Object.rm_xattr(xattr_name)
Object.stat()
Object.remove()

由 Ceph 基金会为您呈现

Ceph 文档是由非营利性 Ceph 基金会 资助和托管的社区资源。如果您希望支持这项工作和我们的其他努力,请考虑 立即加入