当前位置:首页 > Python > 正文

Python简单实现ORM框架 - 从零开始教程

Python简单实现ORM框架

从零开始构建自己的对象关系映射器,理解ORM的核心原理

什么是ORM?

ORM(Object-Relational Mapping,对象关系映射)是一种程序设计技术,用于在面向对象编程语言和关系数据库之间建立映射关系。通过使用ORM,开发者可以使用面向对象的方式来操作数据库,而不需要直接编写SQL语句。

ORM的核心思想是将数据库表映射为编程语言中的类,将表中的行映射为对象,将表中的列映射为对象的属性。这样,开发者可以通过操作对象来实现对数据库的增删改查操作。

为什么需要自己实现ORM?

虽然Python有很多优秀的ORM框架(如Django ORM、SQLAlchemy等),但自己实现一个简单的ORM有助于:

💡

深入理解原理

了解ORM框架背后的工作机制

🔧

定制化需求

根据特定需求实现定制功能

🚀

学习提升

提升Python元编程和数据库操作能力

实现步骤

1. 数据库连接

首先实现一个简单的数据库连接管理器,使用Python内置的sqlite3模块:

import sqlite3

class Database:
    def __init__(self, path=':memory:'):
        self.conn = sqlite3.connect(path)
        self.cursor = self.conn.cursor()
    
    def execute(self, sql, params=()):
        self.cursor.execute(sql, params)
        return self.cursor
    
    def commit(self):
        self.conn.commit()
    
    def close(self):
        self.conn.close()
    
    def create_table(self, table_name, columns):
        columns_sql = ', '.join(columns)
        self.execute(f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_sql})")
        self.commit()

2. 模型基类

创建所有模型类的基类Model,实现基本的ORM功能:

class Model:
    db = Database('mydatabase.db')
    
    @classmethod
    def _table_name(cls):
        return cls.__name__.lower()
    
    @classmethod
    def _fields(cls):
        # 获取类中定义的字段,排除非字段属性
        return [name for name, value in cls.__dict__.items() 
                if isinstance(value, Field)]
    
    @classmethod
    def create_table(cls):
        fields = []
        for name, field in cls.__dict__.items():
            if isinstance(field, Field):
                fields.append(f"{name} {field.field_type}")
        cls.db.create_table(cls._table_name(), fields)

3. 字段类型

定义不同的字段类型,用于映射到数据库中的列类型:

class Field:
    def __init__(self, field_type):
        self.field_type = field_type

class IntegerField(Field):
    def __init__(self):
        super().__init__("INTEGER")

class CharField(Field):
    def __init__(self, max_length=255):
        super().__init__(f"VARCHAR({max_length})")

class BooleanField(Field):
    def __init__(self):
        super().__init__("BOOLEAN")

完整ORM实现

将上述组件组合起来,实现一个完整的简单ORM:

class Model:
    db = Database('mydatabase.db')
    
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)
        self._is_saved = False
    
    @classmethod
    def _table_name(cls):
        return cls.__name__.lower()
    
    @classmethod
    def _fields(cls):
        return [name for name, value in cls.__dict__.items() 
                if isinstance(value, Field)]
    
    @classmethod
    def create_table(cls):
        fields = []
        for name, field in cls.__dict__.items():
            if isinstance(field, Field):
                fields.append(f"{name} {field.field_type}")
        cls.db.create_table(cls._table_name(), fields)
    
    def save(self):
        fields = self._fields()
        values = [getattr(self, field) for field in fields]
        
        if self._is_saved:
            # 更新现有记录
            set_clause = ', '.join([f"{field}=?" for field in fields])
            sql = f"UPDATE {self._table_name()} SET {set_clause} WHERE id=?"
            self.db.execute(sql, values + [self.id])
        else:
            # 插入新记录
            placeholders = ', '.join(['?' for _ in fields])
            sql = f"INSERT INTO {self._table_name()} ({', '.join(fields)}) VALUES ({placeholders})"
            self.db.execute(sql, values)
            self.id = self.db.cursor.lastrowid
            self._is_saved = True
        self.db.commit()
    
    @classmethod
    def get(cls, id):
        sql = f"SELECT * FROM {cls._table_name()} WHERE id=?"
        result = cls.db.execute(sql, (id,)).fetchone()
        if result:
            return cls._create_obj_from_row(result)
        return None
    
    @classmethod
    def all(cls):
        sql = f"SELECT * FROM {cls._table_name()}"
        results = cls.db.execute(sql).fetchall()
        return [cls._create_obj_from_row(row) for row in results]
    
    @classmethod
    def _create_obj_from_row(cls, row):
        # 获取列名
        col_names = [desc[0] for desc in cls.db.cursor.description]
        # 创建对象
        obj = cls()
        for col_name, value in zip(col_names, row):
            setattr(obj, col_name, value)
        obj._is_saved = True
        return obj
    
    def delete(self):
        if self._is_saved:
            sql = f"DELETE FROM {self._table_name()} WHERE id=?"
            self.db.execute(sql, (self.id,))
            self.db.commit()
            self._is_saved = False

使用示例

定义模型并使用ORM进行操作:

1

定义用户模型

class User(Model):
    id = IntegerField()
    name = CharField(max_length=50)
    email = CharField(max_length=100)
    is_admin = BooleanField()

# 创建表
User.create_table()
2

创建用户

# 创建新用户
user = User(name="张三", email="zhangsan@example.com", is_admin=True)
user.save()
print(f"新用户ID: {user.id}")
3

查询用户

# 查询单个用户
user = User.get(1)
if user:
    print(f"用户名: {user.name}, 邮箱: {user.email}")

# 查询所有用户
users = User.all()
for u in users:
    print(f"ID: {u.id}, 姓名: {u.name}")
4

更新和删除

# 更新用户
user = User.get(1)
user.email = "new_email@example.com"
user.save()

# 删除用户
user.delete()

总结

通过本教程,我们实现了一个简单的Python ORM框架,包含以下核心功能:

  • 数据库连接管理
  • 模型定义与表创建
  • 数据插入和保存
  • 数据查询(单条和所有记录)
  • 数据更新和删除

这个简单的ORM框架虽然功能有限,但涵盖了ORM的核心概念。你可以在此基础上继续扩展,添加如条件查询、关系映射、事务支持等高级功能。

本教程旨在帮助理解ORM实现原理,实际开发中建议使用成熟的ORM框架如SQLAlchemy或Django ORM

发表评论