密码

私钥/公钥加密算法(对称/非对称)

很讨厌一样东西有两样名字,考题考官总能拿没听过的考你🤣

算法生成一对密钥用于加密解密

  • 密钥相同 对称/私钥 加密
  • 密钥不同 非对称/公钥 加密 公钥可公开,私钥自己留着,同一密钥对可以解对方加的密

常用算法

对称

AES,DES,3DES,RC5、RC6

加密解密效率高,速度快,适合进行大数据量的加解密

非对称

RSA、DSA、ECC

算法复杂,加解密速度慢,但安全性高

hash

MD5、SHA系列、HMAC 加盐

同输入同算法同输出,同输出不一定输入相同(不能反推)

密钥的格式

ASN.1 format

本身只是表示接口数据的通用语法,没有限定编码方法。是一种协议

RSA生成的密钥就是使用ASN.1中的DER(Distinguished Encoding Rules)来编码成二进制

PEM 对DER编码转码为Base64,解码后,可以还原为DER格式

1
2
3
-----BEGIN PUBLIC KEY-----
xxx
-----END PUBLIC KEY-----

PEM 格式

  • PKCS1

专门为 RSA 密钥进行定义 有 RSA 字样

1
2
3
-----BEGIN RSA PRIVATE KEY-----
BASE64 DATA
-----END RSA PRIVATE KEY-----
  • PKCS8

一种密钥格式的通用方案,不仅仅为 RSA 所使用,也可以被其它密钥所使用

消息加密

通信安全

非对称加密对称的话密钥泄漏就完蛋

  • S : 发送者
  • R : 接收者
\[R\xrightarrow{\mathrm{公钥}}S\] \[S: R\mathrm{公钥}+\mathrm{原始信息}=\mathrm{密文}\] \[S\xrightarrow{\mathrm{密文}}R\] \[R:\;R\mathrm{私钥}+\mathrm{密文}=\mathrm{原始信息}\]

数字签名

确保消息来源,不被篡改

  • S : 发送者
  • R : 接收者
  • M: Man-in-the-middle attack
\[S\xrightarrow{\mathrm{公钥}}R\] \[S\;:\;\mathrm{原始消息}+\;HASH=\mathrm{摘要}(digest)+\;S\mathrm{私钥加密}\;=\;\mathrm{数字签名}(signature)\] \[S\xrightarrow{\mathrm{原始消息}\;and\;\;\mathrm{数字签名}}R\] \[R\;\left\{\begin{array}{l}\mathrm{原始消息}\;\xrightarrow[{HASH}]{}\mathrm{摘要}A\\\mathrm{加密摘要}\xrightarrow[{\mathrm{S公钥解密}\;\;\;}]{}\mathrm{摘要}B\end{array}\rightarrow\right.\mathrm{摘要}A/B\mathrm{对比}\rightarrow\mathrm{消息是否被篡改}\]

这里的原始消息也可以经过加密,这样就不是明文了,安全性更好

  • Why 加密摘要 not 直接发摘要

直接发摘要M可以替换发送的消息和摘要,但无法完成加密。通过私钥加密证明发送者的身份,只有唯一的公钥解密,其他人无法伪造。

  • Why 加密摘要 not 加密消息

消息量大的话加密时间长,摘要是固定长度的。

数字证书

当R拿到S公钥其实是M公钥时,一切崩坏。M可以完全取代S发送消息,R会觉得M才是S,S是其他人。

证书中心(certificate authority,简称CA)为公钥做认证。证书中心用自己的私钥,对S公钥和一些相关信息一起加密签名,生成数字证书(Digital Certificate)

\[{\mathrm{证书中心}(certificate\;authority)\mathrm{私钥}}+S\mathrm{公钥}+\mathrm{一些}S\mathrm{的信息}=\mathrm{数字证书}(Digital\;Certificate)\] \[S\xrightarrow{\mathrm{原始消息}\;and\;\;\mathrm{数字签名}\;}R\] \[R:\;\mathrm{数字签名}+CA\;\mathrm{公钥}=S\mathrm{公钥}\]

知名CA机构的根证书会内置于游览器中,用来确保 CA 机构本身的身份,其中就包含了 CA 机构自身的公钥。会验证数字证书可靠性。

  • Why CA 公钥非伪造

    CA是第三方机构,CA公钥是公开的,接收方可以跟别人比对(比如在网上查询),因此不可能伪造。但是发送方公钥,接收方是通过通信得到的,收到后无法验证。

自签名证书

1
openssl req -newkey rsa:2048 -nodes -keyout client.key -x509 -days 36500 -out client.crt

Common Name 如果您使用 SSL 加密保护网络服务器和客户端之间的数据流,举例被保护的网站是 https://test.chinacloudsites.cn,那么此处 Common Name 应输入 test.chinacloudsites.cn

CA自签名

自签 CA 证书

1
2
3
4
mkdir -p ./demoCA/newcerts
touch ./demoCA/index.txt
echo "01" > ./demoCA/serial

修改 openssl.cnf dir属性 或者cp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[ CA_default ]

dir             = ./demoCA              # Where everything is kept                                                                                                      
certs           = $dir/certs            # Where the issued certs are kept
crl_dir         = $dir/crl              # Where the issued crl are kept
database        = $dir/index.txt        # database index file.
#unique_subject = no                    # Set to 'no' to allow creation of
                                        # several ctificates with same subject.
new_certs_dir   = $dir/newcerts         # default place for new certs.

certificate     = $dir/cacert.pem       # The CA certificate
serial          = $dir/serial           # The current serial number
crlnumber       = $dir/crlnumber        # the current crl number

生成 ca.key ca.crt 生成扩展名为.crt 的根证书文件和扩展名为.key 的公钥, 生成的证书和公钥可能是.PEM 编码格式,也可能是.DER 编码格式。

1
openssl req -newkey rsa:2048 -nodes -keyout ca.key -x509 -days 36500 -out ca.crt

optional

1
cat ca.crt ca.key > ca.cer

X.509 证书编码格式主要有两种: .PEM.DER

  • .DER 是二进制编码,可包含所有私钥、公钥和证书,是大多数浏览器的缺省格式,常见于 Windows 系统中的证书格式。

  • .PEM 是明文格式的, 以 —–BEGIN CERTIFICATE—– 开头,已 —–END CERTIFICATE—– 结尾, 中间是经过 base64 编码的内容, Apache 和 NGINX 服务器偏向于使用这种编码格式,也是 openssl 默认采用的信息存放方式。 PEM 其实就是把 DER 的内容进行了一次 base64 编码。

证书编码格式转换

  • PEM to DER :openssl x509 -in cacert.crt -outform der -out cacert.der
  • DER to PEM :openssl x509 -in cert.crt -inform der -outform pem -out cacert.pem
  • PEM to CRT :openssl x509 -outform der -in your-cert.pem -out your-cert.crt
  • CRT to PEM :openssl x509 -inform der -in certificate.cer -out certificate.pem

签发证书

生成私钥

1
openssl genrsa -out server.key 2048

生成证书签名请求,扩展名.csr

1
openssl req -days 36500 -new -key server.key -out server.csr

使用 CA 根证书签发

1
openssl ca -days 36500 -in server.csr -out server.crt -cert ca.crt -keyfile ca.key

查看*.crt文件的有效期

1
2
3
openssl x509 -in domain.crt -noout -dates
openssl x509 -in domain.crt -noout -text
openssl x509 -in domain.crt -noout -subject

SAN证书

  • SANSubject Alternative Name的首字母缩写
  • 这些证书的成本通常比单名证书高一点,因为它们具有更多功能。
  • 当您请求 SAN 证书时,您可以选择定义证书可以保护的多个 DNS 名称。
  • 颁发后,SAN 证书将包含主 DNS 名称,该名称通常是网站的主名称,并且在证书属性的更深处,您会发现列出了您在请求期间指定的其他 DNS 名称。
  • 此单一证书可以安装在 Web 服务器上,并用于验证证书中包含的任何 DNS 名称的流量。

go version > 1.15有关,高版本弃用CN(CommonName) 字段,证书必须有SAN(Subject Alternative Names),不然会报错x509: certificate relies on legacy Common Name field。

生成证书

普通 xxx是域名

1
openssl req -x509 -sha256 -nodes -days 36500 -newkey rsa:2048 -keyout server.key -out server.crt -subj "/CN=xxxx" -addext "subjectAltName = DNS:xxxx"

复杂编辑x509 openssl.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[req]
distinguished_name = req_distinguished_name
x509_extensions = v3_req
prompt = no

[req_distinguished_name]
CN = MyServerName

[v3_req]
subjectAltName = @alt_names

[alt_names]
IP.1 = 127.0.0.1
DNS.1 = MyServerName
DNS.2 = *.xxx.cn

生成

1
openssl req -new -x509 -nodes -days 730 -keyout server.key -out server.crt -config openssl.conf

CA签名

参考

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[req]
distinguished_name = req_distinguished_name
req_extensions = req_ext
prompt = no

[req_distinguished_name]
CN = ums.xxx.cn

[req_ext]
subjectAltName = @alt_names

[alt_names]
IP.1 = 192.168.25.99
DNS.1 = ums.xxx.cn
DNS.2 = *.xxx.cn
1
2
3
4
5
6
7
8
9
10
# 生成ca
openssl req -newkey rsa:2048 -nodes -keyout ca.key -x509 -days 36500 -out ca.crt -subj "/C=xx/ST=x/L=x/O=x/OU=x/CN=ca/emailAddress=x/"

# 给生成的证书CA签名
openssl genrsa -out tls.key 2048
openssl req -new -key tls.key -out tls.csr -config openssl.conf
openssl x509 -req -days 36500 -in tls.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out tls.crt -extensions req_ext -extfile openssl.conf

# 验证SAN
openssl x509 -text -noout -in tls.crt | grep -A 1 "Subject Alternative Name"

查看证书

1
2
3
4
5
6
7
8
9
10
11
12
13
# 查看KEY信息
openssl rsa -noout -text -in myserver.key

# 查看CSR信息
openssl req -noout -text -in myserver.csr

# 查看证书信息
openssl x509 -noout -text -in ca.crt

# 验证证书
openssl verify selfsign.crt

openssl verify -CAfile ca.crt myserver.crt

HTTPS

SSL 认证

  • 单向SSL认证 一般client来校验服务器的合法性。

    • client需要一个ca.crt
    • sever需要server.crt、server.key
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
        listen       443 ssl;
        server_name  localhost;
      
        client_max_body_size 50M;
      
        ssl_certificate      /usr/local/nginx/ca/server/server.crt;
        ssl_certificate_key  /usr/local/nginx/ca/server/server.key;
      
        ssl_session_cache    shared:SSL:1m;
        ssl_session_timeout  5m;
      
        ssl_ciphers  HIGH:!aNULL:!MD5;
        ssl_prefer_server_ciphers  on;
    
  • 双向SSL认证 服务器需要校验每个client, client也需要校验服务器 crt证书由签名

    • server 需要 server.key 、server.crt 、ca.crt
    • client 需要 client.key 、client.crt 、ca.crt
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
     listen       443; 
     server_name  localhost; 
     ssi on; 
     ssi_silent_errors on; 
     ssi_types text/shtml; 
      
    ssl                  on; 
    ssl_certificate      /usr/local/nginx/ca/server/server.crt; 
    ssl_certificate_key  /usr/local/nginx/ca/server/server.key; 
    ssl_client_certificate /usr/local/nginx/ca/private/ca.crt; 
      
    ssl_session_timeout  5m; 
    ssl_verify_client on;  #开户客户端证书验证 
    ssl_protocols TLSv1 TLSv1.1 TLSv1.2;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDH:AES:HIGH:!aNULL:!MD5:!ADH:!DH;
      
      
    ssl_prefer_server_ciphers   on; 
    

Hello 阶段

  • Client端发送https请求
  1. 支持的协议版本
  2. 随机数 random1,稍后用于生成”对话密钥”。
  3. 支持的加密方法等信息
  • Server端
  1. 确认使用的加密通信协议版本
  2. 发送随机数 random2,稍后用于生成”对话密钥”。
  3. 确认加密方法等信息
  4. 发送数字证书

验证数字证书

  • CA身份验证/过期 CA根证书验证: 浏览器内置一个受信任的CA机构列表,并保存了这些CA机构的证书。

  • Server端的数字证书信息与当前正在访问的网站(域名等)一致: Server端是可信的 -> 获得Server公钥

Client端是否能够信任这个站点的证书,首先取决于Client端程序是否导入了证书颁发者的根证书。抓包工具要抓https也要导入工具的证书

协商通信密钥

传输内容的加密是采用的对称加密,对加密的密钥使用公钥进行非对称加密

  • Server端验证客户端(optional) 客户端的证书 + 随机数(Client端签名) = 客户端身份验证
  • 验证通过后Client端生成 随机码(PreMaster key)Server公钥加密 [+ 客户端身份验证] -> Server端

PreMaster key = RSA或者Diffie-Hellman等加密算法生成

PreMaster key是在客户端使用RSA或者Diffie-Hellman等加密算法生成的。

PreMaster key前两个字节是TLS的版本号,这是一个比较重要的用来核对握手数据的版本号,因为在Client Hello阶段, 客户端会发送一份加密套件列表和当前支持的SSL/TLS的版本号给服务端,而且是使用明文传送的, 如果握手的数据包被破解之后,攻击者很有可能串改数据包,选择一个安全性较低的加密套件和版本给服务端,从而对数据进行破解。 所以,服务端需要对密文中解密出来对的PreMaster版本号跟之前Client Hello阶段的版本号进行对比, 如果版本号变低,则说明被串改,则立即停止发送任何消息。

  • Server端验证 客户端身份(optional)过后,解密得到PreMaster key
  • 双端生成Master key = 一系列算法(PreMaster key + random1 + random2)

  • 双端:发送ChangeCipherSpec协议,数据包中就是一个字节的数据,用于告知服务端,客户端已经切换到之前协商好的加密套件(Cipher Suite)的状态,准备使用之前协商好的加密套件加密数据并传输了。
  • 双端:协商好的加密套件和Session Secret加密一段 Finish 的数据传送给服务端,此数据是为了在正式传输应用数据之前对刚刚握手建立起来的加解密通道进行验证。

如果PreMaster key泄漏 完蛋了

通信过程

S: 加密(消息+消息摘要) R: 解密 对比摘要

hash 密码

引用 hashing-security

保护密码的最好办法是使用加盐密码哈希salted password hashing

放弃编写自己的密码哈希加密代码的念头,要使用成熟方案。

hash what

哈希算法散列算法是单向函数。他们将任意数量的数据转换为固定长度的指纹,无法逆转。

破解hash

  • 字典攻击 Dictionary

字典攻击使用包含单词,短语,公共密码和其他可能用作密码的字符串的文件。对文件中的每个单词进行哈希处理,并将其哈希值与密码哈希值进行比较。

  • 暴力攻击 Brute Force Attacks

对于给定的密码长度,尝试每一种可能的字符组合。这种方式会消耗大量的计算,也是破解哈希加密效率最低的办法,但最终会找出正确的密码。因此密码应该足够长,以至于遍历所有可能的字符组合,耗费的时间太长令人无法承受,从而放弃破解。

  • 查表法( Lookup Tables)

查找表是一种非常有效的方法,可以非常快速地破解相同类型的多个哈希值。一般的想法是在密码字典中预先计算密码的哈希值,并将它们及其相应的密码存储在查找表数据结构中。查找表的良好实现每秒可以处理数百个哈希查找,即使它们包含数十亿个哈希值

  • 反向查表法( Reverse Lookup Tables)

这种攻击允许攻击者无需预先计算好查询表的情况下同时对多个哈希值发起字典攻击或暴力攻击。 首先,攻击者从被黑的用户帐号数据库创建一个用户名和对应的密码哈希表,然后,攻击者猜测一系列哈希值并使用该查询表来查找使用此密码的用户。通常许多用户都会使用相同的密码,因此这种攻击方式特别有效。

  • 彩虹表( Rainbow Tables)

与查表法相似,只是它为了使查询表更小,牺牲了破解速度。因为彩虹表更小,所以在单位空间可以存储更多的哈希值,从而使攻击更有效。

salt

查找表和彩虹表起作用,因为每个密码都以完全相同的方式进行哈希处理。相同的密码有相同的密码哈希值

密码中加入一段随机字符串再进行哈希加密,这个被加的字符串称之为,使得相同的密码每次都被加密为完全不同的字符串。

我们需要盐值来校验密码是否正确。通常和密码哈希值一同存储在帐号数据库中,或者作为哈希字符串的一部分。 盐值无需加密。由于随机化了哈希值,查表法、反向查表法和彩虹表都会失效。

错误用法

  • 盐值复用( Salt Reuse)

相同的密码,他们仍然会有相同的哈希值。攻击者仍然可以使用反向查表法对每个哈希值进行字典攻击 每次用户创建帐户或更改密码时,都必须生成新的随机盐

  • 短盐值( Short Slat)

攻击者可以为每种可能的盐构建查找表 为了使攻击者无法为每个可能的盐创建查找表,盐必须很长。一个好的经验法则是使用与散列函数输出大小相同的salt。例如,SHA256的输出是256位(32字节),因此salt应至少为32个随机字节

  • 古怪哈希的算法组合

尝试不同的哈希函数相结合一起使用,希望让数据会更安全。但在实践中,这样做并没有什么好处。 它带来了函数之间互通性的问题,而且甚至可能会使哈希变得更不安全。永远不要试图去创造你自己的哈希加密算法,要使用专家设计好的标准算法。

当攻击者不知道哈希加密算法的时候,是无法发起攻击的。但是要考虑到Kerckhoffs的原则,攻击者通常会获得源代码(尤其是免费或者开源软件)。通过系统中找出密码 - 哈希值对应关系,很容易反向推导出加密算法。

正确做法

  • 加盐
  • Web应用中永远在服务端上进行哈希加密
  • 慢哈希函数( Slow Hash Function)

为了降低使这些攻击的效率,我们可以使用一个叫做密钥扩展( key stretching)的技术。密钥扩展的实现使用了一种 CPU 密集型哈希函数( CPU-intensive hash function)

这类算法采取安全因子或迭代次数作为参数。此值决定哈希函数将会如何缓慢。对于桌面软件或智能手机应用, 确定这个参数的最佳方式是在设备上运行很短的性能基准测试,找到使哈希大约花费半秒的值。通过这种方式,程序可以尽可能保证安全而又不影响用户体验。

缺点

需要额外的计算资源来处理大量的身份认证请求,并且密钥扩展也容易让服务端遭受DDoS

  • 加密hash ASE算法对哈希值加密 密钥必须被存储在外部系统

code

1
2
3
4
5
6
7
8
9
10
def to_bytes(bytes_or_str):
    if isinstance(bytes_or_str, str):
        return bytes_or_str.encode()
    return bytes_or_str


def to_str(bytes_or_str):
    if isinstance(bytes_or_str, bytes):
        return bytes_or_str.decode()
    return bytes_or_str

一个字节byte = 8bit

base64

所有数据都会编码成65个字符能表示的文本(A~Za~z0~9+/=),有65个为什么是base64因为=是填充字符

urlbase64为了安全 + 替换成 - / 替换成 _= 替换成 ~或者.

可以用4个可打印字符表示3个字节 例如编码3个英文用4个编码,2个中文要4个编码

在电子邮件中,根据 RFC 822 规定,每 76 个字符需要加上一个回车换行,所以有些编码器实现,比如 sun.misc.BASE64Encoder.encode,是带回车的,还有 java.util.Base64.Encoder.RFC2045,是带回车换行(\r\n)的,每行 76 个字符。

hex

0~9a~f每个16进制字符代表4个bit,2个16进制才是一个字节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def t_b64():
    import base64
    def encode(raw):
        """
        二进制数据进行Base64编码

        将非ASCII字符的数据转换成ASCII字符的一种方法 对数据内容进行编码来适合传输、保存,e.g ascii码有些值不可见

        根证书,email附件 Base64编码图片
        """
        return base64.b64encode(to_bytes(raw))

    def decode(raw_b):
        return to_str(base64.b64decode(raw_b))

    encode_b = encode('fafaf个把个关')
    print(encode_b)  # b'ZmFmYWbkuKrmiorkuKrlhbM='
    print(decode(encode_b))  # fafaf个把个关


def gen_user_token(login_type: int) -> str:
    raw = get_random_bytes(USER_TOKEN_BITS) + struct.pack('>L', int(time.time()))
    return safe_base64_encode(raw).decode()


def rand_byte2b64(bits: int) -> bytes:
    return base64.urlsafe_b64encode(gen_random_key(bits))


def safe_base64_encode(s: bytes, encode_func: typing.Callable = base64.urlsafe_b64encode) -> bytes:
    # cookie 不支持 =
    return encode_func(s).replace(b'=', b'')


def safe_base64_decode(s: bytes, decode_func: typing.Callable = base64.urlsafe_b64decode) -> bytes:
    s += b'==='
    b = s[:-1 * (len(s) % 4)]
    return decode_func(b)


hash

  • 消息摘要,散列后的密文定长,和明文长度无关,明文可以无限长
  • 明文不同,密文一定不同,密文不可逆
  • 每个16进制字符代表4个bit
  • MD5(32个16进制字符)
  • SHA(看算法描述表示摘要长度 SHA256都会产生一个256位的哈希值 一个长度为64的十六进制字符串)
  • HMAC(在MD5 和SHA算法上,加上密钥,生成的摘要长度一样)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def t_hmac():
    import hmac
    import hashlib

    def encode(secret_key, payload):
        """
        加密用的散列函数 64位  md5、sha3 消息无限大
        """
        # return hmac.new(to_bytes(secret_key), to_bytes(payload), digestmod=hashlib.sha256).digest()
        return hmac.new(to_bytes(secret_key), to_bytes(payload), digestmod=hashlib.sha256).hexdigest()

    signature = encode('21a1f34d2c785f296c90ba54ead31d98e5c1838351cdfea0434c9edfcd1a0252', 'wo我是msg')
    print(signature)


def t_md5():
    import hashlib

    def encode(payload):
        """
        32 位 hash函数
        """
        # return hashlib.md5(to_bytes(payload)).digest()
        return hashlib.md5(to_bytes(payload)).hexdigest()

    print(encode('wo我是msgfafafdfdfadfafafafafafa'))


# pysodium hash

ENCODED_PASSWORD_BYTES = pysodium.crypto_auth_KEYBYTES + pysodium.crypto_pwhash_SALTBYTES
def _crypto_hash(password: str, salt: bytes) -> bytes:
    return pysodium.crypto_pwhash(pysodium.crypto_auth_KEYBYTES, password, salt,
                                  pysodium.crypto_pwhash_argon2id_OPSLIMIT_INTERACTIVE,
                                  pysodium.crypto_pwhash_argon2id_MEMLIMIT_INTERACTIVE,
                                  pysodium.crypto_pwhash_ALG_ARGON2ID13)

def encode_password(password) -> bytes:
    salt = _generate_salt()
    password_hash = _crypto_hash(password, salt)
    return password_hash + salt


def verify_password(password, encoded_password) -> bool:
    if len(encoded_password) != ENCODED_PASSWORD_BYTES:
        return False
    password_hash = encoded_password[:pysodium.crypto_auth_KEYBYTES]
    salt = encoded_password[-pysodium.crypto_pwhash_SALTBYTES:]
    return _crypto_hash(password, salt) == password_hash


pycryptodome

RSA

  • 公钥加密 私钥解密,公钥可以公开
  • 单次加密长度有限制,由密钥长度决定,密文和密钥等长
  • 一般对密钥进行加密
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def t_RSA(msg, passwd=None):
    from Crypto.PublicKey import RSA
    from Crypto.Cipher import PKCS1_v1_5 as Cipher_pkcs1_v1_5
    from Crypto.Signature import PKCS1_v1_5 as Signature_pkcs1_v1_5
    from Crypto.Hash import SHA256

    def generate(bits=2048, passwd=None, path='id_rsa', ):
        key = RSA.generate(bits)
        private_key = key.export_key(passphrase=passwd)

        with open(path + '.pub', "wb")as pub:
            pub.write(key.publickey().export_key())

        with open(path, "wb")as pub:
            pub.write(private_key)

    def encrypt(msg, passwd=None, path='id_rsa'):
        """公钥加密"""
        msg = to_bytes(msg)
        key = RSA.import_key(open(path + '.pub').read(), passphrase=passwd)

        cipher = Cipher_pkcs1_v1_5.new(key)
        result = cipher.encrypt(msg)
        return result

    def decrypt(encrypt_txt, passwd=None, path='id_rsa'):
        key = RSA.import_key(open(path).read(), passphrase=passwd)

        cipher = Cipher_pkcs1_v1_5.new(key)
        result = cipher.decrypt(encrypt_txt, None)
        return result

    def sign(msg, passwd=None, path='id_rsa'):
        msg = to_bytes(msg)
        key = RSA.import_key(open(path).read(), passphrase=passwd)
        signer = Signature_pkcs1_v1_5.new(key)
        digest = SHA256.new(msg)
        signature = signer.sign(digest)
        return signature

    def check_sign(msg, sign, passwd=None, path='id_rsa'):
        msg = to_bytes(msg)
        key = RSA.import_key(open(path + '.pub').read(), passphrase=passwd)
        signer = Signature_pkcs1_v1_5.new(key)
        digest = SHA256.new(msg)
        assert signer.verify(digest, sign) is True

    generate(passwd=passwd)
    result = decrypt(encrypt(msg), passwd)
    assert msg == to_str(result)
    signature = sign(msg, passwd)
    check_sign(msg, signature)

AES

实际上AES加密有AES-128、AES-192、AES-256三种,分别对应三种密钥长度128bits(16字节)、192bits(24字节)、256bits(32字节)。当然,密钥越长,安全性越高,加解密花费时间也越长。

分组算法

  • ECB 整个明文分成若干段相同的小段,然后对每一小段进行加密。
  • CBC 明文切分成若干小段,然后每一小段与初始块或者上一段的密文段进行异或运算后,再与密钥进行加密。

因为分组有不同的填充手段,保证分组每个区块字节一样。

加解密要保证:明文 密钥 IV 加密方式 填充方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def t_AES(msg, passwd):
    from Crypto.Cipher import AES
    from Crypto import Random
    import hashlib
    import base64

    BS = AES.block_size
    """
    pad和unpad分别是填充函数和逆填充函数 
    文本长度正好是BlockSize长度的倍数,也会填充一个BlockSize长度的值
    缺几位就补几 要填充8个字节,那么填充的字节的值就是0x08
    
    AES加密对加密文本有长度要求
    必须是AES.block_size的整倍数
    
    """
    pad = lambda s: s + (BS - len(s) % BS) * chr(BS - len(s) % BS)
    unpad = lambda s: s[:-ord(s[len(s) - 1:])]
    """
    实际上AES加密有AES-128、AES-192、AES-256三种,
    分别对应三种密钥长度128bits(16字节)、192bits(24字节)、256bits(32字节)。
    当然,密钥越长,安全性越高,加解密花费时间也越长。
    """
    passwd = hashlib.sha256(to_bytes(passwd)).digest()

    def encrypt(msg):
        msg = to_bytes(pad(msg))
        iv = Random.new().read(BS)
        cipher = AES.new(passwd, mode=AES.MODE_CBC, iv=iv)
        result = cipher.encrypt(msg)
        return base64.b64encode(iv + result)

    def decrypt(encrypt_txt):
        encrypt_txt = base64.b64decode(encrypt_txt)
        iv = encrypt_txt[:BS]
        cipher = AES.new(passwd, mode=AES.MODE_CBC, iv=iv)
        result = unpad(cipher.decrypt(encrypt_txt[BS:]))
        return result

    result = decrypt(encrypt(msg))
    assert msg == to_str(result)

完善版

python 加密解密

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def sign(private_key, data: bytes) -> bytes:
    h = SHA256.new(data)
    return pkcs1_15.new(private_key).sign(h)


def verify_sign(pub_key, data, signature) -> bool:
    h = SHA256.new(data)
    try:
        pkcs1_15.new(pub_key).verify(h, signature)
    except (ValueError, TypeError):
        raise False
    return True


def aes_encrypt(aes_key: bytes, data: bytes) -> bytes:
    cipher = AES.new(aes_key, AES.MODE_CBC)
    ct = cipher.encrypt(pad(data, AES.block_size))
    return cipher.iv + ct


def aes_decrypt(aes_key: bytes, data: bytes) -> bytes:
    iv, ct = data[:AES.block_size], data[AES.block_size:]  # 确认 iv 未被使用
    cipher = AES.new(aes_key, AES.MODE_CBC, iv)
    return unpad(cipher.decrypt(ct), AES.block_size)


def encrypt_license(data: typing.Union[dict, list], aes_key: bytes, schema: dict = LICENSE_SCHEMA) -> bytes:
    data = avro_encode(data, schema)
    private_key = SERVER_PRIVATE_KEY
    # hash 签名
    signature = sign(private_key, data)
    signed_data = data + signature
    # aes 加密
    license_data = aes_encrypt(aes_key, signed_data)
    return license_data


def decrypt_license(license_data: bytes, aes_key: bytes, schema: dict = LICENSE_SCHEMA) -> typing.Optional[typing.Union[dict, list]]:
    pub_key = CLIENT_PUB_KEY
    # aes 解密
    data = aes_decrypt(aes_key, license_data)
    # 验证签名
    data, signature = data[:-512], data[-512:]
    if not verify_sign(pub_key, data, signature):
        return None
    data = avro_decode(data, schema)
    return data


def gen_rsa_keys():
    """生成并返回私钥和公钥"""
    key = RSA.generate(4096)
    private_key = key
    pub_key = key.publickey()
    return private_key, pub_key


def init_rsa_keys():
    global SERVER_PRIVATE_KEY, CLIENT_PUB_KEY, CLIENT_PRIVATE_KEY, SERVER_PUB_KEY

    def init_key(private_path, pub_path):
        if os.path.exists(private_path) and os.path.exists(pub_path):
            with open(private_path) as f:
                private_key = f.read()
            private_key = RSA.import_key(private_key)

            with open(pub_path) as f:
                pub_key = f.read()
            pub_key = RSA.import_key(pub_key)
            return private_key, pub_key

        private_key, pub_key = gen_rsa_keys()
        with open(private_path, 'wb') as f:
            f.write(private_key.export_key())

        with open(pub_path, 'wb') as f:
            f.write(pub_key.export_key())

        return private_key, pub_key

    SERVER_PRIVATE_KEY, CLIENT_PUB_KEY = init_key(_SERVER_PRIVATE_KEY_PATH, _CLIENT_PUB_KEY_PATH)
    CLIENT_PRIVATE_KEY, SERVER_PUB_KEY = init_key(_CLIENT_PRIVATE_KEY_PATH, _SERVER_PUB_KEY_PATH)

# AES key
def gen_random_key(bits: int) -> bytes:
    return get_random_bytes(bits)

golang 加密解密

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
func pkcs7Unpadding(data []byte) []byte {
	pDataLen := len(data)
	paddingLen := int(data[pDataLen-1])
	return data[:(pDataLen - paddingLen)]
}

func pkcs7Padding(data []byte, blockSize int) []byte {
	padding := blockSize - len(data)%blockSize
	padtext := bytes.Repeat([]byte{byte(padding)}, padding)
	return append(data, padtext...)
}
func aesEncrypt(aesKey []byte, data []byte) (cipherText []byte, err error) {
	block, err := aes.NewCipher(aesKey)
	if err != nil {
		fmt.Println(err)
		return
	}
	blockSize := block.BlockSize()
	data = pkcs7Padding(data, blockSize)
	cipherText = make([]byte, blockSize+len(data))
	iv := cipherText[:blockSize]
	if _, err := io.ReadFull(rand.Reader, iv); err != nil {
		panic(err)
	}
	mode := cipher.NewCBCEncrypter(block, iv)
	mode.CryptBlocks(cipherText[blockSize:], data)
	return
}
func aesDecrypt(aesKey []byte, data []byte) (b []byte, err error) {
	block, err := aes.NewCipher(aesKey)
	if err != nil {
		fmt.Println(err)
		return
	}
	iv, ct := data[:block.BlockSize()], data[block.BlockSize():]
	blockMode := cipher.NewCBCDecrypter(block, iv)
	origData := make([]byte, len(ct))
	blockMode.CryptBlocks(origData, ct)
	b = pkcs7Unpadding(origData)
	return
}

func getPubKey() (pubKey *rsa.PublicKey) {
	pub, _ := ioutil.ReadFile("client.pub")
	block, _ := pem.Decode(pub)
	parsedKey, _ := x509.ParsePKIXPublicKey(block.Bytes)
	pubKey = parsedKey.(*rsa.PublicKey)
	return
}
func getPrivateKey() (privateKey *rsa.PrivateKey) {
	private, err := ioutil.ReadFile("client")
	if err != nil {
		fmt.Println(err)
		return
	}
	pubPem, _ := pem.Decode(private)
	privateKey, _ = x509.ParsePKCS1PrivateKey(pubPem.Bytes)
	return
}

func sign(data []byte) (s []byte) {
	privateKey := getPrivateKey()
	h := sha256.New()
	h.Write(data)
	s, _ = rsa.SignPKCS1v15(rand.Reader, privateKey, crypto.SHA256, h.Sum(nil))
	return
}

func verifySign(data []byte, signature []byte) bool {
	hash := sha256.New()
	hash.Write(data)
	pubKey := getPubKey()
	err := rsa.VerifyPKCS1v15(pubKey, crypto.SHA256, hash.Sum(nil), signature)
	if err != nil {
		return false
	}
	return true
}

func DecryptLicense(licenseData []byte, aesKey []byte, schema string) (t map[string]interface{}, err error) {
	// aes 解密
	origData, err := aesDecrypt(aesKey, licenseData)
	if err != nil {
		fmt.Println(err)
		return
	}
	// 签名验证
	origDataLen := len(origData)
	data, signature := origData[:origDataLen-512], origData[origDataLen-512:]
	ok := verifySign(data, signature)
	if ok == false {
		fmt.Println("sign verify fail")
		return nil, errors.New("sign verify fail")
	}

	license, err := avroDecode(data, schema)
	if err != nil {
		fmt.Println(err)
		return nil, errors.New("avro decode fail")
	}
	t, _ = license.(map[string]interface{})

	return
}

func EncryptLicense(licenseData interface{}, aesKey []byte, schema string) (license []byte, err error) {
	data, err := avroEncode(licenseData, schema)
	if err != nil {
		fmt.Println(err)
		return
	}
	buffer := bytes.Buffer{}
	buffer.Write(data)
	signature := sign(data)
	buffer.Write(signature)

	license, err = aesEncrypt(aesKey, buffer.Bytes())
	if err != nil {
		fmt.Println(err)
		return
	}
	return
}